Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 141 additions & 63 deletions store/lsm_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
snapshotBatchSize = 1000
dirPerms = 0755
metaLastCommitTS = "_meta_last_commit_ts"
spoolBufSize = 32 * 1024 // buffer size for streaming I/O during restore
)

var metaLastCommitTSBytes = []byte(metaLastCommitTS)
Expand Down Expand Up @@ -633,7 +634,7 @@ func (s *pebbleStore) Snapshot() (Snapshot, error) {
return newPebbleSnapshot(snap, lastCommitTS), nil
}

func (s *pebbleStore) restoreOneEntry(r io.Reader, batch *pebble.Batch) (bool, error) {
func restoreOneEntry(r io.Reader, batch *pebble.Batch) (bool, error) {
var kLen uint64
if err := binary.Read(r, binary.LittleEndian, &kLen); err != nil {
if errors.Is(err, io.EOF) {
Expand Down Expand Up @@ -662,11 +663,18 @@ func (s *pebbleStore) restoreOneEntry(r io.Reader, batch *pebble.Batch) (bool, e
}

func (s *pebbleStore) restoreBatchLoop(r io.Reader) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
func (*pebbleStore).restoreBatchLoop is unused (unused)

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot fix golangci-lint

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot func (*pebbleStore).restoreBatchLoop is unused (unused)

batch := s.db.NewBatch()
return restoreBatchLoopInto(r, s.db)
}

// restoreBatchLoopInto reads raw Pebble key-value entries from r and writes
// them into db using batched commits. It is used for both the direct and the
// temp-dir atomic native Pebble restore paths.
func restoreBatchLoopInto(r io.Reader, db *pebble.DB) error {
batch := db.NewBatch()
batchCnt := 0

for {
eof, err := s.restoreOneEntry(r, batch)
eof, err := restoreOneEntry(r, batch)
if err != nil {
_ = batch.Close()
return err
Expand All @@ -682,7 +690,7 @@ func (s *pebbleStore) restoreBatchLoop(r io.Reader) error {
return errors.WithStack(err)
}
_ = batch.Close()
batch = s.db.NewBatch()
batch = db.NewBatch()
batchCnt = 0
}
}
Expand All @@ -700,11 +708,9 @@ func (s *pebbleStore) Restore(r io.Reader) error {

switch {
case bytes.Equal(header, pebbleSnapshotMagic[:]):
// Native Pebble format: clear the existing DB first, then stream in.
if err := s.reopenFreshDB(); err != nil {
return err
}
return s.restorePebbleNative(br)
// Native Pebble format: restorePebbleNativeAtomic performs a temp-dir
// swap so the existing DB is preserved if the restore fails midway.
return s.restorePebbleNativeAtomic(br)
case bytes.Equal(header, mvccSnapshotMagic[:]):
// Streaming MVCC format: restoreFromStreamingMVCC performs an atomic
// temp-dir swap, so the existing DB is preserved until checksum
Expand All @@ -713,6 +719,11 @@ func (s *pebbleStore) Restore(r io.Reader) error {
default:
// Legacy gob format: restoreFromLegacyGob performs an atomic
// temp-dir swap similarly.
// NOTE: Older "pre-magic" Pebble snapshots do not exist — the
// pebbleSnapshotMagic header has been present since the Pebble store
// was introduced. An unrecognised stream is treated as legacy gob and
// will produce a decode error with a clear message if the stream is
// neither gob nor a valid Pebble snapshot.
return s.restoreFromLegacyGob(br)
}
}
Expand Down Expand Up @@ -758,11 +769,14 @@ func (s *pebbleStore) reopenFreshDB() error {
}

// restorePebbleNative restores from the current Pebble snapshot format
// (magic "EKVPBBL1" + lastCommitTS + raw key-value entries).
// (magic "EKVPBBL1" + lastCommitTS + raw key-value entries) into s.db directly.
//
// Note: The pebbleSnapshotMagic header has been present in every Pebble
// snapshot since the Pebble store was first introduced. There is no previous
// Pebble snapshot format without this magic header.
// Format history / compatibility:
// - The pebbleSnapshotMagic header was introduced with this "native" Pebble
// snapshot format when the Pebble store was first added. There are no
// pre-magic Pebble snapshots. A stream dispatched to this function MUST
// start with the magic; otherwise "invalid pebble snapshot magic header"
// is returned.
func (s *pebbleStore) restorePebbleNative(r io.Reader) error {
var magic [8]byte
if _, err := io.ReadFull(r, magic[:]); err != nil {
Expand All @@ -780,7 +794,65 @@ func (s *pebbleStore) restorePebbleNative(r io.Reader) error {
if err := s.saveLastCommitTS(ts); err != nil {
return err
}
return s.restoreBatchLoop(r)
return restoreBatchLoopInto(r, s.db)
}

// restorePebbleNativeAtomic atomically restores a native Pebble snapshot.
// It writes all entries into a temporary sibling directory and only swaps the
// temp directory into s.dir after a full successful read, preserving the
// existing DB on any failure.
func (s *pebbleStore) restorePebbleNativeAtomic(r io.Reader) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
calculated cyclomatic complexity for function restorePebbleNativeAtomic is 11, max is 10 (cyclop)

var magic [8]byte
if _, err := io.ReadFull(r, magic[:]); err != nil {
return errors.WithStack(err)
}
if !bytes.Equal(magic[:], pebbleSnapshotMagic[:]) {
return errors.New("invalid pebble snapshot magic header")
}

var ts uint64
if err := binary.Read(r, binary.LittleEndian, &ts); err != nil {
return errors.WithStack(err)
}

tmpDir, err := os.MkdirTemp(filepath.Dir(filepath.Clean(s.dir)), filepath.Base(filepath.Clean(s.dir))+"-pebble-native-*")
if err != nil {
return errors.WithStack(err)
}
// MkdirTemp creates an empty directory; remove it so pebble.Open can
// initialise a fresh database at the same path.
if err := os.Remove(tmpDir); err != nil {
return errors.WithStack(err)
}
tmpDB, err := pebble.Open(tmpDir, defaultPebbleOptions())
if err != nil {
return errors.WithStack(err)
}

if err := restoreBatchLoopInto(r, tmpDB); err != nil {
_ = tmpDB.Close()
_ = os.RemoveAll(tmpDir)
return err
}

var tsBuf [timestampSize]byte
binary.LittleEndian.PutUint64(tsBuf[:], ts)
if err := tmpDB.Set(metaLastCommitTSBytes, tsBuf[:], pebble.Sync); err != nil {
_ = tmpDB.Close()
_ = os.RemoveAll(tmpDir)
return errors.WithStack(err)
}

if err := tmpDB.Close(); err != nil {
_ = os.RemoveAll(tmpDir)
return errors.WithStack(err)
}

if err := s.swapInTempDB(tmpDir); err != nil {
return err
}
s.lastCommitTS = ts
return nil
}

// restoreFromStreamingMVCC restores from the in-memory MVCCStore streaming
Expand Down Expand Up @@ -895,7 +967,10 @@ func writeMVCCEntriesToDB(body io.Reader, db *pebble.DB) error {
// to s.dir, then reopens the DB. The caller is responsible for closing tmpDB
// before calling this.
func (s *pebbleStore) swapInTempDB(tmpDir string) error {
_ = s.db.Close()
if err := s.db.Close(); err != nil {
_ = os.RemoveAll(tmpDir)
return errors.WithStack(err)
}
if err := os.RemoveAll(s.dir); err != nil {
_ = os.RemoveAll(tmpDir)
return errors.WithStack(err)
Expand Down Expand Up @@ -947,14 +1022,15 @@ func (s *pebbleStore) restoreLegacyGobToTempDB(entries []mvccSnapshotEntry, last
// restoreFromLegacyGob restores from the legacy gob-encoded MVCCStore
// snapshot format (gob payload + CRC32 trailer).
//
// The snapshot is spooled to a temporary file co-located with s.dir to avoid
// loading the entire payload into memory and to keep I/O on the same
// filesystem. Entries are written into a temporary Pebble directory and only
// swapped into place after decoding succeeds, preserving the existing store
// on failure.
// The CRC32 is computed in a single pass while spooling the gob payload to a
// temporary file co-located with s.dir. The CRC32 trailer is NOT written to
// the spool file — only the pure gob payload is stored, so the decoder can
// read the spool file directly without needing a LimitReader. Entries are
// written into a temporary Pebble directory and only swapped into place after
// decoding succeeds, preserving the existing store on failure.
func (s *pebbleStore) restoreFromLegacyGob(r io.Reader) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
calculated cyclomatic complexity for function restoreFromLegacyGob is 13, max is 10 (cyclop)

// Spool to a temp file inside s.dir to keep restore I/O on the same
// filesystem as the store and avoid cross-device surprises.
// Spool gob payload to a temp file inside s.dir to keep restore I/O on
// the same filesystem as the store and avoid cross-device surprises.
tmpFile, err := os.CreateTemp(s.dir, "ekv-legacy-gob-*.tmp")
if err != nil {
return errors.WithStack(err)
Expand All @@ -970,24 +1046,59 @@ func (s *pebbleStore) restoreFromLegacyGob(r io.Reader) error {
_ = os.Remove(tmpPath)
}

if _, err := io.Copy(tmpFile, r); err != nil {
closeTmp()
return errors.WithStack(err)
// Stream r into tmpFile while computing CRC32 over the gob payload.
// The last checksumSize bytes are the CRC32 trailer (LittleEndian uint32);
// we keep them in a rolling tail buffer so that only the payload is written
// to tmpFile and hashed. This avoids reading the file twice.
hasher := crc32.NewIEEE()
buf := make([]byte, spoolBufSize)
var tail []byte
for {
n, readErr := r.Read(buf)
if n > 0 {
// Extend tail with the new chunk. Since tail is at most
// checksumSize bytes (4), this incurs minimal copy overhead.
tail = append(tail, buf[:n]...)
if len(tail) > checksumSize {
toProcessLen := len(tail) - checksumSize
toProcess := tail[:toProcessLen]
if _, err := tmpFile.Write(toProcess); err != nil {
closeTmp()
return errors.WithStack(err)
}
_, _ = hasher.Write(toProcess)
// Retain only the potential CRC32 trailer bytes; copy to a
// fresh backing array to avoid aliasing the previous slice.
tail = append([]byte(nil), tail[toProcessLen:]...)
}
}
if readErr == io.EOF {
break
}
if readErr != nil {
closeTmp()
return errors.WithStack(readErr)
}
}

payloadSize, err := verifyCRC32Trailer(tmpFile)
if err != nil {
// After EOF, tail must be exactly the 4-byte CRC32 trailer.
if len(tail) != checksumSize {
closeTmp()
return err
return errors.WithStack(ErrInvalidChecksum)
}
storedChecksum := binary.LittleEndian.Uint32(tail)
if hasher.Sum32() != storedChecksum {
closeTmp()
return errors.WithStack(ErrInvalidChecksum)
}

// Decode the gob payload.
// Decode the gob payload from the spool file (CRC trailer not present).
if _, err := tmpFile.Seek(0, io.SeekStart); err != nil {
closeTmp()
return errors.WithStack(err)
}
var snapshot mvccSnapshot
if err := gob.NewDecoder(io.LimitReader(tmpFile, payloadSize)).Decode(&snapshot); err != nil {
if err := gob.NewDecoder(tmpFile).Decode(&snapshot); err != nil {
closeTmp()
return errors.WithStack(err)
}
Expand All @@ -1002,39 +1113,6 @@ func (s *pebbleStore) restoreFromLegacyGob(r io.Reader) error {
return nil
}

// verifyCRC32Trailer verifies the CRC32 checksum appended at the end of f.
// On success it returns the payload size (file size minus the checksum trailer).
//
// The trailer is a LittleEndian uint32 occupying the final checksumSize bytes
// of the file; the checksum covers every byte preceding it. The function
// seeks within f, so the file offset on return is not the start — callers
// that re-read f must Seek(0, io.SeekStart) themselves.
func verifyCRC32Trailer(f *os.File) (int64, error) {
	// Seek to the end to determine the total file size.
	size, err := f.Seek(0, io.SeekEnd)
	if err != nil {
		return 0, errors.WithStack(err)
	}
	// A file shorter than the trailer cannot contain a valid checksum.
	if size < checksumSize {
		return 0, errors.WithStack(ErrInvalidChecksum)
	}
	payloadSize := size - checksumSize

	// Read the stored checksum from the final checksumSize bytes.
	if _, err := f.Seek(-checksumSize, io.SeekEnd); err != nil {
		return 0, errors.WithStack(err)
	}
	var storedChecksum uint32
	if err := binary.Read(f, binary.LittleEndian, &storedChecksum); err != nil {
		return 0, errors.WithStack(err)
	}

	// Re-read the payload from the start and hash it in a second pass.
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return 0, errors.WithStack(err)
	}
	hasher := crc32.NewIEEE()
	if _, err := io.Copy(hasher, io.LimitReader(f, payloadSize)); err != nil {
		return 0, errors.WithStack(err)
	}
	if hasher.Sum32() != storedChecksum {
		return 0, errors.WithStack(ErrInvalidChecksum)
	}
	return payloadSize, nil
}

// writeGobEntriesToDB writes the decoded gob snapshot entries into db using
// batched commits.
func writeGobEntriesToDB(entries []mvccSnapshotEntry, db *pebble.DB) error {
Expand Down
40 changes: 40 additions & 0 deletions store/lsm_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,3 +411,43 @@ func TestPebbleStore_Restore_PebbleMagicMismatch(t *testing.T) {
require.Error(t, err)
assert.Contains(t, err.Error(), "invalid pebble snapshot magic header")
}

// TestPebbleStore_Restore_NativePebbleAtomic verifies that a native Pebble
// snapshot restore which fails partway through (here: a truncated stream)
// leaves the pre-existing database contents untouched.
func TestPebbleStore_Restore_NativePebbleAtomic(t *testing.T) {
	ctx := context.Background()

	dir, err := os.MkdirTemp("", "pebble-atomic-restore-*")
	require.NoError(t, err)
	defer os.RemoveAll(dir)

	store, err := NewPebbleStore(dir)
	require.NoError(t, err)
	defer store.Close()

	// Seed the store with data that must survive a failed restore.
	require.NoError(t, store.PutAt(ctx, []byte("existing"), []byte("value"), 10, 0))
	require.Equal(t, uint64(10), store.LastCommitTS())

	// Produce a well-formed snapshot, then chop it short to simulate a
	// corrupted or interrupted stream.
	snap, err := store.Snapshot()
	require.NoError(t, err)
	defer snap.Close()

	var full bytes.Buffer
	_, err = snap.WriteTo(&full)
	require.NoError(t, err)

	// Keep magic (8) + lastCommitTS (8) plus only 3 bytes of the first
	// entry's 8-byte key-length field.
	partial := full.Bytes()[:16+3]

	// A restore from the truncated stream must report an error...
	require.Error(t, store.Restore(bytes.NewReader(partial)))

	// ...while the original data remains readable afterwards.
	got, getErr := store.GetAt(ctx, []byte("existing"), 10)
	require.NoError(t, getErr)
	assert.Equal(t, []byte("value"), got)
}
Loading