diff --git a/store/lsm_store.go b/store/lsm_store.go index 0dd44e3d4..a8e84ca88 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -25,6 +25,7 @@ const ( snapshotBatchSize = 1000 dirPerms = 0755 metaLastCommitTS = "_meta_last_commit_ts" + spoolBufSize = 32 * 1024 // buffer size for streaming I/O during restore ) var metaLastCommitTSBytes = []byte(metaLastCommitTS) @@ -633,7 +634,7 @@ func (s *pebbleStore) Snapshot() (Snapshot, error) { return newPebbleSnapshot(snap, lastCommitTS), nil } -func (s *pebbleStore) restoreOneEntry(r io.Reader, batch *pebble.Batch) (bool, error) { +func restoreOneEntry(r io.Reader, batch *pebble.Batch) (bool, error) { var kLen uint64 if err := binary.Read(r, binary.LittleEndian, &kLen); err != nil { if errors.Is(err, io.EOF) { @@ -662,11 +663,18 @@ func (s *pebbleStore) restoreOneEntry(r io.Reader, batch *pebble.Batch) (bool, e } func (s *pebbleStore) restoreBatchLoop(r io.Reader) error { - batch := s.db.NewBatch() + return restoreBatchLoopInto(r, s.db) +} + +// restoreBatchLoopInto reads raw Pebble key-value entries from r and writes +// them into db using batched commits. It is used for both the direct and the +// temp-dir atomic native Pebble restore paths. +func restoreBatchLoopInto(r io.Reader, db *pebble.DB) error { + batch := db.NewBatch() batchCnt := 0 for { - eof, err := s.restoreOneEntry(r, batch) + eof, err := restoreOneEntry(r, batch) if err != nil { _ = batch.Close() return err @@ -682,7 +690,7 @@ func (s *pebbleStore) restoreBatchLoop(r io.Reader) error { return errors.WithStack(err) } _ = batch.Close() - batch = s.db.NewBatch() + batch = db.NewBatch() batchCnt = 0 } } @@ -700,11 +708,9 @@ func (s *pebbleStore) Restore(r io.Reader) error { switch { case bytes.Equal(header, pebbleSnapshotMagic[:]): - // Native Pebble format: clear the existing DB first, then stream in. - if err := s.reopenFreshDB(); err != nil { - return err - } - return s.restorePebbleNative(br) + // Native Pebble format: restorePebbleNativeAtomic performs a temp-dir + // swap so the existing DB is preserved if the restore fails midway. + return s.restorePebbleNativeAtomic(br) case bytes.Equal(header, mvccSnapshotMagic[:]): // Streaming MVCC format: restoreFromStreamingMVCC performs an atomic // temp-dir swap, so the existing DB is preserved until checksum @@ -713,6 +719,11 @@ func (s *pebbleStore) Restore(r io.Reader) error { default: // Legacy gob format: restoreFromLegacyGob performs an atomic // temp-dir swap similarly. + // NOTE: Older "pre-magic" Pebble snapshots do not exist — the + // pebbleSnapshotMagic header has been present since the Pebble store + // was introduced. An unrecognised stream is treated as legacy gob and + // will produce a decode error with a clear message if the stream is + // neither gob nor a valid Pebble snapshot. return s.restoreFromLegacyGob(br) } } @@ -758,11 +769,14 @@ func (s *pebbleStore) reopenFreshDB() error { } // restorePebbleNative restores from the current Pebble snapshot format -// (magic "EKVPBBL1" + lastCommitTS + raw key-value entries). +// (magic "EKVPBBL1" + lastCommitTS + raw key-value entries) into s.db directly. // -// Note: The pebbleSnapshotMagic header has been present in every Pebble -// snapshot since the Pebble store was first introduced. There is no previous -// Pebble snapshot format without this magic header. +// Format history / compatibility: +// - The pebbleSnapshotMagic header was introduced with this "native" Pebble +// snapshot format when the Pebble store was first added. There are no +// pre-magic Pebble snapshots. A stream dispatched to this function MUST +// start with the magic; otherwise "invalid pebble snapshot magic header" +// is returned. func (s *pebbleStore) restorePebbleNative(r io.Reader) error { var magic [8]byte if _, err := io.ReadFull(r, magic[:]); err != nil { @@ -780,7 +794,65 @@ func (s *pebbleStore) restorePebbleNative(r io.Reader) error { if err := s.saveLastCommitTS(ts); err != nil { return err } - return s.restoreBatchLoop(r) + return restoreBatchLoopInto(r, s.db) +} + +// restorePebbleNativeAtomic atomically restores a native Pebble snapshot. +// It writes all entries into a temporary sibling directory and only swaps the +// temp directory into s.dir after a full successful read, preserving the +// existing DB on any failure. +func (s *pebbleStore) restorePebbleNativeAtomic(r io.Reader) error { + var magic [8]byte + if _, err := io.ReadFull(r, magic[:]); err != nil { + return errors.WithStack(err) + } + if !bytes.Equal(magic[:], pebbleSnapshotMagic[:]) { + return errors.New("invalid pebble snapshot magic header") + } + + var ts uint64 + if err := binary.Read(r, binary.LittleEndian, &ts); err != nil { + return errors.WithStack(err) + } + + tmpDir, err := os.MkdirTemp(filepath.Dir(filepath.Clean(s.dir)), filepath.Base(filepath.Clean(s.dir))+"-pebble-native-*") + if err != nil { + return errors.WithStack(err) + } + // MkdirTemp creates an empty directory; remove it so pebble.Open can + // initialise a fresh database at the same path. + if err := os.Remove(tmpDir); err != nil { + return errors.WithStack(err) + } + tmpDB, err := pebble.Open(tmpDir, defaultPebbleOptions()) + if err != nil { + return errors.WithStack(err) + } + + if err := restoreBatchLoopInto(r, tmpDB); err != nil { + _ = tmpDB.Close() + _ = os.RemoveAll(tmpDir) + return err + } + + var tsBuf [timestampSize]byte + binary.LittleEndian.PutUint64(tsBuf[:], ts) + if err := tmpDB.Set(metaLastCommitTSBytes, tsBuf[:], pebble.Sync); err != nil { + _ = tmpDB.Close() + _ = os.RemoveAll(tmpDir) + return errors.WithStack(err) + } + + if err := tmpDB.Close(); err != nil { + _ = os.RemoveAll(tmpDir) + return errors.WithStack(err) + } + + if err := s.swapInTempDB(tmpDir); err != nil { + return err + } + s.lastCommitTS = ts + return nil } // restoreFromStreamingMVCC restores from the in-memory MVCCStore streaming @@ -895,7 +967,10 @@ func writeMVCCEntriesToDB(body io.Reader, db *pebble.DB) error { // to s.dir, then reopens the DB. The caller is responsible for closing tmpDB // before calling this. func (s *pebbleStore) swapInTempDB(tmpDir string) error { - _ = s.db.Close() + if err := s.db.Close(); err != nil { + _ = os.RemoveAll(tmpDir) + return errors.WithStack(err) + } if err := os.RemoveAll(s.dir); err != nil { _ = os.RemoveAll(tmpDir) return errors.WithStack(err) @@ -947,14 +1022,15 @@ func (s *pebbleStore) restoreLegacyGobToTempDB(entries []mvccSnapshotEntry, last // restoreFromLegacyGob restores from the legacy gob-encoded MVCCStore // snapshot format (gob payload + CRC32 trailer). // -// The snapshot is spooled to a temporary file co-located with s.dir to avoid -// loading the entire payload into memory and to keep I/O on the same -// filesystem. Entries are written into a temporary Pebble directory and only -// swapped into place after decoding succeeds, preserving the existing store -// on failure. +// The CRC32 is computed in a single pass while spooling the gob payload to a +// temporary file co-located with s.dir. The CRC32 trailer is NOT written to +// the spool file — only the pure gob payload is stored, so the decoder can +// read the spool file directly without needing a LimitReader. Entries are +// written into a temporary Pebble directory and only swapped into place after +// decoding succeeds, preserving the existing store on failure. func (s *pebbleStore) restoreFromLegacyGob(r io.Reader) error { - // Spool to a temp file inside s.dir to keep restore I/O on the same - // filesystem as the store and avoid cross-device surprises. + // Spool gob payload to a temp file inside s.dir to keep restore I/O on + // the same filesystem as the store and avoid cross-device surprises. tmpFile, err := os.CreateTemp(s.dir, "ekv-legacy-gob-*.tmp") if err != nil { return errors.WithStack(err) @@ -970,24 +1046,59 @@ func (s *pebbleStore) restoreFromLegacyGob(r io.Reader) error { _ = os.Remove(tmpPath) } - if _, err := io.Copy(tmpFile, r); err != nil { - closeTmp() - return errors.WithStack(err) + // Stream r into tmpFile while computing CRC32 over the gob payload. + // The last checksumSize bytes are the CRC32 trailer (LittleEndian uint32); + // we keep them in a rolling tail buffer so that only the payload is written + // to tmpFile and hashed. This avoids reading the file twice. + hasher := crc32.NewIEEE() + buf := make([]byte, spoolBufSize) + var tail []byte + for { + n, readErr := r.Read(buf) + if n > 0 { + // Extend tail with the new chunk. Since tail is at most + // checksumSize bytes (4), this incurs minimal copy overhead. + tail = append(tail, buf[:n]...) + if len(tail) > checksumSize { + toProcessLen := len(tail) - checksumSize + toProcess := tail[:toProcessLen] + if _, err := tmpFile.Write(toProcess); err != nil { + closeTmp() + return errors.WithStack(err) + } + _, _ = hasher.Write(toProcess) + // Retain only the potential CRC32 trailer bytes; copy to a + // fresh backing array to avoid aliasing the previous slice. + tail = append([]byte(nil), tail[toProcessLen:]...) + } + } + if readErr == io.EOF { + break + } + if readErr != nil { + closeTmp() + return errors.WithStack(readErr) + } } - payloadSize, err := verifyCRC32Trailer(tmpFile) - if err != nil { + // After EOF, tail must be exactly the 4-byte CRC32 trailer. + if len(tail) != checksumSize { closeTmp() - return err + return errors.WithStack(ErrInvalidChecksum) + } + storedChecksum := binary.LittleEndian.Uint32(tail) + if hasher.Sum32() != storedChecksum { + closeTmp() + return errors.WithStack(ErrInvalidChecksum) } - // Decode the gob payload. + // Decode the gob payload from the spool file (CRC trailer not present). if _, err := tmpFile.Seek(0, io.SeekStart); err != nil { closeTmp() return errors.WithStack(err) } var snapshot mvccSnapshot - if err := gob.NewDecoder(io.LimitReader(tmpFile, payloadSize)).Decode(&snapshot); err != nil { + if err := gob.NewDecoder(tmpFile).Decode(&snapshot); err != nil { closeTmp() return errors.WithStack(err) } @@ -1002,39 +1113,6 @@ func (s *pebbleStore) restoreFromLegacyGob(r io.Reader) error { return nil } -// verifyCRC32Trailer verifies the CRC32 checksum appended at the end of f. -// On success it returns the payload size (file size minus the checksum trailer). -func verifyCRC32Trailer(f *os.File) (int64, error) { - size, err := f.Seek(0, io.SeekEnd) - if err != nil { - return 0, errors.WithStack(err) - } - if size < checksumSize { - return 0, errors.WithStack(ErrInvalidChecksum) - } - payloadSize := size - checksumSize - - if _, err := f.Seek(-checksumSize, io.SeekEnd); err != nil { - return 0, errors.WithStack(err) - } - var storedChecksum uint32 - if err := binary.Read(f, binary.LittleEndian, &storedChecksum); err != nil { - return 0, errors.WithStack(err) - } - - if _, err := f.Seek(0, io.SeekStart); err != nil { - return 0, errors.WithStack(err) - } - hasher := crc32.NewIEEE() - if _, err := io.Copy(hasher, io.LimitReader(f, payloadSize)); err != nil { - return 0, errors.WithStack(err) - } - if hasher.Sum32() != storedChecksum { - return 0, errors.WithStack(ErrInvalidChecksum) - } - return payloadSize, nil -} - // writeGobEntriesToDB writes the decoded gob snapshot entries into db using // batched commits. func writeGobEntriesToDB(entries []mvccSnapshotEntry, db *pebble.DB) error { diff --git a/store/lsm_store_test.go b/store/lsm_store_test.go index c879485c2..511f38245 100644 --- a/store/lsm_store_test.go +++ b/store/lsm_store_test.go @@ -411,3 +411,43 @@ func TestPebbleStore_Restore_PebbleMagicMismatch(t *testing.T) { require.Error(t, err) assert.Contains(t, err.Error(), "invalid pebble snapshot magic header") } + +// TestPebbleStore_Restore_NativePebbleAtomic verifies that when a native +// Pebble snapshot restore fails midway (truncated data), the existing DB +// contents are preserved and not wiped. +func TestPebbleStore_Restore_NativePebbleAtomic(t *testing.T) { + ctx := context.Background() + + dir, err := os.MkdirTemp("", "pebble-atomic-restore-*") + require.NoError(t, err) + defer os.RemoveAll(dir) + + s, err := NewPebbleStore(dir) + require.NoError(t, err) + defer s.Close() + + // Pre-populate with known data. + require.NoError(t, s.PutAt(ctx, []byte("existing"), []byte("value"), 10, 0)) + require.Equal(t, uint64(10), s.LastCommitTS()) + + // Build a valid snapshot then truncate it to simulate corruption. + snap, err := s.Snapshot() + require.NoError(t, err) + defer snap.Close() + var raw bytes.Buffer + _, err = snap.WriteTo(&raw) + require.NoError(t, err) + + // Truncate: keep magic (8) + lastCommitTS (8) but include only a partial + // entry framing (3 bytes of what would be an 8-byte key-length field). + truncated := raw.Bytes()[:16+3] // partial key-length field (8 bytes expected) + + // Restore from truncated snapshot should fail. + err = s.Restore(bytes.NewReader(truncated)) + require.Error(t, err) + + // The original data should still be accessible. + val, getErr := s.GetAt(ctx, []byte("existing"), 10) + require.NoError(t, getErr) + assert.Equal(t, []byte("value"), val) +}