changefeedccl: use new bulk oracle for changefeed planning #120077

Merged

Conversation

rharding6373
Collaborator

@rharding6373 rharding6373 commented Mar 7, 2024

This change uses the BulkOracle by default as part of changefeed planning, instead of the bin packing oracle. This will allow changefeeds to have plans that randomly assign spans to any replica, including followers if enabled, following locality filter constraints.

A new cluster setting, changefeed.balanced_distribution.enabled, protects this change. When enabled (by default), changefeeds will use the new BulkOracle for planning. If disabled, changefeeds will use the previous bin packing oracle.
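
For intuition, here is a self-contained sketch of the idea (illustrative only; the types and function names are made up for this description and are not the actual replicaoracle API): a bulk-style oracle filters replicas by the locality filter and then picks uniformly at random, so followers are eligible too, whereas a bin-packing oracle keeps routing ranges to a preferred replica.

package main

import (
	"fmt"
	"math/rand"
)

// replica is a stand-in for a range replica descriptor.
type replica struct {
	nodeID   int
	locality string // e.g. "region=us-east1"
}

// chooseBulk sketches the BulkOracle idea: keep only replicas matching the
// locality filter, then pick one uniformly at random (leaseholder or follower).
func chooseBulk(rng *rand.Rand, replicas []replica, filter string) (replica, bool) {
	var eligible []replica
	for _, r := range replicas {
		if filter == "" || r.locality == filter {
			eligible = append(eligible, r)
		}
	}
	if len(eligible) == 0 {
		return replica{}, false
	}
	return eligible[rng.Intn(len(eligible))], true
}

func main() {
	rng := rand.New(rand.NewSource(7))
	replicas := []replica{{1, "region=us-east1"}, {2, "region=us-east1"}, {3, "region=us-west1"}}
	hits := map[int]int{}
	for i := 0; i < 1000; i++ { // 1000 independent range choices
		if r, ok := chooseBulk(rng, replicas, "region=us-east1"); ok {
			hits[r.nodeID]++
		}
	}
	fmt.Println(hits) // split roughly evenly between nodes 1 and 2; node 3 is filtered out
}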

Epic: none
Fixes: #119777
Fixes: #114611

Release note (enterprise change): Changefeeds now use the BulkOracle for planning, which distributes work evenly across all replicas in the locality filter, including followers if enabled. This is enabled by default with the cluster setting changefeed.balanced_distribution.enabled. If disabled, changefeed planning reverts to its previous bin packing oracle.

@rharding6373 rharding6373 requested a review from a team as a code owner March 7, 2024 20:49
@rharding6373 rharding6373 requested review from jayshrivastava and removed request for a team March 7, 2024 20:49

blathers-crl bot commented Mar 7, 2024

It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR?

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

@cockroach-teamcity
Member

This change is Reviewable

Collaborator Author

@rharding6373 rharding6373 left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andyyang890, @dt, and @jayshrivastava)


pkg/ccl/changefeedccl/changefeed_dist.go line 360 at r1 (raw file):

var useBulkOracle = settings.RegisterBoolSetting(
	settings.ApplicationLevel,
	"changefeed.balanced_distribution.enabled",

@andyyang890 I keep going back and forth on whether it's better to be consistent with the backup naming or rename it to something that differentiates it from the existing distribution setting above. @dt do you have opinions on naming?
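
(For context, the registration truncated in the excerpt above would look roughly like the following; the description string and the default value are assumptions based on the PR description, not the exact diff at r1.)

// Sketch only: the description text and default are assumptions, not the actual diff.
var useBulkOracle = settings.RegisterBoolSetting(
	settings.ApplicationLevel,
	"changefeed.balanced_distribution.enabled",
	"if enabled, changefeed planning uses the BulkOracle to spread spans across "+
		"any replica matching the locality filter; if disabled, the previous "+
		"bin-packing oracle is used",
	true,
)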

Contributor

@jayshrivastava jayshrivastava left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andyyang890, @dt, and @rharding6373)


pkg/ccl/changefeedccl/changefeed_dist.go line 360 at r1 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

@andyyang890 I keep going back and forth on whether it's better to be consistent with the backup naming or rename it to something that differentiates it from the existing distribution setting above. @dt do you have opinions on naming?

I think we should call it something else because we used to have a setting changefeed.balance_range_distribution.enable, which is very similar.


pkg/ccl/changefeedccl/changefeed_dist.go line 399 at r1 (raw file):

		planCtx := dsp.NewPlanningCtxWithOracle(ctx, execCtx.ExtendedEvalContext(), nil, /* planner */
			blankTxn, sql.DistributionType(distMode), oracle, locFilter)
		spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, trackedSpans)

I think we need to add a little bit more for this change to be effective.

From what I understand, we get a list of spans from PartitionSpans which does not contain replica info. Then we eventually pass this list of spans to the dist sender, and the dist sender makes its own decision as to which replica we end up reading from. The aggregator starts the kvfeed here:

ca.eventProducer, ca.kvFeedDoneCh, ca.errCh, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed, pool, limit, opts)
which starts the rangefeed here:
physicalCfg := rangeFeedConfig{
	Spans:         stps,
	Frontier:      resumeFrontier.Frontier(),
	WithDiff:      f.withDiff,
	WithFiltering: f.withFiltering,
	Knobs:         f.knobs,
	RangeObserver: f.rangeObserver,
}
The only input to the rangefeed is a list of spans, so the rangefeed code must be deciding which replicas to read from independently. Erik mentioned that the dist sender goes for the closest replica.

This means that we cannot influence which replicas we read from by changing the planning code. However, we can change the span partitions, which is what this change does. It's very subtle: it.Desc() in the code below points to a replica (chosen using the oracle). Then getSQLInstanceIDForKVNodeID() maps the span to a partition using the replica ID, but the replica ID itself does not end up mattering because it's not an input to the dist sender.

it := planCtx.spanIter
// rSpan is the span we are currently partitioning.
rSpan, err := keys.SpanAddr(span)
if err != nil {
	return nil, 0, err
}
var lastSQLInstanceID base.SQLInstanceID
// lastKey maintains the EndKey of the last piece of `span`.
lastKey := rSpan.Key
if log.V(1) {
	log.Infof(ctx, "partitioning span %s", span)
}
// We break up rSpan into its individual ranges (which may or may not be on
// separate nodes). We then create "partitioned spans" using the end keys of
// these individual ranges.
for it.Seek(ctx, span, kvcoord.Ascending); ; it.Next(ctx) {
	if !it.Valid() {
		return nil, 0, it.Error()
	}
	replDesc, ignore, err := it.ReplicaInfo(ctx)
	if err != nil {
		return nil, 0, err
	}
	*ignoreMisplannedRanges = *ignoreMisplannedRanges || ignore
	desc := it.Desc()
	if log.V(1) {
		descCpy := desc // don't let desc escape
		log.Infof(ctx, "lastKey: %s desc: %s", lastKey, &descCpy)
	}
	if !desc.ContainsKey(lastKey) {
		// This range must contain the last range's EndKey.
		log.Fatalf(
			ctx, "next range %v doesn't cover last end key %v. Partitions: %#v",
			desc.RSpan(), lastKey, partitions,
		)
	}
	sqlInstanceID, reason := getSQLInstanceIDForKVNodeID(replDesc.NodeID)

Looking at the implementation for getSQLInstanceIDForKVNodeID below, there's a lot going on. It takes into account mixed process mode (I'm honestly not sure what that is), the gateway node, closest instances, etc. So even if the oracle chooses replicas in a way that uniformly distributes them across nodes, I feel that this mapping function might result in an imbalanced distribution.

// makeInstanceResolver returns a function that can choose the SQL instance ID
// for a provided KV node ID.
func (dsp *DistSQLPlanner) makeInstanceResolver(
	ctx context.Context, planCtx *PlanningCtx,
) (func(roachpb.NodeID) (base.SQLInstanceID, SpanPartitionReason), error) {
	_, mixedProcessMode := dsp.distSQLSrv.NodeID.OptionalNodeID()
	locFilter := planCtx.localityFilter
	var mixedProcessSameNodeResolver func(nodeID roachpb.NodeID) (base.SQLInstanceID, SpanPartitionReason)
	if mixedProcessMode {
		mixedProcessSameNodeResolver = dsp.healthySQLInstanceIDForKVNodeHostedInstanceResolver(ctx)
	}
	if mixedProcessMode && locFilter.Empty() {
		return mixedProcessSameNodeResolver, nil
	}
	// GetAllInstances only returns healthy instances.
	instances, err := dsp.sqlAddressResolver.GetAllInstances(ctx)
	if err != nil {
		return nil, err
	}
	if len(instances) == 0 {
		// For whatever reason, we think that we don't have any healthy
		// instances (one example is someone explicitly removing the rows from
		// the sql_instances table), but we always have the gateway pod to
		// execute on, so we'll use it (unless we have a locality filter).
		if locFilter.NonEmpty() {
			return nil, noInstancesMatchingLocalityFilterErr
		}
		log.Warningf(ctx, "no healthy sql instances available for planning, only using the gateway")
		return dsp.alwaysUseGatewayWithReason(SpanPartitionReason_GATEWAY_NO_HEALTHY_INSTANCES), nil
	}
	rng, _ := randutil.NewPseudoRand()
	instancesHaveLocality := false
	var gatewayIsEligible bool
	if locFilter.NonEmpty() {
		eligible := make([]sqlinstance.InstanceInfo, 0, len(instances))
		for i := range instances {
			if ok, _ := instances[i].Locality.Matches(locFilter); ok {
				eligible = append(eligible, instances[i])
				if instances[i].InstanceID == dsp.gatewaySQLInstanceID {
					gatewayIsEligible = true
				}
			}
		}
		if len(eligible) == 0 {
			return nil, noInstancesMatchingLocalityFilterErr
		}
		instances = eligible
		instancesHaveLocality = true
	} else {
		for i := range instances {
			if instances[i].Locality.NonEmpty() {
				instancesHaveLocality = true
				break
			}
		}
		gatewayIsEligible = true
	}
	if log.ExpensiveLogEnabled(ctx, 2) {
		log.VEventf(ctx, 2, "healthy SQL instances available for distributed planning: %v", instances)
	}
	// If we were able to determine the locality information for at least some
	// instances, use the locality-aware resolver.
	if instancesHaveLocality {
		resolver := func(nodeID roachpb.NodeID) (base.SQLInstanceID, SpanPartitionReason) {
			// Lookup the node localities to compare to the instance localities.
			nodeDesc, err := dsp.nodeDescs.GetNodeDescriptor(nodeID)
			if err != nil {
				log.Eventf(ctx, "unable to get node descriptor for KV node %s", nodeID)
				return dsp.gatewaySQLInstanceID, SpanPartitionReason_GATEWAY_ON_ERROR
			}
			// If we're in mixed-mode, check if the picked node already matches the
			// locality filter in which case we can just use it.
			if mixedProcessMode {
				if ok, _ := nodeDesc.Locality.Matches(locFilter); ok {
					return mixedProcessSameNodeResolver(nodeID)
				} else {
					log.VEventf(ctx, 2,
						"node %d locality %s does not match locality filter %s, finding alternative placement...",
						nodeID, nodeDesc.Locality, locFilter,
					)
				}
			}
			// TODO(dt): Pre-compute / cache this result, e.g. in the instance reader.
			if closest, _ := ClosestInstances(instances,
				nodeDesc.Locality); len(closest) > 0 {
				return closest[rng.Intn(len(closest))], SpanPartitionReason_CLOSEST_LOCALITY_MATCH
			}
			// No instances had any locality tiers in common with the node locality.
			// At this point we pick the gateway if it is eligible, otherwise we pick
			// a random instance from the eligible instances.
			if !gatewayIsEligible {
				return instances[rng.Intn(len(instances))].InstanceID, SpanPartitionReason_LOCALITY_FILTERED_RANDOM
			}
			if dsp.shouldPickGateway(planCtx, instances) {
				return dsp.gatewaySQLInstanceID, SpanPartitionReason_GATEWAY_NO_LOCALITY_MATCH
			} else {
				// If the gateway has a disproportionate number of partitions pick a
				// random instance that is not the gateway.
				if planCtx.spanPartitionState.testingOverrideRandomSelection != nil {
					return planCtx.spanPartitionState.testingOverrideRandomSelection(),
						SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED
				}
				// NB: This random selection may still pick the gateway but that is
				// alright as we are more interested in a uniform distribution rather
				// than avoiding the gateway.
				id := instances[rng.Intn(len(instances))].InstanceID
				return id, SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED
			}
		}
		return resolver, nil
	}
	// If no sql instances have locality information, fallback to a naive
	// round-robin strategy that is completely locality-ignorant. Randomize the
	// order in which we choose instances so that work is allocated fairly across
	// queries.
	rng.Shuffle(len(instances), func(i, j int) {
		instances[i], instances[j] = instances[j], instances[i]
	})
	var i int
	resolver := func(roachpb.NodeID) (base.SQLInstanceID, SpanPartitionReason) {
		id := instances[i%len(instances)].InstanceID
		i++
		return id, SpanPartitionReason_ROUND_ROBIN
	}
	return resolver, nil
}

Also, in some scenarios we use this other implementation for getSQLInstanceIDForKVNodeID here which uses the gateway node as a backup.

func (dsp *DistSQLPlanner) deprecatedHealthySQLInstanceIDForKVNodeIDSystem(

Overall, I'm not confident about how uniformly the work is distributed after making this change because of the getSQLInstanceIDForKVNodeID mapping part. I think we would need to change that to simply use the node that houses the replica the oracle chose for us, and I think that change could go in this PR.

Note that we have changefeed.default_range_distribution_strategy=balanced_simple, which calls rebalanceSpanPartitions and rebalances the partitions after distsql gives them to us. That setting is already quite good and is used by our customers; the 2 main problems are (1) doing the rebalancing after distsql isn't optimal; ideally distsql gives us a uniformly balanced plan; and (2) sometimes distsql gives us fewer partitions than there are available nodes, so we only rebalance over a smaller set of nodes than we could have used. These are problems this PR can solve, and it would be helpful to solve them: using the bulk oracle + a new getSQLInstanceIDForKVNodeID function probably does it. I'm pretty sure those are the two things which cause non-uniform distributions.

Btw, the tests in pkg/ccl/changefeedccl/changefeed_dist_test.go are a good way to play around with planning changes you make!
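
(For readers unfamiliar with that setting, the rebalancing pass it triggers is conceptually like the self-contained sketch below. This is illustrative only, not the actual rebalanceSpanPartitions code, and the types are simplified; note it can only even out the partitions it is given and cannot add nodes that distsql left out, which is the second problem above.)

package main

import (
	"fmt"
	"sort"
)

// partition is a simplified stand-in for a span partition: a SQL instance and
// the number of ranges currently assigned to it.
type partition struct {
	instanceID int
	ranges     int
}

// rebalance moves ranges from overloaded partitions to underloaded ones until
// every partition is within about one range of the average.
func rebalance(parts []partition) []partition {
	total := 0
	for _, p := range parts {
		total += p.ranges
	}
	target := total / len(parts)
	// Most loaded first, least loaded last.
	sort.Slice(parts, func(i, j int) bool { return parts[i].ranges > parts[j].ranges })
	for i, j := 0, len(parts)-1; i < j; {
		if parts[i].ranges <= target+1 {
			i++ // nothing left to shed from this partition
			continue
		}
		if parts[j].ranges >= target {
			j-- // this partition is already at the target
			continue
		}
		move := parts[i].ranges - target
		if deficit := target - parts[j].ranges; deficit < move {
			move = deficit
		}
		parts[i].ranges -= move
		parts[j].ranges += move
	}
	return parts
}

func main() {
	fmt.Println(rebalance([]partition{{1, 40}, {2, 5}, {3, 3}})) // [{1 16} {2 16} {3 16}]
}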

Also, this PR asks the question - do we want to get rid of default_range_distribution_strategy? If we keep it, what would be its purpose?

One more thing to consider: maybe if you have a small 10-range changefeed, you don't want to spread the work across 10 nodes. Requiring a minimum number of ranges before you actually use these hyper-distributed oracles/planners might be a good idea.
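
(A guard along these lines would be one way to do that; a minimal sketch where the constant name and threshold value are hypothetical, not code from this PR.)

// Hypothetical guard: only switch to the balanced (bulk) oracle once the
// changefeed covers enough ranges for wide distribution to pay off. The
// threshold of 100 is an arbitrary placeholder, not a recommendation.
const minRangesForBalancedOracle = 100

func shouldUseBalancedOracle(settingEnabled bool, numRanges int) bool {
	return settingEnabled && numRanges >= minRangesForBalancedOracle
}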

@dt
Member

dt commented Mar 11, 2024

@dt do you have opinions on naming?

No opinions about consistency with backup and it probably isn't something I'd worry about: ours is also undocumented and non-public.

In fact, we will probably just remove the cluster setting and make it unconditionally the behavior soon / after 24.1 is cut, since having too many knobs / differing versions of behavior has made it hard to reason about what is happening or how they interact in unexpected ways. Just recently we found a cluster that had toggled a restore setting that was causing it to behave strangely.

On the DR side we don't really care about optimizing for edge cases on the small end, e.g. we don't worry about whether a small 10-span backup gets planned as a single processor over 10 spans rather than 10 single-span processors. We need a 10-processor backup, cross-processor overhead and all, to perform well enough in the cases where being that distributed is the only option, and if it does, then we might as well use it all the time, even when we don't strictly need it.

Collaborator Author

@rharding6373 rharding6373 left a comment

TFTRs!

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andyyang890, @dt, and @jayshrivastava)


pkg/ccl/changefeedccl/changefeed_dist.go line 360 at r1 (raw file):

Previously, jayshrivastava (Jayant) wrote…

I think we should call it something else because we used to have a setting changefeed.balance_range_distribution.enable, which is very similar.

I changed it.


pkg/ccl/changefeedccl/changefeed_dist.go line 399 at r1 (raw file):

Previously, jayshrivastava (Jayant) wrote… [quoted in full above]

We discussed this a bit offline. The high level summary is that we want to limit the number of changes in this PR for this release, so we're going to leave potential improvements for the future.

With the bulk oracle, we expect that the span partitions will be fairly evenly distributed, since they're randomly chosen among all replicas of the range that fit the locality filter. DistSQL then chooses a SQL node that is either the same node returned by the oracle, the closest node according to the locality filter, or a round-robin assignment. Therefore we expect that most of the time DistSQL assignments will not deviate too much from the bulk oracle, and most of the time we won't need to rebalance in the changefeed.
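
(To make "fairly evenly distributed" concrete, here is a small, self-contained simulation, not CockroachDB code, of assigning spans to nodes uniformly at random and reporting the largest and smallest resulting partitions.)

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	const spans, nodes = 900, 9
	rng := rand.New(rand.NewSource(1))
	counts := make([]int, nodes)
	// Each span goes to a uniformly random node, which is roughly what random
	// replica choice plus a pass-through node-to-instance mapping would give.
	for i := 0; i < spans; i++ {
		counts[rng.Intn(nodes)]++
	}
	minC, maxC := counts[0], counts[0]
	for _, c := range counts[1:] {
		if c < minC {
			minC = c
		}
		if c > maxC {
			maxC = c
		}
	}
	fmt.Printf("per-node span counts: %v (min=%d, max=%d, ideal=%d)\n", counts, minC, maxC, spans/nodes)
}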

We may consider deprecating default_range_distribution_strategy in the future if it is no longer useful with the bulk oracle.

I could add a threshold at which we apply the bulk oracle as another safeguard, but I wanted to get your opinion on the # of ranges (or spans) at which we think it would make a difference to distribute more or less. How common are changefeeds on very small tables? I glanced at some cloud metrics but couldn't tease out the smallest changefeeds (the lowest # of ranges on a cluster running a changefeed is 85, but that isn't a full picture). It seems like we could spend a bit more time discussing whether CDC behavior is like what David said about backups, where we'd prefer to involve as many nodes as possible, spreading the spans among them as evenly as possible.

Contributor

@jayshrivastava jayshrivastava left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andyyang890, @dt, and @rharding6373)


pkg/ccl/changefeedccl/changefeed_dist.go line 399 at r1 (raw file):

Previously, rharding6373 (Rachael Harding) wrote… [quoted in full above]

Spoke about this offline. We don't have a good answer for what the threshold should be. Also, for very small changefeeds, distributing as much as possible isn't significantly worse than assigning all the ranges to one node. In fact, assigning everything to one node adds a problem where different changefeeds could all land on the same node, overloading it. You avoid this when you distribute as much as possible.

@rharding6373 rharding6373 force-pushed the 20230307_new_bulk_oracle_119777 branch 2 times, most recently from c2ed57f to e59c65f on March 20, 2024 23:33
This change uses the BulkOracle by default as part of changefeed
planning, instead of the bin packing oracle. This will allow changefeeds
to have plans that randomly assign spans to any replica, including
followers if enabled, following locality filter constraints.

A new cluster setting, `changefeed.balanced_distribution.enabled`,
protects this change. When enabled (by default), changefeeds will use
the new BulkOracle for planning. If disabled, changefeeds will use the
previous bin packing oracle.

Epic: none
Fixes: cockroachdb#119777
Fixes: cockroachdb#114611

Release note (enterprise change): Changefeeds now use the BulkOracle for
planning, which distributes work evenly across all replicas in the
locality filter, including followers if enabled. This is enabled by
default with the cluster setting
`changefeed.balanced_distribution.enabled`. If disabled, changefeed
planning reverts to its previous bin packing oracle.
@rharding6373 rharding6373 force-pushed the 20230307_new_bulk_oracle_119777 branch from e59c65f to 84e0e51 on March 21, 2024 15:33
@rharding6373
Collaborator Author

TFTR!

bors r+

@craig
Contributor

craig bot commented Mar 22, 2024

@craig craig bot merged commit 5242dcb into cockroachdb:master Mar 22, 2024
20 of 36 checks passed