Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
if num+1 <= frozen {
// Truncate all relative data(header, total difficulty, body, receipt
// and canonical hash) from ancient store.
if err := bc.db.TruncateHead(num); err != nil {
if _, err := bc.db.TruncateHead(num); err != nil {
log.Crit("Failed to truncate ancient data", "number", num, "err", err)
}
// Remove the hash <-> number mapping from the active store.
Expand Down Expand Up @@ -1139,7 +1139,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
size += int64(batch.ValueSize())
if err = batch.Write(); err != nil {
fastBlock := bc.CurrentFastBlock().NumberU64()
if err := bc.db.TruncateHead(fastBlock + 1); err != nil {
if _, err := bc.db.TruncateHead(fastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, err
Expand All @@ -1157,7 +1157,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if !updateHead(blockChain[len(blockChain)-1]) {
// We end up here if the header chain has reorg'ed, and the blocks/receipts
// don't match the canonical chain.
if err := bc.db.TruncateHead(previousFastBlock + 1); err != nil {
if _, err := bc.db.TruncateHead(previousFastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, errSideChainReceipts
Expand Down
25 changes: 22 additions & 3 deletions core/rawdb/ancient_scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@

package rawdb

import (
"path/filepath"

"github.com/CortexFoundation/CortexTheseus/ctxcdb"
)

// The list of table names of chain freezer.
const (
// ChainFreezerHeaderTable indicates the name of the freezer header table.
Expand Down Expand Up @@ -66,9 +72,22 @@ var stateFreezerNoSnappy = map[string]bool{

// The list of identifiers of ancient stores.
var (
chainFreezerName = "chain" // the folder name of chain segment ancient store.
stateFreezerName = "state" // the folder name of reverse diff ancient store.
ChainFreezerName = "chain" // the folder name of chain segment ancient store.
StateFreezerName = "state" // the folder name of reverse diff ancient store.
)

// freezers the collections of all builtin freezers.
var freezers = []string{chainFreezerName}
var freezers = []string{ChainFreezerName, StateFreezerName}

// NewStateFreezer initializes the ancient store for state history.
//
// - if the empty directory is given, initializes the pure in-memory
// state freezer (e.g. dev mode).
// - if non-empty directory is given, initializes the regular file-based
// state freezer.
func NewStateFreezer(ancientDir string, readOnly bool) (ctxcdb.ResettableAncientStore, error) {
if ancientDir == "" {
return NewMemoryFreezer(readOnly, stateFreezerNoSnappy), nil
}
return newResettableFreezer(filepath.Join(ancientDir, StateFreezerName), "eth/db/state", readOnly, stateHistoryTableSize, stateFreezerNoSnappy)
}
6 changes: 3 additions & 3 deletions core/rawdb/ancient_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func inspectFreezers(db ctxcdb.Database) ([]freezerInfo, error) {
var infos []freezerInfo
for _, freezer := range freezers {
switch freezer {
case chainFreezerName:
case ChainFreezerName:
// Chain ancient store is a bit special. It's always opened along
// with the key-value store, inspect the chain store directly.
info := freezerInfo{name: freezer}
Expand Down Expand Up @@ -100,9 +100,9 @@ func InspectFreezerTable(ancient string, freezerName string, tableName string, s
tables map[string]bool
)
switch freezerName {
case chainFreezerName:
case ChainFreezerName:
path, tables = resolveChainFreezerDir(ancient), chainFreezerNoSnappy
case stateFreezerName:
case StateFreezerName:
path, tables = filepath.Join(ancient, freezerName), stateFreezerNoSnappy
default:
return fmt.Errorf("unknown freezer, supported ones: %v", freezers)
Expand Down
40 changes: 27 additions & 13 deletions core/rawdb/chain_freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,40 @@ const (
freezerBatchLimit = 30000
)

// chainFreezer is a wrapper of freezer with additional chain freezing feature.
// The background thread will keep moving ancient chain segments from key-value
// database to flat files for saving space on live database.
// chainFreezer is a wrapper of chain ancient store with additional chain freezing
// feature. The background thread will keep moving ancient chain segments from
// key-value database to flat files for saving space on live database.
type chainFreezer struct {
*Freezer
ctxcdb.AncientStore // Ancient store for storing cold chain segment

quit chan struct{}
wg sync.WaitGroup
trigger chan chan struct{} // Manual blocking freeze trigger, test determinism
}

// newChainFreezer initializes the freezer for ancient chain data.
// newChainFreezer initializes the freezer for ancient chain segment.
//
// - if the empty directory is given, initializes the pure in-memory
// state freezer (e.g. dev mode).
// - if non-empty directory is given, initializes the regular file-based
// state freezer.
func newChainFreezer(datadir string, namespace string, readonly bool) (*chainFreezer, error) {
freezer, err := NewChainFreezer(datadir, namespace, readonly)
var (
err error
freezer ctxcdb.AncientStore
)
if datadir == "" {
freezer = NewMemoryFreezer(readonly, chainFreezerNoSnappy)
} else {
freezer, err = NewFreezer(datadir, namespace, readonly, freezerTableSize, chainFreezerNoSnappy)
}
if err != nil {
return nil, err
}
return &chainFreezer{
Freezer: freezer,
quit: make(chan struct{}),
trigger: make(chan chan struct{}),
AncientStore: freezer,
quit: make(chan struct{}),
trigger: make(chan chan struct{}),
}, nil
}

Expand All @@ -70,7 +84,7 @@ func (f *chainFreezer) Close() error {
close(f.quit)
}
f.wg.Wait()
return f.Freezer.Close()
return f.AncientStore.Close()
}

// readHeadNumber returns the number of chain head block. 0 is returned if the
Expand Down Expand Up @@ -167,7 +181,7 @@ func (f *chainFreezer) freeze(db ctxcdb.KeyValueStore) {
log.Debug("Current full block not old enough to freeze", "err", err)
continue
}
frozen := f.frozen.Load()
frozen, _ := f.Ancients() // no error will occur, safe to ignore

// Short circuit if the blocks below threshold are already frozen.
if frozen != 0 && frozen-1 >= threshold {
Expand All @@ -190,7 +204,7 @@ func (f *chainFreezer) freeze(db ctxcdb.KeyValueStore) {
backoff = true
continue
}
// Batch of blocks have been frozen, flush them before wiping from leveldb
// Batch of blocks have been frozen, flush them before wiping from key-value store
if err := f.Sync(); err != nil {
log.Crit("Failed to flush frozen tables", "err", err)
}
Expand All @@ -210,7 +224,7 @@ func (f *chainFreezer) freeze(db ctxcdb.KeyValueStore) {

// Wipe out side chains also and track dangling side chains
var dangling []common.Hash
frozen = f.frozen.Load() // Needs reload after during freezeRange
frozen, _ = f.Ancients() // Needs reload after during freezeRange
for number := first; number < frozen; number++ {
// Always keep the genesis block in active database
if number != 0 {
Expand Down
37 changes: 22 additions & 15 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ import (
"github.com/CortexFoundation/CortexTheseus/log"
)

// freezerdb is a database wrapper that enabled freezer data retrievals.
// freezerdb is a database wrapper that enables ancient chain segment freezing.
type freezerdb struct {
ancientRoot string
ctxcdb.KeyValueStore
ctxcdb.AncientStore
*chainFreezer

readOnly bool
ancientRoot string
}

// AncientDatadir returns the path of root ancient directory.
Expand All @@ -52,7 +54,7 @@ func (frdb *freezerdb) AncientDatadir() (string, error) {
// the slow ancient tables.
func (frdb *freezerdb) Close() error {
var errs []error
if err := frdb.AncientStore.Close(); err != nil {
if err := frdb.chainFreezer.Close(); err != nil {
errs = append(errs, err)
}
if err := frdb.KeyValueStore.Close(); err != nil {
Expand All @@ -68,12 +70,12 @@ func (frdb *freezerdb) Close() error {
// a freeze cycle completes, without having to sleep for a minute to trigger the
// automatic background run.
func (frdb *freezerdb) Freeze() error {
if frdb.AncientStore.(*chainFreezer).readonly {
if frdb.readOnly {
return errReadOnly
}
// Trigger a freeze cycle and block until it's done
trigger := make(chan struct{}, 1)
frdb.AncientStore.(*chainFreezer).trigger <- trigger
frdb.chainFreezer.trigger <- trigger
<-trigger
return nil
}
Expand Down Expand Up @@ -119,13 +121,13 @@ func (db *nofreezedb) ModifyAncients(func(ctxcdb.AncientWriteOp) error) (int64,
}

// TruncateHead returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) TruncateHead(items uint64) error {
return errNotSupported
func (db *nofreezedb) TruncateHead(items uint64) (uint64, error) {
return 0, errNotSupported
}

// TruncateTail returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) TruncateTail(items uint64) error {
return errNotSupported
func (db *nofreezedb) TruncateTail(items uint64) (uint64, error) {
return 0, errNotSupported
}

// Sync returns an error as we don't have a backing chain freezer.
Expand Down Expand Up @@ -173,7 +175,7 @@ func resolveChainFreezerDir(ancient string) string {
// sub folder, if not then two possibilities:
// - chain freezer is not initialized
// - chain freezer exists in legacy location (root ancient folder)
freezer := path.Join(ancient, chainFreezerName)
freezer := path.Join(ancient, ChainFreezerName)
if !common.FileExist(freezer) {
if !common.FileExist(ancient) {
// The entire ancient store is not initialized, still use the sub
Expand All @@ -194,8 +196,13 @@ func resolveChainFreezerDir(ancient string) string {
// storage. The passed ancient indicates the path of root ancient directory
// where the chain freezer can be opened.
func NewDatabaseWithFreezer(db ctxcdb.KeyValueStore, ancient string, namespace string, readonly bool) (ctxcdb.Database, error) {
// Create the idle freezer instance
frdb, err := newChainFreezer(resolveChainFreezerDir(ancient), namespace, readonly)
// Create the idle freezer instance. If the given ancient directory is empty,
// in-memory chain freezer is used (e.g. dev mode); otherwise the regular
// file-based freezer is created.
if ancient != "" {
ancient = resolveChainFreezerDir(ancient)
}
frdb, err := newChainFreezer(ancient, namespace, readonly)
if err != nil {
printChainMetadata(db)
return nil, err
Expand Down Expand Up @@ -279,7 +286,7 @@ func NewDatabaseWithFreezer(db ctxcdb.KeyValueStore, ancient string, namespace s
}
}
// Freezer is consistent with the key-value database, permit combining the two
if !frdb.readonly {
if !readonly {
frdb.wg.Add(1)
go func() {
frdb.freeze(db)
Expand All @@ -289,7 +296,7 @@ func NewDatabaseWithFreezer(db ctxcdb.KeyValueStore, ancient string, namespace s
return &freezerdb{
ancientRoot: ancient,
KeyValueStore: db,
AncientStore: frdb,
chainFreezer: frdb,
}, nil
}

Expand Down
34 changes: 15 additions & 19 deletions core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ const freezerTableSize = 2 * 1000 * 1000 * 1000
// reserving it for CortexTheseus. This would also reduce the memory requirements
// of Geth, and thus also GC overhead.
type Freezer struct {
frozen atomic.Uint64 // Number of blocks already frozen
frozen atomic.Uint64 // Number of items already frozen
tail atomic.Uint64 // Number of the first stored item in the freezer

// This lock synchronizes writers and the truncate operation, as well as
Expand All @@ -77,12 +77,6 @@ type Freezer struct {
closeOnce sync.Once
}

// NewChainFreezer is a small utility method around NewFreezer that sets the
// default parameters for the chain storage.
func NewChainFreezer(datadir string, namespace string, readonly bool) (*Freezer, error) {
return NewFreezer(datadir, namespace, readonly, freezerTableSize, chainFreezerNoSnappy)
}

// NewFreezer creates a freezer instance for maintaining immutable ordered
// data according to the given parameters.
//
Expand Down Expand Up @@ -280,43 +274,45 @@ func (f *Freezer) ModifyAncients(fn func(ctxcdb.AncientWriteOp) error) (writeSiz
}

// TruncateHead discards any recent data above the provided threshold number.
func (f *Freezer) TruncateHead(items uint64) error {
func (f *Freezer) TruncateHead(items uint64) (uint64, error) {
if f.readonly {
return errReadOnly
return 0, errReadOnly
}
f.writeLock.Lock()
defer f.writeLock.Unlock()

if f.frozen.Load() <= items {
return nil
oitems := f.frozen.Load()
if oitems <= items {
return oitems, nil
}
for _, table := range f.tables {
if err := table.truncateHead(items); err != nil {
return err
return 0, err
}
}
f.frozen.Store(items)
return nil
return oitems, nil
}

// TruncateTail discards any recent data below the provided threshold number.
func (f *Freezer) TruncateTail(tail uint64) error {
func (f *Freezer) TruncateTail(tail uint64) (uint64, error) {
if f.readonly {
return errReadOnly
return 0, errReadOnly
}
f.writeLock.Lock()
defer f.writeLock.Unlock()

if f.tail.Load() >= tail {
return nil
old := f.tail.Load()
if old >= tail {
return old, nil
}
for _, table := range f.tables {
if err := table.truncateTail(tail); err != nil {
return err
return 0, err
}
}
f.tail.Store(tail)
return nil
return old, nil
}

// Sync flushes all data tables to disk.
Expand Down
Loading