From c60a8b7be3fdde1d1bb4179fec532924e05d0442 Mon Sep 17 00:00:00 2001 From: Giulio Date: Sun, 11 Feb 2024 16:30:11 +0100 Subject: [PATCH 01/10] save --- .../execution_client_direct.go | 7 +- .../execution_client/execution_client_rpc.go | 2 +- cl/phase1/execution_client/interface.go | 2 +- cl/phase1/stages/clstages.go | 93 +++++++++++++++++-- cl/phase1/stages/stage_history_download.go | 4 +- .../eth1/eth1_chain_reader.go/chain_reader.go | 21 ++++- 6 files changed, 115 insertions(+), 14 deletions(-) diff --git a/cl/phase1/execution_client/execution_client_direct.go b/cl/phase1/execution_client/execution_client_direct.go index 6ba1d6d7438..b5bf109cc46 100644 --- a/cl/phase1/execution_client/execution_client_direct.go +++ b/cl/phase1/execution_client/execution_client_direct.go @@ -70,8 +70,11 @@ func (cc *ExecutionClientDirect) SupportInsertion() bool { return true } -func (cc *ExecutionClientDirect) InsertBlocks(blks []*types.Block) error { - return cc.chainRW.InsertBlocksAndWait(blks) +func (cc *ExecutionClientDirect) InsertBlocks(blks []*types.Block, wait bool) error { + if !wait { + return cc.chainRW.InsertBlocksAndWait(blks) + } + return cc.chainRW.InsertBlocks(blks) } func (cc *ExecutionClientDirect) InsertBlock(blk *types.Block) error { diff --git a/cl/phase1/execution_client/execution_client_rpc.go b/cl/phase1/execution_client/execution_client_rpc.go index 137eab6f1cf..859c0249a18 100644 --- a/cl/phase1/execution_client/execution_client_rpc.go +++ b/cl/phase1/execution_client/execution_client_rpc.go @@ -174,7 +174,7 @@ func (cc *ExecutionClientRpc) SupportInsertion() bool { return false } -func (cc *ExecutionClientRpc) InsertBlocks([]*types.Block) error { +func (cc *ExecutionClientRpc) InsertBlocks([]*types.Block, bool) error { panic("unimplemented") } diff --git a/cl/phase1/execution_client/interface.go b/cl/phase1/execution_client/interface.go index e8879fd6fe5..e9f8e25423e 100644 --- a/cl/phase1/execution_client/interface.go +++ b/cl/phase1/execution_client/interface.go @@ -15,7 +15,7 @@ type ExecutionEngine interface { NewPayload(payload *cltypes.Eth1Block, beaconParentRoot *libcommon.Hash, versionedHashes []libcommon.Hash) (bool, error) ForkChoiceUpdate(finalized libcommon.Hash, head libcommon.Hash) error SupportInsertion() bool - InsertBlocks([]*types.Block) error + InsertBlocks([]*types.Block, bool) error InsertBlock(*types.Block) error IsCanonicalHash(libcommon.Hash) (bool, error) Ready() (bool, error) diff --git a/cl/phase1/stages/clstages.go b/cl/phase1/stages/clstages.go index d8481f7cb04..a1d8967e3d0 100644 --- a/cl/phase1/stages/clstages.go +++ b/cl/phase1/stages/clstages.go @@ -1,6 +1,7 @@ package stages import ( + "bytes" "context" "errors" "fmt" @@ -11,7 +12,9 @@ import ( "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/dbg" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/dbutils" "github.com/ledgerwatch/erigon/cl/antiquary" "github.com/ledgerwatch/erigon/cl/beacon/beaconevents" "github.com/ledgerwatch/erigon/cl/beacon/synced_data" @@ -27,6 +30,7 @@ import ( "github.com/ledgerwatch/erigon/cl/phase1/execution_client" "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks" network2 "github.com/ledgerwatch/erigon/cl/phase1/network" @@ -52,6 +56,7 @@ type Cfg struct { antiquary *antiquary.Antiquary syncedData *synced_data.SyncedDataManager emitter *beaconevents.Emitters + prebuffer *etl.Collector hasDownloaded, backfilling bool } @@ -100,6 +105,7 @@ func ClStagesCfg( backfilling: backfilling, syncedData: syncedData, emitter: emitters, + prebuffer: etl.NewCollector("Caplin-blocks", tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), log.Root()), } } @@ -305,8 +311,20 @@ func ConsensusClStages(ctx context.Context, } } if shouldInsert { - if err := cfg.executionClient.InsertBlocks(blockBatch); err != nil { - log.Warn("failed to insert blocks", "err", err) + if cfg.prebuffer == nil { + cfg.prebuffer = etl.NewCollector("Caplin-blocks", cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), log.Root()) + } + var buf bytes.Buffer + // prebuffer the blocks + for _, block := range blockBatch { + buf.Reset() + if err := block.EncodeRLP(&buf); err != nil { + logger.Warn("failed to encode block", "err", err) + continue + } + if err := cfg.prebuffer.Collect(dbutils.BlockBodyKey(block.NumberU64(), block.Hash()), common.Copy(buf.Bytes())); err != nil { + return highestSlotProcessed, highestBlockRootProcessed, err + } } } return highestSlotProcessed, highestBlockRootProcessed, nil @@ -345,6 +363,72 @@ func ConsensusClStages(ctx context.Context, }, ActionFunc: func(ctx context.Context, logger log.Logger, cfg *Cfg, args Args) error { totalRequest := args.targetSlot - args.seenSlot + readyTimeout := time.NewTimer(10 * time.Second) + readyInterval := time.NewTimer(50 * time.Millisecond) + defer readyTimeout.Stop() + ReadyLoop: + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-readyTimeout.C: + time.Sleep(10 * time.Second) + return nil + case <-readyInterval.C: + ready, err := cfg.executionClient.Ready() + if err != nil { + return err + } + if ready { + break ReadyLoop + } + } + } + + tx, err := cfg.indiciesDB.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + var b bytes.Buffer + + blocksBatch := []*types.Block{} + blocksBatchLimit := 1000 + if cfg.prebuffer != nil && cfg.executionClient.SupportInsertion() { + if err := cfg.prebuffer.Load(tx, kv.Headers, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { + if len(v) == 0 { + return nil + } + b.Reset() + if _, err := b.Write(v); err != nil { + return err + } + stream := rlp.NewStream(&b, 0) + + block := &types.Block{} + if err := block.DecodeRLP(stream); err != nil { + return err + } + blocksBatch = append(blocksBatch, block) + if len(blocksBatch) >= blocksBatchLimit { + if err := cfg.executionClient.InsertBlocks(blocksBatch, true); err != nil { + logger.Warn("failed to insert blocks", "err", err) + } + blocksBatch = blocksBatch[:0] + } + return next(k, nil, nil) + }, etl.TransformArgs{}); err != nil { + return err + } + if len(blocksBatch) > 0 { + if err := cfg.executionClient.InsertBlocks(blocksBatch, true); err != nil { + logger.Warn("failed to insert blocks", "err", err) + } + } + cfg.prebuffer.Close() + cfg.prebuffer = nil + } + logger.Debug("waiting for blocks...", "seenSlot", args.seenSlot, "targetSlot", args.targetSlot, @@ -403,11 +487,6 @@ func ConsensusClStages(ctx context.Context, respCh <- blocks }(v) } - tx, err := cfg.indiciesDB.BeginRw(ctx) - if err != nil { - return err - } - defer tx.Rollback() logTimer := time.NewTicker(30 * time.Second) defer logTimer.Stop() diff --git a/cl/phase1/stages/stage_history_download.go b/cl/phase1/stages/stage_history_download.go index 43269d850a4..be2442de6e6 100644 --- a/cl/phase1/stages/stage_history_download.go +++ b/cl/phase1/stages/stage_history_download.go @@ -264,7 +264,7 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co block := types.NewBlockFromStorage(executionPayload.BlockHash, header, txs, nil, body.Withdrawals) blockBatch = append(blockBatch, block) if len(blockBatch) >= blockBatchMaxSize { - if err := cfg.engine.InsertBlocks(blockBatch); err != nil { + if err := cfg.engine.InsertBlocks(blockBatch, true); err != nil { return fmt.Errorf("error inserting block during collection: %s", err) } blockBatch = blockBatch[:0] @@ -274,7 +274,7 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co return err } if cfg.engine != nil && cfg.engine.SupportInsertion() { - if err := cfg.engine.InsertBlocks(blockBatch); err != nil { + if err := cfg.engine.InsertBlocks(blockBatch, true); err != nil { return fmt.Errorf("error doing last block insertion during collection: %s", err) } } diff --git a/turbo/execution/eth1/eth1_chain_reader.go/chain_reader.go b/turbo/execution/eth1/eth1_chain_reader.go/chain_reader.go index 0621dcd814f..84c91ed1233 100644 --- a/turbo/execution/eth1/eth1_chain_reader.go/chain_reader.go +++ b/turbo/execution/eth1/eth1_chain_reader.go/chain_reader.go @@ -265,6 +265,7 @@ func (c ChainReaderWriterEth1) InsertBlocksAndWait(blocks []*types.Block) error } retryInterval := time.NewTicker(retryTimeout) defer retryInterval.Stop() + for response.Result == execution.ExecutionStatus_Busy { select { case <-retryInterval.C: @@ -273,7 +274,7 @@ func (c ChainReaderWriterEth1) InsertBlocksAndWait(blocks []*types.Block) error return err } case <-c.ctx.Done(): - return context.Canceled + return c.ctx.Err() } } if response.Result != execution.ExecutionStatus_Success { @@ -282,6 +283,24 @@ func (c ChainReaderWriterEth1) InsertBlocksAndWait(blocks []*types.Block) error return nil } +func (c ChainReaderWriterEth1) InsertBlocks(blocks []*types.Block) error { + request := &execution.InsertBlocksRequest{ + Blocks: eth1_utils.ConvertBlocksToRPC(blocks), + } + response, err := c.executionModule.InsertBlocks(c.ctx, request) + if err != nil { + return err + } + + if response.Result == execution.ExecutionStatus_Busy { + return context.DeadlineExceeded + } + if response.Result != execution.ExecutionStatus_Success { + return fmt.Errorf("insertHeadersAndWait: invalid code recieved from execution module: %s", response.Result.String()) + } + return nil +} + func (c ChainReaderWriterEth1) InsertBlockAndWait(block *types.Block) error { blocks := []*types.Block{block} request := &execution.InsertBlocksRequest{ From 4300b5e73fe2ff26c82b3d0d8f71b2ec70f1ba4c Mon Sep 17 00:00:00 2001 From: Giulio Date: Sun, 11 Feb 2024 16:36:50 +0100 Subject: [PATCH 02/10] fixed crash --- cl/phase1/stages/clstages.go | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/cl/phase1/stages/clstages.go b/cl/phase1/stages/clstages.go index a1d8967e3d0..32d272df788 100644 --- a/cl/phase1/stages/clstages.go +++ b/cl/phase1/stages/clstages.go @@ -366,21 +366,24 @@ func ConsensusClStages(ctx context.Context, readyTimeout := time.NewTimer(10 * time.Second) readyInterval := time.NewTimer(50 * time.Millisecond) defer readyTimeout.Stop() - ReadyLoop: - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-readyTimeout.C: - time.Sleep(10 * time.Second) - return nil - case <-readyInterval.C: - ready, err := cfg.executionClient.Ready() - if err != nil { - return err - } - if ready { - break ReadyLoop + defer readyInterval.Stop() + if cfg.executionClient != nil { + ReadyLoop: + for { // if the client does not support insertion, then skip + select { + case <-ctx.Done(): + return ctx.Err() + case <-readyTimeout.C: + time.Sleep(10 * time.Second) + return nil + case <-readyInterval.C: + ready, err := cfg.executionClient.Ready() + if err != nil { + return err + } + if ready { + break ReadyLoop + } } } } @@ -394,7 +397,7 @@ func ConsensusClStages(ctx context.Context, blocksBatch := []*types.Block{} blocksBatchLimit := 1000 - if cfg.prebuffer != nil && cfg.executionClient.SupportInsertion() { + if cfg.executionClient != nil && cfg.prebuffer != nil && cfg.executionClient.SupportInsertion() { if err := cfg.prebuffer.Load(tx, kv.Headers, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { if len(v) == 0 { return nil From 92bc13c26fee011e40ffb826c5d4693857593803 Mon Sep 17 00:00:00 2001 From: Giulio Date: Sun, 11 Feb 2024 23:24:33 +0100 Subject: [PATCH 03/10] fixed crash --- cl/phase1/stages/clstages.go | 80 +++++++++++++++++++----------------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/cl/phase1/stages/clstages.go b/cl/phase1/stages/clstages.go index 32d272df788..170e2800e35 100644 --- a/cl/phase1/stages/clstages.go +++ b/cl/phase1/stages/clstages.go @@ -30,7 +30,6 @@ import ( "github.com/ledgerwatch/erigon/cl/phase1/execution_client" "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks" network2 "github.com/ledgerwatch/erigon/cl/phase1/network" @@ -278,27 +277,40 @@ func ConsensusClStages(ctx context.Context, downloader.SetHighestProcessedRoot(finalizedCheckpoint.BlockRoot()) downloader.SetHighestProcessedSlot(currentSlot.Load()) downloader.SetProcessFunction(func(highestSlotProcessed uint64, highestBlockRootProcessed common.Hash, blocks []*cltypes.SignedBeaconBlock) (newHighestSlotProcessed uint64, newHighestBlockRootProcessed common.Hash, err error) { - blockBatch := []*types.Block{} for _, block := range blocks { + + if err := processBlock(tx, block, false, true); err != nil { + log.Warn("bad blocks segment received", "err", err) + return highestSlotProcessed, highestBlockRootProcessed, err + } if shouldInsert && block.Version() >= clparams.BellatrixVersion { executionPayload := block.Block.Body.ExecutionPayload - body := executionPayload.Body() - txs, err := types.DecodeTransactions(body.Transactions) + executionPayloadRoot, err := executionPayload.HashSSZ() if err != nil { - log.Warn("bad blocks segment received", "err", err) return highestSlotProcessed, highestBlockRootProcessed, err } - parentRoot := &block.Block.ParentRoot - header, err := executionPayload.RlpHeader(parentRoot) + versionByte := byte(block.Version()) + enc, err := executionPayload.EncodeSSZ(nil) if err != nil { - log.Warn("bad blocks segment received", "err", err) return highestSlotProcessed, highestBlockRootProcessed, err } - blockBatch = append(blockBatch, types.NewBlockFromStorage(executionPayload.BlockHash, header, txs, nil, body.Withdrawals)) - } - if err := processBlock(tx, block, false, true); err != nil { - log.Warn("bad blocks segment received", "err", err) - return highestSlotProcessed, highestBlockRootProcessed, err + enc = append([]byte{versionByte}, append(block.Block.ParentRoot[:], enc...)...) + // body := executionPayload.Body() + // txs, err := types.DecodeTransactions(body.Transactions) + // if err != nil { + // log.Warn("bad blocks segment received", "err", err) + // return highestSlotProcessed, highestBlockRootProcessed, err + // } + // parentRoot := &block.Block.ParentRoot + // header, err := executionPayload.RlpHeader(parentRoot) + // if err != nil { + // log.Warn("bad blocks segment received", "err", err) + // return highestSlotProcessed, highestBlockRootProcessed, err + // } + // blockBatch = append(blockBatch, types.NewBlockFromStorage(executionPayload.BlockHash, header, txs, nil, body.Withdrawals)) + if err := cfg.prebuffer.Collect(dbutils.BlockBodyKey(executionPayload.BlockNumber, executionPayloadRoot), enc); err != nil { + return highestSlotProcessed, highestBlockRootProcessed, err + } } if highestSlotProcessed < block.Block.Slot { @@ -310,23 +322,7 @@ func ConsensusClStages(ctx context.Context, } } } - if shouldInsert { - if cfg.prebuffer == nil { - cfg.prebuffer = etl.NewCollector("Caplin-blocks", cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), log.Root()) - } - var buf bytes.Buffer - // prebuffer the blocks - for _, block := range blockBatch { - buf.Reset() - if err := block.EncodeRLP(&buf); err != nil { - logger.Warn("failed to encode block", "err", err) - continue - } - if err := cfg.prebuffer.Collect(dbutils.BlockBodyKey(block.NumberU64(), block.Hash()), common.Copy(buf.Bytes())); err != nil { - return highestSlotProcessed, highestBlockRootProcessed, err - } - } - } + return highestSlotProcessed, highestBlockRootProcessed, nil }) chainTipSlot := utils.GetCurrentSlot(cfg.genesisCfg.GenesisTime, cfg.beaconCfg.SecondsPerSlot) @@ -402,22 +398,30 @@ func ConsensusClStages(ctx context.Context, if len(v) == 0 { return nil } - b.Reset() - if _, err := b.Write(v); err != nil { + version := clparams.StateVersion(v[0]) + parentRoot := common.BytesToHash(v[1:33]) + v = v[33:] + executionPayload := cltypes.NewEth1Block(version, cfg.beaconCfg) + if err := executionPayload.DecodeSSZ(v, int(version)); err != nil { return err } - stream := rlp.NewStream(&b, 0) - - block := &types.Block{} - if err := block.DecodeRLP(stream); err != nil { + body := executionPayload.Body() + txs, err := types.DecodeTransactions(body.Transactions) + if err != nil { + log.Warn("bad blocks segment received", "err", err) + return err + } + header, err := executionPayload.RlpHeader(&parentRoot) + if err != nil { + log.Warn("bad blocks segment received", "err", err) return err } - blocksBatch = append(blocksBatch, block) + blocksBatch = append(blocksBatch, types.NewBlockFromStorage(executionPayload.BlockHash, header, txs, nil, body.Withdrawals)) if len(blocksBatch) >= blocksBatchLimit { if err := cfg.executionClient.InsertBlocks(blocksBatch, true); err != nil { logger.Warn("failed to insert blocks", "err", err) } - blocksBatch = blocksBatch[:0] + blocksBatch = []*types.Block{} } return next(k, nil, nil) }, etl.TransformArgs{}); err != nil { From ef8e512c691d95e1dc612333fe12e93d3f1824b0 Mon Sep 17 00:00:00 2001 From: Giulio Date: Sun, 11 Feb 2024 23:25:07 +0100 Subject: [PATCH 04/10] fixed crash --- cl/phase1/stages/clstages.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/cl/phase1/stages/clstages.go b/cl/phase1/stages/clstages.go index 170e2800e35..99ce7eecdcd 100644 --- a/cl/phase1/stages/clstages.go +++ b/cl/phase1/stages/clstages.go @@ -1,7 +1,6 @@ package stages import ( - "bytes" "context" "errors" "fmt" @@ -389,7 +388,6 @@ func ConsensusClStages(ctx context.Context, return err } defer tx.Rollback() - var b bytes.Buffer blocksBatch := []*types.Block{} blocksBatchLimit := 1000 From 2b5bf641cfbfbaee7fee38edc65ff2af5237d9d5 Mon Sep 17 00:00:00 2001 From: Giulio Date: Sun, 11 Feb 2024 23:26:39 +0100 Subject: [PATCH 05/10] fixed crash --- cl/phase1/stages/clstages.go | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/cl/phase1/stages/clstages.go b/cl/phase1/stages/clstages.go index 99ce7eecdcd..30bdd3bdd38 100644 --- a/cl/phase1/stages/clstages.go +++ b/cl/phase1/stages/clstages.go @@ -283,6 +283,9 @@ func ConsensusClStages(ctx context.Context, return highestSlotProcessed, highestBlockRootProcessed, err } if shouldInsert && block.Version() >= clparams.BellatrixVersion { + if cfg.prebuffer == nil { + cfg.prebuffer = etl.NewCollector("Caplin-blocks", cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), log.Root()) + } executionPayload := block.Block.Body.ExecutionPayload executionPayloadRoot, err := executionPayload.HashSSZ() if err != nil { @@ -294,19 +297,7 @@ func ConsensusClStages(ctx context.Context, return highestSlotProcessed, highestBlockRootProcessed, err } enc = append([]byte{versionByte}, append(block.Block.ParentRoot[:], enc...)...) - // body := executionPayload.Body() - // txs, err := types.DecodeTransactions(body.Transactions) - // if err != nil { - // log.Warn("bad blocks segment received", "err", err) - // return highestSlotProcessed, highestBlockRootProcessed, err - // } - // parentRoot := &block.Block.ParentRoot - // header, err := executionPayload.RlpHeader(parentRoot) - // if err != nil { - // log.Warn("bad blocks segment received", "err", err) - // return highestSlotProcessed, highestBlockRootProcessed, err - // } - // blockBatch = append(blockBatch, types.NewBlockFromStorage(executionPayload.BlockHash, header, txs, nil, body.Withdrawals)) + if err := cfg.prebuffer.Collect(dbutils.BlockBodyKey(executionPayload.BlockNumber, executionPayloadRoot), enc); err != nil { return highestSlotProcessed, highestBlockRootProcessed, err } From 952fedf290b18d0da3a4b69e1237cb5f22f3fd41 Mon Sep 17 00:00:00 2001 From: Giulio Date: Sun, 11 Feb 2024 23:30:47 +0100 Subject: [PATCH 06/10] fixed crash --- cl/phase1/stages/clstages.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cl/phase1/stages/clstages.go b/cl/phase1/stages/clstages.go index 30bdd3bdd38..cff7ff6699c 100644 --- a/cl/phase1/stages/clstages.go +++ b/cl/phase1/stages/clstages.go @@ -381,7 +381,7 @@ func ConsensusClStages(ctx context.Context, defer tx.Rollback() blocksBatch := []*types.Block{} - blocksBatchLimit := 1000 + blocksBatchLimit := 10_000 if cfg.executionClient != nil && cfg.prebuffer != nil && cfg.executionClient.SupportInsertion() { if err := cfg.prebuffer.Load(tx, kv.Headers, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { if len(v) == 0 { @@ -411,6 +411,7 @@ func ConsensusClStages(ctx context.Context, logger.Warn("failed to insert blocks", "err", err) } blocksBatch = []*types.Block{} + logger.Info("[Caplin] Inserted blocks", "progress", blocksBatch[len(blocksBatch)-1].NumberU64()) } return next(k, nil, nil) }, etl.TransformArgs{}); err != nil { From 916d074044363d6936016b5b6c0206ed78845d5b Mon Sep 17 00:00:00 2001 From: Giulio Date: Sun, 11 Feb 2024 23:47:12 +0100 Subject: [PATCH 07/10] save --- cl/phase1/stages/clstages.go | 10 +- cl/phase1/stages/stage_history_download.go | 129 +++++++-------------- 2 files changed, 48 insertions(+), 91 deletions(-) diff --git a/cl/phase1/stages/clstages.go b/cl/phase1/stages/clstages.go index cff7ff6699c..7954283badd 100644 --- a/cl/phase1/stages/clstages.go +++ b/cl/phase1/stages/clstages.go @@ -243,8 +243,10 @@ func ConsensusClStages(ctx context.Context, // This stage is special so use context.Background() TODO(Giulio2002): make the context be passed in startingSlot := cfg.state.LatestBlockHeader().Slot downloader := network2.NewBackwardBeaconDownloader(context.Background(), cfg.rpc, cfg.indiciesDB) - - if err := SpawnStageHistoryDownload(StageHistoryReconstruction(downloader, cfg.antiquary, cfg.sn, cfg.beaconDB, cfg.indiciesDB, cfg.executionClient, cfg.genesisCfg, cfg.beaconCfg, cfg.backfilling, false, startingRoot, startingSlot, cfg.tmpdir, 600*time.Millisecond, logger), context.Background(), logger); err != nil { + if cfg.prebuffer == nil { + cfg.prebuffer = etl.NewCollector("Caplin-blocks", cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), log.Root()) + } + if err := SpawnStageHistoryDownload(StageHistoryReconstruction(downloader, cfg.antiquary, cfg.sn, cfg.beaconDB, cfg.indiciesDB, cfg.executionClient, cfg.genesisCfg, cfg.beaconCfg, cfg.backfilling, false, startingRoot, startingSlot, cfg.tmpdir, 600*time.Millisecond, cfg.prebuffer, logger), context.Background(), logger); err != nil { cfg.hasDownloaded = false return err } @@ -387,6 +389,10 @@ func ConsensusClStages(ctx context.Context, if len(v) == 0 { return nil } + v, err = utils.DecompressSnappy(v) + if err != nil { + return err + } version := clparams.StateVersion(v[0]) parentRoot := common.BytesToHash(v[1:33]) v = v[33:] diff --git a/cl/phase1/stages/stage_history_download.go b/cl/phase1/stages/stage_history_download.go index be2442de6e6..ac8511e4f68 100644 --- a/cl/phase1/stages/stage_history_download.go +++ b/cl/phase1/stages/stage_history_download.go @@ -17,7 +17,6 @@ import ( "github.com/ledgerwatch/erigon/cl/phase1/execution_client" "github.com/ledgerwatch/erigon/cl/phase1/network" "github.com/ledgerwatch/erigon/cl/utils" - "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks" "github.com/ledgerwatch/erigon/cl/clparams" @@ -26,42 +25,44 @@ import ( ) type StageHistoryReconstructionCfg struct { - genesisCfg *clparams.GenesisConfig - beaconCfg *clparams.BeaconChainConfig - downloader *network.BackwardBeaconDownloader - sn *freezeblocks.CaplinSnapshots - startingRoot libcommon.Hash - backfilling bool - waitForAllRoutines bool - startingSlot uint64 - tmpdir string - db persistence.BeaconChainDatabase - indiciesDB kv.RwDB - engine execution_client.ExecutionEngine - antiquary *antiquary.Antiquary - logger log.Logger - backfillingThrottling time.Duration + genesisCfg *clparams.GenesisConfig + beaconCfg *clparams.BeaconChainConfig + downloader *network.BackwardBeaconDownloader + sn *freezeblocks.CaplinSnapshots + startingRoot libcommon.Hash + backfilling bool + waitForAllRoutines bool + startingSlot uint64 + tmpdir string + db persistence.BeaconChainDatabase + indiciesDB kv.RwDB + engine execution_client.ExecutionEngine + antiquary *antiquary.Antiquary + logger log.Logger + backfillingThrottling time.Duration + executionBlocksCollector *etl.Collector } const logIntervalTime = 30 * time.Second -func StageHistoryReconstruction(downloader *network.BackwardBeaconDownloader, antiquary *antiquary.Antiquary, sn *freezeblocks.CaplinSnapshots, db persistence.BeaconChainDatabase, indiciesDB kv.RwDB, engine execution_client.ExecutionEngine, genesisCfg *clparams.GenesisConfig, beaconCfg *clparams.BeaconChainConfig, backfilling, waitForAllRoutines bool, startingRoot libcommon.Hash, startinSlot uint64, tmpdir string, backfillingThrottling time.Duration, logger log.Logger) StageHistoryReconstructionCfg { +func StageHistoryReconstruction(downloader *network.BackwardBeaconDownloader, antiquary *antiquary.Antiquary, sn *freezeblocks.CaplinSnapshots, db persistence.BeaconChainDatabase, indiciesDB kv.RwDB, engine execution_client.ExecutionEngine, genesisCfg *clparams.GenesisConfig, beaconCfg *clparams.BeaconChainConfig, backfilling, waitForAllRoutines bool, startingRoot libcommon.Hash, startinSlot uint64, tmpdir string, backfillingThrottling time.Duration, executionBlocksCollector *etl.Collector, logger log.Logger) StageHistoryReconstructionCfg { return StageHistoryReconstructionCfg{ - genesisCfg: genesisCfg, - beaconCfg: beaconCfg, - downloader: downloader, - startingRoot: startingRoot, - tmpdir: tmpdir, - startingSlot: startinSlot, - waitForAllRoutines: waitForAllRoutines, - logger: logger, - backfilling: backfilling, - indiciesDB: indiciesDB, - antiquary: antiquary, - db: db, - engine: engine, - sn: sn, - backfillingThrottling: backfillingThrottling, + genesisCfg: genesisCfg, + beaconCfg: beaconCfg, + downloader: downloader, + startingRoot: startingRoot, + tmpdir: tmpdir, + startingSlot: startinSlot, + waitForAllRoutines: waitForAllRoutines, + logger: logger, + backfilling: backfilling, + indiciesDB: indiciesDB, + antiquary: antiquary, + db: db, + engine: engine, + sn: sn, + backfillingThrottling: backfillingThrottling, + executionBlocksCollector: executionBlocksCollector, } } @@ -74,9 +75,7 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co if !clparams.SupportBackfilling(cfg.beaconCfg.DepositNetworkID) { cfg.backfilling = false // disable backfilling if not on a supported network } - executionBlocksCollector := etl.NewCollector("HistoryDownload", cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), logger) - defer executionBlocksCollector.Close() - executionBlocksCollector.LogLvl(log.LvlDebug) + // Start the procedure logger.Info("Starting downloading History", "from", currentSlot) // Setup slot and block root @@ -113,13 +112,17 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co } if !foundLatestEth1ValidBlock.Load() && blk.Version() >= clparams.BellatrixVersion { payload := blk.Block.Body.ExecutionPayload + payloadRoot, err := payload.HashSSZ() + if err != nil { + return false, fmt.Errorf("error hashing execution payload during download: %s", err) + } encodedPayload, err := payload.EncodeSSZ(nil) if err != nil { return false, fmt.Errorf("error encoding execution payload during download: %s", err) } // Use snappy compression that the temporary files do not take too much disk. - encodedPayload = utils.CompressSnappy(append(encodedPayload, append(blk.Block.ParentRoot[:], byte(blk.Version()))...)) - if err := executionBlocksCollector.Collect(dbutils.BlockBodyKey(payload.BlockNumber, payload.BlockHash), encodedPayload); err != nil { + encodedPayload = utils.CompressSnappy(append([]byte{byte(blk.Version())}, append(blk.Block.ParentRoot[:], encodedPayload...)...)) + if err := cfg.executionBlocksCollector.Collect(dbutils.BlockBodyKey(payload.BlockNumber, payloadRoot), encodedPayload); err != nil { return false, fmt.Errorf("error collecting execution payload during download: %s", err) } if currEth1Progress.Load()%100 == 0 { @@ -224,59 +227,7 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co } defer tx2.Rollback() - blockBatch := []*types.Block{} - blockBatchMaxSize := 1000 - cfg.logger.Info("Ready to insert history, waiting for sync cycle to finish") - if err := executionBlocksCollector.Load(tx2, kv.Headers, func(k, vComp []byte, _ etl.CurrentTableReader, next etl.LoadNextFunc) error { - if cfg.engine == nil || !cfg.engine.SupportInsertion() { - return next(k, nil, nil) - } - var err error - var v []byte - if v, err = utils.DecompressSnappy(vComp); err != nil { - return fmt.Errorf("error decompressing dump during collection: %s", err) - } - - version := clparams.StateVersion(v[len(v)-1]) - parentRoot := libcommon.BytesToHash(v[len(v)-1-32 : len(v)-1]) - - executionPayload := cltypes.NewEth1Block(version, cfg.beaconCfg) - if err := executionPayload.DecodeSSZ(v[:len(v)-1-32], int(version)); err != nil { - return fmt.Errorf("error decoding execution payload during collection: %s", err) - } - if executionPayload.BlockNumber%10000 == 0 { - cfg.logger.Info("Inserting execution payload", "blockNumber", executionPayload.BlockNumber) - } - body := executionPayload.Body() - - header, err := executionPayload.RlpHeader(&parentRoot) - if err != nil { - return fmt.Errorf("error parsing rlp header during collection: %s", err) - } - - txs, err := types.DecodeTransactions(body.Transactions) - if err != nil { - return err - } - - block := types.NewBlockFromStorage(executionPayload.BlockHash, header, txs, nil, body.Withdrawals) - blockBatch = append(blockBatch, block) - if len(blockBatch) >= blockBatchMaxSize { - if err := cfg.engine.InsertBlocks(blockBatch, true); err != nil { - return fmt.Errorf("error inserting block during collection: %s", err) - } - blockBatch = blockBatch[:0] - } - return next(k, nil, nil) - }, etl.TransformArgs{Quit: ctx.Done()}); err != nil { - return err - } - if cfg.engine != nil && cfg.engine.SupportInsertion() { - if err := cfg.engine.InsertBlocks(blockBatch, true); err != nil { - return fmt.Errorf("error doing last block insertion during collection: %s", err) - } - } return nil } From 76d1a8fa725b4c195d2f20a86117a25382649367 Mon Sep 17 00:00:00 2001 From: Giulio Date: Sun, 11 Feb 2024 23:53:01 +0100 Subject: [PATCH 08/10] save --- cl/phase1/stages/clstages.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cl/phase1/stages/clstages.go b/cl/phase1/stages/clstages.go index 7954283badd..4eb3a416a89 100644 --- a/cl/phase1/stages/clstages.go +++ b/cl/phase1/stages/clstages.go @@ -245,6 +245,7 @@ func ConsensusClStages(ctx context.Context, downloader := network2.NewBackwardBeaconDownloader(context.Background(), cfg.rpc, cfg.indiciesDB) if cfg.prebuffer == nil { cfg.prebuffer = etl.NewCollector("Caplin-blocks", cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), log.Root()) + cfg.prebuffer.LogLvl(log.LvlDebug) } if err := SpawnStageHistoryDownload(StageHistoryReconstruction(downloader, cfg.antiquary, cfg.sn, cfg.beaconDB, cfg.indiciesDB, cfg.executionClient, cfg.genesisCfg, cfg.beaconCfg, cfg.backfilling, false, startingRoot, startingSlot, cfg.tmpdir, 600*time.Millisecond, cfg.prebuffer, logger), context.Background(), logger); err != nil { cfg.hasDownloaded = false @@ -287,6 +288,7 @@ func ConsensusClStages(ctx context.Context, if shouldInsert && block.Version() >= clparams.BellatrixVersion { if cfg.prebuffer == nil { cfg.prebuffer = etl.NewCollector("Caplin-blocks", cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), log.Root()) + cfg.prebuffer.LogLvl(log.LvlDebug) } executionPayload := block.Block.Body.ExecutionPayload executionPayloadRoot, err := executionPayload.HashSSZ() @@ -430,6 +432,7 @@ func ConsensusClStages(ctx context.Context, } cfg.prebuffer.Close() cfg.prebuffer = nil + } logger.Debug("waiting for blocks...", From c6d2a53f56cd04a72fc9bbce77d57b948975ef7d Mon Sep 17 00:00:00 2001 From: Giulio Date: Mon, 12 Feb 2024 14:18:18 +0100 Subject: [PATCH 09/10] save --- cl/phase1/stages/clstages.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cl/phase1/stages/clstages.go b/cl/phase1/stages/clstages.go index 4eb3a416a89..4be1485eadf 100644 --- a/cl/phase1/stages/clstages.go +++ b/cl/phase1/stages/clstages.go @@ -418,8 +418,8 @@ func ConsensusClStages(ctx context.Context, if err := cfg.executionClient.InsertBlocks(blocksBatch, true); err != nil { logger.Warn("failed to insert blocks", "err", err) } - blocksBatch = []*types.Block{} logger.Info("[Caplin] Inserted blocks", "progress", blocksBatch[len(blocksBatch)-1].NumberU64()) + blocksBatch = []*types.Block{} } return next(k, nil, nil) }, etl.TransformArgs{}); err != nil { From 0fed9b12cf652ce18a3d02c917a911902fad4bc7 Mon Sep 17 00:00:00 2001 From: Giulio Date: Tue, 13 Feb 2024 16:23:45 +0100 Subject: [PATCH 10/10] save --- cl/phase1/stages/clstages.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cl/phase1/stages/clstages.go b/cl/phase1/stages/clstages.go index 4be1485eadf..f396dbc5944 100644 --- a/cl/phase1/stages/clstages.go +++ b/cl/phase1/stages/clstages.go @@ -301,6 +301,7 @@ func ConsensusClStages(ctx context.Context, return highestSlotProcessed, highestBlockRootProcessed, err } enc = append([]byte{versionByte}, append(block.Block.ParentRoot[:], enc...)...) + enc = utils.CompressSnappy(enc) if err := cfg.prebuffer.Collect(dbutils.BlockBodyKey(executionPayload.BlockNumber, executionPayloadRoot), enc); err != nil { return highestSlotProcessed, highestBlockRootProcessed, err