-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
events_called.go
633 lines (521 loc) · 18.2 KB
/
events_called.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
package events
import (
"context"
"math"
"sync"
"github.com/filecoin-project/lotus/api"
lru "github.com/hashicorp/golang-lru"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/types"
)
const NoTimeout = math.MaxInt64
const NoHeight = abi.ChainEpoch(-1)
type triggerID = uint64
// msgH is the block height at which a message was present / event has happened
type msgH = abi.ChainEpoch
// triggerH is the block height at which the listener will be notified about the
// message (msgH+confidence)
type triggerH = abi.ChainEpoch
type eventData interface{}
// EventHandler arguments:
// `prevTs` is the previous tipset, eg the "from" tipset for a state change.
// `ts` is the event tipset, eg the tipset in which the `msg` is included.
// `curH`-`ts.Height` = `confidence`
type EventHandler func(data eventData, prevTs, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
// CheckFunc is used for atomicity guarantees. If the condition the callbacks
// wait for has already happened in tipset `ts`
//
// If `done` is true, timeout won't be triggered
// If `more` is false, no messages will be sent to EventHandler (RevertHandler
// may still be called)
type CheckFunc func(ts *types.TipSet) (done bool, more bool, err error)
// Keep track of information for an event handler
type handlerInfo struct {
confidence int
timeout abi.ChainEpoch
disabled bool // TODO: GC after gcConfidence reached
handle EventHandler
revert RevertHandler
}
// When a change occurs, a queuedEvent is created and put into a queue
// until the required confidence is reached
type queuedEvent struct {
trigger triggerID
prevH abi.ChainEpoch
h abi.ChainEpoch
data eventData
called bool
}
// Manages chain head change events, which may be forward (new tipset added to
// chain) or backward (chain branch discarded in favour of heavier branch)
type hcEvents struct {
cs EventAPI
tsc *tipSetCache
ctx context.Context
gcConfidence uint64
lastTs *types.TipSet
lk sync.Mutex
ctr triggerID
triggers map[triggerID]*handlerInfo
// maps block heights to events
// [triggerH][msgH][event]
confQueue map[triggerH]map[msgH][]*queuedEvent
// [msgH][triggerH]
revertQueue map[msgH][]triggerH
// [timeoutH+confidence][triggerID]{calls}
timeouts map[abi.ChainEpoch]map[triggerID]int
messageEvents
watcherEvents
}
func newHCEvents(ctx context.Context, cs EventAPI, tsc *tipSetCache, gcConfidence uint64) *hcEvents {
e := hcEvents{
ctx: ctx,
cs: cs,
tsc: tsc,
gcConfidence: gcConfidence,
confQueue: map[triggerH]map[msgH][]*queuedEvent{},
revertQueue: map[msgH][]triggerH{},
triggers: map[triggerID]*handlerInfo{},
timeouts: map[abi.ChainEpoch]map[triggerID]int{},
}
e.messageEvents = newMessageEvents(ctx, &e, cs)
e.watcherEvents = newWatcherEvents(ctx, &e, cs)
return &e
}
// Called when there is a change to the head with tipsets to be
// reverted / applied
func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error {
e.lk.Lock()
defer e.lk.Unlock()
for _, ts := range rev {
e.handleReverts(ts)
e.lastTs = ts
}
for _, ts := range app {
// Check if the head change caused any state changes that we were
// waiting for
stateChanges := e.watcherEvents.checkStateChanges(e.lastTs, ts)
// Queue up calls until there have been enough blocks to reach
// confidence on the state changes
for tid, data := range stateChanges {
e.queueForConfidence(tid, data, e.lastTs, ts)
}
// Check if the head change included any new message calls
newCalls, err := e.messageEvents.checkNewCalls(ts)
if err != nil {
return err
}
// Queue up calls until there have been enough blocks to reach
// confidence on the message calls
for tid, calls := range newCalls {
for _, data := range calls {
e.queueForConfidence(tid, data, nil, ts)
}
}
for at := e.lastTs.Height(); at <= ts.Height(); at++ {
// Apply any queued events and timeouts that were targeted at the
// current chain height
e.applyWithConfidence(ts, at)
e.applyTimeouts(ts)
}
// Update the latest known tipset
e.lastTs = ts
}
return nil
}
func (e *hcEvents) handleReverts(ts *types.TipSet) {
reverts, ok := e.revertQueue[ts.Height()]
if !ok {
return // nothing to do
}
for _, triggerH := range reverts {
toRevert := e.confQueue[triggerH][ts.Height()]
for _, event := range toRevert {
if !event.called {
continue // event wasn't apply()-ied yet
}
trigger := e.triggers[event.trigger]
if err := trigger.revert(e.ctx, ts); err != nil {
log.Errorf("reverting chain trigger (@H %d, triggered @ %d) failed: %s", ts.Height(), triggerH, err)
}
}
delete(e.confQueue[triggerH], ts.Height())
}
delete(e.revertQueue, ts.Height())
}
// Queue up events until the chain has reached a height that reflects the
// desired confidence
func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, prevTs, ts *types.TipSet) {
trigger := e.triggers[trigID]
prevH := NoHeight
if prevTs != nil {
prevH = prevTs.Height()
}
appliedH := ts.Height()
triggerH := appliedH + abi.ChainEpoch(trigger.confidence)
byOrigH, ok := e.confQueue[triggerH]
if !ok {
byOrigH = map[abi.ChainEpoch][]*queuedEvent{}
e.confQueue[triggerH] = byOrigH
}
byOrigH[appliedH] = append(byOrigH[appliedH], &queuedEvent{
trigger: trigID,
prevH: prevH,
h: appliedH,
data: data,
})
e.revertQueue[appliedH] = append(e.revertQueue[appliedH], triggerH)
}
// Apply any events that were waiting for this chain height for confidence
func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) {
byOrigH, ok := e.confQueue[height]
if !ok {
return // no triggers at this height
}
for origH, events := range byOrigH {
triggerTs, err := e.tsc.get(origH)
if err != nil {
log.Errorf("events: applyWithConfidence didn't find tipset for event; wanted %d; current %d", origH, height)
}
for _, event := range events {
if event.called {
continue
}
trigger := e.triggers[event.trigger]
if trigger.disabled {
continue
}
// Previous tipset - this is relevant for example in a state change
// from one tipset to another
var prevTs *types.TipSet
if event.prevH != NoHeight {
prevTs, err = e.tsc.get(event.prevH)
if err != nil {
log.Errorf("events: applyWithConfidence didn't find tipset for previous event; wanted %d; current %d", event.prevH, height)
continue
}
}
more, err := trigger.handle(event.data, prevTs, triggerTs, height)
if err != nil {
log.Errorf("chain trigger (@H %d, triggered @ %d) failed: %s", origH, height, err)
continue // don't revert failed calls
}
event.called = true
touts, ok := e.timeouts[trigger.timeout]
if ok {
touts[event.trigger]++
}
trigger.disabled = !more
}
}
}
// Apply any timeouts that expire at this height
func (e *hcEvents) applyTimeouts(ts *types.TipSet) {
triggers, ok := e.timeouts[ts.Height()]
if !ok {
return // nothing to do
}
for triggerID, calls := range triggers {
if calls > 0 {
continue // don't timeout if the method was called
}
trigger := e.triggers[triggerID]
if trigger.disabled {
continue
}
timeoutTs, err := e.tsc.get(ts.Height() - abi.ChainEpoch(trigger.confidence))
if err != nil {
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Height())
}
more, err := trigger.handle(nil, nil, timeoutTs, ts.Height())
if err != nil {
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err)
continue // don't revert failed calls
}
trigger.disabled = !more // allows messages after timeout
}
}
// Listen for an event
// - CheckFunc: immediately checks if the event already occurred
// - EventHandler: called when the event has occurred, after confidence tipsets
// - RevertHandler: called if the chain head changes causing the event to revert
// - confidence: wait this many tipsets before calling EventHandler
// - timeout: at this chain height, timeout on waiting for this event
func (e *hcEvents) onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error) {
e.lk.Lock()
defer e.lk.Unlock()
// Check if the event has already occurred
ts, err := e.tsc.best()
if err != nil {
return 0, xerrors.Errorf("error getting best tipset: %w", err)
}
done, more, err := check(ts)
if err != nil {
return 0, xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err)
}
if done {
timeout = NoTimeout
}
// Create a trigger for the event
id := e.ctr
e.ctr++
e.triggers[id] = &handlerInfo{
confidence: confidence,
timeout: timeout + abi.ChainEpoch(confidence),
disabled: !more,
handle: hnd,
revert: rev,
}
// If there's a timeout, set up a timeout check at that height
if timeout != NoTimeout {
if e.timeouts[timeout+abi.ChainEpoch(confidence)] == nil {
e.timeouts[timeout+abi.ChainEpoch(confidence)] = map[uint64]int{}
}
e.timeouts[timeout+abi.ChainEpoch(confidence)][id] = 0
}
return id, nil
}
// headChangeAPI is used to allow the composed event APIs to call back to hcEvents
// to listen for changes
type headChangeAPI interface {
onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error)
}
// watcherEvents watches for a state change
type watcherEvents struct {
ctx context.Context
cs EventAPI
hcAPI headChangeAPI
lk sync.RWMutex
matchers map[triggerID]StateMatchFunc
}
func newWatcherEvents(ctx context.Context, hcAPI headChangeAPI, cs EventAPI) watcherEvents {
return watcherEvents{
ctx: ctx,
cs: cs,
hcAPI: hcAPI,
matchers: make(map[triggerID]StateMatchFunc),
}
}
// Run each of the matchers against the previous and current state to see if
// there's a change
func (we *watcherEvents) checkStateChanges(oldState, newState *types.TipSet) map[triggerID]eventData {
we.lk.RLock()
defer we.lk.RUnlock()
res := make(map[triggerID]eventData)
for tid, matchFn := range we.matchers {
ok, data, err := matchFn(oldState, newState)
if err != nil {
log.Errorf("event diff fn failed: %s", err)
continue
}
if ok {
res[tid] = data
}
}
return res
}
// StateChange represents a change in state
type StateChange interface{}
// StateChangeHandler arguments:
// `oldTs` is the state "from" tipset
// `newTs` is the state "to" tipset
// `states` is the change in state
// `curH`-`ts.Height` = `confidence`
type StateChangeHandler func(oldTs, newTs *types.TipSet, states StateChange, curH abi.ChainEpoch) (more bool, err error)
type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, StateChange, error)
// StateChanged registers a callback which is triggered when a specified state
// change occurs or a timeout is reached.
//
// * `CheckFunc` callback is invoked immediately with a recent tipset, it
// returns two booleans - `done`, and `more`.
//
// * `done` should be true when some on-chain state change we are waiting
// for has happened. When `done` is set to true, timeout trigger is disabled.
//
// * `more` should be false when we don't want to receive new notifications
// through StateChangeHandler. Note that notifications may still be delivered to
// RevertHandler
//
// * `StateChangeHandler` is called when the specified state change was observed
// on-chain, and a confidence threshold was reached, or the specified `timeout`
// height was reached with no state change observed. When this callback is
// invoked on a timeout, `oldState` and `newState` are set to nil.
// This callback returns a boolean specifying whether further notifications
// should be sent, like `more` return param from `CheckFunc` above.
//
// * `RevertHandler` is called after apply handler, when we drop the tipset
// containing the message. The tipset passed as the argument is the tipset
// that is being dropped. Note that the event dropped may be re-applied
// in a different tipset in small amount of time.
//
// * `StateMatchFunc` is called against each tipset state. If there is a match,
// the state change is queued up until the confidence interval has elapsed (and
// `StateChangeHandler` is called)
func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf StateMatchFunc) error {
hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
states, ok := data.(StateChange)
if data != nil && !ok {
panic("expected StateChange")
}
return scHnd(prevTs, ts, states, height)
}
id, err := we.hcAPI.onHeadChanged(check, hnd, rev, confidence, timeout)
if err != nil {
return err
}
we.lk.Lock()
defer we.lk.Unlock()
we.matchers[id] = mf
return nil
}
// messageEvents watches for message calls to actors
type messageEvents struct {
ctx context.Context
cs EventAPI
hcAPI headChangeAPI
lk sync.RWMutex
matchers map[triggerID]MsgMatchFunc
blockMsgLk sync.Mutex
blockMsgCache *lru.ARCCache
}
func newMessageEvents(ctx context.Context, hcAPI headChangeAPI, cs EventAPI) messageEvents {
blsMsgCache, _ := lru.NewARC(500)
return messageEvents{
ctx: ctx,
cs: cs,
hcAPI: hcAPI,
matchers: make(map[triggerID]MsgMatchFunc),
blockMsgLk: sync.Mutex{},
blockMsgCache: blsMsgCache,
}
}
// Check if there are any new actor calls
func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID][]eventData, error) {
pts, err := me.cs.ChainGetTipSet(me.ctx, ts.Parents()) // we actually care about messages in the parent tipset here
if err != nil {
log.Errorf("getting parent tipset in checkNewCalls: %s", err)
return nil, err
}
me.lk.RLock()
defer me.lk.RUnlock()
// For each message in the tipset
res := make(map[triggerID][]eventData)
me.messagesForTs(pts, func(msg *types.Message) {
// TODO: provide receipts
// Run each trigger's matcher against the message
for tid, matchFn := range me.matchers {
matched, err := matchFn(msg)
if err != nil {
log.Errorf("event matcher failed: %s", err)
continue
}
// If there was a match, include the message in the results for the
// trigger
if matched {
res[tid] = append(res[tid], msg)
}
}
})
return res, nil
}
// Get the messages in a tipset
func (me *messageEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) {
seen := map[cid.Cid]struct{}{}
for _, tsb := range ts.Blocks() {
me.blockMsgLk.Lock()
msgsI, ok := me.blockMsgCache.Get(tsb.Cid())
var err error
if !ok {
msgsI, err = me.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid())
if err != nil {
log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err)
// this is quite bad, but probably better than missing all the other updates
me.blockMsgLk.Unlock()
continue
}
me.blockMsgCache.Add(tsb.Cid(), msgsI)
}
me.blockMsgLk.Unlock()
msgs := msgsI.(*api.BlockMessages)
for _, m := range msgs.BlsMessages {
_, ok := seen[m.Cid()]
if ok {
continue
}
seen[m.Cid()] = struct{}{}
consume(m)
}
for _, m := range msgs.SecpkMessages {
_, ok := seen[m.Message.Cid()]
if ok {
continue
}
seen[m.Message.Cid()] = struct{}{}
consume(&m.Message)
}
}
}
// MsgHandler arguments:
// `ts` is the tipset, in which the `msg` is included.
// `curH`-`ts.Height` = `confidence`
type MsgHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
type MsgMatchFunc func(msg *types.Message) (matched bool, err error)
// Called registers a callback which is triggered when a specified method is
// called on an actor, or a timeout is reached.
//
// * `CheckFunc` callback is invoked immediately with a recent tipset, it
// returns two booleans - `done`, and `more`.
//
// * `done` should be true when some on-chain action we are waiting for has
// happened. When `done` is set to true, timeout trigger is disabled.
//
// * `more` should be false when we don't want to receive new notifications
// through MsgHandler. Note that notifications may still be delivered to
// RevertHandler
//
// * `MsgHandler` is called when the specified event was observed on-chain,
// and a confidence threshold was reached, or the specified `timeout` height
// was reached with no events observed. When this callback is invoked on a
// timeout, `msg` is set to nil. This callback returns a boolean specifying
// whether further notifications should be sent, like `more` return param
// from `CheckFunc` above.
//
// * `RevertHandler` is called after apply handler, when we drop the tipset
// containing the message. The tipset passed as the argument is the tipset
// that is being dropped. Note that the message dropped may be re-applied
// in a different tipset in small amount of time.
//
// * `MsgMatchFunc` is called against each message. If there is a match, the
// message is queued up until the confidence interval has elapsed (and
// `MsgHandler` is called)
func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MsgMatchFunc) error {
hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
msg, ok := data.(*types.Message)
if data != nil && !ok {
panic("expected msg")
}
ml, err := me.cs.StateSearchMsg(me.ctx, ts.Key(), msg.Cid(), stmgr.LookbackNoLimit, true)
if err != nil {
return false, err
}
if ml == nil {
return msgHnd(msg, nil, ts, height)
}
return msgHnd(msg, &ml.Receipt, ts, height)
}
id, err := me.hcAPI.onHeadChanged(check, hnd, rev, confidence, timeout)
if err != nil {
return err
}
me.lk.Lock()
defer me.lk.Unlock()
me.matchers[id] = mf
return nil
}
// Convenience function for checking and matching messages
func (me *messageEvents) CalledMsg(ctx context.Context, hnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, msg types.ChainMsg) error {
return me.Called(me.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, me.MatchMsg(msg.VMMessage()))
}