/
tokenmeta.go
113 lines (94 loc) · 3.06 KB
/
tokenmeta.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package tokenmeta
import (
"encoding/json"
"fmt"
pbstatedb "github.com/dfuse-io/dfuse-eosio/pb/dfuse/eosio/statedb/v1"
"github.com/dfuse-io/bstream"
pbabicodec "github.com/dfuse-io/dfuse-eosio/pb/dfuse/eosio/abicodec/v1"
pbcodec "github.com/dfuse-io/dfuse-eosio/pb/dfuse/eosio/codec/v1"
pbtokenmeta "github.com/dfuse-io/dfuse-eosio/pb/dfuse/eosio/tokenmeta/v1"
"github.com/dfuse-io/dfuse-eosio/tokenmeta/cache"
pbblockmeta "github.com/dfuse-io/pbgo/dfuse/blockmeta/v1"
"github.com/dfuse-io/shutter"
"github.com/eoscanada/eos-go"
"go.uber.org/zap"
)
const AccountsTable eos.TableName = eos.TableName("accounts")
const StatTable eos.TableName = eos.TableName("stat")
const EOSStakeTable eos.TableName = eos.TableName("delband")
var maxStateDBRetry = 5
type TokenMeta struct {
*shutter.Shutter
source bstream.Source
cache cache.Cache
abiCodecCli pbabicodec.DecoderClient
abisCache map[string]*abiItem
saveEveryNBlock uint32
stateClient pbstatedb.StateClient
blockmeta pbblockmeta.BlockIDClient
}
func NewTokenMeta(
cache cache.Cache,
abiCodecCli pbabicodec.DecoderClient,
saveEveryNBlock uint32,
stateClient pbstatedb.StateClient,
blockmeta pbblockmeta.BlockIDClient,
) *TokenMeta {
if blkTime := cache.GetHeadBlockTime(); !blkTime.IsZero() {
HeadTimeDrift.SetBlockTime(blkTime)
}
return &TokenMeta{
Shutter: shutter.New(),
cache: cache,
abisCache: map[string]*abiItem{},
abiCodecCli: abiCodecCli,
saveEveryNBlock: saveEveryNBlock,
stateClient: stateClient,
blockmeta: blockmeta,
}
}
func (t *TokenMeta) decodeDBOpToRow(data []byte, tableName eos.TableName, contract eos.AccountName, blocknum uint32) (json.RawMessage, error) {
abi, err := t.getABI(contract, blocknum)
if err != nil {
return nil, fmt.Errorf("cannot get ABI: %w", err)
}
return decodeTableRow(data, tableName, abi)
}
func (i *TokenMeta) Launch() error {
zlog.Info("launching pipeline")
go i.source.Run()
<-i.source.Terminated()
zlog.Info("source is done")
zlog.Info("export cache")
err := i.cache.SaveToFile()
if err != nil {
zlog.Error("error exporting cache on shutdown", zap.Error(err))
}
if err := i.source.Err(); err != nil {
zlog.Error("source shutdown with error", zap.Error(err))
return err
}
return nil
}
func shouldProcessDbop(dbop *pbcodec.DBOp, actionMatcher pbcodec.FilteringActionMatcher) bool {
if !actionMatcher.Matched(dbop.ActionIndex) {
return false
}
return dbop.TableName == string(AccountsTable) || dbop.TableName == string(StatTable)
}
func shouldProcessAction(actTrace *pbcodec.ActionTrace, actionMatcher pbcodec.FilteringActionMatcher) bool {
// TODO should I do this check? when does actionMatcher know if it is system action
if !actionMatcher.Matched(actTrace.ExecutionIndex) {
return false
}
if actTrace.Receiver != "eosio" || actTrace.Action.Account != "eosio" {
return false
}
return actTrace.Action.Name == "setabi"
}
func TokenToEOSSymbol(e *pbtokenmeta.Token) *eos.Symbol {
return &eos.Symbol{
Precision: uint8(e.Precision),
Symbol: e.Symbol,
}
}