Skip to content

Commit

Permalink
Merge pull request #115709 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.2-114616
  • Loading branch information
pav-kv committed Dec 7, 2023
2 parents 004cb44 + f8af9ff commit 6fc66ab
Show file tree
Hide file tree
Showing 6 changed files with 337 additions and 38 deletions.
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/logstore/BUILD.bazel
Expand Up @@ -36,6 +36,7 @@ go_library(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//oserror",
"@com_github_cockroachdb_pebble//record",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@io_etcd_go_raft_v3//:raft",
"@io_etcd_go_raft_v3//raftpb",
Expand Down Expand Up @@ -71,6 +72,8 @@ go_test(
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//oserror",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@io_etcd_go_raft_v3//raftpb",
"@org_golang_x_time//rate",
Expand Down
11 changes: 9 additions & 2 deletions pkg/kv/kvserver/logstore/sideload.go
Expand Up @@ -34,6 +34,9 @@ type SideloadStorage interface {
// Writes the given contents to the file specified by the given index and
// term. Overwrites the file if it already exists.
Put(_ context.Context, index kvpb.RaftIndex, term kvpb.RaftTerm, contents []byte) error
// Sync syncs the underlying filesystem metadata so that all the preceding
// mutations, such as Put and TruncateTo, are durable.
Sync() error
// Load the file at the given index and term. Return errSideloadedFileNotFound when no
// such file is present.
Get(_ context.Context, index kvpb.RaftIndex, term kvpb.RaftTerm) ([]byte, error)
Expand Down Expand Up @@ -140,8 +143,12 @@ func MaybeSideloadEntries(
sideloadedEntriesSize += int64(len(dataToSideload))
}

if output == nil {
// We never saw a sideloaded command.
if output != nil { // there is at least one sideloaded command
// Sync the sideloaded storage directory so that the commands are durable.
if err := sideloaded.Sync(); err != nil {
return nil, 0, 0, 0, err
}
} else { // we never saw a sideloaded command
output = input
}

Expand Down
106 changes: 88 additions & 18 deletions pkg/kv/kvserver/logstore/sideload_disk.go
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
"github.com/cockroachdb/pebble/vfs"
"golang.org/x/time/rate"
)

Expand All @@ -37,11 +38,10 @@ var _ SideloadStorage = &DiskSideloadStorage{}
//
// TODO(pavelkalinnikov): remove the interface, this type is the only impl.
type DiskSideloadStorage struct {
st *cluster.Settings
limiter *rate.Limiter
dir string
dirCreated bool
eng storage.Engine
st *cluster.Settings
limiter *rate.Limiter
dir string
eng storage.Engine
}

func sideloadedPath(baseDir string, rangeID roachpb.RangeID) string {
Expand All @@ -50,7 +50,7 @@ func sideloadedPath(baseDir string, rangeID roachpb.RangeID) string {
// per directory, respectively. Newer FS typically have no such limitation,
// but still.
//
// For example, r1828 will end up in baseDir/r1XXX/r1828.
// For example, r1828 will end up in baseDir/sideloading/r1XXX/r1828.
return filepath.Join(
baseDir,
"sideloading",
Expand All @@ -76,12 +76,6 @@ func NewDiskSideloadStorage(
}
}

func (ss *DiskSideloadStorage) createDir() error {
err := ss.eng.MkdirAll(ss.dir, os.ModePerm)
ss.dirCreated = ss.dirCreated || err == nil
return err
}

// Dir implements SideloadStorage.
func (ss *DiskSideloadStorage) Dir() string {
return ss.dir
Expand All @@ -102,15 +96,37 @@ func (ss *DiskSideloadStorage) Put(
} else if !oserror.IsNotExist(err) {
return err
}
// createDir() ensures ss.dir exists but will not create any subdirectories
// within ss.dir because filename() does not make subdirectories in ss.dir.
if err := ss.createDir(); err != nil {
// Ensure that ss.dir exists. The filename() is placed directly in ss.dir,
// so the next loop iteration should succeed.
if err := mkdirAllAndSyncParents(ss.eng, ss.dir, os.ModePerm); err != nil {
return err
}
continue
}
}

// Sync implements SideloadStorage.
func (ss *DiskSideloadStorage) Sync() error {
dir, err := ss.eng.OpenDir(ss.dir)
// The directory can be missing because we did not Put() any entry to it yet,
// or it has been removed by TruncateTo() or Clear().
//
// TODO(pavelkalinnikov): if ss.dir existed and has been removed, we should
// sync the parent of ss.dir, to persist the removal. Otherwise it may come
// back after a restart. Alternatively, and more likely, we should cleanup
// leftovers upon restart - we have other TODOs for that.
if oserror.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
if err := dir.Sync(); err != nil {
_ = dir.Close()
return err
}
return dir.Close()
}

// Get implements SideloadStorage.
func (ss *DiskSideloadStorage) Get(
ctx context.Context, index kvpb.RaftIndex, term kvpb.RaftTerm,
Expand Down Expand Up @@ -170,9 +186,7 @@ func (ss *DiskSideloadStorage) purgeFile(ctx context.Context, filename string) (

// Clear implements SideloadStorage.
func (ss *DiskSideloadStorage) Clear(_ context.Context) error {
err := ss.eng.RemoveAll(ss.dir)
ss.dirCreated = ss.dirCreated && err != nil
return err
return ss.eng.RemoveAll(ss.dir)
}

// TruncateTo implements SideloadStorage.
Expand All @@ -198,6 +212,7 @@ func (ss *DiskSideloadStorage) possiblyTruncateTo(
return nil
}
if index < from {
// TODO(pavelkalinnikov): these files may never be removed. Clean them up.
return nil
}
// index is in [from, to)
Expand All @@ -222,6 +237,8 @@ func (ss *DiskSideloadStorage) possiblyTruncateTo(
// Not worth trying to figure out which one, just try to delete.
err := ss.eng.Remove(ss.dir)
if err != nil && !oserror.IsNotExist(err) {
// TODO(pavelkalinnikov): this is possible because deletedAll can be left
// true despite existence of files with index < from which are skipped.
log.Infof(ctx, "unable to remove sideloaded dir %s: %v", ss.dir, err)
err = nil // handled
}
Expand Down Expand Up @@ -285,3 +302,56 @@ func (ss *DiskSideloadStorage) String() string {
fmt.Fprintf(&buf, "(%d files)\n", count)
return buf.String()
}

// mkdirAllAndSyncParents creates the given directory and all its missing
// parents if any. For every newly created directly, it syncs the corresponding
// parent directory. The directories are created using the provided permissions
// mask, with the same semantics as in os.MkdirAll.
//
// For example, if path is "/x/y/z", and "/x" previously existed, then this func
// creates "/x/y" and "/x/y/z", and syncs directories "/x" and "/x/y".
//
// TODO(pavelkalinnikov): this does not work well with paths containing . and ..
// elements inside the data-dir directory. We don't construct the path this way
// though, right now any non-canonical part of the path would be only in the
// <data-dir> path.
//
// TODO(pavelkalinnikov): have a type-safe canonical path type which can be
// iterated without thinking about . and .. placeholders.
func mkdirAllAndSyncParents(fs vfs.FS, path string, perm os.FileMode) error {
// Find the lowest existing directory in the hierarchy.
var exists string
for dir, parent := path, ""; ; dir = parent {
if _, err := fs.Stat(dir); err == nil {
exists = dir
break
} else if !oserror.IsNotExist(err) {
return errors.Wrapf(err, "could not get dir info: %s", dir)
}
parent = fs.PathDir(dir)
// NB: not checking against the separator, to be platform-agnostic.
if dir == "." || parent == dir { // reached the topmost dir or the root
return errors.Newf("topmost dir does not exist: %s", dir)
}
}

// Create the destination directory and any of its missing parents.
if err := fs.MkdirAll(path, perm); err != nil {
return errors.Wrapf(err, "could not create all directories: %s", path)
}

// Sync parent directories up to the lowest existing ancestor, included.
for dir, parent := path, ""; dir != exists; dir = parent {
parent = fs.PathDir(dir)
if handle, err := fs.OpenDir(parent); err != nil {
return errors.Wrapf(err, "could not open parent dir: %s", parent)
} else if err := handle.Sync(); err != nil {
_ = handle.Close()
return errors.Wrapf(err, "could not sync parent dir: %s", parent)
} else if err := handle.Close(); err != nil {
return errors.Wrapf(err, "could not close parent dir: %s", parent)
}
}

return nil
}

0 comments on commit 6fc66ab

Please sign in to comment.