Merge #111804
111804: streamingccl: verify backup schedule pauses after cutover r=stevendanna a=msbutler

This patch adds an e2e c2c test to verify that replicated backup schedules
pause on the destination tenant after cutover. This functionality was added
in #111578.

Informs #108028

Release note: none
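For context, the behavior under test (added in #111578) is that a backup schedule which wakes up on a cluster other than the one it was created on pauses itself, and once its recorded cluster ID has been brought up to date a manual resume lets it run normally. Below is a minimal illustrative sketch of that check; the type, field, and function names (and the exact point at which the cluster ID is rewritten) are simplifications for this description, not the actual CockroachDB code.

package example

// scheduleState is an illustrative stand-in for the persisted state of a
// backup schedule: the ID of the cluster it was created on and whether it
// is currently paused.
type scheduleState struct {
	ClusterID string
	Paused    bool
}

// maybePauseOnNewCluster sketches the check exercised by the new test: when
// the schedule finds itself on a different cluster (for example, a
// replication destination tenant after cutover), it pauses and adopts the
// new cluster ID so that a later manual resume does not pause it again.
func maybePauseOnNewCluster(s *scheduleState, currentClusterID string) {
	if s.ClusterID == currentClusterID {
		// Same cluster as before: run as scheduled.
		return
	}
	s.Paused = true
	s.ClusterID = currentClusterID
}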

Co-authored-by: Michael Butler <butler@cockroachlabs.com>
craig[bot] and msbutler committed Oct 5, 2023
2 parents bc191fd + 195a69e commit 43e9e39
Showing 5 changed files with 122 additions and 19 deletions.
68 changes: 52 additions & 16 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
@@ -76,6 +76,7 @@ type TenantStreamingClustersArgs struct {
MultiTenantSingleClusterTestRegions []string

NoMetamorphicExternalConnection bool
ExternalIODir string
}

var DefaultTenantStreamingClustersArgs = TenantStreamingClustersArgs{
@@ -124,6 +125,7 @@ func (c *TenantStreamingClusters) setupSrcTenant() {
tenantArgs := base.TestSharedProcessTenantArgs{
TenantName: c.Args.SrcTenantName,
TenantID: c.Args.SrcTenantID,
Knobs: DefaultAppTenantTestingKnobs(),
}
srcTenantServer, srcTenantConn := serverutils.StartSharedProcessTenant(c.T, c.SrcCluster.Server(0),
tenantArgs)
@@ -137,12 +139,16 @@ func (c *TenantStreamingClusters) setupSrcTenant() {
c.SrcTenantSQL = sqlutils.MakeSQLRunner(srcTenantConn)
}

func (c *TenantStreamingClusters) init() {
func (c *TenantStreamingClusters) init(ctx context.Context) {
c.SrcSysSQL.ExecMultiple(c.T, ConfigureClusterSettings(c.Args.SrcClusterSettings)...)
c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.manual_range_split.enabled=true`, c.Args.SrcTenantName)
c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.manual_range_scatter.enabled=true`, c.Args.SrcTenantName)
c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.zone_configs.enabled=true`, c.Args.SrcTenantName)
c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.multiregion.enabled=true`, c.Args.SrcTenantName)
c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 GRANT CAPABILITY can_use_nodelocal_storage`, c.Args.SrcTenantName)
require.NoError(c.T, c.SrcCluster.Server(0).WaitForTenantCapabilities(ctx, c.Args.SrcTenantID, map[tenantcapabilities.ID]string{
tenantcapabilities.CanUseNodelocalStorage: "true",
}, ""))
if c.Args.SrcInitFunc != nil {
c.Args.SrcInitFunc(c.T, c.SrcSysSQL, c.SrcTenantSQL)
}
@@ -154,25 +160,40 @@ func (c *TenantStreamingClusters) init() {
c.DestSysSQL.Exec(c.T, `SET CLUSTER SETTING physical_replication.enabled = true;`)
}

// StartDestTenant starts the destination tenant and returns a cleanup
// function that shuts tenant SQL instance and closes all sessions.
// This function will fail the test if ran prior to the Replication stream
// closing as the tenant will not yet be active
func (c *TenantStreamingClusters) StartDestTenant(ctx context.Context) func() error {
c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 START SERVICE SHARED`, c.Args.DestTenantName)

destTenantConn := c.DestCluster.Server(0).SystemLayer().SQLConn(c.T, "cluster:"+string(c.Args.DestTenantName)+"/defaultdb")
// StartDestTenant starts the destination tenant and returns a cleanup function
// that shuts down the tenant SQL instance and closes all sessions. This function
// will fail the test if run prior to the replication stream closing, as the
// tenant will not yet be active. If the caller passes withTestingKnobs, the
// destination tenant starts up via testServer.StartSharedProcessTenant().
func (c *TenantStreamingClusters) StartDestTenant(
ctx context.Context, withTestingKnobs *base.TestingKnobs,
) func() error {
if withTestingKnobs != nil {
var err error
_, c.DestTenantConn, err = c.DestCluster.Server(0).StartSharedProcessTenant(ctx, base.TestSharedProcessTenantArgs{
TenantID: c.Args.DestTenantID,
TenantName: c.Args.DestTenantName,
Knobs: *withTestingKnobs,
UseDatabase: "defaultdb",
})
require.NoError(c.T, err)
} else {
c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 START SERVICE SHARED`, c.Args.DestTenantName)
c.DestTenantConn = c.DestCluster.Server(0).SystemLayer().SQLConn(c.T, "cluster:"+string(c.Args.DestTenantName)+"/defaultdb")
}

c.DestTenantConn = destTenantConn
c.DestTenantSQL = sqlutils.MakeSQLRunner(destTenantConn)
c.DestTenantSQL = sqlutils.MakeSQLRunner(c.DestTenantConn)
testutils.SucceedsSoon(c.T, func() error {
return c.DestTenantConn.Ping()
})
// TODO (msbutler): consider granting the new tenant some capabilities.
c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.zone_configs.enabled=true`, c.Args.DestTenantName)

c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 GRANT CAPABILITY can_use_nodelocal_storage`, c.Args.DestTenantName)
require.NoError(c.T, c.DestCluster.Server(0).WaitForTenantCapabilities(ctx, c.Args.DestTenantID, map[tenantcapabilities.ID]string{
tenantcapabilities.CanUseNodelocalStorage: "true",
}, ""))
return func() error {
return destTenantConn.Close()
return c.DestTenantConn.Close()
}
}
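A sketch of the two ways a test can call StartDestTenant (one mode or the other, not both), mirroring the call sites updated later in this commit; the surrounding variables (c, t, ctx) are assumed to come from a test that uses this package:

// Without knobs, the destination tenant is started via
// ALTER TENANT ... START SERVICE SHARED.
cleanupTenant := c.StartDestTenant(ctx, nil)
defer func() { require.NoError(t, cleanupTenant()) }()

// With knobs, the tenant starts through StartSharedProcessTenant, so the
// knobs (e.g. DefaultAppTenantTestingKnobs below) apply to the destination
// tenant's job scheduler.
knobs := replicationtestutils.DefaultAppTenantTestingKnobs()
cleanupTenant = c.StartDestTenant(ctx, &knobs)
defer func() { require.NoError(t, cleanupTenant()) }()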

@@ -277,6 +298,20 @@ func (c *TenantStreamingClusters) BuildCreateTenantQuery(externalConnection stri
return streamReplStmt
}

// DefaultAppTenantTestingKnobs returns the default testing knobs for an application tenant.
func DefaultAppTenantTestingKnobs() base.TestingKnobs {
return base.TestingKnobs{
JobsTestingKnobs: defaultJobsTestingKnobs(),
}
}

func defaultJobsTestingKnobs() *jobs.TestingKnobs {
jobTestingKnobs := jobs.NewTestingKnobsWithShortIntervals()
jobTestingKnobs.SchedulerDaemonInitialScanDelay = func() time.Duration { return time.Second }
jobTestingKnobs.SchedulerDaemonScanDelay = func() time.Duration { return time.Second }
return jobTestingKnobs
}

func CreateServerArgs(args TenantStreamingClustersArgs) base.TestServerArgs {
if args.TestingKnobs != nil && args.TestingKnobs.DistSQLRetryPolicy == nil {
args.TestingKnobs.DistSQLRetryPolicy = &retry.Options{
@@ -291,7 +326,7 @@ func CreateServerArgs(args TenantStreamingClustersArgs) base.TestServerArgs {
// to system tenants. Tracked with #76378.
DefaultTestTenant: base.TODOTestTenantDisabled,
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
JobsTestingKnobs: defaultJobsTestingKnobs(),
DistSQL: &execinfra.TestingKnobs{
StreamingTestingKnobs: args.TestingKnobs,
},
@@ -303,6 +338,7 @@ func CreateServerArgs(args TenantStreamingClustersArgs) base.TestServerArgs {
EnableTenantIDReuse: true,
},
},
ExternalIODir: args.ExternalIODir,
}
}
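A sketch of how the new ExternalIODir plumbing is exercised from a test, mirroring the datadriven test change below: the clusters are pointed at a temporary directory so the tenants can write nodelocal backups.

args := replicationtestutils.DefaultTenantStreamingClustersArgs
tempDir, dirCleanup := testutils.TempDir(t)
defer dirCleanup()
args.ExternalIODir = tempDir
c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
defer cleanup()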

@@ -364,7 +400,7 @@ func CreateMultiTenantStreamingCluster(
Rng: rng,
}
tsc.setupSrcTenant()
tsc.init()
tsc.init(ctx)
return tsc, func() {
require.NoError(t, tsc.SrcTenantConn.Close())
cleanup()
@@ -412,7 +448,7 @@ func CreateTenantStreamingClusters(
Rng: rng,
}
tsc.setupSrcTenant()
tsc.init()
tsc.init(ctx)

return tsc, func() {
require.NoError(t, tsc.SrcTenantConn.Close())
@@ -114,7 +114,7 @@ func TestAlterTenantPauseResume(t *testing.T) {
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
require.Equal(t, cutoverTime, cutoverOutput.GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
cleanupTenant := c.StartDestTenant(ctx)
cleanupTenant := c.StartDestTenant(ctx, nil)
defer func() {
require.NoError(t, cleanupTenant())
}()
6 changes: 5 additions & 1 deletion pkg/ccl/streamingccl/streamingest/datadriven_test.go
@@ -131,10 +131,13 @@ func TestDataDriven(t *testing.T) {
case "create-replication-clusters":
args := replicationtestutils.DefaultTenantStreamingClustersArgs
args.NoMetamorphicExternalConnection = d.HasArg("no-external-conn")
tempDir, dirCleanup := testutils.TempDir(t)
args.ExternalIODir = tempDir
var cleanup func()
ds.replicationClusters, cleanup = replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
ds.cleanupFns = append(ds.cleanupFns, func() error {
cleanup()
dirCleanup()
return nil
})

@@ -151,7 +154,8 @@ func TestDataDriven(t *testing.T) {
ds.replicationClusters.WaitUntilReplicatedTime(stringToHLC(t, replicatedTimeTarget),
jobspb.JobID(ds.ingestionJobID))
case "start-replicated-tenant":
cleanupTenant := ds.replicationClusters.StartDestTenant(ctx)
testingKnobs := replicationtestutils.DefaultAppTenantTestingKnobs()
cleanupTenant := ds.replicationClusters.StartDestTenant(ctx, &testingKnobs)
ds.cleanupFns = append(ds.cleanupFns, cleanupTenant)
case "let":
if len(d.CmdArgs) == 0 {
@@ -537,7 +537,7 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
require.Equal(c.T, cutoverTime, cutoverOutput.GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))

cleanUpTenant := c.StartDestTenant(ctx)
cleanUpTenant := c.StartDestTenant(ctx, nil)
defer func() {
require.NoError(t, cleanUpTenant())
}()
@@ -0,0 +1,63 @@
# This test ensures that backup schedules pause when the schedule realizes it
# is being executed on a new cluster.

create-replication-clusters
----

start-replication-stream
----

# Create test schedule that will begin a backup immediately
exec-sql as=source-tenant
CREATE SCHEDULE datatest
FOR BACKUP INTO 'nodelocal://1/example-schedule'
RECURRING '@weekly' FULL BACKUP ALWAYS
WITH SCHEDULE OPTIONS first_run = 'now';
----

let $fullID as=source-tenant
WITH SCHEDULES AS (SHOW SCHEDULES FOR BACKUP) SELECT id FROM schedules WHERE label='datatest';
----

# Wait for one scheduled backup to succeed
query-sql retry as=source-tenant
SELECT count(job_id) FROM [SHOW JOBS] WHERE job_type = 'BACKUP' AND status = 'succeeded';
----
1

let $ts as=source-system
SELECT clock_timestamp()::timestamp::string
----

cutover ts=$ts
----

start-replicated-tenant
----

# Induce the replicated schedule to begin on the restored cluster, and
# ensure the schedule pauses, since it will realize it's running on a new cluster.
exec-sql as=destination-tenant
UPDATE system.scheduled_jobs SET next_run = now() WHERE schedule_id = $fullID
----

# An empty next run indicates the schedule is paused.
query-sql retry as=destination-tenant
SELECT next_run FROM system.scheduled_jobs WHERE schedule_id = $fullID
----
<nil>


# Unpause the schedule and force it to run immediately. When we resumed the
# schedule by setting next_run to now above, the schedule's clusterID was
# updated, so the schedule should not pause again.
exec-sql as=destination-tenant
UPDATE system.scheduled_jobs SET next_run = now() WHERE schedule_id = $fullID
----


# Wait for the above backup schedule to succeed
query-sql retry as=destination-tenant
SELECT count(job_id) FROM [SHOW JOBS] WHERE job_type = 'BACKUP' AND status = 'succeeded'
----
2
