This repository has been archived by the owner on May 13, 2022. It is now read-only.
/
consumer.go
323 lines (275 loc) · 9.2 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
package service
import (
"context"
"fmt"
"io"
"sync"
"time"
"github.com/hyperledger/burrow/rpc"
"github.com/hyperledger/burrow/logging"
"github.com/hyperledger/burrow/rpc/rpcevents"
"github.com/hyperledger/burrow/rpc/rpcquery"
"github.com/hyperledger/burrow/vent/config"
"github.com/hyperledger/burrow/vent/sqldb"
"github.com/hyperledger/burrow/vent/sqlsol"
"github.com/hyperledger/burrow/vent/types"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)
// Consumer contains the configuration and runtime state needed to stream
// block execution events from a Burrow node into a SQL database.
type Consumer struct {
	Config         *config.VentConfig
	Logger         *logging.Logger
	DB             *sqldb.SQLDB
	GRPCConnection *grpc.ClientConn
	// external events channel used for when vent is leveraged as a library
	EventsChannel chan types.EventData
	// Done is closed (exactly once, guarded by shutdownOnce) to signal shutdown
	Done         chan struct{}
	shutdownOnce sync.Once
	// Embedded progress/status information announced by announceEvery
	Status
}
// Status holds the progress information periodically announced by the consumer.
type Status struct {
	// LastProcessedHeight is the height of the last block committed to the DB
	LastProcessedHeight uint64
	// Burrow is the most recently fetched chain status from the Burrow node
	Burrow *rpc.ResultStatus
}
// NewConsumer constructs a new consumer configuration.
// The event channel will be passed a collection of rows generated from all of the events in a single block
// It will be closed by the consumer when it is finished
func NewConsumer(cfg *config.VentConfig, log *logging.Logger, eventChannel chan types.EventData) *Consumer {
	consumer := &Consumer{
		Config:        cfg,
		Logger:        log,
		EventsChannel: eventChannel,
		Done:          make(chan struct{}),
	}
	return consumer
}
// Run connects to a grpc service and subscribes to log events,
// then gets tables structures, maps them & parses event data.
// Rows are stored in SQL event tables; when stream is true it runs until
// shut down, otherwise it returns once the latest block has been consumed.
func (c *Consumer) Run(projection *sqlsol.Projection, stream bool) error {
	var err error

	c.Logger.InfoMsg("Connecting to Burrow gRPC server")
	c.GRPCConnection, err = grpc.Dial(c.Config.GRPCAddr, grpc.WithInsecure())
	if err != nil {
		return errors.Wrapf(err, "Error connecting to Burrow gRPC server at %s", c.Config.GRPCAddr)
	}
	defer c.GRPCConnection.Close()
	// Closing the events channel tells library consumers no further blocks will arrive
	defer close(c.EventsChannel)

	// get the chain ID to compare with the one stored in the db
	qCli := rpcquery.NewQueryClient(c.GRPCConnection)
	c.Status.Burrow, err = qCli.Status(context.Background(), &rpcquery.StatusParam{})
	if err != nil {
		return errors.Wrapf(err, "Error getting chain status")
	}

	abiProvider, err := NewAbiProvider(c.Config.AbiFileOrDirs, rpcquery.NewQueryClient(c.GRPCConnection), c.Logger)
	if err != nil {
		return errors.Wrapf(err, "Error loading ABIs")
	}

	if len(projection.Spec) == 0 {
		c.Logger.InfoMsg("No events specifications found")
		return nil
	}

	c.Logger.InfoMsg("Connecting to SQL database")
	connection := types.SQLConnection{
		DBAdapter: c.Config.DBAdapter,
		DBURL:     c.Config.DBURL,
		DBSchema:  c.Config.DBSchema,
		Log:       c.Logger,
	}
	c.DB, err = sqldb.NewSQLDB(connection)
	if err != nil {
		return fmt.Errorf("error connecting to SQL database: %v", err)
	}
	defer c.DB.Close()

	err = c.DB.Init(c.Burrow.ChainID, c.Burrow.BurrowVersion)
	if err != nil {
		return fmt.Errorf("could not clean tables after ChainID change: %v", err)
	}

	c.Logger.InfoMsg("Synchronizing config and database projection structures")
	err = c.DB.SynchronizeDB(c.Burrow.ChainID, projection.Tables)
	if err != nil {
		return errors.Wrap(err, "Error trying to synchronize database")
	}

	// errCh is buffered so the goroutine below can deposit a fatal error and return without blocking
	// eventCh is used for sending received events to the main thread to be stored in the db
	errCh := make(chan error, 1)
	eventCh := make(chan types.EventData)

	go func() {
		// Whatever happens in the stream consumer, signal the main loop to stop
		defer func() {
			c.Shutdown()
		}()
		go c.announceEvery(c.Done)

		c.Logger.InfoMsg("Getting last processed block number from SQL log table")

		// NOTE [Silas]: I am preserving the comment below that dates from the early days of Vent. I have looked at the
		// bosmarmot git history and I cannot see why the original author thought that it was the case that there was
		// no way of knowing if the last block of events was committed since the block and its associated log is
		// committed atomically in a transaction and this is a core part of the design of Vent - in order that it does not
		// repeat

		// [ORIGINAL COMMENT]
		// right now there is no way to know if the last block of events was completely read
		// so we have to begin processing from the last block number stored in database
		// and update event data if already present
		fromBlock, err := c.DB.LastBlockHeight(c.Burrow.ChainID)
		if err != nil {
			errCh <- errors.Wrapf(err, "Error trying to get last processed block number")
			return
		}

		startingBlock := fromBlock
		// Start the block after the last one successfully committed - apart from if this is the first block
		// We include block 0 because it is where we currently place dump/restored transactions
		if startingBlock > 0 {
			startingBlock++
		}

		// setup block range to get needed blocks server side
		cli := rpcevents.NewExecutionEventsClient(c.GRPCConnection)
		var end *rpcevents.Bound
		if stream {
			end = rpcevents.StreamBound()
		} else {
			end = rpcevents.LatestBound()
		}

		request := &rpcevents.BlocksRequest{
			BlockRange: rpcevents.NewBlockRange(rpcevents.AbsoluteBound(startingBlock), end),
		}

		// gets blocks in given range based on last processed block taken from database
		// (named blockStream rather than stream to avoid shadowing the bool parameter above)
		blockStream, err := cli.Stream(context.Background(), request)
		if err != nil {
			errCh <- errors.Wrapf(err, "Error connecting to block stream")
			return
		}

		// get blocks
		c.Logger.TraceMsg("Waiting for blocks...")

		err = rpcevents.ConsumeBlockExecutions(blockStream,
			NewBlockConsumer(projection, c.Config.SpecOpt, abiProvider.GetEventAbi, eventCh, c.Done, c.Logger))
		if err != nil {
			if err == io.EOF {
				c.Logger.InfoMsg("EOF stream received...")
			} else {
				if finished(c.Done) {
					c.Logger.TraceMsg("GRPC connection closed")
				} else {
					errCh <- errors.Wrapf(err, "Error receiving blocks")
					return
				}
			}
		}
	}()

	for {
		select {
		// Process block events
		case blk := <-eventCh:
			c.Status.LastProcessedHeight = blk.BlockHeight
			err := c.commitBlock(projection, blk)
			if err != nil {
				c.Logger.InfoMsg("error committing block", "err", err)
				return err
			}

		// Await completion
		case <-c.Done:
			select {
			// Select possible error
			case err := <-errCh:
				c.Logger.InfoMsg("finished with error", "err", err)
				return err

			// Or fallback to success
			default:
				c.Logger.InfoMsg("finished successfully")
				return nil
			}
		}
	}
}
// commitBlock upserts all rows derived from a single block into the SQL event
// tables (the block height is recorded atomically with them) and then offers
// the block to the external events channel without blocking.
func (c *Consumer) commitBlock(projection *sqlsol.Projection, blockEvents types.EventData) error {
	err := c.DB.SetBlock(c.Burrow.ChainID, projection.Tables, blockEvents)
	if err != nil {
		return fmt.Errorf("error upserting rows in database: %v", err)
	}
	// Non-blocking send: a library consumer that does not drain EventsChannel
	// must not stall block processing
	select {
	case c.EventsChannel <- blockEvents:
	default:
	}
	return nil
}
// Health returns nil while the consumer can do useful work, or an error
// describing the first failing dependency (shutdown, database, gRPC).
func (c *Consumer) Health() error {
	switch {
	case finished(c.Done):
		return errors.New("closing service")
	case c.DB == nil:
		return errors.New("database disconnected")
	}
	// check db status
	if err := c.DB.Ping(); err != nil {
		return errors.New("database unavailable")
	}
	// check grpc connection status
	switch {
	case c.GRPCConnection == nil:
		return errors.New("grpc disconnected")
	case c.GRPCConnection.GetState() != connectivity.Ready:
		return errors.New("grpc connection not ready")
	}
	return nil
}
// Shutdown gracefully shuts down the events consumer. It is safe to call
// from multiple goroutines; only the first call has any effect.
func (c *Consumer) Shutdown() {
	stop := func() {
		c.Logger.InfoMsg("Shutting down vent consumer...")
		close(c.Done)
		c.GRPCConnection.Close()
	}
	c.shutdownOnce.Do(stop)
}
// updateStatus refreshes the cached Burrow node status used by statusMessage;
// on failure the previous status is kept and the error is logged.
func (c *Consumer) updateStatus(qcli rpcquery.QueryClient) {
	if status, err := qcli.Status(context.Background(), &rpcquery.StatusParam{}); err != nil {
		c.Logger.InfoMsg("could not get blockchain status", "err", err)
	} else {
		c.Status.Burrow = status
	}
}
// statusMessage assembles the flat key/value pairs logged by announceEvery,
// including the fraction of the chain's latest height already processed.
func (c *Consumer) statusMessage() []interface{} {
	syncInfo := c.Burrow.SyncInfo
	var fractionCaughtUp float64
	if height := syncInfo.LatestBlockHeight; height > 0 {
		fractionCaughtUp = float64(c.LastProcessedHeight) / float64(height)
	}
	return []interface{}{
		"msg", "status",
		"last_processed_height", c.LastProcessedHeight,
		"fraction_caught_up", fractionCaughtUp,
		"burrow_latest_block_height", syncInfo.LatestBlockHeight,
		"burrow_latest_block_duration", syncInfo.LatestBlockDuration,
		"burrow_latest_block_hash", syncInfo.LatestBlockHash,
		"burrow_latest_app_hash", syncInfo.LatestAppHash,
		"burrow_latest_block_time", syncInfo.LatestBlockTime,
		"burrow_latest_block_seen_time", syncInfo.LatestBlockSeenTime,
		"burrow_node_info", c.Burrow.NodeInfo,
		"burrow_catching_up", c.Burrow.CatchingUp,
	}
}
// announceEvery periodically logs a status announcement until doneCh is
// closed. It is a no-op when Config.AnnounceEvery is zero.
func (c *Consumer) announceEvery(doneCh <-chan struct{}) {
	if c.Config.AnnounceEvery == 0 {
		return
	}
	qcli := rpcquery.NewQueryClient(c.GRPCConnection)
	ticker := time.NewTicker(c.Config.AnnounceEvery)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			c.updateStatus(qcli)
			c.Logger.InfoMsg("Announcement", c.statusMessage()...)
		case <-doneCh:
			return
		}
	}
}
// finished reports whether doneCh has been closed, without blocking.
func finished(doneCh chan struct{}) bool {
	select {
	case <-doneCh:
		return true
	default:
	}
	return false
}