diff --git a/pkg/ccl/streamingccl/replicationtestutils/testutils.go b/pkg/ccl/streamingccl/replicationtestutils/testutils.go
index 9731b17c64d9..b462db4af73e 100644
--- a/pkg/ccl/streamingccl/replicationtestutils/testutils.go
+++ b/pkg/ccl/streamingccl/replicationtestutils/testutils.go
@@ -76,6 +76,7 @@ type TenantStreamingClustersArgs struct {
 	MultiTenantSingleClusterTestRegions []string
 	NoMetamorphicExternalConnection     bool
+	ExternalIODir                       string
 }
 
 var DefaultTenantStreamingClustersArgs = TenantStreamingClustersArgs{
@@ -124,6 +125,7 @@ func (c *TenantStreamingClusters) setupSrcTenant() {
 	tenantArgs := base.TestSharedProcessTenantArgs{
 		TenantName: c.Args.SrcTenantName,
 		TenantID:   c.Args.SrcTenantID,
+		Knobs:      DefaultAppTenantTestingKnobs(),
 	}
 	srcTenantServer, srcTenantConn := serverutils.StartSharedProcessTenant(c.T, c.SrcCluster.Server(0), tenantArgs)
@@ -137,12 +139,16 @@ func (c *TenantStreamingClusters) setupSrcTenant() {
 	c.SrcTenantSQL = sqlutils.MakeSQLRunner(srcTenantConn)
 }
 
-func (c *TenantStreamingClusters) init() {
+func (c *TenantStreamingClusters) init(ctx context.Context) {
 	c.SrcSysSQL.ExecMultiple(c.T, ConfigureClusterSettings(c.Args.SrcClusterSettings)...)
 	c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.manual_range_split.enabled=true`, c.Args.SrcTenantName)
 	c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.manual_range_scatter.enabled=true`, c.Args.SrcTenantName)
 	c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.zone_configs.enabled=true`, c.Args.SrcTenantName)
 	c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.multiregion.enabled=true`, c.Args.SrcTenantName)
+	c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 GRANT CAPABILITY can_use_nodelocal_storage`, c.Args.SrcTenantName)
+	require.NoError(c.T, c.SrcCluster.Server(0).WaitForTenantCapabilities(ctx, c.Args.SrcTenantID, map[tenantcapabilities.ID]string{
+		tenantcapabilities.CanUseNodelocalStorage: "true",
+	}, ""))
 	if c.Args.SrcInitFunc != nil {
 		c.Args.SrcInitFunc(c.T, c.SrcSysSQL, c.SrcTenantSQL)
 	}
@@ -154,25 +160,40 @@ func (c *TenantStreamingClusters) init() {
 	c.DestSysSQL.Exec(c.T, `SET CLUSTER SETTING physical_replication.enabled = true;`)
 }
 
-// StartDestTenant starts the destination tenant and returns a cleanup
-// function that shuts tenant SQL instance and closes all sessions.
-// This function will fail the test if ran prior to the Replication stream
-// closing as the tenant will not yet be active
-func (c *TenantStreamingClusters) StartDestTenant(ctx context.Context) func() error {
-	c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 START SERVICE SHARED`, c.Args.DestTenantName)
-
-	destTenantConn := c.DestCluster.Server(0).SystemLayer().SQLConn(c.T, "cluster:"+string(c.Args.DestTenantName)+"/defaultdb")
+// StartDestTenant starts the destination tenant and returns a cleanup function
+// that shuts down the tenant SQL instance and closes all sessions. This
+// function will fail the test if run before the replication stream closes, as
+// the tenant will not yet be active. If the caller passes withTestingKnobs,
+// the destination tenant starts up via testServer.StartSharedProcessTenant().
+func (c *TenantStreamingClusters) StartDestTenant(
+	ctx context.Context, withTestingKnobs *base.TestingKnobs,
+) func() error {
+	if withTestingKnobs != nil {
+		var err error
+		_, c.DestTenantConn, err = c.DestCluster.Server(0).StartSharedProcessTenant(ctx, base.TestSharedProcessTenantArgs{
+			TenantID:    c.Args.DestTenantID,
+			TenantName:  c.Args.DestTenantName,
+			Knobs:       *withTestingKnobs,
+			UseDatabase: "defaultdb",
+		})
+		require.NoError(c.T, err)
+	} else {
+		c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 START SERVICE SHARED`, c.Args.DestTenantName)
+		c.DestTenantConn = c.DestCluster.Server(0).SystemLayer().SQLConn(c.T, "cluster:"+string(c.Args.DestTenantName)+"/defaultdb")
+	}
 
-	c.DestTenantConn = destTenantConn
-	c.DestTenantSQL = sqlutils.MakeSQLRunner(destTenantConn)
+	c.DestTenantSQL = sqlutils.MakeSQLRunner(c.DestTenantConn)
 	testutils.SucceedsSoon(c.T, func() error {
 		return c.DestTenantConn.Ping()
 	})
 
 	// TODO (msbutler): consider granting the new tenant some capabilities.
 	c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.zone_configs.enabled=true`, c.Args.DestTenantName)
-
+	c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 GRANT CAPABILITY can_use_nodelocal_storage`, c.Args.DestTenantName)
+	require.NoError(c.T, c.DestCluster.Server(0).WaitForTenantCapabilities(ctx, c.Args.DestTenantID, map[tenantcapabilities.ID]string{
+		tenantcapabilities.CanUseNodelocalStorage: "true",
+	}, ""))
 	return func() error {
-		return destTenantConn.Close()
+		return c.DestTenantConn.Close()
 	}
 }
 
@@ -277,6 +298,20 @@ func (c *TenantStreamingClusters) BuildCreateTenantQuery(externalConnection stri
 	return streamReplStmt
 }
 
+// DefaultAppTenantTestingKnobs returns the default testing knobs for an application tenant.
+func DefaultAppTenantTestingKnobs() base.TestingKnobs {
+	return base.TestingKnobs{
+		JobsTestingKnobs: defaultJobsTestingKnobs(),
+	}
+}
+
+func defaultJobsTestingKnobs() *jobs.TestingKnobs {
+	jobTestingKnobs := jobs.NewTestingKnobsWithShortIntervals()
+	jobTestingKnobs.SchedulerDaemonInitialScanDelay = func() time.Duration { return time.Second }
+	jobTestingKnobs.SchedulerDaemonScanDelay = func() time.Duration { return time.Second }
+	return jobTestingKnobs
+}
+
 func CreateServerArgs(args TenantStreamingClustersArgs) base.TestServerArgs {
 	if args.TestingKnobs != nil && args.TestingKnobs.DistSQLRetryPolicy == nil {
 		args.TestingKnobs.DistSQLRetryPolicy = &retry.Options{
@@ -291,7 +326,7 @@ func CreateServerArgs(args TenantStreamingClustersArgs) base.TestServerArgs {
 		// to system tenants. Tracked with #76378.
 		DefaultTestTenant: base.TODOTestTenantDisabled,
 		Knobs: base.TestingKnobs{
-			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+			JobsTestingKnobs: defaultJobsTestingKnobs(),
 			DistSQL: &execinfra.TestingKnobs{
 				StreamingTestingKnobs: args.TestingKnobs,
 			},
@@ -303,6 +338,7 @@ func CreateServerArgs(args TenantStreamingClustersArgs) base.TestServerArgs {
 				EnableTenantIDReuse: true,
 			},
 		},
+		ExternalIODir: args.ExternalIODir,
 	}
 }
 
@@ -364,7 +400,7 @@ func CreateMultiTenantStreamingCluster(
 		Rng:          rng,
 	}
 	tsc.setupSrcTenant()
-	tsc.init()
+	tsc.init(ctx)
 	return tsc, func() {
 		require.NoError(t, tsc.SrcTenantConn.Close())
 		cleanup()
@@ -412,7 +448,7 @@ func CreateTenantStreamingClusters(
 		Rng:          rng,
 	}
 	tsc.setupSrcTenant()
-	tsc.init()
+	tsc.init(ctx)
 
 	return tsc, func() {
 		require.NoError(t, tsc.SrcTenantConn.Close())
diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go
index 2507c33ef690..c55a7c41e5ff 100644
--- a/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go
+++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go
@@ -114,7 +114,7 @@ func TestAlterTenantPauseResume(t *testing.T) {
 	cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
 	require.Equal(t, cutoverTime, cutoverOutput.GoTime())
 	jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
-	cleanupTenant := c.StartDestTenant(ctx)
+	cleanupTenant := c.StartDestTenant(ctx, nil)
 	defer func() {
 		require.NoError(t, cleanupTenant())
 	}()
diff --git a/pkg/ccl/streamingccl/streamingest/datadriven_test.go b/pkg/ccl/streamingccl/streamingest/datadriven_test.go
index 2e7a841ecaa3..5290961a5ed6 100644
--- a/pkg/ccl/streamingccl/streamingest/datadriven_test.go
+++ b/pkg/ccl/streamingccl/streamingest/datadriven_test.go
@@ -131,10 +131,13 @@ func TestDataDriven(t *testing.T) {
 		case "create-replication-clusters":
 			args := replicationtestutils.DefaultTenantStreamingClustersArgs
 			args.NoMetamorphicExternalConnection = d.HasArg("no-external-conn")
+			tempDir, dirCleanup := testutils.TempDir(t)
+			args.ExternalIODir = tempDir
 			var cleanup func()
 			ds.replicationClusters, cleanup = replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
 			ds.cleanupFns = append(ds.cleanupFns, func() error {
 				cleanup()
+				dirCleanup()
 				return nil
 			})
 
@@ -151,7 +154,8 @@ func TestDataDriven(t *testing.T) {
 			ds.replicationClusters.WaitUntilReplicatedTime(stringToHLC(t, replicatedTimeTarget),
 				jobspb.JobID(ds.ingestionJobID))
 		case "start-replicated-tenant":
-			cleanupTenant := ds.replicationClusters.StartDestTenant(ctx)
+			testingKnobs := replicationtestutils.DefaultAppTenantTestingKnobs()
+			cleanupTenant := ds.replicationClusters.StartDestTenant(ctx, &testingKnobs)
 			ds.cleanupFns = append(ds.cleanupFns, cleanupTenant)
 		case "let":
 			if len(d.CmdArgs) == 0 {
diff --git a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
index 3afefdc43c73..3bfa20036c3d 100644
--- a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
+++ b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
@@ -537,7 +537,7 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
 	require.Equal(c.T, cutoverTime, cutoverOutput.GoTime())
 	jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
 
-	cleanUpTenant := c.StartDestTenant(ctx)
+	cleanUpTenant := c.StartDestTenant(ctx, nil)
 	defer func() {
 		require.NoError(t, cleanUpTenant())
 	}()
diff --git a/pkg/ccl/streamingccl/streamingest/testdata/backup_schedule_pauses_after_cutover b/pkg/ccl/streamingccl/streamingest/testdata/backup_schedule_pauses_after_cutover
new file mode 100644
index 000000000000..5f81c4f6354c
--- /dev/null
+++ b/pkg/ccl/streamingccl/streamingest/testdata/backup_schedule_pauses_after_cutover
@@ -0,0 +1,63 @@
+# This test ensures that backup schedules pause when the schedule realizes it
+# is being executed on a new cluster.
+
+create-replication-clusters
+----
+
+start-replication-stream
+----
+
+# Create a test schedule that will begin a backup immediately.
+exec-sql as=source-tenant
+CREATE SCHEDULE datatest
+FOR BACKUP INTO 'nodelocal://1/example-schedule'
+RECURRING '@weekly' FULL BACKUP ALWAYS
+WITH SCHEDULE OPTIONS first_run = 'now';
+----
+
+let $fullID as=source-tenant
+WITH SCHEDULES AS (SHOW SCHEDULES FOR BACKUP) SELECT id FROM schedules WHERE label='datatest';
+----
+
+# Wait for one scheduled backup to succeed.
+query-sql retry as=source-tenant
+SELECT count(job_id) FROM [SHOW JOBS] WHERE job_type = 'BACKUP' AND status = 'succeeded';
+----
+1
+
+let $ts as=source-system
+SELECT clock_timestamp()::timestamp::string
+----
+
+cutover ts=$ts
+----
+
+start-replicated-tenant
+----
+
+# Induce the replicated schedule to begin on the restored cluster, and ensure
+# the schedule pauses, since it will realize it's running on a new cluster.
+exec-sql as=destination-tenant
+UPDATE system.scheduled_jobs SET next_run = now() WHERE schedule_id = $fullID
+----
+
+# An empty next run indicates the schedule is paused.
+query-sql retry as=destination-tenant
+SELECT next_run FROM system.scheduled_jobs WHERE schedule_id = $fullID
+----
+
+
+# Unpause the schedule and force it to run immediately. When we resumed the
+# schedule by setting next_run to now above, the schedule's clusterID was
+# updated, so the schedule should not pause again.
+exec-sql as=destination-tenant
+UPDATE system.scheduled_jobs SET next_run = now() WHERE schedule_id = $fullID
+----
+
+# Wait for the above backup schedule to succeed.
+query-sql retry as=destination-tenant
+SELECT count(job_id) FROM [SHOW JOBS] WHERE job_type = 'BACKUP' AND status = 'succeeded'
+----
+2
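
For reviewers, a minimal sketch (not part of the patch) of how a Go test might exercise the new pieces together: the `ExternalIODir` argument so `nodelocal://` backups have somewhere to write, and the new `withTestingKnobs` parameter of `StartDestTenant` so the destination tenant picks up the short job-scheduler intervals from `DefaultAppTenantTestingKnobs`. The test name, its placement, and the elided replication/cutover steps are hypothetical; only the calls shown in the diff above are assumed to exist.

```go
package streamingest // hypothetical placement alongside the existing tests

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils"
	"github.com/cockroachdb/cockroach/pkg/testutils"
	"github.com/stretchr/testify/require"
)

// TestDestTenantWithSchedulerKnobs is an illustrative sketch, not a real test
// in this patch.
func TestDestTenantWithSchedulerKnobs(t *testing.T) {
	ctx := context.Background()

	// Give both clusters a nodelocal root so scheduled backups into
	// 'nodelocal://1/...' have somewhere to write.
	tempDir, dirCleanup := testutils.TempDir(t)
	defer dirCleanup()

	args := replicationtestutils.DefaultTenantStreamingClustersArgs
	args.ExternalIODir = tempDir

	c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
	defer cleanup()

	// ... start the replication stream and cut over to the destination tenant
	// before starting it (elided here) ...

	// Passing knobs starts the destination tenant via StartSharedProcessTenant,
	// so its job scheduler runs with the short scan delays; passing nil keeps
	// the ALTER TENANT ... START SERVICE SHARED path.
	knobs := replicationtestutils.DefaultAppTenantTestingKnobs()
	cleanupTenant := c.StartDestTenant(ctx, &knobs)
	defer func() { require.NoError(t, cleanupTenant()) }()
}
```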