cluster_client.go
package vs

import (
	"fmt"
	"sync"

	"github.com/chrislusf/vasto/pb"
	"github.com/chrislusf/vasto/topology"
	"github.com/chrislusf/vasto/topology/clusterlistener"
)

// ClusterClient is used to access the keyspace in the current data center.
type ClusterClient struct {
	keyspace        string
	ClusterListener *clusterlistener.ClusterListener
	WriteConfig
	AccessConfig
}

// Clone creates a new instance of ClusterClient, mostly to adjust the write config and access config.
func (c *ClusterClient) Clone() *ClusterClient {
	return &ClusterClient{
		keyspace:        c.keyspace,
		ClusterListener: c.ClusterListener,
		WriteConfig:     c.WriteConfig,
		AccessConfig:    c.AccessConfig,
	}
}
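
// cloneExample is a hypothetical sketch, not part of the original file: it
// shows the intended use of Clone, adjusting per-call config on a copy so the
// shared client is left untouched. Replica is assumed to be an int field of
// the embedded AccessConfig, as its use in sendRequestsToOneShard suggests.
func cloneExample(client *ClusterClient) *ClusterClient {
	readFromReplica := client.Clone()
	readFromReplica.Replica = 1 // assumed: 0 is the primary, 1 the first replica
	return readFromReplica
}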

// GetCluster returns the topology.Cluster carrying the cluster topology
// information for this client's keyspace.
func (c *ClusterClient) GetCluster() (*topology.Cluster, error) {
	cluster, found := c.ClusterListener.GetCluster(c.keyspace)
	if !found {
		return nil, fmt.Errorf("no keyspace %s", c.keyspace)
	}
	return cluster, nil
}
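
// findShardExample is a hypothetical sketch, not part of the original file:
// it shows how a *topology.Cluster maps a partition hash to the shard that
// owns it, mirroring what BatchProcess does below. FindShardId is assumed to
// take a uint64 hash and return int, matching GetPartitionHash and the uint32
// conversion applied to the result in BatchProcess.
func findShardExample(c *ClusterClient, partitionHash uint64) (int, error) {
	cluster, err := c.GetCluster()
	if err != nil {
		return 0, err
	}
	return cluster.FindShardId(partitionHash), nil
}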

// sendRequestsToOneShard sends the requests to one shard, assuming all of
// the requests target that same shard.
func (c *ClusterClient) sendRequestsToOneShard(shardId int, requests []*pb.Request) (results []*pb.Response, err error) {
	conn, err := c.ClusterListener.GetConnectionByShardId(c.keyspace, shardId, c.Replica)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	responses, err := pb.SendRequests(conn, &pb.Requests{
		Keyspace: c.keyspace,
		Requests: requests,
	})
	if err != nil {
		return nil, fmt.Errorf("shard %d process error: %v", shardId, err)
	}
	results = responses.Responses
	return
}

// BatchProcess divides the requests, groups them by destination shard, and
// sends each group to its shard as one batch. Expert usage expected.
func (c *ClusterClient) BatchProcess(requests []*pb.Request,
	processResultFunc func([]*pb.Response, error) error) error {

	cluster, err := c.GetCluster()
	if err != nil {
		return err
	}

	shardIdToRequests := make(map[uint32][]*pb.Request)
	for _, req := range requests {
		req.ShardId = uint32(cluster.FindShardId(req.GetPartitionHash()))
		shardIdToRequests[req.ShardId] = append(shardIdToRequests[req.ShardId], req)
	}

	err = mapEachShard(shardIdToRequests, func(shardId uint32, requests []*pb.Request) error {
		responses, err := c.sendRequestsToOneShard(int(shardId), requests)
		if err != nil {
			// sendRequestsToOneShard already annotates the error with the shard id.
			return err
		}
		if processResultFunc != nil {
			return processResultFunc(responses, nil)
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("process error: %v", err)
	}
	return nil
}
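
// batchExample is a hypothetical sketch, not part of the original file: it
// shows the expected call shape of BatchProcess, where the caller supplies
// pre-built requests and a callback that inspects each shard's batch of
// responses as that shard completes.
func batchExample(c *ClusterClient, requests []*pb.Request) error {
	return c.BatchProcess(requests, func(responses []*pb.Response, err error) error {
		if err != nil {
			return err
		}
		for _, resp := range responses {
			_ = resp // inspect or collect each response here
		}
		return nil
	})
}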

// mapEachShard runs eachFunc on every shard's bucket of requests concurrently
// and waits for all of them to finish.
func mapEachShard(buckets map[uint32][]*pb.Request, eachFunc func(uint32, []*pb.Request) error) (err error) {
	var wg sync.WaitGroup
	var errMu sync.Mutex // guards err: several shard goroutines may fail concurrently
	for shardId, requests := range buckets {
		wg.Add(1)
		go func(shardId uint32, requests []*pb.Request) {
			defer wg.Done()
			if eachErr := eachFunc(shardId, requests); eachErr != nil {
				errMu.Lock()
				err = eachErr // keeps the last error observed; earlier ones are overwritten
				errMu.Unlock()
			}
		}(shardId, requests)
	}
	wg.Wait()
	return
}
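
// A hypothetical alternative sketch, not part of the original file: the same
// fan-out can be written with golang.org/x/sync/errgroup, which reports the
// first non-nil error instead of the last and removes the hand-rolled
// WaitGroup-plus-mutex bookkeeping. It would need the errgroup import added.
//
//	func mapEachShardWithErrgroup(buckets map[uint32][]*pb.Request, eachFunc func(uint32, []*pb.Request) error) error {
//		var g errgroup.Group
//		for shardId, requests := range buckets {
//			shardId, requests := shardId, requests // capture loop variables (needed before Go 1.22)
//			g.Go(func() error { return eachFunc(shardId, requests) })
//		}
//		return g.Wait() // returns the first non-nil error from any goroutine
//	}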