111337: sql: PartitionSpan should only use healthy nodes in mixed-process mode r=yuzefovich a=stevendanna

Previously, when running in mixed-process mode, the DistSQLPlanner's PartitionSpans method assumed it could directly assign a given span to the SQLInstanceID matching the NodeID of whichever replica the replica oracle returned, without regard to whether that SQL instance was available.

This differs from the system-tenant code paths, which proactively check node health, and from the non-mixed-process multi-tenant (MT) code paths, which use an eventually consistent view of healthy nodes.

As a result, operations that use PartitionSpans, such as BACKUP, could fail when a node was down.

Here, we make the mixed-process case work more like the separate-process case, in which we only use nodes returned by the instance reader. That list should eventually exclude any down nodes.
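
A minimal sketch of the resolver idea, using placeholder types and names rather than the actual DistSQLPlanner APIs: build the set of live SQL instances reported by the instance reader, map a KV node to its co-located SQL instance only when that instance is in the set, and otherwise fall back to the gateway.

// Sketch only: simplified stand-ins for the instance reader and planner types.
package distsketch

type NodeID int
type SQLInstanceID int

// instanceReader abstracts the instance list; the real reader is assumed to
// return only live instances.
type instanceReader interface {
	GetAllInstances() ([]SQLInstanceID, error)
}

// makeHealthyResolver returns a resolver that only plans on instances the
// reader reports as live, falling back to the gateway otherwise.
func makeHealthyResolver(r instanceReader, gateway SQLInstanceID) func(NodeID) SQLInstanceID {
	live, err := r.GetAllInstances()
	if err != nil {
		// If the instance list is unavailable, plan everything on the gateway.
		return func(NodeID) SQLInstanceID { return gateway }
	}
	healthy := make(map[SQLInstanceID]struct{}, len(live))
	for _, id := range live {
		healthy[id] = struct{}{}
	}
	return func(n NodeID) SQLInstanceID {
		// In mixed-process mode, SQL instance IDs equal KV node IDs.
		if _, ok := healthy[SQLInstanceID(n)]; ok {
			return SQLInstanceID(n)
		}
		return gateway
	}
}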

An alternative (or perhaps an addition) would be to allow MT planning to do direct status checks, similar to how they are done for the system tenant.

While reading this code, I also noted that we don't do DistSQL version compatibility checks as we do in the system-tenant case. I am not sure of the impact of that.

Finally, this also adds another error to our list of non-permanent errors: if we fail to find a SQL instance, we don't treat that as a permanent failure.
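
Roughly, the classification change looks like the sketch below; the sentinel mirrors sqlinstance.NonExistentInstanceError, but the surrounding helper is illustrative rather than the joberror package itself.

// Sketch only: illustrative error classification, not the committed joberror code.
package joberrsketch

import "errors"

// errNonExistentInstance stands in for sqlinstance.NonExistentInstanceError.
var errNonExistentInstance = errors.New("non existent SQL instance")

// isPermanent reports whether a bulk job should give up on err. Failing to
// resolve a SQL instance is treated as transient because the instance list is
// eventually consistent and a retry may land on a live instance.
func isPermanent(err error) bool {
	if errors.Is(err, errNonExistentInstance) {
		return false
	}
	// The real check also excludes connection resets, open circuit breakers,
	// DistSQL retryable flow errors, and similar transient failures.
	return true
}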

Fixes #111319

Release note (bug fix): When using a private preview of physical cluster replication, the source cluster could, in some circumstances, be unable to take backups while a source cluster node was unavailable.

111675: backupccl: deflake TestShowBackup r=stevendanna a=msbutler

This patch simplifies how TestShowBackup parses the stringified timestamp: it removes the manual splitting of date and time and parses the timestamp in a single call.
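
As an illustration of why a single call suffices: Go's ".999999" layout fragment accepts zero to six fractional digits and "Z07" accepts either "Z" or a signed hour offset, so timestamps with differing precision parse without manual truncation. This is a standalone example, not the test itself.

// Standalone illustration of the one-call parse used by the patch.
package main

import (
	"fmt"
	"time"
)

func main() {
	const layout = "2006-01-02 15:04:05.999999Z07"
	for _, ts := range []string{
		"2023-10-04 12:34:56.12+00",     // two fractional digits
		"2023-10-04 12:34:56.123456+00", // six fractional digits
	} {
		parsed, err := time.Parse(layout, ts)
		if err != nil {
			fmt.Println("parse error:", err)
			continue
		}
		fmt.Println(parsed.UTC())
	}
}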

Fixes: #111015

Release note: none

Co-authored-by: Steven Danna <danna@cockroachlabs.com>
Co-authored-by: Michael Butler <butler@cockroachlabs.com>
3 people committed Oct 4, 2023
3 parents 1eadb77 + 44fac37 + 899fb87 commit 72dad91
Showing 6 changed files with 130 additions and 29 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -252,6 +252,7 @@ go_test(
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/kv/kvserver/protectedts/ptutil",
"//pkg/multitenant/mtinfopb",
"//pkg/multitenant/tenantcapabilities",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/scheduledjobs/schedulebase",
@@ -289,6 +290,7 @@ go_test(
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlliveness/slinstance",
"//pkg/sql/stats",
"//pkg/storage",
"//pkg/testutils",
74 changes: 74 additions & 0 deletions pkg/ccl/backupccl/backup_tenant_test.go
@@ -12,15 +12,18 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl/multiregionccltestutils"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
_ "github.com/cockroachdb/cockroach/pkg/sql/importer"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -29,10 +32,81 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func TestBackupSharedProcessTenantNodeDown(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

skip.UnderRace(t, "multi-node, multi-tenant test too slow under race")
params := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
DefaultTestTenant: base.TestControlsTenantsExplicitly,
},
}
params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
tc, hostDB, _, cleanup := backupRestoreTestSetupWithParams(t, multiNode, 0, /* numAccounts */
InitManualReplication, params)
defer cleanup()

hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.virtual_cluster.feature_access.manual_range_split.enabled=true")
hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.virtual_cluster.feature_access.manual_range_scatter.enabled=true")
hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING server.sqlliveness.ttl='2s'")
hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING server.sqlliveness.heartbeat='250ms'")

testTenantID := roachpb.MustMakeTenantID(11)
tenantApp, tenantDB, err := tc.Server(0).StartSharedProcessTenant(ctx,
base.TestSharedProcessTenantArgs{
TenantID: testTenantID,
TenantName: "test",
})
require.NoError(t, err)

hostDB.Exec(t, "ALTER TENANT test GRANT ALL CAPABILITIES")
err = tc.Server(0).TenantController().WaitForTenantCapabilities(ctx, testTenantID, map[tenantcapabilities.ID]string{
tenantcapabilities.CanUseNodelocalStorage: "true",
}, "")
require.NoError(t, err)

tenantSQL := sqlutils.MakeSQLRunner(tenantDB)
tenantSQL.Exec(t, "CREATE TABLE foo AS SELECT generate_series(1, 4000)")
tenantSQL.Exec(t, "ALTER TABLE foo SPLIT AT VALUES (500), (1000), (1500), (2000), (2500), (3000)")
tenantSQL.Exec(t, "ALTER TABLE foo SCATTER")

t.Log("waiting for SQL instances")
waitStart := timeutil.Now()
for i := 1; i < multiNode; i++ {
testutils.SucceedsSoon(t, func() error {
t.Logf("waiting for server %d", i)
db, err := tc.Server(i).SystemLayer().SQLConnE("cluster:test/defaultdb")
if err != nil {
return err
}
return db.Ping()
})
}
t.Logf("all SQL instances (took %s)", timeutil.Since(waitStart))

// Shut down a node.
t.Log("shutting down server 2 (n3)")
tc.StopServer(2)

// We use SucceedsWithin here since it still takes some time
// for instance-based planning to recognize the downed node.
sv := &tenantApp.ClusterSettings().SV
padding := 10 * time.Second
timeout := slinstance.DefaultTTL.Get(sv) + slinstance.DefaultHeartBeat.Get(sv) + padding
testutils.SucceedsWithin(t, func() error {
_, err := tenantDB.Exec("BACKUP INTO 'nodelocal://1/worker-failure'")
return err
}, timeout)
}

func TestBackupTenantImportingTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
9 changes: 1 addition & 8 deletions pkg/ccl/backupccl/show_test.go
@@ -120,14 +120,7 @@ ORDER BY object_type, object_name`, full)
{"t2", "incremental", beforeTS, incTS, "0", "false"},
}, res)

// Different systems output different precisions (i.e. local OS X vs CI).
// Truncate decimal places so Go's very rigid parsing will work.
// TODO(bardin): Consider using a third-party library for this, or some kind
// of time-freezing on the test cluster.
truncateBackupTimeRE := regexp.MustCompile(`^(.*\.[0-9]{2})[0-9]*\+00$`)
matchResult := truncateBackupTimeRE.FindStringSubmatch(beforeTS)
require.NotNil(t, matchResult, "%s does not match %s", beforeTS, truncateBackupTimeRE)
backupTime, err := time.Parse("2006-01-02 15:04:05.00", matchResult[1])
backupTime, err := time.Parse("2006-01-02 15:04:05.999999Z07", beforeTS)
require.NoError(t, err)
backupFolder := backupTime.Format(backupbase.DateBasedIntoFolderName)
resolvedBackupFolder := full + backupFolder
1 change: 1 addition & 0 deletions pkg/jobs/joberror/BUILD.bazel
@@ -9,6 +9,7 @@ go_library(
"//pkg/kv/kvclient/kvcoord",
"//pkg/sql/flowinfra",
"//pkg/sql/sqlerrors",
"//pkg/sql/sqlinstance",
"//pkg/util/circuit",
"//pkg/util/grpcutil",
"//pkg/util/sysutil",
5 changes: 4 additions & 1 deletion pkg/jobs/joberror/errors.go
@@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/sysutil"
@@ -35,5 +36,7 @@ func IsPermanentBulkJobError(err error) bool {
!kvcoord.IsSendError(err) &&
!errors.Is(err, circuit.ErrBreakerOpen) &&
!sysutil.IsErrConnectionReset(err) &&
!sysutil.IsErrConnectionRefused(err)
!sysutil.IsErrConnectionRefused(err) &&
!errors.Is(err, sqlinstance.NonExistentInstanceError)

}
68 changes: 48 additions & 20 deletions pkg/sql/distsql_physical_planner.go
@@ -1313,7 +1313,7 @@ func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem(
) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) {
nodeMap := make(map[base.SQLInstanceID]int)
resolver := func(nodeID roachpb.NodeID) base.SQLInstanceID {
return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID)
return dsp.healthySQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID)
}
for _, span := range spans {
var err error
@@ -1336,7 +1336,7 @@ func (dsp *DistSQLPlanner) partitionSpans(
func (dsp *DistSQLPlanner) partitionSpans(
ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans,
) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) {
resolver, err := dsp.makeInstanceResolver(ctx, planCtx.localityFilter)
resolver, err := dsp.makeInstanceResolver(ctx, planCtx)
if err != nil {
return nil, false, err
}
@@ -1368,11 +1368,11 @@ func (dsp *DistSQLPlanner) partitionSpans(
return partitions, ignoreMisplannedRanges, nil
}

// deprecatedSQLInstanceIDForKVNodeIDSystem returns the SQL instance that should
// handle the range with the given node ID when planning is done on behalf of
// the system tenant. It ensures that the chosen SQL instance is healthy and of
// the compatible DistSQL version.
func (dsp *DistSQLPlanner) deprecatedSQLInstanceIDForKVNodeIDSystem(
// healthySQLInstanceIDForKVNodeIDSystem returns the SQL instance that
// should handle the range with the given node ID when planning is
// done on behalf of the system tenant. It ensures that the chosen SQL
// instance is healthy and of the compatible DistSQL version.
func (dsp *DistSQLPlanner) healthySQLInstanceIDForKVNodeIDSystem(
ctx context.Context, planCtx *PlanningCtx, nodeID roachpb.NodeID,
) base.SQLInstanceID {
sqlInstanceID := base.SQLInstanceID(nodeID)
@@ -1387,15 +1387,37 @@ func (dsp *DistSQLPlanner) deprecatedSQLInstanceIDForKVNodeIDSystem(
return sqlInstanceID
}

// instanceIDForKVNodeHostedInstance returns the SQL instance ID for an
// instance that is hosted in the process of a KV node. Currently SQL
// healthySQLInstanceIDForKVNodeHostedInstanceResolver returns the SQL instance ID for
// an instance that is hosted in the process of a KV node. Currently SQL
// instances run in KV node processes have IDs fixed to be equal to the KV
// nodes' IDs, and all of the SQL instances for a given tenant are _either_
// run in this mixed mode or standalone, meaning if this server is in mixed
// mode, we can safely assume every other server is as well, and thus has
// IDs matching node IDs.
func instanceIDForKVNodeHostedInstance(nodeID roachpb.NodeID) base.SQLInstanceID {
return base.SQLInstanceID(nodeID)
// nodes' IDs, and all of the SQL instances for a given tenant are _either_ run
// in this mixed mode or standalone, meaning if this server is in mixed mode, we
// can safely assume every other server is as well, and thus has IDs matching
// node IDs.
//
// If the given node is not healthy, the gateway node is returned.
func (dsp *DistSQLPlanner) healthySQLInstanceIDForKVNodeHostedInstanceResolver(
ctx context.Context, planCtx *PlanningCtx,
) func(nodeID roachpb.NodeID) base.SQLInstanceID {
allHealthy, err := dsp.sqlAddressResolver.GetAllInstances(ctx)
if err != nil {
log.Warningf(ctx, "could not get all instances: %v", err)
return dsp.alwaysUseGateway
}

healthyNodes := make(map[base.SQLInstanceID]struct{}, len(allHealthy))
for _, n := range allHealthy {
healthyNodes[n.InstanceID] = struct{}{}
}

return func(nodeID roachpb.NodeID) base.SQLInstanceID {
sqlInstance := base.SQLInstanceID(nodeID)
if _, ok := healthyNodes[sqlInstance]; ok {
return sqlInstance
}
log.Warningf(ctx, "not planning on node %d", sqlInstance)
return dsp.gatewaySQLInstanceID
}
}

func (dsp *DistSQLPlanner) alwaysUseGateway(roachpb.NodeID) base.SQLInstanceID {
@@ -1409,12 +1431,18 @@ var noInstancesMatchingLocalityFilterErr = errors.New(
// makeInstanceResolver returns a function that can choose the SQL instance ID
// for a provided KV node ID.
func (dsp *DistSQLPlanner) makeInstanceResolver(
ctx context.Context, locFilter roachpb.Locality,
ctx context.Context, planCtx *PlanningCtx,
) (func(roachpb.NodeID) base.SQLInstanceID, error) {
_, mixedProcessMode := dsp.distSQLSrv.NodeID.OptionalNodeID()
locFilter := planCtx.localityFilter

var mixedProcessSameNodeResolver func(nodeID roachpb.NodeID) base.SQLInstanceID
if mixedProcessMode {
mixedProcessSameNodeResolver = dsp.healthySQLInstanceIDForKVNodeHostedInstanceResolver(ctx, planCtx)
}

if mixedProcessMode && locFilter.Empty() {
return instanceIDForKVNodeHostedInstance, nil
return mixedProcessSameNodeResolver, nil
}

// GetAllInstances only returns healthy instances.
@@ -1479,7 +1507,7 @@ func (dsp *DistSQLPlanner) makeInstanceResolver(
// locality filter in which case we can just use it.
if mixedProcessMode {
if ok, _ := nodeDesc.Locality.Matches(locFilter); ok {
return instanceIDForKVNodeHostedInstance(nodeID)
return mixedProcessSameNodeResolver(nodeID)
} else {
log.VEventf(ctx, 2,
"node %d locality %s does not match locality filter %s, finding alternative placement...",
@@ -1598,9 +1626,9 @@ func (dsp *DistSQLPlanner) getInstanceIDForScan(
}

if dsp.useGossipPlanning(ctx, planCtx) && planCtx.localityFilter.Empty() {
return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID), nil
return dsp.healthySQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID), nil
}
resolver, err := dsp.makeInstanceResolver(ctx, planCtx.localityFilter)
resolver, err := dsp.makeInstanceResolver(ctx, planCtx)
if err != nil {
return 0, err
}
