Skip to content

Commit

Permalink
backupccl: add a unit test for backup pts target selection
Browse files Browse the repository at this point in the history
Informs: cockroachdb#73727

Release note: None
  • Loading branch information
adityamaru committed Dec 28, 2021
1 parent 859c54d commit c7dde69
Showing 1 changed file with 111 additions and 0 deletions.
111 changes: 111 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
Expand Down Expand Up @@ -84,6 +85,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
Expand Down Expand Up @@ -9172,3 +9174,112 @@ func TestBackupRestoreSeparateIncrementalPrefix(t *testing.T) {
sqlDB.Exec(t, "DROP DATABASE inc_fkdb;")
}
}

func TestGetProtectedTimestampTargetForBackup(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var allowRequest chan struct{}
params := base.TestClusterArgs{}
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
for _, ru := range ba.Requests {
switch ru.GetInner().(type) {
case *roachpb.ExportRequest:
<-allowRequest
}
}
return nil
},
}
params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
tc, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, 0, InitManualReplication, params)
defer cleanupFn()

sqlDB.Exec(t, `CREATE DATABASE foo;`)
var dbID descpb.ID
sqlDB.QueryRow(t, `SELECT id from system.namespace WHERE name = 'foo'`).Scan(&dbID)

sqlDB.Exec(t, `CREATE TABLE foo.bar (id INT);`)
var barID descpb.ID
sqlDB.QueryRow(t, `SELECT id from system.namespace WHERE name = 'bar'`).Scan(&barID)

sqlDB.Exec(t, `CREATE TABLE foo.baz (id INT);`)
var bazID descpb.ID
sqlDB.QueryRow(t, `SELECT id from system.namespace WHERE name = 'baz'`).Scan(&bazID)

sqlDB.Exec(t, "SET CLUSTER SETTING kv.protectedts.reconciliation.interval = '1ms';")

ctx := context.Background()
s := tc.Server(0)
ptp := s.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider
getPTSRecord := func(ptsID uuid.UUID) (r *ptpb.Record, err error) {
err = s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
r, err = ptp.GetRecord(ctx, txn, ptsID)
return err
})
return r, err
}

for _, test := range []struct {
name string
backupStmt string
expectedRecordTarget *ptpb.Target
}{
{
"cluster-backup",
`BACKUP INTO 'nodelocal://1/cluster' WITH detached`,
ptpb.MakeRecordClusterTarget(),
},
{
"db-backup",
`BACKUP DATABASE foo INTO 'nodelocal://1/cluster' WITH detached`,
ptpb.MakeRecordSchemaObjectsTarget([]descpb.ID{dbID}),
},
{
"wildcard-backup",
`BACKUP TABLE foo.* INTO 'nodelocal://1/cluster' WITH detached`,
ptpb.MakeRecordSchemaObjectsTarget([]descpb.ID{dbID}),
},
{
"table-backup",
`BACKUP TABLE foo.baz INTO 'nodelocal://1/cluster' WITH detached`,
ptpb.MakeRecordSchemaObjectsTarget([]descpb.ID{bazID}),
},
} {
t.Run(test.name, func(t *testing.T) {
allowRequest = make(chan struct{})
var jobID jobspb.JobID
sqlDB.QueryRow(t, test.backupStmt).Scan(&jobID)
const stmt = "SELECT payload FROM system.jobs WHERE id = $1"
rows := sqlDB.Query(t, stmt, jobID)
defer rows.Close()
for rows.Next() {
var payloadBytes []byte
if err := rows.Scan(&payloadBytes); err != nil {
t.Fatal(err)
}

payload := &jobspb.Payload{}
if err := protoutil.Unmarshal(payloadBytes, payload); err != nil {
t.Fatal("cannot unmarshal job payload from system.jobs")
}

backupPayload, ok := payload.Details.(*jobspb.Payload_Backup)
if !ok {
t.Logf("job %T is not a backup: %v", payload.Details, payload.Details)
continue
}
backupDetails := backupPayload.Backup
ptsRecord, err := getPTSRecord(*backupDetails.ProtectedTimestampRecord)
require.NoError(t, err)
require.Equal(t, test.expectedRecordTarget, ptsRecord.Target)
}
if err := rows.Err(); err != nil {
t.Fatalf("unexpected error querying jobs: %s", err.Error())
}
close(allowRequest)
jobutils.WaitForJob(t, sqlDB, jobID)
})
}
}

0 comments on commit c7dde69

Please sign in to comment.