-
Notifications
You must be signed in to change notification settings - Fork 17
/
general_worker.go
136 lines (114 loc) · 2.61 KB
/
general_worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package periodic
import (
"context"
"strings"
"time"
"github.com/baking-bad/bcdhub/internal/teztnets"
"github.com/robfig/cron/v3"
"github.com/rs/zerolog/log"
)
// GeneralWorker -
type GeneralWorker struct {
rpc *teztnets.RPC
schedule string
cron *cron.Cron
handler ChangedHandler
urls map[string]string
}
// NewGeneralWorker -
func NewGeneralWorker(cfg Config, handler ChangedHandler) (*GeneralWorker, error) {
w := &GeneralWorker{
cron: cron.New(
cron.WithParser(cron.NewParser(
cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)),
),
schedule: cfg.Schedule,
handler: handler,
urls: make(map[string]string),
}
rpc, err := teztnets.New(cfg.InfoBaseURL)
if err != nil {
return w, err
}
w.rpc = rpc
return w, nil
}
// Start -
func (w *GeneralWorker) Start(ctx context.Context) {
if w.handler == nil {
return
}
if _, err := w.checkNetwork(ctx); err != nil {
log.Err(err).Msg("failed to receive periodic network info")
return
}
if _, err := w.cron.AddFunc(
w.schedule,
w.handleScheduleEvent(ctx),
); err != nil {
log.Err(err).Msg("failed to run cron function")
return
}
w.cron.Start()
}
// Close -
func (w *GeneralWorker) Close() error {
w.cron.Stop()
return nil
}
func (w *GeneralWorker) handleScheduleEvent(ctx context.Context) func() {
return func() {
log.Info().Msg("trying to receive new rpc url")
changed, err := w.checkNetwork(ctx)
if err != nil {
log.Err(err).Msg("failed to receive periodic network info")
}
if changed {
return
}
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
changed, err := w.checkNetwork(ctx)
if err != nil {
log.Err(err).Msg("failed to receive periodic network info")
}
if changed {
return
}
}
}
}
}
func (w *GeneralWorker) checkNetwork(ctx context.Context) (bool, error) {
info, err := w.rpc.Teztnets(ctx)
if err != nil {
return false, err
}
var changed bool
for name, data := range info {
parts := strings.Split(name, "-")
if len(parts) == 0 {
continue
}
network := parts[0]
if current := w.urls[network]; current != data.RPCURL {
if err := w.handler(ctx, network, data.RPCURL); err != nil {
log.Err(err).Str("network", network).Msg("failed to apply new rpc url")
}
w.urls[network] = data.RPCURL
log.Info().Str("network", network).Str("url", data.RPCURL).Msg("new url was found")
changed = true
}
}
return changed, nil
}
// URLs -
func (w *GeneralWorker) URLs() map[string]string {
return w.urls
}