Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions kv/shard_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,19 +184,32 @@ func (s *ShardStore) reverseScanRoutesAt(
clampToRoutes bool,
) ([]*store.KVPair, error) {
out := make([]*store.KVPair, 0)
for i := len(routes) - 1; i >= 0 && len(out) < limit; i-- {
for i := len(routes) - 1; i >= 0; i-- {
route := routes[i]
scanStart := start
scanEnd := end
if clampToRoutes {
if len(out) >= limit {
break
}
scanStart = clampScanStart(start, route.Start)
scanEnd = clampScanEnd(end, route.End)
kvs, err := s.scanRouteAtDirection(ctx, route, scanStart, scanEnd, limit-len(out), ts, true)
if err != nil {
return nil, err
}
out = append(out, kvs...)
} else {
// When clampToRoutes is false (e.g. S3 manifest scans spanning multiple
// shards), keys from different routes may interleave in descending order.
// Fetch up to limit from every route and merge+sort descending so the
// result honours the ReverseScanAt contract.
kvs, err := s.scanRouteAtDirection(ctx, route, scanStart, scanEnd, limit, ts, true)
if err != nil {
return nil, err
}
out = mergeAndTrimReverseScanResults(out, kvs, limit)
}
kvs, err := s.scanRouteAtDirection(ctx, route, scanStart, scanEnd, limit-len(out), ts, true)
if err != nil {
return nil, err
}
out = append(out, kvs...)
}
return out, nil
}
Expand Down Expand Up @@ -343,6 +356,21 @@ func mergeAndTrimScanResults(out []*store.KVPair, kvs []*store.KVPair, limit int
return out[:limit]
}

// mergeAndTrimReverseScanResults folds a new batch of reverse-scan results
// into the accumulated output, keeps the combined slice in descending key
// order, and caps it at limit entries.
//
// Keys from different routes may interleave (see the clampToRoutes=false
// branch of reverseScanRoutesAt), so the merged slice is re-sorted on every
// non-empty merge — not only when it exceeds limit. Sorting only on overflow
// would return an unsorted concatenation whenever the combined result fit
// within limit, violating the descending-order contract of ReverseScanAt.
func mergeAndTrimReverseScanResults(out []*store.KVPair, kvs []*store.KVPair, limit int) []*store.KVPair {
	if len(kvs) == 0 {
		// Nothing to merge; out is already sorted from previous calls.
		return out
	}
	out = append(out, kvs...)
	sort.Slice(out, func(i, j int) bool {
		return bytes.Compare(out[i].Key, out[j].Key) > 0
	})
	if len(out) <= limit {
		return out
	}
	// Zero the trimmed tail so the dropped *KVPair values can be collected.
	clear(out[limit:])
	return out[:limit]
}

func countNonInternalKVs(kvs []*store.KVPair) int {
count := 0
for _, kvp := range kvs {
Expand Down
191 changes: 191 additions & 0 deletions kv/shard_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,197 @@ func TestShardStoreScanAt_RoutesS3ManifestScansByLogicalObjectKey(t *testing.T)
require.Equal(t, k1, kvs[1].Key)
}

// TestShardStoreReverseScanAt_DescendingOrderAcrossShards verifies that
// ReverseScanAt with a nil start (clampToRoutes=false) merges results from all
// shards and returns them in descending key order.
func TestShardStoreReverseScanAt_DescendingOrderAcrossShards(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	engine := distribution.NewEngine()
	engine.UpdateRoute([]byte(""), []byte("m"), 1)
	engine.UpdateRoute([]byte("m"), nil, 2)

	st := NewShardStore(engine, map[uint64]*ShardGroup{
		1: {Store: store.NewMVCCStore()},
		2: {Store: store.NewMVCCStore()},
	})

	// Seed shard 1 (keys < "m") and shard 2 (keys >= "m").
	seed := []struct {
		key, val string
		ts       uint64
	}{
		{"a", "va", 1},
		{"c", "vc", 2},
		{"x", "vx", 3},
		{"z", "vz", 4},
	}
	for _, kv := range seed {
		require.NoError(t, st.PutAt(ctx, []byte(kv.key), []byte(kv.val), kv.ts, 0))
	}

	// nil start → clampToRoutes=false; both shards must be merged in descending order.
	kvs, err := st.ReverseScanAt(ctx, nil, nil, 4, ^uint64(0))
	require.NoError(t, err)
	require.Len(t, kvs, 4)
	for i, want := range []string{"z", "x", "c", "a"} {
		require.Equal(t, []byte(want), kvs[i].Key)
	}
}

// TestShardStoreReverseScanAt_LimitAcrossShards verifies that the limit is
// correctly applied when results from multiple shards are merged in descending
// order. The top-N keys across all shards must be returned.
func TestShardStoreReverseScanAt_LimitAcrossShards(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	engine := distribution.NewEngine()
	engine.UpdateRoute([]byte(""), []byte("m"), 1)
	engine.UpdateRoute([]byte("m"), nil, 2)

	st := NewShardStore(engine, map[uint64]*ShardGroup{
		1: {Store: store.NewMVCCStore()},
		2: {Store: store.NewMVCCStore()},
	})

	// a..c land on shard 1 (keys < "m"); x..z land on shard 2 (keys >= "m").
	for i, key := range []string{"a", "b", "c", "x", "y", "z"} {
		require.NoError(t, st.PutAt(ctx, []byte(key), []byte("v"+key), uint64(i+1), 0))
	}

	// limit=4: top-4 in descending order are z, y, x, c.
	kvs, err := st.ReverseScanAt(ctx, nil, nil, 4, ^uint64(0))
	require.NoError(t, err)
	require.Len(t, kvs, 4)
	for i, want := range []string{"z", "y", "x", "c"} {
		require.Equal(t, []byte(want), kvs[i].Key)
	}
}

// TestShardStoreReverseScanAt_SingleShard verifies that ReverseScanAt on a
// single shard returns results in descending key order.
func TestShardStoreReverseScanAt_SingleShard(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	engine := distribution.NewEngine()
	engine.UpdateRoute([]byte(""), nil, 1)

	st := NewShardStore(engine, map[uint64]*ShardGroup{
		1: {Store: store.NewMVCCStore()},
	})

	for i, key := range []string{"a", "b", "c"} {
		require.NoError(t, st.PutAt(ctx, []byte(key), []byte("v"+key), uint64(i+1), 0))
	}

	// limit=2 must return the two highest keys, newest-first.
	kvs, err := st.ReverseScanAt(ctx, nil, nil, 2, ^uint64(0))
	require.NoError(t, err)
	require.Len(t, kvs, 2)
	require.Equal(t, []byte("c"), kvs[0].Key)
	require.Equal(t, []byte("b"), kvs[1].Key)
}

// TestShardStoreReverseScanAt_IncludesS3ManifestKeysDescending mirrors
// TestShardStoreScanAt_IncludesS3ManifestKeysAcrossShards but for
// ReverseScanAt — results must be returned in descending key order.
func TestShardStoreReverseScanAt_IncludesS3ManifestKeysDescending(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	engine := distribution.NewEngine()
	engine.UpdateRoute([]byte(""), []byte("m"), 1)
	engine.UpdateRoute([]byte("m"), nil, 2)

	st := NewShardStore(engine, map[uint64]*ShardGroup{
		1: {Store: store.NewMVCCStore()},
		2: {Store: store.NewMVCCStore()},
	})

	alpha := s3keys.ObjectManifestKey("bucket-a", 1, "alpha")
	zeta := s3keys.ObjectManifestKey("bucket-a", 1, "zeta")
	require.NoError(t, st.PutAt(ctx, alpha, []byte("m1"), 1, 0))
	require.NoError(t, st.PutAt(ctx, zeta, []byte("m2"), 2, 0))

	prefix := s3keys.ObjectManifestPrefixForBucket("bucket-a", 1)
	kvs, err := st.ReverseScanAt(ctx, prefix, prefixScanEnd(prefix), 10, ^uint64(0))
	require.NoError(t, err)
	require.Len(t, kvs, 2)
	// "zeta" sorts after "alpha", so it must come first in reverse order.
	require.Equal(t, zeta, kvs[0].Key)
	require.Equal(t, alpha, kvs[1].Key)
}

// TestMergeAndTrimReverseScanResults verifies that the helper merges two
// slices, sorts them in descending key order, and trims to the given limit.
func TestMergeAndTrimReverseScanResults(t *testing.T) {
	t.Parallel()

	kv := func(k, v string) *store.KVPair {
		return &store.KVPair{Key: []byte(k), Value: []byte(v)}
	}

	result := mergeAndTrimReverseScanResults(
		[]*store.KVPair{kv("z", "vz"), kv("m", "vm")},
		[]*store.KVPair{kv("y", "vy"), kv("a", "va")},
		3,
	)

	// Top-3 in descending order: "a" is trimmed away.
	require.Len(t, result, 3)
	for i, want := range []string{"z", "y", "m"} {
		require.Equal(t, []byte(want), result[i].Key)
	}
}

// TestMergeAndTrimReverseScanResults_EmptyInput verifies that an empty batch
// leaves the accumulated slice untouched.
func TestMergeAndTrimReverseScanResults_EmptyInput(t *testing.T) {
	t.Parallel()

	existing := []*store.KVPair{{Key: []byte("z"), Value: []byte("vz")}}

	result := mergeAndTrimReverseScanResults(existing, nil, 10)

	require.Len(t, result, 1)
	require.Equal(t, []byte("z"), result[0].Key)
}

func TestMergeAndTrimReverseScanResults_WithinLimit(t *testing.T) {
t.Parallel()

out := []*store.KVPair{{Key: []byte("z"), Value: []byte("vz")}}
kvs := []*store.KVPair{{Key: []byte("a"), Value: []byte("va")}}

result := mergeAndTrimReverseScanResults(out, kvs, 10)
require.Len(t, result, 2)
require.Equal(t, []byte("z"), result[0].Key)
require.Equal(t, []byte("a"), result[1].Key)
}

// TestMergeAndTrimReverseScanResults_ExactLimit verifies trimming when the
// merged size exceeds the limit by exactly the trimmed amount.
func TestMergeAndTrimReverseScanResults_ExactLimit(t *testing.T) {
	t.Parallel()

	kv := func(k, v string) *store.KVPair {
		return &store.KVPair{Key: []byte(k), Value: []byte(v)}
	}

	// limit=2: top-2 in descending order are "z", "y".
	result := mergeAndTrimReverseScanResults(
		[]*store.KVPair{kv("z", "vz"), kv("c", "vc")},
		[]*store.KVPair{kv("y", "vy"), kv("a", "va")},
		2,
	)

	require.Len(t, result, 2)
	require.Equal(t, []byte("z"), result[0].Key)
	require.Equal(t, []byte("y"), result[1].Key)
}

func TestScanLockBoundsForKVs_ReverseOrder(t *testing.T) {
t.Parallel()

Expand Down
Loading