sql: prevent gateway from always being picked
Previously, the instance resolver would always
assign a partition span to the gateway if the
gateway was in the set of eligible instances and
we did not find an eligible instance with a better
locality match. In large clusters running backup or
CDC jobs with execution locality, this could cause
the gateway to receive the lion's share of the work,
causing it to OOM or severely degrading performance.

This change makes span partitioning a little more
stateful. Concretely, we now track how many partition
spans have been assigned to each node in the `planCtx`
that is used throughout the planning of a single
statement. This distribution is then used to limit the
number of partition spans that default to the gateway.
Currently, by default we allow the gateway to have:

`2 * average number of partition spans across the other instances`

If the gateway does not satisfy this heuristic, we
randomly pick one of the other eligible instances.
Note that if there are no eligible instances apart
from the gateway, or the gateway has been assigned
fewer than a minimum number of spans (10 by default),
we will pick the gateway.
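
Condensed into a standalone sketch (the function and parameter
names here are illustrative, not part of the commit; the committed
implementation is `shouldPickGateway` in the diff below), the
heuristic looks like this:

```go
// shouldPickGatewaySketch reports whether the gateway remains an acceptable
// default target under the heuristic described above.
func shouldPickGatewaySketch(
	spansOnGateway, totalSpans, numOtherInstances, bias, minSpansOnGateway int,
) bool {
	if numOtherInstances <= 0 {
		// The gateway is the only eligible instance.
		return true
	}
	if spansOnGateway < minSpansOnGateway {
		// Too few spans assigned so far to bother distributing work away.
		return true
	}
	avgOnOthers := (totalSpans - spansOnGateway) / numOtherInstances
	return spansOnGateway < bias*avgOnOthers
}
```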

This change also adds a new session variable,
`distsql_plan_gateway_bias`, that controls how many
partition spans the gateway may be assigned by default,
relative to the distribution of partition spans across
the other nodes.
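
As a usage sketch (the connection string and the bias value of 4
are assumptions for illustration, not part of this commit), the
variable can be tuned per session over the Postgres wire protocol:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol.
)

func main() {
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	// Allow the gateway up to 4x the average partition-span count of the
	// other instances before planning prefers a different node.
	if _, err := db.Exec("SET distsql_plan_gateway_bias = 4"); err != nil {
		log.Fatal(err)
	}
}
```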

Fixes: #114079
Release note (bug fix): fixes a bug where large jobs running
with execution locality could result in the gateway being assigned
most of the work, causing performance degradation and cluster
instability.
adityamaru committed Nov 15, 2023
1 parent ca4b6b0 commit 4d7aa01
Showing 9 changed files with 558 additions and 37 deletions.
132 changes: 110 additions & 22 deletions pkg/sql/distsql_physical_planner.go
@@ -815,13 +815,44 @@ const (
NodeDistSQLVersionIncompatible
)

// spanPartitionState captures information about the current state of the
// partitioning that has occurred during the planning process.
type spanPartitionState struct {
// partitionSpanDecisions is a mapping from a SpanPartitionReason to the number of
// times we have picked an instance for that reason.
partitionSpanDecisions [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int

// partitionSpans is a mapping from a SQLInstanceID to the number of
// partition spans that have been assigned to that node.
partitionSpans map[base.SQLInstanceID]int

// totalPartitionSpans is the total number of partition spans that have
// been assigned so far.
totalPartitionSpans int

// testingOverrideRandomSelection, if set, replaces random instance
// selection so that tests can make the choice deterministic.
testingOverrideRandomSelection func() base.SQLInstanceID
}

// update updates the spanPartitionState with information about a newly
// assigned span partition.
func (p *spanPartitionState) update(
partitionNode base.SQLInstanceID, partitionReason SpanPartitionReason,
) {
p.totalPartitionSpans++
p.partitionSpanDecisions[partitionReason]++
p.partitionSpans[partitionNode]++
}
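
// Illustrative example (not part of the commit): after
//
//	p.update(1, SpanPartitionReason_TARGET_HEALTHY)
//	p.update(1, SpanPartitionReason_TARGET_HEALTHY)
//	p.update(2, SpanPartitionReason_CLOSEST_LOCALITY_MATCH)
//
// the state is partitionSpans = {1: 2, 2: 1} and totalPartitionSpans = 3,
// with partitionSpanDecisions recording two TARGET_HEALTHY picks and one
// CLOSEST_LOCALITY_MATCH pick.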

// PlanningCtx contains data used and updated throughout the planning process of
// a single query.
type PlanningCtx struct {
ExtendedEvalCtx *extendedEvalContext

localityFilter roachpb.Locality

// spanPartitionState captures information about the current state of the
// partitioning that has occurred during the planning process.
spanPartitionState *spanPartitionState

spanIter physicalplan.SpanResolverIterator
// nodeStatuses contains info for all SQLInstanceIDs that are referenced by
// any PhysicalPlan we generate with this context.
@@ -1094,40 +1125,44 @@ type SpanPartitionReason int32

const (
// SpanPartitionReason_UNSPECIFIED is reported when the reason is unspecified.
SpanPartitionReason_UNSPECIFIED SpanPartitionReason = 0
SpanPartitionReason_UNSPECIFIED SpanPartitionReason = iota
// SpanPartitionReason_GATEWAY_TARGET_UNHEALTHY is reported when the target
// node is unhealthy and so we default to the gateway node.
SpanPartitionReason_GATEWAY_TARGET_UNHEALTHY SpanPartitionReason = 1
SpanPartitionReason_GATEWAY_TARGET_UNHEALTHY
// SpanPartitionReason_GATEWAY_NO_HEALTHY_INSTANCES is reported when there are
// no healthy instances and so we default to the gateway node.
SpanPartitionReason_GATEWAY_NO_HEALTHY_INSTANCES SpanPartitionReason = 2
SpanPartitionReason_GATEWAY_NO_HEALTHY_INSTANCES
// SpanPartitionReason_GATEWAY_ON_ERROR is reported when there is an error and
// so we default to the gateway node.
SpanPartitionReason_GATEWAY_ON_ERROR SpanPartitionReason = 3
SpanPartitionReason_GATEWAY_ON_ERROR
// SpanPartitionReason_TARGET_HEALTHY is reported when the target node is
// healthy.
SpanPartitionReason_TARGET_HEALTHY SpanPartitionReason = 4
SpanPartitionReason_TARGET_HEALTHY
// SpanPartitionReason_CLOSEST_LOCALITY_MATCH is reported when we picked an
// instance with the closest match to the provided locality filter.
SpanPartitionReason_CLOSEST_LOCALITY_MATCH SpanPartitionReason = 5
SpanPartitionReason_CLOSEST_LOCALITY_MATCH
// SpanPartitionReason_GATEWAY_NO_LOCALITY_MATCH is reported when there is no
// match to the provided locality filter and so we default to the gateway.
SpanPartitionReason_GATEWAY_NO_LOCALITY_MATCH SpanPartitionReason = 6
// SpanPartitionReason_LOCALITY_AWARE_RANDOM is reported when there is no
SpanPartitionReason_GATEWAY_NO_LOCALITY_MATCH
// SpanPartitionReason_LOCALITY_FILTERED_RANDOM is reported when there is no
// match to the provided locality filter and the gateway is not eligible. In
// this case we pick a random available instance.
SpanPartitionReason_LOCALITY_AWARE_RANDOM SpanPartitionReason = 7
SpanPartitionReason_LOCALITY_FILTERED_RANDOM
// SpanPartitionReason_ROUND_ROBIN is reported when there is no locality info
// on any of the instances and so we default to a naive round-robin strategy.
SpanPartitionReason_ROUND_ROBIN SpanPartitionReason = 8

SpanPartitionReason_ROUND_ROBIN
// SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY is reported when the
// target node retrieved via gossip is deemed unhealthy. In this case we
// default to the gateway node.
SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY SpanPartitionReason = 9
SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY
// SpanPartitionReason_GOSSIP_TARGET_HEALTHY is reported when the
// target node retrieved via gossip is deemed healthy.
SpanPartitionReason_GOSSIP_TARGET_HEALTHY SpanPartitionReason = 10
SpanPartitionReason_GOSSIP_TARGET_HEALTHY
// SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED is reported
// when there is no match to the provided locality filter and the gateway is
// eligible but overloaded with other partitions. In this case we pick a
// random instance apart from the gateway.
SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED
)

func (r SpanPartitionReason) String() string {
@@ -1146,14 +1181,16 @@ func (r SpanPartitionReason) String() string {
return "closest-locality-match"
case SpanPartitionReason_GATEWAY_NO_LOCALITY_MATCH:
return "gateway-no-locality-match"
case SpanPartitionReason_LOCALITY_AWARE_RANDOM:
return "locality-aware-random"
case SpanPartitionReason_LOCALITY_FILTERED_RANDOM:
return "locality-filtered-random"
case SpanPartitionReason_ROUND_ROBIN:
return "round-robin"
case SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY:
return "gossip-gateway-target-unhealthy"
case SpanPartitionReason_GOSSIP_TARGET_HEALTHY:
return "gossip-target-healthy"
case SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED:
return "locality-filtered-random-gateway-overloaded"
default:
return "unknown"
}
@@ -1364,6 +1401,7 @@ func (dsp *DistSQLPlanner) partitionSpan(
}

sqlInstanceID, reason := getSQLInstanceIDForKVNodeID(replDesc.NodeID)
planCtx.spanPartitionState.update(sqlInstanceID, reason)
partitionIdx, inNodeMap := nodeMap[sqlInstanceID]
if !inNodeMap {
partitionIdx = len(partitions)
@@ -1548,6 +1586,38 @@ var noInstancesMatchingLocalityFilterErr = errors.New(
"no healthy sql instances available matching locality requirement",
)

// shouldPickGateway determines whether the gateway node should be picked for a
// particular partition.
func (dsp *DistSQLPlanner) shouldPickGateway(
planCtx *PlanningCtx, instances []sqlinstance.InstanceInfo,
) bool {
numEligibleInstancesExcludingGateway := len(instances) - 1
if numEligibleInstancesExcludingGateway <= 0 {
return true
}

partitionsOnGateway := planCtx.spanPartitionState.partitionSpans[dsp.gatewaySQLInstanceID]
averageDistributionOnNonGatewayInstances :=
(planCtx.spanPartitionState.totalPartitionSpans - partitionsOnGateway) / numEligibleInstancesExcludingGateway

// If the gateway has only been assigned a few partitions so far, use it
// anyway; this avoids sending spans to remote nodes when the overall
// number of partitions is low.
minPartitionsOnGateway := 10
if dsp.distSQLSrv.TestingKnobs.MinimumNumberOfGatewayPartitions != 0 {
minPartitionsOnGateway = dsp.distSQLSrv.TestingKnobs.MinimumNumberOfGatewayPartitions
}
if partitionsOnGateway < minPartitionsOnGateway {
return true
}

// If the gateway has at least `bias` times (2 by default) the average
// number of span partitions assigned to the other nodes, we should
// distribute the partition to another node.
bias := int(planCtx.ExtendedEvalCtx.SessionData().DistsqlPlanGatewayBias)
return partitionsOnGateway < bias*averageDistributionOnNonGatewayInstances
}
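
// Worked example (illustrative, not part of the commit): with the default
// bias of 2, a gateway holding 12 spans, and 28 further spans spread across
// 4 other instances (an average of 7), 12 < 2*7 holds and the gateway may
// still be picked; once the gateway reaches 14 spans, the partition is sent
// to another instance instead.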

// makeInstanceResolver returns a function that can choose the SQL instance ID
// for a provided KV node ID.
func (dsp *DistSQLPlanner) makeInstanceResolver(
@@ -1646,13 +1716,27 @@ func (dsp *DistSQLPlanner) makeInstanceResolver(
return closest[rng.Intn(len(closest))], SpanPartitionReason_CLOSEST_LOCALITY_MATCH
}

// No instances had any locality tiers in common with the node locality so
// just return the gateway if it is eligible. If it isn't, just pick a
// random instance from the eligible instances.
if gatewayIsEligible {
// No instances had any locality tiers in common with the node locality.
// At this point we pick the gateway if it is eligible, otherwise we pick
// a random instance from the eligible instances.
if !gatewayIsEligible {
return instances[rng.Intn(len(instances))].InstanceID, SpanPartitionReason_LOCALITY_FILTERED_RANDOM
}
if dsp.shouldPickGateway(planCtx, instances) {
return dsp.gatewaySQLInstanceID, SpanPartitionReason_GATEWAY_NO_LOCALITY_MATCH
} else {
// If the gateway has a disproportionate number of partitions, pick a
// random instance that is not the gateway.
if planCtx.spanPartitionState.testingOverrideRandomSelection != nil {
return planCtx.spanPartitionState.testingOverrideRandomSelection(),
SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED
}
// NB: This random selection may still pick the gateway, but that is
// alright as we are more interested in a uniform distribution than in
// avoiding the gateway.
id := instances[rng.Intn(len(instances))].InstanceID
return id, SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED
}
return instances[rng.Intn(len(instances))].InstanceID, SpanPartitionReason_LOCALITY_AWARE_RANDOM
}
return resolver, nil
}
@@ -1757,11 +1841,12 @@ func (dsp *DistSQLPlanner) getInstanceIDForScan(
if err != nil {
return 0, err
}
sqlInstanceID, _ := resolver(replDesc.NodeID)
sqlInstanceID, reason := resolver(replDesc.NodeID)
planCtx.spanPartitionState.update(sqlInstanceID, reason)
return sqlInstanceID, nil
}

func (dsp *DistSQLPlanner) useGossipPlanning(ctx context.Context, planCtx *PlanningCtx) bool {
func (dsp *DistSQLPlanner) useGossipPlanning(_ context.Context, planCtx *PlanningCtx) bool {
// TODO(dt): enable this by default, e.g. // && !dsp.distSQLSrv.Settings.Version.IsActive(ctx, clusterversion.V23_1)
return dsp.codec.ForSystemTenant() && planCtx.localityFilter.Empty()
}
@@ -4802,6 +4887,9 @@ func (dsp *DistSQLPlanner) NewPlanningCtxWithOracle(
planCtx.spanIter = dsp.spanResolver.NewSpanResolverIterator(txn, oracle)
planCtx.nodeStatuses = make(map[base.SQLInstanceID]NodeStatus)
planCtx.nodeStatuses[dsp.gatewaySQLInstanceID] = NodeOK
planCtx.spanPartitionState = &spanPartitionState{
partitionSpans: make(map[base.SQLInstanceID]int),
}
return planCtx
}
