/
events_token.go
81 lines (70 loc) · 1.73 KB
/
events_token.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package web3
import (
"context"
"github.com/bacalhau-project/generic-dcn/pkg/system"
"github.com/bacalhau-project/generic-dcn/pkg/web3/bindings/token"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/rs/zerolog/log"
)
type TokenEventChannels struct {
transferChan chan *token.TokenTransfer
transferSubs []func(token.TokenTransfer)
}
func NewTokenEventChannels() *TokenEventChannels {
return &TokenEventChannels{
transferChan: make(chan *token.TokenTransfer),
transferSubs: []func(token.TokenTransfer){},
}
}
func (t *TokenEventChannels) Start(
sdk *Web3SDK,
ctx context.Context,
cm *system.CleanupManager,
) error {
blockNumber, err := sdk.getBlockNumber()
if err != nil {
return err
}
var transferSub event.Subscription
connectTransferSub := func() (event.Subscription, error) {
log.Debug().
Str("token->connect", "Transfer").
Msgf("")
return sdk.Contracts.Token.WatchTransfer(
&bind.WatchOpts{Start: &blockNumber, Context: ctx},
t.transferChan,
[]common.Address{},
[]common.Address{},
)
}
transferSub, err = connectTransferSub()
if err != nil {
return err
}
go func() {
<-ctx.Done()
transferSub.Unsubscribe()
}()
for {
select {
case event := <-t.transferChan:
log.Debug().
Str("token->event", "Transfer").
Msgf("%+v", event)
for _, handler := range t.transferSubs {
go handler(*event)
}
case err := <-transferSub.Err():
transferSub.Unsubscribe()
transferSub, err = connectTransferSub()
if err != nil {
return err
}
}
}
}
func (t *TokenEventChannels) SubscribeTransfer(handler func(token.TokenTransfer)) {
t.transferSubs = append(t.transferSubs, handler)
}