Skip to content

Commit

Permalink
storage: fix PebbleFileRegistry bug that drops entry on rollover
Browse files Browse the repository at this point in the history
The writeToRegistryFile method first writes the new batch, containing file
mappings, to the registry file, and then if the registry file is too big,
creates a new registry file. The new registry file is populated with the
contents of the map, which doesn't yet contain the edits in the batch,
resulting in a loss of these edits when the file registry is reopened. This
PR changes the logic to first rollover if the registry file is too big,
and then writes the batch to the new file.

This bug has existed since the record writer based registry was implemented
239377a.
When it leads to a loss of a file mapping in the registry, it will be
noticed by Pebble as a corruption (so not a silent failure) since the file
corresponding to the mapping will be assumed to be unencrypted, but can't
be successfully read as an unencrypted file. Since we have not seen this
occur in production settings, we suspect that an observable mapping loss
is rare because compactions typically rewrite the files in those lost
mappings before the file registry is reopened.

Epic: none

Fixes: #106617

Release note: None
  • Loading branch information
sumeerbhola committed Jul 20, 2023
1 parent 11834b9 commit f5ab7a3
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 14 deletions.
31 changes: 17 additions & 14 deletions pkg/storage/pebble_file_registry.go
Expand Up @@ -398,10 +398,23 @@ func (r *PebbleFileRegistry) applyBatch(batch *enginepb.RegistryUpdateBatch) {
}

func (r *PebbleFileRegistry) writeToRegistryFile(batch *enginepb.RegistryUpdateBatch) error {
// Create a new file registry file if one doesn't exist yet.
if r.mu.registryWriter == nil {
// Create a new file registry file if one doesn't exist yet, or we need to
// rollover.
nilWriter := r.mu.registryWriter == nil
if nilWriter || r.mu.registryWriter.Size() > maxRegistrySize {
// If !nilWriter, exceeded the size threshold: create a new file registry
// file to hopefully shrink the size of the file.
//
// TODO(storage-team): fix this rollover logic to not rollover with each
// change if the total number of entries is large enough to exceed
// maxRegistrySize. Do something similar to what we do with the Pebble
// MANIFEST rollover.
if err := r.createNewRegistryFile(); err != nil {
return errors.Wrap(err, "creating new registry file")
if nilWriter {
return errors.Wrap(err, "creating new registry file")
} else {
return errors.Wrap(err, "rotating registry file")
}
}
}
w, err := r.mu.registryWriter.Next()
Expand All @@ -418,17 +431,7 @@ func (r *PebbleFileRegistry) writeToRegistryFile(batch *enginepb.RegistryUpdateB
if err := r.mu.registryWriter.Flush(); err != nil {
return err
}
if err := r.mu.registryFile.Sync(); err != nil {
return err
}
// Create a new file registry file to hopefully shrink the size of the file
// if we have exceeded the max registry size.
if r.mu.registryWriter.Size() > maxRegistrySize {
if err := r.createNewRegistryFile(); err != nil {
return errors.Wrap(err, "rotating registry file")
}
}
return nil
return r.mu.registryFile.Sync()
}

func makeRegistryFilename(iter uint64) string {
Expand Down
76 changes: 76 additions & 0 deletions pkg/storage/pebble_file_registry_test.go
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"fmt"
"io"
"math/rand"
"os"
"runtime/debug"
"sort"
Expand Down Expand Up @@ -504,3 +505,78 @@ func (f loggingFile) Sync() error {
fmt.Fprintf(f.w, "sync(%q)\n", f.name)
return f.File.Sync()
}

func TestFileRegistryRollover(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const dir = "/mydb"
mem := vfs.NewMem()
require.NoError(t, mem.MkdirAll(dir, 0755))
registry := &PebbleFileRegistry{FS: mem, DBDir: dir}
require.NoError(t, registry.Load())

// All the registry files created so far. Some may have been subsequently
// deleted.
var registryFiles []string
accumRegistryFiles := func() {
registry.mu.Lock()
defer registry.mu.Unlock()
n := len(registryFiles)
if registry.mu.registryFilename != "" &&
(n == 0 || registry.mu.registryFilename != registryFiles[n-1]) {
registryFiles = append(registryFiles, registry.mu.registryFilename)
n++
}
}
numAddedEntries := 0
// Large settings slice, so that the test rolls over registry files quickly.
encryptionSettings := make([]byte, 1<<20)
rand.Read(encryptionSettings)
// Check that the entries we have added are in the file registry and there
// isn't an additional entry.
checkAddedEntries := func() {
for i := 0; i < numAddedEntries; i++ {
filename := fmt.Sprintf("%04d", i)
entry := registry.GetFileEntry(filename)
require.NotNil(t, entry)
require.Equal(t, filename, string(entry.EncryptionSettings[len(entry.EncryptionSettings)-4:]))
}
// Does not have an additional entry.
filename := fmt.Sprintf("%04d", numAddedEntries)
entry := registry.GetFileEntry(filename)
require.Nil(t, entry)
}
addEntry := func() {
filename := fmt.Sprintf("%04d", numAddedEntries)
// Create a file for this added entry so that it doesn't get cleaned up
// when we reopen the file registry.
f, err := mem.Create(mem.PathJoin(dir, filename))
require.NoError(t, err)
require.NoError(t, f.Sync())
require.NoError(t, f.Close())
fileEntry := &enginepb.FileEntry{EnvType: enginepb.EnvType_Data}
fileEntry.EncryptionSettings = append(encryptionSettings, []byte(filename)...)
require.NoError(t, registry.SetFileEntry(filename, fileEntry))
numAddedEntries++
}
for {
created := len(registryFiles)
accumRegistryFiles()
if created != len(registryFiles) {
// Rolled over.
checkAddedEntries()
}
// Rollover a few times.
if len(registryFiles) == 4 {
break
}
addEntry()
}
require.NoError(t, registry.Close())
registry = &PebbleFileRegistry{FS: mem, DBDir: dir}
require.NoError(t, registry.Load())
// Check added entries again.
checkAddedEntries()
require.NoError(t, registry.Close())
}

0 comments on commit f5ab7a3

Please sign in to comment.