forked from hyperledger-archives/burrow
/
consumer.go
335 lines (275 loc) · 9.78 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
package service
import (
"context"
"fmt"
"io"
"github.com/hyperledger/burrow/execution/exec"
"github.com/hyperledger/burrow/execution/evm/abi"
"github.com/hyperledger/burrow/rpc/rpcevents"
"github.com/hyperledger/burrow/rpc/rpcquery"
"github.com/hyperledger/burrow/vent/config"
"github.com/hyperledger/burrow/vent/logger"
"github.com/hyperledger/burrow/vent/sqldb"
"github.com/hyperledger/burrow/vent/sqlsol"
"github.com/hyperledger/burrow/vent/types"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)
// Consumer contains basic configuration for consumer to run.
// It bundles the vent configuration and logger supplied to NewConsumer with
// the database and gRPC connections that Run opens.
type Consumer struct {
// Config is the vent configuration; Run reads the gRPC address and the
// database adapter/URL/schema from it, and the block consumer reads DBBlockTx.
Config *config.VentConfig
// Log is the logger used for all consumer output.
Log *logger.Logger
// Closing is set to true by Shutdown and checked by Health, by the block
// consumer callback and by Run's stream error handling.
// NOTE(review): it is read and written without any visible synchronization.
Closing bool
// DB is the SQL database handle; it is assigned by Run and closed when Run returns.
DB *sqldb.SQLDB
// GRPCConnection is the connection to the Burrow gRPC server; it is dialled by Run.
GRPCConnection *grpc.ClientConn
// external events channel used for when vent is leveraged as a library
EventsChannel chan types.EventData
}
// NewConsumer builds a Consumer from the given configuration, logger and
// external events channel.
//
// After each block is committed, the rows generated from all of the events in
// that block are offered to eventChannel. The consumer closes the channel when
// Run returns.
func NewConsumer(cfg *config.VentConfig, log *logger.Logger, eventChannel chan types.EventData) *Consumer {
	consumer := new(Consumer)
	consumer.Config = cfg
	consumer.Log = log
	consumer.EventsChannel = eventChannel
	// A freshly constructed consumer is not closing.
	consumer.Closing = false
	return consumer
}
// Run connects to a grpc service and subscribes to block execution events,
// then gets tables structures, maps them & parses event data, storing the
// resulting rows in the SQL event tables.
//
// projection supplies the event specifications and table structures, abiSpec
// is used to decode event data, and stream selects the end of the requested
// block range: rpcevents.StreamBound() when true, rpcevents.LatestBound()
// otherwise.
//
// Run returns nil when there are no event specifications or when the block
// stream finishes without error, and a non-nil error if connecting,
// synchronizing, receiving or committing fails. On return the gRPC connection
// and database are closed and, if non-nil, EventsChannel is closed.
func (c *Consumer) Run(projection *sqlsol.Projection, abiSpec *abi.AbiSpec, stream bool) error {
	var err error

	c.Log.Info("msg", "Connecting to Burrow gRPC server")

	c.GRPCConnection, err = grpc.Dial(c.Config.GRPCAddr, grpc.WithInsecure())
	if err != nil {
		return errors.Wrapf(err, "Error connecting to Burrow gRPC server at %s", c.Config.GRPCAddr)
	}
	defer c.GRPCConnection.Close()

	// Closing a nil channel panics; a nil EventsChannel is otherwise harmless
	// because sends to it are done in a non-blocking select.
	if c.EventsChannel != nil {
		defer close(c.EventsChannel)
	}

	// ctx bounds every RPC issued by this run so that the block stream can be
	// torn down as soon as Run decides to return.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// get the chain ID to compare with the one stored in the db
	qCli := rpcquery.NewQueryClient(c.GRPCConnection)
	chainStatus, err := qCli.Status(ctx, &rpcquery.StatusParam{})
	if err != nil {
		return errors.Wrap(err, "Error getting chain status")
	}

	if len(projection.EventSpec) == 0 {
		c.Log.Info("msg", "No events specifications found")
		return nil
	}

	c.Log.Info("msg", "Connecting to SQL database")

	connection := types.SQLConnection{
		DBAdapter:     c.Config.DBAdapter,
		DBURL:         c.Config.DBURL,
		DBSchema:      c.Config.DBSchema,
		Log:           c.Log,
		ChainID:       chainStatus.ChainID,
		BurrowVersion: chainStatus.BurrowVersion,
	}

	c.DB, err = sqldb.NewSQLDB(connection)
	if err != nil {
		// Wrap (rather than format with %v) so the underlying cause is preserved.
		return errors.Wrap(err, "error connecting to SQL database")
	}
	defer c.DB.Close()

	c.Log.Info("msg", "Synchronizing config and database projection structures")

	err = c.DB.SynchronizeDB(projection.Tables)
	if err != nil {
		return errors.Wrap(err, "Error trying to synchronize database")
	}

	// doneCh is closed by the producer goroutine when it exits
	// errCh carries at most one error from the producer goroutine (buffered so the send never blocks)
	// eventCh is used for sending received events to the main thread to be stored in the db
	doneCh := make(chan struct{})
	errCh := make(chan error, 1)
	eventCh := make(chan types.EventData)

	go func() {
		defer close(doneCh)

		c.Log.Info("msg", "Getting last processed block number from SQL log table")

		// NOTE [Silas]: I am preserving the comment below that dates from the early days of Vent. I have looked at the
		// bosmarmot git history and I cannot see why the original author thought that it was the case that there was
		// no way of knowing if the last block of events was committed since the block and its associated log is
		// committed atomically in a transaction and this is a core part of the design of Vent - in order that it does not
		// repeat

		// [ORIGINAL COMMENT]
		// right now there is no way to know if the last block of events was completely read
		// so we have to begin processing from the last block number stored in database
		// and update event data if already present
		fromBlock, err := c.DB.GetLastBlockHeight()
		if err != nil {
			errCh <- errors.Wrap(err, "Error trying to get last processed block number from SQL log table")
			return
		}

		startingBlock := fromBlock
		// Start the block after the last one successfully committed - apart from if this is the first block
		// We include block 0 because it is where we currently place dump/restored transactions
		if startingBlock > 0 {
			startingBlock++
		}

		// setup block range to get needed blocks server side
		cli := rpcevents.NewExecutionEventsClient(c.GRPCConnection)

		var end *rpcevents.Bound
		if stream {
			end = rpcevents.StreamBound()
		} else {
			end = rpcevents.LatestBound()
		}

		request := &rpcevents.BlocksRequest{
			BlockRange: rpcevents.NewBlockRange(rpcevents.AbsoluteBound(startingBlock), end),
		}

		// gets blocks in given range based on last processed block taken from database
		blockStream, err := cli.Stream(ctx, request)
		if err != nil {
			errCh <- errors.Wrap(err, "Error connecting to block stream")
			return
		}

		// get blocks
		c.Log.Debug("msg", "Waiting for blocks...")

		err = rpcevents.ConsumeBlockExecutions(blockStream, c.makeBlockConsumer(projection, abiSpec, eventCh))
		switch {
		case err == nil:
		case err == io.EOF:
			c.Log.Info("msg", "EOF stream received...")
		case c.Closing:
			c.Log.Debug("msg", "GRPC connection closed")
		default:
			errCh <- errors.Wrap(err, "Error receiving blocks")
		}
	}()

	for {
		select {
		// Process block events
		case blk := <-eventCh:
			if err := c.commitBlock(projection, blk); err != nil {
				c.Log.Info("msg", "error committing block", "err", err)
				// Stop the producer: cancel its stream, then discard anything it is
				// still trying to hand over so it cannot stay blocked on the
				// unbuffered eventCh after Run has returned.
				cancel()
				for {
					select {
					case <-eventCh:
					case <-doneCh:
						return err
					}
				}
			}

		// Await completion
		case <-doneCh:
			select {
			// Select possible error
			case err := <-errCh:
				c.Log.Info("msg", "finished with error", "err", err)
				return err
			// Or fallback to success
			default:
				c.Log.Info("msg", "finished successfully")
				return nil
			}
		}
	}
}
// makeBlockConsumer returns the per-block callback passed to
// rpcevents.ConsumeBlockExecutions.
//
// For each block execution the callback collects rows into a fresh block data
// structure: optionally a raw block row and one raw row per transaction (when
// Config.DBBlockTx is set), plus one row for every event of a non-excepted
// transaction that matches an event class filter in the projection. If any
// rows are pending for the block they are sent on eventCh. The callback
// returns io.EOF once the consumer is closing, and a wrapped error if building
// a row or parsing a filter query fails.
func (c *Consumer) makeBlockConsumer(projection *sqlsol.Projection, abiSpec *abi.AbiSpec,
	eventCh chan<- types.EventData) func(blockExecution *exec.BlockExecution) error {

	return func(blockExecution *exec.BlockExecution) error {
		if c.Closing {
			return io.EOF
		}

		height := blockExecution.Height
		c.Log.Debug("msg", "Block received", "height", height, "num_txs", len(blockExecution.TxExecutions))

		// rows for this height are accumulated here
		blockData := sqlsol.NewBlockData(height)
		storeRaw := c.Config.DBBlockTx

		if storeRaw {
			blockRow, err := buildBlkData(projection.Tables, blockExecution)
			if err != nil {
				return errors.Wrapf(err, "Error building block raw data")
			}
			blockData.AddRow(types.SQLBlockTableName, blockRow)
		}

		for _, txe := range blockExecution.TxExecutions {
			c.Log.Debug("msg", "Getting transaction", "TxHash", txe.TxHash, "num_events", len(txe.Events))

			if storeRaw {
				txRow, err := buildTxData(txe)
				if err != nil {
					return errors.Wrapf(err, "Error building tx raw data")
				}
				blockData.AddRow(types.SQLTxTableName, txRow)
			}

			// reverted transactions must not touch the event data tables
			if txe.Exception != nil {
				continue
			}

			for _, event := range txe.Events {
				tagged := event.Tagged()

				// try every event class filter against this event
				for _, eventClass := range projection.EventSpec {
					qry, err := eventClass.Query()
					if err != nil {
						return errors.Wrapf(err, "Error parsing query from filter string")
					}
					if !qry.Matches(tagged) {
						continue
					}

					c.Log.Info("msg", fmt.Sprintf("Matched event header: %v", event.Header),
						"filter", eventClass.Filter)

					// unpack, decode & build the row for the matching table
					row, err := buildEventData(projection, eventClass, event, abiSpec, c.Log)
					if err != nil {
						return errors.Wrapf(err, "Error building event data")
					}
					blockData.AddRow(eventClass.TableName, row)
				}
			}
		}

		// nothing collected for this block: nothing to hand over
		if !blockData.PendingRows(height) {
			return nil
		}

		// hand the collected rows to the committing side
		rows := blockData.Data
		c.Log.Info("msg", fmt.Sprintf("Upserting rows in SQL tables %v", rows), "block", height)
		eventCh <- rows

		return nil
	}
}
// commitBlock stores the rows generated from one block in the database and
// then offers them to the external events channel.
//
// It returns a wrapped error (preserving the underlying cause) if the database
// write fails; in that case nothing is sent on EventsChannel.
func (c *Consumer) commitBlock(projection *sqlsol.Projection, blockEvents types.EventData) error {
	// upsert rows in specific SQL event tables and update block number
	if err := c.DB.SetBlock(projection.Tables, blockEvents); err != nil {
		return errors.Wrap(err, "error upserting rows in database")
	}

	// send to the external events channel in a non-blocking manner: if no
	// receiver is ready (or the channel is nil) the block is simply not delivered
	select {
	case c.EventsChannel <- blockEvents:
	default:
	}

	return nil
}
// Health reports whether the consumer is able to do its work.
//
// It returns nil only when the consumer is not closing, the database handle is
// set and answers a ping, and the gRPC connection is set and in the Ready
// state. Otherwise it returns an error naming the first failed check.
func (c *Consumer) Health() error {
	switch {
	case c.Closing:
		return errors.New("closing service")
	case c.DB == nil:
		// database handle has not been set
		return errors.New("database disconnected")
	}

	pingErr := c.DB.Ping()
	if pingErr != nil {
		return errors.New("database unavailable")
	}

	// gRPC connection must exist and be ready
	conn := c.GRPCConnection
	if conn == nil {
		return errors.New("grpc disconnected")
	}
	if conn.GetState() != connectivity.Ready {
		return errors.New("grpc connection not ready")
	}

	return nil
}
// Shutdown gracefully shuts down the events consumer.
//
// It marks the consumer as closing and closes the gRPC connection if one has
// been established. It is safe to call before the gRPC connection has been
// dialled, in which case only the closing flag is set.
func (c *Consumer) Shutdown() {
	c.Log.Info("msg", "Shutting down vent consumer...")
	c.Closing = true

	// GRPCConnection stays nil until the connection has been dialled; calling
	// Close on a nil *grpc.ClientConn would panic.
	if c.GRPCConnection == nil {
		return
	}
	if err := c.GRPCConnection.Close(); err != nil {
		// The connection may already have been closed elsewhere; report the
		// error at debug level rather than dropping it silently.
		c.Log.Debug("msg", "error closing gRPC connection", "err", err)
	}
}