Skip to content

Commit

Permalink
Merge pull request #8441 from filecoin-project/frrist/observer-deregi…
Browse files Browse the repository at this point in the history
…ster

chore: events: implement event observer deregister method
  • Loading branch information
magik6k committed Apr 6, 2022
2 parents 9f98d0a + 20bf46f commit f652dd3
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 0 deletions.
32 changes: 32 additions & 0 deletions chain/events/events_test.go
Expand Up @@ -1469,3 +1469,35 @@ func TestReconnect(t *testing.T) {
fcs.advance(0, 5, 2, nil, 0, 1, 3)
require.True(t, fcs.callNumber["ChainGetPath"] == 4)
}

func TestUnregister(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fcs := newFakeCS(t)

events, err := NewEvents(ctx, fcs)
require.NoError(t, err)

tsObs := &testObserver{t: t}
events.Observe(tsObs)

// observer receives heads as the chain advances
fcs.advance(0, 1, 0, nil)
headBeforeDeregister := events.lastTs
require.Equal(t, tsObs.head, headBeforeDeregister)

// observer unregistered successfully
found := events.Unregister(tsObs)
require.True(t, found)

// observer stops receiving heads as the chain advances
fcs.advance(0, 1, 0, nil)
require.Equal(t, tsObs.head, headBeforeDeregister)
require.NotEqual(t, tsObs.head, events.lastTs)

// unregistering an invalid observer returns false
dneObs := &testObserver{t: t}
found = events.Unregister(dneObs)
require.False(t, found)
}
24 changes: 24 additions & 0 deletions chain/events/observer.go
Expand Up @@ -253,3 +253,27 @@ func (o *observer) Observe(obs TipSetObserver) *types.TipSet {
o.observers = append(o.observers, obs)
return o.head
}

// Unregister unregisters an observer. Returns true if we successfully removed the observer.
//
// NOTE: The observer _may_ be called after being removed. Observers MUST handle this case
// internally.
func (o *observer) Unregister(obs TipSetObserver) (found bool) {
o.lk.Lock()
defer o.lk.Unlock()
// We _copy_ the observers list because we may be concurrently reading it from a headChange
// handler.
//
// This should happen infrequently, so it's fine if we spend a bit of time here.
newObservers := make([]TipSetObserver, 0, len(o.observers))
for _, existingObs := range o.observers {
if existingObs == obs {
found = true
continue
}
newObservers = append(newObservers, existingObs)
}

o.observers = newObservers
return found
}

0 comments on commit f652dd3

Please sign in to comment.