Skip to content

Commit

Permalink
Merge pull request #79553 from cockroachdb/blathers/backport-release-…
Browse files Browse the repository at this point in the history
…22.1-77284

release-22.1: backupccl: Backup LATEST file is no longer overwritten
  • Loading branch information
DarrylWong committed Apr 7, 2022
2 parents 7b6a3cd + 9ac51e0 commit 58ddc3e
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 20 deletions.
88 changes: 74 additions & 14 deletions pkg/ccl/backupccl/backup_destination.go
Expand Up @@ -10,23 +10,28 @@ package backupccl

import (
"context"
"encoding/hex"
"fmt"
"net/url"
"strings"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -331,20 +336,52 @@ func readLatestFile(
}

// findLatestFile returns a ioctx.ReaderCloserCtx of the most recent LATEST
// file. First it tries reading from the latest-history directory. If
// file. First it tries reading from the latest directory. If
// the backup is from an older version, it may not exist there yet so
// it tries reading in the base directory if the first attempt fails.
// NOTE(review): this span interleaves pre- and post-change lines from a
// diff view (the duplicated sentence above is one such old/new pair);
// the statements marked "(old)" below appear to be the removed
// implementation — confirm against the underlying commit.
func findLatestFile(
ctx context.Context, exportStore cloud.ExternalStorage,
) (ioctx.ReadCloserCtx, error) {
// (old) direct read of metadata/latest/LATEST with a base-directory
// fallback; replaced by the List-based lookup below.
latestFile, err := exportStore.ReadFile(ctx, latestHistoryDirectory+"/"+latestFileName)
if err != nil {
latestFile, err = exportStore.ReadFile(ctx, latestFileName)
if err != nil {
return nil, errors.Wrap(err, "LATEST file could not be read in base or metadata directory")
// latestFile holds the name of the first entry returned by List below;
// latestFileFound records whether any entry was seen at all.
var latestFile string
var latestFileFound bool
// First try reading from the metadata/latest directory. If the backup
// is from an older version, it may not exist there yet so try reading
// in the base directory if the first attempt fails.

// We name files such that the most recent latest file will always
// be at the top, so just grab the first filename.
err := exportStore.List(ctx, latestHistoryDirectory, "", func(p string) error {
p = strings.TrimPrefix(p, "/")
latestFile = p
latestFileFound = true
// We only want the first latest file so return an error that it is
// done listing.
return cloud.ErrListingDone
})
// If the list failed because the storage used does not support listing,
// such as http, we can try reading the non-timestamped backup latest
// file directly. This can still fail if it is a mixed cluster and the
// latest file was written in the base directory.
if errors.Is(err, cloud.ErrListingUnsupported) {
r, err := exportStore.ReadFile(ctx, latestHistoryDirectory+"/"+latestFileName)
if err == nil {
return r, nil
}
} else if err != nil && !errors.Is(err, cloud.ErrListingDone) {
return nil, err
}

// A timestamped LATEST file was found; read it from the latest directory.
if latestFileFound {
return exportStore.ReadFile(ctx, latestHistoryDirectory+"/"+latestFile)
}

// The latest file couldn't be found in the latest directory,
// try the base directory instead.
r, err := exportStore.ReadFile(ctx, latestFileName)
if err != nil {
return nil, errors.Wrap(err, "LATEST file could not be read in base or metadata directory")
}
// (old) return from the pre-change implementation.
return latestFile, err
return r, nil
}

// writeNewLatestFile writes a new LATEST file to both the base directory
// NOTE(review): the doc comment above is truncated by the diff view, and
// the span below interleaves pre- and post-change lines without +/-
// markers; statements marked "(old)" appear to be the removed code —
// confirm against the underlying commit.
Expand All @@ -353,14 +390,37 @@ func writeNewLatestFile(
ctx context.Context, settings *cluster.Settings, exportStore cloud.ExternalStorage, suffix string,
) error {
// If the cluster is still running on a mixed version, we want to write
// to the base directory as well as the progress directory. That way if
// an old node resumes a backup, it doesn't have to start over.
// to the base directory instead of the metadata/latest directory. That
// way an old node can still find the LATEST file.
if !settings.Version.IsActive(ctx, clusterversion.BackupDoesNotOverwriteLatestAndCheckpoint) {
// (old) the pre-change body wrote latestFileName and fell through to
// also write the history directory; the added line below returns
// after a single base-directory write instead.
err := cloud.WriteFile(ctx, exportStore, latestFileName, strings.NewReader(suffix))
if err != nil {
return err
}
return cloud.WriteFile(ctx, exportStore, latestFileName, strings.NewReader(suffix))
}

// (old) unconditional overwrite of metadata/latest/LATEST, replaced by
// the provider check and timestamped filename below.
return cloud.WriteFile(ctx, exportStore, latestHistoryDirectory+"/"+latestFileName, strings.NewReader(suffix))
// HTTP storage does not support listing and so we cannot rely on the
// above-mentioned List method to return us the most recent latest file.
// Instead, we disregard write once semantics and always read and write
// a non-timestamped latest file for HTTP.
if exportStore.Conf().Provider == roachpb.ExternalStorageProvider_http {
return cloud.WriteFile(ctx, exportStore, latestFileName, strings.NewReader(suffix))
}

// We timestamp the latest files in order to enforce write once backups.
// When the job goes to read these timestamped files, it will List
// the latest files and pick the file whose name is lexicographically
// sorted to the top. This will be the last latest file we write. It
// takes the one's complement of the timestamp so that files are sorted
// lexicographically such that the most recent is always the top.
return cloud.WriteFile(ctx, exportStore, newTimestampedLatestFileName(), strings.NewReader(suffix))
}

// newTimestampedLatestFileName builds the name for a fresh LATEST file in
// the form <latestHistoryDirectory>/LATEST-<version>. The version suffix
// is the current time rendered as a string, encoded with a descending
// (complemented) byte ordering and then hex-encoded, so the generated
// names sort lexicographically with the most recent file first.
func newTimestampedLatestFileName() string {
	encoded := encoding.EncodeStringDescending(nil, timeutil.Now().String())
	suffix := hex.EncodeToString(encoded)
	return latestHistoryDirectory + "/" + latestFileName + "-" + suffix
}
113 changes: 111 additions & 2 deletions pkg/ccl/backupccl/backup_test.go
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cloud/azure"
"github.com/cockroachdb/cockroach/pkg/cloud/gcp"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/jobs"
Expand All @@ -59,6 +60,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -10415,13 +10417,120 @@ func TestBackupTimestampedCheckpointsAreLexicographical(t *testing.T) {
var actual string
err = store.List(ctx, "/progress/", "", func(f string) error {
actual = f
return cloud.ErrListingUnsupported
return cloud.ErrListingDone
})
require.Equal(t, err, cloud.ErrListingUnsupported)
require.Equal(t, err, cloud.ErrListingDone)
require.Equal(t, expectedCheckpoint, actual)
for _, checkpoint := range checkpoints {
require.NoError(t, store.Delete(ctx, "/progress/"+checkpoint))
}
})
}
}

// TestBackupNoOverwriteLatest tests BACKUP to see that it no longer
// overwrites the LATEST file, and instead adds a new timestamped LATEST
// file to the collection on every backup.
func TestBackupNoOverwriteLatest(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	const numAccounts = 1
	const userfile = "'userfile:///a'"
	tc, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts, InitManualReplication)
	defer cleanupFn()

	execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig)
	ctx := context.Background()
	store, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, "userfile:///a", security.RootUserName())
	require.NoError(t, err)
	// findNumLatestFiles returns the number of files in the latest
	// directory and the name of the first one returned by List.
	findNumLatestFiles := func() (int, string) {
		var numLatestFiles int
		var latestFile string
		// Use := so the outer err is not clobbered by the closure.
		err := store.List(ctx, latestHistoryDirectory, "", func(p string) error {
			if numLatestFiles == 0 {
				latestFile = p
			}
			numLatestFiles++
			return nil
		})
		require.NoError(t, err)
		return numLatestFiles, latestFile
	}

	query := fmt.Sprintf("BACKUP INTO %s", userfile)
	sqlDB.Exec(t, query)
	numLatest, firstLatest := findNumLatestFiles()
	// require.Equal takes (expected, actual).
	require.Equal(t, 1, numLatest)

	query = fmt.Sprintf("BACKUP INTO %s", userfile)
	sqlDB.Exec(t, query)
	numLatest, secondLatest := findNumLatestFiles()
	require.Equal(t, 2, numLatest)
	require.NotEqual(t, firstLatest, secondLatest)

	query = fmt.Sprintf("BACKUP INTO %s", userfile)
	sqlDB.Exec(t, query)
	numLatest, thirdLatest := findNumLatestFiles()
	require.Equal(t, 3, numLatest)
	require.NotEqual(t, firstLatest, thirdLatest)
	// Also make sure the second and third backups produced distinct files.
	require.NotEqual(t, secondLatest, thirdLatest)
}

// TestBackupLatestInBaseDirectory verifies that a LATEST file written to
// the base directory can still be read when none is found in
// metadata/latest, as happens when an older-version node creates the
// backup.
func TestBackupLatestInBaseDirectory(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	const numAccounts = 1
	const userfile = "'userfile:///a'"
	blockAutoUpgrade := make(chan struct{})
	clusterArgs := base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{
			Knobs: base.TestingKnobs{
				Server: &server.TestingKnobs{
					BinaryVersionOverride:          clusterversion.ByKey(clusterversion.BackupDoesNotOverwriteLatestAndCheckpoint - 1),
					DisableAutomaticVersionUpgrade: blockAutoUpgrade,
				},
			},
		},
	}

	tc, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, clusterArgs)
	defer cleanupFn()
	execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig)
	ctx := context.Background()
	store, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, "userfile:///a", security.RootUserName())
	require.NoError(t, err)

	sqlDB.Exec(t, fmt.Sprintf("BACKUP INTO %s", userfile))

	// The pre-upgrade cluster should have placed the LATEST file in the
	// base directory.
	reader, err := store.ReadFile(ctx, latestFileName)
	require.NoError(t, err)
	reader.Close(ctx)

	// Unblock the automatic version upgrade.
	close(blockAutoUpgrade)
	// Wait until the cluster version reaches the current binary version.
	testutils.SucceedsSoon(t, func() error {
		var raw string
		sqlDB.QueryRow(t, "SELECT value FROM system.settings WHERE name = 'version'").Scan(&raw)
		var cv clusterversion.ClusterVersion
		if err := protoutil.Unmarshal([]byte(raw), &cv); err != nil {
			return err
		}
		if cv.String() != clusterversion.TestingBinaryVersion.String() {
			return errors.Errorf("cluster version is still %s, should be %s", cv.String(), clusterversion.TestingBinaryVersion.String())
		}
		return nil
	})

	// Take an incremental backup on the new version using the latest file
	// written by the old version in the base directory.
	sqlDB.Exec(t, fmt.Sprintf("BACKUP INTO LATEST IN %s", userfile))
}
12 changes: 9 additions & 3 deletions pkg/ccl/backupccl/create_scheduled_backup_test.go
Expand Up @@ -12,9 +12,7 @@ import (
"context"
gosql "database/sql"
"fmt"
"io/ioutil"
"net/url"
"path"
"regexp"
"sort"
"strconv"
Expand All @@ -28,12 +26,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -797,7 +797,13 @@ INSERT INTO t1 values (-1), (10), (-100);
}

// Verify backup.
latest, err := ioutil.ReadFile(path.Join(th.iodir, "backup", testName, latestHistoryDirectory+"/"+latestFileName))
execCfg := th.server.ExecutorConfig().(sql.ExecutorConfig)
ctx := context.Background()
store, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, destination, security.RootUserName())
require.NoError(t, err)
r, err := findLatestFile(ctx, store)
require.NoError(t, err)
latest, err := ioctx.ReadAll(ctx, r)
require.NoError(t, err)
backedUp := th.sqlDB.QueryStr(t,
`SELECT database_name, object_name FROM [SHOW BACKUP $1] WHERE object_type='table' ORDER BY database_name, object_name`,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/manifest_handling.go
Expand Up @@ -1288,7 +1288,7 @@ func readLatestCheckpointFile(
checkpoint = strings.TrimSuffix(p, backupManifestChecksumSuffix)
checkpointFound = true
// We only want the first checkpoint so return an error that it is
// listing.
// done listing.
return cloud.ErrListingDone
})
// If the list failed because the storage used does not support listing,
Expand Down

0 comments on commit 58ddc3e

Please sign in to comment.