backupccl: skip backing up excluded spans #108627

Merged 1 commit on Aug 14, 2023
Changes from all commits
17 changes: 17 additions & 0 deletions pkg/ccl/backupccl/backup_job.go
@@ -156,6 +156,23 @@ func backup(
}
}

// Add the spans for any tables that are excluded from backup to the set of
// already-completed spans, as there is nothing to do for them.
descs := iterFactory.NewDescIter(ctx)
defer descs.Close()
for ; ; descs.Next() {
if ok, err := descs.Valid(); err != nil {
return roachpb.RowCount{}, 0, err
} else if !ok {
break
}

if tbl, _, _, _, _ := descpb.GetDescriptors(descs.Value()); tbl != nil && tbl.ExcludeDataFromBackup {
prefix := execCtx.ExecCfg().Codec.TablePrefix(uint32(tbl.ID))
completedSpans = append(completedSpans, roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()})
}
}

// Subtract out any completed spans.
spans := filterSpans(backupManifest.Spans, completedSpans)
introducedSpans := filterSpans(backupManifest.IntroducedSpans, completedIntroducedSpans)
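For context, a minimal, self-contained sketch of the idea in the hunk above: the full key span of each excluded table (its table prefix through PrefixEnd) is added to the completed set, and the remaining backup work is whatever is left after subtracting completed spans, so no ExportRequests are ever issued for excluded tables. `Span`, `tablePrefix`, `prefixEnd`, and `subtractSpans` below are simplified stand-ins, not the real roachpb/Codec/filterSpans APIs (the real filterSpans also clips partial overlaps).

```go
package main

import (
	"bytes"
	"fmt"
)

// Span is a simplified stand-in for roachpb.Span: the half-open range [Key, EndKey).
type Span struct {
	Key, EndKey []byte
}

// tablePrefix is a simplified stand-in for Codec.TablePrefix: every key of a
// table starts with this prefix.
func tablePrefix(tableID uint32) []byte {
	return []byte(fmt.Sprintf("/Table/%d/", tableID))
}

// prefixEnd returns the first key strictly greater than every key with the
// given prefix (a simplified PrefixEnd).
func prefixEnd(prefix []byte) []byte {
	end := append([]byte(nil), prefix...)
	for i := len(end) - 1; i >= 0; i-- {
		if end[i] < 0xff {
			end[i]++
			return end[:i+1]
		}
	}
	return end // degenerate case: all bytes are 0xff
}

// subtractSpans is a simplified stand-in for filterSpans: it drops any span
// that is entirely covered by a completed span.
func subtractSpans(spans, completed []Span) []Span {
	var remaining []Span
	for _, s := range spans {
		covered := false
		for _, c := range completed {
			if bytes.Compare(c.Key, s.Key) <= 0 && bytes.Compare(s.EndKey, c.EndKey) <= 0 {
				covered = true
				break
			}
		}
		if !covered {
			remaining = append(remaining, s)
		}
	}
	return remaining
}

func main() {
	// The backup manifest covers tables 104 and 105; table 105 has
	// exclude_data_from_backup set, so its whole span is treated as completed.
	spans := []Span{
		{Key: tablePrefix(104), EndKey: prefixEnd(tablePrefix(104))},
		{Key: tablePrefix(105), EndKey: prefixEnd(tablePrefix(105))},
	}
	excluded := tablePrefix(105)
	completed := []Span{{Key: excluded, EndKey: prefixEnd(excluded)}}

	for _, s := range subtractSpans(spans, completed) {
		fmt.Printf("still to back up: %s - %s\n", s.Key, s.EndKey)
	}
	// Only table 104's span remains; table 105 produces no backup work.
}
```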
58 changes: 47 additions & 11 deletions pkg/ccl/backupccl/backup_test.go
@@ -6454,6 +6454,15 @@ func TestProtectedTimestampsFailDueToLimits(t *testing.T) {
})
}

// isLeasingExportRequest returns true if the export request was issued while
// leasing a descriptor, so test filters can avoid picking up unrelated export
// requests.
func isLeasingExportRequest(r *kvpb.ExportRequest) bool {
_, tenantID, _ := keys.DecodeTenantPrefix(r.Key)
codec := keys.MakeSQLCodec(tenantID)
return bytes.HasPrefix(r.Key, codec.DescMetadataPrefix()) &&
r.EndKey.Equal(r.Key.PrefixEnd())
}
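A minimal, self-contained sketch of why the helper's two conditions identify descriptor-leasing export requests: descriptor metadata lives under a single well-known prefix, and a lease acquisition reads exactly one descriptor, i.e. the point range [key, key.PrefixEnd()). The prefix value and types below are simplified stand-ins, not the real keys package.

```go
package main

import (
	"bytes"
	"fmt"
)

// Hypothetical descriptor-metadata prefix; a stand-in for codec.DescMetadataPrefix().
var descMetadataPrefix = []byte("/Table/3/1/")

// prefixEnd returns the first key past the given key's prefix range
// (a simplified PrefixEnd; fine here since the last byte is not 0xff).
func prefixEnd(k []byte) []byte {
	end := append([]byte(nil), k...)
	end[len(end)-1]++
	return end
}

// exportRequest is a stand-in for kvpb.ExportRequest's key range.
type exportRequest struct{ Key, EndKey []byte }

// isLeasingExport mirrors the helper above: the request must target the
// descriptor-metadata keyspace and span exactly one key's prefix range.
func isLeasingExport(r exportRequest) bool {
	return bytes.HasPrefix(r.Key, descMetadataPrefix) &&
		bytes.Equal(r.EndKey, prefixEnd(r.Key))
}

func main() {
	leaseReq := exportRequest{
		Key:    []byte("/Table/3/1/104/"),
		EndKey: prefixEnd([]byte("/Table/3/1/104/")),
	}
	dataReq := exportRequest{Key: []byte("/Table/104/"), EndKey: []byte("/Table/105/")}

	fmt.Println(isLeasingExport(leaseReq)) // true: skipped by the test's request filter
	fmt.Println(isLeasingExport(dataReq))  // false: counted as real backup work
}
```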

func TestPaginatedBackupTenant(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -6489,14 +6498,6 @@ func TestPaginatedBackupTenant(t *testing.T) {
return fmt.Sprintf("%v%s", span.String(), spanStr)
}

// Check if export request is from a lease for a descriptor to avoid picking
// up on wrong export requests
isLeasingExportRequest := func(r *kvpb.ExportRequest) bool {
_, tenantID, _ := keys.DecodeTenantPrefix(r.Key)
codec := keys.MakeSQLCodec(tenantID)
return bytes.HasPrefix(r.Key, codec.DescMetadataPrefix()) &&
r.EndKey.Equal(r.Key.PrefixEnd())
}
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, request *kvpb.BatchRequest) *kvpb.Error {
for _, ru := range request.Requests {
@@ -9437,6 +9438,8 @@ func TestExcludeDataFromBackupAndRestore(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var exportReqsAtomic int64

tc, sqlDB, iodir, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, 10,
InitManualReplication, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
@@ -9448,6 +9451,17 @@
SpanConfig: &spanconfig.TestingKnobs{
SQLWatcherCheckpointNoopsEveryDurationOverride: 100 * time.Millisecond,
},
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, request *kvpb.BatchRequest) *kvpb.Error {
for _, ru := range request.Requests {
if exportRequest, ok := ru.GetInner().(*kvpb.ExportRequest); ok &&
!isLeasingExportRequest(exportRequest) {
atomic.AddInt64(&exportReqsAtomic, 1)
}
}
return nil
},
},
},
},
})
@@ -9468,8 +9482,11 @@ func TestExcludeDataFromBackupAndRestore(t *testing.T) {
conn := tc.Conns[0]

sqlDB.Exec(t, `CREATE TABLE data.foo (id INT, INDEX bar(id))`)
sqlDB.Exec(t, `INSERT INTO data.foo select * from generate_series(1,10)`)
sqlDB.Exec(t, `INSERT INTO data.foo select * from generate_series(1,5)`)

sqlDB.Exec(t, `BACKUP DATABASE data INTO $1`, localFoo)

sqlDB.Exec(t, `INSERT INTO data.foo select * from generate_series(6,10)`)
// Create another table.
sqlDB.Exec(t, `CREATE TABLE data.bar (id INT, INDEX bar(id))`)
sqlDB.Exec(t, `INSERT INTO data.bar select * from generate_series(1,10)`)
@@ -9489,11 +9506,30 @@ func TestExcludeDataFromBackupAndRestore(t *testing.T) {
}
return true, nil
})
sqlDB.Exec(t, `BACKUP DATABASE data INTO $1`, localFoo)

sqlDB.Exec(t, `BACKUP DATABASE data INTO LATEST IN $1`, localFoo)
sqlDB.Exec(t, `CREATE TABLE data.baz (id INT)`)
sqlDB.Exec(t, `ALTER TABLE data.baz SET (exclude_data_from_backup = true)`)
sqlDB.Exec(t, `INSERT INTO data.baz select * from generate_series(1,10)`)

waitForReplicaFieldToBeSet(t, tc, conn, "baz", "data", func(r *kvserver.Replica) (bool, error) {
if !r.ExcludeDataFromBackup() {
return false, errors.New("waiting for the range containing table data.baz to split")
}
return true, nil
})

sqlDB.Exec(t, `BACKUP DATABASE data INTO LATEST IN $1`, localFoo)

restoreDB.Exec(t, `RESTORE DATABASE data FROM LATEST IN $1`, localFoo)
require.Len(t, restoreDB.QueryStr(t, `SELECT * FROM data.foo`), 0)
require.Len(t, restoreDB.QueryStr(t, `SELECT * FROM data.foo`), 5)
require.Len(t, restoreDB.QueryStr(t, `SELECT * FROM data.bar`), 10)
require.Len(t, restoreDB.QueryStr(t, `SELECT * FROM data.baz`), 0)

before := atomic.LoadInt64(&exportReqsAtomic)
Reviewer comment (Contributor): at this point data.foo is going to be empty as asserted above. Can we add a few rows to it before attempting a backup? Also s/BACKUP TO/BACKUP INTO/g.

sqlDB.Exec(t, `BACKUP data.foo TO $1`, localFoo+"/tbl")
after := atomic.LoadInt64(&exportReqsAtomic)
require.Equal(t, before, after)
}

// TestExportRequestBelowGCThresholdOnDataExcludedFromBackup tests that a