From 716adb2d4dda8f0211fc7f6c91a44ada4c39d960 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Tue, 23 Nov 2021 14:02:06 +0800 Subject: [PATCH 01/10] core/rawdb, cmd, ethdb, eth: implement freezer tail deletion --- core/blockchain.go | 6 +- core/rawdb/accessors_chain.go | 6 +- core/rawdb/database.go | 16 +- core/rawdb/freezer.go | 70 +++++- core/rawdb/freezer_batch.go | 2 +- core/rawdb/freezer_meta.go | 236 ++++++++++++++++++ core/rawdb/freezer_meta_test.go | 69 ++++++ core/rawdb/freezer_table.go | 409 +++++++++++++++++++++++-------- core/rawdb/freezer_table_test.go | 262 +++++++++++++++++--- core/rawdb/freezer_test.go | 6 +- core/rawdb/freezer_utils.go | 83 +++++++ core/rawdb/freezer_utils_test.go | 76 ++++++ core/rawdb/table.go | 18 +- ethdb/database.go | 16 +- 14 files changed, 1119 insertions(+), 156 deletions(-) create mode 100644 core/rawdb/freezer_meta.go create mode 100644 core/rawdb/freezer_meta_test.go create mode 100644 core/rawdb/freezer_utils.go create mode 100644 core/rawdb/freezer_utils_test.go diff --git a/core/blockchain.go b/core/blockchain.go index fa7e39fb0189f..fc5275dc70a89 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -592,7 +592,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo if num+1 <= frozen { // Truncate all relative data(header, total difficulty, body, receipt // and canonical hash) from ancient store. - if err := bc.db.TruncateAncients(num); err != nil { + if err := bc.db.TruncateHead(num); err != nil { log.Crit("Failed to truncate ancient data", "number", num, "err", err) } // Remove the hash <-> number mapping from the active store. @@ -991,7 +991,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ size += int64(batch.ValueSize()) if err = batch.Write(); err != nil { fastBlock := bc.CurrentFastBlock().NumberU64() - if err := bc.db.TruncateAncients(fastBlock + 1); err != nil { + if err := bc.db.TruncateHead(fastBlock + 1); err != nil { log.Error("Can't truncate ancient store after failed insert", "err", err) } return 0, err @@ -1009,7 +1009,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ if !updateHead(blockChain[len(blockChain)-1]) { // We end up here if the header chain has reorg'ed, and the blocks/receipts // don't match the canonical chain. - if err := bc.db.TruncateAncients(previousFastBlock + 1); err != nil { + if err := bc.db.TruncateHead(previousFastBlock + 1); err != nil { log.Error("Can't truncate ancient store after failed insert", "err", err) } return 0, errSideChainReceipts diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 8e9706ea6fdb9..7e4a06df924b9 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -83,8 +83,8 @@ type NumberHash struct { Hash common.Hash } -// ReadAllHashes retrieves all the hashes assigned to blocks at a certain heights, -// both canonical and reorged forks included. +// ReadAllHashesInRange retrieves all the hashes assigned to blocks at a certain +// heights, both canonical and reorged forks included. // This method considers both limits to be _inclusive_. func ReadAllHashesInRange(db ethdb.Iteratee, first, last uint64) []*NumberHash { var ( @@ -776,7 +776,7 @@ func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) { WriteHeader(db, block.Header()) } -// WriteAncientBlock writes entire block data into ancient store and returns the total written size. +// WriteAncientBlocks writes entire block data into ancient store and returns the total written size. func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) { var ( tdSum = new(big.Int).Set(td) diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 5ef64d26a2057..64cc2862bb37e 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -99,6 +99,11 @@ func (db *nofreezedb) Ancients() (uint64, error) { return 0, errNotSupported } +// Tail returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) Tail() (uint64, error) { + return 0, errNotSupported +} + // AncientSize returns an error as we don't have a backing chain freezer. func (db *nofreezedb) AncientSize(kind string) (uint64, error) { return 0, errNotSupported @@ -109,8 +114,13 @@ func (db *nofreezedb) ModifyAncients(func(ethdb.AncientWriteOp) error) (int64, e return 0, errNotSupported } -// TruncateAncients returns an error as we don't have a backing chain freezer. -func (db *nofreezedb) TruncateAncients(items uint64) error { +// TruncateHead returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) TruncateHead(items uint64) error { + return errNotSupported +} + +// TruncateTail returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) TruncateTail(items uint64) error { return errNotSupported } @@ -211,7 +221,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st // Block #1 is still in the database, we're allowed to init a new feezer } // Otherwise, the head header is still the genesis, we're allowed to init a new - // feezer. + // freezer. } } // Freezer is consistent with the key-value database, permit combining the two diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 88c72625eedee..3f66f983e5fad 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -59,14 +59,14 @@ const ( freezerRecheckInterval = time.Minute // freezerBatchLimit is the maximum number of blocks to freeze in one batch - // before doing an fsync and deleting it from the key-value store. + // before doing a fsync and deleting it from the key-value store. freezerBatchLimit = 30000 // freezerTableSize defines the maximum size of freezer data files. freezerTableSize = 2 * 1000 * 1000 * 1000 ) -// freezer is an memory mapped append-only database to store immutable chain data +// freezer is a memory mapped append-only database to store immutable chain data // into flat files: // // - The append only nature ensures that disk writes are minimized. @@ -78,6 +78,7 @@ type freezer struct { // 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned, // so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG). frozen uint64 // Number of blocks already frozen + tail uint64 // Number of the first stored item in the freezer threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests) // This lock synchronizes writers and the truncate operation, as well as @@ -226,6 +227,11 @@ func (f *freezer) Ancients() (uint64, error) { return atomic.LoadUint64(&f.frozen), nil } +// Tail returns the number of first stored item in the freezer. +func (f *freezer) Tail() (uint64, error) { + return atomic.LoadUint64(&f.tail), nil +} + // AncientSize returns the ancient size of the specified category. func (f *freezer) AncientSize(kind string) (uint64, error) { // This needs the write lock to avoid data races on table fields. @@ -261,7 +267,7 @@ func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize if err != nil { // The write operation has failed. Go back to the previous item position. for name, table := range f.tables { - err := table.truncate(prevItem) + err := table.truncateHead(prevItem) if err != nil { log.Error("Freezer table roll-back failed", "table", name, "index", prevItem, "err", err) } @@ -281,8 +287,8 @@ func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize return writeSize, nil } -// TruncateAncients discards any recent data above the provided threshold number. -func (f *freezer) TruncateAncients(items uint64) error { +// TruncateHead discards any recent data above the provided threshold number. +func (f *freezer) TruncateHead(items uint64) error { if f.readonly { return errReadOnly } @@ -292,12 +298,42 @@ func (f *freezer) TruncateAncients(items uint64) error { if atomic.LoadUint64(&f.frozen) <= items { return nil } + var frozen uint64 + for _, table := range f.tables { + if err := table.truncateHead(items); err != nil { + return err + } + // Tables should be aligned, only check the first table. + if frozen == 0 { + frozen = atomic.LoadUint64(&table.items) + } + } + atomic.StoreUint64(&f.frozen, frozen) + return nil +} + +// TruncateTail discards any recent data below the provided threshold number. +func (f *freezer) TruncateTail(tail uint64) error { + if f.readonly { + return errReadOnly + } + f.writeLock.Lock() + defer f.writeLock.Unlock() + + if atomic.LoadUint64(&f.tail) >= tail { + return nil + } + var truncated uint64 for _, table := range f.tables { - if err := table.truncate(items); err != nil { + if err := table.truncateTail(tail); err != nil { return err } + if truncated == 0 { + // Tables should be aligned, only check the first table. + truncated = table.tail() + } } - atomic.StoreUint64(&f.frozen, items) + atomic.StoreUint64(&f.tail, truncated) return nil } @@ -344,19 +380,29 @@ func (f *freezer) validate() error { // repair truncates all data tables to the same length. func (f *freezer) repair() error { - min := uint64(math.MaxUint64) + var ( + head = uint64(math.MaxUint64) + tail = uint64(0) + ) for _, table := range f.tables { items := atomic.LoadUint64(&table.items) - if min > items { - min = items + if head > items { + head = items + } + if table.tail() > tail { + tail = table.tail() } } for _, table := range f.tables { - if err := table.truncate(min); err != nil { + if err := table.truncateHead(head); err != nil { + return err + } + if err := table.truncateTail(tail); err != nil { return err } } - atomic.StoreUint64(&f.frozen, min) + atomic.StoreUint64(&f.frozen, head) + atomic.StoreUint64(&f.tail, tail) return nil } diff --git a/core/rawdb/freezer_batch.go b/core/rawdb/freezer_batch.go index 762fa8f25f19d..864a7f5e98bfe 100644 --- a/core/rawdb/freezer_batch.go +++ b/core/rawdb/freezer_batch.go @@ -191,7 +191,7 @@ func (batch *freezerTableBatch) commit() error { dataSize := int64(len(batch.dataBuffer)) batch.dataBuffer = batch.dataBuffer[:0] - // Write index. + // Write indices. _, err = batch.t.index.Write(batch.indexBuffer) if err != nil { return err diff --git a/core/rawdb/freezer_meta.go b/core/rawdb/freezer_meta.go new file mode 100644 index 0000000000000..4df449d53587d --- /dev/null +++ b/core/rawdb/freezer_meta.go @@ -0,0 +1,236 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see + +package rawdb + +import ( + "bytes" + "errors" + "fmt" + "os" + + "github.com/ethereum/go-ethereum/rlp" +) + +const ( + freezerVersion = 1 // The version tag of freezer table structure + metaLength = 1024 // The number of bytes allocated for the freezer table metadata +) + +var errIncompatibleVersion = errors.New("incompatible version") + +type incompatibleError struct { + version uint16 + expect uint16 + err error +} + +func newIncompatibleError(version uint16) *incompatibleError { + return &incompatibleError{ + version: version, + expect: freezerVersion, + err: errIncompatibleVersion, + } +} + +// Unwrap returns the internal evm error which allows us for further +// analysis outside. +func (err *incompatibleError) Unwrap() error { + return err.err +} + +func (err *incompatibleError) Error() string { + return fmt.Sprintf("%v, get %d, expect %d", err.err, err.version, err.expect) +} + +// freezerTableMeta wraps all the metadata of the freezer table. +type freezerTableMeta struct { + version uint16 // Freezer table version descriptor + tailId uint32 // The number of the earliest file + deleted uint64 // The number of items that have been removed from the table + hidden uint64 // The number of items that have been hidden in the table +} + +// newMetadata initializes the metadata object with the given parameters. +func newMetadata(tailId uint32, deleted uint64, hidden uint64) *freezerTableMeta { + return &freezerTableMeta{ + version: freezerVersion, + tailId: tailId, + deleted: deleted, + hidden: hidden, + } +} + +// encodeMetadata encodes the given parameters as the freezer table metadata. +func encodeMetadata(meta *freezerTableMeta) ([]byte, error) { + buffer := new(bytes.Buffer) + if err := rlp.Encode(buffer, meta.version); err != nil { + return nil, err + } + if err := rlp.Encode(buffer, meta.tailId); err != nil { + return nil, err + } + if err := rlp.Encode(buffer, meta.deleted); err != nil { + return nil, err + } + if err := rlp.Encode(buffer, meta.hidden); err != nil { + return nil, err + } + buffer.Write(make([]byte, metaLength-buffer.Len())) // Right pad zero bytes to the specified length + return buffer.Bytes(), nil +} + +// decodeMetadata decodes the freezer-table metadata from the given +// rlp stream. +func decodeMetadata(r *rlp.Stream) (*freezerTableMeta, error) { + var version uint16 + if err := r.Decode(&version); err != nil { + return nil, err + } + if version != freezerVersion { + return nil, newIncompatibleError(version) + } + var tailId uint32 + if err := r.Decode(&tailId); err != nil { + return nil, err + } + var deleted, hidden uint64 + if err := r.Decode(&deleted); err != nil { + return nil, err + } + if err := r.Decode(&hidden); err != nil { + return nil, err + } + return newMetadata(tailId, deleted, hidden), nil +} + +// storeMetadata stores the metadata of the freezer table into the +// given index file. +func storeMetadata(index *os.File, meta *freezerTableMeta) error { + encoded, err := encodeMetadata(meta) + if err != nil { + return err + } + if _, err := index.WriteAt(encoded, 0); err != nil { + return err + } + return nil +} + +// loadMetadata loads the metadata of the freezer table from the +// given index file. Return the error if the version of loaded +// metadata is not expected. +func loadMetadata(index *os.File) (*freezerTableMeta, error) { + stat, err := index.Stat() + if err != nil { + return nil, err + } + if stat.Size() < metaLength { + return nil, newIncompatibleError(0) + } + buffer := make([]byte, metaLength) + if _, err := index.ReadAt(buffer, 0); err != nil { + return nil, err + } + return decodeMetadata(rlp.NewStream(bytes.NewReader(buffer), 0)) +} + +// upgradeV0TableIndex extracts the indexes from version-0 index file and +// encodes/stores them into the latest version index file. +func upgradeV0TableIndex(index *os.File) error { + // Create a temporary offset buffer to read indexEntry info + buffer := make([]byte, indexEntrySize) + + // Read index zero, determine what file is the earliest + // and how many entries are deleted from the freezer table. + var first indexEntry + if _, err := index.ReadAt(buffer, 0); err != nil { + return err + } + first.unmarshalBinary(buffer) + + encoded, err := encodeMetadata(newMetadata(first.filenum, uint64(first.offset), 0)) + if err != nil { + return err + } + // Close the origin index file. + if err := index.Close(); err != nil { + return err + } + return copyFrom(index.Name(), index.Name(), indexEntrySize, func(f *os.File) error { + _, err := f.Write(encoded) + return err + }) +} + +// upgradeTableIndex upgrades the legacy index file to the latest version. +// This function should be responsible for closing the origin index file +// and return the re-opened one. +func upgradeTableIndex(index *os.File, version uint16) (*os.File, *freezerTableMeta, error) { + switch version { + case 0: + if err := upgradeV0TableIndex(index); err != nil { + return nil, nil, err + } + default: + return nil, nil, errors.New("unknown freezer table index") + } + // Reopen the upgraded index file and load the metadata from it + index, err := os.Open(index.Name()) + if err != nil { + return nil, nil, err + } + meta, err := loadMetadata(index) + if err != nil { + return nil, nil, err + } + return index, meta, nil +} + +// repairTableIndex repairs the given index file of freezer table and returns +// the stored metadata inside. If the index file is be rewritten, the function +// should be responsible for closing the origin one and return the new handler. +// If the table is empty, commit the empty metadata; +// If the table is legacy, upgrade it to the latest version; +func repairTableIndex(index *os.File) (*os.File, *freezerTableMeta, error) { + stat, err := index.Stat() + if err != nil { + return nil, nil, err + } + if stat.Size() == 0 { + meta := newMetadata(0, 0, 0) + if err := storeMetadata(index, meta); err != nil { + return nil, nil, err + } + // Shift file cursor to the end for next write operation + _, err = index.Seek(0, 2) + if err != nil { + return nil, nil, err + } + return index, meta, nil + } + meta, err := loadMetadata(index) + if err != nil { + if !errors.Is(err, errIncompatibleVersion) { + return nil, nil, err + } + index, meta, err = upgradeTableIndex(index, err.(*incompatibleError).version) + } + if err != nil { + return nil, nil, err + } + return index, meta, nil +} diff --git a/core/rawdb/freezer_meta_test.go b/core/rawdb/freezer_meta_test.go new file mode 100644 index 0000000000000..3340450f984a9 --- /dev/null +++ b/core/rawdb/freezer_meta_test.go @@ -0,0 +1,69 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see + +package rawdb + +import ( + "errors" + "io/ioutil" + "os" + "testing" +) + +func TestStoreLoadFreezerTableMeta(t *testing.T) { + var cases = []struct { + version uint16 + deleted uint64 + hidden uint64 + expectErr error + }{ + { + freezerVersion, 100, 200, nil, + }, + { + 0, 100, 200, errIncompatibleVersion, // legacy version + }, + } + for _, c := range cases { + f, err := ioutil.TempFile(os.TempDir(), "*") + if err != nil { + t.Fatalf("Failed to create file %v", err) + } + err = storeMetadata(f, &freezerTableMeta{ + version: c.version, + deleted: c.deleted, + hidden: c.hidden, + }) + if err != nil { + t.Fatalf("Failed to store metadata %v", err) + } + meta, err := loadMetadata(f) + if !errors.Is(err, c.expectErr) { + t.Fatalf("Unexpected error %v", err) + } + if c.expectErr == nil { + if meta.version != c.version { + t.Fatalf("Unexpected version field") + } + if meta.deleted != c.deleted { + t.Fatalf("Unexpected deleted field") + } + if meta.hidden != c.hidden { + t.Fatalf("Unexpected hidden field") + } + } + } +} diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 7cfba70c5004a..d57fc4a1798b8 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -46,21 +46,20 @@ var ( errNotSupported = errors.New("this operation is not supported") ) -// indexEntry contains the number/id of the file that the data resides in, aswell as the -// offset within the file to the end of the data +// indexEntry contains the number/id of the file that the data resides in, as well as the +// offset within the file to the end of the data. // In serialized form, the filenum is stored as uint16. type indexEntry struct { - filenum uint32 // stored as uint16 ( 2 bytes) - offset uint32 // stored as uint32 ( 4 bytes) + filenum uint32 // stored as uint16 ( 2 bytes ) + offset uint32 // stored as uint32 ( 4 bytes ) } const indexEntrySize = 6 // unmarshalBinary deserializes binary b into the rawIndex entry. -func (i *indexEntry) unmarshalBinary(b []byte) error { +func (i *indexEntry) unmarshalBinary(b []byte) { i.filenum = uint32(binary.BigEndian.Uint16(b[:2])) i.offset = binary.BigEndian.Uint32(b[2:6]) - return nil } // append adds the encoded entry to the end of b. @@ -75,14 +74,14 @@ func (i *indexEntry) append(b []byte) []byte { // bounds returns the start- and end- offsets, and the file number of where to // read there data item marked by the two index entries. The two entries are // assumed to be sequential. -func (start *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) { - if start.filenum != end.filenum { +func (i *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) { + if i.filenum != end.filenum { // If a piece of data 'crosses' a data-file, // it's actually in one piece on the second data-file. // We return a zero-indexEntry for the second file as start return 0, end.offset, end.filenum } - return start.offset, end.offset, end.filenum + return i.offset, end.offset, end.filenum } // freezerTable represents a single chained data table within the freezer (e.g. blocks). @@ -92,7 +91,15 @@ type freezerTable struct { // WARNING: The `items` field is accessed atomically. On 32 bit platforms, only // 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned, // so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG). - items uint64 // Number of items stored in the table (including items removed from tail) + items uint64 // Number of items stored in the table (including items removed from tail) + itemOffset uint64 // Number of items removed from the table + + // itemHidden is the number of items marked as deleted they are not removed + // from the table yet. Since the tail deletion is only supported at file level + // which means the actual deletion will be delayed until the total "marked as + // deleted" data reach the threshold. Before that these items will be hidden + // to prevent being visited again. + itemHidden uint64 noCompression bool // if true, disables snappy compression. Note: does not work retroactively readonly bool @@ -106,10 +113,6 @@ type freezerTable struct { tailId uint32 // number of the earliest file index *os.File // File descriptor for the indexEntry file of the table - // In the case that old items are deleted (from the tail), we use itemOffset - // to count how many historic items have gone missing. - itemOffset uint32 // Offset (number of discarded items) - headBytes int64 // Number of bytes written to the head file readMeter metrics.Meter // Meter for measuring the effective amount of data read writeMeter metrics.Meter // Meter for measuring the effective amount of data written @@ -163,7 +166,7 @@ func truncateFreezerFile(file *os.File, size int64) error { } // newTable opens a freezer table, creating the data and index files if they are -// non existent. Both files are truncated to the shortest common length to ensure +// non-existent. Both files are truncated to the shortest common length to ensure // they don't go out of sync. func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression, readonly bool) (*freezerTable, error) { // Ensure the containing directory exists and open the indexEntry file @@ -172,28 +175,26 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr } var idxName string if noCompression { - // Raw idx - idxName = fmt.Sprintf("%s.ridx", name) + idxName = fmt.Sprintf("%s.ridx", name) // raw index file } else { - // Compressed idx - idxName = fmt.Sprintf("%s.cidx", name) + idxName = fmt.Sprintf("%s.cidx", name) // compressed index file } var ( - err error - offsets *os.File + err error + index *os.File ) if readonly { // Will fail if table doesn't exist - offsets, err = openFreezerFileForReadOnly(filepath.Join(path, idxName)) + index, err = openFreezerFileForReadOnly(filepath.Join(path, idxName)) } else { - offsets, err = openFreezerFileForAppend(filepath.Join(path, idxName)) + index, err = openFreezerFileForAppend(filepath.Join(path, idxName)) } if err != nil { return nil, err } // Create the table and repair any past inconsistency tab := &freezerTable{ - index: offsets, + index: index, files: make(map[uint32]*os.File), readMeter: readMeter, writeMeter: writeMeter, @@ -220,26 +221,29 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr return tab, nil } -// repair cross checks the head and the index file and truncates them to +// repair cross-checks the head and the index file and truncates them to // be in sync with each other after a potential crash / data loss. func (t *freezerTable) repair() error { - // Create a temporary offset buffer to init files with and read indexEntry into - buffer := make([]byte, indexEntrySize) + index, meta, err := repairTableIndex(t.index) + if err != nil { + return err + } + t.index = index // index file may be reopened, update it + t.tailId, t.itemOffset, t.itemHidden = meta.tailId, meta.deleted, meta.hidden - // If we've just created the files, initialize the index with the 0 indexEntry + // Ensure the index is a multiple of indexEntrySize bytes. The assumption + // is held that index file at least has metaLength bytes for storing meta- + // data. stat, err := t.index.Stat() if err != nil { return err } - if stat.Size() == 0 { - if _, err := t.index.Write(buffer); err != nil { + if overflow := (stat.Size() - metaLength) % indexEntrySize; overflow != 0 { + err := truncateFreezerFile(t.index, stat.Size()-overflow) + if err != nil { return err } } - // Ensure the index is a multiple of indexEntrySize bytes - if overflow := stat.Size() % indexEntrySize; overflow != 0 { - truncateFreezerFile(t.index, stat.Size()-overflow) // New file can't trigger this path - } // Retrieve the file sizes and prepare for truncation if stat, err = t.index.Stat(); err != nil { return err @@ -248,29 +252,40 @@ func (t *freezerTable) repair() error { // Open the head file var ( - firstIndex indexEntry - lastIndex indexEntry contentSize int64 contentExp int64 + lastIndex *indexEntry ) - // Read index zero, determine what file is the earliest - // and what item offset to use - t.index.ReadAt(buffer, 0) - firstIndex.unmarshalBinary(buffer) - - t.tailId = firstIndex.filenum - t.itemOffset = firstIndex.offset - - t.index.ReadAt(buffer, offsetsSize-indexEntrySize) - lastIndex.unmarshalBinary(buffer) - if t.readonly { - t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForReadOnly) - } else { - t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend) - } + // Read last index, determine what file is the latest and + // what's the current head item + items, err := t.indexLen() if err != nil { return err } + if items == 0 { + if t.readonly { + t.head, err = t.openFile(t.tailId, openFreezerFileForReadOnly) + } else { + t.head, err = t.openFile(t.tailId, openFreezerFileForAppend) + } + if err != nil { + return err + } + lastIndex = &indexEntry{filenum: t.tailId, offset: 0} + } else { + lastIndex, err = t.getIndex(0, 1) + if err != nil { + return err + } + if t.readonly { + t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForReadOnly) + } else { + t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend) + } + if err != nil { + return err + } + } if stat, err = t.head.Stat(); err != nil { return err } @@ -278,7 +293,6 @@ func (t *freezerTable) repair() error { // Keep truncating both files until they come in sync contentExp = int64(lastIndex.offset) - for contentExp != contentSize { // Truncate the head file to the last offset pointer if contentExp < contentSize { @@ -294,15 +308,23 @@ func (t *freezerTable) repair() error { if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil { return err } - offsetsSize -= indexEntrySize - t.index.ReadAt(buffer, offsetsSize-indexEntrySize) - var newLastIndex indexEntry - newLastIndex.unmarshalBinary(buffer) + // Load the previous index entry from the index file + offsetsSize, items = offsetsSize-indexEntrySize, items-1 + + var newLast *indexEntry + if items == 0 { + newLast = &indexEntry{filenum: t.tailId, offset: 0} + } else { + newLast, err = t.getIndex(0, 1) + if err != nil { + return err + } + } // We might have slipped back into an earlier head-file here - if newLastIndex.filenum != lastIndex.filenum { + if newLast.filenum != lastIndex.filenum { // Release earlier opened file t.releaseFile(lastIndex.filenum) - if t.head, err = t.openFile(newLastIndex.filenum, openFreezerFileForAppend); err != nil { + if t.head, err = t.openFile(newLast.filenum, openFreezerFileForAppend); err != nil { return err } if stat, err = t.head.Stat(); err != nil { @@ -312,7 +334,7 @@ func (t *freezerTable) repair() error { } contentSize = stat.Size() } - lastIndex = newLastIndex + lastIndex = newLast contentExp = int64(lastIndex.offset) } } @@ -327,10 +349,16 @@ func (t *freezerTable) repair() error { } } // Update the item and byte counters and return - t.items = uint64(t.itemOffset) + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file + t.items = t.itemOffset + uint64(items) t.headBytes = contentSize t.headId = lastIndex.filenum + // Delete the leftover files because of head deletion + t.releaseFilesAfter(t.headId, true) + + // Delete the leftover files because of tail deletion + t.releaseFilesBefore(t.tailId, true) + // Close opened files and preopen all files if err := t.preopen(); err != nil { return err @@ -346,6 +374,7 @@ func (t *freezerTable) repair() error { func (t *freezerTable) preopen() (err error) { // The repair might have already opened (some) files t.releaseFilesAfter(0, false) + // Open all except head in RDONLY for i := t.tailId; i < t.headId; i++ { if _, err = t.openFile(i, openFreezerFileForReadOnly); err != nil { @@ -361,8 +390,14 @@ func (t *freezerTable) preopen() (err error) { return err } -// truncate discards any recent data above the provided threshold number. -func (t *freezerTable) truncate(items uint64) error { +// tail returns the index of the first stored item in the freezer table. +// It can also be interpreted as the number of deleted items from the tail. +func (t *freezerTable) tail() uint64 { + return atomic.LoadUint64(&t.itemHidden) + atomic.LoadUint64(&t.itemOffset) +} + +// truncateHead discards any recent data above the provided threshold number. +func (t *freezerTable) truncateHead(items uint64) error { t.lock.Lock() defer t.lock.Unlock() @@ -382,17 +417,33 @@ func (t *freezerTable) truncate(items uint64) error { log = t.logger.Warn // Only loud warn if we delete multiple items } log("Truncating freezer table", "items", existing, "limit", items) - if err := truncateFreezerFile(t.index, int64(items+1)*indexEntrySize); err != nil { + + // Calculate the relative offset between the new head and tail, use + // it to access the corresponding index entry. If the requested target + // is even below the freezer tail, reject it. + var ( + itemOffset = atomic.LoadUint64(&t.itemOffset) + itemHidden = atomic.LoadUint64(&t.itemHidden) + tail = itemOffset + itemHidden + ) + if items < tail { + return errors.New("truncation below tail") + } + offset := items - itemOffset + + if err := truncateFreezerFile(t.index, int64(offset)*indexEntrySize+metaLength); err != nil { return err } // Calculate the new expected size of the data file and truncate it - buffer := make([]byte, indexEntrySize) - if _, err := t.index.ReadAt(buffer, int64(items*indexEntrySize)); err != nil { - return err + var expected *indexEntry + if offset == 0 { + expected = &indexEntry{filenum: t.tailId, offset: 0} + } else { + expected, err = t.getIndex(int64(offset-1), 0) + if err != nil { + return err + } } - var expected indexEntry - expected.unmarshalBinary(buffer) - // We might need to truncate back to older files if expected.filenum != t.headId { // If already open for reading, force-reopen for writing @@ -421,7 +472,94 @@ func (t *freezerTable) truncate(items uint64) error { return err } t.sizeGauge.Dec(int64(oldSize - newSize)) + return nil +} +func (t *freezerTable) truncateIndexFile(originDeleted, deleted, hidden uint64, tailId uint32) error { + encoded, err := encodeMetadata(newMetadata(tailId, deleted, hidden)) + if err != nil { + return err + } + err = copyFrom(t.index.Name(), t.index.Name(), metaLength+indexEntrySize*(deleted-originDeleted), func(f *os.File) error { + _, err := f.Write(encoded) + return err + }) + if err != nil { + return err + } + if err := t.index.Close(); err != nil { + return err + } + offsets, err := openFreezerFileForAppend(t.index.Name()) + if err != nil { + return err + } + t.index = offsets + return nil +} + +// truncateHead discards any recent data before the provided threshold number. +func (t *freezerTable) truncateTail(items uint64) error { + t.lock.Lock() + defer t.lock.Unlock() + + // Ensure the given truncate target falls in the correct range + var ( + deleted = atomic.LoadUint64(&t.itemOffset) + hidden = atomic.LoadUint64(&t.itemHidden) + ) + if deleted+hidden >= items { + return nil + } + head := atomic.LoadUint64(&t.items) + if head < items { + return errors.New("truncation above head") + } + // Load the index of new tail item after the deletion. + newTail, err := t.getIndex(int64(items-deleted), 0) + if err != nil { + return err + } + // Freezer only supports deletion by file, just mark the entries as hidden + if t.tailId == newTail.filenum { + atomic.StoreUint64(&t.itemHidden, items-deleted) + return storeMetadata(t.index, newMetadata(t.tailId, deleted, items-deleted)) + } + if t.tailId > newTail.filenum { + return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, newTail.filenum) + } + // We need to truncate, save the old size for metrics tracking + oldSize, err := t.sizeNolock() + if err != nil { + return err + } + // Count how many items can be deleted from the file. + var newDeleted = items + for current := items - 1; current >= deleted; current -= 1 { + cur, err := t.getIndex(int64(current-deleted), 0) + if err != nil { + return err + } + if cur.filenum != newTail.filenum { + break + } + newDeleted = current + } + if err := t.truncateIndexFile(deleted, newDeleted, items-newDeleted, newTail.filenum); err != nil { + return err + } + // Release any files before the current tail + t.tailId = newTail.filenum + atomic.StoreUint64(&t.itemOffset, newDeleted) + atomic.StoreUint64(&t.itemHidden, items-newDeleted) + t.releaseFilesBefore(t.tailId, true) + + // Retrieve the new size and update the total size counter + newSize, err := t.sizeNolock() + if err != nil { + return err + } + t.sizeGauge.Dec(int64(oldSize - newSize)) return nil } @@ -490,6 +628,69 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) { } } +// releaseFilesBefore closes all open files with a lower number, and optionally also deletes the files +func (t *freezerTable) releaseFilesBefore(num uint32, remove bool) { + for fnum, f := range t.files { + if fnum < num { + delete(t.files, fnum) + f.Close() + if remove { + os.Remove(f.Name()) + } + } + } +} + +// indexLen returns the total index entries stored in the index file. +// This number can also be counted as the data entries stored in the +// freezer table. +func (t *freezerTable) indexLen() (int64, error) { + stat, err := t.index.Stat() + if err != nil { + return 0, err + } + size := stat.Size() + if size < metaLength { + return 0, errors.New("invalid index file") + } + indexSize := size - metaLength + if indexSize%indexEntrySize != 0 { + return 0, errors.New("invalid index file") + } + return indexSize / indexEntrySize, nil +} + +// getIndex returns a single index from the index file, with the given offset +// interpreted according to whence: 0 means relative to the origin of the file +// and 1 means relative to the end. +func (t *freezerTable) getIndex(offset int64, whence int) (*indexEntry, error) { + count, err := t.indexLen() + if err != nil { + return nil, err + } + var ( + off int64 + index indexEntry + buffer = make([]byte, indexEntrySize) + ) + if whence == 0 { + if offset >= count { + return nil, errors.New("out of range") + } + off = metaLength + offset*indexEntrySize + } else { + if offset >= count { + return nil, errors.New("out of range") + } + off = metaLength + (count-1-offset)*indexEntrySize + } + if _, err := t.index.ReadAt(buffer, off); err != nil { + return nil, err + } + index.unmarshalBinary(buffer[:]) + return &index, nil +} + // getIndices returns the index entries for the given from-item, covering 'count' items. // N.B: The actual number of returned indices for N items will always be N+1 (unless an // error is returned). @@ -497,32 +698,32 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) { // so that the items are within bounds. If this method is used to read out of bounds, // it will return error. func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) { - // Apply the table-offset - from = from - uint64(t.itemOffset) - // For reading N items, we need N+1 indices. - buffer := make([]byte, (count+1)*indexEntrySize) - if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil { - return nil, err + // Special case if we're reading the first item in the freezer. We assume that + // the first item always start from zero(regarding the deletion, we + // only support deletion by files, so that the assumption is held). + var indices []*indexEntry + from = from - atomic.LoadUint64(&t.itemOffset) + if from == 0 { + indices = append(indices, &indexEntry{ + filenum: t.tailId, + offset: 0, + }) + count = count - 1 + from = from + 1 } + // For reading N items, we need N+1 indices. var ( - indices []*indexEntry - offset int + buffer = make([]byte, (count+1)*indexEntrySize) + offset = metaLength + int64(from-1)*indexEntrySize ) - for i := from; i <= from+count; i++ { + if _, err := t.index.ReadAt(buffer, offset); err != nil { + return nil, err + } + for i := 0; i <= int(count); i++ { index := new(indexEntry) - index.unmarshalBinary(buffer[offset:]) - offset += indexEntrySize + index.unmarshalBinary(buffer[i*indexEntrySize:]) indices = append(indices, index) } - if from == 0 { - // Special case if we're reading the first item in the freezer. We assume that - // the first item always start from zero(regarding the deletion, we - // only support deletion by files, so that the assumption is held). - // This means we can use the first item metadata to carry information about - // the 'global' offset, for the deletion-case - indices[0].offset = 0 - indices[0].filenum = indices[1].filenum - } return indices, nil } @@ -583,18 +784,23 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i t.lock.RLock() defer t.lock.RUnlock() - // Ensure the table and the item is accessible + // Ensure the table and the item are accessible if t.index == nil || t.head == nil { return nil, nil, errClosed } - itemCount := atomic.LoadUint64(&t.items) // max number + var ( + items = atomic.LoadUint64(&t.items) // the total items(head + 1) + deleted = atomic.LoadUint64(&t.itemOffset) // the number of deleted items + hidden = atomic.LoadUint64(&t.itemHidden) // the number of hidden items + tail = deleted + hidden + ) // Ensure the start is written, not deleted from the tail, and that the // caller actually wants something - if itemCount <= start || uint64(t.itemOffset) > start || count == 0 { + if items <= start || tail > start || count == 0 { return nil, nil, errOutOfBounds } - if start+count > itemCount { - count = itemCount - start + if start+count > items { + count = items - start } var ( output = make([]byte, maxBytes) // Buffer to read data into @@ -670,10 +876,10 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i return output[:outputSize], sizes, nil } -// has returns an indicator whether the specified number data -// exists in the freezer table. +// has returns an indicator whether the specified number data is still accessible +// in the freezer table. func (t *freezerTable) has(number uint64) bool { - return atomic.LoadUint64(&t.items) > number + return atomic.LoadUint64(&t.items) > number && t.tail() <= number } // size returns the total data size in the freezer table. @@ -744,13 +950,20 @@ func (t *freezerTable) dumpIndexString(start, stop int64) string { } func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) { + meta, err := loadMetadata(t.index) + if err != nil { + fmt.Fprintf(w, "Failed to decode freezer table %v\n", err) + return + } + fmt.Fprintf(w, "Version %d deleted %d, hidden %d\n", meta.version, meta.deleted, meta.hidden) + buf := make([]byte, indexEntrySize) fmt.Fprintf(w, "| number | fileno | offset |\n") fmt.Fprintf(w, "|--------|--------|--------|\n") for i := uint64(start); ; i++ { - if _, err := t.index.ReadAt(buf, int64(i*indexEntrySize)); err != nil { + if _, err := t.index.ReadAt(buf, int64(i*indexEntrySize)+metaLength); err != nil { break } var entry indexEntry diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index 15464e1bd768f..eb6d422f9c8ca 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -19,6 +19,7 @@ package rawdb import ( "bytes" "fmt" + "io/ioutil" "math/rand" "os" "path/filepath" @@ -203,8 +204,8 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { t.Fatalf("Failed to open index file: %v", err) } // Remove everything but the first item, and leave data unaligned - // 0-indexEntry, 1-indexEntry, corrupt-indexEntry - idxFile.Truncate(indexEntrySize + indexEntrySize + indexEntrySize/2) + // metadata, 1-indexEntry, corrupt-indexEntry + idxFile.Truncate(metaLength + indexEntrySize + indexEntrySize/2) idxFile.Close() // Now open it again @@ -387,7 +388,7 @@ func TestFreezerTruncate(t *testing.T) { t.Fatal(err) } defer f.Close() - f.truncate(10) // 150 bytes + f.truncateHead(10) // 150 bytes if f.items != 10 { t.Fatalf("expected %d items, got %d", 10, f.items) } @@ -504,7 +505,7 @@ func TestFreezerReadAndTruncate(t *testing.T) { } // Now, truncate back to zero - f.truncate(0) + f.truncateHead(0) // Write the data again batch := f.newBatch() @@ -559,26 +560,23 @@ func TestFreezerOffset(t *testing.T) { if err != nil { t.Fatal(err) } - indexBuf := make([]byte, 7*indexEntrySize) + indexBuf := make([]byte, 6*indexEntrySize+metaLength) indexFile.Read(indexBuf) // Update the index file, so that we store - // [ file = 2, offset = 4 ] at index zero - - tailId := uint32(2) // First file is 2 - itemOffset := uint32(4) // We have removed four items - zeroIndex := indexEntry{ - filenum: tailId, - offset: itemOffset, + // [ file = 2, deleted = 4, hidden = 0 ] as meta + blob, err := encodeMetadata(newMetadata(2, 4, 0)) + if err != nil { + t.Fatal(err) } - buf := zeroIndex.append(nil) - // Overwrite index zero - copy(indexBuf, buf) + copy(indexBuf, blob) + // Remove the four next indices by overwriting - copy(indexBuf[indexEntrySize:], indexBuf[indexEntrySize*5:]) + copy(indexBuf[metaLength:], indexBuf[metaLength+indexEntrySize*4:]) indexFile.WriteAt(indexBuf, 0) + // Need to truncate the moved index items - indexFile.Truncate(indexEntrySize * (1 + 2)) + indexFile.Truncate(indexEntrySize*2 + metaLength) indexFile.Close() } @@ -617,22 +615,22 @@ func TestFreezerOffset(t *testing.T) { if err != nil { t.Fatal(err) } - indexBuf := make([]byte, 3*indexEntrySize) + indexBuf := make([]byte, 2*indexEntrySize+metaLength) indexFile.Read(indexBuf) // Update the index file, so that we store - // [ file = 2, offset = 1M ] at index zero - - tailId := uint32(2) // First file is 2 - itemOffset := uint32(1000000) // We have removed 1M items - zeroIndex := indexEntry{ - offset: itemOffset, - filenum: tailId, + // [ file = 2, deleted = 1M, hidden = 0 ] as meta + blob, err := encodeMetadata(newMetadata(2, 1000000, 0)) + if err != nil { + t.Fatal(err) } - buf := zeroIndex.append(nil) - // Overwrite index zero - copy(indexBuf, buf) + copy(indexBuf, blob) + + // Remove the four 2 indices by overwriting + copy(indexBuf[metaLength:], indexBuf[metaLength+indexEntrySize*2:]) indexFile.WriteAt(indexBuf, 0) + + indexFile.Truncate(indexEntrySize*2 + metaLength) indexFile.Close() } @@ -659,6 +657,214 @@ func TestFreezerOffset(t *testing.T) { } } +func TestTruncateTail(t *testing.T) { + t.Parallel() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("truncate-tail-%d", rand.Uint64()) + + // Fill table + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + if err != nil { + t.Fatal(err) + } + + // Write 7 x 20 bytes, splitting out into four files + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) + require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) + require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) + require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc))) + require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) + require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11))) + require.NoError(t, batch.commit()) + + // nothing to do, all the items should still be there. + f.truncateTail(0) + fmt.Println(f.dumpIndexString(0, 1000)) + checkRetrieve(t, f, map[uint64][]byte{ + 0: getChunk(20, 0xFF), + 1: getChunk(20, 0xEE), + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + // truncate single element( item 0 ), deletion is only supported at file level + f.truncateTail(1) + fmt.Println(f.dumpIndexString(0, 1000)) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 1: getChunk(20, 0xEE), + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + // Reopen the table, the deletion information should be persisted as well + f.Close() + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + if err != nil { + t.Fatal(err) + } + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 1: getChunk(20, 0xEE), + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + // truncate two elements( item 0, item 1 ), the file 0 should be deleted + f.truncateTail(2) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + // Reopen the table, the above testing should still pass + f.Close() + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + if err != nil { + t.Fatal(err) + } + defer f.Close() + + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + // truncate all, the entire freezer should be deleted + f.truncateTail(6) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + 2: errOutOfBounds, + 3: errOutOfBounds, + 4: errOutOfBounds, + 5: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 6: getChunk(20, 0x11), + }) +} + +func TestTruncateHeadBelowTail(t *testing.T) { + t.Parallel() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("truncate-head-blow-tail-%d", rand.Uint64()) + + // Fill table + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + if err != nil { + t.Fatal(err) + } + + // Write 7 x 20 bytes, splitting out into four files + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) + require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) + require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) + require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc))) + require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) + require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11))) + require.NoError(t, batch.commit()) + + f.truncateTail(4) // Tail = 4 + + // NewHead is required to be 3, the entire table should be truncated + f.truncateHead(4) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, // Deleted by tail + 1: errOutOfBounds, // Deleted by tail + 2: errOutOfBounds, // Deleted by tail + 3: errOutOfBounds, // Deleted by tail + 4: errOutOfBounds, // Deleted by Head + 5: errOutOfBounds, // Deleted by Head + 6: errOutOfBounds, // Deleted by Head + }) + + // Append new items + batch = f.newBatch() + require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) + require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11))) + require.NoError(t, batch.commit()) + + checkRetrieve(t, f, map[uint64][]byte{ + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + f.truncateTail(5) // Lazy deleted the item-4, it's hidden + f.truncateHead(5) // New head is reset to item-4 + checkRetrieveError(t, f, map[uint64]error{ + 4: errOutOfBounds, // Hidden item + }) +} + +func TestUpgradeLegacyFreezerTable(t *testing.T) { + f, err := ioutil.TempFile("", "") + if err != nil { + t.Fatal(err) + } + defer os.Remove(f.Name()) + + index := &indexEntry{ + filenum: 100, + offset: 200, + } + encoded := index.append(nil) + f.Write(encoded) + + newf, meta, err := repairTableIndex(f) + if err != nil { + t.Fatal(err) + } + if newf.Name() != f.Name() { + t.Fatal("Unexpected file name") + } + if meta.tailId != 100 { + t.Fatal("Unexpected tail file") + } + if meta.deleted != 200 { + t.Fatal("Unexpected deleted items") + } + if meta.hidden != 0 { + t.Fatal("Unexpected hidden items") + } + if meta.version != freezerVersion { + t.Fatal("Unexpected freezer version") + } +} + func checkRetrieve(t *testing.T, f *freezerTable, items map[uint64][]byte) { t.Helper() diff --git a/core/rawdb/freezer_test.go b/core/rawdb/freezer_test.go index d5c3749e5d218..74e3d660cb106 100644 --- a/core/rawdb/freezer_test.go +++ b/core/rawdb/freezer_test.go @@ -186,7 +186,7 @@ func TestFreezerConcurrentModifyRetrieve(t *testing.T) { wg.Wait() } -// This test runs ModifyAncients and TruncateAncients concurrently with each other. +// This test runs ModifyAncients and TruncateHead concurrently with each other. func TestFreezerConcurrentModifyTruncate(t *testing.T) { f, dir := newFreezerForTesting(t, freezerTestTableDef) defer os.RemoveAll(dir) @@ -196,7 +196,7 @@ func TestFreezerConcurrentModifyTruncate(t *testing.T) { for i := 0; i < 1000; i++ { // First reset and write 100 items. - if err := f.TruncateAncients(0); err != nil { + if err := f.TruncateHead(0); err != nil { t.Fatal("truncate failed:", err) } _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error { @@ -231,7 +231,7 @@ func TestFreezerConcurrentModifyTruncate(t *testing.T) { wg.Done() }() go func() { - truncateErr = f.TruncateAncients(10) + truncateErr = f.TruncateHead(10) wg.Done() }() go func() { diff --git a/core/rawdb/freezer_utils.go b/core/rawdb/freezer_utils.go new file mode 100644 index 0000000000000..c7510d4da847c --- /dev/null +++ b/core/rawdb/freezer_utils.go @@ -0,0 +1,83 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "io" + "io/ioutil" + "os" + "path/filepath" +) + +// copyFrom copies data from 'srcPath' at offset 'offset' into 'destPath'. +// The 'destPath' is created if it doesn't exist, otherwise it is overwritten. +// Before the copy is executed, there is a callback can be registered to +// manipulate the dest file. +// It is perfectly valid to have destPath == srcPath. +func copyFrom(srcPath, destPath string, offset uint64, before func(f *os.File) error) error { + // Create a temp file in the same dir where we want it to wind up + f, err := ioutil.TempFile(filepath.Dir(destPath), "*") + if err != nil { + return err + } + fname := f.Name() + + // Clean up the leftover file + defer func() { + if f != nil { + f.Close() + } + os.Remove(fname) + }() + + // Apply the given function if it's not nil before we copy + // the content from the src. + if before != nil { + if err := before(f); err != nil { + return err + } + } + // Open the source file + src, err := os.Open(srcPath) + if err != nil { + return err + } + if _, err = src.Seek(int64(offset), 0); err != nil { + src.Close() + return err + } + // io.Copy uses 32K buffer internally. + _, err = io.Copy(f, src) + if err != nil { + src.Close() + return err + } + // Rename the temporary file to the specified dest name. + // src may be same as dest, so needs to be closed before + // we do the final move. + src.Close() + + if err := f.Close(); err != nil { + return err + } + f = nil + + if err := os.Rename(fname, destPath); err != nil { + return err + } + return nil +} diff --git a/core/rawdb/freezer_utils_test.go b/core/rawdb/freezer_utils_test.go new file mode 100644 index 0000000000000..de8087f9b9361 --- /dev/null +++ b/core/rawdb/freezer_utils_test.go @@ -0,0 +1,76 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "bytes" + "io/ioutil" + "os" + "testing" +) + +func TestCopyFrom(t *testing.T) { + var ( + content = []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8} + prefix = []byte{0x9, 0xa, 0xb, 0xc, 0xd, 0xf} + ) + var cases = []struct { + src, dest string + offset uint64 + writePrefix bool + }{ + {"foo", "bar", 0, false}, + {"foo", "bar", 1, false}, + {"foo", "bar", 8, false}, + {"foo", "foo", 0, false}, + {"foo", "foo", 1, false}, + {"foo", "foo", 8, false}, + {"foo", "bar", 0, true}, + {"foo", "bar", 1, true}, + {"foo", "bar", 8, true}, + } + for _, c := range cases { + ioutil.WriteFile(c.src, content, 0644) + + if err := copyFrom(c.src, c.dest, c.offset, func(f *os.File) error { + if !c.writePrefix { + return nil + } + f.Write(prefix) + return nil + }); err != nil { + os.Remove(c.src) + t.Fatalf("Failed to copy %v", err) + } + + blob, err := ioutil.ReadFile(c.dest) + if err != nil { + os.Remove(c.src) + os.Remove(c.dest) + t.Fatalf("Failed to read %v", err) + } + want := content[c.offset:] + if c.writePrefix { + want = append(prefix, want...) + } + if !bytes.Equal(blob, want) { + t.Fatal("Unexpected value") + } + os.Remove(c.src) + os.Remove(c.dest) + } +} diff --git a/core/rawdb/table.go b/core/rawdb/table.go index 91fc31b660d67..f52f6989d765a 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -74,6 +74,12 @@ func (t *table) Ancients() (uint64, error) { return t.db.Ancients() } +// Tail is a noop passthrough that just forwards the request to the underlying +// database. +func (t *table) Tail() (uint64, error) { + return t.db.Tail() +} + // AncientSize is a noop passthrough that just forwards the request to the underlying // database. func (t *table) AncientSize(kind string) (uint64, error) { @@ -89,10 +95,16 @@ func (t *table) ReadAncients(fn func(reader ethdb.AncientReader) error) (err err return t.db.ReadAncients(fn) } -// TruncateAncients is a noop passthrough that just forwards the request to the underlying +// TruncateHead is a noop passthrough that just forwards the request to the underlying +// database. +func (t *table) TruncateHead(items uint64) error { + return t.db.TruncateHead(items) +} + +// TruncateTail is a noop passthrough that just forwards the request to the underlying // database. -func (t *table) TruncateAncients(items uint64) error { - return t.db.TruncateAncients(items) +func (t *table) TruncateTail(items uint64) error { + return t.db.TruncateTail(items) } // Sync is a noop passthrough that just forwards the request to the underlying diff --git a/ethdb/database.go b/ethdb/database.go index 0a5729c6c1ecc..fafa712523262 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -86,6 +86,10 @@ type AncientReader interface { // Ancients returns the ancient item numbers in the ancient store. Ancients() (uint64, error) + // Tail returns the number of first stored item in the freezer. + // This number can also be interpreted as the total deleted item numbers. + Tail() (uint64, error) + // AncientSize returns the ancient size of the specified category. AncientSize(kind string) (uint64, error) } @@ -106,8 +110,16 @@ type AncientWriter interface { // The integer return value is the total size of the written data. ModifyAncients(func(AncientWriteOp) error) (int64, error) - // TruncateAncients discards all but the first n ancient data from the ancient store. - TruncateAncients(n uint64) error + // TruncateHead discards all but the first n ancient data from the ancient store. + // After the truncation, the latest item can be accessed it item_n-1(start from 0). + TruncateHead(n uint64) error + + // TruncateTail discards the first n ancient data from the ancient store. The already + // deleted items are ignored. After the truncation, the earliest item can be accessed + // is item_n(start from 0). The deleted items may not be removed from the ancient store + // immediately, but only when the accumulated deleted data reach the threshold then + // will be removed all together. + TruncateTail(n uint64) error // Sync flushes all in-memory ancient store data to disk. Sync() error From 539c9dfcd0858a0045ef8345f8ca82aad7ef8fad Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Tue, 11 Jan 2022 10:51:22 +0800 Subject: [PATCH 02/10] core/rawdb: address comments from martin and sina --- core/rawdb/accessors_chain.go | 2 +- core/rawdb/freezer.go | 16 +++------------- core/rawdb/freezer_meta.go | 25 +++++++------------------ core/rawdb/freezer_table.go | 8 ++++---- core/rawdb/freezer_table_test.go | 18 +++++++++--------- 5 files changed, 24 insertions(+), 45 deletions(-) diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 7e4a06df924b9..f9c224dfa8f86 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -83,7 +83,7 @@ type NumberHash struct { Hash common.Hash } -// ReadAllHashesInRange retrieves all the hashes assigned to blocks at a certain +// ReadAllHashesInRange retrieves all the hashes assigned to blocks at certain // heights, both canonical and reorged forks included. // This method considers both limits to be _inclusive_. func ReadAllHashesInRange(db ethdb.Iteratee, first, last uint64) []*NumberHash { diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 3f66f983e5fad..cc81701462d3c 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -59,7 +59,7 @@ const ( freezerRecheckInterval = time.Minute // freezerBatchLimit is the maximum number of blocks to freeze in one batch - // before doing a fsync and deleting it from the key-value store. + // before doing an fsync and deleting it from the key-value store. freezerBatchLimit = 30000 // freezerTableSize defines the maximum size of freezer data files. @@ -298,17 +298,12 @@ func (f *freezer) TruncateHead(items uint64) error { if atomic.LoadUint64(&f.frozen) <= items { return nil } - var frozen uint64 for _, table := range f.tables { if err := table.truncateHead(items); err != nil { return err } - // Tables should be aligned, only check the first table. - if frozen == 0 { - frozen = atomic.LoadUint64(&table.items) - } } - atomic.StoreUint64(&f.frozen, frozen) + atomic.StoreUint64(&f.frozen, items) return nil } @@ -323,17 +318,12 @@ func (f *freezer) TruncateTail(tail uint64) error { if atomic.LoadUint64(&f.tail) >= tail { return nil } - var truncated uint64 for _, table := range f.tables { if err := table.truncateTail(tail); err != nil { return err } - if truncated == 0 { - // Tables should be aligned, only check the first table. - truncated = table.tail() - } } - atomic.StoreUint64(&f.tail, truncated) + atomic.StoreUint64(&f.tail, tail) return nil } diff --git a/core/rawdb/freezer_meta.go b/core/rawdb/freezer_meta.go index 4df449d53587d..a77e563f9a39a 100644 --- a/core/rawdb/freezer_meta.go +++ b/core/rawdb/freezer_meta.go @@ -151,27 +151,16 @@ func loadMetadata(index *os.File) (*freezerTableMeta, error) { // upgradeV0TableIndex extracts the indexes from version-0 index file and // encodes/stores them into the latest version index file. func upgradeV0TableIndex(index *os.File) error { - // Create a temporary offset buffer to read indexEntry info - buffer := make([]byte, indexEntrySize) - - // Read index zero, determine what file is the earliest - // and how many entries are deleted from the freezer table. - var first indexEntry - if _, err := index.ReadAt(buffer, 0); err != nil { - return err - } - first.unmarshalBinary(buffer) - - encoded, err := encodeMetadata(newMetadata(first.filenum, uint64(first.offset), 0)) - if err != nil { - return err - } // Close the origin index file. if err := index.Close(); err != nil { return err } return copyFrom(index.Name(), index.Name(), indexEntrySize, func(f *os.File) error { - _, err := f.Write(encoded) + encoded, err := encodeMetadata(newMetadata(0, 0, 0)) + if err != nil { + return err + } + _, err = f.Write(encoded) return err }) } @@ -201,8 +190,8 @@ func upgradeTableIndex(index *os.File, version uint16) (*os.File, *freezerTableM } // repairTableIndex repairs the given index file of freezer table and returns -// the stored metadata inside. If the index file is be rewritten, the function -// should be responsible for closing the origin one and return the new handler. +// the stored metadata inside. If the index file is to be rewritten, the function +// should be responsible for closing the origin one and returning the new handler. // If the table is empty, commit the empty metadata; // If the table is legacy, upgrade it to the latest version; func repairTableIndex(index *os.File) (*os.File, *freezerTableMeta, error) { diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index d57fc4a1798b8..aaf81a0c757e9 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -94,10 +94,10 @@ type freezerTable struct { items uint64 // Number of items stored in the table (including items removed from tail) itemOffset uint64 // Number of items removed from the table - // itemHidden is the number of items marked as deleted they are not removed - // from the table yet. Since the tail deletion is only supported at file level - // which means the actual deletion will be delayed until the total "marked as - // deleted" data reach the threshold. Before that these items will be hidden + // itemHidden is the number of items marked as deleted which are not removed + // from the table yet. Tail deletion is only supported at file level which + // means the actual deletion will be delayed until the total "marked as + // deleted" data reaches the threshold. Before that these items will be hidden // to prevent being visited again. itemHidden uint64 diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index eb6d422f9c8ca..dbbbabac19405 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -663,7 +663,7 @@ func TestTruncateTail(t *testing.T) { fname := fmt.Sprintf("truncate-tail-%d", rand.Uint64()) // Fill table - f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) if err != nil { t.Fatal(err) } @@ -709,7 +709,7 @@ func TestTruncateTail(t *testing.T) { // Reopen the table, the deletion information should be persisted as well f.Close() - f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) if err != nil { t.Fatal(err) } @@ -741,7 +741,7 @@ func TestTruncateTail(t *testing.T) { // Reopen the table, the above testing should still pass f.Close() - f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) if err != nil { t.Fatal(err) } @@ -780,7 +780,7 @@ func TestTruncateHeadBelowTail(t *testing.T) { fname := fmt.Sprintf("truncate-head-blow-tail-%d", rand.Uint64()) // Fill table - f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) if err != nil { t.Fatal(err) } @@ -838,8 +838,8 @@ func TestUpgradeLegacyFreezerTable(t *testing.T) { defer os.Remove(f.Name()) index := &indexEntry{ - filenum: 100, - offset: 200, + filenum: 0, + offset: 0, } encoded := index.append(nil) f.Write(encoded) @@ -851,10 +851,10 @@ func TestUpgradeLegacyFreezerTable(t *testing.T) { if newf.Name() != f.Name() { t.Fatal("Unexpected file name") } - if meta.tailId != 100 { - t.Fatal("Unexpected tail file") + if meta.tailId != 0 { + t.Fatal("Unexpected tail file", meta.tailId) } - if meta.deleted != 200 { + if meta.deleted != 0 { t.Fatal("Unexpected deleted items") } if meta.hidden != 0 { From 964ae61cba914c721bd963a0cf97c0b699266d3d Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Tue, 15 Feb 2022 14:46:04 +0800 Subject: [PATCH 03/10] core/rawdb: fixes cornercase in tail deletion --- core/rawdb/freezer_meta.go | 5 ++- core/rawdb/freezer_table.go | 66 +++++++++++++++++++------------- core/rawdb/freezer_table_test.go | 14 ++----- 3 files changed, 45 insertions(+), 40 deletions(-) diff --git a/core/rawdb/freezer_meta.go b/core/rawdb/freezer_meta.go index a77e563f9a39a..cf0aadb5b66dd 100644 --- a/core/rawdb/freezer_meta.go +++ b/core/rawdb/freezer_meta.go @@ -20,6 +20,7 @@ import ( "bytes" "errors" "fmt" + "io" "os" "github.com/ethereum/go-ethereum/rlp" @@ -175,7 +176,7 @@ func upgradeTableIndex(index *os.File, version uint16) (*os.File, *freezerTableM return nil, nil, err } default: - return nil, nil, errors.New("unknown freezer table index") + return nil, nil, errors.New("unknown freezer table version") } // Reopen the upgraded index file and load the metadata from it index, err := os.Open(index.Name()) @@ -205,7 +206,7 @@ func repairTableIndex(index *os.File) (*os.File, *freezerTableMeta, error) { return nil, nil, err } // Shift file cursor to the end for next write operation - _, err = index.Seek(0, 2) + _, err = index.Seek(0, io.SeekEnd) if err != nil { return nil, nil, err } diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index aaf81a0c757e9..8c0671a29f6cc 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -406,18 +406,6 @@ func (t *freezerTable) truncateHead(items uint64) error { if existing <= items { return nil } - // We need to truncate, save the old size for metrics tracking - oldSize, err := t.sizeNolock() - if err != nil { - return err - } - // Something's out of sync, truncate the table's offset index - log := t.logger.Debug - if existing > items+1 { - log = t.logger.Warn // Only loud warn if we delete multiple items - } - log("Truncating freezer table", "items", existing, "limit", items) - // Calculate the relative offset between the new head and tail, use // it to access the corresponding index entry. If the requested target // is even below the freezer tail, reject it. @@ -429,17 +417,28 @@ func (t *freezerTable) truncateHead(items uint64) error { if items < tail { return errors.New("truncation below tail") } - offset := items - itemOffset + // We need to truncate, save the old size for metrics tracking + oldSize, err := t.sizeNolock() + if err != nil { + return err + } + // Something's out of sync, truncate the table's offset index + log := t.logger.Debug + if existing > items+1 { + log = t.logger.Warn // Only loud warn if we delete multiple items + } + log("Truncating freezer table", "items", existing, "limit", items) - if err := truncateFreezerFile(t.index, int64(offset)*indexEntrySize+metaLength); err != nil { + length := items - itemOffset + if err := truncateFreezerFile(t.index, int64(length)*indexEntrySize+metaLength); err != nil { return err } // Calculate the new expected size of the data file and truncate it var expected *indexEntry - if offset == 0 { + if length == 0 { expected = &indexEntry{filenum: t.tailId, offset: 0} } else { - expected, err = t.getIndex(int64(offset-1), 0) + expected, err = t.getIndex(int64(length-1), 0) if err != nil { return err } @@ -498,7 +497,7 @@ func (t *freezerTable) truncateIndexFile(originDeleted, deleted, hidden uint64, return nil } -// truncateHead discards any recent data before the provided threshold number. +// truncateTail discards any recent data before the provided threshold number. func (t *freezerTable) truncateTail(items uint64) error { t.lock.Lock() defer t.lock.Unlock() @@ -515,18 +514,31 @@ func (t *freezerTable) truncateTail(items uint64) error { if head < items { return errors.New("truncation above head") } - // Load the index of new tail item after the deletion. - newTail, err := t.getIndex(int64(items-deleted), 0) + // Load the file number of new tail item after the deletion. + count, err := t.indexLen() if err != nil { return err } + var ( + tailId uint32 + delLen = items - deleted + ) + if uint64(count) == delLen { + tailId = t.headId + } else { + newTail, err := t.getIndex(int64(delLen), 0) + if err != nil { + return err + } + tailId = newTail.filenum + } // Freezer only supports deletion by file, just mark the entries as hidden - if t.tailId == newTail.filenum { - atomic.StoreUint64(&t.itemHidden, items-deleted) - return storeMetadata(t.index, newMetadata(t.tailId, deleted, items-deleted)) + if t.tailId == tailId { + atomic.StoreUint64(&t.itemHidden, delLen) + return storeMetadata(t.index, newMetadata(t.tailId, deleted, delLen)) } - if t.tailId > newTail.filenum { - return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, newTail.filenum) + if t.tailId > tailId { + return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, tailId) } // We need to truncate, save the old size for metrics tracking oldSize, err := t.sizeNolock() @@ -540,16 +552,16 @@ func (t *freezerTable) truncateTail(items uint64) error { if err != nil { return err } - if cur.filenum != newTail.filenum { + if cur.filenum != tailId { break } newDeleted = current } - if err := t.truncateIndexFile(deleted, newDeleted, items-newDeleted, newTail.filenum); err != nil { + if err := t.truncateIndexFile(deleted, newDeleted, items-newDeleted, tailId); err != nil { return err } // Release any files before the current tail - t.tailId = newTail.filenum + t.tailId = tailId atomic.StoreUint64(&t.itemOffset, newDeleted) atomic.StoreUint64(&t.itemHidden, items-newDeleted) t.releaseFilesBefore(t.tailId, true) diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index dbbbabac19405..c2101f45a6952 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -760,7 +760,7 @@ func TestTruncateTail(t *testing.T) { }) // truncate all, the entire freezer should be deleted - f.truncateTail(6) + f.truncateTail(7) checkRetrieveError(t, f, map[uint64]error{ 0: errOutOfBounds, 1: errOutOfBounds, @@ -768,13 +768,11 @@ func TestTruncateTail(t *testing.T) { 3: errOutOfBounds, 4: errOutOfBounds, 5: errOutOfBounds, - }) - checkRetrieve(t, f, map[uint64][]byte{ - 6: getChunk(20, 0x11), + 6: errOutOfBounds, }) } -func TestTruncateHeadBelowTail(t *testing.T) { +func TestTruncateHead(t *testing.T) { t.Parallel() rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("truncate-head-blow-tail-%d", rand.Uint64()) @@ -822,12 +820,6 @@ func TestTruncateHeadBelowTail(t *testing.T) { 5: getChunk(20, 0xaa), 6: getChunk(20, 0x11), }) - - f.truncateTail(5) // Lazy deleted the item-4, it's hidden - f.truncateHead(5) // New head is reset to item-4 - checkRetrieveError(t, f, map[uint64]error{ - 4: errOutOfBounds, // Hidden item - }) } func TestUpgradeLegacyFreezerTable(t *testing.T) { From a44239fa5110b82dc7deb7883462fc54a66ff516 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Thu, 17 Feb 2022 21:55:13 +0800 Subject: [PATCH 04/10] core/rawdb: separate metadata into a standalone file --- core/rawdb/freezer_meta.go | 239 ++++++----------- core/rawdb/freezer_meta_test.go | 76 +++--- core/rawdb/freezer_table.go | 431 +++++++++++++------------------ core/rawdb/freezer_table_test.go | 79 ++---- core/rawdb/freezer_utils.go | 39 ++- 5 files changed, 350 insertions(+), 514 deletions(-) diff --git a/core/rawdb/freezer_meta.go b/core/rawdb/freezer_meta.go index cf0aadb5b66dd..060bcfd5193c6 100644 --- a/core/rawdb/freezer_meta.go +++ b/core/rawdb/freezer_meta.go @@ -17,210 +17,119 @@ package rawdb import ( - "bytes" + "encoding/binary" "errors" - "fmt" "io" "os" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" ) -const ( - freezerVersion = 1 // The version tag of freezer table structure - metaLength = 1024 // The number of bytes allocated for the freezer table metadata -) - -var errIncompatibleVersion = errors.New("incompatible version") - -type incompatibleError struct { - version uint16 - expect uint16 - err error -} - -func newIncompatibleError(version uint16) *incompatibleError { - return &incompatibleError{ - version: version, - expect: freezerVersion, - err: errIncompatibleVersion, - } -} - -// Unwrap returns the internal evm error which allows us for further -// analysis outside. -func (err *incompatibleError) Unwrap() error { - return err.err -} - -func (err *incompatibleError) Error() string { - return fmt.Sprintf("%v, get %d, expect %d", err.err, err.version, err.expect) -} +const freezerVersion = 1 // The initial version tag of freezer table metadata // freezerTableMeta wraps all the metadata of the freezer table. type freezerTableMeta struct { - version uint16 // Freezer table version descriptor - tailId uint32 // The number of the earliest file - deleted uint64 // The number of items that have been removed from the table - hidden uint64 // The number of items that have been hidden in the table -} + // version is the versioning descriptor of the freezer table. + version uint16 -// newMetadata initializes the metadata object with the given parameters. -func newMetadata(tailId uint32, deleted uint64, hidden uint64) *freezerTableMeta { - return &freezerTableMeta{ - version: freezerVersion, - tailId: tailId, - deleted: deleted, - hidden: hidden, - } + // VirtualTail indicates how many items have been marked as deleted. + // Its value is equal to the number of items removed from the table + // plus the number of items hidden in the table, so it should never + // be lower than the "actual tail". + VirtualTail uint64 } -// encodeMetadata encodes the given parameters as the freezer table metadata. -func encodeMetadata(meta *freezerTableMeta) ([]byte, error) { - buffer := new(bytes.Buffer) - if err := rlp.Encode(buffer, meta.version); err != nil { - return nil, err - } - if err := rlp.Encode(buffer, meta.tailId); err != nil { - return nil, err - } - if err := rlp.Encode(buffer, meta.deleted); err != nil { - return nil, err - } - if err := rlp.Encode(buffer, meta.hidden); err != nil { - return nil, err +// newMetadata initializes the metadata object with the given virtual tail. +func newMetadata(tail uint64) *freezerTableMeta { + return &freezerTableMeta{ + version: freezerVersion, + VirtualTail: tail, } - buffer.Write(make([]byte, metaLength-buffer.Len())) // Right pad zero bytes to the specified length - return buffer.Bytes(), nil } -// decodeMetadata decodes the freezer-table metadata from the given -// rlp stream. -func decodeMetadata(r *rlp.Stream) (*freezerTableMeta, error) { - var version uint16 - if err := r.Decode(&version); err != nil { - return nil, err - } - if version != freezerVersion { - return nil, newIncompatibleError(version) - } - var tailId uint32 - if err := r.Decode(&tailId); err != nil { +// readMetadata reads the metadata of the freezer table from the +// given metadata file. +func readMetadata(file *os.File) (*freezerTableMeta, error) { + _, err := file.Seek(0, io.SeekStart) + if err != nil { return nil, err } - var deleted, hidden uint64 - if err := r.Decode(&deleted); err != nil { + // load the first 2 bytes, resolve the version tag + var buf [2]byte + _, err = file.Read(buf[:2]) + if err != nil { return nil, err } - if err := r.Decode(&hidden); err != nil { - return nil, err + version := binary.BigEndian.Uint16(buf[:]) + switch version { + case freezerVersion: + var meta freezerTableMeta + if err := rlp.Decode(file, &meta); err != nil { + return nil, err + } + meta.version = freezerVersion + return &meta, nil + default: + return nil, errors.New("undefined version") } - return newMetadata(tailId, deleted, hidden), nil } -// storeMetadata stores the metadata of the freezer table into the -// given index file. -func storeMetadata(index *os.File, meta *freezerTableMeta) error { - encoded, err := encodeMetadata(meta) +// writeMetadata writes the metadata of the freezer table into the +// given metadata file. +func writeMetadata(file *os.File, meta *freezerTableMeta) error { + _, err := file.Seek(0, io.SeekStart) if err != nil { return err } - if _, err := index.WriteAt(encoded, 0); err != nil { - return err - } - return nil -} - -// loadMetadata loads the metadata of the freezer table from the -// given index file. Return the error if the version of loaded -// metadata is not expected. -func loadMetadata(index *os.File) (*freezerTableMeta, error) { - stat, err := index.Stat() + var buf [2]byte + binary.BigEndian.PutUint16(buf[:], meta.version) + _, err = file.Write(buf[:]) if err != nil { - return nil, err - } - if stat.Size() < metaLength { - return nil, newIncompatibleError(0) - } - buffer := make([]byte, metaLength) - if _, err := index.ReadAt(buffer, 0); err != nil { - return nil, err - } - return decodeMetadata(rlp.NewStream(bytes.NewReader(buffer), 0)) -} - -// upgradeV0TableIndex extracts the indexes from version-0 index file and -// encodes/stores them into the latest version index file. -func upgradeV0TableIndex(index *os.File) error { - // Close the origin index file. - if err := index.Close(); err != nil { - return err - } - return copyFrom(index.Name(), index.Name(), indexEntrySize, func(f *os.File) error { - encoded, err := encodeMetadata(newMetadata(0, 0, 0)) - if err != nil { - return err - } - _, err = f.Write(encoded) return err - }) -} - -// upgradeTableIndex upgrades the legacy index file to the latest version. -// This function should be responsible for closing the origin index file -// and return the re-opened one. -func upgradeTableIndex(index *os.File, version uint16) (*os.File, *freezerTableMeta, error) { - switch version { - case 0: - if err := upgradeV0TableIndex(index); err != nil { - return nil, nil, err - } - default: - return nil, nil, errors.New("unknown freezer table version") - } - // Reopen the upgraded index file and load the metadata from it - index, err := os.Open(index.Name()) - if err != nil { - return nil, nil, err } - meta, err := loadMetadata(index) + encoded, err := rlp.EncodeToBytes(meta) if err != nil { - return nil, nil, err + return err } - return index, meta, nil + _, err = file.Write(encoded) + return err } -// repairTableIndex repairs the given index file of freezer table and returns -// the stored metadata inside. If the index file is to be rewritten, the function -// should be responsible for closing the origin one and returning the new handler. -// If the table is empty, commit the empty metadata; -// If the table is legacy, upgrade it to the latest version; -func repairTableIndex(index *os.File) (*os.File, *freezerTableMeta, error) { - stat, err := index.Stat() +// loadMetadata loads the metadata from the given metadata file. +// Initializes the metadata file with the given "actual tail" if +// it's empty. +func loadMetadata(file *os.File, tail uint64) (*freezerTableMeta, error) { + stat, err := file.Stat() if err != nil { - return nil, nil, err + return nil, err } + // Write the metadata with the given actual tail into metadata file + // if it's non-existent. There are two possible scenarios here: + // - the freezer table is empty + // - the freezer table is legacy + // In both cases, write the meta into the file with the actual tail + // as the virtual tail. if stat.Size() == 0 { - meta := newMetadata(0, 0, 0) - if err := storeMetadata(index, meta); err != nil { - return nil, nil, err + m := newMetadata(tail) + if err := writeMetadata(file, m); err != nil { + return nil, err } - // Shift file cursor to the end for next write operation - _, err = index.Seek(0, io.SeekEnd) - if err != nil { - return nil, nil, err - } - return index, meta, nil + return m, nil } - meta, err := loadMetadata(index) + m, err := readMetadata(file) if err != nil { - if !errors.Is(err, errIncompatibleVersion) { - return nil, nil, err - } - index, meta, err = upgradeTableIndex(index, err.(*incompatibleError).version) + return nil, err } - if err != nil { - return nil, nil, err + // Update the virtual tail with the given actual tail if it's even + // lower than it. Theoretically it shouldn't happen at all, print + // a warning here. + if m.VirtualTail < tail { + log.Warn("Updated virtual tail", "have", m.VirtualTail, "now", tail) + m.VirtualTail = tail + if err := writeMetadata(file, m); err != nil { + return nil, err + } } - return index, meta, nil + return m, nil } diff --git a/core/rawdb/freezer_meta_test.go b/core/rawdb/freezer_meta_test.go index 3340450f984a9..c2c01e46e1fe3 100644 --- a/core/rawdb/freezer_meta_test.go +++ b/core/rawdb/freezer_meta_test.go @@ -17,53 +17,45 @@ package rawdb import ( - "errors" "io/ioutil" "os" "testing" ) -func TestStoreLoadFreezerTableMeta(t *testing.T) { - var cases = []struct { - version uint16 - deleted uint64 - hidden uint64 - expectErr error - }{ - { - freezerVersion, 100, 200, nil, - }, - { - 0, 100, 200, errIncompatibleVersion, // legacy version - }, +func TestReadWriteFreezerTableMeta(t *testing.T) { + f, err := ioutil.TempFile(os.TempDir(), "*") + if err != nil { + t.Fatalf("Failed to create file %v", err) } - for _, c := range cases { - f, err := ioutil.TempFile(os.TempDir(), "*") - if err != nil { - t.Fatalf("Failed to create file %v", err) - } - err = storeMetadata(f, &freezerTableMeta{ - version: c.version, - deleted: c.deleted, - hidden: c.hidden, - }) - if err != nil { - t.Fatalf("Failed to store metadata %v", err) - } - meta, err := loadMetadata(f) - if !errors.Is(err, c.expectErr) { - t.Fatalf("Unexpected error %v", err) - } - if c.expectErr == nil { - if meta.version != c.version { - t.Fatalf("Unexpected version field") - } - if meta.deleted != c.deleted { - t.Fatalf("Unexpected deleted field") - } - if meta.hidden != c.hidden { - t.Fatalf("Unexpected hidden field") - } - } + err = writeMetadata(f, newMetadata(100)) + if err != nil { + t.Fatalf("Failed to write metadata %v", err) + } + meta, err := readMetadata(f) + if err != nil { + t.Fatalf("Failed to read metadata %v", err) + } + if meta.version != freezerVersion { + t.Fatalf("Unexpected version field") + } + if meta.VirtualTail != uint64(100) { + t.Fatalf("Unexpected virtual tail field") + } +} + +func TestInitializeFreezerTableMeta(t *testing.T) { + f, err := ioutil.TempFile(os.TempDir(), "*") + if err != nil { + t.Fatalf("Failed to create file %v", err) + } + meta, err := loadMetadata(f, uint64(0)) + if err != nil { + t.Fatalf("Failed to read metadata %v", err) + } + if meta.version != freezerVersion { + t.Fatalf("Unexpected version field") + } + if meta.VirtualTail != uint64(0) { + t.Fatalf("Unexpected virtual tail field") } } diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 8c0671a29f6cc..466cf3622e68c 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -94,11 +94,11 @@ type freezerTable struct { items uint64 // Number of items stored in the table (including items removed from tail) itemOffset uint64 // Number of items removed from the table - // itemHidden is the number of items marked as deleted which are not removed - // from the table yet. Tail deletion is only supported at file level which - // means the actual deletion will be delayed until the total "marked as - // deleted" data reaches the threshold. Before that these items will be hidden - // to prevent being visited again. + // itemHidden is the number of items marked as deleted. Tail deletion is + // only supported at file level which means the actual deletion will be + // delayed until the entire data file is marked as deleted. Before that + // these items will be hidden to prevent being visited again. The value + // should never be lower than itemOffset. itemHidden uint64 noCompression bool // if true, disables snappy compression. Note: does not work retroactively @@ -108,10 +108,11 @@ type freezerTable struct { path string head *os.File // File descriptor for the data head of the table + index *os.File // File descriptor for the indexEntry file of the table + meta *os.File // File descriptor for metadata of the table files map[uint32]*os.File // open files headId uint32 // number of the currently active head file tailId uint32 // number of the earliest file - index *os.File // File descriptor for the indexEntry file of the table headBytes int64 // Number of bytes written to the head file readMeter metrics.Meter // Meter for measuring the effective amount of data read @@ -127,44 +128,6 @@ func NewFreezerTable(path, name string, disableSnappy, readonly bool) (*freezerT return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, freezerTableSize, disableSnappy, readonly) } -// openFreezerFileForAppend opens a freezer table file and seeks to the end -func openFreezerFileForAppend(filename string) (*os.File, error) { - // Open the file without the O_APPEND flag - // because it has differing behaviour during Truncate operations - // on different OS's - file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644) - if err != nil { - return nil, err - } - // Seek to end for append - if _, err = file.Seek(0, io.SeekEnd); err != nil { - return nil, err - } - return file, nil -} - -// openFreezerFileForReadOnly opens a freezer table file for read only access -func openFreezerFileForReadOnly(filename string) (*os.File, error) { - return os.OpenFile(filename, os.O_RDONLY, 0644) -} - -// openFreezerFileTruncated opens a freezer table making sure it is truncated -func openFreezerFileTruncated(filename string) (*os.File, error) { - return os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) -} - -// truncateFreezerFile resizes a freezer table file and seeks to the end -func truncateFreezerFile(file *os.File, size int64) error { - if err := file.Truncate(size); err != nil { - return err - } - // Seek to end for append - if _, err := file.Seek(0, io.SeekEnd); err != nil { - return err - } - return nil -} - // newTable opens a freezer table, creating the data and index files if they are // non-existent. Both files are truncated to the shortest common length to ensure // they don't go out of sync. @@ -182,19 +145,33 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr var ( err error index *os.File + meta *os.File ) if readonly { // Will fail if table doesn't exist index, err = openFreezerFileForReadOnly(filepath.Join(path, idxName)) + if err != nil { + return nil, err + } + // Will fail if the table is legacy(no metadata) + meta, err = openFreezerFileForReadOnly(filepath.Join(path, fmt.Sprintf("%s.meta", name))) + if err != nil { + return nil, err + } } else { index, err = openFreezerFileForAppend(filepath.Join(path, idxName)) - } - if err != nil { - return nil, err + if err != nil { + return nil, err + } + meta, err = openFreezerFileForAppend(filepath.Join(path, fmt.Sprintf("%s.meta", name))) + if err != nil { + return nil, err + } } // Create the table and repair any past inconsistency tab := &freezerTable{ index: index, + meta: meta, files: make(map[uint32]*os.File), readMeter: readMeter, writeMeter: writeMeter, @@ -224,26 +201,23 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr // repair cross-checks the head and the index file and truncates them to // be in sync with each other after a potential crash / data loss. func (t *freezerTable) repair() error { - index, meta, err := repairTableIndex(t.index) - if err != nil { - return err - } - t.index = index // index file may be reopened, update it - t.tailId, t.itemOffset, t.itemHidden = meta.tailId, meta.deleted, meta.hidden + // Create a temporary offset buffer to init files with and read indexEntry into + buffer := make([]byte, indexEntrySize) - // Ensure the index is a multiple of indexEntrySize bytes. The assumption - // is held that index file at least has metaLength bytes for storing meta- - // data. + // If we've just created the files, initialize the index with the 0 indexEntry stat, err := t.index.Stat() if err != nil { return err } - if overflow := (stat.Size() - metaLength) % indexEntrySize; overflow != 0 { - err := truncateFreezerFile(t.index, stat.Size()-overflow) - if err != nil { + if stat.Size() == 0 { + if _, err := t.index.Write(buffer); err != nil { return err } } + // Ensure the index is a multiple of indexEntrySize bytes + if overflow := stat.Size() % indexEntrySize; overflow != 0 { + truncateFreezerFile(t.index, stat.Size()-overflow) // New file can't trigger this path + } // Retrieve the file sizes and prepare for truncation if stat, err = t.index.Stat(); err != nil { return err @@ -252,39 +226,44 @@ func (t *freezerTable) repair() error { // Open the head file var ( + firstIndex indexEntry + lastIndex indexEntry contentSize int64 contentExp int64 - lastIndex *indexEntry ) - // Read last index, determine what file is the latest and - // what's the current head item - items, err := t.indexLen() + // Read index zero, determine what file is the earliest + // and what item offset to use + t.index.ReadAt(buffer, 0) + firstIndex.unmarshalBinary(buffer) + + // Assign the tail fields with the first stored index. + // The total removed items is represented with an uint32, + // which is not enough in theory but enough in practice. + // TODO: use uint64 to represent total removed items. + t.tailId = firstIndex.filenum + t.itemOffset = uint64(firstIndex.offset) + + // Load metadata from the file + meta, err := loadMetadata(t.meta, t.itemOffset) if err != nil { return err } - if items == 0 { - if t.readonly { - t.head, err = t.openFile(t.tailId, openFreezerFileForReadOnly) - } else { - t.head, err = t.openFile(t.tailId, openFreezerFileForAppend) - } - if err != nil { - return err - } - lastIndex = &indexEntry{filenum: t.tailId, offset: 0} + t.itemHidden = meta.VirtualTail + + // Read the last index, use the default value in case the freezer is empty + if offsetsSize == indexEntrySize { + lastIndex = indexEntry{filenum: t.tailId, offset: 0} } else { - lastIndex, err = t.getIndex(0, 1) - if err != nil { - return err - } - if t.readonly { - t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForReadOnly) - } else { - t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend) - } - if err != nil { - return err - } + t.index.ReadAt(buffer, offsetsSize-indexEntrySize) + lastIndex.unmarshalBinary(buffer) + } + if t.readonly { + t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForReadOnly) + } else { + t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend) + } + if err != nil { + return err } if stat, err = t.head.Stat(); err != nil { return err @@ -308,23 +287,22 @@ func (t *freezerTable) repair() error { if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil { return err } - // Load the previous index entry from the index file - offsetsSize, items = offsetsSize-indexEntrySize, items-1 + offsetsSize -= indexEntrySize - var newLast *indexEntry - if items == 0 { - newLast = &indexEntry{filenum: t.tailId, offset: 0} + // Read the new head index, use the default value in case + // the freezer is already empty. + var newLastIndex indexEntry + if offsetsSize == indexEntrySize { + newLastIndex = indexEntry{filenum: t.tailId, offset: 0} } else { - newLast, err = t.getIndex(0, 1) - if err != nil { - return err - } + t.index.ReadAt(buffer, offsetsSize-indexEntrySize) + newLastIndex.unmarshalBinary(buffer) } // We might have slipped back into an earlier head-file here - if newLast.filenum != lastIndex.filenum { + if newLastIndex.filenum != lastIndex.filenum { // Release earlier opened file t.releaseFile(lastIndex.filenum) - if t.head, err = t.openFile(newLast.filenum, openFreezerFileForAppend); err != nil { + if t.head, err = t.openFile(newLastIndex.filenum, openFreezerFileForAppend); err != nil { return err } if stat, err = t.head.Stat(); err != nil { @@ -334,7 +312,7 @@ func (t *freezerTable) repair() error { } contentSize = stat.Size() } - lastIndex = newLast + lastIndex = newLastIndex contentExp = int64(lastIndex.offset) } } @@ -347,9 +325,12 @@ func (t *freezerTable) repair() error { if err := t.head.Sync(); err != nil { return err } + if err := t.meta.Sync(); err != nil { + return err + } } // Update the item and byte counters and return - t.items = t.itemOffset + uint64(items) + t.items = t.itemOffset + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file t.headBytes = contentSize t.headId = lastIndex.filenum @@ -391,9 +372,10 @@ func (t *freezerTable) preopen() (err error) { } // tail returns the index of the first stored item in the freezer table. -// It can also be interpreted as the number of deleted items from the tail. +// It can also be interpreted as the number of items marked as deleted +// from the tail. func (t *freezerTable) tail() uint64 { - return atomic.LoadUint64(&t.itemHidden) + atomic.LoadUint64(&t.itemOffset) + return atomic.LoadUint64(&t.itemHidden) } // truncateHead discards any recent data above the provided threshold number. @@ -401,20 +383,12 @@ func (t *freezerTable) truncateHead(items uint64) error { t.lock.Lock() defer t.lock.Unlock() - // If our item count is correct, don't do anything + // Ensure the given truncate target falls in the correct range existing := atomic.LoadUint64(&t.items) if existing <= items { return nil } - // Calculate the relative offset between the new head and tail, use - // it to access the corresponding index entry. If the requested target - // is even below the freezer tail, reject it. - var ( - itemOffset = atomic.LoadUint64(&t.itemOffset) - itemHidden = atomic.LoadUint64(&t.itemHidden) - tail = itemOffset + itemHidden - ) - if items < tail { + if items < atomic.LoadUint64(&t.itemHidden) { return errors.New("truncation below tail") } // We need to truncate, save the old size for metrics tracking @@ -429,19 +403,22 @@ func (t *freezerTable) truncateHead(items uint64) error { } log("Truncating freezer table", "items", existing, "limit", items) - length := items - itemOffset - if err := truncateFreezerFile(t.index, int64(length)*indexEntrySize+metaLength); err != nil { + // Truncate the index file first, the tail position is also considered + // when calculating the new freezer table length. + length := items - atomic.LoadUint64(&t.itemOffset) + if err := truncateFreezerFile(t.index, int64(length+1)*indexEntrySize); err != nil { return err } // Calculate the new expected size of the data file and truncate it - var expected *indexEntry + var expected indexEntry if length == 0 { - expected = &indexEntry{filenum: t.tailId, offset: 0} + expected = indexEntry{filenum: t.tailId, offset: 0} } else { - expected, err = t.getIndex(int64(length-1), 0) - if err != nil { + buffer := make([]byte, indexEntrySize) + if _, err := t.index.ReadAt(buffer, int64(length*indexEntrySize)); err != nil { return err } + expected.unmarshalBinary(buffer) } // We might need to truncate back to older files if expected.filenum != t.headId { @@ -474,96 +451,102 @@ func (t *freezerTable) truncateHead(items uint64) error { return nil } -func (t *freezerTable) truncateIndexFile(originDeleted, deleted, hidden uint64, tailId uint32) error { - encoded, err := encodeMetadata(newMetadata(tailId, deleted, hidden)) - if err != nil { - return err - } - err = copyFrom(t.index.Name(), t.index.Name(), metaLength+indexEntrySize*(deleted-originDeleted), func(f *os.File) error { - _, err := f.Write(encoded) - return err - }) - if err != nil { - return err - } - if err := t.index.Close(); err != nil { - return err - } - offsets, err := openFreezerFileForAppend(t.index.Name()) - if err != nil { - return err - } - t.index = offsets - return nil -} - // truncateTail discards any recent data before the provided threshold number. func (t *freezerTable) truncateTail(items uint64) error { t.lock.Lock() defer t.lock.Unlock() // Ensure the given truncate target falls in the correct range - var ( - deleted = atomic.LoadUint64(&t.itemOffset) - hidden = atomic.LoadUint64(&t.itemHidden) - ) - if deleted+hidden >= items { + if atomic.LoadUint64(&t.itemHidden) >= items { return nil } - head := atomic.LoadUint64(&t.items) - if head < items { + if atomic.LoadUint64(&t.items) < items { return errors.New("truncation above head") } - // Load the file number of new tail item after the deletion. - count, err := t.indexLen() - if err != nil { - return err - } + // Load the new tail index by the given new tail position var ( - tailId uint32 - delLen = items - deleted + newTailId uint32 + buffer = make([]byte, indexEntrySize) ) - if uint64(count) == delLen { - tailId = t.headId + if atomic.LoadUint64(&t.items) == items { + newTailId = t.headId } else { - newTail, err := t.getIndex(int64(delLen), 0) - if err != nil { + offset := items - atomic.LoadUint64(&t.itemOffset) + if _, err := t.index.ReadAt(buffer, int64((offset+1)*indexEntrySize)); err != nil { return err } - tailId = newTail.filenum + var newTail indexEntry + newTail.unmarshalBinary(buffer) + newTailId = newTail.filenum } - // Freezer only supports deletion by file, just mark the entries as hidden - if t.tailId == tailId { - atomic.StoreUint64(&t.itemHidden, delLen) - return storeMetadata(t.index, newMetadata(t.tailId, deleted, delLen)) + // Update the virtual tail marker and hidden these entries in table. + atomic.StoreUint64(&t.itemHidden, items) + if err := writeMetadata(t.meta, newMetadata(items)); err != nil { + return err } - if t.tailId > tailId { - return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, tailId) + // Hidden items still fall in the current tail file, no data file + // can be dropped. + if t.tailId == newTailId { + return nil } - // We need to truncate, save the old size for metrics tracking + // Hidden items fall in the incorrect range, returns the error. + if t.tailId > newTailId { + return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, newTailId) + } + // Hidden items exceed the current tail file, drop the relevant + // data files. We need to truncate, save the old size for metrics + // tracking. oldSize, err := t.sizeNolock() if err != nil { return err } // Count how many items can be deleted from the file. - var newDeleted = items + var ( + newDeleted = items + deleted = atomic.LoadUint64(&t.itemOffset) + ) for current := items - 1; current >= deleted; current -= 1 { - cur, err := t.getIndex(int64(current-deleted), 0) - if err != nil { + if _, err := t.index.ReadAt(buffer, int64((current-deleted+1)*indexEntrySize)); err != nil { return err } - if cur.filenum != tailId { + var pre indexEntry + pre.unmarshalBinary(buffer) + if pre.filenum != newTailId { break } newDeleted = current } - if err := t.truncateIndexFile(deleted, newDeleted, items-newDeleted, tailId); err != nil { + // Truncate the deleted index entries from the index file. + err = copyFrom(t.index.Name(), t.index.Name(), indexEntrySize*(newDeleted-deleted+1), func(f *os.File) error { + tailIndex := indexEntry{ + filenum: newTailId, + offset: uint32(newDeleted), + } + encoded := tailIndex.append(nil) + n, err := f.Write(encoded) + if err != nil { + return err + } + if n != len(encoded) { + return fmt.Errorf("faield to write zero index %d %d", len(encoded), n) + } + return nil + }) + if err != nil { return err } + if err := t.index.Close(); err != nil { + return err + } + offsets, err := openFreezerFileForAppend(t.index.Name()) + if err != nil { + return err + } + t.index = offsets + // Release any files before the current tail - t.tailId = tailId + t.tailId = newTailId atomic.StoreUint64(&t.itemOffset, newDeleted) - atomic.StoreUint64(&t.itemHidden, items-newDeleted) t.releaseFilesBefore(t.tailId, true) // Retrieve the new size and update the total size counter @@ -653,56 +636,6 @@ func (t *freezerTable) releaseFilesBefore(num uint32, remove bool) { } } -// indexLen returns the total index entries stored in the index file. -// This number can also be counted as the data entries stored in the -// freezer table. -func (t *freezerTable) indexLen() (int64, error) { - stat, err := t.index.Stat() - if err != nil { - return 0, err - } - size := stat.Size() - if size < metaLength { - return 0, errors.New("invalid index file") - } - indexSize := size - metaLength - if indexSize%indexEntrySize != 0 { - return 0, errors.New("invalid index file") - } - return indexSize / indexEntrySize, nil -} - -// getIndex returns a single index from the index file, with the given offset -// interpreted according to whence: 0 means relative to the origin of the file -// and 1 means relative to the end. -func (t *freezerTable) getIndex(offset int64, whence int) (*indexEntry, error) { - count, err := t.indexLen() - if err != nil { - return nil, err - } - var ( - off int64 - index indexEntry - buffer = make([]byte, indexEntrySize) - ) - if whence == 0 { - if offset >= count { - return nil, errors.New("out of range") - } - off = metaLength + offset*indexEntrySize - } else { - if offset >= count { - return nil, errors.New("out of range") - } - off = metaLength + (count-1-offset)*indexEntrySize - } - if _, err := t.index.ReadAt(buffer, off); err != nil { - return nil, err - } - index.unmarshalBinary(buffer[:]) - return &index, nil -} - // getIndices returns the index entries for the given from-item, covering 'count' items. // N.B: The actual number of returned indices for N items will always be N+1 (unless an // error is returned). @@ -710,32 +643,32 @@ func (t *freezerTable) getIndex(offset int64, whence int) (*indexEntry, error) { // so that the items are within bounds. If this method is used to read out of bounds, // it will return error. func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) { - // Special case if we're reading the first item in the freezer. We assume that - // the first item always start from zero(regarding the deletion, we - // only support deletion by files, so that the assumption is held). - var indices []*indexEntry - from = from - atomic.LoadUint64(&t.itemOffset) - if from == 0 { - indices = append(indices, &indexEntry{ - filenum: t.tailId, - offset: 0, - }) - count = count - 1 - from = from + 1 - } + // Apply the table-offset + from = from - t.itemOffset // For reading N items, we need N+1 indices. - var ( - buffer = make([]byte, (count+1)*indexEntrySize) - offset = metaLength + int64(from-1)*indexEntrySize - ) - if _, err := t.index.ReadAt(buffer, offset); err != nil { + buffer := make([]byte, (count+1)*indexEntrySize) + if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil { return nil, err } - for i := 0; i <= int(count); i++ { + var ( + indices []*indexEntry + offset int + ) + for i := from; i <= from+count; i++ { index := new(indexEntry) - index.unmarshalBinary(buffer[i*indexEntrySize:]) + index.unmarshalBinary(buffer[offset:]) + offset += indexEntrySize indices = append(indices, index) } + if from == 0 { + // Special case if we're reading the first item in the freezer. We assume that + // the first item always start from zero(regarding the deletion, we + // only support deletion by files, so that the assumption is held). + // This means we can use the first item metadata to carry information about + // the 'global' offset, for the deletion-case + indices[0].offset = 0 + indices[0].filenum = indices[1].filenum + } return indices, nil } @@ -801,14 +734,12 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i return nil, nil, errClosed } var ( - items = atomic.LoadUint64(&t.items) // the total items(head + 1) - deleted = atomic.LoadUint64(&t.itemOffset) // the number of deleted items - hidden = atomic.LoadUint64(&t.itemHidden) // the number of hidden items - tail = deleted + hidden + items = atomic.LoadUint64(&t.items) // the total items(head + 1) + hidden = atomic.LoadUint64(&t.itemHidden) // the number of hidden items ) // Ensure the start is written, not deleted from the tail, and that the // caller actually wants something - if items <= start || tail > start || count == 0 { + if items <= start || hidden > start || count == 0 { return nil, nil, errOutOfBounds } if start+count > items { @@ -962,12 +893,12 @@ func (t *freezerTable) dumpIndexString(start, stop int64) string { } func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) { - meta, err := loadMetadata(t.index) + meta, err := readMetadata(t.meta) if err != nil { fmt.Fprintf(w, "Failed to decode freezer table %v\n", err) return } - fmt.Fprintf(w, "Version %d deleted %d, hidden %d\n", meta.version, meta.deleted, meta.hidden) + fmt.Fprintf(w, "Version %d deleted %d, hidden %d\n", meta.version, atomic.LoadUint64(&t.itemOffset), atomic.LoadUint64(&t.itemHidden)) buf := make([]byte, indexEntrySize) @@ -975,7 +906,7 @@ func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) { fmt.Fprintf(w, "|--------|--------|--------|\n") for i := uint64(start); ; i++ { - if _, err := t.index.ReadAt(buf, int64(i*indexEntrySize)+metaLength); err != nil { + if _, err := t.index.ReadAt(buf, int64((i+1)*indexEntrySize)); err != nil { break } var entry indexEntry diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index c2101f45a6952..7bbd5ea67a202 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -19,7 +19,6 @@ package rawdb import ( "bytes" "fmt" - "io/ioutil" "math/rand" "os" "path/filepath" @@ -204,8 +203,8 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { t.Fatalf("Failed to open index file: %v", err) } // Remove everything but the first item, and leave data unaligned - // metadata, 1-indexEntry, corrupt-indexEntry - idxFile.Truncate(metaLength + indexEntrySize + indexEntrySize/2) + // 0-indexEntry, 1-indexEntry, corrupt-indexEntry + idxFile.Truncate(2*indexEntrySize + indexEntrySize/2) idxFile.Close() // Now open it again @@ -560,23 +559,27 @@ func TestFreezerOffset(t *testing.T) { if err != nil { t.Fatal(err) } - indexBuf := make([]byte, 6*indexEntrySize+metaLength) + indexBuf := make([]byte, 7*indexEntrySize) indexFile.Read(indexBuf) // Update the index file, so that we store - // [ file = 2, deleted = 4, hidden = 0 ] as meta - blob, err := encodeMetadata(newMetadata(2, 4, 0)) - if err != nil { - t.Fatal(err) + // [ file = 2, offset = 4 ] at index zero + + zeroIndex := indexEntry{ + filenum: uint32(2), // First file is 2 + offset: uint32(4), // We have removed four items } - copy(indexBuf, blob) + buf := zeroIndex.append(nil) + + // Overwrite index zero + copy(indexBuf, buf) // Remove the four next indices by overwriting - copy(indexBuf[metaLength:], indexBuf[metaLength+indexEntrySize*4:]) + copy(indexBuf[indexEntrySize:], indexBuf[indexEntrySize*5:]) indexFile.WriteAt(indexBuf, 0) // Need to truncate the moved index items - indexFile.Truncate(indexEntrySize*2 + metaLength) + indexFile.Truncate(indexEntrySize * (1 + 2)) indexFile.Close() } @@ -615,22 +618,21 @@ func TestFreezerOffset(t *testing.T) { if err != nil { t.Fatal(err) } - indexBuf := make([]byte, 2*indexEntrySize+metaLength) + indexBuf := make([]byte, 3*indexEntrySize) indexFile.Read(indexBuf) // Update the index file, so that we store - // [ file = 2, deleted = 1M, hidden = 0 ] as meta - blob, err := encodeMetadata(newMetadata(2, 1000000, 0)) - if err != nil { - t.Fatal(err) + // [ file = 2, offset = 1M ] at index zero + + zeroIndex := indexEntry{ + offset: uint32(1000000), // We have removed 1M items + filenum: uint32(2), // First file is 2 } - copy(indexBuf, blob) + buf := zeroIndex.append(nil) - // Remove the four 2 indices by overwriting - copy(indexBuf[metaLength:], indexBuf[metaLength+indexEntrySize*2:]) + // Overwrite index zero + copy(indexBuf, buf) indexFile.WriteAt(indexBuf, 0) - - indexFile.Truncate(indexEntrySize*2 + metaLength) indexFile.Close() } @@ -822,41 +824,6 @@ func TestTruncateHead(t *testing.T) { }) } -func TestUpgradeLegacyFreezerTable(t *testing.T) { - f, err := ioutil.TempFile("", "") - if err != nil { - t.Fatal(err) - } - defer os.Remove(f.Name()) - - index := &indexEntry{ - filenum: 0, - offset: 0, - } - encoded := index.append(nil) - f.Write(encoded) - - newf, meta, err := repairTableIndex(f) - if err != nil { - t.Fatal(err) - } - if newf.Name() != f.Name() { - t.Fatal("Unexpected file name") - } - if meta.tailId != 0 { - t.Fatal("Unexpected tail file", meta.tailId) - } - if meta.deleted != 0 { - t.Fatal("Unexpected deleted items") - } - if meta.hidden != 0 { - t.Fatal("Unexpected hidden items") - } - if meta.version != freezerVersion { - t.Fatal("Unexpected freezer version") - } -} - func checkRetrieve(t *testing.T, f *freezerTable, items map[uint64][]byte) { t.Helper() diff --git a/core/rawdb/freezer_utils.go b/core/rawdb/freezer_utils.go index c7510d4da847c..5695fc0fa891b 100644 --- a/core/rawdb/freezer_utils.go +++ b/core/rawdb/freezer_utils.go @@ -43,7 +43,6 @@ func copyFrom(srcPath, destPath string, offset uint64, before func(f *os.File) e } os.Remove(fname) }() - // Apply the given function if it's not nil before we copy // the content from the src. if before != nil { @@ -81,3 +80,41 @@ func copyFrom(srcPath, destPath string, offset uint64, before func(f *os.File) e } return nil } + +// openFreezerFileForAppend opens a freezer table file and seeks to the end +func openFreezerFileForAppend(filename string) (*os.File, error) { + // Open the file without the O_APPEND flag + // because it has differing behaviour during Truncate operations + // on different OS's + file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return nil, err + } + // Seek to end for append + if _, err = file.Seek(0, io.SeekEnd); err != nil { + return nil, err + } + return file, nil +} + +// openFreezerFileForReadOnly opens a freezer table file for read only access +func openFreezerFileForReadOnly(filename string) (*os.File, error) { + return os.OpenFile(filename, os.O_RDONLY, 0644) +} + +// openFreezerFileTruncated opens a freezer table making sure it is truncated +func openFreezerFileTruncated(filename string) (*os.File, error) { + return os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) +} + +// truncateFreezerFile resizes a freezer table file and seeks to the end +func truncateFreezerFile(file *os.File, size int64) error { + if err := file.Truncate(size); err != nil { + return err + } + // Seek to end for append + if _, err := file.Seek(0, io.SeekEnd); err != nil { + return err + } + return nil +} From 267eaa702272e39f908b650d273bc739ed7336ee Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Fri, 18 Feb 2022 15:48:10 +0800 Subject: [PATCH 05/10] core/rawdb: remove unused code --- core/rawdb/freezer.go | 5 +++-- core/rawdb/freezer_table.go | 9 +-------- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index cc81701462d3c..c7008e83f4d44 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -379,8 +379,9 @@ func (f *freezer) repair() error { if head > items { head = items } - if table.tail() > tail { - tail = table.tail() + hidden := atomic.LoadUint64(&table.itemHidden) + if hidden > tail { + tail = hidden } } for _, table := range f.tables { diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 466cf3622e68c..0a1b0a1da8e87 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -371,13 +371,6 @@ func (t *freezerTable) preopen() (err error) { return err } -// tail returns the index of the first stored item in the freezer table. -// It can also be interpreted as the number of items marked as deleted -// from the tail. -func (t *freezerTable) tail() uint64 { - return atomic.LoadUint64(&t.itemHidden) -} - // truncateHead discards any recent data above the provided threshold number. func (t *freezerTable) truncateHead(items uint64) error { t.lock.Lock() @@ -822,7 +815,7 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i // has returns an indicator whether the specified number data is still accessible // in the freezer table. func (t *freezerTable) has(number uint64) bool { - return atomic.LoadUint64(&t.items) > number && t.tail() <= number + return atomic.LoadUint64(&t.items) > number && atomic.LoadUint64(&t.itemHidden) <= number } // size returns the total data size in the freezer table. From 3161d7ee4f38447eb3512a17e1dfd88a5c0e5ebd Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Mon, 21 Feb 2022 10:11:27 +0800 Subject: [PATCH 06/10] core/rawdb: add random test --- core/rawdb/freezer_table_test.go | 214 +++++++++++++++++++++++++++++++ 1 file changed, 214 insertions(+) diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index 7bbd5ea67a202..0bddcf7211363 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -18,13 +18,18 @@ package rawdb import ( "bytes" + "encoding/binary" "fmt" "math/rand" "os" "path/filepath" + "reflect" + "sync/atomic" "testing" + "testing/quick" "time" + "github.com/davecgh/go-spew/spew" "github.com/ethereum/go-ethereum/metrics" "github.com/stretchr/testify/require" ) @@ -1080,3 +1085,212 @@ func TestFreezerReadonly(t *testing.T) { t.Fatalf("Writing to readonly table should fail") } } + +// randTest performs random freezer table operations. +// Instances of this test are created by Generate. +type randTest []randTestStep + +type randTestStep struct { + op int + items []uint64 // for append and retrieve + blobs [][]byte // for append + target uint64 // for truncate(head/tail) + err error // for debugging +} + +const ( + opReload = iota + opAppend + opRetrieve + opTruncateHead + opTruncateHeadAll + opTruncateTail + opTruncateTailAll + opCheckAll + opMax // boundary value, not an actual op +) + +func getVals(first uint64, n int) [][]byte { + var ret [][]byte + for i := 0; i < n; i++ { + val := make([]byte, 8) + binary.BigEndian.PutUint64(val, first+uint64(i)) + ret = append(ret, val) + } + return ret +} + +func (randTest) Generate(r *rand.Rand, size int) reflect.Value { + var ( + deleted uint64 // The number of deleted items from tail + items []uint64 // The index of entries in table + + // getItems retrieves the indexes for items in table. + getItems = func(n int) []uint64 { + length := len(items) + if length == 0 { + return nil + } + var ret []uint64 + index := rand.Intn(length) + for i := index; len(ret) < n && i < length; i++ { + ret = append(ret, items[i]) + } + return ret + } + + // addItems appends the given length items into the table. + addItems = func(n int) []uint64 { + var first = deleted + if len(items) != 0 { + first = items[len(items)-1] + 1 + } + var ret []uint64 + for i := 0; i < n; i++ { + ret = append(ret, first+uint64(i)) + } + items = append(items, ret...) + return ret + } + ) + + var steps randTest + for i := 0; i < size; i++ { + step := randTestStep{op: r.Intn(opMax)} + switch step.op { + case opReload, opCheckAll: + case opAppend: + num := r.Intn(3) + step.items = addItems(num) + if len(step.items) == 0 { + step.blobs = nil + } else { + step.blobs = getVals(step.items[0], num) + } + case opRetrieve: + step.items = getItems(r.Intn(3)) + case opTruncateHead: + if len(items) == 0 { + step.target = deleted + } else { + index := r.Intn(len(items)) + items = items[:index] + step.target = deleted + uint64(index) + } + case opTruncateHeadAll: + step.target = deleted + items = items[:0] + case opTruncateTail: + if len(items) == 0 { + step.target = deleted + } else { + index := r.Intn(len(items)) + items = items[index:] + deleted += uint64(index) + step.target = deleted + } + case opTruncateTailAll: + step.target = deleted + uint64(len(items)) + items = items[:0] + deleted = step.target + } + steps = append(steps, step) + } + return reflect.ValueOf(steps) +} + +func runRandTest(rt randTest) bool { + fname := fmt.Sprintf("randtest-%d", rand.Uint64()) + f, err := newTable(os.TempDir(), fname, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 50, true, false) + if err != nil { + panic("failed to initialize table") + } + var values [][]byte + for i, step := range rt { + switch step.op { + case opReload: + f.Close() + f, err = newTable(os.TempDir(), fname, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 50, true, false) + if err != nil { + rt[i].err = fmt.Errorf("failed to reload table %v", err) + } + case opCheckAll: + tail := atomic.LoadUint64(&f.itemHidden) + head := atomic.LoadUint64(&f.items) + + if tail == head { + continue + } + got, err := f.RetrieveItems(atomic.LoadUint64(&f.itemHidden), head-tail, 100000) + if err != nil { + rt[i].err = err + } else { + if !reflect.DeepEqual(got, values) { + rt[i].err = fmt.Errorf("mismatch on retrieved values %v %v", got, values) + } + } + + case opAppend: + batch := f.newBatch() + for i := 0; i < len(step.items); i++ { + batch.AppendRaw(step.items[i], step.blobs[i]) + } + batch.commit() + values = append(values, step.blobs...) + + case opRetrieve: + var blobs [][]byte + if len(step.items) == 0 { + continue + } + tail := atomic.LoadUint64(&f.itemHidden) + for i := 0; i < len(step.items); i++ { + blobs = append(blobs, values[step.items[i]-tail]) + } + got, err := f.RetrieveItems(step.items[0], uint64(len(step.items)), 100000) + if err != nil { + rt[i].err = err + } else { + if !reflect.DeepEqual(got, blobs) { + rt[i].err = fmt.Errorf("mismatch on retrieved values %v %v %v", got, blobs, step.items) + } + } + + case opTruncateHead: + f.truncateHead(step.target) + + length := atomic.LoadUint64(&f.items) - atomic.LoadUint64(&f.itemHidden) + values = values[:length] + + case opTruncateHeadAll: + f.truncateHead(step.target) + values = nil + + case opTruncateTail: + prev := atomic.LoadUint64(&f.itemHidden) + f.truncateTail(step.target) + + truncated := atomic.LoadUint64(&f.itemHidden) - prev + values = values[truncated:] + + case opTruncateTailAll: + f.truncateTail(step.target) + values = nil + } + // Abort the test on error. + if rt[i].err != nil { + return false + } + } + f.Close() + return true +} + +func TestRandom(t *testing.T) { + if err := quick.Check(runRandTest, nil); err != nil { + if cerr, ok := err.(*quick.CheckError); ok { + t.Fatalf("random test iteration %d failed: %s", cerr.Count, spew.Sdump(cerr.In)) + } + t.Fatal(err) + } +} From 991bba0569d034da185c7ebb927832f0f9dc798a Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Mon, 21 Feb 2022 13:36:54 +0800 Subject: [PATCH 07/10] core/rawdb: polish code --- core/rawdb/freezer_meta.go | 7 +------ core/rawdb/freezer_meta_test.go | 4 ++-- core/rawdb/freezer_table.go | 15 +++------------ 3 files changed, 6 insertions(+), 20 deletions(-) diff --git a/core/rawdb/freezer_meta.go b/core/rawdb/freezer_meta.go index 060bcfd5193c6..17f5627b3e450 100644 --- a/core/rawdb/freezer_meta.go +++ b/core/rawdb/freezer_meta.go @@ -88,12 +88,7 @@ func writeMetadata(file *os.File, meta *freezerTableMeta) error { if err != nil { return err } - encoded, err := rlp.EncodeToBytes(meta) - if err != nil { - return err - } - _, err = file.Write(encoded) - return err + return rlp.Encode(file, meta) } // loadMetadata loads the metadata from the given metadata file. diff --git a/core/rawdb/freezer_meta_test.go b/core/rawdb/freezer_meta_test.go index c2c01e46e1fe3..47f0b8485b6cf 100644 --- a/core/rawdb/freezer_meta_test.go +++ b/core/rawdb/freezer_meta_test.go @@ -48,14 +48,14 @@ func TestInitializeFreezerTableMeta(t *testing.T) { if err != nil { t.Fatalf("Failed to create file %v", err) } - meta, err := loadMetadata(f, uint64(0)) + meta, err := loadMetadata(f, uint64(100)) if err != nil { t.Fatalf("Failed to read metadata %v", err) } if meta.version != freezerVersion { t.Fatalf("Unexpected version field") } - if meta.VirtualTail != uint64(0) { + if meta.VirtualTail != uint64(100) { t.Fatalf("Unexpected virtual tail field") } } diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 0a1b0a1da8e87..5d3b8c5da58a2 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -515,15 +515,8 @@ func (t *freezerTable) truncateTail(items uint64) error { filenum: newTailId, offset: uint32(newDeleted), } - encoded := tailIndex.append(nil) - n, err := f.Write(encoded) - if err != nil { - return err - } - if n != len(encoded) { - return fmt.Errorf("faield to write zero index %d %d", len(encoded), n) - } - return nil + _, err := f.Write(tailIndex.append(nil)) + return err }) if err != nil { return err @@ -531,12 +524,10 @@ func (t *freezerTable) truncateTail(items uint64) error { if err := t.index.Close(); err != nil { return err } - offsets, err := openFreezerFileForAppend(t.index.Name()) + t.index, err = openFreezerFileForAppend(t.index.Name()) if err != nil { return err } - t.index = offsets - // Release any files before the current tail t.tailId = newTailId atomic.StoreUint64(&t.itemOffset, newDeleted) From 62333e8c0e5f2b03008480c1dee73f3856ced81b Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Mon, 21 Feb 2022 13:58:10 +0800 Subject: [PATCH 08/10] core/rawdb: fsync meta file before manipulating the index --- core/rawdb/freezer_table.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 5d3b8c5da58a2..2490e00109ba1 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -509,6 +509,11 @@ func (t *freezerTable) truncateTail(items uint64) error { } newDeleted = current } + // Commit the changes of metadata file first before manipulating + // the indexes file. + if err := t.meta.Sync(); err != nil { + return err + } // Truncate the deleted index entries from the index file. err = copyFrom(t.index.Name(), t.index.Name(), indexEntrySize*(newDeleted-deleted+1), func(f *os.File) error { tailIndex := indexEntry{ @@ -521,6 +526,7 @@ func (t *freezerTable) truncateTail(items uint64) error { if err != nil { return err } + // Reopen the modified index file to load the changes if err := t.index.Close(); err != nil { return err } @@ -553,6 +559,11 @@ func (t *freezerTable) Close() error { } t.index = nil + if err := t.meta.Close(); err != nil { + errs = append(errs, err) + } + t.meta = nil + for _, f := range t.files { if err := f.Close(); err != nil { errs = append(errs, err) @@ -860,6 +871,9 @@ func (t *freezerTable) Sync() error { if err := t.index.Sync(); err != nil { return err } + if err := t.meta.Sync(); err != nil { + return err + } return t.head.Sync() } From b23cdebc3bf6ed121066c88595784776f2711494 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Wed, 23 Feb 2022 12:39:45 +0800 Subject: [PATCH 09/10] core/rawdb: fix typo --- core/rawdb/freezer_table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 2490e00109ba1..b4b2000dc46d2 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -46,7 +46,7 @@ var ( errNotSupported = errors.New("this operation is not supported") ) -// indexEntry contains the number/id of the file that the data resides in, as well as the +// indexEntry contains the number/id of the file that the data resides in, aswell as the // offset within the file to the end of the data. // In serialized form, the filenum is stored as uint16. type indexEntry struct { From 940cb9dfe01ee536bee1d42f6d6bbc2656615401 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Thu, 24 Feb 2022 21:43:36 +0800 Subject: [PATCH 10/10] core/rawdb: address comments --- core/rawdb/freezer_meta.go | 33 ++++++--------------------------- core/rawdb/freezer_meta_test.go | 4 ++-- core/rawdb/freezer_table.go | 2 +- 3 files changed, 9 insertions(+), 30 deletions(-) diff --git a/core/rawdb/freezer_meta.go b/core/rawdb/freezer_meta.go index 17f5627b3e450..d0bd2f9544366 100644 --- a/core/rawdb/freezer_meta.go +++ b/core/rawdb/freezer_meta.go @@ -17,8 +17,6 @@ package rawdb import ( - "encoding/binary" - "errors" "io" "os" @@ -30,8 +28,8 @@ const freezerVersion = 1 // The initial version tag of freezer table metadata // freezerTableMeta wraps all the metadata of the freezer table. type freezerTableMeta struct { - // version is the versioning descriptor of the freezer table. - version uint16 + // Version is the versioning descriptor of the freezer table. + Version uint16 // VirtualTail indicates how many items have been marked as deleted. // Its value is equal to the number of items removed from the table @@ -43,7 +41,7 @@ type freezerTableMeta struct { // newMetadata initializes the metadata object with the given virtual tail. func newMetadata(tail uint64) *freezerTableMeta { return &freezerTableMeta{ - version: freezerVersion, + Version: freezerVersion, VirtualTail: tail, } } @@ -55,24 +53,11 @@ func readMetadata(file *os.File) (*freezerTableMeta, error) { if err != nil { return nil, err } - // load the first 2 bytes, resolve the version tag - var buf [2]byte - _, err = file.Read(buf[:2]) - if err != nil { + var meta freezerTableMeta + if err := rlp.Decode(file, &meta); err != nil { return nil, err } - version := binary.BigEndian.Uint16(buf[:]) - switch version { - case freezerVersion: - var meta freezerTableMeta - if err := rlp.Decode(file, &meta); err != nil { - return nil, err - } - meta.version = freezerVersion - return &meta, nil - default: - return nil, errors.New("undefined version") - } + return &meta, nil } // writeMetadata writes the metadata of the freezer table into the @@ -82,12 +67,6 @@ func writeMetadata(file *os.File, meta *freezerTableMeta) error { if err != nil { return err } - var buf [2]byte - binary.BigEndian.PutUint16(buf[:], meta.version) - _, err = file.Write(buf[:]) - if err != nil { - return err - } return rlp.Encode(file, meta) } diff --git a/core/rawdb/freezer_meta_test.go b/core/rawdb/freezer_meta_test.go index 47f0b8485b6cf..191744a754108 100644 --- a/core/rawdb/freezer_meta_test.go +++ b/core/rawdb/freezer_meta_test.go @@ -35,7 +35,7 @@ func TestReadWriteFreezerTableMeta(t *testing.T) { if err != nil { t.Fatalf("Failed to read metadata %v", err) } - if meta.version != freezerVersion { + if meta.Version != freezerVersion { t.Fatalf("Unexpected version field") } if meta.VirtualTail != uint64(100) { @@ -52,7 +52,7 @@ func TestInitializeFreezerTableMeta(t *testing.T) { if err != nil { t.Fatalf("Failed to read metadata %v", err) } - if meta.version != freezerVersion { + if meta.Version != freezerVersion { t.Fatalf("Unexpected version field") } if meta.VirtualTail != uint64(100) { diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index b4b2000dc46d2..4456937286522 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -896,7 +896,7 @@ func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) { fmt.Fprintf(w, "Failed to decode freezer table %v\n", err) return } - fmt.Fprintf(w, "Version %d deleted %d, hidden %d\n", meta.version, atomic.LoadUint64(&t.itemOffset), atomic.LoadUint64(&t.itemHidden)) + fmt.Fprintf(w, "Version %d deleted %d, hidden %d\n", meta.Version, atomic.LoadUint64(&t.itemOffset), atomic.LoadUint64(&t.itemHidden)) buf := make([]byte, indexEntrySize)