release-20.2: backup: revalidate restored indexes if added during inc backup #63314

Merged · 1 commit · Apr 13, 2021
113 changes: 108 additions & 5 deletions pkg/ccl/backupccl/restore_job.go
@@ -1132,7 +1132,7 @@ func (r *restoreResumer) Resume(
// the first place, as a special case.
var newDescriptorChangeJobs []*jobs.StartableJob
publishDescriptors := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) (err error) {
newDescriptorChangeJobs, err = r.publishDescriptors(ctx, txn, descsCol, details)
newDescriptorChangeJobs, err = r.publishDescriptors(ctx, txn, descsCol, details, nil)
return err
}
if err := descs.Txn(
@@ -1185,8 +1185,23 @@ func (r *restoreResumer) Resume(
return errors.Wrap(err, "inserting table statistics")
}
var newDescriptorChangeJobs []*jobs.StartableJob

var devalidateIndexes map[descpb.ID][]descpb.IndexID
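// If the backup captured indexes that were still being backfilled, and that
// may therefore be missing entries, re-validate them now and remember any
// that fail so they can be rebuilt when the descriptors are published below.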
if toValidate := len(details.RevalidateIndexes); toValidate > 0 {
if err := r.job.RunningStatus(ctx, func(_ context.Context, _ jobspb.Details) (jobs.RunningStatus, error) {
return jobs.RunningStatus(fmt.Sprintf("re-validating %d indexes", toValidate)), nil
}); err != nil {
return errors.Wrapf(err, "failed to update running status of job %d", errors.Safe(r.job.ID()))
}
bad, err := revalidateIndexes(ctx, p.ExecCfg(), r.job, details.TableDescs, details.RevalidateIndexes)
if err != nil {
return err
}
devalidateIndexes = bad
}

publishDescriptors := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) (err error) {
newDescriptorChangeJobs, err = r.publishDescriptors(ctx, txn, descsCol, details)
newDescriptorChangeJobs, err = r.publishDescriptors(ctx, txn, descsCol, details, devalidateIndexes)
return err
}
if err := descs.Txn(
@@ -1250,6 +1265,73 @@ func (r *restoreResumer) Resume(
return nil
}

func revalidateIndexes(
ctx context.Context,
execCfg *sql.ExecutorConfig,
job *jobs.Job,
tables []*descpb.TableDescriptor,
indexIDs []jobspb.RestoreDetails_RevalidateIndex,
) (map[descpb.ID][]descpb.IndexID, error) {
indexIDsByTable := make(map[descpb.ID]map[descpb.IndexID]struct{})
for _, idx := range indexIDs {
if indexIDsByTable[idx.TableID] == nil {
indexIDsByTable[idx.TableID] = make(map[descpb.IndexID]struct{})
}
indexIDsByTable[idx.TableID][idx.IndexID] = struct{}{}
}

// We don't actually need the 'historical' read the way the schema change does
// since our table is offline.
var runner sql.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sql.InternalExecFn) error {
return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
ie := job.MakeSessionBoundInternalExecutor(ctx, sql.NewFakeSessionData()).(*sql.InternalExecutor)
return fn(ctx, txn, ie)
})
}

invalidIndexes := make(map[descpb.ID][]descpb.IndexID)

for _, tbl := range tables {
indexes := indexIDsByTable[tbl.ID]
if len(indexes) == 0 {
continue
}
tableDesc := tabledesc.NewImmutable(*tbl)

var forward, inverted []*descpb.IndexDescriptor
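// Split the indexes to re-check by type, since forward and inverted indexes
// are validated by separate routines below.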
for _, idx := range tableDesc.GetIndexes() {
i := idx
if _, ok := indexes[idx.ID]; ok {
switch idx.Type {
case descpb.IndexDescriptor_FORWARD:
forward = append(forward, &i)
case descpb.IndexDescriptor_INVERTED:
inverted = append(inverted, &i)
}
}
}
if len(forward) > 0 {
if err := sql.ValidateForwardIndexes(ctx, tableDesc, forward, runner, execCfg.Settings, execCfg.LeaseManager, false, true); err != nil {
if invalid := (sql.InvalidIndexesError{}); errors.As(err, &invalid) {
invalidIndexes[tableDesc.ID] = invalid.Indexes
} else {
return nil, err
}
}
}
if len(inverted) > 0 {
if err := sql.ValidateInvertedIndexes(ctx, execCfg.Codec, tableDesc, inverted, runner, execCfg.Settings, execCfg.LeaseManager, true, true); err != nil {
if invalid := (sql.InvalidIndexesError{}); errors.As(err, &invalid) {
invalidIndexes[tableDesc.ID] = append(invalidIndexes[tableDesc.ID], invalid.Indexes...)
} else {
return nil, err
}
}
}
}
return invalidIndexes, nil
}

// Initiate a run of CREATE STATISTICS. We don't know the actual number of
// rows affected per table, so we use a large number because we want to make
// sure that stats always get created/refreshed here.
@@ -1294,7 +1376,11 @@ func insertStats(
// from r.job as the call to r.job.SetDetails will overwrite the job details
// with a new value even if this transaction does not commit.
func (r *restoreResumer) publishDescriptors(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, details jobspb.RestoreDetails,
ctx context.Context,
txn *kv.Txn,
descsCol *descs.Collection,
details jobspb.RestoreDetails,
devalidateIndexes map[descpb.ID][]descpb.IndexID,
) (newDescriptorChangeJobs []*jobs.StartableJob, err error) {
defer func() {
if err == nil {
@@ -1333,7 +1419,6 @@ func (r *restoreResumer) publishDescriptors(
return errors.Errorf("version mismatch for descriptor %d, expected version %d, got %v",
read.GetID(), read.GetVersion(), exp)
}

// Write the new TableDescriptors and flip state over to public so they can be
// accessed.
for _, tbl := range details.TableDescs {
@@ -1344,12 +1429,30 @@
if err := checkVersion(mutTable, tbl.Version); err != nil {
return newDescriptorChangeJobs, err
}
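// Any index that failed re-validation is moved out of the descriptor's public
// indexes and re-queued as an ADD mutation, so that the schema change job
// created below re-backfills and re-validates it.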
badIndexes := devalidateIndexes[mutTable.ID]
for _, badIdx := range badIndexes {
found := false
for i := range mutTable.Indexes {
if mutTable.Indexes[i].ID == badIdx {
copied := mutTable.Indexes[i]
if err := mutTable.AddIndexMutation(&copied, descpb.DescriptorMutation_ADD); err != nil {
return newDescriptorChangeJobs, err
}
mutTable.Indexes = append(mutTable.Indexes[:i], mutTable.Indexes[i+1:]...)
found = true
break
}
}
if !found {
return newDescriptorChangeJobs, errors.Errorf("did not find invalid index %d in table %d (%q) to drop and re-add", badIdx, mutTable.ID, mutTable.Name)
}
}
allMutDescs = append(allMutDescs, mutTable)
newTables = append(newTables, mutTable.TableDesc())
// For cluster restores, all the jobs are restored directly from the jobs
// table, so there is no need to re-create ongoing schema change jobs,
// otherwise we'll create duplicate jobs.
if details.DescriptorCoverage != tree.AllDescriptors {
if details.DescriptorCoverage != tree.AllDescriptors || len(badIndexes) > 0 {
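// When badIndexes is non-empty, the schema change job is needed even for
// cluster restores, since the index mutations re-added above must be
// processed to rebuild those indexes.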
// Convert any mutations that were in progress on the table descriptor
// when the backup was taken, and convert them to schema change jobs.
newJobs, err := createSchemaChangeJobsFromMutations(ctx,
198 changes: 198 additions & 0 deletions pkg/ccl/backupccl/restore_old_versions_test.go
@@ -14,10 +14,13 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -248,3 +251,198 @@ func restoreOldVersionClusterTest(exportDir string) func(t *testing.T) {
sqlDB.CheckQueryResults(t, "SELECT * FROM data.bank", [][]string{{"1"}})
}
}

/*
func TestCreateIncBackupMissingIndexEntries(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

defer jobs.TestingSetAdoptAndCancelIntervals(5*time.Millisecond, 5*time.Millisecond)()

const numAccounts = 10
const numBackups = 9
windowSize := int(numAccounts / 3)

blockBackfill := make(chan struct{})
defer close(blockBackfill)

backfillWaiting := make(chan struct{})
defer close(backfillWaiting)

ctx, tc, sqlDB, dir, cleanupFn := backupRestoreTestSetupWithParams(
t, singleNode, 0, InitManualReplication, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
RunBeforeBackfillChunk: func(sp roachpb.Span) error {
select {
case backfillWaiting <- struct{}{}:
case <-time.After(time.Second * 5):
panic("timeout blocking in knob")
}
select {
case <-blockBackfill:
case <-time.After(time.Second * 5):
panic("timeout blocking in knob")
}
return nil
},
},
}},
},
)
defer cleanupFn()
args := base.TestServerArgs{ExternalIODir: dir}
rng, _ := randutil.NewPseudoRand()

sqlDB.Exec(t, `CREATE TABLE data.jsontest (id INT PRIMARY KEY, j JSONB)`)
sqlDB.Exec(t, `INSERT INTO data.jsontest VALUES (1, '{"a": "a", "b": "b"}'), (2, '{"c": "c", "d":"d"}')`)

sqlDB.Exec(t, `CREATE TABLE data.geotest (id INT PRIMARY KEY, p geometry(point))`)
sqlDB.Exec(t, `INSERT INTO data.geotest VALUES (1, 'POINT(1.0 1.0)'), (2, 'POINT(2.0 2.0)')`)

var backupDirs []string
var checksums []uint32
{
for backupNum := 0; backupNum < numBackups; backupNum++ {
// In the following, windowSize is `w` and offset is `o`. The first
// mutation creates accounts with id [w,3w). Every mutation after
// that deletes everything less than o, leaves [o, o+w) unchanged,
// mutates [o+w,o+2w), and inserts [o+2w,o+3w).
offset := windowSize * backupNum
var buf bytes.Buffer
fmt.Fprintf(&buf, `DELETE FROM data.bank WHERE id < %d; `, offset)
buf.WriteString(`UPSERT INTO data.bank VALUES `)
for j := 0; j < windowSize*2; j++ {
if j != 0 {
buf.WriteRune(',')
}
id := offset + windowSize + j
payload := randutil.RandBytes(rng, backupRestoreRowPayloadSize)
fmt.Fprintf(&buf, `(%d, %d, '%s')`, id, backupNum, payload)
}
sqlDB.Exec(t, buf.String())
createErr := make(chan error)
go func() {
defer close(createErr)
var stmt string
switch backupNum % 3 {
case 0:
stmt = fmt.Sprintf(`CREATE INDEX test_idx_%d ON data.bank (balance)`, backupNum+1)
case 1:
stmt = fmt.Sprintf(`CREATE INDEX test_idx_%d ON data.jsontest USING GIN(j)`, backupNum+1)
case 2:
stmt = fmt.Sprintf(`CREATE INDEX test_idx_%d ON data.geotest USING GIST(p)`, backupNum+1)
}
t.Log(stmt)
_, err := sqlDB.DB.ExecContext(ctx, stmt)
createErr <- err
}()
select {
case <-backfillWaiting:
case err := <-createErr:
t.Fatal(err)
}
checksums = append(checksums, checksumBankPayload(t, sqlDB))

backupDir := fmt.Sprintf("nodelocal://0/%d", backupNum)
var from string
if backupNum > 0 {
from = fmt.Sprintf(` INCREMENTAL FROM %s`, strings.Join(backupDirs, `,`))
}
sqlDB.Exec(t, fmt.Sprintf(`BACKUP TO '%s' %s`, backupDir, from))
blockBackfill <- struct{}{}
require.NoError(t, <-createErr)

backupDirs = append(backupDirs, fmt.Sprintf(`'%s'`, backupDir))
}

// Test a regression in RESTORE where the batch end key was not
// being set correctly in Import: make an incremental backup such that
// the greatest key in the diff is less than the previous backups.
sqlDB.Exec(t, `INSERT INTO data.bank VALUES (0, -1, 'final')`)
checksums = append(checksums, checksumBankPayload(t, sqlDB))
sqlDB.Exec(t, fmt.Sprintf(`BACKUP TO '%s' %s`,
"nodelocal://0/final", fmt.Sprintf(` INCREMENTAL FROM %s`, strings.Join(backupDirs, `,`)),
))
backupDirs = append(backupDirs, `'nodelocal://0/final'`)
}
os.Rename(dir, path/to/testdata)
}
*/

// TestRestoreOldBackupMissingOfflineIndexes tests restoring a backup made by
// v20.2 prior to the introduction of excluding offline indexes in #62572,
// using the commented-out TestCreateIncBackupMissingIndexEntries code above.
// Note that that code needs to be pasted into a branch checkout made _prior_
// to the inclusion of the mentioned PR.
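// A rough, hypothetical sketch of regenerating that fixture (the checkout ref
// and destination directory below are placeholders, not part of this change):
//
//   git checkout <a v20.2 SHA prior to #62572>
//   (paste and un-comment TestCreateIncBackupMissingIndexEntries, run it, and
//   copy its ExternalIODir contents into
//   pkg/ccl/backupccl/testdata/restore_old_versions/inc_missing_addsst/<version>)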
func TestRestoreOldBackupMissingOfflineIndexes(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.UnderRace(t, "times out under race because it starts up two test servers")
ctx := context.Background()

badBackups, err := filepath.Abs("testdata/restore_old_versions/inc_missing_addsst/v20.2.7")
require.NoError(t, err)
args := base.TestServerArgs{ExternalIODir: badBackups}
backupDirs := make([]string, 9)
for i := range backupDirs {
backupDirs[i] = fmt.Sprintf("'nodelocal://0/%d'", i)
}

// Start a new cluster to restore into.
{
for i := len(backupDirs); i > 0; i-- {
restoreTC := testcluster.StartTestCluster(t, singleNode, base.TestClusterArgs{ServerArgs: args})
defer restoreTC.Stopper().Stop(context.Background())
sqlDBRestore := sqlutils.MakeSQLRunner(restoreTC.Conns[0])
from := strings.Join(backupDirs[:i], `,`)
sqlDBRestore.Exec(t, fmt.Sprintf(`RESTORE FROM %s`, from))
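// Each incremental backup in the chain was taken while an index build on one
// of the tables was still in progress; verify that each such index is
// eventually rebuilt after the restore.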
for j := i; j > 1; j-- {
var res int64
switch j % 3 {
case 2:
for i := 0; i < 50; i++ {
if err := sqlDBRestore.DB.QueryRowContext(ctx,
fmt.Sprintf(`SELECT count(*) FROM data.bank@test_idx_%d`, j-1),
).Scan(&res); err != nil {
if !strings.Contains(err.Error(), `not found`) {
t.Fatal(err)
}
t.Logf("index %d doesn't exist yet on attempt %d", j-1, i)
time.Sleep(time.Millisecond * 50)
continue
}
break
}
var expected int64
sqlDBRestore.QueryRow(t, `SELECT count(*) FROM data.bank@primary`).Scan(&expected)
if res != expected {
t.Fatalf("got %d, expected %d", res, expected)
}
// Cases 1 and 0 are both inverted indexes, which we can't validate via SQL,
// so this just checks that the index eventually shows up, i.e. that the code
// to validate it and create the schema change works.
case 0:
found := false
for i := 0; i < 50; i++ {
if err := sqlDBRestore.DB.QueryRowContext(ctx,
fmt.Sprintf(`SELECT count(*) FROM data.jsontest@test_idx_%d`, j-1),
).Scan(&res); err != nil {
if strings.Contains(err.Error(), `is inverted`) {
found = true
break
}
if !strings.Contains(err.Error(), `not found`) {
t.Fatal(err)
}
t.Logf("index %d doesn't exist yet on attempt %d", j-1, i)
time.Sleep(time.Millisecond * 50)
}
}
if !found {
t.Fatal("expected index to come back")
}
}
}
}
}
}