This repository has been archived by the owner on Nov 23, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
watcher.go
105 lines (86 loc) · 2.4 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package watcher
import (
"context"
"encoding/json"
"github.com/everstake/teztracker/services"
"github.com/everstake/teztracker/services/mailer"
"github.com/everstake/teztracker/services/watcher/pusher"
"github.com/everstake/teztracker/services/watcher/tasks"
"github.com/everstake/teztracker/ws"
"github.com/everstake/teztracker/ws/models"
"github.com/lib/pq"
log "github.com/sirupsen/logrus"
"time"
)
// eventsChannel is the notification channel name that NewWatcher passes to
// pq.Listener.Listen.
const eventsChannel = "events"
// Watcher reads database notifications from a pq.Listener, turns each one
// into event data using the executor registered for its event type, and hands
// the result to every configured pusher.
type Watcher struct {
	// ctx is polled by Start; once it is done, Start closes the listener and returns.
	ctx context.Context
	// cancel is the cancel function paired with ctx (both come from context.WithCancel).
	cancel context.CancelFunc
	// l is the listener whose Notify channel Start receives from.
	l *pq.Listener
	// tasks maps an event type to the executor that builds the data for that event.
	tasks map[models.EventType]tasks.EventExecutor
	// pushers are invoked, in slice order, for every event whose data was built successfully.
	pushers []eventPusher
}
// eventPusher is a destination for events produced by the Watcher.
//
// Push receives the event type and the data built for it. Start logs a
// non-nil error and still calls the remaining pushers.
type eventPusher interface {
	Push(event models.EventType, data interface{}) error
}
// NewWatcher builds a Watcher that listens on eventsChannel using the given
// database connection string.
//
// It registers one executor per supported event type (block, operation,
// account created, asset operation), all backed by provider, and two pushers:
// one built from hub via pusher.NewWSPusher and one built from mail and
// provider via pusher.NewEmailPusher.
//
// NewWatcher panics if subscribing to eventsChannel fails.
func NewWatcher(connection string, hub *ws.Hub, provider services.Provider, mail mailer.Mail) *Watcher {
	// Reconnect interval bounds handed to pq.NewListener.
	const (
		minReconnect = 10 * time.Second
		maxReconnect = time.Minute
	)

	// Listener events are only logged when they carry an error.
	onListenerEvent := func(_ pq.ListenerEventType, cause error) {
		if cause == nil {
			return
		}
		log.Errorf("ListenerEventType error: %s", cause)
	}

	lsn := pq.NewListener(connection, minReconnect, maxReconnect, onListenerEvent)
	if err := lsn.Listen(eventsChannel); err != nil {
		panic(err)
	}

	executors := map[models.EventType]tasks.EventExecutor{
		models.EventTypeBlock:          tasks.NewBlockTask(provider),
		models.EventTypeOperation:      tasks.NewOperationTask(provider),
		models.EventTypeAccountCreated: tasks.NewAccountTask(provider),
		models.EventTypeAssetOperation: tasks.NewAssetOperation(provider),
	}
	sinks := []eventPusher{
		pusher.NewWSPusher(hub),
		pusher.NewEmailPusher(mail, provider),
	}

	ctx, cancel := context.WithCancel(context.Background())
	return &Watcher{
		ctx:     ctx,
		cancel:  cancel,
		l:       lsn,
		tasks:   executors,
		pushers: sinks,
	}
}
// DBEvent is the JSON payload of a database notification. Start decodes the
// notification's Extra string into it.
type DBEvent struct {
	// Table is the source table, e.g. "table": "blocks". Start uses it as the event type.
	Table string
	// Action is the operation reported by the notification, e.g. "action": "INSERT".
	Action string
	// Data is the event-specific payload ("data": {}); Start passes it to the
	// matching executor's GetEventData.
	Data interface{}
}
// Start runs the watcher loop and blocks until the watcher's context is done
// or the listener's Notify channel is closed.
//
// For every notification it decodes the payload into a DBEvent, looks up the
// executor registered for the event's table, builds the event data, and calls
// every pusher with it. A failure at any step is logged and the notification
// is skipped. A failing pusher does not stop delivery to the remaining ones.
//
// When the context is done, the listener is closed before returning.
func (w Watcher) Start() {
	for {
		select {
		case <-w.ctx.Done():
			if err := w.l.Close(); err != nil {
				log.Errorf("Watcher: close listener: %s", err)
			}
			return
		case n, ok := <-w.l.Notify:
			if !ok {
				// The channel was closed, so no further notifications can
				// arrive; returning avoids spinning on a closed channel.
				return
			}
			if n == nil {
				// pq.Listener sends a nil notification after it re-establishes
				// a lost connection; there is no payload to process.
				continue
			}

			var ev DBEvent
			if err := json.Unmarshal([]byte(n.Extra), &ev); err != nil {
				// Skip undecodable payloads instead of dispatching an empty event.
				log.Errorf("Watcher: decode event: %s", err)
				continue
			}

			eventType := models.EventType(ev.Table)
			handler, found := w.tasks[eventType]
			if !found {
				log.Errorf("Unknown event: %s", ev.Table)
				continue
			}

			data, err := handler.GetEventData(ev.Data)
			if err != nil {
				log.Errorf("GetEventData error: %s", err)
				continue
			}

			for _, p := range w.pushers {
				if err := p.Push(eventType, data); err != nil {
					log.Errorf("Watcher: push: %s", err)
				}
			}
		}
	}
}