3 changes: 3 additions & 0 deletions pkg/settings/registry.go
@@ -274,6 +274,9 @@ var retiredSettings = map[InternalKey]struct{}{

// removed as of 25.4
"storage.columnar_blocks.enabled": {},

// removed as of 26.1
"sql.distsql_planning.use_gossip.enabled": {},
}

// grandfatheredDefaultSettings is the list of "grandfathered" existing sql.defaults
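For context on what adding a key to retiredSettings buys: the map lets the registry reject any later attempt to reuse a deleted setting name. The snippet below is a minimal, illustrative sketch of that pattern only — the `register` helper is hypothetical, not the registry's real code:

```go
package main

import "fmt"

// InternalKey names a cluster setting, mirroring the key type used by the
// retiredSettings map in pkg/settings/registry.go.
type InternalKey string

// retiredSettings lists names of settings that were deleted and must not be
// reused; the entry below is the one added in this change.
var retiredSettings = map[InternalKey]struct{}{
	"sql.distsql_planning.use_gossip.enabled": {}, // removed as of 26.1
}

// register is a hypothetical stand-in for the real registration path; the
// only point illustrated is that retired names are rejected up front.
func register(key InternalKey) error {
	if _, ok := retiredSettings[key]; ok {
		return fmt.Errorf("setting %q was retired and cannot be registered again", key)
	}
	// ...normal registration would continue here...
	return nil
}

func main() {
	fmt.Println(register("sql.distsql_planning.use_gossip.enabled"))
}
```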
76 changes: 0 additions & 76 deletions pkg/sql/distsql_physical_planner.go
@@ -1314,13 +1314,6 @@ const (
// SpanPartitionReason_ROUND_ROBIN is reported when there is no locality info
// on any of the instances and so we default to a naive round-robin strategy.
SpanPartitionReason_ROUND_ROBIN
// SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY is reported when the
// target node retrieved via gossip is deemed unhealthy. In this case we
// default to the gateway node.
SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY
// SpanPartitionReason_GOSSIP_TARGET_HEALTHY is reported when the
// target node retrieved via gossip is deemed healthy.
SpanPartitionReason_GOSSIP_TARGET_HEALTHY
// SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED is reported
// when there is no match to the provided locality filter and the gateway is
// eligible but overloaded with other partitions. In this case we pick a
@@ -1350,10 +1343,6 @@ func (r SpanPartitionReason) String() string {
return "locality-filtered-random"
case SpanPartitionReason_ROUND_ROBIN:
return "round-robin"
case SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY:
return "gossip-gateway-target-unhealthy"
case SpanPartitionReason_GOSSIP_TARGET_HEALTHY:
return "gossip-target-healthy"
case SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED:
return "locality-filtered-random-gateway-overloaded"
default:
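One thing to keep in mind with this deletion: SpanPartitionReason appears to be a plain Go iota enum rather than a protobuf-generated type, so removing the two GOSSIP_* constants shifts the numeric value of every constant declared after them. Whether that matters depends on whether the raw values are persisted or exported anywhere, which this diff alone doesn't show. A standalone illustration of the language behavior, using made-up names:

```go
package main

import "fmt"

type Reason int

const (
	ReasonRoundRobin Reason = iota // 0
	// Two constants previously declared here have been removed, so...
	ReasonGatewayOverloaded // ...this now has the value 1 instead of 3.
)

func main() {
	fmt.Println(ReasonRoundRobin, ReasonGatewayOverloaded) // prints: 0 1
}
```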
@@ -1527,9 +1516,6 @@ func (dsp *DistSQLPlanner) partitionSpansEx(
return []SpanPartition{{SQLInstanceID: dsp.gatewaySQLInstanceID, Spans: spans}},
true /* ignoreMisplannedRanges */, nil
}
if dsp.useGossipPlanning(ctx, planCtx) {
return dsp.deprecatedPartitionSpansSystem(ctx, planCtx, spans)
}
return dsp.partitionSpans(ctx, planCtx, spans, bound)
}

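With the gossip branch gone, partitionSpansEx has only two outcomes: a single gateway-only partition when distribution is off, or the resolver-based partitionSpans path. A condensed, hypothetical sketch of that dispatch, with the planner's real types, fields, and return values replaced by stand-ins:

```go
package main

import "fmt"

type SQLInstanceID int32

type Span struct{ Key, EndKey string }

type SpanPartition struct {
	SQLInstanceID SQLInstanceID
	Spans         []Span
}

// planner is a stand-in for DistSQLPlanner with only the fields this sketch needs.
type planner struct {
	gatewaySQLInstanceID SQLInstanceID
	distributionOff      bool
}

// partitionSpansEx either keeps everything on the gateway or delegates to the
// resolver-based partitioning; the deprecated gossip fallback no longer exists.
func (p *planner) partitionSpansEx(spans []Span) []SpanPartition {
	if p.distributionOff {
		return []SpanPartition{{SQLInstanceID: p.gatewaySQLInstanceID, Spans: spans}}
	}
	return p.partitionSpans(spans)
}

// partitionSpans stands in for the real oracle/resolver-driven partitioning.
func (p *planner) partitionSpans(spans []Span) []SpanPartition {
	return []SpanPartition{{SQLInstanceID: 1, Spans: spans}}
}

func main() {
	p := &planner{gatewaySQLInstanceID: 2, distributionOff: true}
	fmt.Println(p.partitionSpansEx([]Span{{Key: "A", EndKey: "Z"}}))
}
```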
@@ -1651,27 +1637,6 @@ func (dsp *DistSQLPlanner) partitionSpan(
return partitions, lastPartitionIdx, nil
}

// deprecatedPartitionSpansSystem finds node owners for ranges touching the given spans
// for a system tenant.
func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem(
ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans,
) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) {
nodeMap := make(map[base.SQLInstanceID]int)
resolver := func(nodeID roachpb.NodeID) (base.SQLInstanceID, SpanPartitionReason) {
return dsp.deprecatedHealthySQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID)
}
for _, span := range spans {
var err error
partitions, _, err = dsp.partitionSpan(
ctx, planCtx, span, partitions, nodeMap, resolver, &ignoreMisplannedRanges,
)
if err != nil {
return nil, false, err
}
}
return partitions, ignoreMisplannedRanges, nil
}

// partitionSpans assigns SQL instances to spans. In mixed sql and KV mode it
// generally assigns each span to the instance hosted on the KV node chosen by
// the configured replica oracle, while in clusters operating with standalone
@@ -1730,26 +1695,6 @@ func (dsp *DistSQLPlanner) partitionSpans(
return partitions, ignoreMisplannedRanges, nil
}

// deprecatedHealthySQLInstanceIDForKVNodeIDSystem returns the SQL instance that
// should handle the range with the given node ID when planning is done on
// behalf of the system tenant. It ensures that the chosen SQL instance is
// healthy and of the compatible DistSQL version.
func (dsp *DistSQLPlanner) deprecatedHealthySQLInstanceIDForKVNodeIDSystem(
ctx context.Context, planCtx *PlanningCtx, nodeID roachpb.NodeID,
) (base.SQLInstanceID, SpanPartitionReason) {
sqlInstanceID := base.SQLInstanceID(nodeID)
status := dsp.checkInstanceHealthAndVersionSystem(ctx, planCtx, sqlInstanceID)
// If the node is unhealthy, use the gateway to process this span instead of
// the unhealthy host. An empty address indicates an unhealthy host.
reason := SpanPartitionReason_GOSSIP_TARGET_HEALTHY
if status != NodeOK {
log.VEventf(ctx, 2, "not planning on node %d: %s", sqlInstanceID, status)
sqlInstanceID = dsp.gatewaySQLInstanceID
reason = SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY
}
return sqlInstanceID, reason
}

// checkInstanceHealth returns the instance health status by dialing the node.
// It also caches the result to avoid redialing for a query.
func (dsp *DistSQLPlanner) checkInstanceHealth(
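The two deleted helpers implemented a simple pattern: iterate the spans, map each KV node ID one-to-one onto a SQL instance ID, and fall back to the gateway instance whenever the target fails the health/version check. The standalone sketch below restates that resolver shape with stub types; it is not the planner's actual code:

```go
package main

import "fmt"

type NodeID int32
type SQLInstanceID int32

type Reason string

const (
	ReasonGossipTargetHealthy          Reason = "gossip-target-healthy"
	ReasonGossipGatewayTargetUnhealthy Reason = "gossip-gateway-target-unhealthy"
)

// resolve picks the SQL instance colocated with the KV node, or the gateway
// if that instance is unhealthy; healthy is a stand-in for the planner's
// health and version check.
func resolve(
	nodeID NodeID, gateway SQLInstanceID, healthy func(SQLInstanceID) bool,
) (SQLInstanceID, Reason) {
	target := SQLInstanceID(nodeID)
	if !healthy(target) {
		return gateway, ReasonGossipGatewayTargetUnhealthy
	}
	return target, ReasonGossipTargetHealthy
}

func main() {
	gateway := SQLInstanceID(2)
	healthy := func(id SQLInstanceID) bool { return id != 3 } // pretend instance 3 is down
	fmt.Println(resolve(1, gateway, healthy)) // 1 gossip-target-healthy
	fmt.Println(resolve(3, gateway, healthy)) // 2 gossip-gateway-target-unhealthy
}
```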
@@ -2129,10 +2074,6 @@ func (dsp *DistSQLPlanner) getInstanceIDForScan(
return 0, err
}

if dsp.useGossipPlanning(ctx, planCtx) {
sqlInstanceID, _ := dsp.deprecatedHealthySQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID)
return sqlInstanceID, nil
}
resolver, err := dsp.makeInstanceResolver(ctx, planCtx)
if err != nil {
return 0, err
@@ -2142,23 +2083,6 @@
return sqlInstanceID, nil
}

// TODO(yuzefovich): retire this setting altogether in 25.3 release.
var useGossipPlanning = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"sql.distsql_planning.use_gossip.enabled",
"if enabled, the DistSQL physical planner falls back to gossip-based planning",
false,
)

func (dsp *DistSQLPlanner) useGossipPlanning(_ context.Context, planCtx *PlanningCtx) bool {
var gossipPlanningEnabled bool
// Some of the planCtx fields can be left unset in tests.
if planCtx.ExtendedEvalCtx != nil && planCtx.ExtendedEvalCtx.Settings != nil {
gossipPlanningEnabled = useGossipPlanning.Get(&planCtx.ExtendedEvalCtx.Settings.SV)
}
return dsp.codec.ForSystemTenant() && planCtx.localityFilter.Empty() && gossipPlanningEnabled
}

// convertOrdering maps the columns in props.ordering to the output columns of a
// processor.
func (dsp *DistSQLPlanner) convertOrdering(
107 changes: 0 additions & 107 deletions pkg/sql/distsql_physical_planner_test.go
@@ -19,7 +19,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
@@ -1751,112 +1750,6 @@ func TestShouldPickGatewayNode(t *testing.T) {
}
}

// Test that a node whose descriptor info is not accessible through gossip is
// not used. This is to simulate nodes that have been decommissioned and also
// nodes that have been "replaced" by another node at the same address (which,
// I guess, is also a type of decommissioning).
func TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// The spans that we're going to plan for.
span := roachpb.Span{Key: roachpb.Key("A"), EndKey: roachpb.Key("Z")}
gatewayNode := roachpb.NodeID(2)
ranges := []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}}

stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
codec := keys.SystemSQLCodec

mockGossip := gossip.NewTest(roachpb.NodeID(1), stopper, metric.NewRegistry())
var nodeDescs []*roachpb.NodeDescriptor
for i := 1; i <= 2; i++ {
sqlInstanceID := base.SQLInstanceID(i)
desc := &roachpb.NodeDescriptor{
NodeID: roachpb.NodeID(sqlInstanceID),
Address: util.UnresolvedAddr{AddressField: fmt.Sprintf("addr%d", i)},
}
if i == 2 {
if err := mockGossip.SetNodeDescriptor(desc); err != nil {
t.Fatal(err)
}
}
// All the nodes advertise that they are not draining. This is to
// simulate the "node overridden by another node at the same address"
// case mentioned in the test comment - for such a node, the descriptor
// would be taken out of the gossip data, but other datums it advertised
// are left in place.
if err := mockGossip.AddInfoProto(
gossip.MakeDistSQLDrainingKey(sqlInstanceID),
&execinfrapb.DistSQLDrainingInfo{
Draining: false,
},
0, // ttl - no expiration
); err != nil {
t.Fatal(err)
}

nodeDescs = append(nodeDescs, desc)
}
tsp := &testSpanResolver{
nodes: nodeDescs,
ranges: ranges,
}

st := cluster.MakeTestingClusterSettings()
gw := gossip.MakeOptionalGossip(mockGossip)
dsp := DistSQLPlanner{
st: st,
gatewaySQLInstanceID: base.SQLInstanceID(tsp.nodes[gatewayNode-1].NodeID),
stopper: stopper,
spanResolver: tsp,
gossip: gw,
nodeHealth: distSQLNodeHealth{
gossip: gw,
connHealthSystem: func(node roachpb.NodeID, _ rpcbase.ConnectionClass) error {
_, _, err := mockGossip.GetNodeIDAddress(node)
return err
},
isAvailable: func(base.SQLInstanceID) bool {
return true
},
},
codec: codec,
}

ctx := context.Background()
// This test is specific to gossip-based planning.
useGossipPlanning.Override(ctx, &st.SV, true)
planCtx := dsp.NewPlanningCtx(
ctx, &extendedEvalContext{Context: eval.Context{Codec: codec, Settings: st}},
nil /* planner */, nil /* txn */, FullDistribution,
)
partitions, err := dsp.PartitionSpans(ctx, planCtx, roachpb.Spans{span}, PartitionSpansBoundDefault)
if err != nil {
t.Fatal(err)
}

resMap := make(map[base.SQLInstanceID][][2]string)
for _, p := range partitions {
if _, ok := resMap[p.SQLInstanceID]; ok {
t.Fatalf("node %d shows up in multiple partitions", p.SQLInstanceID)
}
var spans [][2]string
for _, s := range p.Spans {
spans = append(spans, [2]string{string(s.Key), string(s.EndKey)})
}
resMap[p.SQLInstanceID] = spans
}

expectedPartitions :=
map[base.SQLInstanceID][][2]string{
2: {{"A", "Z"}},
}
if !reflect.DeepEqual(resMap, expectedPartitions) {
t.Errorf("expected partitions:\n %v\ngot:\n %v", expectedPartitions, resMap)
}
}

func TestCheckNodeHealth(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
2 changes: 2 additions & 0 deletions pkg/sql/distsql_plan_bulk.go
@@ -41,6 +41,8 @@ func (dsp *DistSQLPlanner) SetupAllNodesPlanningWithOracle(
localityFilter roachpb.Locality,
) (*PlanningCtx, []base.SQLInstanceID, error) {
if dsp.codec.ForSystemTenant() {
// TODO(yuzefovich): evaluate whether we can remove system tenant
// specific code.
return dsp.setupAllNodesPlanningSystem(ctx, evalCtx, execCfg, oracle, localityFilter)
}
return dsp.setupAllNodesPlanningTenant(ctx, evalCtx, execCfg, oracle, localityFilter)