-
Notifications
You must be signed in to change notification settings - Fork 377
/
block_service.go
152 lines (125 loc) · 4.27 KB
/
block_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package client
import (
"context"
"fmt"
"github.com/cosmos/gogoproto/grpc"
blocksvc "github.com/cometbft/cometbft/api/cometbft/services/block/v1"
cmtproto "github.com/cometbft/cometbft/api/cometbft/types/v1"
"github.com/cometbft/cometbft/types"
)
// Block pairs a block with its block ID, as returned by the CometBFT
// BlockService gRPC API.
type Block struct {
// BlockID is the identifier of the block; serialized as "block_id".
BlockID *types.BlockID `json:"block_id"`
// Block is the block itself; serialized as "block".
Block *types.Block `json:"block"`
}
// blockFromProto builds a Block from the protobuf representations of a block
// ID and a block.
//
// The block ID is converted first; if that conversion fails its error is
// returned and the block is not converted. On any conversion error the
// returned *Block is nil.
func blockFromProto(pblockID *cmtproto.BlockID, pblock *cmtproto.Block) (*Block, error) {
	var (
		out Block
		err error
	)

	if out.BlockID, err = types.BlockIDFromProto(pblockID); err != nil {
		return nil, err
	}

	if out.Block, err = types.BlockFromProto(pblock); err != nil {
		return nil, err
	}

	return &out, nil
}
// LatestHeightResult is the value type used by GetLatestHeight and sent to
// the client via a channel.
type LatestHeightResult struct {
// Height is the height taken from a successfully received stream response.
Height int64
// Error is set when receiving from the stream failed.
Error error
}
// getLatestHeightConfig collects the settings that GetLatestHeightOption
// values can modify.
type getLatestHeightConfig struct {
// chSize is the buffer capacity of the result channel created by
// GetLatestHeight; 0 yields an unbuffered channel.
chSize uint
}
// GetLatestHeightOption customizes a GetLatestHeight call by mutating its
// configuration.
type GetLatestHeightOption func(*getLatestHeightConfig)
// GetLatestHeightChannelSize returns an option that sets the buffer size of
// the result channel. When the option is not supplied, or sz is 0, an
// unbuffered channel is created.
func GetLatestHeightChannelSize(sz uint) GetLatestHeightOption {
	setSize := func(cfg *getLatestHeightConfig) {
		cfg.chSize = sz
	}
	return setSize
}
// BlockServiceClient provides block information.
type BlockServiceClient interface {
// GetBlockByHeight attempts to retrieve the block associated with the
// given height.
GetBlockByHeight(ctx context.Context, height int64) (*Block, error)
// GetLatestHeight sends the latest committed block height to the
// resulting output channel as blocks are committed.
GetLatestHeight(ctx context.Context, opts ...GetLatestHeightOption) (<-chan LatestHeightResult, error)
}
// blockServiceClient is the BlockServiceClient implementation that forwards
// calls to a blocksvc.BlockServiceClient.
type blockServiceClient struct {
// client is the underlying block service client used for all requests.
client blocksvc.BlockServiceClient
}
// newBlockServiceClient returns a BlockServiceClient that issues its requests
// over conn.
func newBlockServiceClient(conn grpc.ClientConn) BlockServiceClient {
	svc := blocksvc.NewBlockServiceClient(conn)
	return &blockServiceClient{client: svc}
}
// GetBlockByHeight implements BlockServiceClient GetBlockByHeight.
//
// It requests the block at the given height from the block service and
// converts the response into a Block. An error from the request is returned
// unchanged.
func (c *blockServiceClient) GetBlockByHeight(ctx context.Context, height int64) (*Block, error) {
	req := &blocksvc.GetByHeightRequest{Height: height}

	resp, err := c.client.GetByHeight(ctx, req)
	if err != nil {
		return nil, err
	}

	return blockFromProto(resp.BlockId, resp.Block)
}
// GetLatestHeight implements BlockServiceClient GetLatestHeight.
//
// It opens a latest-height stream and starts a goroutine that forwards each
// received height to the returned channel. A height is dropped when the
// channel cannot accept it immediately. If receiving from the stream fails,
// the wrapped error is offered on the channel (or abandoned once ctx is done)
// and the goroutine exits. The channel is closed when the goroutine exits.
//
// An error is returned, with a nil channel, only when the stream itself
// cannot be opened; in that case the options are not applied.
func (c *blockServiceClient) GetLatestHeight(ctx context.Context, opts ...GetLatestHeightOption) (<-chan LatestHeightResult, error) {
	stream, err := c.client.GetLatestHeight(ctx, &blocksvc.GetLatestHeightRequest{})
	if err != nil {
		return nil, fmt.Errorf("error getting a stream for the latest height: %w", err)
	}

	var cfg getLatestHeightConfig
	for _, apply := range opts {
		apply(&cfg)
	}

	out := make(chan LatestHeightResult, cfg.chSize)

	go func() {
		defer close(out)

		for {
			msg, recvErr := stream.Recv()
			if recvErr != nil {
				failure := LatestHeightResult{
					Error: fmt.Errorf("error receiving the latest height from a stream: %w", recvErr),
				}
				// Block until the failure is delivered or the caller has
				// given up; either way the stream is finished.
				select {
				case out <- failure:
				case <-ctx.Done():
				}
				return
			}

			select {
			case out <- LatestHeightResult{Height: msg.Height}:
			case <-ctx.Done():
				return
			default:
				// The channel cannot take this height right now, so it is
				// skipped; a later height will be delivered once the
				// receiver catches up.
			}
		}
	}()

	return out, nil
}
// disabledBlockServiceClient is a BlockServiceClient whose methods panic when
// called.
type disabledBlockServiceClient struct{}
// newDisabledBlockServiceClient returns a BlockServiceClient backed by
// disabledBlockServiceClient.
func newDisabledBlockServiceClient() BlockServiceClient {
	return new(disabledBlockServiceClient)
}
// GetBlockByHeight implements BlockServiceClient GetBlockByHeight for the
// disabled client. It never returns: every call panics.
func (*disabledBlockServiceClient) GetBlockByHeight(_ context.Context, _ int64) (*Block, error) {
	panic("block service client is disabled")
}
// GetLatestBlock always panics on the disabled client.
//
// NOTE(review): the BlockServiceClient interface declared in this file has no
// GetLatestBlock method, so this method does not contribute to satisfying it
// - confirm whether it is still needed.
func (*disabledBlockServiceClient) GetLatestBlock(_ context.Context) (*Block, error) {
	panic("block service client is disabled")
}
// GetLatestHeight implements BlockServiceClient GetLatestHeight for the
// disabled client. It never returns: every call panics.
func (*disabledBlockServiceClient) GetLatestHeight(_ context.Context, _ ...GetLatestHeightOption) (<-chan LatestHeightResult, error) {
	panic("block service client is disabled")
}