5 changes: 4 additions & 1 deletion chain/chain.go
@@ -463,7 +463,10 @@ func (c *Chain) reconcile() error {
// Check our blocks against primary chain until we find a match
primaryChain := c.manager.PrimaryChain()
for i := len(c.blocks) - 1; i >= 0; i-- {
tmpBlock, err := primaryChain.blockByIndex(c.lastCommonBlockIndex+uint64(i), nil)
tmpBlock, err := primaryChain.blockByIndex(
c.lastCommonBlockIndex+uint64(i),
nil,
)
if err != nil {
if errors.Is(err, ErrBlockNotFound) {
continue
17 changes: 12 additions & 5 deletions chain/chain_test.go
@@ -106,6 +106,12 @@ var (
MockPrevHash: testHashPrefix + "0005",
},
}
dbConfig = &database.Config{
BlobCacheSize: 1 << 20,
Logger: nil,
PromRegistry: nil,
DataDir: "",
}
)

func TestChainBasic(t *testing.T) {
@@ -463,8 +469,7 @@ func TestChainFromIntersect(t *testing.T) {
Slot: testBlocks[testForkPointIndex].MockSlot,
},
}
const testCacheSize int64 = 1 << 20
db, err := database.New(nil, nil, "", testCacheSize)
db, err := database.New(dbConfig)
if err != nil {
t.Fatalf("unexpected error creating database: %s", err)
}
@@ -522,8 +527,7 @@ func TestChainFork(t *testing.T) {
MockPrevHash: testHashPrefix + "00a5",
},
}
const testCacheSize int64 = 1 << 20
db, err := database.New(nil, nil, "", testCacheSize)
db, err := database.New(dbConfig)
if err != nil {
t.Fatalf("unexpected error creating database: %s", err)
}
@@ -554,7 +558,10 @@ func TestChainFork(t *testing.T) {
// Iterate until hitting chain tip, and make sure we get blocks in the correct order with
// all expected data
testBlockIdx := 0
testBlocks := slices.Concat(testBlocks[0:testForkPointIndex+1], testForkBlocks)
testBlocks := slices.Concat(
testBlocks[0:testForkPointIndex+1],
testForkBlocks,
)
for {
next, err := iter.Next(false)
if err != nil {
20 changes: 16 additions & 4 deletions chain/manager.go
@@ -42,7 +42,10 @@ type ChainManager struct {
blocks map[string]database.Block
}

func NewManager(db *database.Database, eventBus *event.EventBus) (*ChainManager, error) {
func NewManager(
db *database.Database,
eventBus *event.EventBus,
) (*ChainManager, error) {
cm := &ChainManager{
db: db,
eventBus: eventBus,
@@ -98,7 +101,9 @@ func (cm *ChainManager) NewChain(point ocommon.Point) (*Chain, error) {
}

// NewChainFromIntersect creates a new Chain that forks the primary chain at the latest common point.
func (cm *ChainManager) NewChainFromIntersect(points []ocommon.Point) (*Chain, error) {
func (cm *ChainManager) NewChainFromIntersect(
points []ocommon.Point,
) (*Chain, error) {
cm.mutex.Lock()
defer cm.mutex.Unlock()
primaryChain := cm.PrimaryChain()
@@ -264,7 +269,11 @@ func (cm *ChainManager) loadPrimaryChain() error {
return nil
}

func (cm *ChainManager) addBlock(block database.Block, txn *database.Txn, persistent bool) error {
func (cm *ChainManager) addBlock(
block database.Block,
txn *database.Txn,
persistent bool,
) error {
if persistent {
// Add block to database
if err := cm.db.BlockCreate(block, txn); err != nil {
@@ -309,7 +318,10 @@ func (cm *ChainManager) removeBlockByIndex(blockIndex uint64) error {
return nil
}

func (cm *ChainManager) chainNeedsReconcile(chainId ChainId, lastCommonBlockIndex uint64) bool {
func (cm *ChainManager) chainNeedsReconcile(
chainId ChainId,
lastCommonBlockIndex uint64,
) bool {
events, ok := cm.chainRollbackEvents[chainId]
if !ok {
return false
10 changes: 8 additions & 2 deletions database/block_nonce.go
@@ -79,7 +79,13 @@ func (d *Database) DeleteBlockNoncesBeforeSlotWithoutCheckpoints(
txn *Txn,
) error {
if txn == nil {
return d.metadata.DeleteBlockNoncesBeforeSlotWithoutCheckpoints(slotNumber, nil)
return d.metadata.DeleteBlockNoncesBeforeSlotWithoutCheckpoints(
slotNumber,
nil,
)
}
return d.metadata.DeleteBlockNoncesBeforeSlotWithoutCheckpoints(slotNumber, txn.Metadata())
return d.metadata.DeleteBlockNoncesBeforeSlotWithoutCheckpoints(
slotNumber,
txn.Metadata(),
)
}
55 changes: 44 additions & 11 deletions database/database.go
@@ -24,27 +24,49 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

var DefaultConfig = &Config{
BlobCacheSize: 1073741824,
BlobPlugin: "badger",
DataDir: ".dingo",
MetadataPlugin: "sqlite",
}

// Config represents the configuration for a database instance
type Config struct {
BlobCacheSize int64
BlobPlugin string
DataDir string
MetadataPlugin string
Logger *slog.Logger
PromRegistry prometheus.Registerer
}

// Database represents our data storage services
type Database struct {
config *Config
logger *slog.Logger
blob blob.BlobStore
metadata metadata.MetadataStore
dataDir string
}

// Blob returns the underlying blob store instance
func (d *Database) Blob() blob.BlobStore {
return d.blob
}

// Config returns the config object used for the database instance
func (d *Database) Config() *Config {
return d.config
}

// DataDir returns the path to the data directory used for storage
func (d *Database) DataDir() string {
return d.dataDir
return d.config.DataDir
}

// Logger returns the logger instance
func (d *Database) Logger() *slog.Logger {
return d.logger
return d.config.Logger
}

// Metadata returns the underlying metadata store instance
@@ -94,24 +116,35 @@ func (d *Database) init() error {

// New creates a new database instance with optional persistence using the provided data directory
func New(
logger *slog.Logger,
promRegistry prometheus.Registerer,
dataDir string,
badgerCacheSize int64,
config *Config,
) (*Database, error) {
metadataDb, err := metadata.New("sqlite", dataDir, logger, promRegistry)
if config == nil {
config = DefaultConfig
}
blobDb, err := blob.New(
config.BlobPlugin,
config.DataDir,
config.Logger,
config.PromRegistry,
config.BlobCacheSize,
)
if err != nil {
return nil, err
}
blobDb, err := blob.New("badger", dataDir, logger, promRegistry, badgerCacheSize)
metadataDb, err := metadata.New(
config.MetadataPlugin,
config.DataDir,
config.Logger,
config.PromRegistry,
)
if err != nil {
return nil, err
}
db := &Database{
logger: logger,
blob: blobDb,
metadata: metadataDb,
dataDir: dataDir,
logger: config.Logger,
config: config,
}
if err := db.init(); err != nil {
// Database is available for recovery, so return it with error
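Not part of the diff: a minimal usage sketch of the new Config-based constructor introduced above. The import path and logger setup are assumptions for illustration (neither appears in this change); per the diff, database.New falls back to DefaultConfig when passed a nil config, and the test fixtures use an empty DataDir for in-memory storage.

package main

import (
	"log"
	"log/slog"
	"os"

	// Import path assumed for illustration; adjust to the actual module.
	"github.com/blinklabs-io/dingo/database"
)

func main() {
	// Explicit config, mirroring the test fixtures above: an empty DataDir
	// keeps storage in-memory, and the plugin fields are left unset just as
	// the tests leave them.
	cfg := &database.Config{
		BlobCacheSize: 1 << 20,
		DataDir:       "",
		Logger:        slog.New(slog.NewTextHandler(os.Stderr, nil)),
	}
	db, err := database.New(cfg)
	if err != nil {
		log.Fatalf("creating database: %s", err)
	}
	_ = db

	// Passing a nil config instead selects DefaultConfig: badger blob store,
	// sqlite metadata store, ".dingo" data dir, 1 GiB blob cache.
}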
10 changes: 8 additions & 2 deletions database/database_test.go
@@ -26,6 +26,13 @@ type TestTable struct {
gorm.Model
}

var dbConfig = &database.Config{
BlobCacheSize: 1 << 20,
Logger: nil,
PromRegistry: nil,
DataDir: "",
}

// TestInMemorySqliteMultipleTransaction tests that our sqlite connection allows multiple
// concurrent transactions when using in-memory mode. This requires special URI flags, and
// this is mostly making sure that we don't lose them
@@ -42,8 +49,7 @@ func TestInMemorySqliteMultipleTransaction(t *testing.T) {
}
return nil
}
const testCacheSize int64 = 1 << 20
db, err := database.New(nil, nil, "", testCacheSize) // in-memory
db, err := database.New(dbConfig)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
3 changes: 2 additions & 1 deletion database/plugin/metadata/sqlite/block_nonce.go
@@ -58,7 +58,8 @@ func (d *MetadataStoreSqlite) GetBlockNonce(
) ([]byte, error) {
ret := models.BlockNonce{}
if txn != nil {
result := txn.Where("hash = ? AND slot = ?", blockHash, slotNumber).First(&ret)
result := txn.Where("hash = ? AND slot = ?", blockHash, slotNumber).
First(&ret)
if result.Error != nil {
if !errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, result.Error
8 changes: 7 additions & 1 deletion internal/node/load.go
@@ -47,7 +47,13 @@ func Load(cfg *config.Config, logger *slog.Logger, immutableDir string) error {
)
}
// Load database
db, err := database.New(logger, nil, cfg.DatabasePath, cfg.BadgerCacheSize)
dbConfig := &database.Config{
BlobCacheSize: cfg.BadgerCacheSize,
DataDir: cfg.DatabasePath,
Logger: logger,
PromRegistry: nil,
}
db, err := database.New(dbConfig)
if err != nil {
return err
}
22 changes: 19 additions & 3 deletions ledger/chainsync.go
@@ -96,7 +96,13 @@ func (ls *LedgerState) handleEventBlockfetch(evt event.Event) {

func (ls *LedgerState) handleEventChainsyncRollback(e ChainsyncEvent) error {
if ls.chainsyncState == SyncingChainsyncState {
ls.config.Logger.Warn(fmt.Sprintf("ledger: rolling back to %d.%s", e.Point.Slot, hex.EncodeToString(e.Point.Hash)))
ls.config.Logger.Warn(
fmt.Sprintf(
"ledger: rolling back to %d.%s",
e.Point.Slot,
hex.EncodeToString(e.Point.Hash),
),
)
ls.chainsyncState = RollbackChainsyncState
}
if err := ls.chain.Rollback(e.Point); err != nil {
@@ -107,7 +113,13 @@ func (ls *LedgerState) handleEventChainsyncRollback(e ChainsyncEvent) error {

func (ls *LedgerState) handleEventChainsyncBlockHeader(e ChainsyncEvent) error {
if ls.chainsyncState == RollbackChainsyncState {
ls.config.Logger.Info(fmt.Sprintf("ledger: switched to fork at %d.%s", e.Point.Slot, hex.EncodeToString(e.Point.Hash)))
ls.config.Logger.Info(
fmt.Sprintf(
"ledger: switched to fork at %d.%s",
e.Point.Slot,
hex.EncodeToString(e.Point.Hash),
),
)
ls.metrics.forks.Add(1)
}
ls.chainsyncState = SyncingChainsyncState
@@ -289,7 +301,11 @@ func (ls *LedgerState) calculateEpochNonce(
if err != nil {
return nil, fmt.Errorf("lookup block before slot: %w", err)
}
blockBeforeStabilityWindowNonce, err := ls.db.GetBlockNonce(blockBeforeStabilityWindow.Hash, blockBeforeStabilityWindow.Slot, txn)
blockBeforeStabilityWindowNonce, err := ls.db.GetBlockNonce(
blockBeforeStabilityWindow.Hash,
blockBeforeStabilityWindow.Slot,
txn,
)
if err != nil {
return nil, fmt.Errorf("lookup block nonce: %w", err)
}
6 changes: 5 additions & 1 deletion ledger/state.go
@@ -742,7 +742,11 @@ func (ls *LedgerState) loadTip() error {
ls.currentTip = tmpTip
// Load tip block and set cached block nonce
if ls.currentTip.Point.Slot > 0 {
tipNonce, err := ls.db.GetBlockNonce(tmpTip.Point.Hash, tmpTip.Point.Slot, nil)
tipNonce, err := ls.db.GetBlockNonce(
tmpTip.Point.Hash,
tmpTip.Point.Slot,
nil,
)
if err != nil {
return err
}
8 changes: 7 additions & 1 deletion node.go
@@ -77,7 +77,13 @@ func (n *Node) Run() error {
}
// Load database
dbNeedsRecovery := false
db, err := database.New(n.config.logger, n.config.promRegistry, n.config.dataDir, n.config.badgerCacheSize)
dbConfig := &database.Config{
BlobCacheSize: n.config.badgerCacheSize,
DataDir: n.config.dataDir,
Logger: n.config.logger,
PromRegistry: n.config.promRegistry,
}
db, err := database.New(dbConfig)
if db == nil {
n.config.logger.Error(
"failed to create database",