Skip to content
32 changes: 31 additions & 1 deletion adapter/redis_compat_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,45 @@ type bzpopminResult struct {
}

// info implements a minimal INFO command reply. Only the fields clients
// commonly probe are emitted: a Server section with the advertised Redis
// version and loading flag, and a Replication section exposing this node's
// role plus the Redis address of the current Raft leader. The role is
// derived from the Raft coordinator: "master" when this node leads,
// "slave" otherwise (including when no coordinator is configured).
//
// Fix: the previous body also carried a stale hardcoded "role:master" line
// in the Server section alongside the computed "role:" + role line, so
// followers reported both role:master and role:slave; the stale line is
// removed so the role field is consistent in both sections.
func (r *RedisServer) info(conn redcon.Conn, _ redcon.Command) {
	role := "slave"
	if r.coordinator != nil && r.coordinator.IsLeader() {
		role = "master"
	}

	// Empty when the leader is unknown or its Redis address is unmapped;
	// clients treat an empty value as "keep probing".
	leaderRedis := r.raftLeaderRedisAddr()

	conn.WriteBulkString(strings.Join([]string{
		"# Server",
		"redis_version:7.2.0",
		"loading:0",
		"role:" + role,
		"",
		"# Replication",
		"role:" + role,
		"raft_leader_redis:" + leaderRedis,
		"",
	}, "\r\n"))
}

// raftLeaderRedisAddr resolves the Redis-protocol address of the Raft leader
// as seen from this node. A leader reports its own listen address; a follower
// maps the known Raft leader address through the leaderRedis table. The empty
// string is returned when no coordinator is configured, when the leader is
// not yet known, or when the leader has no entry in the leaderRedis map.
func (r *RedisServer) raftLeaderRedisAddr() string {
	coord := r.coordinator
	if coord == nil {
		return ""
	}
	if coord.IsLeader() {
		return r.redisAddr
	}
	if leader := coord.RaftLeader(); leader != "" {
		return r.leaderRedis[leader]
	}
	return ""
}

// SETEX key seconds value — equivalent to SET key value EX seconds
func (r *RedisServer) setex(conn redcon.Conn, cmd redcon.Command) {
if r.proxyToLeader(conn, cmd, cmd.Args[1]) {
Expand Down
94 changes: 94 additions & 0 deletions adapter/redis_info_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package adapter

import (
"context"
"strings"
"testing"

"github.com/bootjp/elastickv/kv"
"github.com/hashicorp/raft"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/redcon"
)

// infoTestCoordinator is a test double for the coordinator dependency of
// RedisServer. It reports fixed leadership state so the INFO tests can
// exercise the leader and follower reply paths without a real Raft cluster.
type infoTestCoordinator struct {
	isLeader   bool               // returned by IsLeader / IsLeaderForKey
	raftLeader raft.ServerAddress // returned by RaftLeader / RaftLeaderForKey
	clock      *kv.HLC            // lazily created by Clock
}

// Dispatch is a no-op stub; the INFO tests never dispatch operations.
func (c *infoTestCoordinator) Dispatch(context.Context, *kv.OperationGroup[kv.OP]) (*kv.CoordinateResponse, error) {
	return &kv.CoordinateResponse{}, nil
}

// IsLeader reports the fixed leadership flag configured by the test.
func (c *infoTestCoordinator) IsLeader() bool { return c.isLeader }

// VerifyLeader always succeeds in this stub.
func (c *infoTestCoordinator) VerifyLeader() error { return nil }

// RaftLeader returns the fixed Raft leader address configured by the test.
func (c *infoTestCoordinator) RaftLeader() raft.ServerAddress { return c.raftLeader }

// IsLeaderForKey mirrors IsLeader regardless of the key.
func (c *infoTestCoordinator) IsLeaderForKey([]byte) bool { return c.isLeader }

// VerifyLeaderForKey always succeeds in this stub.
func (c *infoTestCoordinator) VerifyLeaderForKey([]byte) error { return nil }

// RaftLeaderForKey mirrors RaftLeader regardless of the key.
func (c *infoTestCoordinator) RaftLeaderForKey([]byte) raft.ServerAddress { return c.raftLeader }

// Clock lazily initializes and returns an HLC so callers always receive a
// non-nil clock.
func (c *infoTestCoordinator) Clock() *kv.HLC {
	if c.clock == nil {
		c.clock = kv.NewHLC()
	}
	return c.clock
}

// TestRedisServer_Info_LeaderRole verifies the INFO reply of a node that is
// currently the Raft leader: it must advertise role:master and report its own
// Redis address as raft_leader_redis.
func TestRedisServer_Info_LeaderRole(t *testing.T) {
	srv := &RedisServer{
		redisAddr:   "10.0.0.1:6379",
		leaderRedis: map[raft.ServerAddress]string{"raft-1": "10.0.0.1:6379"},
		coordinator: &infoTestCoordinator{isLeader: true, raftLeader: "raft-1"},
	}

	rec := &recordingConn{}
	srv.info(rec, redcon.Command{})

	reply := string(rec.bulk)
	assert.Contains(t, reply, "# Server", "INFO reply must keep the Server section")
	assert.Contains(t, reply, "# Replication", "INFO reply must expose a Replication section")
	assert.Contains(t, reply, "role:master", "leader must advertise role:master")
	assert.Contains(t, reply, "raft_leader_redis:10.0.0.1:6379",
		"leader must point raft_leader_redis at itself")
}

// TestRedisServer_Info_FollowerRole verifies that a follower advertises
// role:slave and reports the actual leader's Redis address — both anywhere in
// the reply and specifically inside the Replication section.
func TestRedisServer_Info_FollowerRole(t *testing.T) {
	srv := &RedisServer{
		redisAddr: "10.0.0.2:6379",
		leaderRedis: map[raft.ServerAddress]string{
			"raft-1": "10.0.0.1:6379",
			"raft-2": "10.0.0.2:6379",
		},
		coordinator: &infoTestCoordinator{isLeader: false, raftLeader: "raft-1"},
	}

	rec := &recordingConn{}
	srv.info(rec, redcon.Command{})

	reply := string(rec.bulk)
	assert.Contains(t, reply, "role:slave", "follower must advertise role:slave")
	assert.Contains(t, reply, "raft_leader_redis:10.0.0.1:6379",
		"follower must point raft_leader_redis at the actual leader")

	// The role must appear in the Replication section so clients that only
	// scan that section still see the right value.
	replIdx := strings.Index(reply, "# Replication")
	require.GreaterOrEqual(t, replIdx, 0)
	replication := reply[replIdx:]
	assert.Contains(t, replication, "role:slave")
	assert.Contains(t, replication, "raft_leader_redis:10.0.0.1:6379")
}

// TestRedisServer_Info_UnknownLeader verifies that when no Raft leader is
// known the node still reports role:slave and emits raft_leader_redis with an
// empty value rather than omitting the field.
func TestRedisServer_Info_UnknownLeader(t *testing.T) {
	srv := &RedisServer{
		redisAddr:   "10.0.0.3:6379",
		leaderRedis: map[raft.ServerAddress]string{},
		coordinator: &infoTestCoordinator{isLeader: false, raftLeader: ""},
	}

	rec := &recordingConn{}
	srv.info(rec, redcon.Command{})

	reply := string(rec.bulk)
	assert.Contains(t, reply, "role:slave")
	assert.Contains(t, reply, "raft_leader_redis:\r\n",
		"when the leader is unknown the field must be empty so clients know to keep probing")
}
23 changes: 20 additions & 3 deletions cmd/redis-proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"

Expand Down Expand Up @@ -37,7 +38,7 @@ func run() error {
flag.StringVar(&cfg.PrimaryAddr, "primary", cfg.PrimaryAddr, "Primary (Redis) address")
flag.IntVar(&cfg.PrimaryDB, "primary-db", cfg.PrimaryDB, "Primary Redis DB number")
flag.StringVar(&cfg.PrimaryPassword, "primary-password", cfg.PrimaryPassword, "Primary Redis password")
flag.StringVar(&cfg.SecondaryAddr, "secondary", cfg.SecondaryAddr, "Secondary (ElasticKV) address")
flag.StringVar(&cfg.SecondaryAddr, "secondary", cfg.SecondaryAddr, "Secondary (ElasticKV) address. Comma-separated list of seeds is supported; the proxy discovers the current Raft leader via INFO replication.")
flag.IntVar(&cfg.SecondaryDB, "secondary-db", cfg.SecondaryDB, "Secondary Redis DB number")
flag.StringVar(&cfg.SecondaryPassword, "secondary-password", cfg.SecondaryPassword, "Secondary Redis password")
flag.StringVar(&modeStr, "mode", "dual-write", "Proxy mode: redis-only, dual-write, dual-write-shadow, elastickv-primary, elastickv-only")
Expand Down Expand Up @@ -73,14 +74,19 @@ func run() error {
secondaryOpts.DB = cfg.SecondaryDB
secondaryOpts.Password = cfg.SecondaryPassword

secondarySeeds := parseAddrList(cfg.SecondaryAddr)
if len(secondarySeeds) == 0 {
return fmt.Errorf("at least one secondary address is required")
}

var primary, secondary proxy.Backend
switch cfg.Mode {
case proxy.ModeElasticKVPrimary, proxy.ModeElasticKVOnly:
primary = proxy.NewRedisBackendWithOptions(cfg.SecondaryAddr, "elastickv", secondaryOpts)
primary = proxy.NewLeaderAwareRedisBackend(secondarySeeds, "elastickv", secondaryOpts, logger)
secondary = proxy.NewRedisBackendWithOptions(cfg.PrimaryAddr, "redis", primaryOpts)
case proxy.ModeRedisOnly, proxy.ModeDualWrite, proxy.ModeDualWriteShadow:
primary = proxy.NewRedisBackendWithOptions(cfg.PrimaryAddr, "redis", primaryOpts)
secondary = proxy.NewRedisBackendWithOptions(cfg.SecondaryAddr, "elastickv", secondaryOpts)
secondary = proxy.NewLeaderAwareRedisBackend(secondarySeeds, "elastickv", secondaryOpts, logger)
}
defer primary.Close()
defer secondary.Close()
Expand Down Expand Up @@ -124,3 +130,14 @@ func run() error {
}
return nil
}

// parseAddrList splits a comma-separated address list into its non-empty,
// whitespace-trimmed entries. Input consisting only of separators and blanks
// yields an empty slice.
func parseAddrList(raw string) []string {
	fields := strings.Split(raw, ",")
	addrs := make([]string, 0, len(fields))
	for _, field := range fields {
		addr := strings.TrimSpace(field)
		if addr == "" {
			continue
		}
		addrs = append(addrs, addr)
	}
	return addrs
}
129 changes: 123 additions & 6 deletions proxy/dualwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package proxy

import (
"context"
"crypto/rand"
"errors"
"fmt"
"log/slog"
"math/big"
"strings"
"sync"
"time"

Expand All @@ -25,8 +28,40 @@ const (
// contention bounded; this is only tolerable in modes where the script write
// is targeting the non-authoritative backend.
maxScriptWriteGoroutines = 64

// maxCompactedRetries caps retries when the secondary returns
// "read timestamp has been compacted". Each attempt re-sends the command so
// the secondary re-selects a fresh read snapshot; a small bound is enough
// because the compaction waterline advances slowly relative to SecondaryTimeout.
maxCompactedRetries = 3
// compactedRetryInitialBackoff is the first delay before retrying a secondary
// command that failed with a compacted-read error.
compactedRetryInitialBackoff = 10 * time.Millisecond
// compactedRetryMaxBackoff caps the jittered exponential backoff so it
// stays well within SecondaryTimeout even if the retry policy is ever
// widened.
compactedRetryMaxBackoff = 100 * time.Millisecond
// compactedRetryBackoffFactor is the multiplier applied to the backoff
// between retries; 2 gives a conventional exponential schedule.
compactedRetryBackoffFactor = 2
// compactedRetryJitterDivisor bounds the jitter to backoff/divisor so
// jitter stays small relative to the base delay.
compactedRetryJitterDivisor = 2
)

// readTSCompactedMarker is the substring produced by
// store.ErrReadTSCompacted as it flows through gRPC (wrapped as
// FailedPrecondition) and Lua PCall. Matching on substring is necessary
// because both layers erase the typed error.
const readTSCompactedMarker = "read timestamp has been compacted"

func isReadTSCompactedError(err error) bool {
if err == nil {
return false
}
return strings.Contains(err.Error(), readTSCompactedMarker)
}

// DualWriter routes commands to primary and secondary backends based on mode.
type DualWriter struct {
primary Backend
Expand Down Expand Up @@ -220,19 +255,53 @@ func (d *DualWriter) Script(ctx context.Context, cmd string, args [][]byte) (any
return resp, err //nolint:wrapcheck // redis.Nil must pass through unwrapped for callers to detect nil replies
}

// writeSecondary sends the command to the secondary, handling the NOSCRIPT
// → EVAL fallback and transparently retrying when the secondary reports that
// the read snapshot has been compacted. A re-sent command causes the backend
// to re-select a fresh read timestamp, which is the only way to recover once
// the original startTS has fallen behind MinRetainedTS on a peer node.
//
// The secondary's raw redis error is kept in sErr (not wrapped) so that
// writeSecondary can classify it via errors.Is(sErr, redis.Nil), attach the
// original message to Sentry and the structured log, and so the retry
// predicate isReadTSCompactedError matches the exact substring coming back
// from gRPC.
func (d *DualWriter) writeSecondary(cmd string, iArgs []any) {
sCtx, cancel := context.WithTimeout(context.Background(), d.cfg.SecondaryTimeout)
defer cancel()

start := time.Now()
result := d.secondary.Do(sCtx, iArgs...)
_, sErr := result.Result()
if isNoScriptError(sErr) {
if fallbackArgs, ok := d.evalFallbackArgs(cmd, iArgs); ok {
result = d.secondary.Do(sCtx, fallbackArgs...)
_, sErr = result.Result()

backoff := compactedRetryInitialBackoff
var sErr error
args := iArgs
for attempt := 0; ; attempt++ {
result := d.secondary.Do(sCtx, args...)
_, sErr = result.Result()
if isNoScriptError(sErr) {
// After a successful NOSCRIPT→EVAL resolution, retries reuse the
// resolved EVAL args so we don't waste a round-trip on the
// known-missing EVALSHA every iteration.
if fallbackArgs, ok := d.evalFallbackArgs(cmd, args); ok {
args = fallbackArgs
result = d.secondary.Do(sCtx, args...)
_, sErr = result.Result()
}
}
if !isReadTSCompactedError(sErr) {
break
}
if attempt >= maxCompactedRetries {
break
}
d.logger.Debug("retrying secondary write on compacted snapshot",
"cmd", cmd, "attempt", attempt+1, "backoff", backoff, "err", sErr)
if !waitCompactedRetryBackoff(sCtx, backoff) {
break
}
backoff = nextCompactedRetryBackoff(backoff)
}

d.metrics.CommandDuration.WithLabelValues(cmd, d.secondary.Name()).Observe(time.Since(start).Seconds())

if sErr != nil && !errors.Is(sErr, redis.Nil) {
Expand All @@ -248,6 +317,54 @@ func (d *DualWriter) writeSecondary(cmd string, iArgs []any) {
d.metrics.CommandTotal.WithLabelValues(cmd, d.secondary.Name(), "ok").Inc()
}

// waitCompactedRetryBackoff blocks for a jittered backoff interval, bailing
// out early on context cancellation. It returns true when the full interval
// elapsed and the caller may retry, false when ctx is done and the retry loop
// should abort.
//
// The sleep duration is backoff plus jitterFor(backoff), spreading concurrent
// retries triggered by a single compaction waterline advancement so they do
// not re-hit the secondary in lockstep. An explicit time.NewTimer (rather
// than time.After) lets the timer be released immediately on cancellation
// instead of lingering until expiry while the async goroutine shuts down.
func waitCompactedRetryBackoff(ctx context.Context, backoff time.Duration) bool {
	if backoff <= 0 {
		// Nothing to wait for; only the context state decides.
		return ctx.Err() == nil
	}

	t := time.NewTimer(backoff + jitterFor(backoff))
	defer t.Stop()

	select {
	case <-t.C:
		return true
	case <-ctx.Done():
		return false
	}
}

// jitterFor returns a random additive jitter drawn uniformly from
// [0, backoff/compactedRetryJitterDivisor). crypto/rand is used so the code
// does not trip gosec's G404 rule on math/rand; cryptographic quality is
// irrelevant for retry spreading, but this avoids a blanket lint
// suppression. Any randomness failure degrades to zero jitter.
func jitterFor(backoff time.Duration) time.Duration {
	span := big.NewInt(int64(backoff / compactedRetryJitterDivisor))
	if span.Sign() <= 0 {
		// Sub-divisor backoffs leave no room for jitter.
		return 0
	}
	n, err := rand.Int(rand.Reader, span)
	if err != nil {
		return 0
	}
	return time.Duration(n.Int64())
}

// nextCompactedRetryBackoff advances the exponential backoff schedule,
// multiplying the current delay by compactedRetryBackoffFactor and clamping
// the result at compactedRetryMaxBackoff.
func nextCompactedRetryBackoff(current time.Duration) time.Duration {
	if next := current * compactedRetryBackoffFactor; next <= compactedRetryMaxBackoff {
		return next
	}
	return compactedRetryMaxBackoff
}

// goWrite launches fn in a bounded write goroutine.
func (d *DualWriter) goWrite(fn func()) {
d.goAsyncWithSem(d.writeSem, fn)
Expand Down
Loading
Loading