Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion state/certs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func (ls *LedgerState) processTransactionCertificates(
blockPoint pcommon.Point,
tx lcommon.Transaction,
) error {
for _, tmpCert := range tx.Certificates() {
var tmpCert lcommon.Certificate
for _, tmpCert = range tx.Certificates() {
certDeposit, err := ls.currentEra.CertDepositFunc(
tmpCert,
ls.currentPParams,
Expand Down
25 changes: 17 additions & 8 deletions state/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"math/big"

"github.com/blinklabs-io/dingo/database"
"github.com/blinklabs-io/dingo/state/eras"
"github.com/blinklabs-io/gouroboros/cbor"
"github.com/blinklabs-io/gouroboros/ledger"
Expand Down Expand Up @@ -98,20 +99,28 @@ func (ls *LedgerState) queryHardFork(
func (ls *LedgerState) queryHardForkEraHistory() (any, error) {
retData := []any{}
timespan := big.NewInt(0)
for _, era := range eras.Eras {
epochSlotLength, epochLength, err := era.EpochLengthFunc(
var epochs []database.Epoch
var era eras.EraDesc
var err error
var tmpStart, tmpEnd []any
var tmpEpoch database.Epoch
var tmpEra, tmpParams []any
var epochSlotLength, epochLength uint
var idx int
for _, era = range eras.Eras {
epochSlotLength, epochLength, err = era.EpochLengthFunc(
ls.config.CardanoNodeConfig,
)
if err != nil {
return nil, err
}
epochs, err := ls.db.GetEpochsByEra(era.Id, nil)
epochs, err = ls.db.GetEpochsByEra(era.Id, nil)
if err != nil {
return nil, err
}
tmpStart := []any{0, 0, 0}
tmpEnd := tmpStart
tmpParams := []any{
tmpStart = []any{0, 0, 0}
tmpEnd = tmpStart
tmpParams = []any{
epochLength,
epochSlotLength,
[]any{
Expand All @@ -121,7 +130,7 @@ func (ls *LedgerState) queryHardForkEraHistory() (any, error) {
},
0,
}
for idx, tmpEpoch := range epochs {
for idx, tmpEpoch = range epochs {
// Update era start
if idx == 0 {
tmpStart = []any{
Expand All @@ -148,7 +157,7 @@ func (ls *LedgerState) queryHardForkEraHistory() (any, error) {
}
}
}
tmpEra := []any{
tmpEra = []any{
tmpStart,
tmpEnd,
tmpParams,
Expand Down
50 changes: 31 additions & 19 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ func (ls *LedgerState) recoverCommitTimestampConflict() error {
if err != nil {
return err
}
for _, tmpBlock := range recentBlocks {
var tmpBlock database.Block
for _, tmpBlock = range recentBlocks {
blockPoint := ocommon.NewPoint(
tmpBlock.Slot,
tmpBlock.Hash,
Expand Down Expand Up @@ -241,8 +242,9 @@ func (ls *LedgerState) rollback(point ocommon.Point) error {
if err != nil {
return fmt.Errorf("query blocks: %w", err)
}
for _, tmpBlock := range tmpBlocks {
if err := ls.removeBlock(txn, tmpBlock); err != nil {
var tmpBlock database.Block
for _, tmpBlock = range tmpBlocks {
if err = ls.removeBlock(txn, tmpBlock); err != nil {
return fmt.Errorf("remove block: %w", err)
}
}
Expand All @@ -260,7 +262,8 @@ func (ls *LedgerState) rollback(point ocommon.Point) error {
)
}
// Update tip
recentBlocks, err := database.BlocksRecentTxn(txn, 1)
var recentBlocks []database.Block
recentBlocks, err = database.BlocksRecentTxn(txn, 1)
if err != nil {
return err
}
Expand All @@ -272,7 +275,7 @@ func (ls *LedgerState) rollback(point ocommon.Point) error {
),
BlockNumber: recentBlocks[0].Number,
}
if err := ls.db.SetTip(ls.currentTip, txn); err != nil {
if err = ls.db.SetTip(ls.currentTip, txn); err != nil {
return err
}
}
Expand Down Expand Up @@ -371,10 +374,15 @@ func (ls *LedgerState) ledgerProcessBlocks() {
shouldBlock := false
// We chose 500 as an arbitrary max batch size. A "chain extended" message will be logged after each batch
nextBatch := make([]*chain.ChainIteratorResult, 0, 500)
var next, nextRollback *chain.ChainIteratorResult
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

next is used with append() in the loop, but I think this won't actually be a problem. We should be storing a copy of the pointer value in the list, which won't change when the original pointer changes.

var tmpBlock ledger.Block
var needsRollback bool
var end, i int
var txn *database.Txn
for {
// Gather up next batch of blocks
for {
next, err := iter.Next(shouldBlock)
next, err = iter.Next(shouldBlock)
shouldBlock = false
if err != nil {
if !errors.Is(err, chain.ErrIteratorChainTip) {
Expand Down Expand Up @@ -402,15 +410,15 @@ func (ls *LedgerState) ledgerProcessBlocks() {
}
}
// Process batch in groups of 50 to stay under DB txn limits
needsRollback := false
for i := 0; i < len(nextBatch); i += 50 {
needsRollback = false
for i = 0; i < len(nextBatch); i += 50 {
ls.Lock()
end := min(
end = min(
len(nextBatch),
i+50,
)
txn := ls.db.Transaction(true)
err := txn.Do(func(txn *database.Txn) error {
txn = ls.db.Transaction(true)
err = txn.Do(func(txn *database.Txn) error {
for _, next := range nextBatch[i:end] {
// Rollbacks need to be handled outside of the batch DB transaction
// A rollback should only occur at the end of a batch
Expand All @@ -419,11 +427,11 @@ func (ls *LedgerState) ledgerProcessBlocks() {
return nil
}
// Process block
tmpBlock, err := next.Block.Decode()
tmpBlock, err = next.Block.Decode()
if err != nil {
return err
}
if err := ls.ledgerProcessBlock(txn, next.Point, tmpBlock, next.Block.Nonce); err != nil {
if err = ls.ledgerProcessBlock(txn, next.Point, tmpBlock, next.Block.Nonce); err != nil {
return err
}
}
Expand All @@ -442,9 +450,9 @@ func (ls *LedgerState) ledgerProcessBlocks() {
if needsRollback {
needsRollback = false
// The rollback should be at the end of the batch
nextRollback := nextBatch[len(nextBatch)-1]
nextRollback = nextBatch[len(nextBatch)-1]
ls.Lock()
if err := ls.rollback(nextRollback.Point); err != nil {
if err = ls.rollback(nextRollback.Point); err != nil {
ls.Unlock()
ls.config.Logger.Error(
"failed to process rollback: " + err.Error(),
Expand Down Expand Up @@ -603,7 +611,8 @@ func (ls *LedgerState) RecentChainPoints(count int) ([]ocommon.Point, error) {
return nil, err
}
ret := []ocommon.Point{}
for _, tmpBlock := range tmpBlocks {
var tmpBlock database.Block
for _, tmpBlock = range tmpBlocks {
ret = append(
ret,
ocommon.NewPoint(tmpBlock.Slot, tmpBlock.Hash),
Expand All @@ -618,9 +627,11 @@ func (ls *LedgerState) GetIntersectPoint(
) (*ocommon.Point, error) {
tip := ls.Tip()
var ret ocommon.Point
var tmpBlock database.Block
var err error
foundOrigin := false
txn := ls.db.Transaction(false)
err := txn.Do(func(txn *database.Txn) error {
err = txn.Do(func(txn *database.Txn) error {
for _, point := range points {
// Ignore points with a slot later than our current tip
if point.Slot > tip.Point.Slot {
Expand All @@ -636,7 +647,7 @@ func (ls *LedgerState) GetIntersectPoint(
continue
}
// Lookup block in metadata DB
tmpBlock, err := database.BlockByPoint(ls.db, point)
tmpBlock, err = database.BlockByPoint(ls.db, point)
if err != nil {
if errors.Is(err, database.ErrBlockNotFound) {
continue
Expand Down Expand Up @@ -694,8 +705,9 @@ func (ls *LedgerState) UtxosByAddress(
if err != nil {
return ret, err
}
var tmpUtxo database.Utxo
for _, utxo := range utxos {
tmpUtxo := database.Utxo(utxo)
tmpUtxo = database.Utxo(utxo)
ret = append(ret, tmpUtxo)
}
return ret, nil
Expand Down
Loading