Skip to content

Commit

Permalink
spanconfig: correctly order updates when applying in the KVSubscriber
Browse files Browse the repository at this point in the history
Previously, there was no ordering guarantee between KVSubscriber
events at the same timestamp. As a result, if a batch of events
included updates to overlapping spans at the same timestamp, we could
apply additions before deletions -- this would cause the additions to
get clobbered, which was not the intention. This could lead to missing
span configurations, resulting in bugs such as the linked issue.

This patch fixes the issue by sorting deletions before additions if
two span configuration events have the same timestamp.

Closes #110908

Release note (bug fix): Previously, altering from a Regional By Row
table to a Regional By Table table could cause leaseholders to never
move to the database's primary region. This is now fixed.
  • Loading branch information
arulajmani committed Feb 5, 2024
1 parent c0dc3be commit 73cb0da
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ ALL_TESTS = [
"//pkg/ccl/serverccl/statusccl:statusccl_test",
"//pkg/ccl/serverccl:serverccl_test",
"//pkg/ccl/spanconfigccl/spanconfigkvaccessorccl:spanconfigkvaccessorccl_test",
"//pkg/ccl/spanconfigccl/spanconfigkvsubscriberccl:spanconfigkvsubscriberccl_test",
"//pkg/ccl/spanconfigccl/spanconfiglimiterccl:spanconfiglimiterccl_test",
"//pkg/ccl/spanconfigccl/spanconfigreconcilerccl:spanconfigreconcilerccl_test",
"//pkg/ccl/spanconfigccl/spanconfigsplitterccl:spanconfigsplitterccl_test",
Expand Down Expand Up @@ -917,6 +918,7 @@ GO_TARGETS = [
"//pkg/ccl/serverccl:serverccl",
"//pkg/ccl/serverccl:serverccl_test",
"//pkg/ccl/spanconfigccl/spanconfigkvaccessorccl:spanconfigkvaccessorccl_test",
"//pkg/ccl/spanconfigccl/spanconfigkvsubscriberccl:spanconfigkvsubscriberccl_test",
"//pkg/ccl/spanconfigccl/spanconfiglimiterccl:spanconfiglimiterccl_test",
"//pkg/ccl/spanconfigccl/spanconfigreconcilerccl:spanconfigreconcilerccl_test",
"//pkg/ccl/spanconfigccl/spanconfigsplitterccl:spanconfigsplitterccl_test",
Expand Down
11 changes: 11 additions & 0 deletions pkg/ccl/multiregionccl/multiregionccltestutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
gosql "database/sql"
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -26,6 +27,7 @@ type multiRegionTestClusterParams struct {
replicationMode base.TestClusterReplicationMode
useDatabase string
settings *cluster.Settings
scanInterval time.Duration
}

// MultiRegionTestClusterParamsOption is an option that can be passed to
Expand Down Expand Up @@ -64,6 +66,14 @@ func WithSettings(settings *cluster.Settings) MultiRegionTestClusterParamsOption
}
}

// WithScanInterval returns an option that records the given scan interval in
// the multi-region test cluster params. The stored value is what gets passed
// along as the ScanInterval of each test server, which governs how often the
// various KV queues scan.
func WithScanInterval(interval time.Duration) MultiRegionTestClusterParamsOption {
	applyScanInterval := func(p *multiRegionTestClusterParams) {
		p.scanInterval = interval
	}
	return applyScanInterval
}

// TestingCreateMultiRegionCluster creates a test cluster with numServers number
// of nodes and the provided testing knobs applied to each of the nodes. Every
// node is placed in its own locality, named "us-east1", "us-east2", and so on.
Expand Down Expand Up @@ -113,6 +123,7 @@ func TestingCreateMultiRegionClusterWithRegionList(
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: region}},
},
ScanInterval: params.scanInterval,
}
totalServerCount++
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/ccl/spanconfigccl/spanconfigkvsubscriberccl/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

# Test target for the spanconfigkvsubscriberccl package. The package consists
# solely of test sources (kvsubscriber_test.go and main_test.go), so this file
# declares no accompanying go_library target. The deps below mirror the
# cockroach packages imported by those two test files.
go_test(
name = "spanconfigkvsubscriberccl_test",
srcs = [
"kvsubscriber_test.go",
"main_test.go",
],
deps = [
"//pkg/base",
"//pkg/ccl",
"//pkg/ccl/multiregionccl/multiregionccltestutils",
"//pkg/jobs",
"//pkg/kv/kvserver",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2024 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package spanconfigkvsubscriberccl

import (
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl/multiregionccltestutils"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// TestSpanConfigUpdatesApplyInCorrectOrder verifies that the KVSubscriber
// applies updates to its in-memory store in the correct order: when deletions
// and additions share a timestamp, the deletions must be applied first. The
// scenario is provoked by altering a REGIONAL BY ROW table to REGIONAL BY
// TABLE, which produces overlapping updates containing both deletions and
// additions at the same timestamp.
//
// The test starts a 3-node multi-region cluster, waits until the table's
// ranges have three distinct leaseholders, performs the locality change, and
// then waits until only a single distinct leaseholder remains.
//
// Regression test for https://github.com/cockroachdb/cockroach/issues/110908.
func TestSpanConfigUpdatesApplyInCorrectOrder(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	knobs := base.TestingKnobs{
		Store: &kvserver.StoreTestingKnobs{
			// Keep the per-partition ranges from being merged away.
			DisableMergeQueue: true,
		},
		// Short job intervals make the schema changes complete faster.
		JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
	}
	tc, _, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
		t, 3, knobs, multiregionccltestutils.WithScanInterval(50*time.Millisecond),
	)
	defer cleanup()
	sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0 /* idx */))

	// Tighten the closed timestamp and rangefeed intervals to speed up the
	// test, then create the multi-region database and the table under test.
	setupStmts := []string{
		`SET CLUSTER SETTING kv.closed_timestamp.target_duration = '50ms'`,
		`SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '50ms'`,
		`SET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval = '50ms'`,
		`CREATE DATABASE mr PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3"; USE mr`,
		`CREATE TABLE t() LOCALITY REGIONAL BY ROW`,
	}
	for _, stmt := range setupStmts {
		sqlDB.Exec(t, stmt)
	}

	// distinctLeaseholders reports how many distinct leaseholders currently
	// serve the ranges of table t.
	distinctLeaseholders := func() int {
		var n int
		sqlDB.QueryRow(t,
			"SELECT count(distinct lease_holder) from [show ranges from table t with details]",
		).Scan(&n)
		return n
	}

	// While the table is REGIONAL BY ROW, expect three distinct leaseholders.
	testutils.SucceedsSoon(t, func() error {
		if n := distinctLeaseholders(); n != 3 {
			return fmt.Errorf("waiting for each region to pick up leaseholders; count %d", n)
		}
		return nil
	})

	sqlDB.Exec(t, `ALTER TABLE t SET LOCALITY REGIONAL BY TABLE`)

	// After the locality change, all leases should converge on one leaseholder.
	testutils.SucceedsSoon(t, func() error {
		if n := distinctLeaseholders(); n != 1 {
			return fmt.Errorf(
				"waiting for all partition leases to move to the primary region; number of regions %d", n,
			)
		}
		return nil
	})
}
33 changes: 33 additions & 0 deletions pkg/ccl/spanconfigccl/spanconfigkvsubscriberccl/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package spanconfigkvsubscriberccl

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl"
"github.com/cockroachdb/cockroach/pkg/security/securityassets"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go

// TestMain is the entry point for this package's tests. Before running them
// it calls ccl.TestingEnableEnterprise (keeping the cleanup function it
// returns), installs the embedded test security assets, seeds the random
// number generator for tests, and registers the test server and test cluster
// factories. The process exits with the code reported by m.Run.
func TestMain(m *testing.M) {
	enterpriseCleanup := ccl.TestingEnableEnterprise()
	securityassets.SetLoader(securitytest.EmbeddedAssets)
	randutil.SeedForTests()
	serverutils.InitTestServerFactory(server.TestServerFactory)
	serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)

	code := m.Run()
	// os.Exit terminates the process immediately without running deferred
	// functions, so the cleanup is invoked explicitly instead of being deferred.
	enterpriseCleanup()
	os.Exit(code)
}
22 changes: 22 additions & 0 deletions pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package spanconfigkvsubscriber

import (
"context"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
Expand Down Expand Up @@ -439,6 +440,27 @@ func (s *KVSubscriber) setLastUpdatedLocked(ts hlc.Timestamp) {
func (s *KVSubscriber) handlePartialUpdate(
ctx context.Context, ts hlc.Timestamp, events []rangefeedbuffer.Event,
) {
// The events we've received from the rangefeed buffer are sorted in
// increasing timestamp order. However, any updates with the same timestamp
// may be ordered arbitrarily. That's okay if they don't overlap. However, if
// they do overlap, the assumption is that an overlapping delete should be
// ordered before an addition it overlaps with -- not doing so would cause the
// addition to get clobbered by the deletion, which will result in the store
// having missing span configurations. As such, we re-sort the list of events
// before applying it to our store, using Deletion() as a tie-breaker when
// timestamps are equal.
sort.Slice(events, func(i, j int) bool {
switch events[i].Timestamp().Compare(events[j].Timestamp()) {
case -1: // ts(i) < ts(j)
return true
case 1: // ts(i) > ts(j)
return false
case 0: // ts(i) == ts(j); deletions sort before additions
return events[i].(*BufferEvent).Deletion() // no need to worry about the sort being stable
default:
panic("unexpected")
}
})
handlers := func() []handler {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down

0 comments on commit 73cb0da

Please sign in to comment.