forked from ava-labs/ortelius
/
processor.go
224 lines (188 loc) · 5.2 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
// (c) 2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package stream
import (
	"context"
	"errors"
	"io"
	"sync"
	"sync/atomic"
	"time"

	"github.com/corpetty/ortelius/cfg"
	"github.com/corpetty/ortelius/services"
)
var (
	// processorFailureRetryInterval is how long a worker waits before
	// restarting its processor after the processor exits with an error.
	processorFailureRetryInterval = 200 * time.Millisecond

	// ErrUnknownProcessorType is returned when encountering a client type with no
	// known implementation
	ErrUnknownProcessorType = errors.New("unknown processor type")

	// ErrNoMessage is returned when no message was available to process;
	// callers treat it as a benign, retryable condition rather than a failure.
	ErrNoMessage = errors.New("no message")
)
// ProcessorFactory takes in configuration and returns a stream Processor.
// The string arguments are the chain VM type and chain ID, and the two ints
// are the worker index and worker count (see NewProcessorManager).
type ProcessorFactory func(*services.Control, cfg.Config, string, string, int, int) (Processor, error)
// Processor handles writing and reading to/from the event stream
type Processor interface {
	// ProcessNextMessage handles one message from the stream. It may return
	// ErrNoMessage or context.DeadlineExceeded when nothing is available.
	ProcessNextMessage() error
	// Close releases the processor's resources.
	Close() error
	// Failure records an unsuccessful message-processing attempt.
	Failure()
	// Success records a successful message-processing attempt.
	Success()
	// ID returns an identifier for this processor, used in log output.
	ID() string
}
// ProcessorFactoryChainDB builds a ProcessorDB for a specific chain; the
// string arguments are the chain VM type and chain ID.
type ProcessorFactoryChainDB func(*services.Control, cfg.Config, string, string) (ProcessorDB, error)

// ProcessorFactoryInstDB builds a chain-independent (instance-level) ProcessorDB.
type ProcessorFactoryInstDB func(*services.Control, cfg.Config) (ProcessorDB, error)

// ProcessorDB is a processor that consumes work from the database-backed
// transaction pool rather than directly from the event stream.
type ProcessorDB interface {
	// Process handles a single pooled transaction using the given connections.
	Process(*services.Connections, *services.TxPool) error
	// Close releases the processor's resources.
	Close() error
	// ID returns an identifier for this processor, used in log output.
	ID() string
	// Topic returns the list of topics this processor consumes.
	Topic() []string
}
// ProcessorManager supervises the Processor lifecycle; it will use the given
// configuration and ProcessorFactory to keep a Processor active
type ProcessorManager struct {
	conf    cfg.Config
	sc      *services.Control
	factory ProcessorFactory

	// Concurrency control
	quitCh chan struct{} // closed by Close to signal workers to stop
	doneCh chan struct{} // closed by Listen once all workers have exited

	// idx/maxIdx identify this manager's shard among maxIdx cooperating
	// workers; both are passed through to the factory.
	idx    int
	maxIdx int
}
// NewProcessorManager creates a new *ProcessorManager ready for listening.
// idx and maxIdx identify this manager's shard among maxIdx cooperating workers.
func NewProcessorManager(sc *services.Control, conf cfg.Config, factory ProcessorFactory, idx int, maxIdx int) *ProcessorManager {
	m := &ProcessorManager{
		sc:      sc,
		conf:    conf,
		factory: factory,
		idx:     idx,
		maxIdx:  maxIdx,
	}
	// Signal channels: quitCh is closed by Close, doneCh by Listen.
	m.quitCh = make(chan struct{})
	m.doneCh = make(chan struct{})
	return m
}
// Listen sets a client to listen for and handle incoming messages.
// It blocks until Close is called and every worker goroutine has exited,
// then closes doneCh and returns nil.
func (c *ProcessorManager) Listen() error {
	// Create a backend for each chain we want to watch and wait for them to exit
	wg := &sync.WaitGroup{}
	wg.Add(len(c.conf.Chains))
	for _, chainConfig := range c.conf.Chains {
		// chainConfig is passed as an argument so each goroutine gets its own
		// copy (required before Go 1.22's per-iteration loop variables).
		go func(chainConfig cfg.Chain) {
			defer wg.Done()
			c.sc.Log.Info("Started worker manager for chain %s", chainConfig.ID)
			defer c.sc.Log.Info("Exiting worker manager for chain %s", chainConfig.ID)

			// Keep running the worker until we're asked to stop
			for !c.isStopping() {
				if err := c.runProcessor(chainConfig); err != nil {
					c.sc.Log.Error("Error running worker: %v", err)
					// Back off before retrying, but wake up immediately if we
					// are asked to stop so Close is not delayed by the backoff.
					select {
					case <-c.quitCh:
						return
					case <-time.After(processorFailureRetryInterval):
					}
				}
			}
		}(chainConfig)
	}

	// Wait for all workers to finish
	wg.Wait()
	c.sc.Log.Info("All workers stopped")
	close(c.doneCh)
	return nil
}
// Close tells the workers to shutdown and waits for them to all stop
func (c *ProcessorManager) Close() error {
	// Closing quitCh makes isStopping return true in every worker.
	close(c.quitCh)
	// Block until Listen has seen all workers exit and closed doneCh.
	<-c.doneCh
	return nil
}
// isStopping reports whether shutdown has been requested (quitCh closed).
func (c *ProcessorManager) isStopping() bool {
	// Non-blocking receive: succeeds immediately once quitCh is closed,
	// otherwise falls through to the default branch.
	stopping := false
	select {
	case <-c.quitCh:
		stopping = true
	default:
	}
	return stopping
}
// runProcessor starts the processing loop for the backend and closes it when
// finished. It returns nil on a clean stop, or the first non-retryable error
// from the backend.
func (c *ProcessorManager) runProcessor(chainConfig cfg.Chain) error {
	if c.isStopping() {
		c.sc.Log.Info("Not starting worker for chain %s because we're stopping", chainConfig.ID)
		return nil
	}

	c.sc.Log.Info("Starting worker for chain %s", chainConfig.ID)
	defer c.sc.Log.Info("Exiting worker for chain %s", chainConfig.ID)

	// Create a backend to get messages from
	backend, err := c.factory(c.sc, c.conf, chainConfig.VMType, chainConfig.ID, c.idx, c.maxIdx)
	if err != nil {
		return err
	}
	defer backend.Close()

	// These counters are written by the processing loop below and read
	// concurrently by the stats goroutine, so all access must be atomic.
	var (
		successes uint64
		failures  uint64
		nomsg     uint64
	)

	// processNextMessage pulls one message from the backend and classifies the
	// result. "No message" conditions are expected when the upstream service
	// isn't producing and are not treated as errors.
	processNextMessage := func() error {
		err := backend.ProcessNextMessage()
		switch {
		case err == nil:
			atomic.AddUint64(&successes, 1)
			backend.Success()
			return nil

		// errors.Is is used (rather than ==) so wrapped errors from the
		// backend still match the expected sentinel values.
		case errors.Is(err, context.DeadlineExceeded):
			atomic.AddUint64(&nomsg, 1)
			c.sc.Log.Debug("context deadline exceeded")
			return nil

		case errors.Is(err, ErrNoMessage):
			atomic.AddUint64(&nomsg, 1)
			c.sc.Log.Debug("no message")
			return nil

		case errors.Is(err, io.EOF):
			c.sc.Log.Error("EOF")
			return io.EOF

		default:
			atomic.AddUint64(&failures, 1)
			backend.Failure()
			c.sc.Log.Error("Unknown error: %v", err)
			return err
		}
	}

	id := backend.ID()

	t := time.NewTicker(30 * time.Second)
	tdoneCh := make(chan struct{})
	defer func() {
		// Stopping the ticker and closing tdoneCh guarantees the stats
		// goroutine below exits when this function returns.
		t.Stop()
		close(tdoneCh)
	}()

	// Log run statistics periodically until asked to stop
	go func() {
		for {
			select {
			case <-t.C:
				c.sc.Log.Info("IProcessor %s successes=%d failures=%d nomsg=%d", id,
					atomic.LoadUint64(&successes),
					atomic.LoadUint64(&failures),
					atomic.LoadUint64(&nomsg))
			case <-tdoneCh:
				return
			}
		}
	}()

	// Process messages until asked to stop
	for !c.isStopping() {
		if err := processNextMessage(); err != nil {
			return err
		}
	}
	return nil
}