rangefeed: unconditionally use rangefeed scheduler
This patch unconditionally uses the new rangefeed scheduler. This has no
mixed-version concerns, since the scheduler is internal to each node and
uses the same client protocol.

Epic: none
Release note (ops change): the setting `kv.rangefeed.scheduler.enabled`
has been removed, as the rangefeed scheduler is now unconditionally
enabled.
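
A minimal sketch of what "unconditionally enabled" means server-side. All names here are hypothetical stand-ins, not the actual wiring in pkg/kv/kvserver/rangefeed: the point is only that processor construction no longer branches on a cluster setting.

package rangefeed

// Processor abstracts the per-range rangefeed event processor.
type Processor interface{ Run() }

type legacyProcessor struct{}    // old goroutine-per-range processor
type schedulerProcessor struct{} // scheduler-backed processor

func (legacyProcessor) Run()    {}
func (schedulerProcessor) Run() {}

// newProcessor previously chose an implementation based on the
// kv.rangefeed.scheduler.enabled cluster setting; after this patch the
// scheduler-backed processor is the only code path.
func newProcessor() Processor {
	return schedulerProcessor{}
}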
erikgrinaker committed Jan 3, 2024
1 parent 1dbdbe9 commit 9f20cd9
Showing 7 changed files with 43 additions and 204 deletions.
10 changes: 0 additions & 10 deletions pkg/cmd/roachtest/tests/cdc.go
@@ -2649,16 +2649,6 @@ func (cfc *changefeedCreator) applySettings() error {
}
}

schedEnabled := cfc.flags.RangeFeedScheduler.enabled(cfc.rng)
if schedEnabled != featureUnset {
cfc.logger.Printf("Setting kv.rangefeed.scheduler.enabled to %t", schedEnabled.bool())
if _, err := cfc.db.Exec(
"SET CLUSTER SETTING kv.rangefeed.scheduler.enabled = $1", schedEnabled.bool(),
); err != nil {
return err
}
}

rangeDistribution, rangeDistributionEnabled := cfc.flags.DistributionStrategy.enabled(cfc.rng,
chooseDistributionStrategy)
if rangeDistributionEnabled == featureEnabled {
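
The block removed above was a roachtest site issuing this setting over SQL. A hedged sketch of why such call sites have to go: the driver import and connection string below are assumptions for illustration, and on v24.1+ the statement presumably fails with an unknown-cluster-setting error, so upgrade-aware callers must drop it or gate it on the cluster version.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumed Postgres-wire driver choice
)

func main() {
	// Hypothetical local CockroachDB connection string.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// The statement the removed block issued. On v23.2 it succeeds; on
	// v24.1+ the setting no longer exists and Exec returns an error.
	if _, err := db.Exec(
		"SET CLUSTER SETTING kv.rangefeed.scheduler.enabled = $1", true,
	); err != nil {
		fmt.Println("expected on v24.1+:", err)
	}
}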
58 changes: 19 additions & 39 deletions pkg/cmd/roachtest/tests/cdc_bench.go
@@ -39,7 +39,6 @@ import (
)

type cdcBenchScanType string
type cdcBenchServer string
type cdcBenchProtocol string

const (
@@ -62,10 +61,6 @@ const (
// practice it can.
cdcBenchColdCatchupScan cdcBenchScanType = "catchup-cold"

cdcBenchNoServer cdcBenchServer = ""
cdcBenchProcessorServer cdcBenchServer = "processor" // legacy processor
cdcBenchSchedulerServer cdcBenchServer = "scheduler" // new scheduler

cdcBenchNoProtocol cdcBenchProtocol = ""
cdcBenchRangefeedProtocol cdcBenchProtocol = "rangefeed" // basic rangefeed protocol
cdcBenchMuxProtocol cdcBenchProtocol = "mux" // multiplexing rangefeed protocol
@@ -74,7 +69,6 @@ const (
var (
cdcBenchScanTypes = []cdcBenchScanType{
cdcBenchInitialScan, cdcBenchCatchupScan, cdcBenchColdCatchupScan}
cdcBenchServers = []cdcBenchServer{cdcBenchProcessorServer, cdcBenchSchedulerServer}
cdcBenchProtocols = []cdcBenchProtocol{cdcBenchRangefeedProtocol, cdcBenchMuxProtocol}
)

@@ -133,30 +127,28 @@ func registerCDCBench(r registry.Registry) {
RequiresLicense: true,
Timeout: time.Hour,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCBenchWorkload(ctx, t, c, ranges, readPercent, "", "", "")
runCDCBenchWorkload(ctx, t, c, ranges, readPercent, "", "")
},
})

// Workloads with a concurrent changefeed running.
for _, server := range cdcBenchServers {
for _, protocol := range cdcBenchProtocols {
server, protocol := server, protocol // pin loop variables
r.Add(registry.TestSpec{
Name: fmt.Sprintf(
"cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%s/server=%s/protocol=%s/format=%s/sink=null",
readPercent, nodes, cpus, formatSI(ranges), server, protocol, format),
Owner: registry.OwnerCDC,
Benchmark: true,
Cluster: r.MakeClusterSpec(nodes+2, spec.CPU(cpus)),
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Nightly),
RequiresLicense: true,
Timeout: time.Hour,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCBenchWorkload(ctx, t, c, ranges, readPercent, server, protocol, format)
},
})
}
for _, protocol := range cdcBenchProtocols {
protocol := protocol // pin loop variable
r.Add(registry.TestSpec{
Name: fmt.Sprintf(
"cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%s/server=scheduler/protocol=%s/format=%s/sink=null",
readPercent, nodes, cpus, formatSI(ranges), protocol, format),
Owner: registry.OwnerCDC,
Benchmark: true,
Cluster: r.MakeClusterSpec(nodes+2, spec.CPU(cpus)),
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Nightly),
RequiresLicense: true,
Timeout: time.Hour,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCBenchWorkload(ctx, t, c, ranges, readPercent, protocol, format)
},
})
}
}
}
@@ -382,7 +374,6 @@ func runCDCBenchWorkload(
c cluster.Cluster,
numRanges int64,
readPercent int,
server cdcBenchServer,
protocol cdcBenchProtocol,
format string,
) {
@@ -403,8 +394,7 @@ func runCDCBenchWorkload(
insertCount = 1_000_000 // ingest some data to read
}
// Either of these being empty will disable changefeeds. Make sure both are empty.
if server == "" || protocol == "" || format == "" {
require.Empty(t, server)
if protocol == "" || format == "" {
require.Empty(t, protocol)
require.Empty(t, format)
cdcEnabled = false
@@ -425,16 +415,6 @@ func runCDCBenchWorkload(
t.Fatalf("unknown protocol %q", protocol)
}

switch server {
case cdcBenchProcessorServer:
settings.ClusterSettings["kv.rangefeed.scheduler.enabled"] = "false"
case cdcBenchSchedulerServer:
settings.ClusterSettings["kv.rangefeed.scheduler.enabled"] = "true"
case cdcBenchNoServer:
default:
t.Fatalf("unknown server type %q", server)
}

c.Start(ctx, t.L(), opts, settings, nData)
m := c.NewMonitor(ctx, nData.Merge(nCoord))

8 changes: 7 additions & 1 deletion pkg/cmd/roachtest/tests/mixed_version_cdc.go
@@ -442,7 +442,13 @@ const v232CV = "23.2"
func (cmvt *cdcMixedVersionTester) rangefeedSchedulerSupported(
r *rand.Rand, h *mixedversion.Helper,
) (bool, error) {
return h.ClusterVersionAtLeast(r, v232CV)
cv, err := h.ClusterVersion(r)
if err != nil {
return false, err
}
// The kv.rangefeed.scheduler.enabled setting only exists in 23.2. In 24.1,
// the scheduler is enabled unconditionally and the setting has been removed.
return cv.Major == 23 && cv.Minor == 2 && cv.Internal == 0, nil
}

func (cmvt *cdcMixedVersionTester) distributionStrategySupported(
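
A standalone sketch of how the predicate above classifies versions: the setting is only worth toggling on a finalized 23.2 cluster. The version struct here is a hypothetical stand-in for the Major/Minor/Internal fields of roachpb.Version; a nonzero Internal marks an in-between upgrade version.

package main

import "fmt"

// version is a stand-in for roachpb.Version's fields used by the check.
type version struct {
	Major, Minor, Internal int32
}

// schedulerSettingSupported mirrors the check above: true only on a
// finalized 23.2 cluster, where the setting exists and is stable.
func schedulerSettingSupported(cv version) bool {
	return cv.Major == 23 && cv.Minor == 2 && cv.Internal == 0
}

func main() {
	for _, cv := range []version{
		{Major: 23, Minor: 1},              // 23.1: setting not introduced yet
		{Major: 23, Minor: 2},              // 23.2 final: setting exists
		{Major: 23, Minor: 2, Internal: 8}, // upgrading to 24.1: don't toggle it
		{Major: 24, Minor: 1},              // 24.1: setting removed
	} {
		fmt.Printf("%+v supported=%t\n", cv, schedulerSettingSupported(cv))
	}
}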
84 changes: 16 additions & 68 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
@@ -12,7 +12,6 @@ package rangefeed_test

import (
"context"
"fmt"
"runtime/pprof"
"sync"
"testing"
@@ -57,31 +56,24 @@

type kvs = storageutils.KVs

type feedProcessorType struct {
type rangefeedType struct {
useMuxRangefeed bool
useScheduler bool
}

func (t feedProcessorType) String() string {
func (t rangefeedType) String() string {
if t.useMuxRangefeed {
return fmt.Sprintf("mux/scheduler=%t", t.useScheduler)
return "mux"
} else {
return fmt.Sprintf("single/scheduler=%t", t.useScheduler)
return "single"
}
}

var procTypes = []feedProcessorType{
var rangefeedTypes = []rangefeedType{
{
useMuxRangefeed: false,
useScheduler: false,
},
{
useMuxRangefeed: true,
useScheduler: false,
},
{
useMuxRangefeed: true,
useScheduler: true,
},
}

@@ -91,13 +83,9 @@ func TestRangeFeedIntegration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
testutils.RunValues(t, "rangefeed_type", rangefeedTypes, func(t *testing.T, s rangefeedType) {
ctx := context.Background()
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
})
@@ -194,13 +182,9 @@ func TestWithOnFrontierAdvance(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
testutils.RunValues(t, "rangefeed_type", rangefeedTypes, func(t *testing.T, s rangefeedType) {
ctx := context.Background()
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{Settings: settings},
@@ -326,13 +310,9 @@ func TestWithOnCheckpoint(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
testutils.RunValues(t, "rangefeed_type", rangefeedTypes, func(t *testing.T, s rangefeedType) {
ctx := context.Background()
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
})
@@ -433,13 +413,9 @@ func TestRangefeedValueTimestamps(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
testutils.RunValues(t, "rangefeed_type", rangefeedTypes, func(t *testing.T, s rangefeedType) {
ctx := context.Background()
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
})
@@ -554,13 +530,9 @@ func TestWithOnSSTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
testutils.RunValues(t, "rangefeed_type", rangefeedTypes, func(t *testing.T, s rangefeedType) {
ctx := context.Background()
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(109473),
@@ -658,15 +630,11 @@ func TestWithOnSSTableCatchesUpIfNotSet(t *testing.T) {

storage.DisableMetamorphicSimpleValueEncoding(t)

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
testutils.RunValues(t, "rangefeed_type", rangefeedTypes, func(t *testing.T, s rangefeedType) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(109473),
@@ -773,13 +741,9 @@ func TestWithOnDeleteRange(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
testutils.RunValues(t, "rangefeed_type", rangefeedTypes, func(t *testing.T, s rangefeedType) {
ctx := context.Background()
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Settings: settings,
@@ -961,13 +925,9 @@ func TestUnrecoverableErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
testutils.RunValues(t, "rangefeed_type", rangefeedTypes, func(t *testing.T, s rangefeedType) {
ctx := context.Background()
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
srv, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(109472),
Knobs: base.TestingKnobs{
@@ -1062,15 +1022,11 @@ func TestMVCCHistoryMutationError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
testutils.RunValues(t, "rangefeed_type", rangefeedTypes, func(t *testing.T, s rangefeedType) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
})
@@ -1153,13 +1109,9 @@ func TestRangefeedWithLabelsOption(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
testutils.RunValues(t, "rangefeed_type", rangefeedTypes, func(t *testing.T, s rangefeedType) {
ctx := context.Background()
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
})
@@ -1273,13 +1225,9 @@ func TestRangeFeedStartTimeExclusive(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
testutils.RunValues(t, "rangefeed_type", rangefeedTypes, func(t *testing.T, s rangefeedType) {
ctx := context.Background()
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
})
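
With the scheduler dimension gone, the test matrix above collapses from mux × scheduler to just mux vs. single. A stdlib-only sketch of the same table-driven subtest shape, using a standalone copy of rangefeedType and plain t.Run in place of CockroachDB's testutils.RunValues helper:

package rangefeed_test

import "testing"

type rangefeedType struct{ useMuxRangefeed bool }

func (rt rangefeedType) String() string {
	if rt.useMuxRangefeed {
		return "mux"
	}
	return "single"
}

// TestEachRangefeedType runs one subtest per remaining variant, the same
// shape testutils.RunValues gives the tests above.
func TestEachRangefeedType(t *testing.T) {
	for _, rt := range []rangefeedType{{useMuxRangefeed: false}, {useMuxRangefeed: true}} {
		rt := rt // pin the loop variable (required before Go 1.22)
		t.Run(rt.String(), func(t *testing.T) {
			_ = rt.useMuxRangefeed // start the rangefeed with or without mux here
		})
	}
}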
2 changes: 0 additions & 2 deletions pkg/kv/kvserver/BUILD.bazel
@@ -377,7 +377,6 @@ go_test(
"//pkg/kv",
"//pkg/kv/kvclient",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvclient/rangefeed",
"//pkg/kv/kvclient/rangefeed/rangefeedcache",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/abortspan",
@@ -417,7 +416,6 @@ go_test(
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/raftutil",
"//pkg/kv/kvserver/rangefeed",
"//pkg/kv/kvserver/rditer",
"//pkg/kv/kvserver/readsummary/rspb",
"//pkg/kv/kvserver/replicastats",
