forked from ava-labs/avalanchego
-
Notifications
You must be signed in to change notification settings - Fork 3
/
shared_memory_server.go
112 lines (98 loc) · 2.64 KB
/
shared_memory_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package gsharedmemory
import (
"context"
"github.com/MetalBlockchain/metalgo/chains/atomic"
"github.com/MetalBlockchain/metalgo/database"
"github.com/MetalBlockchain/metalgo/ids"
sharedmemorypb "github.com/MetalBlockchain/metalgo/proto/pb/sharedmemory"
)
var _ sharedmemorypb.SharedMemoryServer = (*Server)(nil)
// Server is shared memory that is managed over RPC.
type Server struct {
sharedmemorypb.UnsafeSharedMemoryServer
sm atomic.SharedMemory
db database.Database
}
// NewServer returns shared memory connected to remote shared memory
func NewServer(sm atomic.SharedMemory, db database.Database) *Server {
return &Server{
sm: sm,
db: db,
}
}
func (s *Server) Get(
_ context.Context,
req *sharedmemorypb.GetRequest,
) (*sharedmemorypb.GetResponse, error) {
peerChainID, err := ids.ToID(req.PeerChainId)
if err != nil {
return nil, err
}
values, err := s.sm.Get(peerChainID, req.Keys)
return &sharedmemorypb.GetResponse{
Values: values,
}, err
}
func (s *Server) Indexed(
_ context.Context,
req *sharedmemorypb.IndexedRequest,
) (*sharedmemorypb.IndexedResponse, error) {
peerChainID, err := ids.ToID(req.PeerChainId)
if err != nil {
return nil, err
}
values, lastTrait, lastKey, err := s.sm.Indexed(
peerChainID,
req.Traits,
req.StartTrait,
req.StartKey,
int(req.Limit),
)
return &sharedmemorypb.IndexedResponse{
Values: values,
LastTrait: lastTrait,
LastKey: lastKey,
}, err
}
func (s *Server) Apply(
_ context.Context,
req *sharedmemorypb.ApplyRequest,
) (*sharedmemorypb.ApplyResponse, error) {
requests := make(map[ids.ID]*atomic.Requests, len(req.Requests))
for _, request := range req.Requests {
peerChainID, err := ids.ToID(request.PeerChainId)
if err != nil {
return nil, err
}
r := &atomic.Requests{
RemoveRequests: request.RemoveRequests,
PutRequests: make([]*atomic.Element, len(request.PutRequests)),
}
for i, put := range request.PutRequests {
r.PutRequests[i] = &atomic.Element{
Key: put.Key,
Value: put.Value,
Traits: put.Traits,
}
}
requests[peerChainID] = r
}
batches := make([]database.Batch, len(req.Batches))
for i, reqBatch := range req.Batches {
batch := s.db.NewBatch()
for _, putReq := range reqBatch.Puts {
if err := batch.Put(putReq.Key, putReq.Value); err != nil {
return nil, err
}
}
for _, deleteReq := range reqBatch.Deletes {
if err := batch.Delete(deleteReq.Key); err != nil {
return nil, err
}
}
batches[i] = batch
}
return &sharedmemorypb.ApplyResponse{}, s.sm.Apply(requests, batches...)
}