Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cl/phase1/execution_client/execution_client_direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ func (cc *ExecutionClientDirect) SupportInsertion() bool {
return true
}

func (cc *ExecutionClientDirect) InsertBlocks(blks []*types.Block) error {
return cc.chainRW.InsertBlocksAndWait(blks)
// InsertBlocks stores the given execution-layer blocks through the underlying
// chain reader/writer. When wait is true the call blocks until the insertion
// has completed; when wait is false the blocks are handed off without waiting.
func (cc *ExecutionClientDirect) InsertBlocks(blks []*types.Block, wait bool) error {
	// NOTE(review): the previous condition was inverted (`if !wait` selected
	// InsertBlocksAndWait), so callers passing wait=true got the non-waiting
	// path. The parameter name and the call sites that pass `true` expecting
	// a blocking insert indicate this is the intended mapping.
	if wait {
		return cc.chainRW.InsertBlocksAndWait(blks)
	}
	return cc.chainRW.InsertBlocks(blks)
}

func (cc *ExecutionClientDirect) InsertBlock(blk *types.Block) error {
Expand Down
2 changes: 1 addition & 1 deletion cl/phase1/execution_client/execution_client_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (cc *ExecutionClientRpc) SupportInsertion() bool {
return false
}

// InsertBlocks satisfies the ExecutionEngine interface but is intentionally
// unimplemented for the RPC-backed client: SupportInsertion reports false for
// this client, so callers are expected to check it and never reach this method.
// Reaching it is a programming error, hence the panic.
func (cc *ExecutionClientRpc) InsertBlocks([]*types.Block, bool) error {
	panic("unimplemented")
}
}

Expand Down
2 changes: 1 addition & 1 deletion cl/phase1/execution_client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type ExecutionEngine interface {
NewPayload(payload *cltypes.Eth1Block, beaconParentRoot *libcommon.Hash, versionedHashes []libcommon.Hash) (bool, error)
ForkChoiceUpdate(finalized libcommon.Hash, head libcommon.Hash) error
SupportInsertion() bool
InsertBlocks([]*types.Block) error
InsertBlocks([]*types.Block, bool) error
InsertBlock(*types.Block) error
IsCanonicalHash(libcommon.Hash) (bool, error)
Ready() (bool, error)
Expand Down
129 changes: 106 additions & 23 deletions cl/phase1/stages/clstages.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (

"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/dbutils"
"github.com/ledgerwatch/erigon/cl/antiquary"
"github.com/ledgerwatch/erigon/cl/beacon/beaconevents"
"github.com/ledgerwatch/erigon/cl/beacon/synced_data"
Expand Down Expand Up @@ -51,6 +53,7 @@ type Cfg struct {
antiquary *antiquary.Antiquary
syncedData *synced_data.SyncedDataManager
emitter *beaconevents.Emitters
prebuffer *etl.Collector
gossipSource persistence.BlockSource

hasDownloaded, backfilling bool
Expand Down Expand Up @@ -99,6 +102,7 @@ func ClStagesCfg(
backfilling: backfilling,
syncedData: syncedData,
emitter: emitters,
prebuffer: etl.NewCollector("Caplin-blocks", tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), log.Root()),
gossipSource: gossipSource,
}
}
Expand Down Expand Up @@ -239,7 +243,7 @@ func ConsensusClStages(ctx context.Context,
startingSlot := cfg.state.LatestBlockHeader().Slot
downloader := network2.NewBackwardBeaconDownloader(context.Background(), cfg.rpc, cfg.indiciesDB)

if err := SpawnStageHistoryDownload(StageHistoryReconstruction(downloader, cfg.antiquary, cfg.sn, cfg.indiciesDB, cfg.executionClient, cfg.genesisCfg, cfg.beaconCfg, cfg.backfilling, false, startingRoot, startingSlot, cfg.tmpdir, 600*time.Millisecond, logger), context.Background(), logger); err != nil {
if err := SpawnStageHistoryDownload(StageHistoryReconstruction(downloader, cfg.antiquary, cfg.sn, cfg.indiciesDB, cfg.executionClient, cfg.genesisCfg, cfg.beaconCfg, cfg.backfilling, false, startingRoot, startingSlot, cfg.tmpdir, 600*time.Millisecond, cfg.prebuffer, logger), context.Background(), logger); err != nil {
cfg.hasDownloaded = false
return err
}
Expand Down Expand Up @@ -271,27 +275,33 @@ func ConsensusClStages(ctx context.Context,
downloader.SetHighestProcessedRoot(finalizedCheckpoint.BlockRoot())
downloader.SetHighestProcessedSlot(currentSlot.Load())
downloader.SetProcessFunction(func(highestSlotProcessed uint64, highestBlockRootProcessed common.Hash, blocks []*cltypes.SignedBeaconBlock) (newHighestSlotProcessed uint64, newHighestBlockRootProcessed common.Hash, err error) {
blockBatch := []*types.Block{}
for _, block := range blocks {

if err := processBlock(tx, block, false, true); err != nil {
log.Warn("bad blocks segment received", "err", err)
return highestSlotProcessed, highestBlockRootProcessed, err
}
if shouldInsert && block.Version() >= clparams.BellatrixVersion {
if cfg.prebuffer == nil {
cfg.prebuffer = etl.NewCollector("Caplin-blocks", cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), log.Root())
cfg.prebuffer.LogLvl(log.LvlDebug)
}
executionPayload := block.Block.Body.ExecutionPayload
body := executionPayload.Body()
txs, err := types.DecodeTransactions(body.Transactions)
executionPayloadRoot, err := executionPayload.HashSSZ()
if err != nil {
log.Warn("bad blocks segment received", "err", err)
return highestSlotProcessed, highestBlockRootProcessed, err
}
parentRoot := &block.Block.ParentRoot
header, err := executionPayload.RlpHeader(parentRoot)
versionByte := byte(block.Version())
enc, err := executionPayload.EncodeSSZ(nil)
if err != nil {
log.Warn("bad blocks segment received", "err", err)
return highestSlotProcessed, highestBlockRootProcessed, err
}
blockBatch = append(blockBatch, types.NewBlockFromStorage(executionPayload.BlockHash, header, txs, nil, body.Withdrawals))
}
if err := processBlock(tx, block, false, true); err != nil {
log.Warn("bad blocks segment received", "err", err)
return highestSlotProcessed, highestBlockRootProcessed, err
enc = append([]byte{versionByte}, append(block.Block.ParentRoot[:], enc...)...)
enc = utils.CompressSnappy(enc)

if err := cfg.prebuffer.Collect(dbutils.BlockBodyKey(executionPayload.BlockNumber, executionPayloadRoot), enc); err != nil {
return highestSlotProcessed, highestBlockRootProcessed, err
}
}

if highestSlotProcessed < block.Block.Slot {
Expand All @@ -303,11 +313,7 @@ func ConsensusClStages(ctx context.Context,
}
}
}
if shouldInsert {
if err := cfg.executionClient.InsertBlocks(blockBatch); err != nil {
log.Warn("failed to insert blocks", "err", err)
}
}

return highestSlotProcessed, highestBlockRootProcessed, nil
})
chainTipSlot := utils.GetCurrentSlot(cfg.genesisCfg.GenesisTime, cfg.beaconCfg.SecondsPerSlot)
Expand Down Expand Up @@ -347,6 +353,88 @@ func ConsensusClStages(ctx context.Context,
},
ActionFunc: func(ctx context.Context, logger log.Logger, cfg *Cfg, args Args) error {
totalRequest := args.targetSlot - args.seenSlot
readyTimeout := time.NewTimer(10 * time.Second)
readyInterval := time.NewTimer(50 * time.Millisecond)
defer readyTimeout.Stop()
defer readyInterval.Stop()
if cfg.executionClient != nil {
ReadyLoop:
for { // if the client does not support insertion, then skip
select {
case <-ctx.Done():
return ctx.Err()
case <-readyTimeout.C:
time.Sleep(10 * time.Second)
return nil
case <-readyInterval.C:
ready, err := cfg.executionClient.Ready()
if err != nil {
return err
}
if ready {
break ReadyLoop
}
}
}
}

tx, err := cfg.indiciesDB.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()

blocksBatch := []*types.Block{}
blocksBatchLimit := 10_000
if cfg.executionClient != nil && cfg.prebuffer != nil && cfg.executionClient.SupportInsertion() {
if err := cfg.prebuffer.Load(tx, kv.Headers, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if len(v) == 0 {
return nil
}
v, err = utils.DecompressSnappy(v)
if err != nil {
return err
}
version := clparams.StateVersion(v[0])
parentRoot := common.BytesToHash(v[1:33])
v = v[33:]
executionPayload := cltypes.NewEth1Block(version, cfg.beaconCfg)
if err := executionPayload.DecodeSSZ(v, int(version)); err != nil {
return err
}
body := executionPayload.Body()
txs, err := types.DecodeTransactions(body.Transactions)
if err != nil {
log.Warn("bad blocks segment received", "err", err)
return err
}
header, err := executionPayload.RlpHeader(&parentRoot)
if err != nil {
log.Warn("bad blocks segment received", "err", err)
return err
}
blocksBatch = append(blocksBatch, types.NewBlockFromStorage(executionPayload.BlockHash, header, txs, nil, body.Withdrawals))
if len(blocksBatch) >= blocksBatchLimit {
if err := cfg.executionClient.InsertBlocks(blocksBatch, true); err != nil {
logger.Warn("failed to insert blocks", "err", err)
}
logger.Info("[Caplin] Inserted blocks", "progress", blocksBatch[len(blocksBatch)-1].NumberU64())
blocksBatch = []*types.Block{}
}
return next(k, nil, nil)
}, etl.TransformArgs{}); err != nil {
return err
}
if len(blocksBatch) > 0 {
if err := cfg.executionClient.InsertBlocks(blocksBatch, true); err != nil {
logger.Warn("failed to insert blocks", "err", err)
}
}
cfg.prebuffer.Close()
cfg.prebuffer = nil

}

logger.Debug("waiting for blocks...",
"seenSlot", args.seenSlot,
"targetSlot", args.targetSlot,
Expand Down Expand Up @@ -423,11 +511,6 @@ func ConsensusClStages(ctx context.Context,
}
}(v)
}
tx, err := cfg.indiciesDB.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
logTimer := time.NewTicker(30 * time.Second)
defer logTimer.Stop()
MainLoop:
Expand Down
96 changes: 23 additions & 73 deletions cl/phase1/stages/stage_history_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
"github.com/ledgerwatch/erigon/cl/phase1/network"
"github.com/ledgerwatch/erigon/cl/utils"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks"

"github.com/ledgerwatch/erigon/cl/clparams"
Expand All @@ -26,25 +25,26 @@ import (
)

type StageHistoryReconstructionCfg struct {
genesisCfg *clparams.GenesisConfig
beaconCfg *clparams.BeaconChainConfig
downloader *network.BackwardBeaconDownloader
sn *freezeblocks.CaplinSnapshots
startingRoot libcommon.Hash
backfilling bool
waitForAllRoutines bool
startingSlot uint64
tmpdir string
indiciesDB kv.RwDB
engine execution_client.ExecutionEngine
antiquary *antiquary.Antiquary
logger log.Logger
backfillingThrottling time.Duration
genesisCfg *clparams.GenesisConfig
beaconCfg *clparams.BeaconChainConfig
downloader *network.BackwardBeaconDownloader
sn *freezeblocks.CaplinSnapshots
startingRoot libcommon.Hash
backfilling bool
waitForAllRoutines bool
startingSlot uint64
tmpdir string
indiciesDB kv.RwDB
engine execution_client.ExecutionEngine
antiquary *antiquary.Antiquary
logger log.Logger
executionBlocksCollector *etl.Collector
backfillingThrottling time.Duration
}

const logIntervalTime = 30 * time.Second

func StageHistoryReconstruction(downloader *network.BackwardBeaconDownloader, antiquary *antiquary.Antiquary, sn *freezeblocks.CaplinSnapshots, indiciesDB kv.RwDB, engine execution_client.ExecutionEngine, genesisCfg *clparams.GenesisConfig, beaconCfg *clparams.BeaconChainConfig, backfilling, waitForAllRoutines bool, startingRoot libcommon.Hash, startinSlot uint64, tmpdir string, backfillingThrottling time.Duration, logger log.Logger) StageHistoryReconstructionCfg {
func StageHistoryReconstruction(downloader *network.BackwardBeaconDownloader, antiquary *antiquary.Antiquary, sn *freezeblocks.CaplinSnapshots, indiciesDB kv.RwDB, engine execution_client.ExecutionEngine, genesisCfg *clparams.GenesisConfig, beaconCfg *clparams.BeaconChainConfig, backfilling, waitForAllRoutines bool, startingRoot libcommon.Hash, startinSlot uint64, tmpdir string, backfillingThrottling time.Duration, executionBlocksCollector *etl.Collector, logger log.Logger) StageHistoryReconstructionCfg {
return StageHistoryReconstructionCfg{
genesisCfg: genesisCfg,
beaconCfg: beaconCfg,
Expand Down Expand Up @@ -72,9 +72,7 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co
if !clparams.SupportBackfilling(cfg.beaconCfg.DepositNetworkID) {
cfg.backfilling = false // disable backfilling if not on a supported network
}
executionBlocksCollector := etl.NewCollector("HistoryDownload", cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), logger)
defer executionBlocksCollector.Close()
executionBlocksCollector.LogLvl(log.LvlDebug)

// Start the procedure
logger.Info("Starting downloading History", "from", currentSlot)
// Setup slot and block root
Expand Down Expand Up @@ -111,13 +109,17 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co
}
if !foundLatestEth1ValidBlock.Load() && blk.Version() >= clparams.BellatrixVersion {
payload := blk.Block.Body.ExecutionPayload
payloadRoot, err := payload.HashSSZ()
if err != nil {
return false, fmt.Errorf("error hashing execution payload during download: %s", err)
}
encodedPayload, err := payload.EncodeSSZ(nil)
if err != nil {
return false, fmt.Errorf("error encoding execution payload during download: %s", err)
}
// Use snappy compression that the temporary files do not take too much disk.
encodedPayload = utils.CompressSnappy(append(encodedPayload, append(blk.Block.ParentRoot[:], byte(blk.Version()))...))
if err := executionBlocksCollector.Collect(dbutils.BlockBodyKey(payload.BlockNumber, payload.BlockHash), encodedPayload); err != nil {
encodedPayload = utils.CompressSnappy(append([]byte{byte(blk.Version())}, append(blk.Block.ParentRoot[:], encodedPayload...)...))
if err := cfg.executionBlocksCollector.Collect(dbutils.BlockBodyKey(payload.BlockNumber, payloadRoot), encodedPayload); err != nil {
return false, fmt.Errorf("error collecting execution payload during download: %s", err)
}
if currEth1Progress.Load()%100 == 0 {
Expand Down Expand Up @@ -222,59 +224,7 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co
}
defer tx2.Rollback()

blockBatch := []*types.Block{}
blockBatchMaxSize := 1000

cfg.logger.Info("Ready to insert history, waiting for sync cycle to finish")

if err := executionBlocksCollector.Load(tx2, kv.Headers, func(k, vComp []byte, _ etl.CurrentTableReader, next etl.LoadNextFunc) error {
if cfg.engine == nil || !cfg.engine.SupportInsertion() {
return next(k, nil, nil)
}
var err error
var v []byte
if v, err = utils.DecompressSnappy(vComp); err != nil {
return fmt.Errorf("error decompressing dump during collection: %s", err)
}

version := clparams.StateVersion(v[len(v)-1])
parentRoot := libcommon.BytesToHash(v[len(v)-1-32 : len(v)-1])

executionPayload := cltypes.NewEth1Block(version, cfg.beaconCfg)
if err := executionPayload.DecodeSSZ(v[:len(v)-1-32], int(version)); err != nil {
return fmt.Errorf("error decoding execution payload during collection: %s", err)
}
if executionPayload.BlockNumber%10000 == 0 {
cfg.logger.Info("Inserting execution payload", "blockNumber", executionPayload.BlockNumber)
}
body := executionPayload.Body()

header, err := executionPayload.RlpHeader(&parentRoot)
if err != nil {
return fmt.Errorf("error parsing rlp header during collection: %s", err)
}

txs, err := types.DecodeTransactions(body.Transactions)
if err != nil {
return err
}

block := types.NewBlockFromStorage(executionPayload.BlockHash, header, txs, nil, body.Withdrawals)
blockBatch = append(blockBatch, block)
if len(blockBatch) >= blockBatchMaxSize {
if err := cfg.engine.InsertBlocks(blockBatch); err != nil {
return fmt.Errorf("error inserting block during collection: %s", err)
}
blockBatch = blockBatch[:0]
}
return next(k, nil, nil)
}, etl.TransformArgs{Quit: ctx.Done()}); err != nil {
return err
}
if cfg.engine != nil && cfg.engine.SupportInsertion() {
if err := cfg.engine.InsertBlocks(blockBatch); err != nil {
return fmt.Errorf("error doing last block insertion during collection: %s", err)
}
}
return nil
}
2 changes: 1 addition & 1 deletion cmd/capcli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (c *Chain) Run(ctx *Context) error {
}

downloader := network.NewBackwardBeaconDownloader(ctx, beacon, db)
cfg := stages.StageHistoryReconstruction(downloader, antiquary.NewAntiquary(ctx, nil, nil, nil, dirs, nil, nil, nil, nil, nil, false, false, nil), csn, db, nil, genesisConfig, beaconConfig, true, true, bRoot, bs.Slot(), "/tmp", 300*time.Millisecond, log.Root())
cfg := stages.StageHistoryReconstruction(downloader, antiquary.NewAntiquary(ctx, nil, nil, nil, dirs, nil, nil, nil, nil, nil, false, false, nil), csn, db, nil, genesisConfig, beaconConfig, true, true, bRoot, bs.Slot(), "/tmp", 300*time.Millisecond, nil, log.Root())
return stages.SpawnStageHistoryDownload(cfg, ctx, log.Root())
}

Expand Down
Loading