Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 40 additions & 15 deletions adapter/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,21 +250,27 @@ var argsLen = map[string]int{
}

type RedisServer struct {
listen net.Listener
store store.MVCCStore
coordinator kv.Coordinator
readTracker *kv.ActiveTimestampTracker
redisTranscoder *redisTranscoder
pubsub *redisPubSub
scriptMu sync.RWMutex
scriptCache map[string]string
traceCommands bool
traceSeq atomic.Uint64
redisAddr string
relay *RedisPubSubRelay
relayConnCache kv.GRPCConnCache
requestObserver monitoring.RedisRequestObserver
luaObserver monitoring.LuaScriptObserver
listen net.Listener
store store.MVCCStore
coordinator kv.Coordinator
readTracker *kv.ActiveTimestampTracker
redisTranscoder *redisTranscoder
pubsub *redisPubSub
scriptMu sync.RWMutex
scriptCache map[string]string
traceCommands bool
traceSeq atomic.Uint64
redisAddr string
relay *RedisPubSubRelay
relayConnCache kv.GRPCConnCache
requestObserver monitoring.RedisRequestObserver
luaObserver monitoring.LuaScriptObserver
luaFastPathObserver monitoring.LuaFastPathObserver
// luaFastPathZRange is the pre-resolved counter bundle for the
// ZRANGEBYSCORE / ZREVRANGEBYSCORE Lua fast path. Resolved once in
// WithLuaFastPathObserver so the hot path does not pay for
// CounterVec.WithLabelValues on every redis.call().
luaFastPathZRange monitoring.LuaFastPathCmd
// baseCtx is the parent context for per-request handlers.
// NewRedisServer creates a cancelable context here; Stop() cancels
// it so in-flight handlers abort promptly instead of running
Expand Down Expand Up @@ -320,6 +326,25 @@ func WithLuaObserver(observer monitoring.LuaScriptObserver) RedisServerOption {
}
}

// WithLuaFastPathObserver wires a fast-path outcome observer into the
// server so redis.call() invocations inside Lua scripts can report
// whether they were served by the fast path.
//
// Besides storing the observer itself, the option resolves the
// ZRANGEBYSCORE / ZREVRANGEBYSCORE counter bundle once, when the
// option is applied, so the per-call code only touches the
// pre-resolved handles instead of calling
// CounterVec.WithLabelValues on every redis.call().
func WithLuaFastPathObserver(observer monitoring.LuaFastPathObserver) RedisServerOption {
	apply := func(srv *RedisServer) {
		// Resolution happens here (not when the option is built) so
		// label children are only created for servers that use it.
		srv.luaFastPathZRange = observer.ForCommand(luaFastPathCmdZRangeByScore)
		srv.luaFastPathObserver = observer
	}
	return apply
}

// luaFastPathCmdZRangeByScore is the shared label for ZRANGEBYSCORE
// and ZREVRANGEBYSCORE fast-path outcomes. Both directions take the
// same branch through zsetRangeByScoreFast so sharing one label
// keeps the counter cardinality bounded.
//
// WithLuaFastPathObserver passes this value to
// LuaFastPathObserver.ForCommand when it resolves the counter bundle
// stored on the server.
const luaFastPathCmdZRangeByScore = "zrangebyscore"

// redisMetricsConn wraps a redcon.Conn to detect whether WriteError was called.
type redisMetricsConn struct {
redcon.Conn
Expand Down
10 changes: 10 additions & 0 deletions adapter/redis_lua_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2424,19 +2424,29 @@ func (c *luaScriptContext) cmdZRangeByScore(args []string, reverse bool) (luaRep
// Fast path eligibility: no script-local mutation / deletion /
// type-change on this key. Mirrors the cmdZScore / cmdHGet guards
// so in-script ZADD / ZREM / DEL / SET behave exactly as before.
//
// zrangeMetrics is a zero-value LuaFastPathCmd when no observer is
// wired (tests); Observe* methods on it are no-ops. When wired,
// each Observe* is a single atomic increment on a pre-resolved
// Counter (see WithLuaFastPathObserver).
zrangeMetrics := c.server.luaFastPathZRange
if luaZSetAlreadyLoaded(c, key) {
zrangeMetrics.ObserveSkipLoaded()
return c.cmdZRangeByScoreSlow(key, options, reverse)
}
if _, cached := c.cachedType(key); cached {
zrangeMetrics.ObserveSkipCachedType()
return c.cmdZRangeByScoreSlow(key, options, reverse)
}
entries, hit, fastErr := c.zrangeByScoreFastPath(key, options, reverse)
if fastErr != nil {
return luaReply{}, fastErr
}
if !hit {
zrangeMetrics.ObserveFallback()
return c.cmdZRangeByScoreSlow(key, options, reverse)
}
zrangeMetrics.ObserveHit()
if len(entries) == 0 {
return luaArrayReply(), nil
}
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ func startRedisServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Gr
adapter.WithRedisActiveTimestampTracker(readTracker),
adapter.WithRedisRequestObserver(metricsRegistry.RedisObserver()),
adapter.WithLuaObserver(metricsRegistry.LuaObserver()),
adapter.WithLuaFastPathObserver(metricsRegistry.LuaFastPathObserver()),
adapter.WithRedisCompactor(deltaCompactor),
)
eg.Go(func() error {
Expand Down
88 changes: 88 additions & 0 deletions monitoring/hotpath.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,19 @@ type HotPathMetrics struct {
dispatchDroppedTotal *prometheus.CounterVec
dispatchErrorsTotal *prometheus.CounterVec
stepQueueFullTotal *prometheus.CounterVec
luaFastPathTotal *prometheus.CounterVec
}

// LuaFastPathOutcome labels tag each Lua-side read fast-path decision
// so operators can see how often a given command (ZRANGEBYSCORE,
// ZSCORE, HGET, etc.) actually takes the fast path vs falls back.
const (
// LuaFastPathOutcomeHit is the outcome label counted by
// LuaFastPathCmd.ObserveHit.
LuaFastPathOutcomeHit = "hit"
// LuaFastPathOutcomeSkipLoaded is the outcome label counted by
// LuaFastPathCmd.ObserveSkipLoaded.
LuaFastPathOutcomeSkipLoaded = "skip_loaded"
// LuaFastPathOutcomeSkipCachedType is the outcome label counted by
// LuaFastPathCmd.ObserveSkipCachedType.
LuaFastPathOutcomeSkipCachedType = "skip_cached_type"
// LuaFastPathOutcomeFallback is the outcome label counted by
// LuaFastPathCmd.ObserveFallback.
LuaFastPathOutcomeFallback = "fallback"
)

func newHotPathMetrics(registerer prometheus.Registerer) *HotPathMetrics {
m := &HotPathMetrics{
leaseReadsTotal: prometheus.NewCounterVec(
Expand Down Expand Up @@ -66,17 +77,94 @@ func newHotPathMetrics(registerer prometheus.Registerer) *HotPathMetrics {
},
[]string{"group"},
),
luaFastPathTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "elastickv_lua_cmd_fastpath_total",
Help: "Per-redis.call() fast-path outcome inside Lua scripts. cmd identifies the command (zrangebyscore, zscore, ...); outcome is hit, skip_loaded, skip_cached_type, or fallback.",
},
[]string{"cmd", "outcome"},
),
}

registerer.MustRegister(
m.leaseReadsTotal,
m.dispatchDroppedTotal,
m.dispatchErrorsTotal,
m.stepQueueFullTotal,
m.luaFastPathTotal,
)
return m
}

// LuaFastPathObserver records fast-path outcomes for redis.call()
// inside Lua scripts. The zero value is safe and silently drops
// samples so tests can pass LuaFastPathObserver{} as a stub.
//
// Hot-path shape: each Observe* call on a LuaFastPathCmd handle is a
// single non-blocking atomic increment on a pre-resolved
// prometheus.Counter (client_golang's default Counter uses
// sync/atomic internally). Callers resolve one LuaFastPathCmd per
// command at server construction to avoid
// CounterVec.WithLabelValues (mutex-guarded map lookup) on the hot
// path.
type LuaFastPathObserver struct {
// metrics is the source of the counter vector that ForCommand
// resolves handles from. It is nil on the zero value, in which
// case ForCommand returns an empty LuaFastPathCmd.
metrics *HotPathMetrics
}

// LuaFastPathCmd is a pre-resolved bundle of fast-path outcome
// counters for a single Lua command. Construct once via
// LuaFastPathObserver.ForCommand(cmd) at server startup; call the
// Observe* methods per redis.call(). Safe to copy.
type LuaFastPathCmd struct {
// Each field is the counter for one outcome label of a single
// command, as resolved by LuaFastPathObserver.ForCommand. All
// four are nil on the zero value; the Observe* methods check for
// nil before incrementing.
hit prometheus.Counter
skipLoaded prometheus.Counter
skipCachedType prometheus.Counter
fallback prometheus.Counter
}

// ForCommand builds the per-outcome counter bundle for cmd by
// resolving one child of the fast-path counter vector per outcome
// label (hit, skip_loaded, skip_cached_type, fallback).
//
// When the observer carries no metrics (the zero value, e.g. in
// tests) the returned LuaFastPathCmd is itself the zero value, so
// every Observe* call on it is dropped.
func (o LuaFastPathObserver) ForCommand(cmd string) LuaFastPathCmd {
	var bundle LuaFastPathCmd
	if o.metrics == nil {
		return bundle
	}

	outcomes := o.metrics.luaFastPathTotal
	bundle.hit = outcomes.WithLabelValues(cmd, LuaFastPathOutcomeHit)
	bundle.skipLoaded = outcomes.WithLabelValues(cmd, LuaFastPathOutcomeSkipLoaded)
	bundle.skipCachedType = outcomes.WithLabelValues(cmd, LuaFastPathOutcomeSkipCachedType)
	bundle.fallback = outcomes.WithLabelValues(cmd, LuaFastPathOutcomeFallback)
	return bundle
}

// ObserveHit records one "hit" outcome. Like its siblings
// ObserveSkipLoaded, ObserveSkipCachedType and ObserveFallback, it
// performs a single Inc() on the pre-resolved counter when one is
// wired and does nothing on the zero value.
func (c LuaFastPathCmd) ObserveHit() {
	if c.hit == nil {
		return
	}
	c.hit.Inc()
}

// ObserveSkipLoaded records one "skip_loaded" outcome; it does
// nothing when the counter is not wired (zero-value bundle).
func (c LuaFastPathCmd) ObserveSkipLoaded() {
	if c.skipLoaded == nil {
		return
	}
	c.skipLoaded.Inc()
}

// ObserveSkipCachedType records one "skip_cached_type" outcome; it
// does nothing when the counter is not wired (zero-value bundle).
func (c LuaFastPathCmd) ObserveSkipCachedType() {
	if c.skipCachedType == nil {
		return
	}
	c.skipCachedType.Inc()
}

// ObserveFallback records one "fallback" outcome; it does nothing
// when the counter is not wired (zero-value bundle).
func (c LuaFastPathCmd) ObserveFallback() {
	if c.fallback == nil {
		return
	}
	c.fallback.Inc()
}

// LeaseReadObserver implements kv.LeaseReadObserver by incrementing the
// elastickv_lease_read_total counter vector. Callers grab an instance
// via Registry.LeaseReadObserver(); the zero value is safe and silently
Expand Down
35 changes: 35 additions & 0 deletions monitoring/hotpath_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,41 @@ elastickv_lease_read_total{node_address="10.0.0.1:50051",node_id="n1",outcome="m
require.NoError(t, err)
}

// TestLuaFastPathObserverCountsByCmdAndOutcome drives a resolved
// command bundle through a known sequence of outcomes and compares
// the gathered exposition text for the fast-path counter against the
// expected per-outcome totals.
func TestLuaFastPathObserverCountsByCmdAndOutcome(t *testing.T) {
	const metricName = "elastickv_lua_cmd_fastpath_total"
	const expected = `
# HELP elastickv_lua_cmd_fastpath_total Per-redis.call() fast-path outcome inside Lua scripts. cmd identifies the command (zrangebyscore, zscore, ...); outcome is hit, skip_loaded, skip_cached_type, or fallback.
# TYPE elastickv_lua_cmd_fastpath_total counter
elastickv_lua_cmd_fastpath_total{cmd="zrangebyscore",node_address="10.0.0.1:50051",node_id="n1",outcome="fallback"} 1
elastickv_lua_cmd_fastpath_total{cmd="zrangebyscore",node_address="10.0.0.1:50051",node_id="n1",outcome="hit"} 2
elastickv_lua_cmd_fastpath_total{cmd="zrangebyscore",node_address="10.0.0.1:50051",node_id="n1",outcome="skip_cached_type"} 0
elastickv_lua_cmd_fastpath_total{cmd="zrangebyscore",node_address="10.0.0.1:50051",node_id="n1",outcome="skip_loaded"} 1
`

	reg := NewRegistry("n1", "10.0.0.1:50051")
	zrange := reg.LuaFastPathObserver().ForCommand("zrangebyscore")

	// Two hits, one skip_loaded, one fallback. skip_cached_type is
	// never observed and is expected to be reported with value 0.
	zrange.ObserveHit()
	zrange.ObserveHit()
	zrange.ObserveSkipLoaded()
	zrange.ObserveFallback()

	compareErr := testutil.GatherAndCompare(reg.Gatherer(), strings.NewReader(expected), metricName)
	require.NoError(t, compareErr)
}

// TestLuaFastPathObserverZeroValueIsNoop checks that a bundle
// resolved from an unwired observer tolerates every Observe* call
// without panicking.
func TestLuaFastPathObserverZeroValueIsNoop(t *testing.T) {
	unwired := LuaFastPathObserver{}.ForCommand("zrangebyscore")

	exerciseAll := func() {
		unwired.ObserveHit()
		unwired.ObserveSkipLoaded()
		unwired.ObserveSkipCachedType()
		unwired.ObserveFallback()
	}
	require.NotPanics(t, exerciseAll)
}

func TestLeaseReadObserverZeroValueIsNoop(t *testing.T) {
// LeaseReadObserver{} is documented as safe; the Coordinator
// falls back to this when monitoring is disabled. Calling
Expand Down
10 changes: 10 additions & 0 deletions monitoring/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,16 @@ func (r *Registry) LeaseReadObserver() LeaseReadObserver {
return LeaseReadObserver{metrics: r.hotPath}
}

// LuaFastPathObserver hands out the observer used to count Lua-side
// redis.call() fast-path outcomes (hit / skip / fallback per
// command). A nil *Registry yields the zero-value observer, which is
// safe to use in tests and tools that do not wire a registry.
func (r *Registry) LuaFastPathObserver() LuaFastPathObserver {
	var observer LuaFastPathObserver
	if r != nil {
		observer.metrics = r.hotPath
	}
	return observer
}

// DispatchCollector returns a collector that polls the etcd raft
// engine's dispatch counters and exports them to Prometheus. Start it
// with the node's raft sources after engine Open() completes.
Expand Down
Loading