diff --git a/adapter/redis.go b/adapter/redis.go index 917f6d23..ef709c08 100644 --- a/adapter/redis.go +++ b/adapter/redis.go @@ -250,21 +250,27 @@ var argsLen = map[string]int{ } type RedisServer struct { - listen net.Listener - store store.MVCCStore - coordinator kv.Coordinator - readTracker *kv.ActiveTimestampTracker - redisTranscoder *redisTranscoder - pubsub *redisPubSub - scriptMu sync.RWMutex - scriptCache map[string]string - traceCommands bool - traceSeq atomic.Uint64 - redisAddr string - relay *RedisPubSubRelay - relayConnCache kv.GRPCConnCache - requestObserver monitoring.RedisRequestObserver - luaObserver monitoring.LuaScriptObserver + listen net.Listener + store store.MVCCStore + coordinator kv.Coordinator + readTracker *kv.ActiveTimestampTracker + redisTranscoder *redisTranscoder + pubsub *redisPubSub + scriptMu sync.RWMutex + scriptCache map[string]string + traceCommands bool + traceSeq atomic.Uint64 + redisAddr string + relay *RedisPubSubRelay + relayConnCache kv.GRPCConnCache + requestObserver monitoring.RedisRequestObserver + luaObserver monitoring.LuaScriptObserver + luaFastPathObserver monitoring.LuaFastPathObserver + // luaFastPathZRange is the pre-resolved counter bundle for the + // ZRANGEBYSCORE / ZREVRANGEBYSCORE Lua fast path. Resolved once in + // WithLuaFastPathObserver so the hot path does not pay for + // CounterVec.WithLabelValues on every redis.call(). + luaFastPathZRange monitoring.LuaFastPathCmd // baseCtx is the parent context for per-request handlers. // NewRedisServer creates a cancelable context here; Stop() cancels // it so in-flight handlers abort promptly instead of running @@ -320,6 +326,25 @@ func WithLuaObserver(observer monitoring.LuaScriptObserver) RedisServerOption { } } +// WithLuaFastPathObserver enables per-redis.call() fast-path outcome +// metrics inside Lua scripts. Used to diagnose fast-path hit ratios +// for commands like ZRANGEBYSCORE / ZSCORE / HGET. 
+//
+// The per-command counter bundle is looked up here, when the option
+// is applied, so cmdZRangeByScore only reads a ready-made handle.
+func WithLuaFastPathObserver(observer monitoring.LuaFastPathObserver) RedisServerOption {
+	return func(srv *RedisServer) {
+		srv.luaFastPathObserver = observer
+		srv.luaFastPathZRange = observer.ForCommand(luaFastPathCmdZRangeByScore)
+	}
+}
+
+// luaFastPathCmdZRangeByScore is the cmd label passed to ForCommand for
+// the handle stored in RedisServer.luaFastPathZRange. cmdZRangeByScore
+// reads that one handle regardless of its reverse argument, so forward
+// and reverse range queries are counted under the same label.
+const luaFastPathCmdZRangeByScore = "zrangebyscore"
+
 // redisMetricsConn wraps a redcon.Conn to detect whether WriteError was called.
 type redisMetricsConn struct {
 	redcon.Conn
diff --git a/adapter/redis_lua_context.go b/adapter/redis_lua_context.go
index b9436136..8c423e54 100644
--- a/adapter/redis_lua_context.go
+++ b/adapter/redis_lua_context.go
@@ -2424,10 +2424,18 @@ func (c *luaScriptContext) cmdZRangeByScore(args []string, reverse bool) (luaRep
 	// Fast path eligibility: no script-local mutation / deletion /
 	// type-change on this key. Mirrors the cmdZScore / cmdHGet guards
 	// so in-script ZADD / ZREM / DEL / SET behave exactly as before.
+	//
+	// zrangeMetrics is a zero-value LuaFastPathCmd when no observer is
+	// wired (tests); Observe* methods on it are no-ops. When wired,
+	// each Observe* is a single atomic increment on a pre-resolved
+	// Counter (see WithLuaFastPathObserver).
+ zrangeMetrics := c.server.luaFastPathZRange if luaZSetAlreadyLoaded(c, key) { + zrangeMetrics.ObserveSkipLoaded() return c.cmdZRangeByScoreSlow(key, options, reverse) } if _, cached := c.cachedType(key); cached { + zrangeMetrics.ObserveSkipCachedType() return c.cmdZRangeByScoreSlow(key, options, reverse) } entries, hit, fastErr := c.zrangeByScoreFastPath(key, options, reverse) @@ -2435,8 +2443,10 @@ func (c *luaScriptContext) cmdZRangeByScore(args []string, reverse bool) (luaRep return luaReply{}, fastErr } if !hit { + zrangeMetrics.ObserveFallback() return c.cmdZRangeByScoreSlow(key, options, reverse) } + zrangeMetrics.ObserveHit() if len(entries) == 0 { return luaArrayReply(), nil } diff --git a/main.go b/main.go index a1aed4a8..2e5b43f6 100644 --- a/main.go +++ b/main.go @@ -553,6 +553,7 @@ func startRedisServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Gr adapter.WithRedisActiveTimestampTracker(readTracker), adapter.WithRedisRequestObserver(metricsRegistry.RedisObserver()), adapter.WithLuaObserver(metricsRegistry.LuaObserver()), + adapter.WithLuaFastPathObserver(metricsRegistry.LuaFastPathObserver()), adapter.WithRedisCompactor(deltaCompactor), ) eg.Go(func() error { diff --git a/monitoring/hotpath.go b/monitoring/hotpath.go index 479b1303..61ec8fe6 100644 --- a/monitoring/hotpath.go +++ b/monitoring/hotpath.go @@ -34,8 +34,19 @@ type HotPathMetrics struct { dispatchDroppedTotal *prometheus.CounterVec dispatchErrorsTotal *prometheus.CounterVec stepQueueFullTotal *prometheus.CounterVec + luaFastPathTotal *prometheus.CounterVec } +// LuaFastPathOutcome labels tag each Lua-side read fast-path decision +// so operators can see how often a given command (ZRANGEBYSCORE, +// ZSCORE, HGET, etc.) actually takes the fast path vs falls back. 
+const (
+	LuaFastPathOutcomeHit            = "hit"              // outcome label counted by LuaFastPathCmd.ObserveHit
+	LuaFastPathOutcomeSkipLoaded     = "skip_loaded"      // outcome label counted by LuaFastPathCmd.ObserveSkipLoaded
+	LuaFastPathOutcomeSkipCachedType = "skip_cached_type" // outcome label counted by LuaFastPathCmd.ObserveSkipCachedType
+	LuaFastPathOutcomeFallback       = "fallback"         // outcome label counted by LuaFastPathCmd.ObserveFallback
+)
+
 func newHotPathMetrics(registerer prometheus.Registerer) *HotPathMetrics {
 	m := &HotPathMetrics{
 		leaseReadsTotal: prometheus.NewCounterVec(
@@ -66,6 +77,13 @@ func newHotPathMetrics(registerer prometheus.Registerer) *HotPathMetrics {
 			},
 			[]string{"group"},
 		),
+		luaFastPathTotal: prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Name: "elastickv_lua_cmd_fastpath_total",
+				Help: "Per-redis.call() fast-path outcome inside Lua scripts. cmd identifies the command (zrangebyscore, zscore, ...); outcome is hit, skip_loaded, skip_cached_type, or fallback.",
+			},
+			[]string{"cmd", "outcome"},
+		),
 	}
 
 	registerer.MustRegister(
@@ -73,10 +91,80 @@ func newHotPathMetrics(registerer prometheus.Registerer) *HotPathMetrics {
 		m.dispatchDroppedTotal,
 		m.dispatchErrorsTotal,
 		m.stepQueueFullTotal,
+		m.luaFastPathTotal,
 	)
 	return m
 }
 
+// LuaFastPathObserver records fast-path outcomes for redis.call()
+// inside Lua scripts. The zero value is safe and silently drops
+// samples so tests can pass LuaFastPathObserver{} as a stub.
+//
+// Hot-path shape: each Observe* call on a LuaFastPathCmd handle is a
+// single non-blocking atomic increment on a pre-resolved
+// prometheus.Counter (client_golang's default Counter uses
+// sync/atomic internally). Callers resolve one LuaFastPathCmd per
+// command at server construction to avoid
+// CounterVec.WithLabelValues (mutex-guarded map lookup) on the hot
+// path.
+type LuaFastPathObserver struct {
+	metrics *HotPathMetrics // nil on the zero value; ForCommand then returns an empty LuaFastPathCmd
+}
+
+// LuaFastPathCmd is a pre-resolved bundle of fast-path outcome
+// counters for a single Lua command. Construct once via
+// LuaFastPathObserver.ForCommand(cmd) at server startup; call the
+// Observe* methods per redis.call(). Safe to copy.
+type LuaFastPathCmd struct {
+	// Each field is the counter for one outcome label; a nil field
+	// (zero value) makes the matching Observe* method a no-op.
+	hit, skipLoaded, skipCachedType, fallback prometheus.Counter
+}
+
+// ForCommand looks up the four outcome counters for cmd on the
+// observer's luaFastPathTotal vector. An observer without metrics
+// yields the zero-value LuaFastPathCmd, whose Observe* calls do nothing.
+func (o LuaFastPathObserver) ForCommand(cmd string) LuaFastPathCmd {
+	var handles LuaFastPathCmd
+	if o.metrics == nil {
+		return handles
+	}
+	byOutcome := o.metrics.luaFastPathTotal
+	handles.hit = byOutcome.WithLabelValues(cmd, LuaFastPathOutcomeHit)
+	handles.skipLoaded = byOutcome.WithLabelValues(cmd, LuaFastPathOutcomeSkipLoaded)
+	handles.skipCachedType = byOutcome.WithLabelValues(cmd, LuaFastPathOutcomeSkipCachedType)
+	handles.fallback = byOutcome.WithLabelValues(cmd, LuaFastPathOutcomeFallback)
+	return handles
+}
+
+// incLuaFastPathCounter adds one to counter, or does nothing when
+// counter is nil (the unwired zero-value case).
+func incLuaFastPathCounter(counter prometheus.Counter) {
+	if counter != nil {
+		counter.Inc()
+	}
+}
+
+// ObserveHit adds one sample to the "hit" outcome counter.
+func (c LuaFastPathCmd) ObserveHit() {
+	incLuaFastPathCounter(c.hit)
+}
+
+// ObserveSkipLoaded adds one sample to the "skip_loaded" outcome counter.
+func (c LuaFastPathCmd) ObserveSkipLoaded() {
+	incLuaFastPathCounter(c.skipLoaded)
+}
+
+// ObserveSkipCachedType adds one sample to the "skip_cached_type" outcome counter.
+func (c LuaFastPathCmd) ObserveSkipCachedType() {
+	incLuaFastPathCounter(c.skipCachedType)
+}
+
+// ObserveFallback adds one sample to the "fallback" outcome counter.
+func (c LuaFastPathCmd) ObserveFallback() {
+	incLuaFastPathCounter(c.fallback)
+}
+
 // LeaseReadObserver implements kv.LeaseReadObserver by incrementing the
 // elastickv_lease_read_total counter vector.
Callers grab an instance
 // via Registry.LeaseReadObserver(); the zero value is safe and silently
diff --git a/monitoring/hotpath_test.go b/monitoring/hotpath_test.go
index e59c3454..9b9d2969 100644
--- a/monitoring/hotpath_test.go
+++ b/monitoring/hotpath_test.go
@@ -30,6 +30,41 @@ elastickv_lease_read_total{node_address="10.0.0.1:50051",node_id="n1",outcome="m
 	require.NoError(t, err)
 }
 
+func TestLuaFastPathObserverCountsByCmdAndOutcome(t *testing.T) {
+	reg := NewRegistry("n1", "10.0.0.1:50051")
+	zrange := reg.LuaFastPathObserver().ForCommand("zrangebyscore")
+
+	// Two hits, one skip_loaded, one fallback. skip_cached_type is
+	// never observed, and the expected exposition lists it with value 0.
+	zrange.ObserveHit()
+	zrange.ObserveHit()
+	zrange.ObserveSkipLoaded()
+	zrange.ObserveFallback()
+
+	want := strings.NewReader(`
+# HELP elastickv_lua_cmd_fastpath_total Per-redis.call() fast-path outcome inside Lua scripts. cmd identifies the command (zrangebyscore, zscore, ...); outcome is hit, skip_loaded, skip_cached_type, or fallback.
+# TYPE elastickv_lua_cmd_fastpath_total counter
+elastickv_lua_cmd_fastpath_total{cmd="zrangebyscore",node_address="10.0.0.1:50051",node_id="n1",outcome="fallback"} 1
+elastickv_lua_cmd_fastpath_total{cmd="zrangebyscore",node_address="10.0.0.1:50051",node_id="n1",outcome="hit"} 2
+elastickv_lua_cmd_fastpath_total{cmd="zrangebyscore",node_address="10.0.0.1:50051",node_id="n1",outcome="skip_cached_type"} 0
+elastickv_lua_cmd_fastpath_total{cmd="zrangebyscore",node_address="10.0.0.1:50051",node_id="n1",outcome="skip_loaded"} 1
+`)
+	err := testutil.GatherAndCompare(reg.Gatherer(), want, "elastickv_lua_cmd_fastpath_total")
+	require.NoError(t, err)
+}
+
+func TestLuaFastPathObserverZeroValueIsNoop(t *testing.T) {
+	// Handles resolved from an observer with no metrics attached.
+	handles := LuaFastPathObserver{}.ForCommand("zrangebyscore")
+	exerciseAll := func() {
+		handles.ObserveHit()
+		handles.ObserveSkipLoaded()
+		handles.ObserveSkipCachedType()
+		handles.ObserveFallback()
+	}
+	require.NotPanics(t, exerciseAll)
+}
+
 func TestLeaseReadObserverZeroValueIsNoop(t *testing.T) {
 	// LeaseReadObserver{} is documented as safe; the Coordinator
 	// falls back to this
when monitoring is disabled. Calling
diff --git a/monitoring/registry.go b/monitoring/registry.go
index bc58e20d..2145f979 100644
--- a/monitoring/registry.go
+++ b/monitoring/registry.go
@@ -112,6 +112,16 @@ func (r *Registry) LeaseReadObserver() LeaseReadObserver {
 	return LeaseReadObserver{metrics: r.hotPath}
 }
 
+// LuaFastPathObserver returns a LuaFastPathObserver backed by r.hotPath.
+// A nil *Registry yields the zero-value observer instead of panicking.
+func (r *Registry) LuaFastPathObserver() LuaFastPathObserver {
+	var observer LuaFastPathObserver
+	if r != nil {
+		observer.metrics = r.hotPath
+	}
+	return observer
+}
+
 // DispatchCollector returns a collector that polls the etcd raft
 // engine's dispatch counters and exports them to Prometheus. Start it
 // with the node's raft sources after engine Open() completes.