/
watcher.go
160 lines (143 loc) · 4.56 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package transactions
import (
"context"
"github.com/alephium/wormhole-fork/node/pkg/common"
"github.com/alephium/wormhole-fork/node/pkg/vaa"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
// Watcher receives batches of block transactions from a channel and upserts
// them into the "transactions" MongoDB collection. It also answers queries
// about the latest event index stored per emitter chain.
type Watcher struct {
	// configs supplies the network used to pick a default event index when
	// the collection holds no document for a chain.
	configs *common.BridgeConfig
	// blockTxsC is the channel handleTxs reads transaction batches from.
	blockTxsC chan []*BlockTransactions
	// transactions is the MongoDB collection that is read and written.
	transactions *mongo.Collection
	// logger is used for progress and error reporting.
	logger *zap.Logger
}
// NewWatcher builds a Watcher that logs through logger, reads batches from
// blockTxsC and stores them in the "transactions" collection of db.
func NewWatcher(logger *zap.Logger, configs *common.BridgeConfig, db *mongo.Database, blockTxsC chan []*BlockTransactions) *Watcher {
	watcher := new(Watcher)
	watcher.logger = logger
	watcher.configs = configs
	watcher.blockTxsC = blockTxsC
	watcher.transactions = db.Collection("transactions")
	return watcher
}
// GetLatestEventIndexAlephium returns the latest stored event index for the
// Alephium chain, as reported by GetLatestEventIndex.
func (w *Watcher) GetLatestEventIndexAlephium(ctx context.Context) (*uint32, error) {
	index, err := w.GetLatestEventIndex(ctx, vaa.ChainIDAlephium)
	return index, err
}
// GetLatestEventIndexEth returns the starting event index for Ethereum,
// computed by GetLatestEventIndexEvm with 64 confirmations.
func (w *Watcher) GetLatestEventIndexEth(ctx context.Context) (*uint32, error) {
	const ethConfirmations uint32 = 64
	return w.GetLatestEventIndexEvm(ctx, vaa.ChainIDEthereum, ethConfirmations)
}
// GetLatestEventIndexBsc returns the starting event index for BSC, computed
// by GetLatestEventIndexEvm with 15 confirmations.
func (w *Watcher) GetLatestEventIndexBsc(ctx context.Context) (*uint32, error) {
	const bscConfirmations uint32 = 15
	return w.GetLatestEventIndexEvm(ctx, vaa.ChainIDBSC, bscConfirmations)
}
// GetLatestEventIndexEvm looks up the latest event index for chainId and
// steps it back by confirmations. When the latest index does not exceed
// confirmations the result is 1, which also keeps the unsigned subtraction
// from wrapping around. Any lookup error is returned unchanged.
func (w *Watcher) GetLatestEventIndexEvm(ctx context.Context, chainId vaa.ChainID, confirmations uint32) (*uint32, error) {
	latest, err := w.GetLatestEventIndex(ctx, chainId)
	if err != nil {
		return nil, err
	}

	// Default to 1 and only move further back when there is enough room.
	start := uint32(1)
	if *latest > confirmations {
		start = *latest - confirmations
	}
	return &start, nil
}
// GetLatestEventIndex returns the highest eventIndex stored in the
// transactions collection for emitterChain. If the collection has no
// document for that chain, the chain/network default from
// getDefaultEventIndex is returned instead. Any other query or decode error
// is returned unchanged.
func (w *Watcher) GetLatestEventIndex(ctx context.Context, emitterChain vaa.ChainID) (*uint32, error) {
	var latest TransactionUpdate
	err := w.transactions.FindOne(
		ctx,
		bson.D{{Key: "emitterChain", Value: emitterChain}},
		// Sort descending so the first match carries the largest index.
		options.FindOne().SetSort(bson.D{{Key: "eventIndex", Value: -1}}),
	).Decode(&latest)

	switch {
	case err == mongo.ErrNoDocuments:
		fallback := getDefaultEventIndex(emitterChain, w.configs.Network)
		return &fallback, nil
	case err != nil:
		return nil, err
	}

	// Return the stored index itself rather than index+1, to guard against
	// possible errors in the last bulk write.
	return &latest.EventIndex, nil
}
// Run returns a function that starts the transaction-handling goroutine and
// then blocks until either ctx is cancelled (returning ctx.Err()) or the
// goroutine reports an error (returning that error).
func (w *Watcher) Run() func(context.Context) error {
	return func(ctx context.Context) error {
		// Buffered with capacity 1: handleTxs sends at most one error before
		// exiting, so the send can never block even if this function has
		// already returned through ctx.Done(). An unbuffered channel would
		// leave that goroutine stuck on the send forever.
		errC := make(chan error, 1)
		go w.handleTxs(ctx, errC)

		select {
		case <-ctx.Done():
			return ctx.Err()
		case err := <-errC:
			return err
		}
	}
}
// handleTxs consumes batches from w.blockTxsC until ctx is cancelled. For
// every block in a batch it logs the block, converts each transaction to a
// document and upserts the documents (keyed by _id) with one ordered bulk
// write. The first bulk-write failure is logged, reported on errC and ends
// the loop.
func (w *Watcher) handleTxs(ctx context.Context, errC chan error) {
	for {
		select {
		case <-ctx.Done():
			return
		case blockTxsList := <-w.blockTxsC:
			for _, blockTxs := range blockTxsList {
				w.logger.Info(
					"received new txs",
					zap.String("chainId", blockTxs.chainId.String()),
					zap.Uint32("blockHeight", blockTxs.blockNumber),
					zap.String("blockHash", blockTxs.blockHash),
					zap.Int("txsNumber", len(blockTxs.txs)),
				)

				// BulkWrite rejects an empty model slice with an error, so a
				// block without transactions must not be treated as a failure.
				if len(blockTxs.txs) == 0 {
					continue
				}

				models := make([]mongo.WriteModel, len(blockTxs.txs))
				for i, tx := range blockTxs.txs {
					doc := tx.toDoc(blockTxs.blockNumber, blockTxs.blockHash, blockTxs.blockTimestamp)
					update := bson.D{{Key: "$set", Value: doc}}
					filter := bson.D{{Key: "_id", Value: doc.ID}}
					// Upsert so that re-processing an already stored
					// transaction overwrites it instead of failing.
					models[i] = mongo.NewUpdateOneModel().SetUpdate(update).SetUpsert(true).SetFilter(filter)
				}

				opts := options.BulkWrite().SetOrdered(true)
				if _, err := w.transactions.BulkWrite(ctx, models, opts); err != nil {
					w.logger.Error("failed to insert transactions", zap.Error(err))
					// The receiver may already be gone once ctx is cancelled;
					// never block forever on the send.
					select {
					case errC <- err:
					case <-ctx.Done():
					}
					return
				}
			}
		}
	}
}
// getDefaultEventIndex returns the event index to start from when nothing
// has been stored yet for emitterChain on the given network. It panics for
// any chain other than Alephium, Ethereum or BSC.
func getDefaultEventIndex(emitterChain vaa.ChainID, networkId common.NetworkId) uint32 {
	if emitterChain == vaa.ChainIDAlephium {
		return getAlephiumDefaultEventIndex()
	}
	if emitterChain == vaa.ChainIDEthereum {
		return getEthereumDefaultEventIndex(networkId)
	}
	if emitterChain == vaa.ChainIDBSC {
		return getBSCDefaultEventIndex(networkId)
	}
	panic("invalid emitter chain")
}
// getAlephiumDefaultEventIndex returns the default event index for Alephium,
// which is 0 regardless of network.
func getAlephiumDefaultEventIndex() uint32 {
	const alephiumStart uint32 = 0
	return alephiumStart
}
// getEthereumDefaultEventIndex returns the default starting event index for
// Ethereum on the given network. On testnet and mainnet this is the block
// height of the contract deployment transaction. It panics for an unknown
// network id.
func getEthereumDefaultEventIndex(networkId common.NetworkId) uint32 {
	var height uint32
	switch networkId {
	case common.DEVNET:
		height = 0
	case common.TESTNET:
		// the block height of the contract deployment tx
		height = 9876190
	case common.MAINNET:
		// the block height of the contract deployment tx
		height = 18469092
	default:
		panic("invalid network id")
	}
	return height
}
// getBSCDefaultEventIndex returns the default starting event index for BSC
// on the given network. On testnet this is the block height of the contract
// deployment transaction. No mainnet value is defined yet, so mainnet
// panics, as does an unknown network id.
func getBSCDefaultEventIndex(networkId common.NetworkId) uint32 {
	var height uint32
	switch networkId {
	case common.DEVNET:
		height = 0
	case common.TESTNET:
		// the block height of the contract deployment tx
		height = 30048254
	case common.MAINNET:
		// TODO: the block height of the contract deployment tx
		panic("bsc event index not specified")
	default:
		panic("invalid network id")
	}
	return height
}