/
bus.go
114 lines (93 loc) · 3.2 KB
/
bus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package monitor
import (
"sync"
"time"
"github.com/MadBase/MadNet/logging"
"github.com/MadBase/MadNet/rbus"
"github.com/sirupsen/logrus"
)
// Bus is the basic functionality required for the monitor system: running the
// request-handling loop and registering/calling services by name.
type Bus interface {
// StartLoop starts the loop that serves requests and returns a send-only
// channel on which the loop can be cancelled.
StartLoop() (chan<- bool, error)
// StopLoop signals the loop started by StartLoop to exit.
StopLoop() error
// Register registers serviceName with the given capacity and returns the
// receive-only channel on which requests for that service arrive.
Register(serviceName string, capacity uint16) (<-chan rbus.Request, error)
// Request sends request to serviceName, passing timeOut through to the
// underlying bus, and returns that service's response.
Request(serviceName string, timeOut time.Duration, request interface{}) (rbus.Response, error)
}
// monitorBus is the Bus implementation returned by NewBus. It wraps an
// rbus.Rbus and serves SvcWatchEthereum requests from its own loop.
type monitorBus struct {
// Embedded WaitGroup; the loop goroutine calls Wait on it after it stops
// handling requests. Nothing in this file calls Add on it.
sync.WaitGroup
// svcs provides WatchEthereum, which SvcProcessEvents invokes per request.
svcs *Services
// rbus is the underlying bus that Register and Request delegate to.
rbus rbus.Rbus
// logger is obtained from logging.GetLogger("monitor_bus") in NewBus.
logger *logrus.Logger
// processEventsChan delivers the requests made to SvcWatchEthereum.
processEventsChan <-chan rbus.Request
// cancelChan is created by StartLoop (buffered, capacity 10) and is nil
// until then; receiving true on it ends the loop.
cancelChan chan bool
}
// Service names used when registering services on the bus. They are
// pseudo-constants: declared as variables, but intended to be treated as
// constants.
var (
// SvcWatchEthereum uses the current state to process more events. It is the
// service NewBus registers; its requests are handled by SvcProcessEvents.
SvcWatchEthereum = "SvcWatchEthereum"
// SvcEndpointInSync checks if the Ethereum endpoint is ready for use.
SvcEndpointInSync = "SvcEndpointInSync"
// SvcGetEvents checks known contracts for recently emitted events.
SvcGetEvents = "SvcGetEvents"
// SvcGetValidators calls the contract to get the current list of validators.
SvcGetValidators = "SvcGetValidators"
// SvcGetSnapShot calls the contract to get snapshots.
SvcGetSnapShot = "SvcGetSnapShot"
)
// NewBus sets up a monitor bus on top of the given rbus.
//
// It registers the SvcWatchEthereum service on rb with a capacity of 5 and
// keeps the resulting request channel so the loop started by StartLoop can
// serve it. If the registration fails, that error is returned and no bus is
// created.
func NewBus(rb rbus.Rbus, svcs *Services) (Bus, error) {
	// Each exposed service gets its own request channel.
	watchRequests, err := rb.Register(SvcWatchEthereum, 5)
	if err != nil {
		return nil, err
	}

	mb := &monitorBus{
		logger:            logging.GetLogger("monitor_bus"),
		svcs:              svcs,
		rbus:              rb,
		processEventsChan: watchRequests,
	}
	return mb, nil
}
// StartLoop creates the cancel channel (buffered, capacity 10) and spawns a
// goroutine that serves incoming service requests until it is cancelled.
//
// The goroutine handles each request arriving on processEventsChan with
// SvcProcessEvents, logging a warning when that returns an error. It stops
// once the value true is received on the cancel channel; a false value is
// logged and otherwise ignored. After leaving the loop it waits on the
// embedded WaitGroup.
//
// The send side of the cancel channel is returned; the error is always nil.
func (bus *monitorBus) StartLoop() (chan<- bool, error) {
	bus.cancelChan = make(chan bool, 10)

	go func() {
	serve:
		for {
			select {
			case stop := <-bus.cancelChan:
				bus.logger.Infof("Received cancel message: %v", stop)
				if stop {
					break serve
				}
			case req := <-bus.processEventsChan:
				if err := bus.SvcProcessEvents(req); err != nil {
					bus.logger.Warnf("Failed to process events: %v", err)
				}
			}
		}

		bus.logger.Infof("Shutting down...")
		bus.Wait()
	}()

	return bus.cancelChan, nil
}
// StopLoop asks the monitoring loop to exit by sending true on the cancel
// channel created by StartLoop.
//
// If StartLoop has not been called yet the cancel channel is still nil. A send
// on a nil channel blocks forever, so in that case there is no loop to stop
// and StopLoop returns immediately. Otherwise the send blocks only while the
// channel's buffer is full.
//
// The returned error is always nil.
func (bus *monitorBus) StopLoop() error {
	if bus.cancelChan == nil {
		// No loop was ever started; sending here would hang the caller.
		return nil
	}

	bus.cancelChan <- true
	return nil
}
// Register exposes the underlying rbus registration through the monitor bus
// so callers deal with a single, consistent API. Arguments and results are
// passed through unchanged.
func (bus *monitorBus) Register(serviceName string, capacity uint16) (<-chan rbus.Request, error) {
	requests, err := bus.rbus.Register(serviceName, capacity)
	return requests, err
}
// Request exposes the underlying rbus request call through the monitor bus
// so callers deal with a single, consistent API. Arguments and results are
// passed through unchanged.
func (bus *monitorBus) Request(serviceName string, timeOut time.Duration, request interface{}) (rbus.Response, error) {
	resp, err := bus.rbus.Request(serviceName, timeOut, request)
	return resp, err
}
// SvcProcessEvents exposes Services.WatchEthereum on the bus by handling a
// single SvcWatchEthereum request.
//
// The request payload must be a *State; the type assertion is unchecked, so
// any other payload panics. The state is handed to WatchEthereum and the
// request is answered exactly once: with the error if WatchEthereum fails,
// otherwise with the state itself.
//
// The error from WatchEthereum is also returned so the caller can report it;
// nil is returned on success.
func (bus *monitorBus) SvcProcessEvents(request rbus.Request) error {
	state := request.Request().(*State)

	if err := bus.svcs.WatchEthereum(state); err != nil {
		// Answer with the failure and stop, so the request is not answered a
		// second time with the state below.
		request.Respond(err)
		return err
	}

	request.Respond(state) // TODO Is this required? I only care about error or not, state mutation just happens
	return nil
}