From f7e72f9ae07d8a6ad1417b0edf02bc76adfe674e Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 20 Jul 2023 16:00:12 +0200 Subject: [PATCH] liveness: allow registering callbacks after start I discovered[^1] a deadlock scenario when multiple nodes in the cluster restart with additional stores that need to be bootstrapped. In that case, liveness must be running when the StoreIDs are allocated, but it is not. Trying to address this problem, I realized that when an auxiliary Store is bootstrapped, it will create a new replicateQueue, which will register a new callback into NodeLiveness. But if liveness must be started at this point to fix #106706, we'll run into the assertion that checks that we don't register callbacks on a started node liveness. Something's got to give: we will allow registering callbacks at any given point in time, and they'll get an initial set of notifications synchronously. I audited the few users of RegisterCallback and this seems OK with all of them. [^1]: https://github.com/cockroachdb/cockroach/issues/106706#issuecomment-1640254715 Epic: None Release note: None --- pkg/kv/kvserver/liveness/liveness.go | 59 +++++++++++++++------------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go index f158f8e56675..7dfc92c866b0 100644 --- a/pkg/kv/kvserver/liveness/liveness.go +++ b/pkg/kv/kvserver/liveness/liveness.go @@ -280,9 +280,12 @@ type NodeLiveness struct { nodeDialer *nodedialer.Dialer engineSyncs *singleflight.Group - // onIsLive is a callback registered by stores prior to starting liveness. - // It fires when a node transitions from not live to live. - onIsLive []IsLiveCallback // see RegisterCallback + // onIsLiveMu holds callbacks registered by stores. + // They fire when a node transitions from not live to live. 
+ onIsLiveMu struct { + syncutil.Mutex + callbacks []IsLiveCallback + } // see RegisterCallback // onSelfHeartbeat is invoked after every successful heartbeat // of the local liveness instance's heartbeat loop. @@ -548,15 +551,8 @@ func (nl *NodeLiveness) cacheUpdated(old livenesspb.Liveness, new livenesspb.Liv // Need to use a different signal to determine if liveness changed. now := nl.clock.Now() if !old.IsLive(now) && new.IsLive(now) { - // NB: If we are not started, we don't use the onIsLive callbacks since they - // can still change. This is a bit of a tangled mess since the startup of - // liveness requires the stores to be started, but stores can't start until - // liveness can run. Ideally we could cache all these updates and call - // onIsLive as part of start. - if nl.started.Get() { - for _, fn := range nl.onIsLive { - fn(new) - } + for _, fn := range nl.callbacks() { + fn(new) } } if !old.Membership.Decommissioned() && new.Membership.Decommissioned() && nl.onNodeDecommissioned != nil { @@ -639,15 +635,6 @@ func (nl *NodeLiveness) Start(ctx context.Context) { retryOpts.Closer = nl.stopper.ShouldQuiesce() nl.started.Set(true) - // We may have received some liveness records from Gossip prior to Start being - // called. We need to go through and notify all the callers of them now. 
- for _, entry := range nl.ScanNodeVitalityFromCache() { - if entry.IsLive(livenesspb.IsAliveNotification) { - for _, fn := range nl.onIsLive { - fn(entry.GetInternalLiveness()) - } - } - } _ = nl.stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: "liveness-hb", SpanOpt: stop.SterileRootSpan}, func(context.Context) { ambient := nl.ambientCtx @@ -746,6 +733,22 @@ func (nl *NodeLiveness) Heartbeat(ctx context.Context, liveness livenesspb.Liven return nl.heartbeatInternal(ctx, liveness, false /* increment epoch */) } +func (nl *NodeLiveness) callbacks() []IsLiveCallback { + nl.onIsLiveMu.Lock() + defer nl.onIsLiveMu.Unlock() + return append([]IsLiveCallback(nil), nl.onIsLiveMu.callbacks...) +} + +func (nl *NodeLiveness) notifyIsAliveCallbacks(fns []IsLiveCallback) { + for _, entry := range nl.ScanNodeVitalityFromCache() { + if entry.IsLive(livenesspb.IsAliveNotification) { + for _, fn := range fns { + fn(entry.GetInternalLiveness()) + } + } + } +} + func (nl *NodeLiveness) heartbeatInternal( ctx context.Context, oldLiveness livenesspb.Liveness, incrementEpoch bool, ) (err error) { @@ -1077,13 +1080,15 @@ func (nl *NodeLiveness) Metrics() Metrics { return nl.metrics } -// RegisterCallback registers a callback to be invoked any time a -// node's IsLive() state changes to true. This must be called before Start. +// RegisterCallback registers a callback to be invoked any time a node's +// IsLive() state changes to true. The callback is also invoked synchronously +// from RegisterCallback, once for each node that is currently live. func (nl *NodeLiveness) RegisterCallback(cb IsLiveCallback) { - if nl.started.Get() { - log.Fatalf(context.TODO(), "RegisterCallback called after Start") - } - nl.onIsLive = append(nl.onIsLive, cb) + nl.onIsLiveMu.Lock() + nl.onIsLiveMu.callbacks = append(nl.onIsLiveMu.callbacks, cb) + nl.onIsLiveMu.Unlock() + + nl.notifyIsAliveCallbacks([]IsLiveCallback{cb}) } // updateLiveness does a conditional put on the node liveness record for the