From 46480491b14c0ea4f0cab87d67edfae5a8814d00 Mon Sep 17 00:00:00 2001 From: Chris Gianelloni Date: Thu, 3 Jul 2025 10:38:23 -0400 Subject: [PATCH] refactor(database): use a config object for the database Signed-off-by: Chris Gianelloni --- chain/chain.go | 5 +- chain/chain_test.go | 17 ++++-- chain/manager.go | 20 +++++-- database/block_nonce.go | 10 +++- database/database.go | 55 +++++++++++++++---- database/database_test.go | 10 +++- .../plugin/metadata/sqlite/block_nonce.go | 3 +- internal/node/load.go | 8 ++- ledger/chainsync.go | 22 +++++++- ledger/state.go | 6 +- node.go | 8 ++- 11 files changed, 132 insertions(+), 32 deletions(-) diff --git a/chain/chain.go b/chain/chain.go index dad4fc69..abb0d229 100644 --- a/chain/chain.go +++ b/chain/chain.go @@ -463,7 +463,10 @@ func (c *Chain) reconcile() error { // Check our blocks against primary chain until we find a match primaryChain := c.manager.PrimaryChain() for i := len(c.blocks) - 1; i >= 0; i-- { - tmpBlock, err := primaryChain.blockByIndex(c.lastCommonBlockIndex+uint64(i), nil) + tmpBlock, err := primaryChain.blockByIndex( + c.lastCommonBlockIndex+uint64(i), + nil, + ) if err != nil { if errors.Is(err, ErrBlockNotFound) { continue diff --git a/chain/chain_test.go b/chain/chain_test.go index c17d1647..9dfdd518 100644 --- a/chain/chain_test.go +++ b/chain/chain_test.go @@ -106,6 +106,12 @@ var ( MockPrevHash: testHashPrefix + "0005", }, } + dbConfig = &database.Config{ + BlobCacheSize: 1 << 20, + Logger: nil, + PromRegistry: nil, + DataDir: "", + } ) func TestChainBasic(t *testing.T) { @@ -463,8 +469,7 @@ func TestChainFromIntersect(t *testing.T) { Slot: testBlocks[testForkPointIndex].MockSlot, }, } - const testCacheSize int64 = 1 << 20 - db, err := database.New(nil, nil, "", testCacheSize) + db, err := database.New(dbConfig) if err != nil { t.Fatalf("unexpected error creating database: %s", err) } @@ -522,8 +527,7 @@ func TestChainFork(t *testing.T) { MockPrevHash: testHashPrefix + "00a5", }, } - const testCacheSize int64 = 1 << 20 - db, err := database.New(nil, nil, "", testCacheSize) + db, err := database.New(dbConfig) if err != nil { t.Fatalf("unexpected error creating database: %s", err) } @@ -554,7 +558,10 @@ func TestChainFork(t *testing.T) { // Iterate until hitting chain tip, and make sure we get blocks in the correct order with // all expected data testBlockIdx := 0 - testBlocks := slices.Concat(testBlocks[0:testForkPointIndex+1], testForkBlocks) + testBlocks := slices.Concat( + testBlocks[0:testForkPointIndex+1], + testForkBlocks, + ) for { next, err := iter.Next(false) if err != nil { diff --git a/chain/manager.go b/chain/manager.go index 3c75eaec..5a605d64 100644 --- a/chain/manager.go +++ b/chain/manager.go @@ -42,7 +42,10 @@ type ChainManager struct { blocks map[string]database.Block } -func NewManager(db *database.Database, eventBus *event.EventBus) (*ChainManager, error) { +func NewManager( + db *database.Database, + eventBus *event.EventBus, +) (*ChainManager, error) { cm := &ChainManager{ db: db, eventBus: eventBus, @@ -98,7 +101,9 @@ func (cm *ChainManager) NewChain(point ocommon.Point) (*Chain, error) { } // NewChainFromIntersect creates a new Chain that forks the primary chain at the latest common point. -func (cm *ChainManager) NewChainFromIntersect(points []ocommon.Point) (*Chain, error) { +func (cm *ChainManager) NewChainFromIntersect( + points []ocommon.Point, +) (*Chain, error) { cm.mutex.Lock() defer cm.mutex.Unlock() primaryChain := cm.PrimaryChain() @@ -264,7 +269,11 @@ func (cm *ChainManager) loadPrimaryChain() error { return nil } -func (cm *ChainManager) addBlock(block database.Block, txn *database.Txn, persistent bool) error { +func (cm *ChainManager) addBlock( + block database.Block, + txn *database.Txn, + persistent bool, +) error { if persistent { // Add block to database if err := cm.db.BlockCreate(block, txn); err != nil { @@ -309,7 +318,10 @@ func (cm *ChainManager) removeBlockByIndex(blockIndex uint64) error { return nil } -func (cm *ChainManager) chainNeedsReconcile(chainId ChainId, lastCommonBlockIndex uint64) bool { +func (cm *ChainManager) chainNeedsReconcile( + chainId ChainId, + lastCommonBlockIndex uint64, +) bool { events, ok := cm.chainRollbackEvents[chainId] if !ok { return false diff --git a/database/block_nonce.go b/database/block_nonce.go index aa0d39ac..443f0538 100644 --- a/database/block_nonce.go +++ b/database/block_nonce.go @@ -79,7 +79,13 @@ func (d *Database) DeleteBlockNoncesBeforeSlotWithoutCheckpoints( txn *Txn, ) error { if txn == nil { - return d.metadata.DeleteBlockNoncesBeforeSlotWithoutCheckpoints(slotNumber, nil) + return d.metadata.DeleteBlockNoncesBeforeSlotWithoutCheckpoints( + slotNumber, + nil, + ) } - return d.metadata.DeleteBlockNoncesBeforeSlotWithoutCheckpoints(slotNumber, txn.Metadata()) + return d.metadata.DeleteBlockNoncesBeforeSlotWithoutCheckpoints( + slotNumber, + txn.Metadata(), + ) } diff --git a/database/database.go b/database/database.go index bfc41787..f29e0173 100644 --- a/database/database.go +++ b/database/database.go @@ -24,12 +24,29 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +var DefaultConfig = &Config{ + BlobCacheSize: 1073741824, + BlobPlugin: "badger", + DataDir: ".dingo", + MetadataPlugin: "sqlite", +} + +// Config represents the configuration for a database instance +type Config struct { + BlobCacheSize int64 + BlobPlugin string + DataDir string + MetadataPlugin string + Logger *slog.Logger + PromRegistry prometheus.Registerer +} + // Database represents our data storage services type Database struct { + config *Config logger *slog.Logger blob blob.BlobStore metadata metadata.MetadataStore - dataDir string } // Blob returns the underling blob store instance @@ -37,14 +54,19 @@ func (d *Database) Blob() blob.BlobStore { return d.blob } +// Config returns the config object used for the database instance +func (d *Database) Config() *Config { + return d.config +} + // DataDir returns the path to the data directory used for storage func (d *Database) DataDir() string { - return d.dataDir + return d.config.DataDir } // Logger returns the logger instance func (d *Database) Logger() *slog.Logger { - return d.logger + return d.config.Logger } // Metadata returns the underlying metadata store instance @@ -94,24 +116,35 @@ func (d *Database) init() error { // New creates a new database instance with optional persistence using the provided data directory func New( - logger *slog.Logger, - promRegistry prometheus.Registerer, - dataDir string, - badgerCacheSize int64, + config *Config, ) (*Database, error) { - metadataDb, err := metadata.New("sqlite", dataDir, logger, promRegistry) + if config == nil { + config = DefaultConfig + } + blobDb, err := blob.New( + config.BlobPlugin, + config.DataDir, + config.Logger, + config.PromRegistry, + config.BlobCacheSize, + ) if err != nil { return nil, err } - blobDb, err := blob.New("badger", dataDir, logger, promRegistry, badgerCacheSize) + metadataDb, err := metadata.New( + config.MetadataPlugin, + config.DataDir, + config.Logger, + config.PromRegistry, + ) if err != nil { return nil, err } db := &Database{ - logger: logger, blob: blobDb, metadata: metadataDb, - dataDir: dataDir, + logger: config.Logger, + config: config, } if err := db.init(); err != nil { // Database is available for recovery, so return it with error diff --git a/database/database_test.go b/database/database_test.go index d04869c9..ab9e55ee 100644 --- a/database/database_test.go +++ b/database/database_test.go @@ -26,6 +26,13 @@ type TestTable struct { gorm.Model } +var dbConfig = &database.Config{ + BlobCacheSize: 1 << 20, + Logger: nil, + PromRegistry: nil, + DataDir: "", +} + // TestInMemorySqliteMultipleTransaction tests that our sqlite connection allows multiple // concurrent transactions when using in-memory mode. This requires special URI flags, and // this is mostly making sure that we don't lose them @@ -42,8 +49,7 @@ func TestInMemorySqliteMultipleTransaction(t *testing.T) { } return nil } - const testCacheSize int64 = 1 << 20 - db, err := database.New(nil, nil, "", testCacheSize) // in-memory + db, err := database.New(dbConfig) if err != nil { t.Fatalf("unexpected error: %s", err) } diff --git a/database/plugin/metadata/sqlite/block_nonce.go b/database/plugin/metadata/sqlite/block_nonce.go index e9372473..6dc364ae 100644 --- a/database/plugin/metadata/sqlite/block_nonce.go +++ b/database/plugin/metadata/sqlite/block_nonce.go @@ -58,7 +58,8 @@ func (d *MetadataStoreSqlite) GetBlockNonce( ) ([]byte, error) { ret := models.BlockNonce{} if txn != nil { - result := txn.Where("hash = ? AND slot = ?", blockHash, slotNumber).First(&ret) + result := txn.Where("hash = ? AND slot = ?", blockHash, slotNumber). + First(&ret) if result.Error != nil { if !errors.Is(result.Error, gorm.ErrRecordNotFound) { return nil, result.Error diff --git a/internal/node/load.go b/internal/node/load.go index 6ceea1a5..265e3634 100644 --- a/internal/node/load.go +++ b/internal/node/load.go @@ -47,7 +47,13 @@ func Load(cfg *config.Config, logger *slog.Logger, immutableDir string) error { ) } // Load database - db, err := database.New(logger, nil, cfg.DatabasePath, cfg.BadgerCacheSize) + dbConfig := &database.Config{ + BlobCacheSize: cfg.BadgerCacheSize, + DataDir: cfg.DatabasePath, + Logger: logger, + PromRegistry: nil, + } + db, err := database.New(dbConfig) if err != nil { return err } diff --git a/ledger/chainsync.go b/ledger/chainsync.go index 2063b822..aade0726 100644 --- a/ledger/chainsync.go +++ b/ledger/chainsync.go @@ -96,7 +96,13 @@ func (ls *LedgerState) handleEventBlockfetch(evt event.Event) { func (ls *LedgerState) handleEventChainsyncRollback(e ChainsyncEvent) error { if ls.chainsyncState == SyncingChainsyncState { - ls.config.Logger.Warn(fmt.Sprintf("ledger: rolling back to %d.%s", e.Point.Slot, hex.EncodeToString(e.Point.Hash))) + ls.config.Logger.Warn( + fmt.Sprintf( + "ledger: rolling back to %d.%s", + e.Point.Slot, + hex.EncodeToString(e.Point.Hash), + ), + ) ls.chainsyncState = RollbackChainsyncState } if err := ls.chain.Rollback(e.Point); err != nil { @@ -107,7 +113,13 @@ func (ls *LedgerState) handleEventChainsyncRollback(e ChainsyncEvent) error { func (ls *LedgerState) handleEventChainsyncBlockHeader(e ChainsyncEvent) error { if ls.chainsyncState == RollbackChainsyncState { - ls.config.Logger.Info(fmt.Sprintf("ledger: switched to fork at %d.%s", e.Point.Slot, hex.EncodeToString(e.Point.Hash))) + ls.config.Logger.Info( + fmt.Sprintf( + "ledger: switched to fork at %d.%s", + e.Point.Slot, + hex.EncodeToString(e.Point.Hash), + ), + ) ls.metrics.forks.Add(1) } ls.chainsyncState = SyncingChainsyncState @@ -289,7 +301,11 @@ func (ls *LedgerState) calculateEpochNonce( if err != nil { return nil, fmt.Errorf("lookup block before slot: %w", err) } - blockBeforeStabilityWindowNonce, err := ls.db.GetBlockNonce(blockBeforeStabilityWindow.Hash, blockBeforeStabilityWindow.Slot, txn) + blockBeforeStabilityWindowNonce, err := ls.db.GetBlockNonce( + blockBeforeStabilityWindow.Hash, + blockBeforeStabilityWindow.Slot, + txn, + ) if err != nil { return nil, fmt.Errorf("lookup block nonce: %w", err) } diff --git a/ledger/state.go b/ledger/state.go index b5873202..3d4ec5d3 100644 --- a/ledger/state.go +++ b/ledger/state.go @@ -742,7 +742,11 @@ func (ls *LedgerState) loadTip() error { ls.currentTip = tmpTip // Load tip block and set cached block nonce if ls.currentTip.Point.Slot > 0 { - tipNonce, err := ls.db.GetBlockNonce(tmpTip.Point.Hash, tmpTip.Point.Slot, nil) + tipNonce, err := ls.db.GetBlockNonce( + tmpTip.Point.Hash, + tmpTip.Point.Slot, + nil, + ) if err != nil { return err } diff --git a/node.go b/node.go index 93cd149b..efe38258 100644 --- a/node.go +++ b/node.go @@ -77,7 +77,13 @@ func (n *Node) Run() error { } // Load database dbNeedsRecovery := false - db, err := database.New(n.config.logger, n.config.promRegistry, n.config.dataDir, n.config.badgerCacheSize) + dbConfig := &database.Config{ + BlobCacheSize: n.config.badgerCacheSize, + DataDir: n.config.dataDir, + Logger: n.config.logger, + PromRegistry: n.config.promRegistry, + } + db, err := database.New(dbConfig) if db == nil { n.config.logger.Error( "failed to create database",