Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/rawdb: freezer index repair #29792

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions core/rawdb/freezer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,10 @@ func (batch *freezerTableBatch) maybeCommit() error {
return nil
}

// commit writes the batched items to the backing freezerTable.
// commit writes the batched items to the backing freezerTable. Note index
// file isn't fsync'd after the file write, the recent write can be lost
// after the power failure.
func (batch *freezerTableBatch) commit() error {
// Write data. The head file is fsync'd after write to ensure the
// data is truly transferred to disk.
_, err := batch.t.head.Write(batch.dataBuffer)
if err != nil {
return err
Expand All @@ -194,15 +194,10 @@ func (batch *freezerTableBatch) commit() error {
dataSize := int64(len(batch.dataBuffer))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we remove the sync call above as well, if we extend/modify the repair?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, we have no mechanism to detect the whether the data file (batch.t.head) contains garbage or not after the power failure. It's the reason I leave the sync operation for data file.

However, I think we can do the background file sync for data file, with a specific time interval. For example, mongo uses this strategy by flushing the file every 100 milliseconds

MongoDB also uses a write-ahead logging via a separate journal. Each write operation is appended to this journal, which is then flushed to disk every 100 milliseconds, taking aspects of the group commit and async commit approaches. If you cannot tolerate losing up to 100 milliseconds of data, you can optionally force an early flush of the journal when issuing a write, increasing the commit latency.

batch.dataBuffer = batch.dataBuffer[:0]

// Write indices. The index file is fsync'd after write to ensure the
// data indexes are truly transferred to disk.
_, err = batch.t.index.Write(batch.indexBuffer)
if err != nil {
return err
}
if err := batch.t.index.Sync(); err != nil {
return err
}
indexSize := int64(len(batch.indexBuffer))
batch.indexBuffer = batch.indexBuffer[:0]

Expand Down
130 changes: 129 additions & 1 deletion core/rawdb/freezer_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package rawdb

import (
"bufio"
"bytes"
"encoding/binary"
"errors"
Expand All @@ -26,6 +27,7 @@ import (
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -219,7 +221,13 @@ func (t *freezerTable) repair() error {
return err
} // New file can't trigger this path
}
// Retrieve the file sizes and prepare for truncation
// Validate the index file as it might contain some garbage data after the
// power failures.
if err := t.repairIndex(); err != nil {
return err
}
// Retrieve the file sizes and prepare for truncation. Note the file size
// might be changed after index validation.
if stat, err = t.index.Stat(); err != nil {
return err
}
Expand Down Expand Up @@ -364,6 +372,126 @@ func (t *freezerTable) repair() error {
return nil
}

// repairIndex validates the integrity of the index file. According to the design,
// the initial entry in the file denotes the earliest data file along with the
// count of deleted items. Following this, all subsequent entries in the file must
// be in order. This function identifies any corrupted entries and truncates items
// occurring after the corruption point.
//
// corruption can occur because of the power failure. In the Linux kernel, the
// file metadata update and data update are not necessarily performed at the
// same time. Typically, the metadata will be flushed/journalled ahead of the file
// data. Therefore, we make the pessimistic assumption that the file is first
// extended with invalid "garbage" data (normally zero bytes) and that afterwards
// the correct data replaces the garbage. As all the items in index file are
// supposed to be in-order, the leftover garbage must be truncated before the
// index data is utilized.
//
// It's important to note an exception that's unfortunately undetectable: when
// all index entries in the file are zero. Distinguishing whether they represent
// leftover garbage or if all items in the table have zero size is impossible.
// In such instances, the file will remain unchanged to prevent potential data
// loss or misinterpretation.
func (t *freezerTable) repairIndex() error {
// Retrieve the file sizes and prepare for validation
stat, err := t.index.Stat()
if err != nil {
return err
}
size := stat.Size()

// Move the read cursor to the beginning of the file
_, err = t.index.Seek(0, io.SeekStart)
if err != nil {
return err
}
fr := bufio.NewReader(t.index)

var (
start = time.Now()
buff = make([]byte, indexEntrySize)
prev indexEntry
head indexEntry

read = func() (indexEntry, error) {
n, err := io.ReadFull(fr, buff)
if err != nil {
return indexEntry{}, err
}
if n != indexEntrySize {
return indexEntry{}, fmt.Errorf("failed to read from index, n: %d", n)
}
var entry indexEntry
entry.unmarshalBinary(buff)
return entry, nil
}
truncate = func(offset int64) error {
if t.readonly {
return fmt.Errorf("index file is corrupted at %d, size: %d", offset, size)
}
if err := truncateFreezerFile(t.index, offset); err != nil {
return err
}
log.Warn("Truncated index file", "offset", offset, "truncated", size-offset)
return nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These functions could just be methods on freezerTable

)
for offset := int64(0); offset < size; offset += indexEntrySize {
entry, err := read()
if err != nil {
return err
}
if offset == 0 {
head = entry
continue
}
// Ensure that the first non-head index refers to the earliest file,
// or the next file if the earliest file is not sufficient to
// place the first item.
if offset == indexEntrySize {
if entry.filenum != head.filenum && entry.filenum != head.filenum+1 {
log.Error("Corrupted index item detected", "earliest", head.filenum, "filenumber", entry.filenum)
return truncate(offset)
}
prev = entry
continue
}
// ensure two consecutive index items are in order
if err := t.checkIndexItems(prev, entry); err != nil {
log.Error("Corrupted index item detected", "err", err)
return truncate(offset)
Copy link
Contributor

@fjl fjl May 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should log the error here. Maybe pass the error into truncate to log it as part of the warning it prints.

}
prev = entry
}
// Move the read cursor to the end of the file. While theoretically, the
// cursor should reach the end by reading all the items in the file, perform
// the seek operation anyway as a precaution.
_, err = t.index.Seek(0, io.SeekEnd)
if err != nil {
return err
}
log.Debug("Verified index file", "items", size/indexEntrySize, "elapsed", common.PrettyDuration(time.Since(start)))
return nil
}

// checkIndexItems checks the validity of two consecutive index items. The index
// item is regarded as invalid if:
// - file number of two index items are not same and not monotonically increasing
// - data offset of two index items with same file number are out of order
// - zero data offset with an increasing file number
func (t *freezerTable) checkIndexItems(a, b indexEntry) error {
if b.filenum != a.filenum && b.filenum != a.filenum+1 {
return fmt.Errorf("index items with inconsistent file number, prev: %d, next: %d", a.filenum, b.filenum)
}
if b.filenum == a.filenum && b.offset < a.offset {
return fmt.Errorf("index items with unordered offset, prev: %d, next: %d", a.offset, b.offset)
}
if b.filenum == a.filenum+1 && b.offset == 0 {
return fmt.Errorf("index items with zero offset, file number: %d", b.filenum)
}
return nil
}

// preopen opens all files that the freezer will need. This method should be called from an init-context,
// since it assumes that it doesn't have to bother with locking
// The rationale for doing preopen is to not have to do it from within Retrieve, thus not needing to ever
Expand Down
66 changes: 66 additions & 0 deletions core/rawdb/freezer_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1367,3 +1367,69 @@ func TestRandom(t *testing.T) {
t.Fatal(err)
}
}

func TestIndexValidation(t *testing.T) {
const (
items = 30
dataSize = 10
)
garbage := indexEntry{
filenum: 100,
offset: 200,
}
var cases = []struct {
offset int64
data []byte
expItems int
}{
// extend index file with zero bytes at the end
{
offset: (items + 1) * indexEntrySize,
data: make([]byte, indexEntrySize),
expItems: 30,
},
// write garbage in the first non-head item
{
offset: indexEntrySize,
data: garbage.append(nil),
expItems: 0,
},
// write garbage in the first non-head item
{
offset: (items/2 + 1) * indexEntrySize,
data: garbage.append(nil),
expItems: items / 2,
},
}
for _, c := range cases {
fn := fmt.Sprintf("t-%d", rand.Uint64())
f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 100, true, false)
if err != nil {
t.Fatal(err)
}
writeChunks(t, f, items, dataSize)

// write corrupted data
f.index.WriteAt(c.data, c.offset)
f.Close()

// reopen the table, corruption should be truncated
f, err = newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 100, true, false)
if err != nil {
t.Fatal(err)
}
for i := 0; i < c.expItems; i++ {
exp := getChunk(10, i)
got, err := f.Retrieve(uint64(i))
if err != nil {
t.Fatalf("Failed to read from table, %v", err)
}
if !bytes.Equal(exp, got) {
t.Fatalf("Unexpected item data, want: %v, got: %v", exp, got)
}
}
if f.items.Load() != uint64(c.expItems) {
t.Fatalf("Unexpected item number, want: %d, got: %d", c.expItems, f.items.Load())
}
}
}
4 changes: 2 additions & 2 deletions triedb/pathdb/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
force = true
}
if err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force); err != nil {
if err := ndl.buffer.flush(ndl.db.diskdb, ndl.db.freezer, ndl.cleans, ndl.id, force); err != nil {
return nil, err
}
// To remove outdated history objects from the end, we set the 'tail' parameter
Expand Down Expand Up @@ -268,7 +268,7 @@ func (dl *diskLayer) setBufferSize(size int) error {
if dl.stale {
return errSnapshotStale
}
return dl.buffer.setSize(size, dl.db.diskdb, dl.cleans, dl.id)
return dl.buffer.setSize(size, dl.db.diskdb, dl.db.freezer, dl.cleans, dl.id)
}

// size returns the approximate size of cached nodes in the disk layer.
Expand Down
13 changes: 10 additions & 3 deletions triedb/pathdb/nodebuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,9 @@ func (b *nodebuffer) empty() bool {

// setSize sets the buffer size to the provided number, and invokes a flush
// operation if the current memory usage exceeds the new limit.
func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, freezer ethdb.AncientStore, clean *fastcache.Cache, id uint64) error {
b.limit = uint64(size)
return b.flush(db, clean, id, false)
return b.flush(db, freezer, clean, id, false)
}

// allocBatch returns a database batch with pre-allocated buffer.
Expand All @@ -215,7 +215,7 @@ func (b *nodebuffer) allocBatch(db ethdb.KeyValueStore) ethdb.Batch {

// flush persists the in-memory dirty trie node into the disk if the configured
// memory threshold is reached. Note, all data must be written atomically.
func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
func (b *nodebuffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, clean *fastcache.Cache, id uint64, force bool) error {
if b.size <= b.limit && !force {
return nil
}
Expand All @@ -228,6 +228,13 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
start = time.Now()
batch = b.allocBatch(db)
)
// Explicitly sync the state freezer, ensuring that all written
// data is transferred to disk before updating the key-value store.
if freezer != nil {
if err := freezer.Sync(); err != nil {
return err
}
}
nodes := writeNodes(batch, b.nodes, clean)
rawdb.WritePersistentStateID(batch, id)

Expand Down
Loading