-
Notifications
You must be signed in to change notification settings - Fork 30
/
store_grpc_server_binlog.go
122 lines (91 loc) · 3.77 KB
/
store_grpc_server_binlog.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package store
import (
"fmt"
"github.com/chrislusf/glog"
"github.com/chrislusf/vasto/pb"
"github.com/dgryski/go-jump"
"golang.org/x/net/context"
"io"
"time"
)
// TailBinlog sends all data if PullUpdateRequest's TargetClusterSize==0,
// or sends all data belong to TargetShardId in cluster of TargetClusterSize
func (ss *storeServer) TailBinlog(request *pb.PullUpdateRequest, stream pb.VastoStore_TailBinlogServer) error {
glog.V(1).Infof("TailBinlog %v", request)
shard, found := ss.keyspaceShards.getShard(request.Keyspace, VastoShardId(request.ShardId))
if !found || shard.isShutdown {
return fmt.Errorf("shard: %s.%d not found", request.Keyspace, request.ShardId)
}
segment := uint32(request.Segment)
offset := int64(request.Offset)
limit := int(request.Limit)
// println("TailBinlog server, segment", segment, "offset", offset, "limit", limit)
if !shard.lm.HasSegment(segment) {
t := &pb.PullUpdateResponse{
OutOfSync: true,
}
if err := stream.Send(t); err != nil {
return err
}
start, stop := shard.lm.GetSegmentRange()
return fmt.Errorf("out of sync client reads segment %d offset %d, only has segment [%d,%d]",
segment, offset, start, stop)
}
targetShardId := int32(request.TargetShardId)
targetClusterSize := int(request.TargetClusterSize)
if targetClusterSize > 0 && targetShardId != int32(request.ShardId) {
limit *= targetClusterSize
}
defer func() {
glog.V(1).Infof("TailBinlog completed shard %v for %v", shard.String(), request.Origin)
}()
for {
// println("TailBinlog server reading entries, segment", segment, "offset", offset, "limit", limit)
// glog.V(2).Infof("TailBinlog shard %v %v read entries %d:%d", shard.String(), request.Origin, segment, offset)
entries, nextOffset, err := shard.lm.ReadEntries(segment, offset, limit)
if err == io.EOF {
segment += 1
} else if err != nil {
return fmt.Errorf("failed to read segment %d offset %d: %v", segment, offset, err)
} else if len(entries) <= 100 {
time.Sleep(100 * time.Millisecond)
entries, nextOffset, err = shard.lm.ReadEntries(segment, offset, limit)
if err == io.EOF {
segment += 1
} else if err != nil {
return fmt.Errorf("failed to read segment %d offset %d: %v", segment, offset, err)
}
}
// glog.V(2).Infof("shard %v read for %v: %d, @ %d:%d, next %d", shard.String(), request.Origin, len(entries), segment, offset, nextOffset)
t := &pb.PullUpdateResponse{
NextSegment: segment,
NextOffset: uint64(nextOffset),
}
for _, entry := range entries {
// glog.V(2).Infof("shard %v send0 %v: %v offset:%d", shard.String(), request.Origin, string(entry.Key), offset)
if targetClusterSize > 0 && jump.Hash(entry.GetPartitionHash(), targetClusterSize) != targetShardId {
// glog.V(2).Infof("shard %v send %v skipped: %v, hash:%v, targetClusterSize:%d, targetShardId:%d ", shard.String(), request.Origin, string(entry.Key), entry.PartitionHash, targetClusterSize, targetShardId)
continue
}
// glog.V(2).Infof("shard %v send %v: %v", shard.String(), request.Origin, string(entry.Key))
t.Entries = append(t.Entries, entry)
}
if err := stream.Send(t); err != nil {
glog.Errorf("TailBinlog shard %v send %v: %v", shard.String(), request.Origin, err)
return err
}
offset = nextOffset
}
}
func (ss *storeServer) CheckBinlog(ctx context.Context, request *pb.CheckBinlogRequest) (*pb.CheckBinlogResponse, error) {
node, found := ss.keyspaceShards.getShard(request.Keyspace, VastoShardId(request.ShardId))
if !found {
return nil, fmt.Errorf("checkbinlog: %s shard %d not found", request.Keyspace, request.ShardId)
}
earliestSegment, latestSegment := node.lm.GetSegmentRange()
return &pb.CheckBinlogResponse{
ShardId: request.ShardId,
EarliestSegment: earliestSegment,
LatestSegment: latestSegment,
}, nil
}