Skip to content

Commit

Permalink
Revert "rangefeed: unconditionally use rangefeed scheduler"
Browse files Browse the repository at this point in the history
This reverts commit 9f20cd9.
  • Loading branch information
erikgrinaker committed Mar 27, 2024
1 parent 8eb7ca9 commit d8f53d5
Show file tree
Hide file tree
Showing 7 changed files with 1,203 additions and 992 deletions.
11 changes: 11 additions & 0 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,7 @@ func (f *enumFeatureFlag) enabled(r enthropy, choose func(enthropy) string) (str
// cdcFeatureFlags describes various cdc feature flags.
// zero value cdcFeatureFlags uses metamorphic settings for features.
type cdcFeatureFlags struct {
	// RangeFeedScheduler, when set, controls the
	// kv.rangefeed.scheduler.enabled cluster setting applied by
	// changefeedCreator.applySettings.
	RangeFeedScheduler featureFlag
	// SchemaLockTables — NOTE(review): semantics not visible in this chunk;
	// presumably gates schema-locked tables, confirm at call sites.
	SchemaLockTables featureFlag
	// DistributionStrategy selects the changefeed range distribution
	// strategy (picked via chooseDistributionStrategy when enabled).
	DistributionStrategy enumFeatureFlag
}
Expand Down Expand Up @@ -2864,6 +2865,16 @@ func (cfc *changefeedCreator) applySettings() error {
return err
}

schedEnabled := cfc.flags.RangeFeedScheduler.enabled(cfc.rng)
if schedEnabled != featureUnset {
cfc.logger.Printf("Setting kv.rangefeed.scheduler.enabled to %t", schedEnabled == featureEnabled)
if _, err := cfc.db.Exec(
"SET CLUSTER SETTING kv.rangefeed.scheduler.enabled = $1", schedEnabled == featureEnabled,
); err != nil {
return err
}
}

rangeDistribution, rangeDistributionEnabled := cfc.flags.DistributionStrategy.enabled(cfc.rng,
chooseDistributionStrategy)
if rangeDistributionEnabled == featureEnabled {
Expand Down
58 changes: 42 additions & 16 deletions pkg/cmd/roachtest/tests/cdc_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
)

type (
	// cdcBenchScanType identifies which kind of scan a CDC benchmark runs.
	cdcBenchScanType string
	// cdcBenchServer identifies the rangefeed server implementation under test.
	cdcBenchServer string
)

const (
// cdcBenchInitialScan runs an initial scan across a table, i.e. it scans and
Expand All @@ -59,11 +60,16 @@ const (
// do so efficiently. Ideally, this wouldn't take any time at all, but in
// practice it can.
cdcBenchColdCatchupScan cdcBenchScanType = "catchup-cold"

cdcBenchNoServer cdcBenchServer = ""
cdcBenchProcessorServer cdcBenchServer = "processor" // legacy processor
cdcBenchSchedulerServer cdcBenchServer = "scheduler" // new scheduler
)

// Enumerations of the metamorphic dimensions the CDC benchmarks iterate over.
var (
	cdcBenchScanTypes = []cdcBenchScanType{
		cdcBenchInitialScan,
		cdcBenchCatchupScan,
		cdcBenchColdCatchupScan,
	}
	cdcBenchServers = []cdcBenchServer{
		cdcBenchProcessorServer,
		cdcBenchSchedulerServer,
	}
)

func registerCDCBench(r registry.Registry) {
Expand Down Expand Up @@ -119,26 +125,29 @@ func registerCDCBench(r registry.Registry) {
RequiresLicense: true,
Timeout: time.Hour,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCBenchWorkload(ctx, t, c, ranges, readPercent, "")
runCDCBenchWorkload(ctx, t, c, ranges, readPercent, "", "")
},
})

// Workloads with a concurrent changefeed running.
r.Add(registry.TestSpec{
Name: fmt.Sprintf(
"cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%s/server=scheduler/protocol=mux/format=%s/sink=null",
readPercent, nodes, cpus, formatSI(ranges), format),
Owner: registry.OwnerCDC,
Benchmark: true,
Cluster: r.MakeClusterSpec(nodes+2, spec.CPU(cpus)),
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Nightly),
RequiresLicense: true,
Timeout: time.Hour,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCBenchWorkload(ctx, t, c, ranges, readPercent, format)
},
})
for _, server := range cdcBenchServers {
server := server // pin loop variable
r.Add(registry.TestSpec{
Name: fmt.Sprintf(
"cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%s/server=%s/protocol=mux/format=%s/sink=null",
readPercent, nodes, cpus, formatSI(ranges), server, format),
Owner: registry.OwnerCDC,
Benchmark: true,
Cluster: r.MakeClusterSpec(nodes+2, spec.CPU(cpus)),
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Nightly),
RequiresLicense: true,
Timeout: time.Hour,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCBenchWorkload(ctx, t, c, ranges, readPercent, server, format)
},
})
}
}
}
}
Expand Down Expand Up @@ -352,6 +361,7 @@ func runCDCBenchWorkload(
c cluster.Cluster,
numRanges int64,
readPercent int,
server cdcBenchServer,
format string,
) {
const sink = "null://"
Expand All @@ -370,12 +380,28 @@ func runCDCBenchWorkload(
if readPercent == 100 {
insertCount = 1_000_000 // ingest some data to read
}
// Either of these will disable changefeeds. Make sure they're all disabled.
if server == "" || format == "" {
require.Empty(t, server)
require.Empty(t, format)
cdcEnabled = false
}

// Start data nodes first to place data on them. We'll start the changefeed
// coordinator later, since we don't want any data on it.
opts, settings := makeCDCBenchOptions(c)
settings.ClusterSettings["kv.rangefeed.enabled"] = strconv.FormatBool(cdcEnabled)

switch server {
case cdcBenchProcessorServer:
settings.ClusterSettings["kv.rangefeed.scheduler.enabled"] = "false"
case cdcBenchSchedulerServer:
settings.ClusterSettings["kv.rangefeed.scheduler.enabled"] = "true"
case cdcBenchNoServer:
default:
t.Fatalf("unknown server type %q", server)
}

c.Start(ctx, t.L(), opts, settings, nData)
m := c.NewMonitor(ctx, nData.Merge(nCoord))

Expand Down
26 changes: 16 additions & 10 deletions pkg/cmd/roachtest/tests/mixed_version_cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,14 @@ func (cmvt *cdcMixedVersionTester) createChangeFeed(
}

var ff cdcFeatureFlags
rangefeedSchedulerSupported, err := cmvt.rangefeedSchedulerSupported(r, h)
if err != nil {
return err
}
if !rangefeedSchedulerSupported {
ff.RangeFeedScheduler.v = &featureUnset
}

distributionStrategySupported, err := cmvt.distributionStrategySupported(r, h)
if err != nil {
return err
Expand Down Expand Up @@ -426,17 +434,14 @@ func (cmvt *cdcMixedVersionTester) muxRangeFeedSupported(
)
}

// Cluster-version gates used by the mixed-version CDC tests.
const (
	// v232CV gates features introduced in 23.2 (e.g.
	// kv.rangefeed.scheduler.enabled exists since 23.2).
	v232CV = "23.2"
	v241CV = "24.1"
)

func (cmvt *cdcMixedVersionTester) rangefeedSchedulerSupported(
h *mixedversion.Helper,
) (bool, option.NodeListOption, error) {
// kv.rangefeed.scheduler.enabled only exists in 23.2. In 24.1, it is enabled
// unconditionally.
return canMixedVersionUseDeletedClusterSetting(h,
clusterupgrade.MustParseVersion("v23.2.0"),
clusterupgrade.MustParseVersion("v24.1.0-alpha.00000000"),
)
r *rand.Rand, h *mixedversion.Helper,
) (bool, error) {
// kv.rangefeed.scheduler.enabled only exists since 23.2.
return h.ClusterVersionAtLeast(r, v232CV)
}

func (cmvt *cdcMixedVersionTester) distributionStrategySupported(
Expand Down Expand Up @@ -530,14 +535,15 @@ func runCDCMixedVersions(ctx context.Context, t test.Test, c cluster.Cluster) {

// Rangefeed scheduler available in 23.2
setRangeFeedSchedulerEnabled := func(ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper) error {
supported, gatewayNodes, err := tester.rangefeedSchedulerSupported(h)
supported, err := tester.rangefeedSchedulerSupported(r, h)
if err != nil {
return err
}
l.Printf("kv.rangefeed.scheduler.enabled=%t", supported)
if supported {
coin := r.Int()%2 == 0
l.PrintfCtx(ctx, "Setting kv.rangefeed.scheduler.enabled=%t", coin)
return h.ExecWithGateway(r, gatewayNodes, "SET CLUSTER SETTING kv.rangefeed.scheduler.enabled=$1", coin)
return h.Exec(r, "SET CLUSTER SETTING kv.rangefeed.scheduler.enabled=$1", coin)
}
return nil
}
Expand Down

0 comments on commit d8f53d5

Please sign in to comment.