Skip to content

Commit

Permalink
fix eventstream.Unsubscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
cupen committed Dec 15, 2020
1 parent f35fc72 commit 0726c84
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 11 deletions.
11 changes: 1 addition & 10 deletions cluster/partition_manager.go
Expand Up @@ -37,29 +37,20 @@ func (pm *PartitionManager) Start() {
_, ok := m.(*ClusterTopologyEventV2)
return ok
})

pm.deadletterSub = system.EventStream.
Subscribe(func(ev interface{}) {
pm.onDeadLetterEvent(ev.(*actor.DeadLetterEvent))
}).
WithPredicate(func(m interface{}) bool {
_, ok := m.(*actor.DeadLetterEvent)
return ok
})
}

// Stop ...
func (pm *PartitionManager) Stop() {
system := pm.cluster.ActorSystem
system.EventStream.Unsubscribe(pm.topologySub)
system.EventStream.Unsubscribe(pm.deadletterSub)
pm.kinds.Range(func(k, v interface{}) bool {
kind := k.(string)
pk := v.(*PartitionKind)
plog.Info("Stopping partition", log.String("kind", kind), log.String("pk", pk.actorNames.Identity))
pk.stop()
return true
})
plog.Info("Stopped PartitionManager")
}

// PidOfIdentityActor ...
Expand Down
7 changes: 6 additions & 1 deletion eventstream/eventstream.go
Expand Up @@ -32,12 +32,17 @@ func (es *EventStream) Subscribe(fn func(evt interface{})) *Subscription {
}

func (es *EventStream) Unsubscribe(sub *Subscription) {
if sub.i == -1 {
if sub == nil || sub.i == -1 {
return
}

es.Lock()
defer es.Unlock()
// re-check, there was a twice unsubscribe somewhere.
if sub == nil || sub.i == -1 {
return
}

i := sub.i
l := len(es.subscriptions) - 1

Expand Down

0 comments on commit 0726c84

Please sign in to comment.