From fe99a436e57b8059da6eafadad9520b029028d2c Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 23:34:43 +0900 Subject: [PATCH 1/2] feat(sqs): leadership-refusal hook + flag flip (Phase 3.D PR 4-B-3b) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the Phase 3.D PR 4-B-2 → PR 5 chain by adding the §8 leadership-refusal safeguard and flipping htfifoCapabilityAdvertised to true. This is the final piece of the routing+leadership-refusal pair the §11 PR 4 contract requires before a binary is "marked htfifo-eligible". What changes raftengine: leader-acquired observer (mirror of leader-loss) - raftengine.Admin gains RegisterLeaderAcquiredCallback. Same contract as RegisterLeaderLossCallback (non-blocking, panic- contained, sentinel-pointer deregister) but fires on the previous!=Leader → status==Leader edge instead of leaving the leader role. - etcd backend: new leaderAcquiredCbs slice + mutex; fires from refreshStatus on the leader-acquired edge AFTER e.isLeader is published, so a callback that calls engine.State() observes StateLeader. - register / fire helpers extracted (registerLeaderCallback, gatherLeaderCallbacks) so the leader-loss and leader-acquired paths share one slot management implementation. The dupl lint warning that triggered on first draft is the test that keeps this consolidated. - Test coverage: leader_acquired_callback_test.go mirrors leader_loss_callback_test.go — panic containment, empty-list safety, deregister removal, deregister idempotence, nil-fn safety, nil-receiver safety, identical-fn disambiguation. main: SQS leadership-refusal hook - main_sqs_leadership_refusal.go: installSQSLeadershipRefusal + installSQSLeadershipRefusalAcrossGroups + partitionedGroupSet. - On install, if the engine is currently leader of a group hosting a partitioned queue and the binary lacks htfifo, the hook calls TransferLeadership immediately. 
The leader-acquired observer then keeps catching future transitions for the same group. - TransferLeadership runs in a goroutine because the leader-acquired callback contract is non-blocking — a synchronous admin RPC inside the callback would stall refreshStatus. - sqsLeadershipController is the small interface the helper accepts (subset of raftengine.Admin) so test doubles don't have to satisfy the full Admin surface. - run() wires installSQSLeadershipRefusalAcrossGroups after the coordinator is built; the composite deregister flows through cleanup. adapter: AdvertisesHTFIFO + flag flip - adapter.AdvertisesHTFIFO() reports the htfifo capability flag so main.go can read it without touching the package-private constant. - htfifoCapabilityAdvertised = false → true. Both the routing wiring (PR 4-B-2 #715) and the leadership-refusal hook (this PR) are now in the binary, so the design's "marked htfifo-eligible" bar is met. What's still gated PR 5 lifts the PartitionCount > 1 dormancy gate AND wires PollSQSHTFIFOCapability (PR 4-B-3a #721) into the CreateQueue gate in the same commit. Until PR 5 lands, no partitioned queue can land in production — the leadership-refusal hook is dormant in the happy-path runtime (every binary past this PR advertises htfifo, and the per-group early return keeps the hook out of the hot path). Self-review (per CLAUDE.md) 1. Data loss — control-plane only; no FSM/Pebble/retention path. The hook calls TransferLeadership which is itself an admin action with the same data-loss profile as a graceful manual transfer. No issue. 2. Concurrency / distributed failures — leader-acquired callback contract mirrors leader-loss (non-blocking, panic-contained, sentinel-pointer deregister). refuse() offloads the actual TransferLeadership to a goroutine so refreshStatus stays non-blocking. 
Multiple goroutines calling refuse() for the same group queue serially in raft's admin channel; the worst case is one redundant transfer attempt, which is idempotent on the raft side. No issue. 3. Performance — leader-acquired callbacks fire only on the transition edge (rare event); no per-request hot path cost. The early return on advertisesHTFIFO=true means production-binary hosts pay zero overhead. No issue. 4. Data consistency — the hook protects against the §8 downgrade scenario: a node rolled back to a pre-htfifo binary that still gets elected leader of a partitioned-queue shard would otherwise read/write under the legacy keyspace and silently corrupt the queue. The hook steps it down via TransferLeadership before any client request lands. No issue. 5. Test coverage — 7 raftengine observer tests (mirror of leader-loss panic / empty / deregister / idempotence / nil guards / sentinel-pointer disambiguation) + 11 main-side helper tests (htfifo no-op, no-partitioned-queue no-op, startup-already-leader refuses, startup-follower waits, per-acquisition fires, deregister propagates, transfer error logged, nil-admin safe, partitionedGroupSet flatten / empty / malformed). 
--- adapter/sqs.go | 56 +++- internal/raftengine/engine.go | 23 ++ internal/raftengine/etcd/engine.go | 193 +++++++++++--- .../etcd/leader_acquired_callback_test.go | 123 +++++++++ main.go | 14 + main_sqs_leadership_refusal.go | 200 ++++++++++++++ main_sqs_leadership_refusal_test.go | 252 ++++++++++++++++++ 7 files changed, 812 insertions(+), 49 deletions(-) create mode 100644 internal/raftengine/etcd/leader_acquired_callback_test.go create mode 100644 main_sqs_leadership_refusal.go create mode 100644 main_sqs_leadership_refusal_test.go diff --git a/adapter/sqs.go b/adapter/sqs.go index 8543910c..112153bb 100644 --- a/adapter/sqs.go +++ b/adapter/sqs.go @@ -60,14 +60,40 @@ const ( const sqsCapabilityHTFIFO = "htfifo" // htfifoCapabilityAdvertised gates whether this binary lists -// "htfifo" on /sqs_health. The flag is set to true only when the -// binary contains BOTH the routing-layer wiring AND the -// leadership-refusal safeguard from §8 — the design's "marked -// htfifo-eligible" bar (§11 PR 4). Lower-numbered PRs in the rollout -// keep this false so a partial deploy never advertises a capability -// it cannot safely back up. Phase 3.D PR 4-B flips this to true in -// the same commit that wires routing + leadership-refusal together. -const htfifoCapabilityAdvertised = false +// "htfifo" on /sqs_health. The §11 PR 4 contract requires BOTH +// the routing-layer wiring AND the leadership-refusal safeguard +// from §8 to be in place before this flag is true: +// +// - Routing wiring: kv.PartitionResolver + +// adapter.SQSPartitionResolver, merged via #715 (Phase 3.D +// PR 4-B-2). Partition-resolver-first dispatch in ShardRouter +// routes (queue, partition) keys to the operator-chosen Raft +// group; coordinator helpers (groupForKey, +// routeAndGroupForKey, groupMutations) consult the resolver +// before falling through to the byte-range engine; OCC read +// keys fail closed for recognised-but-unresolved partitioned +// keys. 
+// - Capability poller: PollSQSHTFIFOCapability, merged via +// #721 (Phase 3.D PR 4-B-3a). PR 5 will use this for the +// CreateQueue capability gate. +// - Leadership-refusal hook: +// raftengine.RegisterLeaderAcquiredCallback + +// main_sqs_leadership_refusal.go (Phase 3.D PR 4-B-3b, this +// PR). On startup AND on every leader-acquired transition, +// the hook refuses leadership of any Raft group hosting a +// partitioned queue when the binary lacks htfifo. +// +// Both pieces are now in the binary, so the flag flips to true. +// PR 5 lifts the PartitionCount > 1 dormancy gate AND wires the +// CreateQueue capability poll in the same commit, at which point +// a partitioned queue can land in production and every node in +// the cluster must report htfifo for the gate to allow it. +// +// Stays a const (not a var) because the flag is build-time. A +// future runtime override (env var, --no-htfifo flag for +// graceful degradation) would reroute through +// adapter.AdvertisesHTFIFO() without changing the call sites. +const htfifoCapabilityAdvertised = true // sqsAdvertisedCapabilities returns the capability list emitted on // /sqs_health (JSON mode). Stable iteration order is significant — @@ -83,6 +109,20 @@ func sqsAdvertisedCapabilities() []string { return caps } +// AdvertisesHTFIFO reports whether this binary's /sqs_health +// endpoint lists the htfifo capability. Mirror of the package- +// internal htfifoCapabilityAdvertised constant, exposed for the +// SQS leadership-refusal hook in main.go that uses this signal +// to decide whether to refuse leadership of any Raft group hosting +// a partitioned FIFO queue. +// +// Stays a function (not an exported constant) so a future runtime +// override (env var, --no-htfifo flag for graceful degradation) +// can be threaded through here without changing the call site. 
+func AdvertisesHTFIFO() bool { + return htfifoCapabilityAdvertised +} + const ( sqsHealthMaxRequestBodyBytes = 1024 sqsMaxRequestBodyBytes = 1 << 20 diff --git a/internal/raftengine/engine.go b/internal/raftengine/engine.go index e9eedb56..9135b541 100644 --- a/internal/raftengine/engine.go +++ b/internal/raftengine/engine.go @@ -195,6 +195,29 @@ type Admin interface { RemoveServer(ctx context.Context, id string, prevIndex uint64) (uint64, error) TransferLeadership(ctx context.Context) error TransferLeadershipToServer(ctx context.Context, id string, address string) error + // RegisterLeaderAcquiredCallback registers fn to fire every + // time the local node's Raft state transitions INTO leader + // (initial election, re-election, transfer target completion). + // Callbacks fire on the previous!=Leader → status==Leader edge + // AFTER the engine has published isLeader, so a callback that + // calls engine.State() observes StateLeader. + // + // Use case: per-shard policy hooks that need to audit a + // freshly-acquired leadership ("am I still allowed to be + // leader of this group?"). The SQS HT-FIFO leadership-refusal + // hook (§8 of the split-queue FIFO design) hangs off this to + // TransferLeadership when the binary lacks the htfifo + // capability but a partitioned queue is mapped to this Raft + // group. + // + // Same non-blocking + panic-contained contract as + // LeaseProvider.RegisterLeaderLossCallback. A callback that + // needs to do real work (enumerate the catalog, call + // TransferLeadership) MUST offload to a goroutine. + // + // The returned function deregisters this specific registration + // and is safe to call multiple times. 
+ RegisterLeaderAcquiredCallback(fn func()) (deregister func()) } type Engine interface { diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 678104f8..fe9c36aa 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -339,6 +339,24 @@ type Engine struct { leaderLossCbsMu sync.Mutex leaderLossCbs []leaderLossSlot + // leaderAcquiredCbsMu guards the slice of callbacks invoked when + // the node transitions INTO the leader role. Callbacks fire + // synchronously from refreshStatus on the previous!=Leader → + // status==Leader edge. The MUST-be-non-blocking contract is the + // same as leaderLossCbs — a slow callback would stall every + // other holder of the same engine. See + // RegisterLeaderAcquiredCallback for the full contract. + // + // The acquired-side mirror exists so per-shard policy hooks + // (SQS HT-FIFO leadership-refusal in §8 of the split-queue FIFO + // design) can audit "we just became leader, are we still + // allowed to be?" without polling. Pairing the slot with a + // sentinel pointer mirrors the leader-loss design and lets + // deregister identify THIS registration when the same fn is + // registered multiple times. + leaderAcquiredCbsMu sync.Mutex + leaderAcquiredCbs []leaderAcquiredSlot + pendingProposals map[uint64]proposalRequest pendingReads map[uint64]readRequest pendingConfigs map[uint64]adminRequest @@ -931,67 +949,152 @@ func (e *Engine) RegisterLeaderLossCallback(fn func()) (deregister func()) { if e == nil || fn == nil { return func() {} } - // Allocate a unique sentinel pointer so the deregister closure can - // identify THIS specific registration even if the same fn is - // registered multiple times. + return registerLeaderCallback(&e.leaderLossCbsMu, &e.leaderLossCbs, fn) +} + +// leaderLossSlot pairs a registered callback with an id-only sentinel +// pointer so deregister can distinguish identical fn values. 
Aliased +// to leaderCallbackSlot so the leader-loss and leader-acquired slices +// share the same in-memory shape — the register / fire helpers are +// generic over this single slot type. +type leaderLossSlot = leaderCallbackSlot + +// fireLeaderLossCallbacks invokes all registered callbacks +// synchronously. The registered-callback contract requires each fn +// to be non-blocking (a lock-free lease-invalidate flag flip), so +// inline execution is safe and avoids spawning an unbounded number +// of goroutines per leader-loss event when many shards / coordinators +// are registered. +// +// A panicking callback is still contained (see +// invokeLeaderLossCallback) so a bug in one holder cannot break +// subsequent callbacks or crash the process. +func (e *Engine) fireLeaderLossCallbacks() { + for _, fn := range gatherLeaderCallbacks(&e.leaderLossCbsMu, e.leaderLossCbs) { + e.invokeLeaderLossCallback(fn) + } +} + +// leaderCallbackSlot is the shared on-disk shape for leader-loss +// and leader-acquired callback registrations. The id pointer is a +// per-registration sentinel so deregister can target THIS specific +// entry even when the same fn is registered multiple times. +type leaderCallbackSlot struct { + id *struct{ fn func() } + fn func() +} + +// registerLeaderCallback installs fn into the (mu, cbs) callback +// slice and returns a deregister closure. Shared by leader-loss +// and leader-acquired registration so the slot-management / +// dangling-reference / sync.Once-deregister logic lives in one +// place. The two callback families differ only in which (mu, slice) +// pair they target — the firing semantics, the sentinel-pointer +// disambiguation, and the GC-safe slice truncation are identical. 
+func registerLeaderCallback(mu *sync.Mutex, cbs *[]leaderCallbackSlot, fn func()) (deregister func()) { + // Allocate a unique sentinel pointer so the deregister closure + // can identify THIS specific registration even if the same fn + // is registered multiple times. slot := &struct{ fn func() }{fn: fn} - e.leaderLossCbsMu.Lock() - e.leaderLossCbs = append(e.leaderLossCbs, leaderLossSlot{id: slot, fn: fn}) - e.leaderLossCbsMu.Unlock() + mu.Lock() + *cbs = append(*cbs, leaderCallbackSlot{id: slot, fn: fn}) + mu.Unlock() var once sync.Once return func() { once.Do(func() { - e.leaderLossCbsMu.Lock() - defer e.leaderLossCbsMu.Unlock() - for i, c := range e.leaderLossCbs { + mu.Lock() + defer mu.Unlock() + for i, c := range *cbs { if c.id != slot { continue } - // Remove without leaving a dangling reference at the - // tail of the underlying array. The removed slot's fn - // typically captures a *Coordinate; a plain - // `append(cbs[:i], cbs[i+1:]...)` would keep the old - // backing cell alive and prevent GC of the associated - // Coordinate until the engine itself is dropped. - last := len(e.leaderLossCbs) - 1 - copy(e.leaderLossCbs[i:], e.leaderLossCbs[i+1:]) - e.leaderLossCbs[last] = leaderLossSlot{} - e.leaderLossCbs = e.leaderLossCbs[:last] + // Remove without leaving a dangling reference at + // the tail of the underlying array. The removed + // slot's fn typically captures a *Coordinate; a + // plain `append(cbs[:i], cbs[i+1:]...)` would keep + // the old backing cell alive and prevent GC of the + // associated Coordinate until the engine itself is + // dropped. + last := len(*cbs) - 1 + copy((*cbs)[i:], (*cbs)[i+1:]) + (*cbs)[last] = leaderCallbackSlot{} + *cbs = (*cbs)[:last] return } }) } } -// leaderLossSlot pairs a registered callback with an id-only sentinel -// pointer so deregister can distinguish identical fn values. 
-type leaderLossSlot struct { - id *struct{ fn func() } - fn func() +// gatherLeaderCallbacks copies the live fn list out from under the +// mutex so callers can fire them without holding the lock. +// Mirrors the snapshot-then-fire pattern used by the per-callback +// invoke helpers. +func gatherLeaderCallbacks(mu *sync.Mutex, cbs []leaderCallbackSlot) []func() { + mu.Lock() + out := make([]func(), len(cbs)) + for i, c := range cbs { + out[i] = c.fn + } + mu.Unlock() + return out } -// fireLeaderLossCallbacks invokes all registered callbacks -// synchronously. The registered-callback contract requires each fn -// to be non-blocking (a lock-free lease-invalidate flag flip), so -// inline execution is safe and avoids spawning an unbounded number -// of goroutines per leader-loss event when many shards / coordinators -// are registered. +// RegisterLeaderAcquiredCallback registers fn to fire every time +// the local node's Raft state transitions INTO leader (initial +// election win, re-election after partition heal, leadership +// transfer target completion). Callbacks fire on the +// previous!=Leader → status==Leader edge in refreshStatus, after +// e.isLeader has been published, so a callback that reads +// engine.State() observes StateLeader. // -// A panicking callback is still contained (see -// invokeLeaderLossCallback) so a bug in one holder cannot break -// subsequent callbacks or crash the process. -func (e *Engine) fireLeaderLossCallbacks() { - e.leaderLossCbsMu.Lock() - cbs := make([]func(), len(e.leaderLossCbs)) - for i, c := range e.leaderLossCbs { - cbs[i] = c.fn +// Use case: per-shard policy that needs to audit a freshly-acquired +// leadership ("am I still allowed to be leader of this group?"). +// SQS HT-FIFO leadership-refusal (§8 of the split-queue FIFO +// design) hangs off this hook to TransferLeadership when the +// binary lacks the htfifo capability but a partitioned queue is +// mapped to this Raft group. 
+// +// Callbacks run synchronously from refreshStatus and MUST be +// non-blocking — same contract as RegisterLeaderLossCallback. A +// callback wanting to do real work (e.g. enumerate the catalog, +// call TransferLeadership) MUST offload to a goroutine. +// +// A panic inside a callback is contained and logged so a bug in +// one holder cannot crash the engine or break other callbacks. +// +// The returned deregister function removes this specific +// registration and is safe to call multiple times. +func (e *Engine) RegisterLeaderAcquiredCallback(fn func()) (deregister func()) { + if e == nil || fn == nil { + return func() {} } - e.leaderLossCbsMu.Unlock() - for _, fn := range cbs { - e.invokeLeaderLossCallback(fn) + return registerLeaderCallback(&e.leaderAcquiredCbsMu, &e.leaderAcquiredCbs, fn) +} + +// leaderAcquiredSlot is the leader-acquired companion to +// leaderLossSlot — both alias the shared leaderCallbackSlot so a +// single set of register / fire helpers serves both transitions. +type leaderAcquiredSlot = leaderCallbackSlot + +// fireLeaderAcquiredCallbacks invokes all registered callbacks +// synchronously. Same panic-containment + non-blocking contract +// as fireLeaderLossCallbacks. +func (e *Engine) fireLeaderAcquiredCallbacks() { + for _, fn := range gatherLeaderCallbacks(&e.leaderAcquiredCbsMu, e.leaderAcquiredCbs) { + e.invokeLeaderAcquiredCallback(fn) } } +func (e *Engine) invokeLeaderAcquiredCallback(fn func()) { + defer func() { + if r := recover(); r != nil { + slog.Error("etcd raft engine: leader-acquired callback panic", + "recover", r) + } + }() + fn() +} + func (e *Engine) invokeLeaderLossCallback(fn func()) { defer func() { if r := recover(); r != nil { @@ -2512,6 +2615,14 @@ func (e *Engine) refreshStatus() { if status.State == raftengine.StateLeader { e.leaderOnce.Do(func() { close(e.leaderReady) }) } + if previous != raftengine.StateLeader && status.State == raftengine.StateLeader { + // Edge: the node has just acquired leadership. 
Fire the + // leader-acquired callbacks so per-shard policy hooks + // (SQS HT-FIFO leadership-refusal §8) can audit the + // transition before any client request lands. Same + // non-blocking contract as fireLeaderLossCallbacks. + e.fireLeaderAcquiredCallbacks() + } if previous == raftengine.StateLeader && status.State != raftengine.StateLeader { e.failPending(errors.WithStack(errNotLeader)) // Drop the per-peer ack map so a future re-election cannot diff --git a/internal/raftengine/etcd/leader_acquired_callback_test.go b/internal/raftengine/etcd/leader_acquired_callback_test.go new file mode 100644 index 00000000..65131a75 --- /dev/null +++ b/internal/raftengine/etcd/leader_acquired_callback_test.go @@ -0,0 +1,123 @@ +package etcd + +import ( + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestFireLeaderAcquiredCallbacks_ContainsPanic mirrors the +// leader-loss panic-containment test. A panicking callback MUST +// NOT take down the engine loop or block subsequent callbacks. +func TestFireLeaderAcquiredCallbacks_ContainsPanic(t *testing.T) { + t.Parallel() + + e := &Engine{} + var before, after atomic.Int32 + e.RegisterLeaderAcquiredCallback(func() { before.Add(1) }) + e.RegisterLeaderAcquiredCallback(func() { panic("policy hook bug") }) + e.RegisterLeaderAcquiredCallback(func() { after.Add(1) }) + + require.NotPanics(t, e.fireLeaderAcquiredCallbacks) + + require.Equal(t, int32(1), before.Load(), + "callbacks registered before the panicking one must have fired") + require.Equal(t, int32(1), after.Load(), + "callbacks registered after the panicking one must still fire") +} + +// TestFireLeaderAcquiredCallbacks_NoCallbacksIsSafe pins the +// empty-list fast path so refreshStatus can fire unconditionally +// without a guard. 
+func TestFireLeaderAcquiredCallbacks_NoCallbacksIsSafe(t *testing.T) { + t.Parallel() + e := &Engine{} + require.NotPanics(t, e.fireLeaderAcquiredCallbacks) +} + +// TestRegisterLeaderAcquiredCallback_DeregisterRemoves pins the +// returned deregister function: calling it removes the +// registration so subsequent fires no longer invoke fn. Mirrors +// the leader-loss deregister contract. +func TestRegisterLeaderAcquiredCallback_DeregisterRemoves(t *testing.T) { + t.Parallel() + + e := &Engine{} + var fired atomic.Int32 + deregister := e.RegisterLeaderAcquiredCallback(func() { fired.Add(1) }) + + e.fireLeaderAcquiredCallbacks() + require.Equal(t, int32(1), fired.Load(), + "first fire must invoke the registered callback") + + deregister() + + e.fireLeaderAcquiredCallbacks() + require.Equal(t, int32(1), fired.Load(), + "after deregister, fire must not invoke the callback again") +} + +// TestRegisterLeaderAcquiredCallback_DeregisterIdempotent pins +// that calling the returned deregister multiple times is safe and +// does not affect other registrations. +func TestRegisterLeaderAcquiredCallback_DeregisterIdempotent(t *testing.T) { + t.Parallel() + + e := &Engine{} + var fired atomic.Int32 + dereg1 := e.RegisterLeaderAcquiredCallback(func() { fired.Add(1) }) + e.RegisterLeaderAcquiredCallback(func() { fired.Add(10) }) + + dereg1() + dereg1() // second call must be a no-op + dereg1() // third call must be a no-op + + e.fireLeaderAcquiredCallbacks() + require.Equal(t, int32(10), fired.Load(), + "only the second callback survives — the first's deregister "+ + "must have removed exactly one entry, not all of them, "+ + "even when deregister is called repeatedly") +} + +// TestRegisterLeaderAcquiredCallback_NilFnIsSafe pins that +// passing nil for fn does not register anything and the returned +// deregister is a no-op. 
+func TestRegisterLeaderAcquiredCallback_NilFnIsSafe(t *testing.T) { + t.Parallel() + e := &Engine{} + dereg := e.RegisterLeaderAcquiredCallback(nil) + require.NotPanics(t, dereg) + require.NotPanics(t, e.fireLeaderAcquiredCallbacks) +} + +// TestRegisterLeaderAcquiredCallback_NilEngineIsSafe pins the +// typed-nil receiver guard so a coordinator constructed before +// the engine is wired does not crash on registration. +func TestRegisterLeaderAcquiredCallback_NilEngineIsSafe(t *testing.T) { + t.Parallel() + var e *Engine + dereg := e.RegisterLeaderAcquiredCallback(func() {}) + require.NotPanics(t, dereg) +} + +// TestRegisterLeaderAcquiredCallback_DistinguishesIdenticalFns +// pins that two registrations of the SAME function are treated as +// distinct slots — deregistering one leaves the other live. The +// sentinel-pointer design exists for exactly this case. +func TestRegisterLeaderAcquiredCallback_DistinguishesIdenticalFns(t *testing.T) { + t.Parallel() + + e := &Engine{} + var fired atomic.Int32 + fn := func() { fired.Add(1) } + dereg1 := e.RegisterLeaderAcquiredCallback(fn) + e.RegisterLeaderAcquiredCallback(fn) + + dereg1() + + e.fireLeaderAcquiredCallbacks() + require.Equal(t, int32(1), fired.Load(), + "deregistering one of two identical-fn registrations must "+ + "leave the other active — sentinel pointer disambiguates") +} diff --git a/main.go b/main.go index bd201ef9..da884fe1 100644 --- a/main.go +++ b/main.go @@ -320,6 +320,20 @@ func run() error { WithLeaseReadObserver(metricsRegistry.LeaseReadObserver()). WithSampler(keyVizSamplerForCoordinator(sampler)). WithPartitionResolver(buildSQSPartitionResolver(cfg.sqsFifoPartitionMap)) + + // SQS HT-FIFO §8 leadership-refusal: install per-group + // observers that step the local node down via + // TransferLeadership when it acquires (or already holds) + // leadership of a Raft group hosting a partitioned FIFO + // queue while the binary lacks the htfifo capability. 
The + // composite deregister flows through cleanup; it's a no-op + // when no group hosts a partitioned queue or when the + // binary advertises htfifo (the steady-state production + // case post-PR-4-B-3b). + leadershipRefusalDeregister := installSQSLeadershipRefusalAcrossGroups( + ctx, runtimes, cfg.sqsFifoPartitionMap, + sqsAdvertisesHTFIFO(), slog.Default()) + cleanup.Add(leadershipRefusalDeregister) distCatalog, err := setupDistributionCatalog(ctx, runtimes, cfg.engine) if err != nil { return err diff --git a/main_sqs_leadership_refusal.go b/main_sqs_leadership_refusal.go new file mode 100644 index 00000000..90d0a456 --- /dev/null +++ b/main_sqs_leadership_refusal.go @@ -0,0 +1,200 @@ +package main + +import ( + "context" + "log/slog" + "strconv" + + "github.com/bootjp/elastickv/adapter" + "github.com/bootjp/elastickv/internal/raftengine" +) + +// sqsLeadershipController is the subset of raftengine.Admin the +// SQS leadership-refusal hook needs. Defined as a small interface +// (rather than taking raftengine.Admin directly) so the test +// double doesn't have to satisfy the full Admin surface. +type sqsLeadershipController interface { + State() raftengine.State + TransferLeadership(ctx context.Context) error + RegisterLeaderAcquiredCallback(fn func()) (deregister func()) +} + +// installSQSLeadershipRefusal registers a per-group leader-acquired +// observer that refuses leadership of any Raft group hosting a +// partitioned FIFO queue when this binary does NOT advertise the +// htfifo capability. Implements §8 of +// docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md. 
+// +// # What it protects against +// +// A node rolled BACK to a binary that lacks the HT-FIFO data +// plane but is still elected leader of a Raft group hosting a +// partitioned queue would (1) scan the legacy single-prefix +// keyspace and report no messages (false-empty reads), and (2) +// accept SendMessage writes under the legacy keyspace, hiding +// them from the partition-aware fanout reader. The §11 PR 2 +// "PartitionCount > 1 rejected" gate prevents NEW partitioned +// queues from being created in such a cluster, but does nothing +// for queues created BEFORE the rollback. The leadership-refusal +// hook closes that gap by stepping the affected node down via +// TransferLeadership; the cluster picks a peer that still +// advertises htfifo, and the partitioned queue stays correct. +// +// # When it does NOT fire +// +// - The binary advertises htfifo (advertisesHTFIFO=true). The +// happy-path runtime — every binary past PR 4-B-3b advertises. +// - No partitioned queue maps to gid (partitionedGroups[gid] is +// false). Groups that only host non-partitioned data have no +// partitioned-keyspace contract to break. +// +// Both no-op cases return a no-op deregister so callers can defer +// uniformly. +// +// # Lifecycle +// +// - Startup check: if engine is currently leader at install time, +// refuse() runs immediately. Otherwise the hook waits for the +// next leader-acquired transition. +// - Per-acquisition: RegisterLeaderAcquiredCallback fires on +// every previous!=Leader → status==Leader edge. +// +// # Concurrency +// +// refuse() offloads TransferLeadership to a goroutine because the +// leader-acquired callback contract is non-blocking — a synchronous +// admin RPC inside the callback would stall refreshStatus. The +// goroutine uses ctx so a coordinator shutdown cancels any +// in-flight transfer. 
+func installSQSLeadershipRefusal( + ctx context.Context, + admin sqsLeadershipController, + gid uint64, + partitionedGroups map[uint64]bool, + advertisesHTFIFO bool, + logger *slog.Logger, +) func() { + if admin == nil || !partitionedGroups[gid] || advertisesHTFIFO { + return func() {} + } + if logger == nil { + logger = slog.Default() + } + refuse := func() { + logger.Warn("sqs: refusing leadership — partitioned queue requires htfifo capability", + "group", gid) + // Non-blocking by contract — TransferLeadership submits + // an admin request that may block on the raft loop. A + // nested goroutine is the documented pattern for the + // callback contract. + go func() { + if err := admin.TransferLeadership(ctx); err != nil { + logger.Warn("sqs: TransferLeadership failed", + "group", gid, "err", err) + } + }() + } + if admin.State() == raftengine.StateLeader { + // Startup: this node is already leader. Refuse now so + // the cluster picks an htfifo-capable peer immediately + // rather than waiting for a future re-election. + refuse() + } + return admin.RegisterLeaderAcquiredCallback(refuse) +} + +// partitionedGroupSet flattens the operator's --sqsFifoPartitionMap +// into a set of group IDs that have at least one partitioned FIFO +// queue. The leadership-refusal hook consults this set per group +// to decide whether the policy check applies. +// +// parseSQSFifoGroupList canonicalises group references as uint64 +// strings at config-load time, so the ParseUint call here cannot +// fail in production. A malformed entry is logged-and-skipped +// rather than panicked; the affected group simply won't get the +// refusal hook (the broader config validation in +// validateSQSFifoPartitionMap would have already rejected an +// unknown group, so reaching this branch implies a programmer +// bypassed validation — which is a test-only concern). 
+func partitionedGroupSet(partitionMap map[string]sqsFifoQueueRouting, logger *slog.Logger) map[uint64]bool { + if len(partitionMap) == 0 { + return nil + } + if logger == nil { + logger = slog.Default() + } + out := make(map[uint64]bool) + for queue, routing := range partitionMap { + for _, groupRef := range routing.groups { + id, err := strconv.ParseUint(groupRef, 10, 64) + if err != nil { + logger.Warn("sqs: leadership-refusal: skipping non-uint64 group reference (config validation bypass?)", + "queue", queue, "group_ref", groupRef, "err", err) + continue + } + out[id] = true + } + } + return out +} + +// sqsAdvertisesHTFIFO reports whether this binary's /sqs_health +// endpoint lists the htfifo capability. Wraps the package-internal +// adapter constant so main.go's leadership-refusal install site +// reads the canonical source of truth without exporting the +// constant itself. +func sqsAdvertisesHTFIFO() bool { + return adapter.AdvertisesHTFIFO() +} + +// installSQSLeadershipRefusalAcrossGroups iterates every shard +// runtime and installs the SQS leadership-refusal hook for any +// group hosting a partitioned FIFO queue. Returns a composite +// deregister that fires every per-group deregister so the +// caller can defer it on shutdown. +// +// The hook is a no-op for groups that don't host a partitioned +// queue, and for binaries that advertise htfifo. The composite +// deregister is therefore safe to defer unconditionally — if +// there is nothing to refuse, every per-group deregister is a +// no-op and the outer wrapper returns immediately. 
+func installSQSLeadershipRefusalAcrossGroups( + ctx context.Context, + runtimes []*raftGroupRuntime, + partitionMap map[string]sqsFifoQueueRouting, + advertisesHTFIFO bool, + logger *slog.Logger, +) func() { + partGroups := partitionedGroupSet(partitionMap, logger) + if len(partGroups) == 0 { + return func() {} + } + deregisters := make([]func(), 0, len(runtimes)) + for _, rt := range runtimes { + if rt == nil || rt.engine == nil { + continue + } + admin, ok := rt.engine.(sqsLeadershipController) + if !ok { + // Engine implementation lacks Admin surface — log and + // skip rather than refusing to start. This branch is + // hit only by test doubles or future engines without + // the leader-acquired observer; the etcd engine + // satisfies the interface by construction. + if logger != nil { + logger.Warn("sqs: skipping leadership-refusal install for group "+ + "— engine does not implement leader-acquired observer", + "group", rt.spec.id) + } + continue + } + dereg := installSQSLeadershipRefusal( + ctx, admin, rt.spec.id, partGroups, advertisesHTFIFO, logger) + deregisters = append(deregisters, dereg) + } + return func() { + for _, d := range deregisters { + d() + } + } +} diff --git a/main_sqs_leadership_refusal_test.go b/main_sqs_leadership_refusal_test.go new file mode 100644 index 00000000..15692e38 --- /dev/null +++ b/main_sqs_leadership_refusal_test.go @@ -0,0 +1,252 @@ +package main + +import ( + "context" + "errors" + "log/slog" + "sync/atomic" + "testing" + "time" + + "github.com/bootjp/elastickv/internal/raftengine" + "github.com/stretchr/testify/require" +) + +// fakeLeadershipController is a sqsLeadershipController test +// double. It records TransferLeadership invocations and exposes +// the registered leader-acquired callback so tests can fire it +// manually (the real engine fires it from refreshStatus on a +// state transition; tests don't need a real raft loop). 
+type fakeLeadershipController struct { + state raftengine.State + transferCalls atomic.Int32 + transferErr error + registeredCb func() + deregisterCalls atomic.Int32 + registerCalls atomic.Int32 + transferRecvCancel chan struct{} +} + +func (f *fakeLeadershipController) State() raftengine.State { + return f.state +} + +func (f *fakeLeadershipController) TransferLeadership(_ context.Context) error { + f.transferCalls.Add(1) + if f.transferRecvCancel != nil { + close(f.transferRecvCancel) + } + return f.transferErr +} + +func (f *fakeLeadershipController) RegisterLeaderAcquiredCallback(fn func()) func() { + f.registerCalls.Add(1) + f.registeredCb = fn + return func() { f.deregisterCalls.Add(1) } +} + +// awaitTransferCalls waits up to 1s for at least n TransferLeadership +// calls to land. Needed because refuse() offloads to a goroutine — +// a synchronous assertion would race the goroutine. +func (f *fakeLeadershipController) awaitTransferCalls(t *testing.T, n int32) { + t.Helper() + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) { + if f.transferCalls.Load() >= n { + return + } + time.Sleep(5 * time.Millisecond) + } + require.GreaterOrEqual(t, f.transferCalls.Load(), n, + "expected at least %d TransferLeadership call(s)", n) +} + +// TestInstallSQSLeadershipRefusal_HTFIFOCapableNoOp pins the +// happy-path early return: a binary that ADVERTISES htfifo never +// needs to refuse leadership. The hook must NOT register a +// callback, and the returned deregister must be a safe no-op. 
+func TestInstallSQSLeadershipRefusal_HTFIFOCapableNoOp(t *testing.T) { + t.Parallel() + admin := &fakeLeadershipController{state: raftengine.StateLeader} + dereg := installSQSLeadershipRefusal( + context.Background(), admin, 7, + map[uint64]bool{7: true}, // partitioned queue on this group + true, // binary HAS htfifo + slog.Default(), + ) + require.Zero(t, admin.transferCalls.Load(), + "htfifo-capable binary must not refuse leadership at startup") + require.Zero(t, admin.registerCalls.Load(), + "htfifo-capable binary must not register a leader-acquired callback") + require.NotPanics(t, dereg) +} + +// TestInstallSQSLeadershipRefusal_NoPartitionedQueueNoOp pins the +// other early-return: a group with NO partitioned queues mapped +// to it doesn't need the policy hook either, even when the binary +// lacks htfifo. +func TestInstallSQSLeadershipRefusal_NoPartitionedQueueNoOp(t *testing.T) { + t.Parallel() + admin := &fakeLeadershipController{state: raftengine.StateLeader} + dereg := installSQSLeadershipRefusal( + context.Background(), admin, 99, + map[uint64]bool{7: true, 8: true}, // group 99 NOT in set + false, // binary lacks htfifo + slog.Default(), + ) + require.Zero(t, admin.transferCalls.Load(), + "group with no partitioned queue mapping must not be refused") + require.Zero(t, admin.registerCalls.Load()) + require.NotPanics(t, dereg) +} + +// TestInstallSQSLeadershipRefusal_StartupAlreadyLeaderRefuses pins +// the startup branch: install at a moment when the engine is +// already StateLeader — refuse() must fire immediately so the +// cluster steps the unsafe leader down without waiting for a +// future re-election. 
+func TestInstallSQSLeadershipRefusal_StartupAlreadyLeaderRefuses(t *testing.T) { + t.Parallel() + admin := &fakeLeadershipController{state: raftengine.StateLeader} + _ = installSQSLeadershipRefusal( + context.Background(), admin, 7, + map[uint64]bool{7: true}, + false, // binary lacks htfifo + slog.Default(), + ) + admin.awaitTransferCalls(t, 1) + require.Equal(t, int32(1), admin.registerCalls.Load(), + "the per-acquisition observer must also be registered "+ + "so future re-elections trigger the same refusal") +} + +// TestInstallSQSLeadershipRefusal_StartupFollowerWaits pins the +// startup-follower branch: install at a moment when the engine is +// NOT leader — refuse() must NOT fire yet. The callback must be +// registered so a future leader-acquisition triggers refusal. +func TestInstallSQSLeadershipRefusal_StartupFollowerWaits(t *testing.T) { + t.Parallel() + admin := &fakeLeadershipController{state: raftengine.StateFollower} + _ = installSQSLeadershipRefusal( + context.Background(), admin, 7, + map[uint64]bool{7: true}, + false, + slog.Default(), + ) + require.Zero(t, admin.transferCalls.Load(), + "follower must not be refused at install time") + require.Equal(t, int32(1), admin.registerCalls.Load(), + "per-acquisition observer must still be registered") +} + +// TestInstallSQSLeadershipRefusal_AcquisitionTriggersRefuse pins +// the per-acquisition path: a node that becomes leader AFTER +// install must be refused via the leader-acquired callback. +func TestInstallSQSLeadershipRefusal_AcquisitionTriggersRefuse(t *testing.T) { + t.Parallel() + admin := &fakeLeadershipController{state: raftengine.StateFollower} + _ = installSQSLeadershipRefusal( + context.Background(), admin, 7, + map[uint64]bool{7: true}, + false, + slog.Default(), + ) + require.NotNil(t, admin.registeredCb, + "callback must be registered for the per-acquisition path") + + // Simulate refreshStatus firing the observer after the node + // became leader. 
+ admin.registeredCb() + admin.awaitTransferCalls(t, 1) +} + +// TestInstallSQSLeadershipRefusal_DeregisterPropagates pins that +// the returned deregister flows through to the engine's +// deregister hook. Coordinators with shorter lifetimes than the +// engine MUST call this to avoid accumulating dead callbacks. +func TestInstallSQSLeadershipRefusal_DeregisterPropagates(t *testing.T) { + t.Parallel() + admin := &fakeLeadershipController{state: raftengine.StateFollower} + dereg := installSQSLeadershipRefusal( + context.Background(), admin, 7, + map[uint64]bool{7: true}, + false, + slog.Default(), + ) + dereg() + require.Equal(t, int32(1), admin.deregisterCalls.Load()) +} + +// TestInstallSQSLeadershipRefusal_TransferErrorLogged pins the +// error path: TransferLeadership returning an error must NOT +// crash anything; refuse() logs and moves on. The callback can +// fire again at the next leader-acquired event. +func TestInstallSQSLeadershipRefusal_TransferErrorLogged(t *testing.T) { + t.Parallel() + admin := &fakeLeadershipController{ + state: raftengine.StateLeader, + transferErr: errors.New("simulated transfer failure"), + } + require.NotPanics(t, func() { + _ = installSQSLeadershipRefusal( + context.Background(), admin, 7, + map[uint64]bool{7: true}, + false, + slog.Default(), + ) + }) + admin.awaitTransferCalls(t, 1) +} + +// TestInstallSQSLeadershipRefusal_NilAdminIsSafe pins the typed- +// nil guard: a missing controller must not crash. Returns a +// no-op deregister so callers can defer uniformly. +func TestInstallSQSLeadershipRefusal_NilAdminIsSafe(t *testing.T) { + t.Parallel() + dereg := installSQSLeadershipRefusal( + context.Background(), nil, 7, + map[uint64]bool{7: true}, + false, + slog.Default(), + ) + require.NotPanics(t, dereg) +} + +// TestPartitionedGroupSet_FlattensRouting pins that +// partitionedGroupSet collapses --sqsFifoPartitionMap into the +// {gid → bool} set the leadership-refusal hook consumes. 
+func TestPartitionedGroupSet_FlattensRouting(t *testing.T) { + t.Parallel() + in := map[string]sqsFifoQueueRouting{ + "orders.fifo": {partitionCount: 4, groups: []string{"10", "11", "12", "13"}}, + "events.fifo": {partitionCount: 2, groups: []string{"20", "21"}}, + } + got := partitionedGroupSet(in, slog.Default()) + require.Equal(t, map[uint64]bool{ + 10: true, 11: true, 12: true, 13: true, + 20: true, 21: true, + }, got) +} + +// TestPartitionedGroupSet_EmptyReturnsNil pins the empty-input +// fast path. An operator running a non-partitioned cluster should +// not pay for an empty map allocation. +func TestPartitionedGroupSet_EmptyReturnsNil(t *testing.T) { + t.Parallel() + require.Nil(t, partitionedGroupSet(nil, slog.Default())) + require.Nil(t, partitionedGroupSet(map[string]sqsFifoQueueRouting{}, slog.Default())) +} + +// TestPartitionedGroupSet_SkipsMalformedGroupRef pins the +// defensive log-and-skip branch: a group reference that escaped +// canonicalisation (test seeding the map directly) is logged but +// does not panic. The valid groups still end up in the set. +func TestPartitionedGroupSet_SkipsMalformedGroupRef(t *testing.T) { + t.Parallel() + in := map[string]sqsFifoQueueRouting{ + "q.fifo": {partitionCount: 2, groups: []string{"42", "not-a-uint64"}}, + } + got := partitionedGroupSet(in, slog.Default()) + require.Equal(t, map[uint64]bool{42: true}, got, + "malformed group ref is skipped; valid one survives") +} From 634fbcf9548fbf43c33982a83bbba0ea0577959e Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 23:46:14 +0900 Subject: [PATCH 2/2] fix(sqs): leader-callback race + panic log + TOCTOU + branch coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #723 round 1 review caught four items. This commit lands all four (one is a real concurrency bug I introduced when refactoring the leader-callback duplication). 
1) P1 (Codex) / HIGH (Gemini) — data race in gatherLeaderCallbacks

My round 1 refactor extracted gatherLeaderCallbacks(mu, cbs []leaderCallbackSlot). Passing the slice by value means the slice header (pointer, length, capacity) is read and copied at the call site — i.e. BEFORE mu.Lock() runs inside the helper. Concurrent registerLeaderCallback running on another goroutine could mutate the header while the caller is reading it, triggering -race detector failures or undefined behaviour.

Fix: gatherLeaderCallbacks now takes *[]leaderCallbackSlot (pointer to slice). The slice is dereferenced INSIDE the locked section, closing the race. Both fireLeaderLossCallbacks and fireLeaderAcquiredCallbacks updated to pass &e.leader*Cbs.

This restores the pre-refactor "lock-then-copy" semantics the original RegisterLeaderLossCallback had. The lesson: passing a slice by value to a "thread-safe" helper does not give you mutex safety, because the function-argument evaluation happens before the function body locks anything.

2) MEDIUM (Gemini) / Claude finding 1 — invokeLeaderAcquiredCallback log fields

Mirror invokeLeaderLossCallback's slog.Error fields:

    slog.String("node_id", e.localID)
    slog.Uint64("raft_node_id", e.nodeID)
    slog.Any("panic", r)
    slog.String("stack", string(debug.Stack()))

Without these, an SQS leadership-refusal hook panicking in production would leave operators with only the recovered value to grep on. Same fields the leader-loss path has, so cross- family triage is consistent.

3) Claude finding 2 — TOCTOU window in startup leader check

installSQSLeadershipRefusal previously did:

    if admin.State() == StateLeader { refuse() }
    return admin.RegisterLeaderAcquiredCallback(refuse)

Window: the engine could win an election between State() and RegisterLeaderAcquiredCallback returning. refreshStatus would fire fireLeaderAcquiredCallbacks before refuse is in the slice, and the hook would miss that acquisition.

Fix: post-registration State() re-check.
If the install-time read returned follower but the post-registration read returns leader, refuse() fires for the in-flight election. refuse() is idempotent (TransferLeadership becomes a no-op once a transfer is already in flight), so a double-invocation across the boundary is safe. 4) Claude finding 3 — installSQSLeadershipRefusalAcrossGroups branches Three new tests: - TestInstallSQSLeadershipRefusalAcrossGroups_ComposesDeregisters: multi-group install + composite deregister fan-out. - TestInstallSQSLeadershipRefusalAcrossGroups_NoPartitionedQueueIsNoOp: empty partition map → no per-group install runs. - TestInstallSQSLeadershipRefusalAcrossGroups_SkipsNilRuntimes: nil-runtime / nil-engine guard in the iterator. - TestInstallSQSLeadershipRefusalAcrossGroups_SkipsEngineWithoutObserver: type-assertion-miss log path (engine that doesn't satisfy sqsLeadershipController). Plus TestInstallSQSLeadershipRefusal_TOCTOUWindowCovered which pins fix #3 directly: a fake controller whose State() flips during RegisterLeaderAcquiredCallback exercises the post-registration re-check. 5) Audit per the lessons-learned discipline The semantic change here is the gatherLeaderCallbacks signature. grep -rn confirmed only fireLeaderLossCallbacks and fireLeaderAcquiredCallbacks call it — both are local to this file, both updated together. No external callers to audit. --- internal/raftengine/etcd/engine.go | 33 +++-- main_sqs_leadership_refusal.go | 27 +++- main_sqs_leadership_refusal_test.go | 199 ++++++++++++++++++++++++++-- 3 files changed, 236 insertions(+), 23 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index fe9c36aa..e4a8d760 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -970,7 +970,7 @@ type leaderLossSlot = leaderCallbackSlot // invokeLeaderLossCallback) so a bug in one holder cannot break // subsequent callbacks or crash the process. 
func (e *Engine) fireLeaderLossCallbacks() { - for _, fn := range gatherLeaderCallbacks(&e.leaderLossCbsMu, e.leaderLossCbs) { + for _, fn := range gatherLeaderCallbacks(&e.leaderLossCbsMu, &e.leaderLossCbs) { e.invokeLeaderLossCallback(fn) } } @@ -1029,13 +1029,20 @@ func registerLeaderCallback(mu *sync.Mutex, cbs *[]leaderCallbackSlot, fn func() // mutex so callers can fire them without holding the lock. // Mirrors the snapshot-then-fire pattern used by the per-callback // invoke helpers. -func gatherLeaderCallbacks(mu *sync.Mutex, cbs []leaderCallbackSlot) []func() { +// +// Takes a *pointer* to the slice (not the slice itself) so the +// header (pointer, length, capacity) is read INSIDE the locked +// section. Passing the slice by value would dereference the +// header at the call site — i.e. before mu.Lock() — racing with +// any concurrent registerLeaderCallback. The pointer parameter +// closes that race (codex P1 / gemini high on PR #723). +func gatherLeaderCallbacks(mu *sync.Mutex, cbs *[]leaderCallbackSlot) []func() { mu.Lock() - out := make([]func(), len(cbs)) - for i, c := range cbs { + defer mu.Unlock() + out := make([]func(), len(*cbs)) + for i, c := range *cbs { out[i] = c.fn } - mu.Unlock() return out } @@ -1080,7 +1087,7 @@ type leaderAcquiredSlot = leaderCallbackSlot // synchronously. Same panic-containment + non-blocking contract // as fireLeaderLossCallbacks. 
func (e *Engine) fireLeaderAcquiredCallbacks() { - for _, fn := range gatherLeaderCallbacks(&e.leaderAcquiredCbsMu, e.leaderAcquiredCbs) { + for _, fn := range gatherLeaderCallbacks(&e.leaderAcquiredCbsMu, &e.leaderAcquiredCbs) { e.invokeLeaderAcquiredCallback(fn) } } @@ -1088,8 +1095,18 @@ func (e *Engine) fireLeaderAcquiredCallbacks() { func (e *Engine) invokeLeaderAcquiredCallback(fn func()) { defer func() { if r := recover(); r != nil { - slog.Error("etcd raft engine: leader-acquired callback panic", - "recover", r) + // Mirror the leader-loss panic log shape so operator + // triage of either family produces the same fields. + // Without node identity and the stack, an SQS + // leadership-refusal hook panicking in production + // would leave only the recovered value to grep on + // (gemini medium / claude finding 1 on PR #723). + slog.Error("etcd raft engine: leader-acquired callback panicked", + slog.String("node_id", e.localID), + slog.Uint64("raft_node_id", e.nodeID), + slog.Any("panic", r), + slog.String("stack", string(debug.Stack())), + ) } }() fn() diff --git a/main_sqs_leadership_refusal.go b/main_sqs_leadership_refusal.go index 90d0a456..aa6bfeec 100644 --- a/main_sqs_leadership_refusal.go +++ b/main_sqs_leadership_refusal.go @@ -94,13 +94,30 @@ func installSQSLeadershipRefusal( } }() } - if admin.State() == raftengine.StateLeader { - // Startup: this node is already leader. Refuse now so - // the cluster picks an htfifo-capable peer immediately - // rather than waiting for a future re-election. + // TOCTOU safety: read State() BEFORE registering the + // observer, fire refuse() if already leader, then register, + // then re-check State() once more. 
Without the second check + // the engine could win a Raft election in the narrow window + // between the first State() read and RegisterLeaderAcquiredCallback + // returning — refreshStatus would fire fireLeaderAcquiredCallbacks + // before refuse is in the slice, the hook would miss that + // acquisition, and the node would stay leader of a + // partitioned-queue group until the next election (claude + // finding 2 on PR #723). The second check after registration + // closes the window: refuse() is idempotent (TransferLeadership + // is a no-op once a transfer is already in flight), so a + // double-invocation across the boundary is safe. + wasLeaderBefore := admin.State() == raftengine.StateLeader + if wasLeaderBefore { refuse() } - return admin.RegisterLeaderAcquiredCallback(refuse) + deregister := admin.RegisterLeaderAcquiredCallback(refuse) + if !wasLeaderBefore && admin.State() == raftengine.StateLeader { + // Election landed during the registration window. + // refuse() the post-registration state too. + refuse() + } + return deregister } // partitionedGroupSet flattens the operator's --sqsFifoPartitionMap diff --git a/main_sqs_leadership_refusal_test.go b/main_sqs_leadership_refusal_test.go index 15692e38..0310ebc0 100644 --- a/main_sqs_leadership_refusal_test.go +++ b/main_sqs_leadership_refusal_test.go @@ -12,13 +12,40 @@ import ( "github.com/stretchr/testify/require" ) +// raftengine.Engine surface bolted onto fakeLeadershipController +// so it can stand in for runtime.engine in +// installSQSLeadershipRefusalAcrossGroups tests. The methods +// return zero values — these tests only exercise the +// leader-refusal branches. 
+func (f *fakeLeadershipController) Propose(_ context.Context, _ []byte) (*raftengine.ProposalResult, error) { + return &raftengine.ProposalResult{}, nil +} +func (f *fakeLeadershipController) Leader() raftengine.LeaderInfo { return raftengine.LeaderInfo{} } +func (f *fakeLeadershipController) VerifyLeader(_ context.Context) error { return nil } +func (f *fakeLeadershipController) LinearizableRead(_ context.Context) (uint64, error) { + return 0, nil +} +func (f *fakeLeadershipController) Status() raftengine.Status { return raftengine.Status{} } +func (f *fakeLeadershipController) Configuration(_ context.Context) (raftengine.Configuration, error) { + return raftengine.Configuration{}, nil +} +func (f *fakeLeadershipController) Close() error { return nil } + // fakeLeadershipController is a sqsLeadershipController test // double. It records TransferLeadership invocations and exposes // the registered leader-acquired callback so tests can fire it // manually (the real engine fires it from refreshStatus on a // state transition; tests don't need a real raft loop). +// +// stateAfterRegister, when non-zero, is published as the next +// State() answer immediately after RegisterLeaderAcquiredCallback +// runs. This simulates the TOCTOU window the helper's +// post-registration State() re-check protects against — between +// the install-time State() read and the registration completing, +// the engine wins an election. 
type fakeLeadershipController struct { state raftengine.State + stateAfterRegister raftengine.State transferCalls atomic.Int32 transferErr error registeredCb func() @@ -42,23 +69,34 @@ func (f *fakeLeadershipController) TransferLeadership(_ context.Context) error { func (f *fakeLeadershipController) RegisterLeaderAcquiredCallback(fn func()) func() { f.registerCalls.Add(1) f.registeredCb = fn + if f.stateAfterRegister != "" { + // Simulate an election landing during the registration + // window — the next State() the helper reads will return + // StateLeader even though the install-time read returned + // follower. + f.state = f.stateAfterRegister + } return func() { f.deregisterCalls.Add(1) } } -// awaitTransferCalls waits up to 1s for at least n TransferLeadership -// calls to land. Needed because refuse() offloads to a goroutine — -// a synchronous assertion would race the goroutine. -func (f *fakeLeadershipController) awaitTransferCalls(t *testing.T, n int32) { +// awaitTransferCallsAtLeastOne waits up to 1s for at least one +// TransferLeadership call to land. Needed because refuse() +// offloads to a goroutine — a synchronous assertion would race +// the goroutine. The TOCTOU test exercises the path where two +// refuse() calls fire (one before registration, one after); even +// in that case the first transfer landing is sufficient +// observation that refuse() ran. 
+func (f *fakeLeadershipController) awaitTransferCallsAtLeastOne(t *testing.T) { t.Helper() deadline := time.Now().Add(time.Second) for time.Now().Before(deadline) { - if f.transferCalls.Load() >= n { + if f.transferCalls.Load() >= 1 { return } time.Sleep(5 * time.Millisecond) } - require.GreaterOrEqual(t, f.transferCalls.Load(), n, - "expected at least %d TransferLeadership call(s)", n) + require.GreaterOrEqual(t, f.transferCalls.Load(), int32(1), + "expected at least one TransferLeadership call") } // TestInstallSQSLeadershipRefusal_HTFIFOCapableNoOp pins the @@ -114,7 +152,7 @@ func TestInstallSQSLeadershipRefusal_StartupAlreadyLeaderRefuses(t *testing.T) { false, // binary lacks htfifo slog.Default(), ) - admin.awaitTransferCalls(t, 1) + admin.awaitTransferCallsAtLeastOne(t) require.Equal(t, int32(1), admin.registerCalls.Load(), "the per-acquisition observer must also be registered "+ "so future re-elections trigger the same refusal") @@ -157,7 +195,7 @@ func TestInstallSQSLeadershipRefusal_AcquisitionTriggersRefuse(t *testing.T) { // Simulate refreshStatus firing the observer after the node // became leader. admin.registeredCb() - admin.awaitTransferCalls(t, 1) + admin.awaitTransferCallsAtLeastOne(t) } // TestInstallSQSLeadershipRefusal_DeregisterPropagates pins that @@ -195,7 +233,7 @@ func TestInstallSQSLeadershipRefusal_TransferErrorLogged(t *testing.T) { slog.Default(), ) }) - admin.awaitTransferCalls(t, 1) + admin.awaitTransferCallsAtLeastOne(t) } // TestInstallSQSLeadershipRefusal_NilAdminIsSafe pins the typed- @@ -212,6 +250,147 @@ func TestInstallSQSLeadershipRefusal_NilAdminIsSafe(t *testing.T) { require.NotPanics(t, dereg) } +// TestInstallSQSLeadershipRefusal_TOCTOUWindowCovered pins the +// post-registration State() re-check (claude finding 2 on PR +// #723). 
If the engine wins an election in the narrow window +// between the install-time State() read and +// RegisterLeaderAcquiredCallback returning, refreshStatus may +// fire fireLeaderAcquiredCallbacks before the refuse() callback +// is in the slice — the hook would miss that acquisition. The +// helper closes the gap by re-checking State() after registration +// and firing refuse() if the node turned leader during the +// window. +// +// The test triggers this by configuring stateAfterRegister so +// the fake controller flips to StateLeader exactly when +// RegisterLeaderAcquiredCallback is called. The post-fix code +// must call refuse() despite the install-time State() returning +// follower. +func TestInstallSQSLeadershipRefusal_TOCTOUWindowCovered(t *testing.T) { + t.Parallel() + admin := &fakeLeadershipController{ + state: raftengine.StateFollower, + stateAfterRegister: raftengine.StateLeader, + } + _ = installSQSLeadershipRefusal( + context.Background(), admin, 7, + map[uint64]bool{7: true}, + false, + slog.Default(), + ) + admin.awaitTransferCallsAtLeastOne(t) + require.Equal(t, int32(1), admin.registerCalls.Load(), + "observer must still be registered for future transitions") +} + +// TestInstallSQSLeadershipRefusalAcrossGroups_ComposesDeregisters +// pins the across-groups composite installer: a non-empty +// partition map produces N per-group installs, and the returned +// composite deregister fires every per-group deregister. 
+func TestInstallSQSLeadershipRefusalAcrossGroups_ComposesDeregisters(t *testing.T) { + t.Parallel() + admin1 := &fakeLeadershipController{state: raftengine.StateFollower} + admin2 := &fakeLeadershipController{state: raftengine.StateFollower} + runtimes := []*raftGroupRuntime{ + {spec: groupSpec{id: 10}, engine: admin1}, + {spec: groupSpec{id: 11}, engine: admin2}, + } + partitionMap := map[string]sqsFifoQueueRouting{ + "q.fifo": {partitionCount: 2, groups: []string{"10", "11"}}, + } + dereg := installSQSLeadershipRefusalAcrossGroups( + context.Background(), runtimes, partitionMap, + false, // binary lacks htfifo + slog.Default(), + ) + require.Equal(t, int32(1), admin1.registerCalls.Load(), + "group 10 must get a refusal-observer registration") + require.Equal(t, int32(1), admin2.registerCalls.Load(), + "group 11 must get a refusal-observer registration") + + dereg() + require.Equal(t, int32(1), admin1.deregisterCalls.Load(), + "composite deregister must propagate to group 10") + require.Equal(t, int32(1), admin2.deregisterCalls.Load(), + "composite deregister must propagate to group 11") +} + +// TestInstallSQSLeadershipRefusalAcrossGroups_NoPartitionedQueueIsNoOp +// pins the early return for non-partitioned clusters — no +// per-group install runs, deregister is a safe no-op. +func TestInstallSQSLeadershipRefusalAcrossGroups_NoPartitionedQueueIsNoOp(t *testing.T) { + t.Parallel() + admin := &fakeLeadershipController{state: raftengine.StateFollower} + runtimes := []*raftGroupRuntime{ + {spec: groupSpec{id: 10}, engine: admin}, + } + dereg := installSQSLeadershipRefusalAcrossGroups( + context.Background(), runtimes, nil, // empty partition map + false, + slog.Default(), + ) + require.Zero(t, admin.registerCalls.Load(), + "empty partition map must not trigger any registration") + require.NotPanics(t, dereg) +} + +// TestInstallSQSLeadershipRefusalAcrossGroups_SkipsNilRuntimes +// pins the defensive nil-runtime / nil-engine guard. 
A test that +// constructs a sparse runtimes slice (e.g. for a stub) must not +// crash the iterator. +func TestInstallSQSLeadershipRefusalAcrossGroups_SkipsNilRuntimes(t *testing.T) { + t.Parallel() + admin := &fakeLeadershipController{state: raftengine.StateFollower} + runtimes := []*raftGroupRuntime{ + nil, // entire entry nil + {spec: groupSpec{id: 11}, engine: nil}, // engine nil + {spec: groupSpec{id: 10}, engine: admin}, // valid + } + dereg := installSQSLeadershipRefusalAcrossGroups( + context.Background(), runtimes, + map[string]sqsFifoQueueRouting{ + "q.fifo": {partitionCount: 1, groups: []string{"10"}}, + }, + false, + slog.Default(), + ) + require.Equal(t, int32(1), admin.registerCalls.Load(), + "valid runtime must still get the registration despite "+ + "nil entries earlier in the slice") + require.NotPanics(t, dereg) +} + +// engineWithoutAcquiredObserver is a runtime engine implementation +// that satisfies enough of raftengine.Engine to live in +// raftGroupRuntime but does NOT implement +// sqsLeadershipController (missing RegisterLeaderAcquiredCallback). +// Used to pin the type-assertion-failure log branch. +type engineWithoutAcquiredObserver struct { + raftengine.Engine +} + +// TestInstallSQSLeadershipRefusalAcrossGroups_SkipsEngineWithoutObserver +// pins the type-assertion-miss branch. An engine that doesn't +// satisfy sqsLeadershipController must be logged and skipped +// rather than crashing the iterator. 
+func TestInstallSQSLeadershipRefusalAcrossGroups_SkipsEngineWithoutObserver(t *testing.T) { + t.Parallel() + runtimes := []*raftGroupRuntime{ + {spec: groupSpec{id: 10}, engine: &engineWithoutAcquiredObserver{}}, + } + require.NotPanics(t, func() { + dereg := installSQSLeadershipRefusalAcrossGroups( + context.Background(), runtimes, + map[string]sqsFifoQueueRouting{ + "q.fifo": {partitionCount: 1, groups: []string{"10"}}, + }, + false, + slog.Default(), + ) + dereg() + }) +} + // TestPartitionedGroupSet_FlattensRouting pins that // partitionedGroupSet collapses --sqsFifoPartitionMap into the // {gid → bool} set the leadership-refusal hook consumes.