This repository has been archived by the owner on May 13, 2022. It is now read-only.
/
rowbuilder.go
184 lines (161 loc) · 6.26 KB
/
rowbuilder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package service
import (
"bytes"
"encoding/json"
"fmt"
"strings"
"unicode/utf8"
"github.com/hyperledger/burrow/execution/evm/abi"
"github.com/hyperledger/burrow/execution/exec"
"github.com/hyperledger/burrow/logging"
"github.com/hyperledger/burrow/vent/sqlsol"
"github.com/hyperledger/burrow/vent/types"
"github.com/pkg/errors"
)
// buildEventData builds event data from transactions
func buildEventData(projection *sqlsol.Projection, eventClass *types.EventClass, event *exec.Event,
txOrigin *exec.Origin, evAbi *abi.EventSpec, logger *logging.Logger) (types.EventDataRow, error) {
// a fresh new row to store column/value data
row := make(map[string]interface{})
// get header & log data for the given event
eventHeader := event.GetHeader()
eventLog := event.GetLog()
// decode event data using the provided abi specification
decodedData, err := decodeEvent(eventHeader, eventLog, txOrigin, evAbi)
if err != nil {
return types.EventDataRow{}, errors.Wrapf(err, "Error decoding event (filter: %s)", eventClass.Filter)
}
logger.InfoMsg("Decoded event", decodedData)
rowAction := types.ActionUpsert
// for each data element, maps to SQL columnName and gets its value
// if there is no matching column for the item, it doesn't need to be stored in db
for fieldName, value := range decodedData {
// Can't think of case where we will get a key that is empty, but if we ever did we should not treat
// it as a delete marker when the delete marker field in unset
if eventClass.DeleteMarkerField != "" && eventClass.DeleteMarkerField == fieldName {
rowAction = types.ActionDelete
}
fieldMapping := eventClass.GetFieldMapping(fieldName)
if fieldMapping == nil {
continue
}
column, err := projection.GetColumn(eventClass.TableName, fieldMapping.ColumnName)
if err == nil {
if fieldMapping.BytesToString {
if bs, ok := value.(*[]byte); ok {
str := sanitiseBytesForString(*bs, logger)
row[column.Name] = interface{}(str)
continue
}
}
row[column.Name] = value
} else {
logger.TraceMsg("could not get column", "err", err)
}
}
return types.EventDataRow{Action: rowAction, RowData: row, EventClass: eventClass}, nil
}
// buildBlkData builds block data from block stream
func buildBlkData(tbls types.EventTables, block *exec.BlockExecution) (types.EventDataRow, error) {
// a fresh new row to store column/value data
row := make(map[string]interface{})
// block raw data
if _, ok := tbls[tables.Block]; ok {
blockHeader, err := json.Marshal(block.Header)
if err != nil {
return types.EventDataRow{}, fmt.Errorf("could not marshal BlockHeader in block %v", block)
}
row[columns.Height] = fmt.Sprintf("%v", block.Height)
row[columns.BlockHeader] = string(blockHeader)
} else {
return types.EventDataRow{}, fmt.Errorf("table: %s not found in table structure %v", tables.Block, tbls)
}
return types.EventDataRow{Action: types.ActionUpsert, RowData: row}, nil
}
// buildTxData builds transaction data from tx stream
func buildTxData(txe *exec.TxExecution) (types.EventDataRow, error) {
// transaction raw data
envelope, err := json.Marshal(txe.Envelope)
if err != nil {
return types.EventDataRow{}, fmt.Errorf("couldn't marshal envelope in tx %v: %v", txe, err)
}
events, err := json.Marshal(txe.Events)
if err != nil {
return types.EventDataRow{}, fmt.Errorf("couldn't marshal events in tx %v: %v", txe, err)
}
result, err := json.Marshal(txe.Result)
if err != nil {
return types.EventDataRow{}, fmt.Errorf("couldn't marshal result in tx %v: %v", txe, err)
}
receipt, err := json.Marshal(txe.Receipt)
if err != nil {
return types.EventDataRow{}, fmt.Errorf("couldn't marshal receipt in tx %v: %v", txe, err)
}
exception, err := json.Marshal(txe.Exception)
if err != nil {
return types.EventDataRow{}, fmt.Errorf("couldn't marshal exception in tx %v: %v", txe, err)
}
origin, err := json.Marshal(txe.Origin)
if err != nil {
return types.EventDataRow{}, fmt.Errorf("couldn't marshal origin in tx %v: %v", txe, err)
}
return types.EventDataRow{
Action: types.ActionUpsert,
RowData: map[string]interface{}{
columns.Height: txe.Height,
columns.TxHash: txe.TxHash.String(),
columns.TxIndex: txe.Index,
columns.TxType: txe.TxType.String(),
columns.Envelope: string(envelope),
columns.Events: string(events),
columns.Result: string(result),
columns.Receipt: string(receipt),
columns.Origin: string(origin),
columns.Exception: string(exception),
},
}, nil
}
func sanitiseBytesForString(bs []byte, l *logging.Logger) string {
str, err := UTF8StringFromBytes(bs)
if err != nil {
l.InfoMsg("buildEventData() received invalid bytes for utf8 string - proceeding with sanitised version",
"err", err)
}
// The only null bytes in utf8 are for the null code point/character so this is fine in general
return strings.Trim(str, "\x00")
}
// Checks whether the bytes passed are valid utf8 string bytes. If they are not returns a sanitised string version of the
// bytes with offending sequences replaced by the utf8 replacement/error rune and an error indicating the offending
// byte sequences and their position. Note: always returns a valid string regardless of error.
func UTF8StringFromBytes(bs []byte) (string, error) {
// Provide fast path for good strings
if utf8.Valid(bs) {
return string(bs), nil
}
buf := new(bytes.Buffer)
var runeErrs []string
// This loops over runs (code points and unlike range of string gives us index of code point (i.e. utf8 char)
// not bytes, which we want for error message
var offset int
// Iterate over character indices (not byte indices)
for i := 0; i < len(bs); i++ {
r, n := utf8.DecodeRune(bs[offset:])
buf.WriteRune(r)
if r == utf8.RuneError {
runeErrs = append(runeErrs, fmt.Sprintf("0x% X (at index %d)", bs[offset:offset+n], i))
}
offset += n
}
str := buf.String()
errHeader := fmt.Sprintf("bytes purported to represent the string '%s'", str)
switch len(runeErrs) {
case 0:
// should not happen
return str, fmt.Errorf("bytes appear to be invalid utf8 but do not contain invalid code points")
case 1:
return str, fmt.Errorf("%s contain invalid utf8 byte sequence: %s", errHeader, runeErrs[0])
default:
return str, fmt.Errorf("%s contain invalid utf8 byte sequences: %s", errHeader,
strings.Join(runeErrs, ", "))
}
}