forked from hyperledger-archives/burrow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
block_consumer.go
129 lines (107 loc) · 4 KB
/
block_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package service
import (
"io"
"github.com/hyperledger/burrow/event/query"
"github.com/hyperledger/burrow/execution/exec"
"github.com/hyperledger/burrow/logging"
"github.com/hyperledger/burrow/logging/structure"
"github.com/hyperledger/burrow/vent/chain"
"github.com/hyperledger/burrow/vent/sqlsol"
"github.com/hyperledger/burrow/vent/types"
"github.com/pkg/errors"
)
// NewBlockConsumer returns a callback that converts a single chain.Block into rows of table data and hands
// them over on eventCh.
//
// For every block the callback:
//   - returns io.EOF straight away if finished(doneCh) reports true;
//   - adds a block row when opt enables sqlsol.Block, and one row per transaction when opt enables sqlsol.Tx;
//   - for each transaction that carries no exception, adds a row for every event matching the query of an
//     event class in projection.Spec;
//   - sends the collected data on eventCh, or gives up with io.EOF if doneCh fires while the send is pending.
//
// chainID is only used to build an origin for transactions that do not carry one. getEventSpec looks up the
// ABI spec of an event from its ID and address; a failed lookup is logged and tolerated unless the event goes
// on to match a projection filter, in which case the callback fails with the wrapped lookup error.
//
// Any error while building rows or obtaining a filter query aborts the block and nothing is sent on eventCh.
func NewBlockConsumer(chainID string, projection *sqlsol.Projection, opt sqlsol.SpecOpt, getEventSpec EventSpecGetter,
	eventCh chan<- types.EventData, doneCh chan struct{}, logger *logging.Logger) func(block chain.Block) error {

	logger = logger.WithScope("makeBlockConsumer")

	return func(block chain.Block) error {
		if finished(doneCh) {
			return io.EOF
		}

		// The height is local to each invocation so no mutable state is shared between calls.
		blockHeight := block.GetHeight()
		txs := block.GetTxs()

		logger.TraceMsg("Block received",
			"height", blockHeight,
			"num_txs", len(txs))

		// Fresh structure to accumulate the rows produced at this height.
		blockData := sqlsol.NewBlockData(blockHeight)

		if opt.Enabled(sqlsol.Block) {
			blkRawData, err := buildBlkData(projection.Tables, block)
			if err != nil {
				return errors.Wrap(err, "Error building block raw data")
			}
			blockData.AddRow(tables.Block, blkRawData)
		}

		for _, txe := range txs {
			events := txe.GetEvents()
			logger.TraceMsg("Getting transaction", "TxHash", txe.GetHash(), "num_events", len(events))

			if opt.Enabled(sqlsol.Tx) {
				txRawData, err := buildTxData(txe)
				if err != nil {
					return errors.Wrap(err, "Error building tx raw data")
				}
				blockData.AddRow(tables.Tx, txRawData)
			}

			// Transactions that carry an exception (reverted) must not update the event tables.
			if txe.GetException() != nil {
				continue
			}

			txOrigin := txe.GetOrigin()
			if txOrigin == nil {
				// This is an original transaction from the current chain so we build its origin from context.
				txOrigin = &chain.Origin{
					ChainID: chainID,
					Height:  block.GetHeight(),
					Index:   txe.GetIndex(),
				}
			}

			for _, event := range events {
				// Without an ABI the filters can only match against the raw event.
				var tagged query.Tagged = event
				eventID := exec.SolidityEventID(event.GetTopics())
				eventSpec, eventSpecErr := getEventSpec(eventID, event.GetAddress())
				if eventSpecErr != nil {
					// Not fatal yet: it only becomes an error if a projection filter wants this event.
					logger.InfoMsg("could not get ABI for solidity event",
						structure.ErrorKey, eventSpecErr,
						"event_id", eventID,
						"address", event.GetAddress())
				} else {
					// Since we have the event ABI we will allow matching on ABI fields.
					tagged = query.TagsFor(event, query.TaggedPrefix("Event", eventSpec))
				}

				// Check every event class of the projection against this event.
				for _, eventClass := range projection.Spec {
					qry, err := eventClass.Query()
					if err != nil {
						return errors.Wrap(err, "Error parsing query from filter string")
					}
					if !qry.Matches(tagged) {
						continue
					}

					// The filter wants this event, so a missing ABI can no longer be ignored.
					if eventSpecErr != nil {
						return errors.Wrapf(eventSpecErr, "could not get ABI for solidity event matching "+
							"projection filter \"%s\" with id %v at address %v",
							eventClass.Filter, eventID, event.GetAddress())
					}

					logger.InfoMsg("Matched event", "event_id", eventID, "filter", eventClass.Filter)

					// Unpack, decode and build the row for this event.
					eventData, err := buildEventData(projection, eventClass, event, txOrigin, eventSpec, logger)
					if err != nil {
						return errors.Wrap(err, "Error building event data")
					}
					blockData.AddRow(eventClass.TableName, eventData)
				}
			}
		}

		// Log the rows collected per table before handing the data over on eventCh.
		for name, rows := range blockData.Data.Tables {
			logger.InfoMsg("Upserting rows in SQL table", "height", blockHeight, "table", name, "action", "UPSERT", "rows", rows)
		}

		// Do not block forever on the send if the receiving side has gone away.
		// NOTE(review): assumes doneCh is closed (not sent on) to signal completion, in line with the
		// finished(doneCh) guard at the top of this callback - confirm against finished.
		select {
		case eventCh <- blockData.Data:
			return nil
		case <-doneCh:
			return io.EOF
		}
	}
}