-
Notifications
You must be signed in to change notification settings - Fork 651
/
shared_memory_client.go
102 lines (90 loc) · 2.48 KB
/
shared_memory_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package gsharedmemory
import (
"context"
"github.com/ava-labs/avalanchego/chains/atomic"
"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/ids"
sharedmemorypb "github.com/ava-labs/avalanchego/proto/pb/sharedmemory"
)
var _ atomic.SharedMemory = (*Client)(nil)
// Client is atomic.SharedMemory that talks over RPC.
type Client struct {
client sharedmemorypb.SharedMemoryClient
}
// NewClient returns shared memory connected to remote shared memory
func NewClient(client sharedmemorypb.SharedMemoryClient) *Client {
return &Client{client: client}
}
func (c *Client) Get(peerChainID ids.ID, keys [][]byte) ([][]byte, error) {
resp, err := c.client.Get(context.Background(), &sharedmemorypb.GetRequest{
PeerChainId: peerChainID[:],
Keys: keys,
})
if err != nil {
return nil, err
}
return resp.Values, nil
}
func (c *Client) Indexed(
peerChainID ids.ID,
traits [][]byte,
startTrait,
startKey []byte,
limit int,
) (
[][]byte,
[]byte,
[]byte,
error,
) {
resp, err := c.client.Indexed(context.Background(), &sharedmemorypb.IndexedRequest{
PeerChainId: peerChainID[:],
Traits: traits,
StartTrait: startTrait,
StartKey: startKey,
Limit: int32(limit),
})
if err != nil {
return nil, nil, nil, err
}
return resp.Values, resp.LastTrait, resp.LastKey, nil
}
func (c *Client) Apply(requests map[ids.ID]*atomic.Requests, batches ...database.Batch) error {
req := &sharedmemorypb.ApplyRequest{
Requests: make([]*sharedmemorypb.AtomicRequest, 0, len(requests)),
Batches: make([]*sharedmemorypb.Batch, len(batches)),
}
for key, value := range requests {
key := key
chainReq := &sharedmemorypb.AtomicRequest{
RemoveRequests: value.RemoveRequests,
PutRequests: make([]*sharedmemorypb.Element, len(value.PutRequests)),
PeerChainId: key[:],
}
for i, v := range value.PutRequests {
chainReq.PutRequests[i] = &sharedmemorypb.Element{
Key: v.Key,
Value: v.Value,
Traits: v.Traits,
}
}
req.Requests = append(req.Requests, chainReq)
}
for i, batch := range batches {
batch := batch.Inner()
fb := filteredBatch{
writes: make(map[string][]byte),
}
if err := batch.Replay(&fb); err != nil {
return err
}
req.Batches[i] = &sharedmemorypb.Batch{
Puts: fb.PutRequests(),
Deletes: fb.DeleteRequests(),
}
}
_, err := c.client.Apply(context.Background(), req)
return err
}