Skip to content

Commit

Permalink
fix: avoid potential hang when starting event listener
Browse files Browse the repository at this point in the history
It was possible for NewEvents to never return, blocked on waiting for a WaitGroup to be done.
The call to Done was in a goroutine that could exit before reaching the Done call.

Replace the WaitGroup with a channel that is closed to signal that initialisation is complete.
Also, while we are waiting on the channel, wait on the context so we can exit clealy if the
context is canceled.
  • Loading branch information
iand authored and codefather-filestar committed Jun 21, 2021
1 parent a1a4a31 commit 8ddbbb6
Showing 1 changed file with 25 additions and 14 deletions.
39 changes: 25 additions & 14 deletions chain/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
var log = logging.Logger("events")

// HeightHandler `curH`-`ts.Height` = `confidence`
type HeightHandler func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error
type RevertHandler func(ctx context.Context, ts *types.TipSet) error
type (
HeightHandler func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error
RevertHandler func(ctx context.Context, ts *types.TipSet) error
)

type heightHandler struct {
confidence int
Expand All @@ -48,7 +50,7 @@ type Events struct {
tsc *tipSetCache
lk sync.Mutex

ready sync.WaitGroup
ready chan struct{}
readyOnce sync.Once

heightEvents
Expand Down Expand Up @@ -76,15 +78,16 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
},

hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
}

e.ready.Add(1)

go e.listenHeadChanges(ctx)

e.ready.Wait()

// TODO: cleanup/gc goroutine
// Wait for the first tipset to be seen or bail if shutting down
select {
case <-e.ready:
case <-ctx.Done():
}

return e
}
Expand All @@ -111,13 +114,21 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {

notifs, err := e.api.ChainNotify(ctx)
if err != nil {
// TODO: retry
// Retry is handled by caller
return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err)
}

cur, ok := <-notifs // TODO: timeout?
if !ok {
return xerrors.Errorf("notification channel closed")
var cur []*api.HeadChange
var ok bool

// Wait for first tipset or bail
select {
case cur, ok = <-notifs:
if !ok {
return xerrors.Errorf("notification channel closed")
}
case <-ctx.Done():
return ctx.Err()
}

if len(cur) != 1 {
Expand All @@ -134,8 +145,8 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {

e.readyOnce.Do(func() {
e.lastTs = cur[0].Val

e.ready.Done()
// Signal that we have seen first tipset
close(e.ready)
})

for notif := range notifs {
Expand Down

0 comments on commit 8ddbbb6

Please sign in to comment.