/
service.go
97 lines (80 loc) · 2.18 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package syncer
import (
"context"
"errors"
"github.com/KalyCoinProject/kalychain/network/grpc"
"github.com/KalyCoinProject/kalychain/syncer/proto"
"github.com/KalyCoinProject/kalychain/types"
"github.com/golang/protobuf/ptypes/empty"
)
var (
ErrBlockNotFound = errors.New("block not found")
)
type syncPeerService struct {
proto.UnimplementedSyncPeerServer
blockchain Blockchain // reference to the blockchain module
network Network // reference to the network module
stream *grpc.GrpcStream // reference to the grpc stream
}
func NewSyncPeerService(
network Network,
blockchain Blockchain,
) SyncPeerService {
return &syncPeerService{
blockchain: blockchain,
network: network,
}
}
// Start starts syncPeerService
func (s *syncPeerService) Start() {
s.setupGRPCServer()
}
// Close closes syncPeerService
func (s *syncPeerService) Close() error {
return s.stream.Close()
}
// setupGRPCServer setup GRPC server
func (s *syncPeerService) setupGRPCServer() {
s.stream = grpc.NewGrpcStream()
proto.RegisterSyncPeerServer(s.stream.GrpcServer(), s)
s.stream.Serve()
s.network.RegisterProtocol(syncerProto, s.stream)
}
// GetBlocks is a gRPC endpoint to return blocks from the specific height via stream
func (s *syncPeerService) GetBlocks(
req *proto.GetBlocksRequest,
stream proto.SyncPeer_GetBlocksServer,
) error {
// from to latest
for i := req.From; i <= s.blockchain.Header().Number; i++ {
block, ok := s.blockchain.GetBlockByNumber(i, true)
if !ok {
return ErrBlockNotFound
}
resp := toProtoBlock(block)
// if client closes stream, context.Canceled is given
if err := stream.Send(resp); err != nil {
break
}
}
return nil
}
// GetStatus is a gRPC endpoint to return the latest block number as a node status
func (s *syncPeerService) GetStatus(
ctx context.Context,
req *empty.Empty,
) (*proto.SyncPeerStatus, error) {
var number uint64
if header := s.blockchain.Header(); header != nil {
number = header.Number
}
return &proto.SyncPeerStatus{
Number: number,
}, nil
}
// toProtoBlock converts type.Block -> proto.Block
func toProtoBlock(block *types.Block) *proto.Block {
return &proto.Block{
Block: block.MarshalRLP(),
}
}