/
events_jobcreator.go
78 lines (67 loc) · 1.75 KB
/
events_jobcreator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package web3
import (
"context"
"github.com/bacalhau-project/generic-dcn/pkg/system"
"github.com/bacalhau-project/generic-dcn/pkg/web3/bindings/jobcreator"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/event"
"github.com/rs/zerolog/log"
)
// JobCreatorEventChannels holds the channel that JobAdded events from the
// JobCreator contract bindings are delivered on, together with the handlers
// that Start invokes for each event.
type JobCreatorEventChannels struct {
// jobAddedChan is the sink handed to WatchJobAdded; Start reads events from it.
jobAddedChan chan *jobcreator.JobcreatorJobAdded
// jobAddedSubs holds the handlers added via SubscribeJobAdded; Start calls
// every one of them for each event it receives.
jobAddedSubs []func(jobcreator.JobcreatorJobAdded)
}
// NewJobCreatorEventChannels returns a JobCreatorEventChannels with an
// unbuffered JobAdded channel and an empty (non-nil) handler list.
func NewJobCreatorEventChannels() *JobCreatorEventChannels {
	channels := new(JobCreatorEventChannels)
	channels.jobAddedChan = make(chan *jobcreator.JobcreatorJobAdded)
	channels.jobAddedSubs = make([]func(jobcreator.JobcreatorJobAdded), 0)
	return channels
}
// Start subscribes to the JobCreator contract's JobAdded events and dispatches
// every event received to each handler registered with SubscribeJobAdded,
// running each handler call in its own goroutine.
//
// The subscription begins at the block number returned by sdk.getBlockNumber
// at the time Start is called. When the subscription reports an error or ends,
// it is torn down and re-established with the same options.
// NOTE(review): reconnects reuse the original start block, so events may
// presumably be delivered again after a reconnect; confirm handlers tolerate
// this.
//
// Start blocks until ctx is cancelled, in which case it returns nil, or until
// the initial block-number lookup or a (re)subscription attempt fails, in which
// case it returns that error. The active subscription is released on every
// return path. The cm parameter is currently unused.
func (s *JobCreatorEventChannels) Start(
	sdk *Web3SDK,
	ctx context.Context,
	cm *system.CleanupManager,
) error {
	blockNumber, err := sdk.getBlockNumber()
	if err != nil {
		return err
	}

	connectJobAddedSub := func() (event.Subscription, error) {
		log.Debug().
			Str("jobcreator->connect", "JobAdded").
			Msg("")
		return sdk.Contracts.JobCreator.WatchJobAdded(
			&bind.WatchOpts{Start: &blockNumber, Context: ctx},
			s.jobAddedChan,
		)
	}

	var jobAddedSub event.Subscription
	jobAddedSub, err = connectJobAddedSub()
	if err != nil {
		return err
	}
	// The closure reads jobAddedSub at return time, so whichever subscription
	// is current when Start exits is the one that gets released. Doing this in
	// a defer (instead of a separate goroutine) keeps every access to
	// jobAddedSub on this goroutine.
	defer func() { jobAddedSub.Unsubscribe() }()

	for {
		select {
		case <-ctx.Done():
			// Cancellation is a normal shutdown, not a failure.
			return nil

		case ev := <-s.jobAddedChan:
			if ev == nil {
				// Nothing to dispatch; dereferencing would panic.
				continue
			}
			log.Debug().
				Str("jobcreator->event", "JobAdded").
				Msgf("%+v", ev)
			for _, handler := range s.jobAddedSubs {
				go handler(*ev)
			}

		case subErr := <-jobAddedSub.Err():
			// select may pick this case even though ctx is already done;
			// never try to resubscribe with a cancelled context.
			if ctx.Err() != nil {
				return nil
			}
			if subErr != nil {
				log.Warn().
					Err(subErr).
					Str("jobcreator->reconnect", "JobAdded").
					Msg("subscription failed, reconnecting")
			}
			jobAddedSub.Unsubscribe()
			// Only replace jobAddedSub on success so the deferred cleanup
			// never sees a nil subscription.
			newSub, err := connectJobAddedSub()
			if err != nil {
				return err
			}
			jobAddedSub = newSub
		}
	}
}
// SubscribeJobAdded appends handler to the list of functions that Start calls
// for every JobAdded event it receives.
func (s *JobCreatorEventChannels) SubscribeJobAdded(handler func(jobcreator.JobcreatorJobAdded)) {
	handlers := append(s.jobAddedSubs, handler)
	s.jobAddedSubs = handlers
}