sql: PartitionSpan should only use healthy nodes in mixed-process mode #111337

Merged: 1 commit, Oct 4, 2023
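
In mixed-process mode, a tenant's SQL instance runs inside each KV node's process and its SQL instance ID is fixed to the KV node ID. Span partitioning previously mapped each range's node straight to that instance ID without checking liveness, so a distributed bulk job such as BACKUP could be planned on a node that had already gone down. The change below filters candidates through the set of live instances and falls back to the gateway. A minimal sketch of the idea, using names from the diff (live and gateway are assumed inputs standing in for the planner's state):

	// healthyResolver plans on a KV node's co-located SQL instance only
	// while that instance is live; otherwise it routes to the gateway.
	func healthyResolver(
		live []base.SQLInstanceID, gateway base.SQLInstanceID,
	) func(roachpb.NodeID) base.SQLInstanceID {
		healthy := make(map[base.SQLInstanceID]struct{}, len(live))
		for _, id := range live {
			healthy[id] = struct{}{}
		}
		return func(nodeID roachpb.NodeID) base.SQLInstanceID {
			if _, ok := healthy[base.SQLInstanceID(nodeID)]; ok {
				return base.SQLInstanceID(nodeID) // instance ID == node ID in mixed mode
			}
			return gateway // node down or its liveness session expired
		}
	}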
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -252,6 +252,7 @@ go_test(
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/kv/kvserver/protectedts/ptutil",
"//pkg/multitenant/mtinfopb",
"//pkg/multitenant/tenantcapabilities",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/scheduledjobs/schedulebase",
@@ -289,6 +290,7 @@ go_test(
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlliveness/slinstance",
"//pkg/sql/stats",
"//pkg/storage",
"//pkg/testutils",
74 changes: 74 additions & 0 deletions pkg/ccl/backupccl/backup_tenant_test.go
@@ -12,15 +12,18 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl/multiregionccltestutils"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
_ "github.com/cockroachdb/cockroach/pkg/sql/importer"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -29,10 +32,81 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func TestBackupSharedProcessTenantNodeDown(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

skip.UnderRace(t, "multi-node, multi-tenant test too slow under race")
params := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
DefaultTestTenant: base.TestControlsTenantsExplicitly,
},
}
params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
tc, hostDB, _, cleanup := backupRestoreTestSetupWithParams(t, multiNode, 0, /* numAccounts */
InitManualReplication, params)
defer cleanup()

hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.virtual_cluster.feature_access.manual_range_split.enabled=true")
hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.virtual_cluster.feature_access.manual_range_scatter.enabled=true")
hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING server.sqlliveness.ttl='2s'")
hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING server.sqlliveness.heartbeat='250ms'")

testTenantID := roachpb.MustMakeTenantID(11)
tenantApp, tenantDB, err := tc.Server(0).StartSharedProcessTenant(ctx,
base.TestSharedProcessTenantArgs{
TenantID: testTenantID,
TenantName: "test",
})
require.NoError(t, err)

hostDB.Exec(t, "ALTER TENANT test GRANT ALL CAPABILITIES")
err = tc.Server(0).TenantController().WaitForTenantCapabilities(ctx, testTenantID, map[tenantcapabilities.ID]string{
tenantcapabilities.CanUseNodelocalStorage: "true",
}, "")
require.NoError(t, err)

tenantSQL := sqlutils.MakeSQLRunner(tenantDB)
tenantSQL.Exec(t, "CREATE TABLE foo AS SELECT generate_series(1, 4000)")
tenantSQL.Exec(t, "ALTER TABLE foo SPLIT AT VALUES (500), (1000), (1500), (2000), (2500), (3000)")
tenantSQL.Exec(t, "ALTER TABLE foo SCATTER")

t.Log("waiting for SQL instances")
waitStart := timeutil.Now()
for i := 1; i < multiNode; i++ {
testutils.SucceedsSoon(t, func() error {
t.Logf("waiting for server %d", i)
db, err := tc.Server(i).SystemLayer().SQLConnE("cluster:test/defaultdb")
if err != nil {
return err
}
return db.Ping()
})
}
t.Logf("all SQL instances (took %s)", timeutil.Since(waitStart))

// Shut down a node.
t.Log("shutting down server 2 (n3)")
tc.StopServer(2)

// We use SucceedsWithin here since it still takes some time
// for instance-based planning to recognize the downed node.
sv := &tenantApp.ClusterSettings().SV
padding := 10 * time.Second
timeout := slinstance.DefaultTTL.Get(sv) + slinstance.DefaultHeartBeat.Get(sv) + padding
testutils.SucceedsWithin(t, func() error {
_, err := tenantDB.Exec("BACKUP INTO 'nodelocal://1/worker-failure'")
return err
}, timeout)
}

func TestBackupTenantImportingTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/joberror/BUILD.bazel
@@ -9,6 +9,7 @@ go_library(
"//pkg/kv/kvclient/kvcoord",
"//pkg/sql/flowinfra",
"//pkg/sql/sqlerrors",
"//pkg/sql/sqlinstance",
"//pkg/util/circuit",
"//pkg/util/grpcutil",
"//pkg/util/sysutil",
5 changes: 4 additions & 1 deletion pkg/jobs/joberror/errors.go
@@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/sysutil"
@@ -35,5 +36,7 @@ func IsPermanentBulkJobError(err error) bool {
!kvcoord.IsSendError(err) &&
!errors.Is(err, circuit.ErrBreakerOpen) &&
!sysutil.IsErrConnectionReset(err) &&
!sysutil.IsErrConnectionRefused(err)
!sysutil.IsErrConnectionRefused(err) &&
!errors.Is(err, sqlinstance.NonExistentInstanceError)

}
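
With sqlinstance.NonExistentInstanceError now treated as retryable, a bulk job whose DistSQL flow was planned on an instance that has since disappeared is replanned instead of failing permanently. A hedged sketch of how a caller might use the predicate (the loop and runBackupFlow are illustrative, not code from this PR):

	// Retry transient failures; surface permanent ones to the caller.
	for {
		err := runBackupFlow(ctx) // hypothetical helper running one attempt
		if err == nil {
			return nil
		}
		if joberror.IsPermanentBulkJobError(err) {
			return err // e.g. a validation error: retrying cannot help
		}
		// Transient (connection reset, breaker open, vanished instance, ...):
		// fall through and replan on the next iteration.
	}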
68 changes: 48 additions & 20 deletions pkg/sql/distsql_physical_planner.go
@@ -1313,7 +1313,7 @@ func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem(
) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) {
nodeMap := make(map[base.SQLInstanceID]int)
resolver := func(nodeID roachpb.NodeID) base.SQLInstanceID {
return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID)
return dsp.healthySQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID)
}
for _, span := range spans {
var err error
@@ -1336,7 +1336,7 @@ func (dsp *DistSQLPlanner) partitionSpans(
func (dsp *DistSQLPlanner) partitionSpans(
ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans,
) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) {
resolver, err := dsp.makeInstanceResolver(ctx, planCtx.localityFilter)
resolver, err := dsp.makeInstanceResolver(ctx, planCtx)
if err != nil {
return nil, false, err
}
@@ -1368,11 +1368,11 @@ func (dsp *DistSQLPlanner) partitionSpans(
return partitions, ignoreMisplannedRanges, nil
}

// deprecatedSQLInstanceIDForKVNodeIDSystem returns the SQL instance that should
// handle the range with the given node ID when planning is done on behalf of
// the system tenant. It ensures that the chosen SQL instance is healthy and of
// the compatible DistSQL version.
func (dsp *DistSQLPlanner) deprecatedSQLInstanceIDForKVNodeIDSystem(
// healthySQLInstanceIDForKVNodeIDSystem returns the SQL instance that
// should handle the range with the given node ID when planning is
// done on behalf of the system tenant. It ensures that the chosen SQL
// instance is healthy and of the compatible DistSQL version.
func (dsp *DistSQLPlanner) healthySQLInstanceIDForKVNodeIDSystem(
ctx context.Context, planCtx *PlanningCtx, nodeID roachpb.NodeID,
) base.SQLInstanceID {
sqlInstanceID := base.SQLInstanceID(nodeID)
@@ -1387,15 +1387,37 @@ func (dsp *DistSQLPlanner) deprecatedSQLInstanceIDForKVNodeIDSystem(
return sqlInstanceID
}

// instanceIDForKVNodeHostedInstance returns the SQL instance ID for an
// instance that is hosted in the process of a KV node. Currently SQL
// healthySQLInstanceIDForKVNodeHostedInstanceResolver returns a resolver that
// maps a KV node ID to the SQL instance hosted in that node's process. Currently SQL
// instances run in KV node processes have IDs fixed to be equal to the KV
// nodes' IDs, and all of the SQL instances for a given tenant are _either_
// run in this mixed mode or standalone, meaning if this server is in mixed
// mode, we can safely assume every other server is as well, and thus has
// IDs matching node IDs.
func instanceIDForKVNodeHostedInstance(nodeID roachpb.NodeID) base.SQLInstanceID {
return base.SQLInstanceID(nodeID)
// nodes' IDs, and all of the SQL instances for a given tenant are _either_ run
// in this mixed mode or standalone, meaning if this server is in mixed mode, we
// can safely assume every other server is as well, and thus has IDs matching
// node IDs.
//
// If the given node is not healthy, the gateway node is returned.
func (dsp *DistSQLPlanner) healthySQLInstanceIDForKVNodeHostedInstanceResolver(
ctx context.Context, planCtx *PlanningCtx,
) func(nodeID roachpb.NodeID) base.SQLInstanceID {
allHealthy, err := dsp.sqlAddressResolver.GetAllInstances(ctx)
if err != nil {
log.Warningf(ctx, "could not get all instances: %v", err)
return dsp.alwaysUseGateway
}

healthyNodes := make(map[base.SQLInstanceID]struct{}, len(allHealthy))
for _, n := range allHealthy {
healthyNodes[n.InstanceID] = struct{}{}
}

return func(nodeID roachpb.NodeID) base.SQLInstanceID {
sqlInstance := base.SQLInstanceID(nodeID)
if _, ok := healthyNodes[sqlInstance]; ok {
return sqlInstance
}
log.Warningf(ctx, "not planning on node %d", sqlInstance)
return dsp.gatewaySQLInstanceID
}
}
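
The returned closure is then consulted once per span during partitioning. Continuing the test scenario above (instance numbers are illustrative; server 2, i.e. n3, is the stopped node whose sqlliveness session has expired):

	resolver := dsp.healthySQLInstanceIDForKVNodeHostedInstanceResolver(ctx, planCtx)
	_ = resolver(1) // n1 is in GetAllInstances: plan its spans on SQL instance 1
	_ = resolver(3) // n3 is absent: log a warning, return dsp.gatewaySQLInstanceID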

func (dsp *DistSQLPlanner) alwaysUseGateway(roachpb.NodeID) base.SQLInstanceID {
Expand All @@ -1409,12 +1431,18 @@ var noInstancesMatchingLocalityFilterErr = errors.New(
// makeInstanceResolver returns a function that can choose the SQL instance ID
// for a provided KV node ID.
func (dsp *DistSQLPlanner) makeInstanceResolver(
ctx context.Context, locFilter roachpb.Locality,
ctx context.Context, planCtx *PlanningCtx,
) (func(roachpb.NodeID) base.SQLInstanceID, error) {
_, mixedProcessMode := dsp.distSQLSrv.NodeID.OptionalNodeID()
locFilter := planCtx.localityFilter

var mixedProcessSameNodeResolver func(nodeID roachpb.NodeID) base.SQLInstanceID
if mixedProcessMode {
mixedProcessSameNodeResolver = dsp.healthySQLInstanceIDForKVNodeHostedInstanceResolver(ctx, planCtx)
}

if mixedProcessMode && locFilter.Empty() {
return instanceIDForKVNodeHostedInstance, nil
return mixedProcessSameNodeResolver, nil
}

// GetAllInstances only returns healthy instances.
@@ -1479,7 +1507,7 @@ func (dsp *DistSQLPlanner) makeInstanceResolver(
// locality filter in which case we can just use it.
if mixedProcessMode {
if ok, _ := nodeDesc.Locality.Matches(locFilter); ok {
return instanceIDForKVNodeHostedInstance(nodeID)
return mixedProcessSameNodeResolver(nodeID)
} else {
log.VEventf(ctx, 2,
"node %d locality %s does not match locality filter %s, finding alternative placement...",
@@ -1598,9 +1626,9 @@ func (dsp *DistSQLPlanner) getInstanceIDForScan(
}

if dsp.useGossipPlanning(ctx, planCtx) && planCtx.localityFilter.Empty() {
return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID), nil
return dsp.healthySQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID), nil
}
resolver, err := dsp.makeInstanceResolver(ctx, planCtx.localityFilter)
resolver, err := dsp.makeInstanceResolver(ctx, planCtx)
if err != nil {
return 0, err
}