fix(proxy): retry secondary writes on "read timestamp has been compacted" #528
Conversation
…ted" EVALSHA replays to the ElasticKV secondary can surface FailedPrecondition / "read timestamp has been compacted" when the script's startTS falls behind a peer node's MinRetainedTS (the local readPin only protects the node that picked the timestamp). Each retry re-sends the command so the secondary re-selects a fresh read snapshot.
📝 Walkthrough

Adds a leader-aware Redis backend that discovers and routes to the current Raft leader, introduces retry-with-jittered-exponential-backoff for secondary dual-writes on "read timestamp has been compacted" errors, expands INFO replication fields, updates proxy CLI parsing for secondary seeds, and adds tests for these behaviors.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant LeaderAwareBackend
    participant SeedA
    participant SeedB
    participant CurrentLeader
    Client->>LeaderAwareBackend: Do(command)
    alt cached leader client exists
        LeaderAwareBackend->>CurrentLeader: Forward command
        CurrentLeader-->>LeaderAwareBackend: OK/Reply
        LeaderAwareBackend-->>Client: Reply
    else no leader cached / refresh needed
        LeaderAwareBackend->>SeedA: INFO replication (probe)
        SeedA-->>LeaderAwareBackend: "raft_leader_redis: host:port"
        LeaderAwareBackend->>SeedB: INFO replication (probe)
        SeedB-->>LeaderAwareBackend: "raft_leader_redis: host:port"
        LeaderAwareBackend->>LeaderAwareBackend: update cached leader client
        LeaderAwareBackend->>CurrentLeader: Forward command
        CurrentLeader-->>LeaderAwareBackend: OK/Reply
        LeaderAwareBackend-->>Client: Reply
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ❌ 1 failed (1 warning) | ✅ 2 passed
🧹 Nitpick comments (2)
proxy/dualwrite.go (1)
272-297: Add retry observability so compaction pressure is visible to operators.

The retry loop silently swallows `isReadTSCompactedError` occurrences — a successful retry is recorded as a plain `ok` in `writeSecondary`, and a bounded failure is only counted as a generic `SecondaryWriteErrors`. That makes this PR's failure mode indistinguishable from unrelated secondary failures in dashboards and logs, which defeats the diagnostic value of the fix when the waterline starts advancing faster than expected.

Consider emitting a structured `slog` line per retry (or at least on the final give-up) and/or bumping a dedicated counter so `MinRetainedTS` pressure can be alerted on. Keys should follow the project convention (`cmd`, `attempt`, `err`).

♻️ Sketch
```diff
 func (d *DualWriter) executeSecondary(sCtx context.Context, cmd string, iArgs []any) error {
 	backoff := compactedRetryInitialBackoff
 	var sErr error
 	for attempt := 0; ; attempt++ {
 		result := d.secondary.Do(sCtx, iArgs...)
 		_, sErr = result.Result()
 		if isNoScriptError(sErr) {
 			if fallbackArgs, ok := d.evalFallbackArgs(cmd, iArgs); ok {
 				result = d.secondary.Do(sCtx, fallbackArgs...)
 				_, sErr = result.Result()
 			}
 		}
 		if !isReadTSCompactedError(sErr) {
 			return sErr
 		}
 		if attempt >= maxCompactedRetries {
+			d.logger.Warn("secondary compacted-read retries exhausted",
+				"cmd", cmd, "attempts", attempt+1, "err", sErr)
 			return sErr
 		}
+		d.logger.Debug("secondary compacted-read, retrying",
+			"cmd", cmd, "attempt", attempt, "backoff", backoff)
 		select {
 		case <-sCtx.Done():
 			return sErr
 		case <-time.After(backoff):
 		}
 		backoff *= 2
 	}
 }
```

As per coding guidelines: "Use `slog` for logging and maintain structured logging keys (e.g., `key`, `commit_ts`)".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@proxy/dualwrite.go` around lines 272 - 297: The retry loop in DualWriter.executeSecondary currently swallows isReadTSCompactedError occurrences making compaction pressure invisible; modify executeSecondary to emit structured slog entries (use slog.*) on each retry and on the final give-up that include keys cmd, attempt, and err (and any context like backoff), and also increment a dedicated metric/counter (e.g., SecondaryCompactionRetryCounter) when a retry happens and when maxCompactedRetries is reached; keep the existing retry/backoff logic (compactedRetryInitialBackoff, maxCompactedRetries) but add the logging/metric updates so writeSecondary callers can distinguish compaction retries/failures from generic SecondaryWriteErrors.

proxy/proxy_test.go (1)
923-975: Optional: consolidate the two retry tests into a table-driven case.

The two new tests share nearly identical setup and only differ in (a) the secondary's `doFunc` behavior and (b) the expected `calls` / `SecondaryWriteErrors` values. A table-driven form would cover both scenarios plus future cases (e.g., context cancellation mid-retry, NOSCRIPT+compacted interleaving) with less duplication.

♻️ Sketch
```go
func TestDualWriter_writeSecondary_ReadTSCompacted(t *testing.T) {
	longErr := testRedisErr("<string>:71: rpc error: code = FailedPrecondition desc = ... read timestamp has been compacted ...")
	shortErr := testRedisErr("rpc error: code = FailedPrecondition desc = read timestamp has been compacted")

	tests := []struct {
		name          string
		makeDoFunc    func() func(ctx context.Context, args ...any) *redis.Cmd
		wantCalls     int
		wantWriteErrs float64
	}{
		{
			name: "retries until success",
			makeDoFunc: func() func(context.Context, ...any) *redis.Cmd {
				var calls int
				return func(ctx context.Context, args ...any) *redis.Cmd {
					calls++
					cmd := redis.NewCmd(ctx, args...)
					if calls < 3 {
						cmd.SetErr(longErr)
						return cmd
					}
					cmd.SetVal("OK")
					return cmd
				}
			},
			wantCalls:     3,
			wantWriteErrs: 0,
		},
		{
			name: "bounded when persistent",
			makeDoFunc: func() func(context.Context, ...any) *redis.Cmd {
				return makeCmd(nil, shortErr)
			},
			wantCalls:     maxCompactedRetries + 1,
			wantWriteErrs: 1,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) { /* ... */ })
	}
}
```

As per coding guidelines: "Unit tests must be co-located with packages in `*_test.go` files; prefer table-driven test cases".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@proxy/proxy_test.go` around lines 923 - 975, The two tests TestDualWriter_writeSecondary_RetriesReadTSCompacted and TestDualWriter_writeSecondary_ReadTSCompactedRetriesAreBounded should be merged into a single table-driven test to remove duplication: create a slice of test cases each with a name, a makeDoFunc that returns the secondary.doFunc (use the existing testRedisErr, makeCmd, and long/short error variations), expected call count and expected SecondaryWriteErrors; in the loop t.Run set up primary := newMockBackend(...), secondary := newMockBackend(...); assign secondary.doFunc = tt.makeDoFunc(); create metrics := newTestMetrics() and d := NewDualWriter(...), call d.writeSecondary("EVALSHA", ...), then assert secondary.CallCount() equals tt.wantCalls and testutil.ToFloat64(metrics.SecondaryWriteErrors) equals tt.wantWriteErrs; reference NewDualWriter, writeSecondary, newMockBackend, makeCmd, testRedisErr, maxCompactedRetries, and metrics.SecondaryWriteErrors to locate code to change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 43391e89-ca84-4994-9460-355d595bbfc5
📒 Files selected for processing (2)
proxy/dualwrite.go
proxy/proxy_test.go
Code Review
This pull request introduces a retry mechanism in the DualWriter to handle cases where a secondary backend returns a "read timestamp has been compacted" error. It refactors the secondary write logic into a new executeSecondary method that implements exponential backoff and maintains the existing NOSCRIPT to EVAL fallback. New unit tests verify that retries are performed correctly and are properly bounded. The review feedback suggests optimizing the retry loop by persisting fallback arguments to avoid redundant NOSCRIPT errors in subsequent attempts and adding jitter to the backoff logic to prevent thundering herd issues.
INFO replication on an ElasticKV node now reports its actual role (master/slave) and a new `raft_leader_redis` field pointing at the Redis address of the current Raft leader. The proxy wraps the ElasticKV backend in `LeaderAwareRedisBackend`, which polls INFO every 2s and routes every Do/Pipeline/PubSub through the discovered leader, falling back to the configured seeds when a probe fails. `-secondary` now accepts a comma-separated seed list.
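As a rough illustration of the discovery step, a probe only needs to scan the INFO replication payload for the new field. A minimal sketch, assuming a plain-text INFO reply; the field name `raft_leader_redis` is from this PR, while the helper and its name are hypothetical:

```go
import "strings"

// parseLeaderAddr returns the leader address advertised in an INFO
// replication payload, scanning for the raft_leader_redis field.
// Hypothetical helper; the real probe lives inside LeaderAwareRedisBackend.
func parseLeaderAddr(info string) (string, bool) {
	for _, line := range strings.Split(info, "\r\n") {
		if rest, ok := strings.CutPrefix(line, "raft_leader_redis:"); ok {
			addr := strings.TrimSpace(rest)
			return addr, addr != ""
		}
	}
	return "", false
}
```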
/gemini review
Code Review
This pull request introduces a leader-aware Redis backend that dynamically discovers the current Raft leader via INFO replication and routes commands accordingly. It also implements a retry mechanism for secondary writes that fail due to compacted read timestamps. Review feedback suggests addressing potential timer leaks in retry loops, improving context propagation for graceful shutdowns in background loops, and logging errors during resource cleanup.
- proxy/dualwrite.go: preserve NOSCRIPT→EVAL fallback across compacted retries so each attempt after the first skips the known-missing EVALSHA; add jitter + bounded exponential backoff; use time.NewTimer so the timer is released on ctx cancellation; tag the two Result() reads with the same nolint:wrapcheck used elsewhere.
- proxy/leader_aware_backend.go: derive a cancellable context from stopCh so Close() interrupts in-flight INFO probes instead of waiting for refreshTimeout, check ctx.Err() between candidates, and log per-client Close errors.
- proxy/leader_aware_backend_test.go: replace net.Listen with net.ListenConfig.Listen (noctx linter).
- adapter/redis_info_test.go: gofmt.
- proxy/proxy_test.go: regression test that the NOSCRIPT resolution is reused across compacted retries.
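A minimal sketch of the cancellable backoff wait from the first bullet: an explicit `time.NewTimer` plus `Stop` so a cancelled context releases the timer immediately instead of letting it linger until it fires. The name matches the `waitCompactedRetryBackoff` referenced elsewhere in this thread, but the body is an assumption:

```go
import (
	"context"
	"time"
)

// waitCompactedRetryBackoff sleeps for backoff unless ctx is cancelled
// first; the timer is stopped on cancellation rather than leaked.
func waitCompactedRetryBackoff(ctx context.Context, backoff time.Duration) error {
	t := time.NewTimer(backoff)
	defer t.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-t.C:
		return nil
	}
}
```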
…dary

wrapcheck flags the return statements where the external redis error crosses the function boundary, not the Result() call. Tag each of the three early returns to match the existing writeSecondary pattern in this file.
- wrapSecondaryError wraps the secondary error with %w so wrapcheck is satisfied; errors.Is still reaches redis.Nil / redis.Error through the wrap, so writeSecondary's classification and Sentry fingerprint are unaffected.
- Extract compactedRetryBackoffFactor and compactedRetryJitterDivisor constants so mnd stops flagging the bare 2s in the backoff helpers.
- Tests: check the type assertion on args[0] before switching on it so forcetypeassert is satisfied.
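For reference, the `%w` behavior the first bullet relies on is standard error wrapping; a sketch, not the actual helper, and the message text is an assumption:

```go
import "fmt"

// wrapSecondaryError keeps the original error in the chain via %w, so
// errors.Is(wrapped, redis.Nil) still reports true and writeSecondary's
// classification and Sentry fingerprint are unaffected.
func wrapSecondaryError(err error) error {
	if err == nil {
		return nil
	}
	return fmt.Errorf("secondary write: %w", err)
}
```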
/gemini review
Code Review
This pull request introduces a LeaderAwareRedisBackend that dynamically discovers the Raft leader via INFO replication probes, enabling the proxy to route traffic to the correct node. It also implements a retry mechanism in the DualWriter to handle transient "read timestamp has been compacted" errors from the secondary backend. Review feedback suggests adding debug logging for these retries to improve observability, using a time.Timer instead of a time.Ticker in the refresh loop to prevent overlapping probes, and adding warning logs when no leader is discovered.
- Inline the compacted-retry loop back into writeSecondary so the raw redis error never crosses a function boundary. Removes the need for nolint:wrapcheck and the wrapSecondaryError helper, while preserving errors.Is(sErr, redis.Nil), Sentry fingerprinting, and the original error message in the structured log.
- Replace math/rand/v2 jitter with a crypto/rand helper (jitterFor) that gosec accepts without nolint:gosec.
- Add a Debug log before each compacted-retry backoff for visibility into how often the secondary re-probes for a fresh snapshot.
- refreshLoop: switch from time.Ticker to a drained time.Timer with Reset so a probe that runs longer than refreshInterval can't cause back-to-back probes once it returns.
- refreshLeader: Warn (only when ctx is still alive) when no candidate advertised a leader, so an unstable cluster is visible in logs.
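The crypto/rand helper from the second bullet could look roughly like this; a sketch assuming up-to-50% jitter (the divisor and the degrade-to-zero fallback are assumptions, only the name `jitterFor` is from the commit):

```go
import (
	"crypto/rand"
	"math/big"
	"time"
)

// jitterFor draws a uniform duration in [0, d/2) from crypto/rand, which
// gosec accepts without a nolint. Any failure degrades to zero jitter
// rather than failing the retry.
func jitterFor(d time.Duration) time.Duration {
	half := int64(d / 2)
	if half <= 0 {
		return 0
	}
	n, err := rand.Int(rand.Reader, big.NewInt(half))
	if err != nil {
		return 0
	}
	return time.Duration(n.Int64())
}
```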
/gemini review
Code Review
This pull request implements dynamic leader discovery for the ElasticKV secondary backend via a new LeaderAwareRedisBackend, which polls INFO replication to route commands to the current Raft leader. Additionally, it introduces a jittered exponential backoff retry mechanism in the DualWriter to handle compacted read timestamp errors. Review feedback highlights critical thread-safety and resource management improvements in the leader-aware backend, including the need for safe map iteration during shutdown, preventing client instantiation after the backend is closed, and ensuring atomic state checks for the closed status.
```go
b.mu.Lock()
clients := b.clients
b.mu.Unlock()
```
Iterating over the b.clients map while it can be concurrently modified will cause a panic. You should copy the map under a lock to ensure a safe snapshot for closing. Furthermore, per repository rules, do not silently ignore errors from Close() methods; log them to ensure resource leaks or cleanup problems are visible.
```go
b.mu.Lock()
clients := make(map[string]*redis.Client, len(b.clients))
for k, v := range b.clients {
	clients[k] = v
}
b.mu.Unlock()

for _, client := range clients {
	if err := client.Close(); err != nil {
		log.Printf("failed to close redis client: %v", err)
	}
}
```

References
- Do not silently ignore errors from Close() methods on resources like network connections. Log them to ensure resource leaks or other cleanup problems are visible.
```go
func (b *LeaderAwareRedisBackend) ensureClientLocked(addr string) *redis.Client {
	if cli, ok := b.clients[addr]; ok {
		return cli
	}
```
To prevent resource leaks during or after shutdown, ensureClientLocked must check if the backend is already closed before creating a new redis.Client. Without this check, a command arriving during Close() could instantiate a new client that is never cleaned up.
```go
func (b *LeaderAwareRedisBackend) ensureClientLocked(addr string) *redis.Client {
	if cli, ok := b.clients[addr]; ok {
		return cli
	}
	if b.closed {
		return nil
	}
```

```go
func (b *LeaderAwareRedisBackend) currentClient() *redis.Client {
	b.mu.RLock()
	cli, ok := b.clients[b.leader]
	b.mu.RUnlock()
	if ok && cli != nil {
```
The currentClient method should check the b.closed state while holding the lock. According to repository rules, when performing operations based on a state variable, do not release the lock between reading the variable and performing the conditional check to prevent race conditions and the use of stale data.
Suggested change:

```diff
 func (b *LeaderAwareRedisBackend) currentClient() *redis.Client {
 	b.mu.RLock()
-	cli, ok := b.clients[b.leader]
-	b.mu.RUnlock()
-	if ok && cli != nil {
+	defer b.mu.RUnlock()
+	if b.closed {
+		return nil
+	}
+	return b.clients[b.leader]
```
References
- When performing atomic operations based on a version or state variable, do not release the lock between reading the variable and performing the conditional check to prevent race conditions and the use of stale data.
Three concurrency issues surfaced by code review:

- ensureClientLocked now returns nil when b.closed is set, so a Do() that races Close() cannot instantiate a fresh redis.Client that will never be cleaned up.
- currentClient holds the RLock for the whole read (closed flag + clients[leader]) instead of releasing it between the two, closing the TOCTOU window where Close() could run mid-lookup.
- Close snapshots the clients map under the lock and swaps in an empty map before iterating, so Close's cli.Close() loop can never race a concurrent ensureClientLocked map write.

setLeader and probeLeader also bail out cleanly when closed, to avoid spurious log lines and nil-client dereferences during shutdown. Regression guard TestLeaderAwareRedisBackend_ConcurrentCloseIsRaceFree runs Do() from 8 goroutines while Close() fires, and passes under `go test -race`.
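For context, the shape of that regression guard is roughly the following; the setup helper and the PING argument are assumptions, the test name and goroutine count are from the commit:

```go
import (
	"context"
	"sync"
	"testing"
)

// Runs Do from 8 goroutines while Close fires concurrently; unsynchronized
// map access or post-Close client creation surfaces under `go test -race`.
func TestLeaderAwareRedisBackend_ConcurrentCloseIsRaceFree(t *testing.T) {
	b := newTestLeaderAwareBackend(t) // hypothetical setup helper
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = b.Do(context.Background(), "PING") // errors are expected while racing Close
		}()
	}
	b.Close()
	wg.Wait()
}
```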
/gemini review
Code Review
This pull request introduces a leader-aware Redis backend that automatically discovers and follows the Raft leader of an ElasticKV cluster by probing INFO replication. It also implements a retry mechanism with jittered exponential backoff for secondary writes encountering compacted read timestamp errors. Feedback is provided regarding the potential for unbounded memory growth in the clients map of the LeaderAwareRedisBackend if cluster addresses change frequently over time.
```go
	logger *slog.Logger

	mu      sync.RWMutex
	clients map[string]*redis.Client
```
The clients map in LeaderAwareRedisBackend grows as new leader addresses are discovered but is only cleared upon calling Close(). In a long-running process where cluster nodes are replaced or addresses change, this can lead to a slow accumulation of stale redis.Client instances and their associated connection pools. To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to this collection and reject new requests or implement a pruning mechanism when the bound is reached.
References
- To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests, such as pending configuration changes. Reject new requests when the bound is reached.
🧹 Nitpick comments (4)
cmd/redis-proxy/main.go (1)
77-80: Nit: prefer `errors.New` for a static message, and consider delegating trim+split to `proxy.normalizeSeeds`.

`fmt.Errorf("at least one secondary address is required")` contains no format verbs — `errors.New(...)` is idiomatic. More importantly, `parseAddrList` in this file and `normalizeSeeds` in `proxy/leader_aware_backend.go` both trim whitespace and drop empties on a seed list; `normalizeSeeds` additionally dedupes. Since `NewLeaderAwareRedisBackend` already calls `normalizeSeeds`, this wrapper duplicates part of that logic. If the intent is to validate empty input up-front, a thin `proxy.NormalizeSeeds` export (or a `proxy.ParseSeedList`) would avoid two near-identical helpers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/redis-proxy/main.go` around lines 77 - 80: Replace the static fmt.Errorf with errors.New and avoid duplicating seed parsing: instead of calling parseAddrList(cfg.SecondaryAddr) use the shared normalizer (export proxy.NormalizeSeeds or proxy.ParseSeedList) and then validate the result; if the normalized slice is empty return errors.New("at least one secondary address is required"). Update references around parseAddrList and ensure NewLeaderAwareRedisBackend still calls proxy.normalizeSeeds internally (or use the new exported proxy function) so trimming/deduping logic is centralized.

proxy/leader_aware_backend.go (1)
67-93: Prefer returning an error from the constructor instead of panicking on empty seeds.

Line 70 panics when `normalizeSeeds` returns nothing. `cmd/redis-proxy/main.go` already guards this, but for a package-level constructor this is fragile: any future caller (new binaries, library use, tests) has to know the invariant, and a panic here crashes the whole proxy process without a structured error. Returning `(*LeaderAwareRedisBackend, error)` — or mirroring the `cmd/redis-proxy/main.go` guard's error at call sites — is more idiomatic and lets callers handle misconfiguration uniformly. Not urgent because the call site already validates, but worth cleaning up before other callers appear.

Sketch
```diff
-func NewLeaderAwareRedisBackendWithInterval(seeds []string, name string, opts BackendOptions, refreshInterval, refreshTimeout time.Duration, logger *slog.Logger) *LeaderAwareRedisBackend {
+func NewLeaderAwareRedisBackendWithInterval(seeds []string, name string, opts BackendOptions, refreshInterval, refreshTimeout time.Duration, logger *slog.Logger) (*LeaderAwareRedisBackend, error) {
 	normalized := normalizeSeeds(seeds)
 	if len(normalized) == 0 {
-		panic("proxy: LeaderAwareRedisBackend requires at least one seed address")
+		return nil, errors.New("proxy: LeaderAwareRedisBackend requires at least one seed address")
 	}
```

(and propagate through `NewLeaderAwareRedisBackend` + call sites).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@proxy/leader_aware_backend.go` around lines 67 - 93: Change NewLeaderAwareRedisBackendWithInterval to return (*LeaderAwareRedisBackend, error) instead of panicking: if normalizeSeeds(seeds) yields zero entries return (nil, fmt.Errorf("proxy: LeaderAwareRedisBackend requires at least one seed address")). Update NewLeaderAwareRedisBackend (the convenience constructor) to mirror the new signature and propagate the error, and update all callers to handle the returned error instead of relying on a panic; ensure the rest of the function (initializing b, starting refreshLoop) runs only on success and that imports (fmt) and any call sites in cmd/redis-proxy/main.go are updated to check and return/log the error.

proxy/dualwrite.go (2)
269-303: Worth watching: retries extend `scriptSem` hold time under sustained compaction.

When `writeSecondary` is invoked from `Script` (line 252 uses `goScript`), the retry loop holds a slot of `scriptSem` (cap `maxScriptWriteGoroutines = 64`) for the full 4-attempt duration including backoffs. Total sleep budget is roughly 10 + 20 + 40 ms plus jitter up to 50%, i.e. ~70–105 ms worst case before the 4th send, plus command RTT. If every secondary script replay is hitting a compacted snapshot during a waterline advancement, 64 concurrent replays all backing off will cause new Lua secondary writes to be dropped via `metrics.AsyncDrops`. That's the intended load-shed behavior, but operators should see this in the dashboard before drawing wrong conclusions ("the proxy is dropping writes").

Consider emitting a dedicated counter (e.g. `SecondaryCompactedRetries`) or a log at `Info` level on each retry path entry so compaction-driven backpressure is distinguishable from plain overload. The `Debug` log on line 297 is too quiet for production diagnostics.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@proxy/dualwrite.go` around lines 269 - 303, writeSecondary's retry loop is silently holding a scriptSem slot during compacted-snapshot retries; add an observable signal so operators can distinguish compaction-driven backpressure from general overload by incrementing a dedicated metric (e.g. SecondaryCompactedRetries) or emitting an Info-level log on each retry path. Inside writeSecondary, in the branch guarded by isReadTSCompactedError(sErr) (before the waitCompactedRetryBackoff call and after checking attempt >= maxCompactedRetries), increment the new metric or call d.logger.Info with context ("cmd", cmd, "attempt", attempt+1, "err", sErr) so every compacted retry is counted/logged; ensure the metric name and logger call are wired through the existing d.metrics/d.logger fields so no globals are introduced.
52-63: Substring matching on error text is brittle but currently unavoidable — consider a structured signal over time.

The doc on lines 52–56 correctly notes that both gRPC (`FailedPrecondition`) and Lua `PCall` erase the typed `store.ErrReadTSCompacted`, so string matching is the last resort. Two small risks worth tracking:

- If an upstream library ever wraps/translates the error (e.g. changes casing or localization), the predicate silently stops firing and retries regress to zero.
- User-authored Lua scripts that `redis.error_reply("read timestamp has been compacted")` on purpose would be retried unnecessarily.

Neither is worth fixing now, but a long-term remediation is to surface the typed error through gRPC `details` (e.g. a status-details `ErrorInfo{Reason: "READ_TS_COMPACTED"}`) and match on reason, falling back to this substring only during rollout. Not blocking.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@proxy/dualwrite.go` around lines 52 - 63, Update isReadTSCompactedError to prefer a structured signal (gRPC status details ErrorInfo with Reason "READ_TS_COMPACTED") before falling back to substring matching: inspect the error for a gRPC status (status.FromError) and check its Details for an ErrorInfo whose Reason equals "READ_TS_COMPACTED"; if not present, keep the existing fallback that uses readTSCompactedMarker and strings.Contains(err.Error(), readTSCompactedMarker). Ensure the implementation references the existing symbols isReadTSCompactedError and readTSCompactedMarker and preserves the nil check at the top.
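For concreteness, the structured-first check proposed above could be layered in front of the existing substring fallback as below. A sketch under the stated assumption that the server would attach an `ErrorInfo` detail with Reason `READ_TS_COMPACTED`, which it does not do today:

```go
import (
	"strings"

	"google.golang.org/genproto/googleapis/rpc/errdetails"
	"google.golang.org/grpc/status"
)

// isReadTSCompactedError prefers a typed gRPC ErrorInfo detail and only
// then falls back to matching the error text against the known marker.
func isReadTSCompactedError(err error) bool {
	if err == nil {
		return false
	}
	if st, ok := status.FromError(err); ok {
		for _, d := range st.Details() {
			if info, ok := d.(*errdetails.ErrorInfo); ok && info.GetReason() == "READ_TS_COMPACTED" {
				return true
			}
		}
	}
	return strings.Contains(err.Error(), readTSCompactedMarker)
}
```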
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: e3081c5e-6d32-4a5d-8e97-1d20574d5e2c
📒 Files selected for processing (7)
adapter/redis_compat_commands.go
adapter/redis_info_test.go
cmd/redis-proxy/main.go
proxy/dualwrite.go
proxy/leader_aware_backend.go
proxy/leader_aware_backend_test.go
proxy/proxy_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- proxy/proxy_test.go
… param

- newFakeElasticKVNode: drop the unused leader parameter flagged by the unparam linter; callers always seeded via SetLeader after the fact.
- LeaderAwareRedisBackend: cap the cached client map at maxLeaderAwareClients=16 with FIFO eviction, skipping seeds and the current leader, so leader churn in a long-running proxy cannot leak redis.Client pools. Close clears the FIFO along with the map.
- Regression test TestLeaderAwareRedisBackend_EvictsOldestNonProtectedClient churns 2x the cap worth of leader addresses and verifies the cache stays bounded and the seed is never evicted.
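The eviction from the second bullet might look roughly like this. A sketch only: the `fifo` slice, `seedSet` map, and the locking convention are assumptions; the cap constant and the skip rules are from the commit:

```go
const maxLeaderAwareClients = 16

// evictOldestLocked drops the oldest cached client that is neither a seed
// nor the current leader. Callers must hold b.mu.
func (b *LeaderAwareRedisBackend) evictOldestLocked() {
	for i, addr := range b.fifo { // insertion-order list of cached addrs (assumed field)
		if b.seedSet[addr] || addr == b.leader {
			continue // seeds and the live leader are never evicted
		}
		if cli, ok := b.clients[addr]; ok {
			if err := cli.Close(); err != nil {
				b.logger.Warn("closing evicted client", "addr", addr, "err", err)
			}
			delete(b.clients, addr)
		}
		b.fifo = append(b.fifo[:i], b.fifo[i+1:]...)
		return
	}
}
```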
/gemini review
```go
	logger *slog.Logger

	mu sync.RWMutex
```
🚫 [golangci] reported by reviewdog 🐶
File is not properly formatted (gci)
Code Review
This pull request introduces a leader-aware Redis backend that dynamically discovers the current Raft leader via INFO replication and routes commands accordingly. It also adds a transparent retry mechanism with jittered exponential backoff for secondary writes that encounter compacted read timestamp errors. Review feedback highlights a performance optimization for the client retrieval hot path using atomic pointers and a potential logic issue where the leader address could be updated even if a corresponding client cannot be initialized.
```go
func (b *LeaderAwareRedisBackend) currentClient() *redis.Client {
	b.mu.RLock()
	defer b.mu.RUnlock()
	if b.closed {
		return nil
	}
	return b.clients[b.leader]
}
```
The currentClient method is called on every command execution, making it a hot path. Using a RWMutex here can introduce unnecessary overhead and potential contention under high load. Following the general rule for frequently accessed fields, consider using an `atomic.Pointer[redis.Client]` to store and retrieve the current leader's client. This would allow lock-free access for the majority of operations, with the pointer being updated only when a leader change is detected or during shutdown.
References
- For frequently accessed fields that require monotonic updates (like a configuration index), prefer atomic operations (e.g., CAS) over mutexes to improve performance on hot paths.
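A minimal sketch of the lock-free variant this comment suggests, assuming the leader's client is published through an `atomic.Pointer` that setLeader and Close swap:

```go
import (
	"sync/atomic"

	"github.com/redis/go-redis/v9"
)

// leaderHandle publishes the current leader's client without a lock on
// the read path; writers Store a new client on leader change and nil on
// Close.
type leaderHandle struct {
	cli atomic.Pointer[redis.Client]
}

func (h *leaderHandle) current() *redis.Client { return h.cli.Load() }
func (h *leaderHandle) set(c *redis.Client)    { h.cli.Store(c) }
```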
```go
func (b *LeaderAwareRedisBackend) setLeader(addr string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed || b.leader == addr {
		return
	}
	prev := b.leader
	b.leader = addr
	b.ensureClientLocked(addr)
	b.logger.Info("elastickv leader updated", "backend", b.name, "from", prev, "to", addr)
}
```
In setLeader, the b.leader address is updated before ensuring that a client can be successfully created or retrieved for it. If ensureClientLocked returns nil (e.g., because the client cache is full of non-evictable entries like seeds), b.leader will point to an address with no corresponding client in the b.clients map. This will cause all subsequent commands to fail with ErrNoLeaderBackend. It is safer to verify that a client can be obtained for the new address before updating b.leader.
```go
func (b *LeaderAwareRedisBackend) setLeader(addr string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed || b.leader == addr {
		return
	}
	cli := b.ensureClientLocked(addr)
	if cli == nil {
		return
	}
	prev := b.leader
	b.leader = addr
	b.logger.Info("elastickv leader updated", "backend", b.name, "from", prev, "to", addr)
}
```
EVALSHA replays to the ElasticKV secondary can surface
FailedPrecondition / "read timestamp has been compacted" when the
script's startTS falls behind a peer node's MinRetainedTS (the local
readPin only protects the node that picked the timestamp). Each retry
re-sends the command so the secondary re-selects a fresh read snapshot.