Merge #115166
115166: changefeedccl: add changefeed range distribution strategies r=jayshrivastava a=jayshrivastava

### changefeedccl: add changefeed range distribution strategies 

This commit introduces several changes.

It makes `replicaoracle.BinPackingChoice` the explicit default replica oracle for changefeeds.
Previously, we would use `physicalplan.DefaultReplicaChooser`, which points to the same
bin-packing oracle. Making the choice explicit gives the changefeed package more control over its dependencies.

A new cluster setting `changefeed.default_range_distribution_strategy` is added to
specify how changefeeds should distribute work. In the default case,
`default`, we defer to distsql to select nodes and distribute work among them. This
is the same behavior as running a changefeed today.

In the other case, `balanced_simple`, we still let distsql choose nodes for us, but we attempt
to evenly distribute ranges over them. This behaves the same as setting
`changefeed.balance_range_distribution.enable = true`, except that it can be used
outside of initial scans (`changefeed.balance_range_distribution.enable` was only allowed
with `initial_scan=only`).
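
To make that concrete, here is a minimal, self-contained sketch of the kind of evening-out `balanced_simple` performs over the node set distsql has already picked. The `partition` type and the greedy loop are illustrative only; the real code operates on `sql.SpanPartition` values inside `rebalanceSpanPartitions`, whose implementation is not reproduced in the excerpt below.

```go
package main

import (
	"fmt"
	"sort"
)

// partition stands in for "ranges assigned to one node". The real code works
// on sql.SpanPartition and resolves per-partition range counts through a
// dist-sender-backed resolver; both are simplified away here.
type partition struct {
	node   int
	ranges int
}

// rebalance greedily moves one range at a time from the most loaded to the
// least loaded partition until the counts differ by at most one. This is only
// a sketch of the idea behind balanced_simple, not the actual
// rebalanceSpanPartitions implementation.
func rebalance(parts []partition) []partition {
	for {
		sort.Slice(parts, func(i, j int) bool { return parts[i].ranges > parts[j].ranges })
		most, least := &parts[0], &parts[len(parts)-1]
		if most.ranges-least.ranges <= 1 {
			return parts
		}
		most.ranges--
		least.ranges++
	}
}

func main() {
	// distsql picked three nodes but gave node 1 most of the ranges.
	out := rebalance([]partition{{node: 1, ranges: 12}, {node: 2, ranges: 2}, {node: 3, ranges: 1}})
	fmt.Println(out) // the same three nodes, now with 5 ranges each
}
```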

This change also deprecates `changefeed.balance_range_distribution.enable` in favor of
`changefeed.default_range_distribution_strategy` because the latter can be expanded
in the future to add more functionality.

This change also moves changefeed distribution tests to a dedicated file:
`changefeed_dist_test.go`. This file contains a tester which can be used to
start a cluster with a given topology and range distribution. Changefeed tests
can use the tester to exercise various load-balancing strategies. Right now, the new
tests assert how ranges are distributed after planning changefeeds, which implicitly tests
code outside of the changefeedccl package (i.e. `dsp.PartitionSpans()`). This is
intentional, since changefeeds rely on the behavior of this black box to perform well.
In the future, cdc will implement more distribution strategies and likely its
own partitioning logic. Tests for these strategies can be added to this file. Over time,
cdc can move away from distsql planning.
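
For a rough sense of what these tests check (the tester helpers themselves live in `changefeed_dist_test.go` and are not reproduced here), the core assertion boils down to something like the following self-contained sketch, where the per-node span counts would come from the partitions captured during planning:

```go
package main

import "fmt"

// requireBalanced stands in for the kind of assertion the new tests make:
// after planning, no node should hold more than one span more than any other
// node. The keys stand in for node IDs; in the real tests the counts are
// derived from the span partitions produced by changefeed planning.
func requireBalanced(spansPerNode map[int]int) error {
	first := true
	var min, max int
	for _, n := range spansPerNode {
		if first {
			min, max = n, n
			first = false
			continue
		}
		if n < min {
			min = n
		}
		if n > max {
			max = n
		}
	}
	if max-min > 1 {
		return fmt.Errorf("unbalanced distribution: min=%d max=%d", min, max)
	}
	return nil
}

func main() {
	fmt.Println(requireBalanced(map[int]int{1: 5, 2: 5, 3: 4})) // <nil>
	fmt.Println(requireBalanced(map[int]int{1: 9, 2: 3, 3: 2})) // unbalanced distribution: min=2 max=9
}
```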

Informs: #113898
Epic: None

Release note (enterprise change): `changefeed.balance_range_distribution.enable` is now deprecated.
Users should use a new cluster setting `changefeed.default_range_distribution_strategy`
instead. `changefeed.default_range_distribution_strategy='balanced_simple'` has the same
effect as setting `changefeed.balance_range_distribution.enable=true`. It does not require
`initial_scan='only'`, which was required by the old setting.
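
For example, switching an existing cluster over might look like the following; the connection details are illustrative, and only the setting name and value come from this change:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // any pgwire-compatible driver works; lib/pq is assumed here
)

func main() {
	// Placeholder connection string for a local insecure cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Equivalent to the deprecated changefeed.balance_range_distribution.enable=true,
	// but without the initial_scan='only' requirement.
	if _, err := db.Exec(
		`SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'balanced_simple'`,
	); err != nil {
		log.Fatal(err)
	}
}
```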

### roachtest/cdc: metamorphically determine range distribution strategy

This change adds a new metamorphic flag in cdc roachtests which chooses
a distribution strategy randomly.
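
The roachtest diff is not included in the excerpt below; conceptually, the metamorphic choice amounts to something like this sketch. Only the strategy names are taken from the new cluster setting; the function name and seeding are illustrative.

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickRangeDistributionStrategy picks one of the supported strategies at
// random, mirroring the metamorphic flag described above.
func pickRangeDistributionStrategy(rng *rand.Rand) string {
	strategies := []string{"default", "balanced_simple"}
	return strategies[rng.Intn(len(strategies))]
}

func main() {
	rng := rand.New(rand.NewSource(42)) // a real roachtest would seed per run
	strategy := pickRangeDistributionStrategy(rng)
	fmt.Printf("SET CLUSTER SETTING changefeed.default_range_distribution_strategy = '%s';\n", strategy)
}
```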

Release note: None

Co-authored-by: Jayant Shrivastava <jayants@cockroachlabs.com>
craig[bot] and jayshrivastava committed Dec 19, 2023
2 parents 57782da + 0b1f895 commit 716a9f3
Showing 11 changed files with 562 additions and 120 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -12,8 +12,8 @@ bulkio.backup.read_with_priority_after duration 1m0s amount of time since the re
changefeed.aggregator.flush_jitter float 0.1 jitter aggregator flushes as a fraction of min_checkpoint_frequency application
changefeed.backfill.concurrent_scan_requests integer 0 number of concurrent scan requests per node issued during a backfill application
changefeed.backfill.scan_request_size integer 524288 the maximum number of bytes returned by each scan request application
changefeed.balance_range_distribution.enabled boolean false if enabled, the ranges are balanced equally among all nodes. Note that this is supported only in export mode with initial_scan=only. application
changefeed.batch_reduction_retry.enabled boolean false if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes application
changefeed.default_range_distribution_strategy enumeration default configures how work is distributed among nodes for a given changefeed. for the most balanced distribution, use `balanced_simple`. changing this setting will not override locality restrictions [default = 0, balanced_simple = 1] application
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer application
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled application
changefeed.fast_gzip.enabled boolean true use fast gzip implementation application
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -17,8 +17,8 @@
<tr><td><div id="setting-changefeed-aggregator-flush-jitter" class="anchored"><code>changefeed.aggregator.flush_jitter</code></div></td><td>float</td><td><code>0.1</code></td><td>jitter aggregator flushes as a fraction of min_checkpoint_frequency</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-backfill-concurrent-scan-requests" class="anchored"><code>changefeed.backfill.concurrent_scan_requests</code></div></td><td>integer</td><td><code>0</code></td><td>number of concurrent scan requests per node issued during a backfill</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-backfill-scan-request-size" class="anchored"><code>changefeed.backfill.scan_request_size</code></div></td><td>integer</td><td><code>524288</code></td><td>the maximum number of bytes returned by each scan request</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-balance-range-distribution-enable" class="anchored"><code>changefeed.balance_range_distribution.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if enabled, the ranges are balanced equally among all nodes. Note that this is supported only in export mode with initial_scan=only.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-batch-reduction-retry-enabled" class="anchored"><code>changefeed.batch_reduction_retry.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-default-range-distribution-strategy" class="anchored"><code>changefeed.default_range_distribution_strategy</code></div></td><td>enumeration</td><td><code>default</code></td><td>configures how work is distributed among nodes for a given changefeed. for the most balanced distribution, use `balanced_simple`. changing this setting will not override locality restrictions [default = 0, balanced_simple = 1]</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-event-consumer-worker-queue-size" class="anchored"><code>changefeed.event_consumer_worker_queue_size</code></div></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -102,6 +102,7 @@ go_library(
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/pgwire/pgnotice",
"//pkg/sql/physicalplan",
"//pkg/sql/physicalplan/replicaoracle",
"//pkg/sql/privilege",
"//pkg/sql/protoreflect",
"//pkg/sql/roleoption",
@@ -183,6 +184,7 @@ go_test(
srcs = [
"alter_changefeed_test.go",
"avro_test.go",
"changefeed_dist_test.go",
"changefeed_test.go",
"csv_test.go",
"encoder_test.go",
87 changes: 56 additions & 31 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -315,14 +316,38 @@ func startDistChangefeed(
return ctxgroup.GoAndWait(ctx, execPlan)
}

var enableBalancedRangeDistribution = settings.RegisterBoolSetting(
// The bin packing choice gives preference to leaseholder replicas if possible.
var replicaOracleChoice = replicaoracle.BinPackingChoice

type rangeDistributionType int

const (
// defaultDistribution employs no load balancing on the changefeed
// side. We defer to distsql to select nodes and distribute work.
defaultDistribution rangeDistributionType = 0
// balancedSimpleDistribution defers to distsql for selecting the
// set of nodes to distribute work to. However, changefeeds will try to
// distribute work evenly across this set of nodes.
balancedSimpleDistribution rangeDistributionType = 1
// TODO(jayant): add balancedFullDistribution which takes
// full control of node selection and distribution.
)

// RangeDistributionStrategy is used to determine how the changefeed balances
// ranges between nodes.
// TODO: deprecate this setting in favor of a changefeed option.
var RangeDistributionStrategy = settings.RegisterEnumSetting(
settings.ApplicationLevel,
"changefeed.balance_range_distribution.enable",
"if enabled, the ranges are balanced equally among all nodes. "+
"Note that this is supported only in export mode with initial_scan=only.",
util.ConstantWithMetamorphicTestBool(
"changefeed.balance_range_distribution.enabled", false),
settings.WithName("changefeed.balance_range_distribution.enabled"),
"changefeed.default_range_distribution_strategy",
"configures how work is distributed among nodes for a given changefeed. "+
"for the most balanced distribution, use `balanced_simple`. changing this setting "+
"will not override locality restrictions",
util.ConstantWithMetamorphicTestChoice("default_range_distribution_strategy",
"default", "balanced_simple").(string),
map[int64]string{
int64(defaultDistribution): "default",
int64(balancedSimpleDistribution): "balanced_simple",
},
settings.WithPublic)

func makePlan(
@@ -335,6 +360,8 @@
drainingNodes []roachpb.NodeID,
) func(context.Context, *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
return func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
sv := &execCtx.ExecCfg().Settings.SV
maybeCfKnobs, haveKnobs := execCtx.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs)
var blankTxn *kv.Txn

distMode := sql.DistributionTypeAlways
@@ -350,41 +377,39 @@
}
}

planCtx := dsp.NewPlanningCtxWithOracle(ctx, execCtx.ExtendedEvalContext(), nil /* planner */, blankTxn,
sql.DistributionType(distMode), physicalplan.DefaultReplicaChooser, locFilter)
rangeDistribution := RangeDistributionStrategy.Get(sv)
oracle := replicaoracle.NewOracle(replicaOracleChoice, dsp.ReplicaOracleConfig(locFilter))
planCtx := dsp.NewPlanningCtxWithOracle(ctx, execCtx.ExtendedEvalContext(), nil, /* planner */
blankTxn, sql.DistributionType(distMode), oracle, locFilter)
spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, trackedSpans)
if err != nil {
return nil, nil, err
}

cfKnobs := execCtx.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed
if knobs, ok := cfKnobs.(*TestingKnobs); ok && knobs != nil &&
knobs.FilterDrainingNodes != nil && len(drainingNodes) > 0 {
spanPartitions, err = knobs.FilterDrainingNodes(spanPartitions, drainingNodes)
switch {
case distMode == sql.DistributionTypeNone || rangeDistribution == int64(defaultDistribution):
case rangeDistribution == int64(balancedSimpleDistribution):
sender := execCtx.ExecCfg().DB.NonTransactionalSender()
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)

spanPartitions, err = rebalanceSpanPartitions(
ctx, &distResolver{distSender}, rebalanceThreshold.Get(sv), spanPartitions)
if err != nil {
return nil, nil, err
}
default:
return nil, nil, errors.AssertionFailedf("unsupported dist strategy %d and dist mode %d",
rangeDistribution, distMode)
}

sv := &execCtx.ExecCfg().Settings.SV
if enableBalancedRangeDistribution.Get(sv) {
scanType, err := changefeedbase.MakeStatementOptions(details.Opts).GetInitialScanType()
if haveKnobs && maybeCfKnobs.FilterDrainingNodes != nil && len(drainingNodes) > 0 {
spanPartitions, err = maybeCfKnobs.FilterDrainingNodes(spanPartitions, drainingNodes)
if err != nil {
return nil, nil, err
}
}

// Currently, balanced range distribution supported only in export mode.
// TODO(yevgeniy): Consider lifting this restriction.
if scanType == changefeedbase.OnlyInitialScan {
sender := execCtx.ExecCfg().DB.NonTransactionalSender()
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)

spanPartitions, err = rebalanceSpanPartitions(
ctx, &distResolver{distSender}, rebalanceThreshold.Get(sv), spanPartitions)
if err != nil {
return nil, nil, err
}
}
if haveKnobs && maybeCfKnobs.SpanPartitionsCallback != nil {
maybeCfKnobs.SpanPartitionsCallback(spanPartitions)
}

// Use the same checkpoint for all aggregators; each aggregator will only look at
@@ -434,8 +459,8 @@
UserProto: execCtx.User().EncodeProto(),
}

if knobs, ok := cfKnobs.(*TestingKnobs); ok && knobs != nil && knobs.OnDistflowSpec != nil {
knobs.OnDistflowSpec(aggregatorSpecs, &changeFrontierSpec)
if haveKnobs && maybeCfKnobs.OnDistflowSpec != nil {
maybeCfKnobs.OnDistflowSpec(aggregatorSpecs, &changeFrontierSpec)
}

aggregatorCorePlacement := make([]physicalplan.ProcessorCorePlacement, len(spanPartitions))
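
As an aside on testability: the `SpanPartitionsCallback` knob invoked above receives the `[]sql.SpanPartition` produced by `dsp.PartitionSpans`, so a test can record the planned distribution. A hedged sketch of that wiring follows; the knob's exact field type and the test-server plumbing are assumptions, and only the knob name and its call site come from this diff.

```go
// Inside a changefeedccl test, roughly:
var captured []sql.SpanPartition
knobs := base.TestingKnobs{
	DistSQL: &execinfra.TestingKnobs{
		Changefeed: &TestingKnobs{
			// Assumed signature, inferred from the call site above where the
			// knob is invoked with the result of dsp.PartitionSpans.
			SpanPartitionsCallback: func(parts []sql.SpanPartition) {
				captured = append(captured, parts...)
			},
		},
	},
}
// Pass knobs via the test server args when starting the cluster, create a
// changefeed, then assert on how many spans each node in `captured` received.
_ = knobs
```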
