streamingest: support reversing replication direction
After promoting a standby that was replicating from some primary to be
its own active cluster, turning it into the new primary, it is often
desirable to reverse the replication direction, so that changes made to
this now-primary cluster are replicated _back_ to the former primary,
now operating as a standby.

Turning a formerly active primary cluster into a replicating standby
cluster is particularly common during "failback" flows, where the
once-primary cluster is returned to primary status after the standby has
temporarily been made the active cluster.

Re-promoting the primary in such cases requires that it have a virtual cluster
fully caught up with the promoted standby cluster that is serving traffic,
followed by a cutover from that standby back to the primary.

This _could_ be performed by creating a completely new virtual cluster
in the primary cluster from a replication stream of the temporarily active
standby; just like the creation of a normal secondary replicating cluster,
this would start by backfilling all data from the source -- the promoted
standby -- and then continuously applying changes as they are streamed
to it.

However, when this is being done on a cluster _that previously was the
primary cluster_, that cluster may still have a nearly up-to-date copy of
the virtual cluster, missing only the writes applied by the promoted
standby after cutover. In such cases, backfilling a completely new virtual
cluster from the promoted standby copies far more data than needed; most
of that data is _already on the primary_.

Instead, the new syntax `ALTER VIRTUAL CLUSTER a START REPLICATION OF a ON x`
can be used to indicate that virtual cluster 'a' should be rewound to the
time at which virtual cluster 'a' on physical cluster 'x' -- the promoted
standby -- diverged from it. This checks with cluster x to confirm that its
virtual cluster 'a' was indeed replicated from the cluster running the
command, and determines the time at which the two diverged, i.e. when
cluster x was made active and started accepting new writes. The cluster then
rewinds its virtual cluster 'a' to that timestamp and starts replicating
from cluster x as of that timestamp.
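
A minimal sketch of the resulting failback flow on the former primary,
mirroring the test updated in this commit (the virtual cluster name 'a' and
the connection string are illustrative placeholders):

  -- Take the former primary's stale copy of virtual cluster 'a' offline.
  ALTER VIRTUAL CLUSTER a STOP SERVICE;

  -- Rewind 'a' to the point of divergence and start replicating it from the
  -- promoted standby (placeholder connection URI).
  ALTER VIRTUAL CLUSTER a START REPLICATION OF a
    ON 'postgresql://user@standby-host:26257?sslmode=verify-full';

Once 'a' has caught back up, the usual cutover from the promoted standby
back to the former primary can proceed.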

Release note (enterprise change): A virtual cluster that was previously the
source of physical cluster replication into a standby in another cluster,
where that standby has since been activated, can now be reconfigured to
become a standby of the now-promoted cluster, reversing the direction of the
replication stream while reusing its existing data as much as possible.

Epic: CRDB-34233.
dt committed Jan 11, 2024
1 parent 1c0d4de commit 92993b4
Showing 7 changed files with 155 additions and 10 deletions.
108 changes: 107 additions & 1 deletion pkg/ccl/streamingccl/streamingest/alter_replication_job.go
@@ -10,10 +10,12 @@ package streamingest

import (
"context"
"fmt"
"math"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -104,7 +106,8 @@ func alterReplicationJobTypeCheck(
if err := exprutil.TypeCheck(
ctx, alterReplicationJobOp, p.SemaCtx(),
exprutil.TenantSpec{TenantSpec: alterStmt.TenantSpec},
exprutil.Strings{alterStmt.Options.Retention},
exprutil.TenantSpec{TenantSpec: alterStmt.ReplicationSourceTenantName},
exprutil.Strings{alterStmt.Options.Retention, alterStmt.ReplicationSourceAddress},
); err != nil {
return false, nil, err
}
@@ -182,6 +185,24 @@ func alterReplicationJobHook(
return nil, nil, nil, false, err
}

var srcAddr, srcTenant string
if alterTenantStmt.ReplicationSourceAddress != nil {
srcAddr, err = exprEval.String(ctx, alterTenantStmt.ReplicationSourceAddress)
if err != nil {
return nil, nil, nil, false, err
}

_, _, srcTenant, err = exprEval.TenantSpec(ctx, alterTenantStmt.ReplicationSourceTenantName)
if err != nil {
return nil, nil, nil, false, err
}
}

retentionTTLSeconds := defaultRetentionTTLSeconds
if ret, ok := options.GetRetention(); ok {
retentionTTLSeconds = ret
}

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
if err := utilccl.CheckEnterpriseEnabled(
p.ExecCfg().Settings,
@@ -198,6 +219,91 @@ func alterReplicationJobHook(
if err != nil {
return err
}

// If a source address is provided, we're enabling replication into an
// existing virtual cluster. It must be inactive, and we'll verify that the
// virtual cluster it will replicate from was itself replicated from it, i.e.
// that we're reversing the direction of replication. We will then revert it
// to the time they diverged and pick up from there.
if alterTenantStmt.ReplicationSourceAddress != nil {
dstTenantID, err := roachpb.MakeTenantID(tenInfo.ID)
if err != nil {
return err
}

// Here, we try to prevent the user from making a few
// mistakes. Starting a replication stream into an
// existing tenant requires both that it is offline and
// that it is consistent as of the provided timestamp.
if tenInfo.ServiceMode != mtinfopb.ServiceModeNone {
return errors.Newf("cannot start replication for tenant %q (%s) in service mode %s; service mode must be %s",
tenInfo.Name,
dstTenantID,
tenInfo.ServiceMode,
mtinfopb.ServiceModeNone,
)
}

streamAddress := streamingccl.StreamAddress(srcAddr)
streamURL, err := streamAddress.URL()
if err != nil {
return errors.Wrap(err, "url")
}
streamAddress = streamingccl.StreamAddress(streamURL.String())

client, err := streamclient.NewStreamClient(ctx, streamingccl.StreamAddress(srcAddr), p.ExecCfg().InternalDB)
if err != nil {
return errors.Wrap(err, "creating client")
}
srcID, resumeTS, err := client.PriorReplicationDetails(ctx, roachpb.TenantName(srcTenant))
if err != nil {
return errors.CombineErrors(errors.Wrap(err, "fetching prior replication details"), client.Close(ctx))
}
if err := client.Close(ctx); err != nil {
return err
}

if expected := fmt.Sprintf("%s:%s", p.ExtendedEvalContext().ClusterID, dstTenantID); srcID != expected {
return errors.Newf(
"tenant %q on specified cluster reports it was replicated from %q; %q cannot be rewound to start replication",
srcTenant, srcID, expected,
)
}

var revertFirst bool
if !tenInfo.LastRevertTenantTimestamp.Equal(resumeTS) {
revertFirst = true
}

jobID := p.ExecCfg().JobRegistry.MakeJobID()
// Reset the last revert timestamp.
tenInfo.LastRevertTenantTimestamp = hlc.Timestamp{}
tenInfo.PhysicalReplicationConsumerJobID = jobID
tenInfo.DataState = mtinfopb.DataStateAdd
if err := sql.UpdateTenantRecord(ctx, p.ExecCfg().Settings,
p.InternalSQLTxn(), tenInfo); err != nil {
return err
}

return errors.Wrap(createReplicationJob(
ctx,
p,
streamAddress,
srcTenant,
dstTenantID,
retentionTTLSeconds,
resumeTS,
revertFirst,
jobID,
&tree.CreateTenantFromReplication{
TenantSpec: alterTenantStmt.TenantSpec,
ReplicationSourceTenantName: alterTenantStmt.ReplicationSourceTenantName,
ReplicationSourceAddress: alterTenantStmt.ReplicationSourceAddress,
Options: alterTenantStmt.Options,
},
), "creating replication job")
}

if tenInfo.PhysicalReplicationConsumerJobID == 0 {
return errors.Newf("tenant %q (%d) does not have an active replication job",
tenInfo.Name, tenInfo.ID)
31 changes: 29 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
@@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
"github.com/cockroachdb/cockroach/pkg/ccl/revertccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -67,9 +68,35 @@ func startDistIngestion(
} else {
heartbeatTimestamp = initialScanTimestamp
}

msg := redact.Sprintf("resuming stream (producer job %d) from %s", streamID, heartbeatTimestamp)
updateRunningStatus(ctx, ingestionJob, jobspb.InitializingReplication, msg)

if streamProgress.InitialRevertRequired {
updateRunningStatus(ctx, ingestionJob, jobspb.InitializingReplication, "reverting existing data to prepare for replication")

log.Infof(ctx, "reverting tenant %s to time %s before starting replication", details.DestinationTenantID, replicatedTime)

spanToRevert := keys.MakeTenantSpan(details.DestinationTenantID)
if err := revertccl.RevertSpansFanout(ctx, execCtx.ExecCfg().DB, execCtx,
[]roachpb.Span{spanToRevert},
replicatedTime,
false, /* ignoreGCThreshold */
revertccl.RevertDefaultBatchSize,
nil, /* onCompletedCallback */
); err != nil {
return err
}

if err := ingestionJob.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
md.Progress.GetStreamIngest().InitialRevertRequired = false
ju.UpdateProgress(md.Progress)
updateRunningStatusInternal(md, ju, jobspb.InitializingReplication, string(msg))
return nil
}); err != nil {
return errors.Wrap(err, "failed to update job progress")
}
} else {
updateRunningStatus(ctx, ingestionJob, jobspb.InitializingReplication, msg)
}

client, err := connectToActiveClient(ctx, ingestionJob, execCtx.ExecCfg().InternalDB,
streamclient.WithStreamID(streamID))
@@ -251,8 +251,7 @@ func TestTenantStreamingFailback(t *testing.T) {
sqlA.Exec(t, "ALTER VIRTUAL CLUSTER f STOP SERVICE")
waitUntilTenantServerStopped(t, serverA.SystemLayer(), "f")
t.Logf("starting replication g->f")
sqlA.Exec(t, "ALTER VIRTUAL CLUSTER f RESET DATA TO SYSTEM TIME $1::decimal", ts1)
sqlA.Exec(t, fmt.Sprintf("CREATE VIRTUAL CLUSTER f FROM REPLICATION OF g ON $1 WITH RESUME TIMESTAMP = '%s'", ts1), serverBURL.String())
sqlA.Exec(t, "ALTER VIRTUAL CLUSTER f START REPLICATION OF g ON $1", serverBURL.String())
_, consumerFJobID := replicationtestutils.GetStreamJobIds(t, ctx, sqlA, roachpb.TenantName("f"))
t.Logf("waiting for f@%s", ts2)
replicationtestutils.WaitUntilReplicatedTime(t,
@@ -281,8 +280,7 @@
sqlB.Exec(t, "ALTER VIRTUAL CLUSTER g STOP SERVICE")
waitUntilTenantServerStopped(t, serverB.SystemLayer(), "g")
t.Logf("starting replication f->g")
sqlB.Exec(t, "ALTER VIRTUAL CLUSTER g RESET DATA TO SYSTEM TIME $1::decimal", ts3)
sqlB.Exec(t, fmt.Sprintf("CREATE VIRTUAL CLUSTER g FROM REPLICATION OF f ON $1 WITH RESUME TIMESTAMP = '%s'", ts3), serverAURL.String())
sqlB.Exec(t, "ALTER VIRTUAL CLUSTER g START REPLICATION OF f ON $1", serverAURL.String())
_, consumerGJobID = replicationtestutils.GetStreamJobIds(t, ctx, sqlB, roachpb.TenantName("g"))
t.Logf("waiting for g@%s", ts3)
replicationtestutils.WaitUntilReplicatedTime(t,
10 changes: 9 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
@@ -177,6 +177,7 @@ func ingestionPlanHook(
sourceTenant, dstTenantName, dstTenantID)
}

var revertFirst bool
// If we don't have a resume timestamp, make a new tenant
jobID := p.ExecCfg().JobRegistry.MakeJobID()
var destinationTenantID roachpb.TenantID
@@ -214,6 +215,8 @@
// clause and the tenant already existed. Nothing else to do.
return nil
}
// No revert required since this is a new tenant.
revertFirst = false
} else {
tenantRecord, err := sql.GetTenantRecordByName(
ctx, p.ExecCfg().Settings,
@@ -250,6 +253,8 @@
tenantRecord.LastRevertTenantTimestamp,
)
}
// No revert required, since we verified that we were already reverted.
revertFirst = false

// Reset the last revert timestamp.
tenantRecord.LastRevertTenantTimestamp = hlc.Timestamp{}
@@ -273,6 +278,7 @@
destinationTenantID,
retentionTTLSeconds,
options.resumeTimestamp,
revertFirst,
jobID,
ingestionStmt,
)
@@ -289,6 +295,7 @@
destinationTenantID roachpb.TenantID,
retentionTTLSeconds int32,
resumeTimestamp hlc.Timestamp,
revertFirst bool,
jobID jobspb.JobID,
stmt *tree.CreateTenantFromReplication,
) error {
@@ -345,7 +352,8 @@
Description: jobDescription,
Username: p.User(),
Progress: jobspb.StreamIngestionProgress{
ReplicatedTime: resumeTimestamp,
ReplicatedTime: resumeTimestamp,
InitialRevertRequired: revertFirst,
},
Details: streamIngestionDetails,
}
4 changes: 4 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
@@ -191,6 +191,10 @@ message StreamIngestionProgress {
// the source tenant.
bool initial_split_complete = 9;

// InitialRevertRequired is true if the stream requires an initial revert to
// the start time before it can continue (e.g. when reusing a tenant's data).
bool initial_revert_required = 10;

// Next Id: 11
}

2 changes: 1 addition & 1 deletion pkg/sql/exprutil/type_check.go
@@ -110,7 +110,7 @@ func (b Bools) typeCheck(ctx context.Context, op string, semaCtx *tree.SemaConte
}

func (ts TenantSpec) typeCheck(ctx context.Context, op string, semaCtx *tree.SemaContext) error {
if ts.All {
if ts.TenantSpec == nil || ts.All {
return nil
}
if ts.IsName {
4 changes: 3 additions & 1 deletion pkg/sql/sem/tree/create.go
@@ -2250,7 +2250,9 @@ func (node *CreateTenantFromReplication) Format(ctx *FmtCtx) {
// NB: we do not anonymize the tenant name because we assume that tenant names
// do not contain sensitive information.
ctx.FormatNode(node.TenantSpec)
ctx.FormatNode(node.Like)
if node.Like != nil {
ctx.FormatNode(node.Like)
}

if node.ReplicationSourceAddress != nil {
ctx.WriteString(" FROM REPLICATION OF ")