120462: roachtest: limit payload sizes in backup-restore/mixed-version r=dt a=dt

Previously the workload could randomly choose to revise only 100 rows, each with a 16kb payload, in every revision. While this usually works when run on 512mb ranges, it can flake when run on 64mb ranges: with so few distinct keys, the repeated large revisions accumulate in a handful of ranges and exceed a range's capacity.

Release note: none.
Epic: none.

120687: restore: re-enable online restore of tenants r=dt a=dt

Release note: none.
Epic: none.
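For reference, a minimal sketch of the statement shape this re-enables, mirroring the test added in this change (the backup collection URI and the new tenant name/ID are placeholders):

    RESTORE TENANT 10 FROM LATEST IN 'nodelocal://1/backup'
      WITH EXPERIMENTAL DEFERRED COPY, TENANT_NAME = 'restored', TENANT = '2';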

120782: streamingccl: add replication lag to `SHOW VIRTUAL CLUSTER` results r=dt a=msbutler

With this patch, SHOW VIRTUAL CLUSTER ... WITH REPLICATION STATUS will display the replication lag.

Fixes #120647

Release note (sql change): SHOW VIRTUAL CLUSTER ... WITH REPLICATION STATUS now displays the PCR replication lag.
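For illustration, a sketch of how the new column surfaces (the virtual cluster name is a placeholder; the column set below matches the result_columns change in this commit):

    SHOW VIRTUAL CLUSTER destination WITH REPLICATION STATUS;
    -- Columns: id, name, source_tenant_name, source_cluster_uri, retained_time,
    --          replicated_time, replication_lag (INTERVAL), cutover_time, status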

Co-authored-by: David Taylor <tinystatemachine@gmail.com>
Co-authored-by: Michael Butler <butler@cockroachlabs.com>
3 people committed Mar 25, 2024
4 parents c5d1b23 + ff1459e + e1826be + 1fcff46 commit 4cc2bbd
Showing 14 changed files with 178 additions and 56 deletions.
27 changes: 24 additions & 3 deletions pkg/ccl/backupccl/file_sst_sink.go
@@ -302,6 +302,8 @@ func (s *fileSSTSink) copyPointKeys(dataSST []byte) error {
}
defer iter.Close()

var valueBuf []byte

for iter.SeekGE(storage.MVCCKey{Key: keys.MinKey}); ; iter.Next() {
if valid, err := iter.Valid(); !valid || err != nil {
if err != nil {
@@ -316,16 +318,35 @@
}
k.Key = suffix

v, err := iter.UnsafeValue()
raw, err := iter.UnsafeValue()
if err != nil {
return err
}

valueBuf = append(valueBuf[:0], raw...)
v, err := storage.DecodeValueFromMVCCValue(valueBuf)
if err != nil {
return errors.Wrapf(err, "decoding mvcc value %s", k)
}

// Checksums include the key, but *exported* keys no longer live at that key
// once they are exported, and could be restored as some other key, so zero
// out the checksum.
v.ClearChecksum()

// NB: DecodeValueFromMVCCValue does not decode the MVCCValueHeader, which
// we need to back up. In other words, if we passed v.RawBytes to the put
// call below, we would lose data. By putting valueBuf, we pass the value
// header and the cleared checksum.
//
// TODO(msbutler): create a ClearChecksum() method that can act on raw value
// bytes, and remove this hacky code.
if k.Timestamp.IsEmpty() {
if err := s.sst.PutUnversioned(k.Key, v); err != nil {
if err := s.sst.PutUnversioned(k.Key, valueBuf); err != nil {
return err
}
} else {
if err := s.sst.PutRawMVCC(k, v); err != nil {
if err := s.sst.PutRawMVCC(k, valueBuf); err != nil {
return err
}
}
9 changes: 8 additions & 1 deletion pkg/ccl/backupccl/file_sst_sink_test.go
@@ -587,6 +587,10 @@ func TestFileSSTSinkCopyPointKeys(t *testing.T) {
for _, input := range tt.inputs {
for i := range input.input {
input.input[i].key = string(s2k(input.input[i].key))
v := roachpb.Value{}
v.SetBytes(input.input[i].value)

input.input[i].value = v.RawBytes
}
expected = append(expected, input.input...)

@@ -883,10 +887,13 @@ func (b *exportedSpanBuilder) build() exportedSpan {
buf := &bytes.Buffer{}
sst := storage.MakeBackupSSTWriter(ctx, settings, buf)
for _, d := range b.keyValues {
v := roachpb.Value{}
v.SetBytes(d.value)
v.InitChecksum(nil)
err := sst.Put(storage.MVCCKey{
Key: s2k(d.key),
Timestamp: hlc.Timestamp{WallTime: d.timestamp},
}, d.value)
}, v.RawBytes)
if err != nil {
panic(err)
}
32 changes: 31 additions & 1 deletion pkg/ccl/backupccl/restore_online.go
@@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
@@ -174,11 +175,40 @@ func sendAddRemoteSSTWorker(
}

for entry := range restoreSpanEntriesCh {
// If we're restoring a tenant, we need to move the end key of any spans
// that ended at the exclusive end of the tenant span, i.e. the start key
// of the next tenant ID, into the prefix of this tenant span instead so
// that the end key can be rewritten to the restoring tenant ID's prefix.
// We do this by replacing any key equal to TenantSpan.EndKey (tID+1) with
// tenantSpan.Key followed by keyMax, since keyMax sorts above all table
// keys, including within the subspace of a tenant span.
if entry.ElidedPrefix == execinfrapb.ElidePrefix_Tenant {
_, id, err := keys.DecodeTenantPrefix(entry.Span.Key)
if err != nil {
return err
}
if tSpan := keys.MakeTenantSpan(id); entry.Span.EndKey.Equal(tSpan.EndKey) {
entry.Span.EndKey = append(tSpan.Key, keys.MaxKey...)
}
}
firstSplitDone := false
if err := assertCommonPrefix(entry.Span, entry.ElidedPrefix); err != nil {
return err
}
for _, file := range entry.Files {
if entry.ElidedPrefix == execinfrapb.ElidePrefix_Tenant {
_, id, err := keys.DecodeTenantPrefix(file.BackupFileEntrySpan.Key)
if err != nil {
return err
}
if tSpan := keys.MakeTenantSpan(id); file.BackupFileEntrySpan.EndKey.Equal(tSpan.EndKey) {
file.BackupFileEntrySpan.EndKey = append(tSpan.Key, keys.MaxKey...)
}
}
if err := assertCommonPrefix(file.BackupFileEntrySpan, entry.ElidedPrefix); err != nil {
return err
}

restoringSubspan := file.BackupFileEntrySpan.Intersect(entry.Span)
if !restoringSubspan.Valid() {
return errors.AssertionFailedf("file %s with span %s has no overlap with restore span %s",
@@ -365,7 +395,7 @@ func checkBackupElidedPrefixForOnlineCompat(
case execinfrapb.ElidePrefix_TenantAndTable:
return nil
case execinfrapb.ElidePrefix_Tenant:
return errors.AssertionFailedf("online restore disallowed for restores of tenants. previous check failed.")
return nil
case execinfrapb.ElidePrefix_None:
for oldID, rw := range rewrites {
if rw.ID != oldID {
42 changes: 40 additions & 2 deletions pkg/ccl/backupccl/restore_online_test.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/stretchr/testify/require"
)

@@ -150,13 +151,50 @@ func TestOnlineRestoreTenant(t *testing.T) {

systemDB.Exec(t, fmt.Sprintf(`BACKUP TENANT 10 INTO '%s'`, externalStorage))

_, rSQLDB, cleanupFnRestored := backupRestoreTestSetupEmpty(t, 1, dir, InitManualReplication, params)
restoreTC, rSQLDB, cleanupFnRestored := backupRestoreTestSetupEmpty(t, 1, dir, InitManualReplication, params)
defer cleanupFnRestored()

var preRestoreTs float64
tenant10.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&preRestoreTs)

rSQLDB.ExpectErr(t, "cannot run Online Restore on a tenant", fmt.Sprintf("RESTORE TENANT 10 FROM LATEST IN '%s' WITH EXPERIMENTAL DEFERRED COPY", externalStorage))
// Restore the tenant twice: once below and once above the old ID, to show
// that we can rewrite it in either direction.
rSQLDB.Exec(t, fmt.Sprintf("RESTORE TENANT 10 FROM LATEST IN '%s' WITH EXPERIMENTAL DEFERRED COPY, TENANT_NAME = 'below', TENANT = '2'", externalStorage))
rSQLDB.Exec(t, fmt.Sprintf("RESTORE TENANT 10 FROM LATEST IN '%s' WITH EXPERIMENTAL DEFERRED COPY, TENANT_NAME = 'above', TENANT = '20'", externalStorage))
rSQLDB.Exec(t, "ALTER TENANT below STOP SERVICE")
rSQLDB.Exec(t, "ALTER TENANT above STOP SERVICE")
rSQLDB.CheckQueryResults(t, "SELECT fingerprint FROM [SHOW EXPERIMENTAL_FINGERPRINTS FROM TENANT below]",
rSQLDB.QueryStr(t, `SELECT fingerprint FROM [SHOW EXPERIMENTAL_FINGERPRINTS FROM TENANT above]`))

secondaryStopper := stop.NewStopper()
_, cBelow := serverutils.StartTenant(
t, restoreTC.Server(0), base.TestTenantArgs{
TenantName: "below",
TenantID: roachpb.MustMakeTenantID(2),
Stopper: secondaryStopper,
})
_, cAbove := serverutils.StartTenant(
t, restoreTC.Server(0), base.TestTenantArgs{
TenantName: "above",
TenantID: roachpb.MustMakeTenantID(20),
Stopper: secondaryStopper,
})

defer func() {
cBelow.Close()
cAbove.Close()
secondaryStopper.Stop(context.Background())
}()
dbBelow, dbAbove := sqlutils.MakeSQLRunner(cBelow), sqlutils.MakeSQLRunner(cAbove)
dbBelow.CheckQueryResults(t, `select * from foo.bar`, tenant10.QueryStr(t, `select * from foo.bar`))
dbAbove.CheckQueryResults(t, `select * from foo.bar`, tenant10.QueryStr(t, `select * from foo.bar`))

// Ensure the restore of the tenant preserved the original MVCC timestamps,
// i.e. the restored rows were not rewritten at new, post-restore timestamps.
var maxRestoreMVCCTimestamp float64
dbBelow.QueryRow(t, "SELECT max(crdb_internal_mvcc_timestamp) FROM foo.bar").Scan(&maxRestoreMVCCTimestamp)
require.Greater(t, preRestoreTs, maxRestoreMVCCTimestamp)
dbAbove.QueryRow(t, "SELECT max(crdb_internal_mvcc_timestamp) FROM foo.bar").Scan(&maxRestoreMVCCTimestamp)
require.Greater(t, preRestoreTs, maxRestoreMVCCTimestamp)
}

func TestOnlineRestoreErrors(t *testing.T) {
4 changes: 0 additions & 4 deletions pkg/ccl/backupccl/restore_planning.go
@@ -1346,10 +1346,6 @@ func restorePlanHook(
}
}

if restoreStmt.Options.ExperimentalOnline && restoreStmt.Targets.TenantID.IsSet() {
return nil, nil, nil, false, errors.New("cannot run Online Restore on a tenant")
}

var newTenantID *roachpb.TenantID
var newTenantName *roachpb.TenantName
if restoreStmt.Options.AsTenant != nil || restoreStmt.Options.ForceTenantID != nil {
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -158,6 +158,7 @@ go_test(
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlliveness",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
@@ -169,6 +170,7 @@ go_test(
"//pkg/testutils/storageutils",
"//pkg/testutils/testcluster",
"//pkg/util/ctxgroup",
"//pkg/util/duration",
"//pkg/util/hlc",
"//pkg/util/httputil",
"//pkg/util/leaktest",
30 changes: 16 additions & 14 deletions pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
@@ -33,12 +33,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/storageutils"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -1073,33 +1075,33 @@ func TestTenantStreamingShowTenant(t *testing.T) {
replicationDetails := details.Details().(jobspb.StreamIngestionDetails)

var (
id int
dest string
status string
serviceMode string
source string
sourceUri string
jobId int
maxReplTime time.Time
protectedTime time.Time
cutoverTime []byte // should be nil
id int
dest string
status string
source string
sourceUri string
replicationLag string
maxReplTime time.Time
protectedTime time.Time
cutoverTime []byte // should be nil
)
row := c.DestSysSQL.QueryRow(t, fmt.Sprintf("SHOW TENANT %s WITH REPLICATION STATUS", args.DestTenantName))
row.Scan(&id, &dest, &status, &serviceMode, &source, &sourceUri, &jobId, &maxReplTime, &protectedTime, &cutoverTime)
row.Scan(&id, &dest, &source, &sourceUri, &protectedTime, &maxReplTime, &replicationLag, &cutoverTime, &status)
require.Equal(t, 2, id)
require.Equal(t, "destination", dest)
require.Equal(t, "replicating", status)
require.Equal(t, "none", serviceMode)
require.Equal(t, "source", source)
expectedURI, err := streamclient.RedactSourceURI(c.SrcURL.String())
require.NoError(t, err)
require.Equal(t, expectedURI, sourceUri)
require.Equal(t, ingestionJobID, jobId)
require.Less(t, maxReplTime, timeutil.Now())
require.Less(t, protectedTime, timeutil.Now())
require.GreaterOrEqual(t, maxReplTime, targetReplicatedTime.GoTime())
require.GreaterOrEqual(t, protectedTime, replicationDetails.ReplicationStartTime.GoTime())
require.Nil(t, cutoverTime)
repLagDuration, err := duration.ParseInterval(duration.IntervalStyle_POSTGRES, replicationLag, types.DefaultIntervalTypeMetadata)
require.NoError(t, err)
require.Greater(t, repLagDuration.Nanos(), int64(0))
require.Less(t, repLagDuration.Nanos(), time.Second*30)

// Verify the SHOW command prints the right cutover timestamp. Adding some
// logical component to make sure we handle it correctly.
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/testdata/alter_tenant
@@ -11,7 +11,7 @@ ALTER TENANT "destination" SET REPLICATION RETENTION = '42s'
query-sql as=destination-system
SELECT crdb_internal.pb_to_json('payload', payload)->'streamIngestion'->'replicationTtlSeconds' as retention_ttl_seconds
FROM crdb_internal.system_jobs
WHERE id = (SELECT replication_job_id FROM [SHOW TENANT "destination" WITH REPLICATION STATUS])
WHERE id = (SELECT job_id FROM [SHOW JOBS] WHERE job_type='REPLICATION STREAM INGESTION')
----
42

8 changes: 4 additions & 4 deletions pkg/ccl/streamingccl/streamingest/testdata/simple
@@ -40,7 +40,7 @@ repstream job id=$_producerJobID
query-sql as=source-system
SHOW TENANT source WITH REPLICATION STATUS
----
10 source ready shared <nil> <nil> <nil> <nil> <nil> <nil>
10 source <nil> <nil> <nil> <nil> <nil> <nil> ready

exec-sql as=source-tenant
CREATE TABLE d.x (id INT PRIMARY KEY, n INT);
@@ -67,10 +67,10 @@ SHOW TENANTS
2 destination replicating none

query-sql as=destination-system
SELECT id, name, data_state, service_mode, source_tenant_name, cutover_time FROM [SHOW TENANTS WITH REPLICATION STATUS]
SELECT id, name, source_tenant_name, cutover_time, status FROM [SHOW TENANTS WITH REPLICATION STATUS]
----
1 system ready shared <nil> <nil>
2 destination replicating none source <nil>
1 system <nil> <nil> ready
2 destination source <nil> replicating

let $ts as=source-system
SELECT clock_timestamp()::timestamptz::string
8 changes: 7 additions & 1 deletion pkg/cmd/roachtest/tests/mixed_version_backup.go
@@ -2563,8 +2563,14 @@ func tpccWorkloadCmd(
func bankWorkloadCmd(
l *logger.Logger, testRNG *rand.Rand, roachNodes option.NodeListOption, mock bool,
) (init *roachtestutil.Command, run *roachtestutil.Command) {
bankPayload := bankPossiblePayloadBytes[testRNG.Intn(len(bankPossiblePayloadBytes))]
bankRows := bankPossibleRows[testRNG.Intn(len(bankPossibleRows))]
possiblePayloads := bankPossiblePayloadBytes
// Force smaller row counts to use smaller payloads too, to avoid making
// lots of large revisions of a handful of keys.
if bankRows < 1000 {
possiblePayloads = []int{16, 64}
}
bankPayload := possiblePayloads[testRNG.Intn(len(possiblePayloads))]

if mock {
bankPayload = 9
26 changes: 17 additions & 9 deletions pkg/sql/catalog/colinfo/result_columns.go
@@ -295,33 +295,41 @@ func init() {
var TenantColumns = ResultColumns{
{Name: "id", Typ: types.Int},
{Name: "name", Typ: types.String},
}

// TenantColumnsNoReplication appears in all SHOW VIRTUAL CLUSTER queries, except
// for SHOW VIRTUAL CLUSTER ... WITH REPLICATION STATUS.
var TenantColumnsNoReplication = ResultColumns{
{Name: "data_state", Typ: types.String},
{Name: "service_mode", Typ: types.String},
}

// TenantColumnsWithReplication is appended to TenantColumns for
// SHOW VIRTUAL CLUSTER ... WITH REPLICATION STATUS queries.
// TenantColumnsWithReplication is appended to TenantColumns for SHOW VIRTUAL
// CLUSTER ... WITH REPLICATION STATUS queries.
var TenantColumnsWithReplication = ResultColumns{
{Name: "source_tenant_name", Typ: types.String},
{Name: "source_cluster_uri", Typ: types.String},
{Name: "replication_job_id", Typ: types.Int},
// The latest fully replicated time.
{Name: "replicated_time", Typ: types.TimestampTZ},
// The protected timestamp on the destination cluster, meaning we cannot
// cutover to before this time.
{Name: "retained_time", Typ: types.TimestampTZ},
// The latest fully replicated time.
{Name: "replicated_time", Typ: types.TimestampTZ},
{Name: "replication_lag", Typ: types.Interval},
{Name: "cutover_time", Typ: types.Decimal},
{Name: "status", Typ: types.String},
}

// TenantColumnsWithPriorReplication is appended to TenantColumns for
// SHOW VIRTUAL CLUSTER ... WITH PRIOR REPLICATION DETAILS queries.
// TenantColumnsWithPriorReplication is appended to TenantColumns and
// TenantColumnsNoReplication for SHOW VIRTUAL CLUSTER ... WITH PRIOR
// REPLICATION DETAILS queries.
var TenantColumnsWithPriorReplication = ResultColumns{
{Name: "source_id", Typ: types.String},
{Name: "activation_time", Typ: types.Decimal},
}

// TenantColumnsWithCapabilities is appended to TenantColumns for
// SHOW VIRTUAL CLUSTER ... WITH CAPABILITIES queries.
// TenantColumnsWithCapabilities is appended to TenantColumns and
// TenantColumnsNoReplication for SHOW VIRTUAL CLUSTER ... WITH CAPABILITIES
// queries.
var TenantColumnsWithCapabilities = ResultColumns{
{Name: "capability_name", Typ: types.String},
{Name: "capability_value", Typ: types.String},
8 changes: 4 additions & 4 deletions pkg/sql/logictest/testdata/logic_test/tenant
@@ -113,11 +113,11 @@ SELECT * FROM [SHOW TENANTS] WHERE id = 4
id name data_state service_mode
4 two ready none

query ITTTT colnames
SELECT id, name, data_state, service_mode, source_tenant_name FROM [SHOW TENANTS WITH REPLICATION STATUS] WHERE id = 4
query ITT colnames
SELECT id, name, source_tenant_name FROM [SHOW TENANTS WITH REPLICATION STATUS] WHERE id = 4
----
id name data_state service_mode source_tenant_name
4 two ready none NULL
id name source_tenant_name
4 two NULL

statement error tenant "seven" does not exist
SHOW TENANT seven