64 changes: 50 additions & 14 deletions kv/sharded_coordinator.go
@@ -750,10 +750,11 @@ func (c *ShardedCoordinator) RaftLeaderForKey(key []byte) string {
}

func (c *ShardedCoordinator) LinearizableReadForKey(ctx context.Context, key []byte) (uint64, error) {
g, ok := c.groupForKey(key)
routeID, g, ok := c.routeAndGroupForKey(key)
if !ok {
return 0, errors.WithStack(ErrLeaderNotFound)
}
c.observeRead(routeID, len(key))
return linearizableReadEngineCtx(ctx, engineForGroup(g))
}

@@ -771,10 +772,11 @@ func (c *ShardedCoordinator) LeaseRead(ctx context.Context) (uint64, error) {
// Each group maintains its own lease since each group has independent
// leadership and term.
func (c *ShardedCoordinator) LeaseReadForKey(ctx context.Context, key []byte) (uint64, error) {
g, ok := c.groupForKey(key)
routeID, g, ok := c.routeAndGroupForKey(key)
if !ok {
return 0, errors.WithStack(ErrLeaderNotFound)
}
c.observeRead(routeID, len(key))
return groupLeaseRead(ctx, g, c.leaseObserver)
}

@@ -836,6 +838,24 @@ func (c *ShardedCoordinator) groupForKey(key []byte) (*ShardGroup, bool) {
return g, ok
}

// routeAndGroupForKey is groupForKey + the resolved RouteID. Read
// entry points that observe into keyviz call this so the GetRoute
// lookup runs once instead of twice (Gemini round-1 nit on PR #661).
// Leadership-only callers (IsLeaderForKey / VerifyLeaderForKey /
// RaftLeaderForKey) keep using groupForKey because they don't need
// the route ID.
func (c *ShardedCoordinator) routeAndGroupForKey(key []byte) (uint64, *ShardGroup, bool) {
route, ok := c.engine.GetRoute(routeKey(key))
if !ok {
return 0, nil, false
}
g, ok := c.groups[route.GroupID]
if !ok {
return 0, nil, false
}
return route.RouteID, g, true
}

func (c *ShardedCoordinator) engineGroupIDForKey(key []byte) uint64 {
route, ok := c.engine.GetRoute(routeKey(key))
if !ok {
Expand Down Expand Up @@ -975,25 +995,41 @@ func (c *ShardedCoordinator) txnLogs(reqs *OperationGroup[OP]) ([]*pb.Request, e
return buildTxnLogs(reqs.StartTS, commitTS, grouped, gids)
}

// observeMutation: reads never reach this path; the early return
// keeps the disabled-keyviz hot path allocation-free. Counted
// pre-commit, so a mutation that subsequently fails its Raft
// proposal is still recorded — the heatmap reflects offered load,
// not just committed writes (intentional for traffic visualisation).
//
// TODO(keyviz Phase 2): the design (§5.1, §10) calls for read
// sampling on the node that serves the read (LeaseRead /
// LinearizableRead / follower reads). Until that wiring lands the
// matrix's Reads / ReadBytes series stay zero. Tracked as a Phase-2
// milestone in docs/admin_ui_key_visualizer_design.md — not a
// regression for the writes-first slice this method covers.
// observeMutation: counted pre-commit, so a mutation that subsequently
// fails its Raft proposal is still recorded — the heatmap reflects
// offered load, not just committed writes (intentional for traffic
// visualisation). The early return keeps the disabled-keyviz hot
// path allocation-free. Reads have their own observeRead helper
// (LinearizableReadForKey / LeaseReadForKey).
func (c *ShardedCoordinator) observeMutation(routeID uint64, mut *pb.Mutation) {
if c.sampler == nil {
return
}
c.sampler.Observe(routeID, keyviz.OpWrite, len(mut.Key), len(mut.Value))
}

// observeRead records a single linearizable / lease read against the
// route. valueLen is always 0 here — the consistency check at this
// layer doesn't fetch data; the actual GetAt on the store happens
// further down the stack and isn't observed yet.
//
// Callers MUST pass an already-resolved routeID (via
// routeAndGroupForKey) so the GetRoute lookup runs once across the
// dispatch path — repeating it here just to compute routeID would
// double the per-read overhead when sampling is enabled
// (Gemini round-1 nit on PR #661).
//
// Adapter-direct read paths (Redis / DynamoDB / S3 hitting
// MVCCStore.GetAt without going through the coordinator) still
// bypass keyviz; sampling those is task B in the design's Phase 2
// follow-up.
func (c *ShardedCoordinator) observeRead(routeID uint64, keyLen int) {
if c.sampler == nil {
return
}
c.sampler.Observe(routeID, keyviz.OpRead, keyLen, 0)
}

func (c *ShardedCoordinator) groupMutations(reqs []*Elem[OP]) (map[uint64][]*pb.Mutation, []uint64, error) {
grouped := make(map[uint64][]*pb.Mutation)
for _, req := range reqs {
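The observe hooks above only assume a sampler with a single Observe entry point, wired in via WithSampler and skipped when nil. The keyviz types themselves are not part of this diff; the sketch below is an inferred shape based on the call sites (keyviz.OpWrite, keyviz.OpRead, Observe(routeID, op, keyLen, valueLen)). The Op and Sampler names are assumptions and may not match the real keyviz package.

```go
// Inferred sketch of the sampler contract the coordinator calls into.
// Names (Op, OpWrite, OpRead, Sampler) are assumptions drawn from the
// call sites in this diff, not the actual keyviz definitions.
package keyviz

// Op distinguishes the kind of operation recorded against a route.
type Op int

const (
	OpWrite Op = iota
	OpRead
)

// Sampler receives one Observe call per sampled operation. A nil sampler
// on the coordinator disables sampling, which is why observeMutation and
// observeRead both return early on nil.
type Sampler interface {
	Observe(routeID uint64, op Op, keyLen, valueLen int)
}
```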
70 changes: 70 additions & 0 deletions kv/sharded_coordinator_sampler_test.go
@@ -151,3 +151,73 @@ func TestShardedCoordinatorWithoutSamplerStaysSafe(t *testing.T) {
})
}
}

// TestShardedCoordinatorObservesLeaseAndLinearizableReads pins the
// read-side wiring: LeaseReadForKey and LinearizableReadForKey each
// produce one Observe(routeID, OpRead, len(key), 0) call. valueLen
// is always zero at this layer because the consistency check doesn't
// fetch data — the actual GetAt happens further down the stack and
// is sampled separately (Phase 2 follow-up for adapter direct reads).
func TestShardedCoordinatorObservesLeaseAndLinearizableReads(t *testing.T) {
t.Parallel()
ctx := context.Background()

engine := distribution.NewEngine()
engine.UpdateRoute([]byte("a"), nil, 1)

s1 := store.NewMVCCStore()
r1, stop1 := newSingleRaft(t, "kv-sampler-read", NewKvFSMWithHLC(s1, NewHLC()))
t.Cleanup(stop1)
groups := map[uint64]*ShardGroup{
1: {Engine: r1, Store: s1, Txn: NewLeaderProxyWithEngine(r1)},
}
rec := &recordingSampler{}
coord := NewShardedCoordinator(engine, groups, 1, NewHLC(), NewShardStore(engine, groups)).WithSampler(rec)

key := []byte("hot-key")
_, err := coord.LinearizableReadForKey(ctx, key)
require.NoError(t, err)
_, err = coord.LeaseReadForKey(ctx, key)
require.NoError(t, err)

calls := rec.snapshot()
require.Len(t, calls, 2, "expected one Observe per read entry")
route, ok := engine.GetRoute(routeKey(key))
require.True(t, ok)
want := sampleCall{
routeID: route.RouteID,
op: keyviz.OpRead,
keyLen: len(key),
valueLen: 0,
}
require.Equal(t, want, calls[0])
require.Equal(t, want, calls[1])
}

// TestShardedCoordinatorSkipsObserveForLeadershipChecks pins the
// negative contract: leadership-only entries (IsLeaderForKey,
// VerifyLeaderForKey, RaftLeaderForKey) MUST NOT produce read
// observations — they don't represent user-facing data reads, just
// internal routing checks.
func TestShardedCoordinatorSkipsObserveForLeadershipChecks(t *testing.T) {
t.Parallel()

engine := distribution.NewEngine()
engine.UpdateRoute([]byte("a"), nil, 1)

s1 := store.NewMVCCStore()
r1, stop1 := newSingleRaft(t, "kv-sampler-leadership", NewKvFSMWithHLC(s1, NewHLC()))
t.Cleanup(stop1)
groups := map[uint64]*ShardGroup{
1: {Engine: r1, Store: s1, Txn: NewLeaderProxyWithEngine(r1)},
}
rec := &recordingSampler{}
coord := NewShardedCoordinator(engine, groups, 1, NewHLC(), NewShardStore(engine, groups)).WithSampler(rec)

key := []byte("k")
_ = coord.IsLeaderForKey(key)
_ = coord.VerifyLeaderForKey(key)
_ = coord.RaftLeaderForKey(key)

require.Empty(t, rec.snapshot(), "leadership checks must not produce read samples")
}
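Both tests rely on a recordingSampler and sampleCall defined in the unchanged portion of sharded_coordinator_sampler_test.go. A plausible reconstruction, assuming the Observe signature used by the coordinator plus sync and keyviz imports already present in the test file, looks like this; the real helpers may differ.

```go
// Hypothetical reconstruction of the test double used above; the actual
// definitions live earlier in the test file and may not match.

// sampleCall captures one Observe invocation for later assertions.
type sampleCall struct {
	routeID  uint64
	op       keyviz.Op
	keyLen   int
	valueLen int
}

// recordingSampler is a thread-safe in-memory sampler for tests.
type recordingSampler struct {
	mu    sync.Mutex
	calls []sampleCall
}

// Observe records every call verbatim.
func (r *recordingSampler) Observe(routeID uint64, op keyviz.Op, keyLen, valueLen int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.calls = append(r.calls, sampleCall{routeID: routeID, op: op, keyLen: keyLen, valueLen: valueLen})
}

// snapshot returns a copy so assertions don't race with in-flight observes.
func (r *recordingSampler) snapshot() []sampleCall {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make([]sampleCall, len(r.calls))
	copy(out, r.calls)
	return out
}
```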