From ebd1eb8e3f5a6a6efa1513d804be00c8554e92fa Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Mar 2026 06:48:43 +0000 Subject: [PATCH 1/8] Initial plan From b6e0d8bf32991b86d90fff67125f9548ed09dc65 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Mar 2026 06:58:47 +0000 Subject: [PATCH 2/8] refactor: gate ShouldReport on enabled, move unmatchedSecondaries to struct, remove nolint, deterministic tests Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com> --- proxy/proxy_test.go | 16 +++-- proxy/sentry.go | 8 ++- proxy/shadow_pubsub.go | 130 +++++++++++------------------------- proxy/shadow_pubsub_test.go | 86 ++++++++++-------------- 4 files changed, 94 insertions(+), 146 deletions(-) diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 44d9e59f..fdc13028 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -291,31 +291,39 @@ func TestSentryReporterDisabled(t *testing.T) { r.CaptureException(nil, "test", nil) r.CaptureDivergence(Divergence{}) r.Flush(0) + // ShouldReport always returns false when disabled (no side-effects on lastReport). 
+ assert.False(t, r.ShouldReport("any-fingerprint")) + assert.Empty(t, r.lastReport, "disabled reporter must not modify lastReport") } func TestShouldReportCooldown(t *testing.T) { + now := time.Now() r := &SentryReporter{ + enabled: true, lastReport: make(map[string]time.Time), cooldown: 50 * time.Millisecond, + nowFunc: func() time.Time { return now }, } assert.True(t, r.ShouldReport("fp1")) assert.False(t, r.ShouldReport("fp1")) // within cooldown - time.Sleep(60 * time.Millisecond) + now = now.Add(60 * time.Millisecond) // advance past cooldown assert.True(t, r.ShouldReport("fp1")) // cooldown elapsed } func TestShouldReportEvictsExpired(t *testing.T) { + now := time.Now() r := &SentryReporter{ + enabled: true, lastReport: make(map[string]time.Time), cooldown: 1 * time.Millisecond, + nowFunc: func() time.Time { return now }, } - // Fill to maxReportEntries + // Fill to maxReportEntries — all entries already expired for i := range maxReportEntries { - r.lastReport[string(rune(i))] = time.Now().Add(-time.Hour) + r.lastReport[string(rune(i))] = now.Add(-time.Hour) } - time.Sleep(2 * time.Millisecond) assert.True(t, r.ShouldReport("new-fp")) assert.Less(t, len(r.lastReport), maxReportEntries) } diff --git a/proxy/sentry.go b/proxy/sentry.go index c4190ca0..2d363763 100644 --- a/proxy/sentry.go +++ b/proxy/sentry.go @@ -26,6 +26,7 @@ type SentryReporter struct { hub *sentry.Hub logger *slog.Logger cooldown time.Duration + nowFunc func() time.Time // injectable clock; defaults to time.Now mu sync.Mutex lastReport map[string]time.Time // fingerprint → last report time @@ -39,6 +40,7 @@ func NewSentryReporter(dsn string, environment string, sampleRate float64, logge r := &SentryReporter{ logger: logger, cooldown: defaultReportCooldown, + nowFunc: time.Now, lastReport: make(map[string]time.Time), } if dsn == "" { @@ -105,11 +107,15 @@ func (r *SentryReporter) CaptureDivergence(div Divergence) { // ShouldReport checks if this fingerprint has been reported recently 
(cooldown-based). // Evicts expired entries when the map reaches maxReportEntries to bound memory usage. // Returns false (drops the report) if the map is still at capacity after eviction. +// Returns false immediately if Sentry reporting is disabled. func (r *SentryReporter) ShouldReport(fingerprint string) bool { + if !r.enabled { + return false + } r.mu.Lock() defer r.mu.Unlock() - now := time.Now() + now := r.nowFunc() // Evict expired entries if map grows too large if len(r.lastReport) >= maxReportEntries { diff --git a/proxy/shadow_pubsub.go b/proxy/shadow_pubsub.go index 9eaefa46..3fa580f4 100644 --- a/proxy/shadow_pubsub.go +++ b/proxy/shadow_pubsub.go @@ -26,18 +26,6 @@ type secondaryPending struct { payload string } -// unmatchedSecondaries buffers unmatched secondary messages per shadowPubSub -// instance. This allows us to avoid reporting DivExtraData immediately when a -// secondary arrives before the corresponding primary (e.g. due to network -// jitter) and instead only report once the comparison window has elapsed. -// Each key maps to a slice to handle duplicate secondary messages correctly. -var unmatchedSecondaries = struct { - sync.Mutex - data map[*shadowPubSub]map[msgKey][]secondaryPending -}{ - data: make(map[*shadowPubSub]map[msgKey][]secondaryPending), -} - // pendingMsg records a message awaiting its counterpart from the other source. type pendingMsg struct { pattern string @@ -65,24 +53,29 @@ type shadowPubSub struct { window time.Duration nowFunc func() time.Time // injectable clock; defaults to time.Now - mu sync.Mutex - pending map[msgKey][]pendingMsg // primary messages awaiting secondary match - closed bool - started bool - startOnce sync.Once - done chan struct{} + mu sync.Mutex + pending map[msgKey][]pendingMsg // primary messages awaiting secondary match + // unmatchedSecondaries buffers secondary messages that arrived before a + // corresponding primary. Protected by mu. 
Each key maps to a slice to + // handle duplicate messages correctly. + unmatchedSecondaries map[msgKey][]secondaryPending + closed bool + started bool + startOnce sync.Once + done chan struct{} } func newShadowPubSub(backend PubSubBackend, metrics *ProxyMetrics, sentry *SentryReporter, logger *slog.Logger, window time.Duration) *shadowPubSub { return &shadowPubSub{ - secondary: backend.NewPubSub(context.Background()), - metrics: metrics, - sentry: sentry, - logger: logger, - window: window, - nowFunc: time.Now, - pending: make(map[msgKey][]pendingMsg), - done: make(chan struct{}), + secondary: backend.NewPubSub(context.Background()), + metrics: metrics, + sentry: sentry, + logger: logger, + window: window, + nowFunc: time.Now, + pending: make(map[msgKey][]pendingMsg), + unmatchedSecondaries: make(map[msgKey][]secondaryPending), + done: make(chan struct{}), } } @@ -166,14 +159,7 @@ func (sp *shadowPubSub) RecordPrimary(msg *redis.Message) { // exists for the given key and is still within the comparison window. If one is // found it is consumed and true is returned. Caller must hold sp.mu. func (sp *shadowPubSub) reconcileWithBufferedSecondary(key msgKey, now time.Time) bool { - unmatchedSecondaries.Lock() - defer unmatchedSecondaries.Unlock() - - secBySP, ok := unmatchedSecondaries.data[sp] - if !ok { - return false - } - secs, ok := secBySP[key] + secs, ok := sp.unmatchedSecondaries[key] if !ok || len(secs) == 0 { return false } @@ -185,12 +171,9 @@ func (sp *shadowPubSub) reconcileWithBufferedSecondary(key msgKey, now time.Time // Consume this secondary — remove it from the slice. secs = append(secs[:i], secs[i+1:]...) 
if len(secs) == 0 { - delete(secBySP, key) - if len(secBySP) == 0 { - delete(unmatchedSecondaries.data, sp) - } + delete(sp.unmatchedSecondaries, key) } else { - secBySP[key] = secs + sp.unmatchedSecondaries[key] = secs } return true } @@ -210,10 +193,6 @@ func (sp *shadowPubSub) Close() { if started { <-sp.done } - // Clean up buffered unmatched secondaries to prevent memory leak. - unmatchedSecondaries.Lock() - delete(unmatchedSecondaries.data, sp) - unmatchedSecondaries.Unlock() } // compareLoop reads from the secondary channel and matches messages. @@ -244,6 +223,7 @@ func (sp *shadowPubSub) compareLoop(ch <-chan *redis.Message) { // Collects divergence info under lock and reports after releasing it. func (sp *shadowPubSub) matchSecondary(msg *redis.Message) { sp.mu.Lock() + defer sp.mu.Unlock() key := msgKeyFromMessage(msg) now := sp.nowFunc() @@ -256,7 +236,6 @@ func (sp *shadowPubSub) matchSecondary(msg *redis.Message) { } else { sp.pending[key] = entries[1:] } - sp.mu.Unlock() return } // The oldest pending primary is already past the window. Let the periodic @@ -264,20 +243,9 @@ func (sp *shadowPubSub) matchSecondary(msg *redis.Message) { // so it can be reported as DivExtraData if it also remains unmatched. } - sp.mu.Unlock() - // No matching primary message within the window. Buffer the secondary and only // report DivExtraData if it remains unmatched after the comparison window. 
- unmatchedSecondaries.Lock() - defer unmatchedSecondaries.Unlock() - - perInstance, ok := unmatchedSecondaries.data[sp] - if !ok { - perInstance = make(map[msgKey][]secondaryPending) - unmatchedSecondaries.data[sp] = perInstance - } - - perInstance[key] = append(perInstance[key], secondaryPending{ + sp.unmatchedSecondaries[key] = append(sp.unmatchedSecondaries[key], secondaryPending{ timestamp: now, channel: msg.Channel, payload: msg.Payload, @@ -291,22 +259,11 @@ func (sp *shadowPubSub) sweepExpired() { sp.mu.Lock() now := sp.nowFunc() - unmatchedSecondaries.Lock() - // Fetch without creating to avoid unnecessary allocation on every tick for idle instances. - perInstance := unmatchedSecondaries.data[sp] - // Expire old buffered secondaries first so they cannot be consumed as a // "match" during reconciliation (prevents bypassing the comparison window). - if perInstance != nil { - divergences = sweepExpiredSecondaries(now, sp.window, perInstance, divergences) - } - divergences = sp.reconcilePrimaries(now, perInstance, divergences) - - if len(perInstance) == 0 { - delete(unmatchedSecondaries.data, sp) - } + divergences = sweepExpiredSecondaries(now, sp.window, sp.unmatchedSecondaries, divergences) + divergences = sp.reconcilePrimaries(now, sp.unmatchedSecondaries, divergences) - unmatchedSecondaries.Unlock() sp.mu.Unlock() for _, d := range divergences { @@ -316,7 +273,7 @@ func (sp *shadowPubSub) sweepExpired() { // reconcilePrimaries matches pending primaries against buffered secondaries, // reporting expired unmatched primaries as divergences. -// Caller must hold sp.mu and unmatchedSecondaries.Lock(). +// Caller must hold sp.mu. 
func (sp *shadowPubSub) reconcilePrimaries(now time.Time, secBuf map[msgKey][]secondaryPending, out []divergenceEvent) []divergenceEvent { for key, entries := range sp.pending { var remaining []pendingMsg @@ -385,26 +342,20 @@ func (sp *shadowPubSub) sweepAll() { } delete(sp.pending, key) } - sp.mu.Unlock() - // Also drain any buffered unmatched secondaries for this instance. - unmatchedSecondaries.Lock() - if perInstance, ok := unmatchedSecondaries.data[sp]; ok { - for key, secs := range perInstance { - for _, sec := range secs { - divergences = append(divergences, divergenceEvent{ - channel: sec.channel, - payload: sec.payload, - pattern: key.Pattern, - kind: DivExtraData, - isPattern: key.Pattern != "", - }) - } - delete(perInstance, key) + for key, secs := range sp.unmatchedSecondaries { + for _, sec := range secs { + divergences = append(divergences, divergenceEvent{ + channel: sec.channel, + payload: sec.payload, + pattern: key.Pattern, + kind: DivExtraData, + isPattern: key.Pattern != "", + }) } - delete(unmatchedSecondaries.data, sp) + delete(sp.unmatchedSecondaries, key) } - unmatchedSecondaries.Unlock() + sp.mu.Unlock() for _, d := range divergences { sp.reportDivergence(d) @@ -429,11 +380,10 @@ func (sp *shadowPubSub) reportDivergence(d divergenceEvent) { } var primary, secondary any - switch d.kind { //nolint:exhaustive // only two kinds apply to pub/sub shadow - case DivExtraData: + if d.kind == DivExtraData { primary = nil secondary = d.payload - default: + } else { primary = d.payload secondary = nil } diff --git a/proxy/shadow_pubsub_test.go b/proxy/shadow_pubsub_test.go index af598fbc..67894c35 100644 --- a/proxy/shadow_pubsub_test.go +++ b/proxy/shadow_pubsub_test.go @@ -42,13 +42,14 @@ func newTestShadowPubSub(window time.Duration) *shadowPubSub { func newTestShadowPubSubWithClock(window time.Duration, nowFunc func() time.Time) *shadowPubSub { return &shadowPubSub{ - metrics: newTestMetrics(), - sentry: newTestSentry(), - logger: testLogger, - 
window: window, - nowFunc: nowFunc, - pending: make(map[msgKey][]pendingMsg), - done: make(chan struct{}), + metrics: newTestMetrics(), + sentry: newTestSentry(), + logger: testLogger, + window: window, + nowFunc: nowFunc, + pending: make(map[msgKey][]pendingMsg), + unmatchedSecondaries: make(map[msgKey][]secondaryPending), + done: make(chan struct{}), } } @@ -84,11 +85,6 @@ func TestShadowPubSub_MissingOnSecondary(t *testing.T) { func TestShadowPubSub_ExtraOnSecondary(t *testing.T) { clock := newTestClock() sp := newTestShadowPubSubWithClock(10*time.Millisecond, clock.Now) - defer func() { - unmatchedSecondaries.Lock() - delete(unmatchedSecondaries.data, sp) - unmatchedSecondaries.Unlock() - }() sp.matchSecondary(&redis.Message{Channel: "ch1", Payload: "extra"}) @@ -173,52 +169,50 @@ func TestShadowPubSub_CompareLoopExitsOnChannelClose(t *testing.T) { func TestShadowPubSub_DuplicateSecondaryBuffered(t *testing.T) { sp := newTestShadowPubSub(10 * time.Millisecond) - defer func() { - // Clean up without calling Close (test has no real secondary connection). - unmatchedSecondaries.Lock() - delete(unmatchedSecondaries.data, sp) - unmatchedSecondaries.Unlock() - }() // Two identical secondary messages arrive before any primary. sp.matchSecondary(&redis.Message{Channel: "ch1", Payload: "dup"}) sp.matchSecondary(&redis.Message{Channel: "ch1", Payload: "dup"}) - unmatchedSecondaries.Lock() key := msgKey{Channel: "ch1", Payload: "dup"} - secs := unmatchedSecondaries.data[sp][key] + sp.mu.Lock() + secs := sp.unmatchedSecondaries[key] assert.Len(t, secs, 2, "both duplicate secondaries should be buffered") - unmatchedSecondaries.Unlock() + sp.mu.Unlock() // Now one primary arrives — should consume one buffered secondary. 
sp.RecordPrimary(&redis.Message{Channel: "ch1", Payload: "dup"}) sp.sweepExpired() // reconcile - unmatchedSecondaries.Lock() - secs = unmatchedSecondaries.data[sp][key] - unmatchedSecondaries.Unlock() + sp.mu.Lock() + secs = sp.unmatchedSecondaries[key] + sp.mu.Unlock() // One secondary remains buffered (only one primary consumed one). assert.Len(t, secs, 1, "one duplicate should remain after matching one primary") } func TestShadowPubSub_CloseCleanupUnmatchedSecondaries(t *testing.T) { - // Close() now tolerates nil secondary, so no mock client is needed. + // Verify that sweepAll (called when the compare loop exits on Close) drains + // the per-struct unmatchedSecondaries buffer and reports DivExtraData. sp := newTestShadowPubSub(1 * time.Second) // Buffer a secondary message. sp.matchSecondary(&redis.Message{Channel: "ch1", Payload: "leaked"}) - unmatchedSecondaries.Lock() - _, exists := unmatchedSecondaries.data[sp] - unmatchedSecondaries.Unlock() - assert.True(t, exists, "secondary should be buffered before Close") + sp.mu.Lock() + _, buffered := sp.unmatchedSecondaries[msgKey{Channel: "ch1", Payload: "leaked"}] + sp.mu.Unlock() + assert.True(t, buffered, "secondary should be buffered before sweep") + + // sweepAll drains the buffer and reports the unmatched secondary as DivExtraData. 
+ sp.sweepAll() - sp.Close() + sp.mu.Lock() + assert.Empty(t, sp.unmatchedSecondaries, "sweepAll should drain unmatchedSecondaries") + sp.mu.Unlock() - unmatchedSecondaries.Lock() - _, exists = unmatchedSecondaries.data[sp] - unmatchedSecondaries.Unlock() - assert.False(t, exists, "Close should clean up unmatchedSecondaries entry") + extra := counterValue(sp.metrics.PubSubShadowDivergences.WithLabelValues("extra_data")) + assert.Equal(t, float64(1), extra, "buffered secondary should be reported as extra_data by sweepAll") } func TestShadowPubSub_CompareLoopMatchesFromChannel(t *testing.T) { @@ -252,19 +246,14 @@ func TestShadowPubSub_CompareLoopMatchesFromChannel(t *testing.T) { func TestShadowPubSub_SecondaryBeforePrimaryImmediateReconcile(t *testing.T) { clock := newTestClock() sp := newTestShadowPubSubWithClock(1*time.Second, clock.Now) - defer func() { - unmatchedSecondaries.Lock() - delete(unmatchedSecondaries.data, sp) - unmatchedSecondaries.Unlock() - }() - // Secondary arrives first — it should be buffered in unmatchedSecondaries. + // Secondary arrives first — it should be buffered in sp.unmatchedSecondaries. sp.matchSecondary(&redis.Message{Channel: "ch1", Payload: "early"}) key := msgKey{Channel: "ch1", Payload: "early"} - unmatchedSecondaries.Lock() - secs := unmatchedSecondaries.data[sp][key] - unmatchedSecondaries.Unlock() + sp.mu.Lock() + secs := sp.unmatchedSecondaries[key] + sp.mu.Unlock() assert.Len(t, secs, 1, "secondary should be buffered before primary arrives") // Primary arrives within the window — RecordPrimary should immediately consume @@ -277,9 +266,9 @@ func TestShadowPubSub_SecondaryBeforePrimaryImmediateReconcile(t *testing.T) { sp.mu.Unlock() // The buffered secondary entry must have been consumed. 
- unmatchedSecondaries.Lock() - secs = unmatchedSecondaries.data[sp][key] - unmatchedSecondaries.Unlock() + sp.mu.Lock() + secs = sp.unmatchedSecondaries[key] + sp.mu.Unlock() assert.Empty(t, secs, "buffered secondary should be consumed immediately by RecordPrimary") // Advance past the window and sweep — no divergences should be reported. @@ -322,11 +311,6 @@ func TestShadowPubSub_SecondaryWithinWindowMatches(t *testing.T) { func TestShadowPubSub_ExpiredPrimaryThenLateSecondary(t *testing.T) { clock := newTestClock() sp := newTestShadowPubSubWithClock(10*time.Millisecond, clock.Now) - defer func() { - unmatchedSecondaries.Lock() - delete(unmatchedSecondaries.data, sp) - unmatchedSecondaries.Unlock() - }() // Record the primary message. sp.RecordPrimary(&redis.Message{Channel: "ch1", Payload: "late"}) From 41cadd4fbd2b14ff82973bc515100145bbf7a189 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Mar 2026 07:37:48 +0000 Subject: [PATCH 3/8] fix: gci formatting in proxy_test.go (double space before comment) Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com> --- proxy/proxy_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index fdc13028..7e8efb3c 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -308,7 +308,7 @@ func TestShouldReportCooldown(t *testing.T) { assert.True(t, r.ShouldReport("fp1")) assert.False(t, r.ShouldReport("fp1")) // within cooldown - now = now.Add(60 * time.Millisecond) // advance past cooldown + now = now.Add(60 * time.Millisecond) // advance past cooldown assert.True(t, r.ShouldReport("fp1")) // cooldown elapsed } From 73987fd690e7c0571568dedd07e2cc484ac1092f Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 20 Mar 2026 17:14:26 +0900 Subject: [PATCH 4/8] redis: leader-route demo reads and fail invalid Jepsen runs --- adapter/add_voter_join_test.go | 2 +- 
adapter/redis_server_setup_test.go | 19 +++++++++++++++++++ adapter/test_util.go | 2 +- cmd/server/demo.go | 3 ++- jepsen/src/elastickv/redis_workload.clj | 11 +++++++++-- jepsen/test/elastickv/redis_workload_test.clj | 13 +++++++++++++ 6 files changed, 45 insertions(+), 5 deletions(-) create mode 100644 adapter/redis_server_setup_test.go create mode 100644 jepsen/test/elastickv/redis_workload_test.clj diff --git a/adapter/add_voter_join_test.go b/adapter/add_voter_join_test.go index ef20ce60..4ae09466 100644 --- a/adapter/add_voter_join_test.go +++ b/adapter/add_voter_join_test.go @@ -182,7 +182,7 @@ func startAddVoterJoinNode( return err }) - rd := NewRedisServer(lis.redis, port.redisAddress, st, coordinator, leaderRedisMap, relay) + rd := NewRedisServer(lis.redis, port.redisAddress, routedStore, coordinator, leaderRedisMap, relay) workers.Go(func() error { err := rd.Run() if errors.Is(err, net.ErrClosed) { diff --git a/adapter/redis_server_setup_test.go b/adapter/redis_server_setup_test.go new file mode 100644 index 00000000..ecbca30b --- /dev/null +++ b/adapter/redis_server_setup_test.go @@ -0,0 +1,19 @@ +package adapter + +import ( + "testing" + + "github.com/bootjp/elastickv/kv" + "github.com/stretchr/testify/require" +) + +func TestRedisServerUsesLeaderRoutedStoreInSingleGroupCluster(t *testing.T) { + t.Parallel() + + nodes, _, _ := createNode(t, 3) + defer shutdown(nodes) + + require.IsType(t, &kv.LeaderRoutedStore{}, nodes[0].redisServer.store) + require.IsType(t, &kv.LeaderRoutedStore{}, nodes[1].redisServer.store) + require.IsType(t, &kv.LeaderRoutedStore{}, nodes[2].redisServer.store) +} diff --git a/adapter/test_util.go b/adapter/test_util.go index d632ed5c..3eb3adf1 100644 --- a/adapter/test_util.go +++ b/adapter/test_util.go @@ -370,7 +370,7 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress) ( assert.NoError(t, srv.Serve(lis)) }(s, grpcSock) - rd := NewRedisServer(redisSock, port.redisAddress, st, coordinator, 
leaderRedisMap, relay) + rd := NewRedisServer(redisSock, port.redisAddress, routedStore, coordinator, leaderRedisMap, relay) go func(server *RedisServer) { assert.NoError(t, server.Run()) }(rd) diff --git a/cmd/server/demo.go b/cmd/server/demo.go index 49e1fba7..2853bf61 100644 --- a/cmd/server/demo.go +++ b/cmd/server/demo.go @@ -354,7 +354,8 @@ func setupRedis(ctx context.Context, lc net.ListenConfig, st store.MVCCStore, co if err != nil { return nil, errors.WithStack(err) } - return adapter.NewRedisServer(l, redisAddr, st, coordinator, leaderRedis, relay), nil + routedStore := kv.NewLeaderRoutedStore(st, coordinator) + return adapter.NewRedisServer(l, redisAddr, routedStore, coordinator, leaderRedis, relay), nil } func run(ctx context.Context, eg *errgroup.Group, cfg config) error { diff --git a/jepsen/src/elastickv/redis_workload.clj b/jepsen/src/elastickv/redis_workload.clj index 4a5730c6..de9b25e0 100644 --- a/jepsen/src/elastickv/redis_workload.clj +++ b/jepsen/src/elastickv/redis_workload.clj @@ -175,6 +175,13 @@ :parse-fn #(Integer/parseInt %)] ["-h" "--help"]]) +(defn fail-on-invalid! + "Raises when Jepsen completed analysis and found the history invalid." + [result] + (when (false? (:valid? result)) + (throw (ex-info "Jepsen analysis invalid" {:result result}))) + result) + (defn -main [& args] (let [{:keys [options errors summary]} (tools.cli/parse-opts args cli-opts) @@ -211,5 +218,5 @@ (seq errors) (binding [*out* *err*] (println "Error parsing options:" (str/join "; " errors))) (:local options) (binding [control/*dummy* true] - (jepsen/run! (elastickv-redis-test options))) - :else (jepsen/run! (elastickv-redis-test options))))) + (fail-on-invalid! (jepsen/run! (elastickv-redis-test options)))) + :else (fail-on-invalid! (jepsen/run! 
(elastickv-redis-test options)))))) diff --git a/jepsen/test/elastickv/redis_workload_test.clj b/jepsen/test/elastickv/redis_workload_test.clj new file mode 100644 index 00000000..3c63bcfd --- /dev/null +++ b/jepsen/test/elastickv/redis_workload_test.clj @@ -0,0 +1,13 @@ +(ns elastickv.redis-workload-test + (:require [clojure.test :refer :all] + [elastickv.redis-workload :as workload])) + +(deftest fail-on-invalid-passes-through-valid-results + (let [result {:valid? true}] + (is (= result (workload/fail-on-invalid! result))))) + +(deftest fail-on-invalid-throws-for-invalid-results + (is (thrown-with-msg? + clojure.lang.ExceptionInfo + #"Jepsen analysis invalid" + (workload/fail-on-invalid! {:valid? false})))) From 148f5b3fda3205105b4f99517d3f83b922cadbd5 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 20 Mar 2026 19:13:11 +0900 Subject: [PATCH 5/8] Add raft migration and rollout tooling --- README.md | 26 + cmd/raft-migrate/main.go | 39 ++ go.mod | 1 + go.sum | 2 + internal/raftstore/migrate.go | 231 +++++++++ internal/raftstore/migrate_test.go | 144 ++++++ scripts/rolling-update.env.example | 38 ++ scripts/rolling-update.sh | 738 +++++++++++++++++++++++++++++ 8 files changed, 1219 insertions(+) create mode 100644 cmd/raft-migrate/main.go create mode 100644 internal/raftstore/migrate.go create mode 100644 internal/raftstore/migrate_test.go create mode 100644 scripts/rolling-update.env.example create mode 100755 scripts/rolling-update.sh diff --git a/README.md b/README.md index 69cd190b..cd38e4b9 100644 --- a/README.md +++ b/README.md @@ -78,6 +78,32 @@ To start the server, use the following command: go run cmd/server/demo.go ``` +### Migrating Legacy BoltDB Raft Storage + +Recent versions store Raft logs and stable state in Pebble (`raft.db`) instead of +the legacy BoltDB files (`logs.dat` and `stable.dat`). If startup fails with: + +```text +legacy boltdb Raft storage "logs.dat" found in ... 
+``` + +stop the node and run the offline migrator against the directory shown in the +error: + +```bash +go run ./cmd/raft-migrate --dir /var/lib/elastickv/n1 +mv /var/lib/elastickv/n1/logs.dat /var/lib/elastickv/n1/logs.dat.bak +mv /var/lib/elastickv/n1/stable.dat /var/lib/elastickv/n1/stable.dat.bak +``` + +For multi-group layouts, pass the exact group directory from the error message +(for example `/var/lib/elastickv/n1/group-1`). + +After that, start Elastickv normally. The migrator leaves the legacy files in +place as a backup, but they must be moved or removed before startup because the +server intentionally refuses to run while `logs.dat` or `stable.dat` are still +present. + To expose metrics on a dedicated port: ```bash go run . \ diff --git a/cmd/raft-migrate/main.go b/cmd/raft-migrate/main.go new file mode 100644 index 00000000..05c5b89b --- /dev/null +++ b/cmd/raft-migrate/main.go @@ -0,0 +1,39 @@ +package main + +import ( + "flag" + "fmt" + "log" + "path/filepath" + + "github.com/bootjp/elastickv/internal/raftstore" +) + +func main() { + var ( + dir = flag.String("dir", "", "Directory containing legacy logs.dat and stable.dat") + out = flag.String("out", "", "Destination Pebble raft.db directory (default: /raft.db)") + ) + flag.Parse() + + if *dir == "" { + log.Fatal("--dir is required") + } + + dest := *out + if dest == "" { + dest = filepath.Join(*dir, "raft.db") + } + + stats, err := raftstore.MigrateLegacyBoltDB( + filepath.Join(*dir, "logs.dat"), + filepath.Join(*dir, "stable.dat"), + dest, + ) + if err != nil { + log.Fatalf("migration failed: %v", err) + } + + fmt.Printf("migrated legacy raft storage to %s (logs=%d stable_keys=%d)\n", dest, stats.Logs, stats.StableKeys) + fmt.Println("next: archive or remove logs.dat and stable.dat before starting elastickv") +} diff --git a/go.mod b/go.mod index 4c6824c0..5e024a17 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/tidwall/redcon v1.6.2 github.com/vmihailenco/msgpack/v5 
v5.4.1 github.com/yuin/gopher-lua v1.1.1 + go.etcd.io/bbolt v1.4.3 golang.org/x/sync v0.20.0 golang.org/x/sys v0.42.0 google.golang.org/grpc v1.79.3 diff --git a/go.sum b/go.sum index 81ae84e2..06a996bb 100644 --- a/go.sum +++ b/go.sum @@ -336,6 +336,8 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo= +go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= diff --git a/internal/raftstore/migrate.go b/internal/raftstore/migrate.go new file mode 100644 index 00000000..b37b5fc3 --- /dev/null +++ b/internal/raftstore/migrate.go @@ -0,0 +1,231 @@ +package raftstore + +import ( + "bytes" + "os" + "path/filepath" + + "github.com/cockroachdb/errors" + "github.com/hashicorp/go-msgpack/v2/codec" + "github.com/hashicorp/raft" + "go.etcd.io/bbolt" +) + +const ( + legacyLogsBucket = "logs" + legacyStableBucket = "conf" + legacyBatchSize = 1024 + legacyBoltFileMode = 0o600 + legacyMigrationSuffix = ".migrating" +) + +type MigrationStats struct { + Logs uint64 + StableKeys uint64 +} + +func MigrateLegacyBoltDB(logsPath, stablePath, destDir string) (*MigrationStats, error) { + tempDir, err := prepareMigrationPaths(logsPath, stablePath, destDir) + if err != nil { + return nil, err + } + + logsDB, stableDB, closeSources, err := openLegacySourceDBs(logsPath, stablePath) + if err != nil { + return nil, err + } + defer closeSources() + + stats, err := 
migrateLegacyBoltToTempDir(logsDB, stableDB, tempDir) + if err != nil { + return nil, err + } + if err := finalizeMigratedStore(tempDir, destDir); err != nil { + return nil, err + } + return stats, nil +} + +func prepareMigrationPaths(logsPath, stablePath, destDir string) (string, error) { + if logsPath == "" { + return "", errors.New("logs path is required") + } + if stablePath == "" { + return "", errors.New("stable path is required") + } + if destDir == "" { + return "", errors.New("destination dir is required") + } + + if err := requireExistingFile(logsPath); err != nil { + return "", err + } + if err := requireExistingFile(stablePath); err != nil { + return "", err + } + if err := requireDestinationAbsent(destDir); err != nil { + return "", err + } + + tempDir := destDir + legacyMigrationSuffix + if err := requireDestinationAbsent(tempDir); err != nil { + return "", err + } + return tempDir, nil +} + +func openLegacySourceDBs(logsPath, stablePath string) (logsDB *bbolt.DB, stableDB *bbolt.DB, closeFn func(), err error) { + logsDB, err = openLegacyBoltReadOnly(logsPath) + if err != nil { + return nil, nil, nil, err + } + + stableDB, err = openLegacyBoltReadOnly(stablePath) + if err != nil { + _ = logsDB.Close() + return nil, nil, nil, err + } + + closeFn = func() { + _ = stableDB.Close() + _ = logsDB.Close() + } + return logsDB, stableDB, closeFn, nil +} + +func migrateLegacyBoltToTempDir(logsDB, stableDB *bbolt.DB, tempDir string) (*MigrationStats, error) { + store, err := NewPebbleStore(tempDir) + if err != nil { + return nil, err + } + + cleanupTemp := func() { + _ = store.Close() + _ = os.RemoveAll(tempDir) + } + + stats, err := migrateLegacyBoltData(logsDB, stableDB, store) + if err != nil { + cleanupTemp() + return nil, err + } + if err := store.Close(); err != nil { + _ = os.RemoveAll(tempDir) + return nil, err + } + return stats, nil +} + +func finalizeMigratedStore(tempDir, destDir string) error { + if err := os.MkdirAll(filepath.Dir(destDir), 
pebbleDirPerm); err != nil { + _ = os.RemoveAll(tempDir) + return errors.WithStack(err) + } + if err := os.Rename(tempDir, destDir); err != nil { + _ = os.RemoveAll(tempDir) + return errors.WithStack(err) + } + return nil +} + +func migrateLegacyBoltData(logsDB, stableDB *bbolt.DB, dest *PebbleStore) (*MigrationStats, error) { + stats := &MigrationStats{} + + if err := copyLegacyStable(stableDB, dest, stats); err != nil { + return nil, err + } + if err := copyLegacyLogs(logsDB, dest, stats); err != nil { + return nil, err + } + + return stats, nil +} + +func copyLegacyStable(stableDB *bbolt.DB, dest *PebbleStore, stats *MigrationStats) error { + return errors.WithStack(stableDB.View(func(tx *bbolt.Tx) error { + bucket := tx.Bucket([]byte(legacyStableBucket)) + if bucket == nil { + return errors.Newf("legacy stable bucket %q not found", legacyStableBucket) + } + return bucket.ForEach(func(k, v []byte) error { + if err := dest.Set(k, append([]byte(nil), v...)); err != nil { + return err + } + stats.StableKeys++ + return nil + }) + })) +} + +func copyLegacyLogs(logsDB *bbolt.DB, dest *PebbleStore, stats *MigrationStats) error { + batch := make([]*raft.Log, 0, legacyBatchSize) + + flush := func() error { + if len(batch) == 0 { + return nil + } + if err := dest.StoreLogs(batch); err != nil { + return err + } + stats.Logs += uint64(len(batch)) + batch = batch[:0] + return nil + } + + err := logsDB.View(func(tx *bbolt.Tx) error { + bucket := tx.Bucket([]byte(legacyLogsBucket)) + if bucket == nil { + return errors.Newf("legacy logs bucket %q not found", legacyLogsBucket) + } + return bucket.ForEach(func(_, v []byte) error { + var entry raft.Log + if err := decodeLegacyLog(v, &entry); err != nil { + return err + } + batch = append(batch, &entry) + if len(batch) < legacyBatchSize { + return nil + } + return flush() + }) + }) + if err != nil { + return errors.WithStack(err) + } + + return flush() +} + +func openLegacyBoltReadOnly(path string) (*bbolt.DB, error) { + db, err := 
bbolt.Open(path, legacyBoltFileMode, &bbolt.Options{ReadOnly: true}) + if err != nil { + return nil, errors.WithStack(err) + } + return db, nil +} + +func requireExistingFile(path string) error { + info, err := os.Stat(path) + if err != nil { + return errors.WithStack(err) + } + if info.IsDir() { + return errors.WithStack(errors.Newf("%s is a directory, expected file", path)) + } + return nil +} + +func requireDestinationAbsent(path string) error { + if _, err := os.Stat(path); err == nil { + return errors.WithStack(errors.Newf("destination already exists: %s", path)) + } else if !os.IsNotExist(err) { + return errors.WithStack(err) + } + return nil +} + +func decodeLegacyLog(payload []byte, out *raft.Log) error { + handle := codec.MsgpackHandle{} + decoder := codec.NewDecoder(bytes.NewReader(payload), &handle) + return errors.WithStack(decoder.Decode(out)) +} diff --git a/internal/raftstore/migrate_test.go b/internal/raftstore/migrate_test.go new file mode 100644 index 00000000..a436b145 --- /dev/null +++ b/internal/raftstore/migrate_test.go @@ -0,0 +1,144 @@ +package raftstore + +import ( + "bytes" + "encoding/binary" + "path/filepath" + "testing" + "time" + + "github.com/hashicorp/go-msgpack/v2/codec" + "github.com/hashicorp/raft" + "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" +) + +func TestMigrateLegacyBoltDB(t *testing.T) { + baseDir := t.TempDir() + logsPath := filepath.Join(baseDir, "logs.dat") + stablePath := filepath.Join(baseDir, "stable.dat") + destDir := filepath.Join(baseDir, "raft.db") + + require.NoError(t, writeLegacyLogsDB(logsPath, []*raft.Log{ + { + Index: 1, + Term: 2, + Type: raft.LogCommand, + Data: []byte("set alpha"), + Extensions: []byte("ext-a"), + AppendedAt: time.Unix(100, 0).UTC(), + }, + { + Index: 2, + Term: 2, + Type: raft.LogNoop, + Data: []byte("noop"), + Extensions: []byte("ext-b"), + AppendedAt: time.Unix(200, 0).UTC(), + }, + })) + require.NoError(t, writeLegacyStableDB(stablePath, map[string][]byte{ + 
"CurrentTerm": encodeUint64(5), + "LastVote": []byte("n1"), + })) + + stats, err := MigrateLegacyBoltDB(logsPath, stablePath, destDir) + require.NoError(t, err) + require.Equal(t, &MigrationStats{Logs: 2, StableKeys: 2}, stats) + + store, err := NewPebbleStore(destDir) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, store.Close()) + }) + + var first raft.Log + require.NoError(t, store.GetLog(1, &first)) + require.Equal(t, uint64(1), first.Index) + require.Equal(t, uint64(2), first.Term) + require.Equal(t, raft.LogCommand, first.Type) + require.Equal(t, []byte("set alpha"), first.Data) + require.Equal(t, []byte("ext-a"), first.Extensions) + require.True(t, first.AppendedAt.Equal(time.Unix(100, 0).UTC())) + + var second raft.Log + require.NoError(t, store.GetLog(2, &second)) + require.Equal(t, raft.LogNoop, second.Type) + require.True(t, second.AppendedAt.Equal(time.Unix(200, 0).UTC())) + + currentTerm, err := store.Get([]byte("CurrentTerm")) + require.NoError(t, err) + require.Equal(t, encodeUint64(5), currentTerm) + + lastVote, err := store.Get([]byte("LastVote")) + require.NoError(t, err) + require.Equal(t, []byte("n1"), lastVote) +} + +func writeLegacyLogsDB(path string, logs []*raft.Log) error { + db, err := bbolt.Open(path, legacyBoltFileMode, nil) + if err != nil { + return err + } + defer func() { _ = db.Close() }() + + return db.Update(func(tx *bbolt.Tx) error { + logsBucket, err := tx.CreateBucketIfNotExists([]byte(legacyLogsBucket)) + if err != nil { + return err + } + if _, err := tx.CreateBucketIfNotExists([]byte(legacyStableBucket)); err != nil { + return err + } + for _, entry := range logs { + payload, err := encodeLegacyLog(entry) + if err != nil { + return err + } + if err := logsBucket.Put(encodeUint64(entry.Index), payload); err != nil { + return err + } + } + return nil + }) +} + +func writeLegacyStableDB(path string, values map[string][]byte) error { + db, err := bbolt.Open(path, legacyBoltFileMode, nil) + if err != nil { + 
return err + } + defer func() { _ = db.Close() }() + + return db.Update(func(tx *bbolt.Tx) error { + if _, err := tx.CreateBucketIfNotExists([]byte(legacyLogsBucket)); err != nil { + return err + } + stableBucket, err := tx.CreateBucketIfNotExists([]byte(legacyStableBucket)) + if err != nil { + return err + } + for key, value := range values { + if err := stableBucket.Put([]byte(key), value); err != nil { + return err + } + } + return nil + }) +} + +func encodeLegacyLog(entry *raft.Log) ([]byte, error) { + var buf bytes.Buffer + handle := codec.MsgpackHandle{} + encoder := codec.NewEncoder(&buf, &handle) + if err := encoder.Encode(entry); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func encodeUint64(v uint64) []byte { + out := make([]byte, 8) + binary.BigEndian.PutUint64(out, v) + return out +} diff --git a/scripts/rolling-update.env.example b/scripts/rolling-update.env.example new file mode 100644 index 00000000..b5de3daf --- /dev/null +++ b/scripts/rolling-update.env.example @@ -0,0 +1,38 @@ +# Copy this file outside the repo or export the same variables in your shell. + +# Required: rollout order and advertised raft hosts. +NODES="n1=raft-1.internal.example,n2=raft-2.internal.example,n3=raft-3.internal.example" + +# Optional: if SSH targets differ from advertised raft hosts. +# Values may be either hostnames or full user@host targets. +# SSH_TARGETS="n1=admin@ssh-1.internal.example,n2=ssh-2.internal.example,n3=ssh-3.internal.example" + +# Optional: override rollout order without changing NODES. +# ROLLING_ORDER="n2,n3,n1" + +IMAGE="ghcr.io/bootjp/elastickv:latest" +SSH_USER="deploy" +CONTAINER_NAME="elastickv" +DATA_DIR="/var/lib/elastickv" +SERVER_ENTRYPOINT="/app" + +RAFT_PORT="50051" +REDIS_PORT="6379" +DYNAMO_PORT="8000" + +# Optional: override if Redis routing addresses differ from the advertised raft hosts. 
+# RAFT_TO_REDIS_MAP="raft-1.internal.example:50051=redis-1.internal.example:6379,raft-2.internal.example:50051=redis-2.internal.example:6379,raft-3.internal.example:50051=redis-3.internal.example:6379"
+
+HEALTH_TIMEOUT_SECONDS="60"
+LEADERSHIP_TRANSFER_TIMEOUT_SECONDS="30"
+LEADER_DISCOVERY_TIMEOUT_SECONDS="30"
+ROLLING_DELAY_SECONDS="2"
+
+SSH_CONNECT_TIMEOUT_SECONDS="10"
+SSH_STRICT_HOST_KEY_CHECKING="accept-new"
+
+RAFTADMIN_VERSION="v1.2.1"
+# If set, this binary must already be executable on the remote nodes.
+# RAFTADMIN_BIN="/absolute/path/to/linux/raftadmin"
+RAFTADMIN_REMOTE_BIN="/tmp/elastickv-raftadmin"
+RAFTADMIN_RPC_TIMEOUT_SECONDS="5"
diff --git a/scripts/rolling-update.sh b/scripts/rolling-update.sh
new file mode 100755
index 00000000..1b468a65
--- /dev/null
+++ b/scripts/rolling-update.sh
@@ -0,0 +1,738 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+usage() {
+  cat <<'EOF'
+Usage:
+  NODES="n1=raft-1.internal,n2=raft-2.internal,n3=raft-3.internal" ./scripts/rolling-update.sh
+
+Required environment:
+  NODES
+    Comma-separated raft node map in rollout order: "<raft-id>=<host>,..."
+
+Optional environment:
+  ROLLING_UPDATE_ENV_FILE
+    Shell env file to source before evaluating the rest of the settings.
+
+  SSH_TARGETS
+    Comma-separated SSH target map when SSH hosts differ from advertised hosts:
+    "<raft-id>=<ssh-target>,..."
+    If omitted, the script SSHes to the advertised host and prefixes SSH_USER.
+
+  ROLLING_ORDER
+    Comma-separated raft IDs to override the rollout order.
+
+  IMAGE
+  SSH_USER
+  CONTAINER_NAME
+  DATA_DIR
+  SERVER_ENTRYPOINT
+  RAFT_PORT
+  REDIS_PORT
+  DYNAMO_PORT
+  RAFT_TO_REDIS_MAP
+  HEALTH_TIMEOUT_SECONDS
+  LEADERSHIP_TRANSFER_TIMEOUT_SECONDS
+  LEADER_DISCOVERY_TIMEOUT_SECONDS
+  ROLLING_DELAY_SECONDS
+  SSH_CONNECT_TIMEOUT_SECONDS
+  SSH_STRICT_HOST_KEY_CHECKING
+  RAFTADMIN_VERSION
+  RAFTADMIN_BIN
+  RAFTADMIN_REMOTE_BIN
+  RAFTADMIN_RPC_TIMEOUT_SECONDS
+
+Notes:
+  - If RAFT_TO_REDIS_MAP is unset, it is derived automatically from NODES,
+    RAFT_PORT, and REDIS_PORT.
+ - If RAFTADMIN_BIN is set, it must already be executable on the remote nodes. +EOF +} + +if [[ "${1:-}" == "--help" || "${1:-}" == "-h" ]]; then + usage + exit 0 +fi + +if [[ -n "${ROLLING_UPDATE_ENV_FILE:-}" ]]; then + if [[ ! -f "$ROLLING_UPDATE_ENV_FILE" ]]; then + echo "ROLLING_UPDATE_ENV_FILE not found: $ROLLING_UPDATE_ENV_FILE" >&2 + exit 1 + fi + # shellcheck disable=SC1090 + source "$ROLLING_UPDATE_ENV_FILE" +fi + +IMAGE="${IMAGE:-ghcr.io/bootjp/elastickv:latest}" +SSH_USER="${SSH_USER:-${USER:-$(id -un)}}" +CONTAINER_NAME="${CONTAINER_NAME:-elastickv}" +DATA_DIR="${DATA_DIR:-/var/lib/elastickv}" +SERVER_ENTRYPOINT="${SERVER_ENTRYPOINT:-/app}" +RAFT_PORT="${RAFT_PORT:-50051}" +REDIS_PORT="${REDIS_PORT:-6379}" +DYNAMO_PORT="${DYNAMO_PORT:-8000}" +HEALTH_TIMEOUT_SECONDS="${HEALTH_TIMEOUT_SECONDS:-60}" +LEADERSHIP_TRANSFER_TIMEOUT_SECONDS="${LEADERSHIP_TRANSFER_TIMEOUT_SECONDS:-30}" +LEADER_DISCOVERY_TIMEOUT_SECONDS="${LEADER_DISCOVERY_TIMEOUT_SECONDS:-30}" +ROLLING_DELAY_SECONDS="${ROLLING_DELAY_SECONDS:-2}" +SSH_CONNECT_TIMEOUT_SECONDS="${SSH_CONNECT_TIMEOUT_SECONDS:-10}" +SSH_STRICT_HOST_KEY_CHECKING="${SSH_STRICT_HOST_KEY_CHECKING:-accept-new}" +RAFTADMIN_VERSION="${RAFTADMIN_VERSION:-v1.2.1}" +RAFTADMIN_REMOTE_BIN="${RAFTADMIN_REMOTE_BIN:-/tmp/elastickv-raftadmin}" +RAFTADMIN_RPC_TIMEOUT_SECONDS="${RAFTADMIN_RPC_TIMEOUT_SECONDS:-5}" +NODES="${NODES:-}" +SSH_TARGETS="${SSH_TARGETS:-}" +ROLLING_ORDER="${ROLLING_ORDER:-}" +RAFT_TO_REDIS_MAP="${RAFT_TO_REDIS_MAP:-}" + +if [[ -z "$NODES" ]]; then + echo "NODES is required" >&2 + usage >&2 + exit 1 +fi + +SSH_BASE_OPTS=( + -o BatchMode=yes + -o ConnectTimeout="${SSH_CONNECT_TIMEOUT_SECONDS}" + -o StrictHostKeyChecking="${SSH_STRICT_HOST_KEY_CHECKING}" +) +SCP_BASE_OPTS=(-q "${SSH_BASE_OPTS[@]}") + +RAFTADMIN_LOCAL_BIN="${RAFTADMIN_BIN:-}" +RAFTADMIN_TMP_DIR="" +RAFTADMIN_LINUX_AMD64_BIN="" +RAFTADMIN_LINUX_ARM64_BIN="" +RAFTADMIN_MODULE_DIR="" + +NODE_IDS=() +NODE_HOSTS=() +ROLLING_NODE_IDS=() + +cleanup() { 
+ if [[ -n "$RAFTADMIN_TMP_DIR" && -d "$RAFTADMIN_TMP_DIR" ]]; then + rm -rf "$RAFTADMIN_TMP_DIR" + fi +} + +trap cleanup EXIT + +contains_value() { + local needle="$1" + shift + local v + for v in "$@"; do + if [[ "$v" == "$needle" ]]; then + return 0 + fi + done + return 1 +} + +lookup_mapping() { + local key="$1" + local mapping="$2" + local pair entry_key entry_value + + [[ -n "$mapping" ]] || return 1 + IFS=',' read -r -a pairs <<< "$mapping" + for pair in "${pairs[@]}"; do + pair="${pair//[[:space:]]/}" + [[ -n "$pair" ]] || continue + [[ "$pair" == *=* ]] || continue + entry_key="${pair%%=*}" + entry_value="${pair#*=}" + if [[ "$entry_key" == "$key" ]]; then + printf '%s\n' "$entry_value" + return 0 + fi + done + return 1 +} + +parse_nodes() { + local pair node_id node_host + + IFS=',' read -r -a pairs <<< "$NODES" + for pair in "${pairs[@]}"; do + pair="${pair//[[:space:]]/}" + [[ -n "$pair" ]] || continue + if [[ "$pair" != *=* ]]; then + echo "invalid NODES entry: $pair" >&2 + exit 1 + fi + node_id="${pair%%=*}" + node_host="${pair#*=}" + if [[ -z "$node_id" || -z "$node_host" ]]; then + echo "invalid NODES entry: $pair" >&2 + exit 1 + fi + if contains_value "$node_id" "${NODE_IDS[@]}"; then + echo "duplicate raft ID in NODES: $node_id" >&2 + exit 1 + fi + NODE_IDS+=("$node_id") + NODE_HOSTS+=("$node_host") + done + + if [[ "${#NODE_IDS[@]}" -eq 0 ]]; then + echo "NODES did not contain any nodes" >&2 + exit 1 + fi +} + +node_host_by_id() { + local wanted_id="$1" + local i + + for i in "${!NODE_IDS[@]}"; do + if [[ "${NODE_IDS[$i]}" == "$wanted_id" ]]; then + printf '%s\n' "${NODE_HOSTS[$i]}" + return 0 + fi + done + return 1 +} + +ssh_target_by_id() { + local node_id="$1" + local target + + target="$(lookup_mapping "$node_id" "$SSH_TARGETS" || true)" + if [[ -z "$target" ]]; then + target="$(node_host_by_id "$node_id")" + fi + + if [[ "$target" == *@* ]]; then + printf '%s\n' "$target" + return 0 + fi + printf '%s@%s\n' "$SSH_USER" "$target" +} + 
+prepare_rolling_order() { + local entry + + if [[ -z "$ROLLING_ORDER" ]]; then + ROLLING_NODE_IDS=("${NODE_IDS[@]}") + return 0 + fi + + IFS=',' read -r -a entries <<< "$ROLLING_ORDER" + for entry in "${entries[@]}"; do + entry="${entry//[[:space:]]/}" + [[ -n "$entry" ]] || continue + if ! contains_value "$entry" "${NODE_IDS[@]}"; then + echo "ROLLING_ORDER references unknown raft ID: $entry" >&2 + exit 1 + fi + if contains_value "$entry" "${ROLLING_NODE_IDS[@]}"; then + echo "ROLLING_ORDER contains duplicate raft ID: $entry" >&2 + exit 1 + fi + ROLLING_NODE_IDS+=("$entry") + done + + if [[ "${#ROLLING_NODE_IDS[@]}" -eq 0 ]]; then + echo "ROLLING_ORDER did not contain any nodes" >&2 + exit 1 + fi +} + +derive_raft_to_redis_map() { + local parts=() + local i + + for i in "${!NODE_IDS[@]}"; do + parts+=("${NODE_HOSTS[$i]}:${RAFT_PORT}=${NODE_HOSTS[$i]}:${REDIS_PORT}") + done + + ( + IFS=, + printf '%s\n' "${parts[*]}" + ) +} + +ensure_local_raftadmin() { + if [[ -n "$RAFTADMIN_LOCAL_BIN" ]]; then + if [[ ! -x "$RAFTADMIN_LOCAL_BIN" ]]; then + echo "RAFTADMIN_BIN is not executable: $RAFTADMIN_LOCAL_BIN" >&2 + exit 1 + fi + return 0 + fi + + RAFTADMIN_TMP_DIR="$(mktemp -d)" + echo "[rolling-update] preparing raftadmin helper build workspace" + RAFTADMIN_MODULE_DIR="$( + go mod download -json "github.com/Jille/raftadmin@${RAFTADMIN_VERSION}" | + sed -nE 's/^[[:space:]]*"Dir":[[:space:]]*"([^"]+)",?/\1/p' | + head -n1 + )" + if [[ -z "$RAFTADMIN_MODULE_DIR" || ! 
-d "$RAFTADMIN_MODULE_DIR" ]]; then + echo "failed to locate raftadmin module source for ${RAFTADMIN_VERSION}" >&2 + exit 1 + fi +} + +build_raftadmin_variant() { + local goos="$1" + local goarch="$2" + local out + + if [[ -n "$RAFTADMIN_LOCAL_BIN" ]]; then + printf '%s\n' "$RAFTADMIN_LOCAL_BIN" + return 0 + fi + + out="${RAFTADMIN_TMP_DIR}/raftadmin-${goos}-${goarch}" + if [[ -x "$out" ]]; then + printf '%s\n' "$out" + return 0 + fi + + echo "[rolling-update] building raftadmin helper for ${goos}/${goarch}" >&2 + CGO_ENABLED=0 GOOS="$goos" GOARCH="$goarch" \ + go build -C "$RAFTADMIN_MODULE_DIR" -o "$out" ./cmd/raftadmin + chmod +x "$out" + printf '%s\n' "$out" +} + +ensure_remote_raftadmin_binaries() { + if [[ -n "$RAFTADMIN_LOCAL_BIN" ]]; then + return 0 + fi + RAFTADMIN_LINUX_AMD64_BIN="$(build_raftadmin_variant linux amd64)" + RAFTADMIN_LINUX_ARM64_BIN="$(build_raftadmin_variant linux arm64)" +} + +copy_raftadmin_to_remote() { + local node_id="$1" + local ssh_target="$2" + + echo "==> [helper@${node_id}] copying raftadmin" + + if [[ -n "$RAFTADMIN_LOCAL_BIN" ]]; then + scp "${SCP_BASE_OPTS[@]}" "$RAFTADMIN_LOCAL_BIN" "${ssh_target}:${RAFTADMIN_REMOTE_BIN}" + else + scp "${SCP_BASE_OPTS[@]}" "$RAFTADMIN_LINUX_AMD64_BIN" "${ssh_target}:${RAFTADMIN_REMOTE_BIN}-amd64" + scp "${SCP_BASE_OPTS[@]}" "$RAFTADMIN_LINUX_ARM64_BIN" "${ssh_target}:${RAFTADMIN_REMOTE_BIN}-arm64" + fi + + ssh "${SSH_BASE_OPTS[@]}" "$ssh_target" \ + RAFTADMIN_REMOTE_BIN="$RAFTADMIN_REMOTE_BIN" \ + HAS_CUSTOM_RAFTADMIN_BIN="${RAFTADMIN_LOCAL_BIN:+1}" \ + 'bash -s' <<'REMOTE_HELPER' +set -euo pipefail + +if [[ -n "${HAS_CUSTOM_RAFTADMIN_BIN:-}" ]]; then + chmod +x "$RAFTADMIN_REMOTE_BIN" + exit 0 +fi + +case "$(uname -m)" in + x86_64|amd64) + cp "${RAFTADMIN_REMOTE_BIN}-amd64" "$RAFTADMIN_REMOTE_BIN" + ;; + aarch64|arm64) + cp "${RAFTADMIN_REMOTE_BIN}-arm64" "$RAFTADMIN_REMOTE_BIN" + ;; + *) + echo "unsupported remote architecture: $(uname -m)" >&2 + exit 1 + ;; +esac + +chmod +x 
"$RAFTADMIN_REMOTE_BIN" +REMOTE_HELPER +} + +update_one_node() { + local node_id="$1" + local node_host="$2" + local ssh_target="$3" + local all_node_ids_csv all_node_hosts_csv + + all_node_ids_csv="$(IFS=,; echo "${NODE_IDS[*]}")" + all_node_hosts_csv="$(IFS=,; echo "${NODE_HOSTS[*]}")" + + echo "==> [$node_id@$node_host] start" + + copy_raftadmin_to_remote "$node_id" "$ssh_target" + + ssh "${SSH_BASE_OPTS[@]}" "$ssh_target" \ + IMAGE="$IMAGE" \ + RAFTADMIN_BIN_PATH="$RAFTADMIN_REMOTE_BIN" \ + CONTAINER_NAME="$CONTAINER_NAME" \ + DATA_DIR="$DATA_DIR" \ + SERVER_ENTRYPOINT="$SERVER_ENTRYPOINT" \ + RAFT_PORT="$RAFT_PORT" \ + REDIS_PORT="$REDIS_PORT" \ + DYNAMO_PORT="$DYNAMO_PORT" \ + HEALTH_TIMEOUT_SECONDS="$HEALTH_TIMEOUT_SECONDS" \ + LEADERSHIP_TRANSFER_TIMEOUT_SECONDS="$LEADERSHIP_TRANSFER_TIMEOUT_SECONDS" \ + LEADER_DISCOVERY_TIMEOUT_SECONDS="$LEADER_DISCOVERY_TIMEOUT_SECONDS" \ + RAFTADMIN_RPC_TIMEOUT_SECONDS="$RAFTADMIN_RPC_TIMEOUT_SECONDS" \ + NODE_ID="$node_id" \ + NODE_HOST="$node_host" \ + ALL_NODE_IDS_CSV="$all_node_ids_csv" \ + ALL_NODE_HOSTS_CSV="$all_node_hosts_csv" \ + RAFT_TO_REDIS_MAP="$RAFT_TO_REDIS_MAP" \ + 'bash -s' <<'REMOTE' +set -euo pipefail + +IFS=, read -r -a ALL_NODE_IDS <<< "$ALL_NODE_IDS_CSV" +IFS=, read -r -a ALL_NODE_HOSTS <<< "$ALL_NODE_HOSTS_CSV" + +grpc_healthy() { + bash -lc "exec 3<>/dev/tcp/${NODE_HOST}/${RAFT_PORT}" 2>/dev/null +} + +peer_grpc_healthy() { + local peer_host="$1" + bash -lc "exec 3<>/dev/tcp/${peer_host}/${RAFT_PORT}" 2>/dev/null +} + +wait_for_grpc() { + local i + for ((i = 0; i < HEALTH_TIMEOUT_SECONDS; i++)); do + if grpc_healthy; then + return 0 + fi + sleep 1 + done + return 1 +} + +extract_proto_string() { + local field="$1" + local payload="$2" + + printf '%s' "$payload" | + sed -nE "s/.*${field}:[[:space:]]+\"([^\"]*)\".*/\1/p" | + tail -n1 +} + +extract_proto_enum() { + local field="$1" + local payload="$2" + + printf '%s' "$payload" | + sed -nE "s/.*${field}:[[:space:]]+([A-Z_]+).*/\1/p" | + tail -n1 +} 
+ +raftadmin_text() { + local addr="$1" + shift + + if command -v timeout >/dev/null 2>&1; then + timeout "${RAFTADMIN_RPC_TIMEOUT_SECONDS}s" "$RAFTADMIN_BIN_PATH" "$addr" "$@" 2>&1 + return $? + fi + + "$RAFTADMIN_BIN_PATH" "$addr" "$@" 2>&1 +} + +raft_leader_addr() { + local addr="$1" + local output + + output="$(raftadmin_text "$addr" leader)" || return 1 + extract_proto_string "address" "$output" +} + +raft_state() { + local addr="$1" + local output + local state + + output="$(raftadmin_text "$addr" state)" || return 1 + state="$(extract_proto_enum "state" "$output")" + if [[ -z "$state" ]]; then + printf '%s\n' "FOLLOWER" + return 0 + fi + printf '%s\n' "$state" +} + +cluster_leader_addr() { + local i addr state leader + + for i in "${!ALL_NODE_HOSTS[@]}"; do + addr="${ALL_NODE_HOSTS[$i]}:${RAFT_PORT}" + state="$(raft_state "$addr" || true)" + if [[ "$state" == "LEADER" ]]; then + printf '%s\n' "$addr" + return 0 + fi + done + + for i in "${!ALL_NODE_HOSTS[@]}"; do + addr="${ALL_NODE_HOSTS[$i]}:${RAFT_PORT}" + leader="$(raft_leader_addr "$addr" || true)" + if [[ -n "$leader" ]]; then + printf '%s\n' "$leader" + return 0 + fi + done + + return 1 +} + +wait_for_cluster_leader() { + local i leader + + for ((i = 0; i < LEADER_DISCOVERY_TIMEOUT_SECONDS; i++)); do + leader="$(cluster_leader_addr || true)" + if [[ -n "$leader" ]]; then + printf '%s\n' "$leader" + return 0 + fi + sleep 1 + done + + return 1 +} + +cluster_reachability_summary() { + local i addr summary reachable state + + summary=() + for i in "${!ALL_NODE_HOSTS[@]}"; do + addr="${ALL_NODE_HOSTS[$i]}:${RAFT_PORT}" + if peer_grpc_healthy "${ALL_NODE_HOSTS[$i]}"; then + reachable="up" + state="$(raft_state "$addr" || echo unknown)" + else + reachable="down" + state="unreachable" + fi + summary+=("${ALL_NODE_IDS[$i]}=${addr}:${reachable}:${state}") + done + + printf '%s\n' "${summary[*]}" +} + +choose_transfer_candidate() { + local i + + for i in "${!ALL_NODE_IDS[@]}"; do + if [[ "${ALL_NODE_IDS[$i]}" == 
"$NODE_ID" ]]; then + continue + fi + if peer_grpc_healthy "${ALL_NODE_HOSTS[$i]}"; then + printf '%s %s\n' "${ALL_NODE_IDS[$i]}" "${ALL_NODE_HOSTS[$i]}" + return 0 + fi + done + + return 1 +} + +wait_for_leader_change() { + local old_leader="$1" + local expected_leader="${2:-}" + local i leader + + for ((i = 0; i < LEADERSHIP_TRANSFER_TIMEOUT_SECONDS; i++)); do + leader="$(cluster_leader_addr || true)" + if [[ -n "$leader" && "$leader" != "$old_leader" ]]; then + if [[ -n "$expected_leader" && "$leader" != "$expected_leader" ]]; then + echo "leadership moved away from $old_leader, but elected $leader instead of preferred $expected_leader" + else + echo "leadership moved from $old_leader to $leader" + fi + return 0 + fi + sleep 1 + done + + return 1 +} + +ensure_not_leader_before_restart() { + local current_leader candidate_id candidate_host candidate_addr rpc_output local_state + + current_leader="$(wait_for_cluster_leader || true)" + if [[ -z "$current_leader" ]]; then + local_state="$(raft_state "${NODE_HOST}:${RAFT_PORT}" || echo unknown)" + echo "unable to determine current cluster leader within ${LEADER_DISCOVERY_TIMEOUT_SECONDS}s; refusing to restart $NODE_ID safely" >&2 + echo "local raft state on ${NODE_HOST}:${RAFT_PORT}: ${local_state}" >&2 + echo "cluster reachability: $(cluster_reachability_summary)" >&2 + return 1 + fi + + if [[ "$current_leader" != "${NODE_HOST}:${RAFT_PORT}" ]]; then + echo "node is not leader ($current_leader); safe to restart" + return 0 + fi + + if ! grpc_healthy; then + echo "node is current leader but its local gRPC endpoint is unreachable; refusing restart" >&2 + return 1 + fi + + if ! 
read -r candidate_id candidate_host < <(choose_transfer_candidate); then + echo "node is leader but no healthy peer is available as transfer target" >&2 + return 1 + fi + candidate_addr="${candidate_host}:${RAFT_PORT}" + + echo "node is leader; transferring leadership to ${candidate_id}@${candidate_addr}" + rpc_output="$(raftadmin_text "${NODE_HOST}:${RAFT_PORT}" leadership_transfer_to_server "${candidate_id}" "${candidate_addr}")" || { + echo "targeted leadership transfer RPC failed: $rpc_output" >&2 + echo "falling back to generic leadership transfer" + rpc_output="$(raftadmin_text "${NODE_HOST}:${RAFT_PORT}" leadership_transfer)" || { + echo "generic leadership transfer RPC failed: $rpc_output" >&2 + return 1 + } + candidate_addr="" + } + + if ! wait_for_leader_change "${NODE_HOST}:${RAFT_PORT}" "$candidate_addr"; then + echo "leadership did not move away from ${NODE_HOST}:${RAFT_PORT} within ${LEADERSHIP_TRANSFER_TIMEOUT_SECONDS}s" >&2 + return 1 + fi + + return 0 +} + +stop_container() { + docker rm -f "$CONTAINER_NAME" >/dev/null 2>&1 || true +} + +run_container() { + docker run -d \ + --name "$CONTAINER_NAME" \ + --restart unless-stopped \ + --network host \ + -v "$DATA_DIR:$DATA_DIR" \ + "$IMAGE" "$SERVER_ENTRYPOINT" \ + --address "${NODE_HOST}:${RAFT_PORT}" \ + --redisAddress "${NODE_HOST}:${REDIS_PORT}" \ + --dynamoAddress "${NODE_HOST}:${DYNAMO_PORT}" \ + --raftId "$NODE_ID" \ + --raftDataDir "$DATA_DIR" \ + --raftRedisMap "$RAFT_TO_REDIS_MAP" >/dev/null +} + +archive_legacy_dir() { + local dir="$1" + local ts backup_dir moved + + moved=0 + ts="$(date -u +%Y%m%dT%H%M%SZ)" + backup_dir="${dir%/}/legacy-boltdb-${ts}" + + sudo mkdir -p "$backup_dir" + for name in logs.dat stable.dat; do + if sudo test -e "$dir/$name"; then + sudo mv "$dir/$name" "$backup_dir/$name" + moved=1 + fi + done + + if [[ "$moved" -eq 1 ]]; then + echo "archived legacy raft files from $dir to $backup_dir; node will resync from the cluster" + return 0 + fi + + sudo rmdir 
"$backup_dir" 2>/dev/null || true + return 1 +} + +archive_default_legacy_dir() { + local node_data_dir + + node_data_dir="${DATA_DIR%/}/${NODE_ID}" + if sudo test -d "$node_data_dir"; then + archive_legacy_dir "$node_data_dir" || true + sudo rm -rf "${node_data_dir}/raft.db.migrating" 2>/dev/null || true + fi +} + +archive_legacy_dirs_from_logs() { + local logs="$1" + local found=0 + local dir + + while IFS= read -r dir; do + [[ -n "$dir" ]] || continue + archive_legacy_dir "$dir" || true + sudo rm -rf "${dir}/raft.db.migrating" 2>/dev/null || true + found=1 + done < <( + printf '%s\n' "$logs" | + sed -nE 's/.*legacy boltdb Raft storage "[^"]+" found in ([^;]+);.*/\1/p' | + sort -u + ) + + [[ "$found" -eq 1 ]] +} + +docker pull "$IMAGE" >/dev/null +new_image_id="$(docker image inspect "$IMAGE" --format "{{.Id}}")" +running_image_id="$(docker inspect --format "{{.Image}}" "$CONTAINER_NAME" 2>/dev/null || true)" +running_status="$(docker inspect --format "{{.State.Status}}" "$CONTAINER_NAME" 2>/dev/null || echo missing)" + +if [[ "$new_image_id" == "$running_image_id" && "$running_status" == "running" ]]; then + if grpc_healthy; then + echo "image unchanged and gRPC healthy; skip" + exit 0 + fi + echo "container is running but gRPC is not reachable; recreating" +fi + +sudo mkdir -p "$DATA_DIR" +if [[ "$running_status" == "running" ]]; then + ensure_not_leader_before_restart +fi +stop_container +archive_default_legacy_dir +run_container + +if ! 
wait_for_grpc; then + logs="$(docker logs --tail 200 "$CONTAINER_NAME" 2>&1 || true)" + if printf '%s\n' "$logs" | grep -q 'legacy boltdb Raft storage'; then + echo "detected legacy BoltDB raft storage in container logs; archiving and retrying" + stop_container + if archive_legacy_dirs_from_logs "$logs"; then + run_container + if wait_for_grpc; then + echo "updated successfully" + exit 0 + fi + echo "gRPC port did not come up on ${NODE_HOST}:${RAFT_PORT} after legacy cleanup retry" >&2 + docker logs --tail 200 "$CONTAINER_NAME" || true + exit 1 + fi + fi + + echo "gRPC port did not come up on ${NODE_HOST}:${RAFT_PORT}" >&2 + printf '%s\n' "$logs" >&2 + exit 1 +fi + +echo "updated successfully" +REMOTE + + echo "==> [$node_id@$node_host] done" +} + +parse_nodes +prepare_rolling_order + +if [[ -z "$RAFT_TO_REDIS_MAP" ]]; then + RAFT_TO_REDIS_MAP="$(derive_raft_to_redis_map)" +fi + +ensure_local_raftadmin +ensure_remote_raftadmin_binaries + +echo "[rolling-update] target image: $IMAGE" +for node_id in "${ROLLING_NODE_IDS[@]}"; do + update_one_node "$node_id" "$(node_host_by_id "$node_id")" "$(ssh_target_by_id "$node_id")" + sleep "$ROLLING_DELAY_SECONDS" +done + +echo "[rolling-update] all nodes completed" From 14ae0736474818924c22f26b6c5f1d2dbe3ea752 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 20 Mar 2026 19:30:09 +0900 Subject: [PATCH 6/8] Update scripts/rolling-update.sh Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- scripts/rolling-update.sh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/scripts/rolling-update.sh b/scripts/rolling-update.sh index 1b468a65..eb7769f3 100755 --- a/scripts/rolling-update.sh +++ b/scripts/rolling-update.sh @@ -343,6 +343,9 @@ case "$(uname -m)" in esac chmod +x "$RAFTADMIN_REMOTE_BIN" + +# Clean up architecture-specific helper binaries after installing the final binary. 
+rm -f "${RAFTADMIN_REMOTE_BIN}-amd64" "${RAFTADMIN_REMOTE_BIN}-arm64" REMOTE_HELPER } From 16b8ad2cee3f85d04b799a2549ca9396e47f3e3d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Mar 2026 10:30:24 +0000 Subject: [PATCH 7/8] Initial plan From e0994d54a2b719e2ec9f428f2808689382cbeabb Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Mar 2026 10:37:01 +0000 Subject: [PATCH 8/8] Address review feedback: sudo -n, env SSH vars, filepath.Clean Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com> --- go.mod | 2 +- internal/raftstore/migrate.go | 2 ++ scripts/rolling-update.sh | 61 ++++++++++++++++++++--------------- 3 files changed, 38 insertions(+), 27 deletions(-) diff --git a/go.mod b/go.mod index 5e024a17..47457faa 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/emirpasic/gods v1.18.1 github.com/getsentry/sentry-go v0.27.0 github.com/hashicorp/go-hclog v1.6.3 + github.com/hashicorp/go-msgpack/v2 v2.1.2 github.com/hashicorp/raft v1.7.3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.23.2 @@ -66,7 +67,6 @@ require ( github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/go-metrics v0.5.4 // indirect - github.com/hashicorp/go-msgpack/v2 v2.1.2 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/klauspost/compress v1.18.0 // indirect diff --git a/internal/raftstore/migrate.go b/internal/raftstore/migrate.go index b37b5fc3..0f816bab 100644 --- a/internal/raftstore/migrate.go +++ b/internal/raftstore/migrate.go @@ -57,6 +57,8 @@ func prepareMigrationPaths(logsPath, stablePath, destDir string) (string, error) return "", errors.New("destination dir is required") } + destDir = filepath.Clean(destDir) + if err := requireExistingFile(logsPath); 
err != nil { return "", err } diff --git a/scripts/rolling-update.sh b/scripts/rolling-update.sh index eb7769f3..eab45a59 100755 --- a/scripts/rolling-update.sh +++ b/scripts/rolling-update.sh @@ -363,24 +363,25 @@ update_one_node() { copy_raftadmin_to_remote "$node_id" "$ssh_target" ssh "${SSH_BASE_OPTS[@]}" "$ssh_target" \ - IMAGE="$IMAGE" \ - RAFTADMIN_BIN_PATH="$RAFTADMIN_REMOTE_BIN" \ - CONTAINER_NAME="$CONTAINER_NAME" \ - DATA_DIR="$DATA_DIR" \ - SERVER_ENTRYPOINT="$SERVER_ENTRYPOINT" \ - RAFT_PORT="$RAFT_PORT" \ - REDIS_PORT="$REDIS_PORT" \ - DYNAMO_PORT="$DYNAMO_PORT" \ - HEALTH_TIMEOUT_SECONDS="$HEALTH_TIMEOUT_SECONDS" \ - LEADERSHIP_TRANSFER_TIMEOUT_SECONDS="$LEADERSHIP_TRANSFER_TIMEOUT_SECONDS" \ - LEADER_DISCOVERY_TIMEOUT_SECONDS="$LEADER_DISCOVERY_TIMEOUT_SECONDS" \ - RAFTADMIN_RPC_TIMEOUT_SECONDS="$RAFTADMIN_RPC_TIMEOUT_SECONDS" \ - NODE_ID="$node_id" \ - NODE_HOST="$node_host" \ - ALL_NODE_IDS_CSV="$all_node_ids_csv" \ - ALL_NODE_HOSTS_CSV="$all_node_hosts_csv" \ - RAFT_TO_REDIS_MAP="$RAFT_TO_REDIS_MAP" \ - 'bash -s' <<'REMOTE' + env \ + IMAGE="$IMAGE" \ + RAFTADMIN_BIN_PATH="$RAFTADMIN_REMOTE_BIN" \ + CONTAINER_NAME="$CONTAINER_NAME" \ + DATA_DIR="$DATA_DIR" \ + SERVER_ENTRYPOINT="$SERVER_ENTRYPOINT" \ + RAFT_PORT="$RAFT_PORT" \ + REDIS_PORT="$REDIS_PORT" \ + DYNAMO_PORT="$DYNAMO_PORT" \ + HEALTH_TIMEOUT_SECONDS="$HEALTH_TIMEOUT_SECONDS" \ + LEADERSHIP_TRANSFER_TIMEOUT_SECONDS="$LEADERSHIP_TRANSFER_TIMEOUT_SECONDS" \ + LEADER_DISCOVERY_TIMEOUT_SECONDS="$LEADER_DISCOVERY_TIMEOUT_SECONDS" \ + RAFTADMIN_RPC_TIMEOUT_SECONDS="$RAFTADMIN_RPC_TIMEOUT_SECONDS" \ + NODE_ID="$node_id" \ + NODE_HOST="$node_host" \ + ALL_NODE_IDS_CSV="$all_node_ids_csv" \ + ALL_NODE_HOSTS_CSV="$all_node_hosts_csv" \ + RAFT_TO_REDIS_MAP="$RAFT_TO_REDIS_MAP" \ + bash -s <<'REMOTE' set -euo pipefail IFS=, read -r -a ALL_NODE_IDS <<< "$ALL_NODE_IDS_CSV" @@ -619,6 +620,13 @@ run_container() { --raftRedisMap "$RAFT_TO_REDIS_MAP" >/dev/null } +require_passwordless_sudo() { + if ! 
sudo -n true 2>/dev/null; then + echo "error: passwordless sudo is required on this host; configure NOPASSWD sudo for the remote user" >&2 + exit 1 + fi +} + archive_legacy_dir() { local dir="$1" local ts backup_dir moved @@ -627,10 +635,10 @@ archive_legacy_dir() { ts="$(date -u +%Y%m%dT%H%M%SZ)" backup_dir="${dir%/}/legacy-boltdb-${ts}" - sudo mkdir -p "$backup_dir" + sudo -n mkdir -p "$backup_dir" for name in logs.dat stable.dat; do - if sudo test -e "$dir/$name"; then - sudo mv "$dir/$name" "$backup_dir/$name" + if sudo -n test -e "$dir/$name"; then + sudo -n mv "$dir/$name" "$backup_dir/$name" moved=1 fi done @@ -640,7 +648,7 @@ archive_legacy_dir() { return 0 fi - sudo rmdir "$backup_dir" 2>/dev/null || true + sudo -n rmdir "$backup_dir" 2>/dev/null || true return 1 } @@ -648,9 +656,9 @@ archive_default_legacy_dir() { local node_data_dir node_data_dir="${DATA_DIR%/}/${NODE_ID}" - if sudo test -d "$node_data_dir"; then + if sudo -n test -d "$node_data_dir"; then archive_legacy_dir "$node_data_dir" || true - sudo rm -rf "${node_data_dir}/raft.db.migrating" 2>/dev/null || true + sudo -n rm -rf "${node_data_dir}/raft.db.migrating" 2>/dev/null || true fi } @@ -662,7 +670,7 @@ archive_legacy_dirs_from_logs() { while IFS= read -r dir; do [[ -n "$dir" ]] || continue archive_legacy_dir "$dir" || true - sudo rm -rf "${dir}/raft.db.migrating" 2>/dev/null || true + sudo -n rm -rf "${dir}/raft.db.migrating" 2>/dev/null || true found=1 done < <( printf '%s\n' "$logs" | @@ -686,7 +694,8 @@ if [[ "$new_image_id" == "$running_image_id" && "$running_status" == "running" ] echo "container is running but gRPC is not reachable; recreating" fi -sudo mkdir -p "$DATA_DIR" +require_passwordless_sudo +sudo -n mkdir -p "$DATA_DIR" if [[ "$running_status" == "running" ]]; then ensure_not_leader_before_restart fi