diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index e478118da..cc19bece0 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -750,10 +750,11 @@ func (c *ShardedCoordinator) RaftLeaderForKey(key []byte) string { } func (c *ShardedCoordinator) LinearizableReadForKey(ctx context.Context, key []byte) (uint64, error) { - g, ok := c.groupForKey(key) + routeID, g, ok := c.routeAndGroupForKey(key) if !ok { return 0, errors.WithStack(ErrLeaderNotFound) } + c.observeRead(routeID, len(key)) return linearizableReadEngineCtx(ctx, engineForGroup(g)) } @@ -771,10 +772,11 @@ func (c *ShardedCoordinator) LeaseRead(ctx context.Context) (uint64, error) { // Each group maintains its own lease since each group has independent // leadership and term. func (c *ShardedCoordinator) LeaseReadForKey(ctx context.Context, key []byte) (uint64, error) { - g, ok := c.groupForKey(key) + routeID, g, ok := c.routeAndGroupForKey(key) if !ok { return 0, errors.WithStack(ErrLeaderNotFound) } + c.observeRead(routeID, len(key)) return groupLeaseRead(ctx, g, c.leaseObserver) } @@ -836,6 +838,24 @@ func (c *ShardedCoordinator) groupForKey(key []byte) (*ShardGroup, bool) { return g, ok } +// routeAndGroupForKey is groupForKey + the resolved RouteID. Read +// entry points that observe into keyviz call this so the GetRoute +// lookup runs once instead of twice (Gemini round-1 nit on PR #661). +// Leadership-only callers (IsLeaderForKey / VerifyLeaderForKey / +// RaftLeaderForKey) keep using groupForKey because they don't need +// the route ID. 
+func (c *ShardedCoordinator) routeAndGroupForKey(key []byte) (uint64, *ShardGroup, bool) { + route, ok := c.engine.GetRoute(routeKey(key)) + if !ok { + return 0, nil, false + } + g, ok := c.groups[route.GroupID] + if !ok { + return 0, nil, false + } + return route.RouteID, g, true +} + func (c *ShardedCoordinator) engineGroupIDForKey(key []byte) uint64 { route, ok := c.engine.GetRoute(routeKey(key)) if !ok { @@ -975,18 +995,12 @@ func (c *ShardedCoordinator) txnLogs(reqs *OperationGroup[OP]) ([]*pb.Request, e return buildTxnLogs(reqs.StartTS, commitTS, grouped, gids) } -// observeMutation: reads never reach this path; the early return -// keeps the disabled-keyviz hot path allocation-free. Counted -// pre-commit, so a mutation that subsequently fails its Raft -// proposal is still recorded — the heatmap reflects offered load, -// not just committed writes (intentional for traffic visualisation). -// -// TODO(keyviz Phase 2): the design (§5.1, §10) calls for read -// sampling on the node that serves the read (LeaseRead / -// LinearizableRead / follower reads). Until that wiring lands the -// matrix's Reads / ReadBytes series stay zero. Tracked as a Phase-2 -// milestone in docs/admin_ui_key_visualizer_design.md — not a -// regression for the writes-first slice this method covers. +// observeMutation: counted pre-commit, so a mutation that subsequently +// fails its Raft proposal is still recorded — the heatmap reflects +// offered load, not just committed writes (intentional for traffic +// visualisation). The early return keeps the disabled-keyviz hot +// path allocation-free. Reads have their own observeRead helper +// (LinearizableReadForKey / LeaseReadForKey). 
func (c *ShardedCoordinator) observeMutation(routeID uint64, mut *pb.Mutation) { if c.sampler == nil { return @@ -994,6 +1008,28 @@ func (c *ShardedCoordinator) observeMutation(routeID uint64, mut *pb.Mutation) { c.sampler.Observe(routeID, keyviz.OpWrite, len(mut.Key), len(mut.Value)) } +// observeRead records a single linearizable / lease read against the +// route. valueLen is always 0 here — the consistency check at this +// layer doesn't fetch data; the actual GetAt on the store happens +// further down the stack and isn't observed yet. +// +// Callers MUST pass an already-resolved routeID (via +// routeAndGroupForKey) so the GetRoute lookup runs once across the +// dispatch path — repeating it here just to compute routeID would +// double the per-read overhead when sampling is enabled +// (Gemini round-1 nit on PR #661). +// +// Adapter-direct read paths (Redis / DynamoDB / S3 hitting +// MVCCStore.GetAt without going through the coordinator) still +// bypass keyviz; sampling those is task B in the design's Phase 2 +// follow-up. +func (c *ShardedCoordinator) observeRead(routeID uint64, keyLen int) { + if c.sampler == nil { + return + } + c.sampler.Observe(routeID, keyviz.OpRead, keyLen, 0) +} + func (c *ShardedCoordinator) groupMutations(reqs []*Elem[OP]) (map[uint64][]*pb.Mutation, []uint64, error) { grouped := make(map[uint64][]*pb.Mutation) for _, req := range reqs { diff --git a/kv/sharded_coordinator_sampler_test.go b/kv/sharded_coordinator_sampler_test.go index 97e9e1068..ea4d85a7b 100644 --- a/kv/sharded_coordinator_sampler_test.go +++ b/kv/sharded_coordinator_sampler_test.go @@ -151,3 +151,73 @@ func TestShardedCoordinatorWithoutSamplerStaysSafe(t *testing.T) { }) } } + +// TestShardedCoordinatorObservesLeaseAndLinearizableReads pins the +// read-side wiring: LeaseReadForKey and LinearizableReadForKey each +// produce one Observe(routeID, OpRead, len(key), 0) call. 
valueLen +// is always zero at this layer because the consistency check doesn't +// fetch data — the actual GetAt happens further down the stack and +// is sampled separately (Phase 2 follow-up for adapter direct reads). +func TestShardedCoordinatorObservesLeaseAndLinearizableReads(t *testing.T) { + t.Parallel() + ctx := context.Background() + + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), nil, 1) + + s1 := store.NewMVCCStore() + r1, stop1 := newSingleRaft(t, "kv-sampler-read", NewKvFSMWithHLC(s1, NewHLC())) + t.Cleanup(stop1) + groups := map[uint64]*ShardGroup{ + 1: {Engine: r1, Store: s1, Txn: NewLeaderProxyWithEngine(r1)}, + } + rec := &recordingSampler{} + coord := NewShardedCoordinator(engine, groups, 1, NewHLC(), NewShardStore(engine, groups)).WithSampler(rec) + + key := []byte("hot-key") + _, err := coord.LinearizableReadForKey(ctx, key) + require.NoError(t, err) + _, err = coord.LeaseReadForKey(ctx, key) + require.NoError(t, err) + + calls := rec.snapshot() + require.Len(t, calls, 2, "expected one Observe per read entry") + route, ok := engine.GetRoute(routeKey(key)) + require.True(t, ok) + want := sampleCall{ + routeID: route.RouteID, + op: keyviz.OpRead, + keyLen: len(key), + valueLen: 0, + } + require.Equal(t, want, calls[0]) + require.Equal(t, want, calls[1]) +} + +// TestShardedCoordinatorSkipsObserveForLeadershipChecks pins the +// negative contract: leadership-only entries (IsLeaderForKey, +// VerifyLeaderForKey, RaftLeaderForKey) MUST NOT produce read +// observations — they don't represent user-facing data reads, just +// internal routing checks. 
+func TestShardedCoordinatorSkipsObserveForLeadershipChecks(t *testing.T) { + t.Parallel() + + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), nil, 1) + + s1 := store.NewMVCCStore() + r1, stop1 := newSingleRaft(t, "kv-sampler-leadership", NewKvFSMWithHLC(s1, NewHLC())) + t.Cleanup(stop1) + groups := map[uint64]*ShardGroup{ + 1: {Engine: r1, Store: s1, Txn: NewLeaderProxyWithEngine(r1)}, + } + rec := &recordingSampler{} + coord := NewShardedCoordinator(engine, groups, 1, NewHLC(), NewShardStore(engine, groups)).WithSampler(rec) + + key := []byte("k") + _ = coord.IsLeaderForKey(key) + _ = coord.VerifyLeaderForKey(key) + _ = coord.RaftLeaderForKey(key) + + require.Empty(t, rec.snapshot(), "leadership checks must not produce read samples") +}