-
Notifications
You must be signed in to change notification settings - Fork 13
/
nodediff.go
98 lines (88 loc) · 2.44 KB
/
nodediff.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package nodesync
import (
"context"
"github.com/anyproto/any-sync/app/ldiff"
"golang.org/x/exp/slices"
"github.com/anyproto/any-sync-node/nodehead"
"github.com/anyproto/any-sync-node/nodesync/nodesyncproto"
)
type nodeRemoteDiff struct {
partId int
cl nodesyncproto.DRPCNodeSyncClient
}
func (n nodeRemoteDiff) Ranges(ctx context.Context, ranges []ldiff.Range, resBuf []ldiff.RangeResult) (results []ldiff.RangeResult, err error) {
protoRanges := make([]*nodesyncproto.PartitionSyncRange, len(ranges))
for i, r := range ranges {
protoRanges[i] = &nodesyncproto.PartitionSyncRange{
From: r.From,
To: r.To,
Elements: r.Elements,
}
}
req := &nodesyncproto.PartitionSyncRequest{
PartitionId: uint64(n.partId),
Ranges: protoRanges,
}
resp, err := n.cl.PartitionSync(ctx, req)
if err != nil {
return nil, err
}
results = slices.Grow(resBuf, len(resp.Results))[0:len(resp.Results)]
for i, res := range resp.Results {
var elements []ldiff.Element
if len(res.Elements) > 0 {
elements = make([]ldiff.Element, len(res.Elements))
for j, el := range res.Elements {
elements[j] = ldiff.Element{
Id: el.Id,
Head: el.Head,
}
}
}
results[i] = ldiff.RangeResult{
Hash: res.Hash,
Elements: elements,
Count: int(res.Count),
}
}
return
}
type nodeRemoteDiffHandler struct {
nodehead nodehead.NodeHead
}
func (n *nodeRemoteDiffHandler) PartitionSync(ctx context.Context, req *nodesyncproto.PartitionSyncRequest) (*nodesyncproto.PartitionSyncResponse, error) {
ld := n.nodehead.LDiff(int(req.PartitionId))
var ranges = make([]ldiff.Range, len(req.Ranges))
for i, r := range req.Ranges {
ranges[i] = ldiff.Range{
From: r.From,
To: r.To,
Elements: r.Elements,
}
}
res, err := ld.Ranges(ctx, ranges, nil)
if err != nil {
return nil, err
}
protoResults := make([]*nodesyncproto.PartitionSyncResult, len(res))
for i, r := range res {
var elements []*nodesyncproto.PartitionSyncResultElement
if len(r.Elements) > 0 {
elements = make([]*nodesyncproto.PartitionSyncResultElement, len(r.Elements))
for j, el := range r.Elements {
elements[j] = &nodesyncproto.PartitionSyncResultElement{
Id: el.Id,
Head: el.Head,
}
}
}
protoResults[i] = &nodesyncproto.PartitionSyncResult{
Hash: r.Hash,
Elements: elements,
Count: uint32(r.Count),
}
}
return &nodesyncproto.PartitionSyncResponse{
Results: protoResults,
}, nil
}