diff --git a/docs/ai/acceptance-tests.md b/docs/ai/acceptance-tests.md
index 0b60f74e9f5..bd977692cd8 100644
--- a/docs/ai/acceptance-tests.md
+++ b/docs/ai/acceptance-tests.md
@@ -46,15 +46,25 @@ RUST_JIT_BUILD=1 cd op-acceptance-tests && mise exec -- just acceptance-test bas
This runs only packages listed in `gates/base.txt`.
-### Kona Reproducible Prestate
+### Kona Prestate
-Some tests (e.g. superfaultproofs, interop fault proofs) require a reproducible kona prestate. This is **not** handled by `build-deps` or `RUST_JIT_BUILD`:
+Some tests (e.g. superfaultproofs, interop fault proofs) require a kona prestate. This is **not** handled by `build-deps` or `RUST_JIT_BUILD`. There are two ways to build it:
+
+**Reproducible build** (preferred when Docker is available):
```bash
mise exec -- just reproducible-prestate-kona
```
-**Requires Docker.** If Docker is not available in your environment, ask the user to run this command for you.
+This produces a prestate whose hash matches CI/release builds. It works on any host with Docker installed.
+
+**Native build** (fallback when Docker is not available):
+
+```bash
+cd rust && mise exec -- just build-kona-prestates
+```
+
+This works only on **Linux** with the **MIPS cross-compile toolchain** installed. The produced hash will not match release builds, so it is suitable only for local test runs where the hash does not need to match a deployed release. If neither Docker nor the MIPS toolchain is available, ask the user to build the prestate for you.
## What `build-deps` Does
diff --git a/docs/public-docs/docs.json b/docs/public-docs/docs.json
index ba8665507f8..4bf6722f69f 100644
--- a/docs/public-docs/docs.json
+++ b/docs/public-docs/docs.json
@@ -2324,6 +2324,7 @@
"pages": [
"/op-stack/interop/interop",
"/op-stack/interop/explainer",
+ "/op-stack/interop/supernode",
"/op-stack/interop/reorg",
"/op-stack/interop/superchain-eth-bridge"
]
diff --git a/docs/public-docs/op-stack/interop/explainer.mdx b/docs/public-docs/op-stack/interop/explainer.mdx
index 0aa4218a768..21c370976dd 100644
--- a/docs/public-docs/op-stack/interop/explainer.mdx
+++ b/docs/public-docs/op-stack/interop/explainer.mdx
@@ -183,4 +183,5 @@ The Superchain interop cluster is being rolled out iteratively. To see which cha
* Learn [how messages get from one chain to another chain](/app-developers/guides/interoperability/message-passing).
* Learn how [interop handles reorgs and avoids double-spends](/op-stack/interop/reorg).
+* Read about [op-supernode](/op-stack/interop/supernode), the component that derives every chain in the dependency set together and enforces cross-chain safety.
* Read the [cross-chain security measures](/op-stack/security/interop-security) for safe interoperability.
diff --git a/docs/public-docs/op-stack/interop/reorg.mdx b/docs/public-docs/op-stack/interop/reorg.mdx
index ddb21f30c83..ad9cfa6a096 100644
--- a/docs/public-docs/op-stack/interop/reorg.mdx
+++ b/docs/public-docs/op-stack/interop/reorg.mdx
@@ -85,7 +85,7 @@ L1 reorgs typically happen at the unsafe head — only the most recent L1 blocks
When an L1 reorg does affect L2, one of two things happens:
* **The replacement L1 block carries the same batch data as the original.** Derivation is deterministic, so the L2 chain it produces is identical, and the reorg is a no-op from the L2 perspective.
-* **The replacement L1 block does not carry that batch data.** The sequencer notices and reposts the affected batch in a later L1 block. As long as the batch lands again before the [sequencer window](https://specs.optimism.io/glossary.html#sequencing-window) elapses (3600 L1 blocks ≈ 12 hours on standard chains like OP Mainnet and Base), derivation reproduces the same L2 chain. If the window does elapse without the batch reappearing, the affected L2 blocks are replaced with deposit-only blocks (see [Invalid block](#invalid-block) below).
+* **The replacement L1 block does not carry that batch data.** The sequencer notices and reposts the affected batch in a later L1 block. As long as the batch lands again before the [sequencer window](https://specs.optimism.io/glossary.html#sequencing-window) elapses (3600 L1 blocks ≈ 12 hours on standard chains like OP Mainnet and Unichain), derivation reproduces the same L2 chain. If the window does elapse without the batch reappearing, the affected L2 blocks are replaced with deposit-only blocks (see [Invalid block](#invalid-block) below).
The takeaway is that L1 reorgs do not by themselves break interop guarantees: either the data comes back and L2 stays identical, or the chain falls back to deposit-only blocks for that span — the same behavior as if the sequencer had simply gone offline.
@@ -160,5 +160,6 @@ At worst, some unsafe blocks need to be recalculated (if one fork is chosen over
## Next steps
* Read the [interop explainer](./explainer) for the rest of the architecture.
+* Read about [op-supernode](./supernode), the component that derives every chain in the dependency set together and enforces the safety levels described above.
* Read the [cross-chain security measures](/op-stack/security/interop-security) for safe interoperability.
-* View more [interop tutorials](/app-developers/tutorials/interoperability/).
+* View more [interop guides and tutorials](/app-developers/guides/interoperability/get-started).
diff --git a/docs/public-docs/op-stack/interop/supernode.mdx b/docs/public-docs/op-stack/interop/supernode.mdx
new file mode 100644
index 00000000000..4e2e57ee5ab
--- /dev/null
+++ b/docs/public-docs/op-stack/interop/supernode.mdx
@@ -0,0 +1,160 @@
+---
+title: OP Supernode
+description: Learn how op-supernode runs every chain in an interop dependency set inside one process.
+audit-source:
+ - op-supernode/cmd/main.go
+ - op-supernode/supernode/supernode.go
+ - op-supernode/supernode/chain_container/chain_container.go
+ - op-supernode/supernode/activity/interop/interop.go
+ - op-supernode/README.md
+ - op-supernode/safety-labels.md
+---
+
+
+ OP Stack interop is in active development.
+ op-supernode is the runtime that operators of interop chains will run, and the architecture and interfaces described here may continue to evolve as the rollout progresses.
+
+
+# OP Supernode
+
+*op-supernode* is a component that runs every chain in an interop dependency set together as virtual nodes inside one binary.
+Where a pre-interop deployment runs one op-node per chain, op-supernode hosts every chain side by side, shares the L1 and beacon-chain plumbing across them, and adds the cross-chain message verification work that interop needs.
+
+## Why op-supernode exists
+
+OP Stack interop changes what a single node has to do.
+Before interop, a node only had to derive its own chain.
+With interop, a node has to derive *every chain in its dependency set*, because any of those chains can emit an initiating message that the local chain depends on.
+The local chain cannot advance past a block whose dependencies it cannot prove, so the derivation of every other chain in the set is on the critical path.
+
+Without consolidation, every operator runs a full op-node and execution client for every chain in the dependency set.
+That duplicates the L1 client, the beacon-chain client, the derivation pipeline, the storage layout, and the operational glue around each one.
+For a fully connected dependency set — the configuration the [Superchain interop cluster](/op-stack/interop/explainer#superchain-interop-cluster) targets — the duplication scales with the size of the cluster.
+
+op-supernode collapses that duplication.
+It runs each chain as an in-memory *virtual node* inside one process, with a single L1 client and a single beacon client serving all of them, and adds the cross-chain message verification work above the per-chain layer.
+
+## How op-supernode works
+
+The supernode is composed of *chain containers* and *activities*.
+Each chain container hosts one virtual node for one chain.
+Activities are modular components that operate above the chain layer, with access to every chain.
+
+![op-supernode architecture: chain containers and activities inside one process](/img/op-stack/interop/supernode-architecture.png)
+
+### Chain containers
+
+A *chain container* is the supernode's wrapper around one chain.
+Inside a chain container is a *virtual node*: a consensus-layer (CL) implementation hosted in-process rather than as a separate operating-system process.
+Today the only virtual node implementation is op-node itself, hosted as a library; the chain container manages its lifecycle (start, stop, pause, resume) and exposes a stable interface to the rest of the supernode.
+
+Chain containers also drive the execution engine for the chain through an engine controller.
+They expose the operations the supernode needs to verify cross-chain messages — deriving the local-safe block at a timestamp, fetching receipts, answering output-root queries — without reaching into the internals of any one chain.
+
+### Shared resources
+
+Running every chain inside one process makes shared resources possible.
+
+* A single **L1 RPC client** and a single **L1 beacon client** serve every chain. Cache hits on L1 blocks and blob lookups carry across chains.
+* The **JSON-RPC surface** is namespaced per chain. `11155420/` reaches OP Sepolia's RPC; `1301/` reaches Unichain Sepolia's. Tools that expect an op-node-shaped endpoint reach a chain by addressing it through that prefix.
+* **Metrics** are namespaced per chain via the same scheme.
+* **Data directories** are namespaced so SafeDB and P2P state for one chain cannot collide with another's.
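+
+The per-chain JSON-RPC namespacing means the same op-node-shaped request works against any virtual node; only the URL prefix changes. As a sketch (host and port are deployment-specific and shown only for illustration), the request body below returns OP Sepolia's view when posted to `supernode-rpc/11155420/` and Unichain Sepolia's when posted to `supernode-rpc/1301/`:
+
+```json
+{
+  "jsonrpc": "2.0",
+  "id": 1,
+  "method": "optimism_syncStatus",
+  "params": []
+}
+```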
+
+Some flags are intentionally owned at the supernode level rather than per chain.
+`--l1` and `--l1.beacon` configure the shared L1 plumbing, and any per-chain override of them is silently replaced with the top-level value.
+
+### Activities
+
+An *activity* is a modular component that operates above the chain layer rather than inside any one chain.
+Each activity can register an RPC namespace on the supernode root, expose Prometheus metrics, and run a goroutine for as long as the supernode is up.
+
+The supernode ships with a small set of activities:
+
+* **Heartbeat** — emits a liveness signal and exposes `heartbeat_check` over JSON-RPC.
+* **SuperRoot** — produces a *super root*: a commitment over verified L2 blocks across the dependency set at a given timestamp. Exposed as `superroot_atTimestamp`. The fault proof system needs this commitment to produce an interop-aware proof.
+* **Supernode** — exposes `supernode_syncStatus`, an aggregate per-chain sync status across the dependency set.
+* **Interop** — does the actual cross-chain message verification (see the next section).
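+
+As a sketch of the SuperRoot surface (the exact parameter and result encodings are not spelled out here and may differ), a `superroot_atTimestamp` query follows the usual JSON-RPC shape, with the target timestamp as its parameter:
+
+```json
+{
+  "jsonrpc": "2.0",
+  "id": 1,
+  "method": "superroot_atTimestamp",
+  "params": ["0x68a4f2c0"]
+}
+```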
+
+## Cross-chain message safety
+
+The interop activity is the part of op-supernode that decides when a chain's blocks have satisfied their cross-chain dependencies and can be promoted past unsafe.
+It runs above the chain containers and reaches into them through a narrow interface.
+
+For every block produced on every chain, the interop activity answers one question: has every initiating message that this block's executing messages reference been reproduced from L1, at the safety level the destination block is trying to reach?
+The activity decides per round between *wait*, *advance*, *invalidate*, and *rewind*.
+The decision is recorded in a write-ahead log so the supernode can pick up after a restart in the same state it was in before.
+
+When the answer is *advance*, the supernode signals the chain's CL to promote the block.
+The signal flows through an *authority* interface that the chain container holds and the virtual node defers to: the supernode advances safety on a chain only via the chain's own CL.
+The CL stays the single source of truth for safety on its chain, and the execution layer (EL) never has to learn about interop.
+
+When the answer is *invalidate* or *rewind*, the supernode tells the chain to back out the affected blocks.
+A block on chain B that referenced a chain-A log can be invalidated because the chain-A log never made it to L1, or because the L1 record contradicts what was gossiped over P2P.
+See [Interop reorg awareness](/op-stack/interop/reorg) for how that plays out at the chain level.
+
+The user-facing safety levels do not change.
+Block safety levels (*unsafe*, *safe*, *finalized*) are still defined as they are without interop, and the EL's view of those labels matches.
+The supernode's role is to make sure the labels mean what they say once cross-chain dependencies are part of the picture.
+
+## op-supernode and Light CL
+
+op-supernode pairs with the *Light CL* mode of op-node and kona-node.
+A Light CL turns off local derivation and mirrors safe and finalized state from a trusted external source over the `optimism_syncStatus` RPC.
+It still advances the unsafe chain over P2P; only the safe and finalized views are delegated.
+
+Together, supernode and Light CL form a topology:
+
+* One trusted op-supernode (or a small high-availability pool of them) runs derivation for every chain in the dependency set, plus the interop activity that promotes blocks to safe.
+* The rest of the chain operator's fleet runs op-node or kona-node in Light CL mode, points at the supernode's `optimism_syncStatus`, and inherits its safe and finalized view.
+* Node operators run the same setup, minus the sequencer Light CLs (only the chain operator produces blocks).
+
+This split is what makes interop tractable for operators who already run dozens or hundreds of nodes per chain.
+The expensive multi-chain derivation work happens once, on the supernode; the rest of the fleet stays cheap.
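+
+In flag terms (a sketch only: the supernode URL is illustrative, and the usual per-chain flags are omitted), a Light CL replica points its safe-head source at the supernode's per-chain endpoint via `--l2.follow.source`, the flag covered in the specialized node topology notice:
+
+```bash
+op-node \
+  --l2.follow.source=http://supernode.internal:8545/11155420/
+  # ...plus the usual engine, rollup, and P2P flags for this chain
+```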
+
+```mermaid
+graph LR
+ classDef supernode fill:#FFE
+ classDef transparent fill:none, stroke:none
+
+ SnA[Supernode-A]
+ SnB[Supernode-B]
+ SnC[Supernode-C]
+
+ Px0["Proxy CL Chain-0"]
+ Px1["Proxy CL Chain-1"]
+
+ subgraph c0["Chain-0 Light CL fleet"]
+ direction TB
+ C0Seq["Sequencer 0..N"]
+ C0Rep["Replica 0..N"]
+ end
+
+ subgraph c1["Chain-1 Light CL fleet"]
+ direction TB
+ C1Seq["Sequencer 0..N"]
+ C1Rep["Replica 0..N"]
+ end
+
+ SnA --> Px0
+ SnA --> Px1
+ SnB --> Px0
+ SnB --> Px1
+ SnC --> Px0
+ SnC --> Px1
+
+ Px0 --> C0Seq
+ Px0 --> C0Rep
+ Px1 --> C1Seq
+ Px1 --> C1Rep
+
+ class SnA,SnB,SnC supernode
+ class c0,c1 transparent
+```
+
+## Where to go next
+
+* Read the [interop explainer](/op-stack/interop/explainer) for how cross-chain messaging works at the protocol level.
+* Read [interop reorg awareness](/op-stack/interop/reorg) for how the safety model handles equivocation and L1 reorgs.
+* Read the [cross-chain security measures](/op-stack/security/interop-security) for how an operator can configure the safety level it requires for inbound messages.
+* Read the [specialized op-node topology notice](/notices/specialized-node-topology) for the operator-facing pattern of running light op-nodes with `--l2.follow.source`, the fleet side of the supernode-plus-light-CL topology.
+* For implementation detail, see the [op-supernode source](https://github.com/ethereum-optimism/optimism/tree/develop/op-supernode) in the monorepo.
diff --git a/docs/public-docs/op-stack/reference/glossary.mdx b/docs/public-docs/op-stack/reference/glossary.mdx
index a31703f678f..67bf2a1c0d4 100644
--- a/docs/public-docs/op-stack/reference/glossary.mdx
+++ b/docs/public-docs/op-stack/reference/glossary.mdx
@@ -34,9 +34,9 @@ reality the block time is variable as some time slots might be skipped. Pre-merg
### Delegation
-Refers to the process of assigning the voting power of your tokens to a designated community member, known as a delegate.
-Delegates are individuals who have volunteered to actively participate in the governance of the Optimism Token House.
-By delegating your voting power, you enable these delegates to vote on governance matters on your behalf, while you retain full ownership of your tokens and the freedom to use them as you wish.
+Refers to the process of assigning the voting power of your tokens to a designated community member, known as a delegate.
+Delegates are individuals who have volunteered to actively participate in the governance of the Optimism Token House.
+By delegating your voting power, you enable these delegates to vote on governance matters on your behalf, while you retain full ownership of your tokens and the freedom to use them as you wish.
You can also change your chosen delegate at any time, allowing for flexibility in how your voting power is represented in the governance process.
### EOA or externally owned account
@@ -53,8 +53,8 @@ at the request of the L1 consensus layer. On L2, the executed blocks are freshly
### Optimism collective
-The Optimism Collective is a band of people, projects, and companies working together to build a better economy for everyone,
-united by a mutually beneficial pact to adhere to the axiom of impact=profit — the principle that positive impact to the collective should be rewarded with profit to the individual.
+The Optimism Collective is a band of people, projects, and companies working together to build a better economy for everyone,
+united by a mutually beneficial pact to adhere to the axiom of impact=profit — the principle that positive impact to the collective should be rewarded with profit to the individual.
New model of digital democratic governance optimized to drive rapid and sustained growth of a decentralized ecosystem.
@@ -269,7 +269,7 @@ It also submits [output roots](#l2-output-root) to L1.
Range of L1 blocks from which a [sequencing epoch](#sequencing-epoch) can be derived.
A sequencing window whose first L1 block has number `N` contains [batcher transactions](#batcher-transaction) for epoch
-`N`. The window contains blocks `(N, N + SWS)` where `SWS` is the sequencer window size.
+`N`. The window contains blocks `(N, N + SWS)` where `SWS` is the sequencer window size.
The current default `SWS` is 3600 epochs.
Additionally, the first block in the window defines the [depositing transactions](#depositing-transaction) which determine the
[deposits](#deposit) to be included in the first L2 block of the epoch.
@@ -284,6 +284,31 @@ sequencing window. Epochs can have variable size, subject to some constraints.
The network of OP Stack chains connected by native interoperability. Not yet live. Chains that are part of the OP Stack ecosystem share security and a common development stack (the OP Stack). The interop cluster specifically refers to the subset of chains connected by the OP Stack interoperability layer.
+### Dependency set
+
+The set of chains that a given chain accepts initiating messages from. A chain's local block cannot become safe until every initiating message it depends on has also been derived from L1. The *transitive* dependency set extends this to the dependencies of those chains, and so on. The [OP Stack interop cluster](#op-stack-interop-cluster) is configured as a fully connected dependency set: every chain in the set has every other chain in its dependency set.
+
+### Supernode
+
+A component that runs the consensus layer of every chain in a [dependency set](#dependency-set) together as in-memory [virtual nodes](#virtual-node) inside one binary. The supernode shares the L1 client, L1 beacon client, JSON-RPC surface, and metrics across chains, and verifies that every cross-chain message a chain depends on has been reproduced from L1 before promoting blocks to "safe". It connects to an execution client for each chain using the engine API. Often referred to as `op-supernode` after the binary name. See the [supernode explainer](/op-stack/interop/supernode) for the architecture.
+
+### Chain container
+
+The [supernode](#supernode)'s wrapper around a single chain. A chain container hosts one [virtual node](#virtual-node), drives that chain's execution engine via an engine controller, and exposes a stable interface the rest of the supernode uses to derive blocks, fetch receipts, and answer output-root queries.
+
+### Virtual node
+
+A consensus-layer node hosted in-process inside a [supernode](#supernode), rather than as a separate operating-system process. Today the only virtual node implementation is op-node itself, hosted as a library.
+
+### Super root
+
+A commitment over verified L2 blocks across a [dependency set](#dependency-set) at a given timestamp. Produced by the supernode's `superroot_atTimestamp` RPC and consumed by the fault proof system as the input it needs to generate an interop-aware proof.
+
+### Light CL
+
+A mode of operation for the consensus layer (op-node or kona-node) that turns off local L1-to-L2 derivation and mirrors safe and finalized state from a trusted external source over the `optimism_syncStatus` RPC. A light CL still advances the unsafe chain over P2P. Pairs with a [supernode](#supernode) acting as the safe source for a fleet of light CLs.
+Learn more at the [specialized op-node topology notice](/notices/specialized-node-topology).
+
### Shared L1 Bridge
The L1 bridge contracts which govern all OP Chains in the OP Stack ecosystem. This bridge can be upgraded by the Optimism Collective.
diff --git a/docs/public-docs/public/img/op-stack/interop/supernode-architecture.png b/docs/public-docs/public/img/op-stack/interop/supernode-architecture.png
new file mode 100644
index 00000000000..8e3acd2250c
Binary files /dev/null and b/docs/public-docs/public/img/op-stack/interop/supernode-architecture.png differ
diff --git a/op-devstack/sysgo/l2_el_opreth.go b/op-devstack/sysgo/l2_el_opreth.go
index b73e61b0e5a..06663a20ec1 100644
--- a/op-devstack/sysgo/l2_el_opreth.go
+++ b/op-devstack/sysgo/l2_el_opreth.go
@@ -126,7 +126,7 @@ func (n *OpReth) Start() {
})
n.userRPC = "ws://" + n.userProxy.Addr()
}
- logOut := logpipe.ToLoggerWithMinLevel(n.p.Logger().New("component", "op-reth", "src", "stdout", "name", n.name, "chain", n.chainID), log.LevelWarn)
+ logOut := logpipe.ToLoggerWithMinLevel(n.p.Logger().New("component", "op-reth", "src", "stdout", "name", n.name, "chain", n.chainID), log.LevelInfo)
logErr := logpipe.ToLoggerWithMinLevel(n.p.Logger().New("component", "op-reth", "src", "stderr", "name", n.name, "chain", n.chainID), log.LevelWarn)
authRPCChan := make(chan string, 1)
diff --git a/op-devstack/sysgo/mixed_runtime.go b/op-devstack/sysgo/mixed_runtime.go
index 3b312b97d1c..b1ceea5bdeb 100644
--- a/op-devstack/sysgo/mixed_runtime.go
+++ b/op-devstack/sysgo/mixed_runtime.go
@@ -371,7 +371,6 @@ func buildMixedOpRethNode(
args,
"--proofs-history",
"--proofs-history.window=10000",
- "--proofs-history.prune-interval=1m",
"--proofs-history.storage-path="+proofHistoryDir,
"--proofs-history.storage-version="+storageVersion,
)
diff --git a/rust/Cargo.lock b/rust/Cargo.lock
index 6e33ac0a414..8b01272a845 100644
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -11191,7 +11191,6 @@ dependencies = [
"eyre",
"futures",
"futures-util",
- "humantime",
"op-alloy-consensus",
"op-alloy-network",
"op-alloy-rpc-types-engine",
@@ -11420,6 +11419,7 @@ dependencies = [
"reth-execution-errors",
"reth-metrics",
"reth-node-api",
+ "reth-optimism-trie",
"reth-primitives-traits",
"reth-provider",
"reth-revm",
diff --git a/rust/op-reth/crates/exex/Cargo.toml b/rust/op-reth/crates/exex/Cargo.toml
index 9ba9500eda0..e112dd325eb 100644
--- a/rust/op-reth/crates/exex/Cargo.toml
+++ b/rust/op-reth/crates/exex/Cargo.toml
@@ -31,12 +31,13 @@ alloy-eips.workspace = true
eyre.workspace = true
futures-util.workspace = true
tracing.workspace = true
-tokio.workspace = true
[dev-dependencies]
futures.workspace = true
reth-db = { workspace = true, features = ["test-utils"] }
+reth-optimism-trie = { workspace = true, features = ["test-utils"] }
reth-node-builder.workspace = true
+tokio.workspace = true
reth-optimism-node.workspace = true
reth-optimism-chainspec.workspace = true
reth-primitives-traits.workspace = true
@@ -46,13 +47,14 @@ tempfile.workspace = true
[features]
test-utils = [
- "reth-db/test-utils",
- "reth-trie/test-utils",
- "reth-node-builder/test-utils",
- "reth-optimism-node/test-utils",
- "reth-provider/test-utils",
- "reth-ethereum-primitives/test-utils",
- "reth-primitives-traits/test-utils",
+ "reth-db/test-utils",
+ "reth-trie/test-utils",
+ "reth-node-builder/test-utils",
+ "reth-optimism-node/test-utils",
+ "reth-provider/test-utils",
+ "reth-ethereum-primitives/test-utils",
+ "reth-primitives-traits/test-utils",
+ "reth-optimism-trie/test-utils"
]
metrics = [
"reth-optimism-trie/metrics",
diff --git a/rust/op-reth/crates/exex/src/lib.rs b/rust/op-reth/crates/exex/src/lib.rs
index 96bf159f5df..5c5287af359 100644
--- a/rust/op-reth/crates/exex/src/lib.rs
+++ b/rust/op-reth/crates/exex/src/lib.rs
@@ -15,37 +15,28 @@ use reth_execution_types::Chain;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::{FullNodeComponents, NodePrimitives, NodeTypes};
use reth_optimism_trie::{
- OpProofStoragePrunerTask, OpProofsProviderRO, OpProofsStorage, OpProofsStore,
- live::LiveTrieCollector,
+ OpProofStoragePruner, OpProofsProviderRO, OpProofsStore, engine::EngineHandle,
};
-use reth_provider::{BlockNumReader, BlockReader, TransactionVariant};
+use reth_provider::BlockNumReader;
use reth_trie::{HashedPostStateSorted, SortedTrieData, updates::TrieUpdatesSorted};
-use std::{sync::Arc, time::Duration};
-use tokio::{sync::watch, task, time};
-use tracing::{debug, error, info};
+use std::sync::Arc;
+use tracing::{debug, info};
// Safety threshold for maximum blocks to prune automatically on startup.
// If the required prune exceeds this, the node will error out and require manual pruning. Default
// is 1000 blocks.
const MAX_PRUNE_BLOCKS_STARTUP: u64 = 1000;
-/// How many blocks to process in a single batch before yielding. Default is 50 blocks.
-const SYNC_BLOCKS_BATCH_SIZE: usize = 50;
-
-/// How close to tip before we process blocks in real-time vs batch. Default is 1024 blocks.
-const REAL_TIME_BLOCKS_THRESHOLD: u64 = 1024;
-
-/// How long to sleep when sync task is caught up. Default is 5 seconds.
-const SYNC_IDLE_SLEEP_SECS: u64 = 5;
-
/// Default proofs history window: 1 month of blocks at 2s block time
const DEFAULT_PROOFS_HISTORY_WINDOW: u64 = 1_296_000;
-/// Default interval between proof-storage prune runs. Default is 15 seconds.
-const DEFAULT_PRUNE_INTERVAL: Duration = Duration::from_secs(15);
+/// Default verification interval: disabled (0 = no periodic re-execution)
+const DEFAULT_VERIFICATION_INTERVAL: u64 = 0;
-/// Default verification interval: disabled
-const DEFAULT_VERIFICATION_INTERVAL: u64 = 0; // disabled
+/// Number of blocks behind the chain tip within which we consider the ExEx to be in real-time
+/// operation. Notifications further behind than this are treated as sync catch-up and handled
+/// asynchronously by the engine.
+const REAL_TIME_BLOCKS_THRESHOLD: u64 = 64;
/// Builder for [`OpProofsExEx`].
#[derive(Debug)]
@@ -54,9 +45,8 @@ where
Node: FullNodeComponents,
{
ctx: ExExContext,
- storage: OpProofsStorage,
+ storage: Storage,
proofs_history_window: u64,
- proofs_history_prune_interval: Duration,
verification_interval: u64,
}
@@ -65,12 +55,11 @@ where
Node: FullNodeComponents,
{
/// Create a new builder with required parameters and defaults.
- pub const fn new(ctx: ExExContext, storage: OpProofsStorage) -> Self {
+ pub const fn new(ctx: ExExContext, storage: Storage) -> Self {
Self {
ctx,
storage,
proofs_history_window: DEFAULT_PROOFS_HISTORY_WINDOW,
- proofs_history_prune_interval: DEFAULT_PRUNE_INTERVAL,
verification_interval: DEFAULT_VERIFICATION_INTERVAL,
}
}
@@ -81,13 +70,11 @@ where
self
}
- /// Sets the interval between proof-storage prune runs.
- pub const fn with_proofs_history_prune_interval(mut self, interval: Duration) -> Self {
- self.proofs_history_prune_interval = interval;
- self
- }
-
- /// Sets the verification interval.
+ /// Sets the interval at which blocks are re-executed to verify pre-computed trie data.
+ ///
+ /// Every `interval`-th block (by block number) will be executed in full even when
+ /// pre-computed trie data is available, allowing detection of any divergence.
+ /// Set to `0` (the default) to disable periodic verification.
pub const fn with_verification_interval(mut self, interval: u64) -> Self {
self.verification_interval = interval;
self
@@ -99,7 +86,6 @@ where
ctx: self.ctx,
storage: self.storage,
proofs_history_window: self.proofs_history_window,
- proofs_history_prune_interval: self.proofs_history_prune_interval,
verification_interval: self.verification_interval,
}
}
@@ -124,8 +110,8 @@ where
/// use reth_node_builder::{NodeBuilder, NodeConfig};
/// use reth_optimism_chainspec::BASE_MAINNET;
/// use reth_optimism_exex::OpProofsExEx;
-/// use reth_optimism_node::{OpNode, args::RollupArgs};
-/// use reth_optimism_trie::{InMemoryProofsStorage, OpProofsStorage, db::MdbxProofsStorage};
+/// use reth_optimism_node::{args::RollupArgs, OpNode};
+/// use reth_optimism_trie::{db::MdbxProofsStorageV2, InMemoryProofsStorage, OpProofsStorage};
/// use reth_provider::providers::BlockchainProvider;
/// use std::{sync::Arc, time::Duration};
///
@@ -142,13 +128,12 @@ where
/// # let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
/// # let storage_path = temp_dir.path().join("proofs_storage");
///
-/// # let storage: OpProofsStorage> = Arc::new(
-/// # MdbxProofsStorage::new(&storage_path).expect("Failed to create MdbxProofsStorage"),
+/// # let storage: OpProofsStorage> = Arc::new(
+/// # MdbxProofsStorageV2::new(&storage_path).expect("Failed to create MdbxProofsStorageV2"),
/// # ).into();
///
/// let storage_exec = storage.clone();
/// let proofs_history_window = 1_296_000u64;
-/// let proofs_history_prune_interval = Duration::from_secs(3600);
///
/// // Verification interval: perform full execution every N blocks
/// let verification_interval = 0; // 0 = disabled, 100 = verify every 100 blocks
@@ -162,7 +147,6 @@ where
/// .install_exex("proofs-history", move |exex_context| async move {
/// Ok(OpProofsExEx::builder(exex_context, storage_exec)
/// .with_proofs_history_window(proofs_history_window)
-/// .with_proofs_history_prune_interval(proofs_history_prune_interval)
/// .with_verification_interval(verification_interval)
/// .build()
/// .run()
@@ -180,15 +164,12 @@ where
/// events.
ctx: ExExContext,
/// The type of storage DB.
- storage: OpProofsStorage,
+ storage: Storage,
/// The window to span blocks for proofs history. Value is the number of blocks, received as
/// cli arg.
proofs_history_window: u64,
- /// Interval between proof-storage prune runs
- proofs_history_prune_interval: Duration,
- /// Verification interval: perform full block execution every N blocks for data integrity.
- /// If 0, verification is disabled (always use fast path when available).
- /// If 1, verification is always enabled (always execute blocks).
+ /// How often (in blocks) to re-execute a block for verification even when pre-computed trie
+ /// data is available. `0` disables periodic verification.
verification_interval: u64,
}
@@ -197,14 +178,14 @@ where
Node: FullNodeComponents,
{
/// Create a new `OpProofsExEx` instance.
- pub fn new(ctx: ExExContext, storage: OpProofsStorage) -> Self {
+ pub fn new(ctx: ExExContext, storage: Storage) -> Self {
OpProofsExExBuilder::new(ctx, storage).build()
}
/// Create a new builder for `OpProofsExEx`.
pub const fn builder(
ctx: ExExContext,
- storage: OpProofsStorage,
+ storage: Storage,
) -> OpProofsExExBuilder {
OpProofsExExBuilder::new(ctx, storage)
}
@@ -214,31 +195,28 @@ impl OpProofsExEx
where
Node: FullNodeComponents>,
Primitives: NodePrimitives,
+ Primitives::Block: Clone,
Storage: OpProofsStore + Clone + 'static,
{
/// Main execution loop for the ExEx
pub async fn run(mut self) -> eyre::Result<()> {
self.ensure_initialized()?;
- let sync_target_tx = self.spawn_sync_task();
- let prune_task = OpProofStoragePrunerTask::new(
+ let pruner = OpProofStoragePruner::new(
self.storage.clone(),
self.ctx.provider().clone(),
self.proofs_history_window,
- self.proofs_history_prune_interval,
);
- self.ctx
- .task_executor()
- .spawn_with_graceful_shutdown_signal(|signal| Box::pin(prune_task.run(signal)));
- let collector = LiveTrieCollector::new(
+ let engine_handle = EngineHandle::spawn(
self.ctx.evm_config().clone(),
self.ctx.provider().clone(),
- &self.storage,
+ self.storage.clone(),
+ pruner,
);
while let Some(notification) = self.ctx.notifications.try_next().await? {
- self.handle_notification(notification, &collector, &sync_target_tx)?;
+ self.handle_notification(notification, &engine_handle)?;
}
Ok(())
@@ -257,7 +235,7 @@ where
}
};
- let latest_block_number: u64 = match provider_ro.get_latest_block_number()? {
+ let latest_block_number = match provider_ro.get_latest_block_number()? {
Some((n, _)) => n,
None => {
return Err(eyre::eyre!(
@@ -283,131 +261,23 @@ where
}
}
- // Need to update the earliest block metric on startup as this is not called frequently and
- // can show outdated info. When metrics are disabled, this is a no-op.
- #[cfg(feature = "metrics")]
- {
- self.storage
- .metrics()
- .block_metrics()
- .earliest_number
- .set(earliest_block_number as f64);
- }
-
- Ok(())
- }
-
- /// Spawn the background sync task and return the target sender
- fn spawn_sync_task(&self) -> watch::Sender<u64> {
- let (sync_target_tx, sync_target_rx) = watch::channel(0u64);
-
- let task_storage = self.storage.clone();
- let task_provider = self.ctx.provider().clone();
- let task_evm_config = self.ctx.evm_config().clone();
-
- self.ctx.task_executor().spawn_critical_task(
- "optimism::exex::proofs_storage_sync_loop",
- async move {
- let storage = task_storage.clone();
- let task_collector =
- LiveTrieCollector::new(task_evm_config, task_provider.clone(), &storage);
- Self::sync_loop(sync_target_rx, task_storage, task_provider, &task_collector).await;
- },
- );
-
- sync_target_tx
- }
-
- /// Background sync loop that processes blocks up to the target
- async fn sync_loop(
- mut sync_target_rx: watch::Receiver<u64>,
- storage: OpProofsStorage<Storage>,
- provider: Node::Provider,
- collector: &LiveTrieCollector<'_, Node::Evm, Node::Provider, Storage>,
- ) {
- debug!(target: "optimism::exex", "Starting proofs storage sync loop");
-
- loop {
- let target = *sync_target_rx.borrow_and_update();
- let latest = match storage.provider_ro().and_then(|p| p.get_latest_block_number()) {
- Ok(Some((n, _))) => n,
- Ok(None) => {
- error!(target: "optimism::exex", "No blocks stored in proofs storage during sync loop");
- continue;
- }
- Err(e) => {
- error!(target: "optimism::exex", error = ?e, "Failed to get latest block");
- continue;
- }
- };
-
- if latest >= target {
- time::sleep(Duration::from_secs(SYNC_IDLE_SLEEP_SECS)).await;
- continue;
- }
-
- // Process one batch
- if let Err(e) =
- Self::process_batch(latest, target, &provider, collector, SYNC_BLOCKS_BATCH_SIZE)
- {
- error!(target: "optimism::exex", error = ?e, "Batch processing failed");
- }
-
- // Yield to allow other tasks to run
- debug!(target: "optimism::exex", latest_stored = latest, target, "Batch processed, yielding");
- task::yield_now().await;
- }
- }
-
- /// Process a batch of blocks from start to target (up to `batch_size`)
- fn process_batch(
- start: u64,
- target: u64,
- provider: &Node::Provider,
- collector: &LiveTrieCollector<'_, Node::Evm, Node::Provider, Storage>,
- batch_size: usize,
- ) -> eyre::Result<()> {
- let end = (start + batch_size as u64).min(target);
- debug!(
- target: "optimism::exex",
- start,
- end,
- "Processing proofs storage sync batch"
- );
-
- for block_num in (start + 1)..=end {
- let block = provider
- .recovered_block(block_num.into(), TransactionVariant::NoHash)?
- .ok_or_else(|| eyre::eyre!("Missing block {}", block_num))?;
-
- collector.execute_and_store_block_updates(&block)?;
- }
-
Ok(())
}
fn handle_notification(
&self,
notification: ExExNotification,
- collector: &LiveTrieCollector<'_, Node::Evm, Node::Provider, Storage>,
- sync_target_tx: &watch::Sender<u64>,
+ engine_handle: &EngineHandle<Node::Evm, Node::Provider, Storage>,
) -> eyre::Result<()> {
- let latest_stored = match self.storage.provider_ro()?.get_latest_block_number()? {
- Some((n, _)) => n,
- None => {
- return Err(eyre::eyre!("No blocks stored in proofs storage"));
- }
- };
-
match &notification {
ExExNotification::ChainCommitted { new } => {
- self.handle_chain_committed(new.clone(), latest_stored, collector, sync_target_tx)?
+ self.handle_chain_committed(new.clone(), engine_handle)?
}
ExExNotification::ChainReorged { old, new } => {
- self.handle_chain_reorged(old.clone(), new.clone(), latest_stored, collector)?
+ self.handle_chain_reorged(old.clone(), new.clone(), engine_handle)?
}
ExExNotification::ChainReverted { old } => {
- self.handle_chain_reverted(old.clone(), latest_stored, collector)?
+ self.handle_chain_reverted(old.clone(), engine_handle)?
}
}
@@ -421,9 +291,7 @@ where
fn handle_chain_committed(
&self,
new: Arc<Chain<Primitives>>,
- latest_stored: u64,
- collector: &LiveTrieCollector<'_, Node::Evm, Node::Provider, Storage>,
- sync_target_tx: &watch::Sender<u64>,
+ engine_handle: &EngineHandle<Node::Evm, Node::Provider, Storage>,
) -> eyre::Result<()> {
debug!(
target: "optimism::exex",
@@ -432,120 +300,35 @@ where
"ChainCommitted notification received",
);
- // If tip is not newer than what we have, nothing to do.
- if new.tip().number() <= latest_stored {
- debug!(
- target: "optimism::exex",
- block_number = new.tip().number(),
- latest_stored,
- "Already processed, skipping"
- );
- return Ok(());
- }
-
let best_block = self.ctx.provider().best_block_number()?;
- let is_sequential = new.tip().number() == latest_stored + 1;
let is_near_tip =
best_block.saturating_sub(new.tip().number()) < REAL_TIME_BLOCKS_THRESHOLD;
-
- if is_sequential && is_near_tip {
- debug!(
- target: "optimism::exex",
- block_number = new.tip().number(),
- latest_stored,
- best_block,
- "Processing in real-time"
- );
-
- // Process each block from latest_stored + 1 to tip
- let start = latest_stored.saturating_add(1);
- for block_number in start..=new.tip().number() {
- self.process_block(block_number, &new, collector)?;
- }
- } else {
- debug!(
- target: "optimism::exex",
- block_number = new.tip().number(),
- latest_stored,
- best_block,
- is_sequential,
- is_near_tip,
- "Scheduling batch processing via sync task"
- );
-
- // Update the sync target to the new tip
- sync_target_tx.send(new.tip().number())?;
+ if !is_near_tip {
+ engine_handle.sync_to(new.tip().number())?;
+ return Ok(());
}
- Ok(())
- }
+ // `Chain::blocks()` returns a `BTreeMap`, so iteration is already ordered oldest → newest.
+ for (&block_number, block) in new.blocks() {
+ // Fast path: use pre-computed trie data only when verification is not due.
+ let should_verify = self.verification_interval > 0 &&
+ block_number.is_multiple_of(self.verification_interval);
+ let precomputed = (!should_verify).then(|| new.trie_data_at(block_number)).flatten();
- /// Process a single block - either from chain or provider
- fn process_block(
- &self,
- block_number: u64,
- chain: &Chain<Primitives>,
- collector: &LiveTrieCollector<'_, Node::Evm, Node::Provider, Storage>,
- ) -> eyre::Result<()> {
- // Check if this block should be verified via full execution
- let should_verify = self.verification_interval > 0 &&
- block_number.is_multiple_of(self.verification_interval);
-
- // Try to get block data from the chain first
- // 1. Fast Path: Try to use pre-computed state from the notification
- if let Some(block) = chain.blocks().get(&block_number) {
- // Check if we have BOTH trie updates and hashed state.
- // If either is missing, we fall back to execution to ensure data integrity.
- if let Some((trie_updates, hashed_state)) = chain.trie_data_at(block_number).map(|d| {
+ if let Some(d) = precomputed {
let SortedTrieData { hashed_state, trie_updates } = d.get();
- (trie_updates, hashed_state)
- }) {
- // Use fast path only if we're not scheduled to verify this block
- if !should_verify {
- debug!(
- target: "optimism::exex",
- block_number,
- "Using pre-computed state updates from notification"
- );
-
- collector.store_block_updates(
- block.block_with_parent(),
- (**trie_updates).clone(),
- (**hashed_state).clone(),
- )?;
-
- return Ok(());
- }
-
- info!(
- target: "optimism::exex",
- block_number,
- verification_interval = self.verification_interval,
- "Periodic verification: performing full block execution"
- );
+ engine_handle.index_block(
+ block.block_with_parent(),
+ (**trie_updates).clone(),
+ (**hashed_state).clone(),
+ )?;
+ } else {
+ // Slow path: execute the block in full (no trie data, or verification interval
+ // hit).
+ engine_handle.execute_block(block)?;
}
-
- debug!(
- target: "optimism::exex",
- block_number,
- "Block present in notification but state updates missing, falling back to execution"
- );
}
- // 2. Slow Path: Block not in chain (or state missing), fetch from provider and execute
- debug!(
- target: "optimism::exex",
- block_number,
- "Fetching block from provider for execution",
- );
-
- let block = self
- .ctx
- .provider()
- .recovered_block(block_number.into(), TransactionVariant::NoHash)?
- .ok_or_else(|| eyre::eyre!("Missing block {} in provider", block_number))?;
-
- collector.execute_and_store_block_updates(&block)?;
Ok(())
}
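The fast-path selection above hinges on `bool::then` plus `Option::flatten`: pre-computed trie data is used only when verification is not due *and* the data is actually present; in every other case the block falls through to full execution. A minimal standalone illustration of that selection logic (toy `&str` payload standing in for the real `SortedTrieData`):

```rust
// `(!should_verify).then(|| precomputed)` yields Some(precomputed) only when
// verification is NOT due; `flatten` then collapses the inner Option that
// models "trie data may be missing from the notification".
fn pick_fast_path(should_verify: bool, precomputed: Option<&str>) -> Option<&str> {
    (!should_verify).then(|| precomputed).flatten()
}

fn main() {
    // Verification due: never use the fast path, even if data exists.
    assert_eq!(pick_fast_path(true, Some("trie")), None);
    // No verification due and data present: fast path.
    assert_eq!(pick_fast_path(false, Some("trie")), Some("trie"));
    // No verification due but data missing: fall back to execution.
    assert_eq!(pick_fast_path(false, None), None);
    println!("ok");
}
```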
@@ -553,8 +336,7 @@ where
&self,
old: Arc<Chain<Primitives>>,
new: Arc<Chain<Primitives>>,
- latest_stored: u64,
- collector: &LiveTrieCollector<'_, Node::Evm, Node::Provider, Storage>,
+ engine_handle: &EngineHandle<Node::Evm, Node::Provider, Storage>,
) -> eyre::Result<()> {
info!(
old_block_number = old.tip().number(),
@@ -564,27 +346,20 @@ where
"ChainReorged notification received",
);
- if old.first().number() > latest_stored {
- debug!(target: "optimism::exex", "Reorg beyond stored blocks, skipping");
- return Ok(());
+ if old.fork_block() != new.fork_block() {
+ return Err(eyre::eyre!(
+ "Fork blocks do not match: old fork block {:?}, new fork block {:?}",
+ old.fork_block(),
+ new.fork_block()
+ ));
}
- // find the common ancestor
let mut block_updates: Vec<(
BlockWithParent,
Arc<TrieUpdatesSorted>,
Arc<HashedPostStateSorted>,
)> = Vec::with_capacity(new.len());
for block_number in new.blocks().keys() {
- // verify if the fork point matches
- if old.fork_block() != new.fork_block() {
- return Err(eyre::eyre!(
- "Fork blocks do not match: old fork block {:?}, new fork block {:?}",
- old.fork_block(),
- new.fork_block()
- ));
- }
-
let block = new
.blocks()
.get(block_number)
@@ -605,7 +380,7 @@ where
));
}
- collector.unwind_and_store_block_updates(block_updates)?;
+ engine_handle.reorg(block_updates)?;
Ok(())
}
@@ -613,8 +388,7 @@ where
fn handle_chain_reverted(
&self,
old: Arc<Chain<Primitives>>,
- latest_stored: u64,
- collector: &LiveTrieCollector<'_, Node::Evm, Node::Provider, Storage>,
+ engine_handle: &EngineHandle<Node::Evm, Node::Provider, Storage>,
) -> eyre::Result<()> {
info!(
target: "optimism::exex",
@@ -623,17 +397,7 @@ where
"ChainReverted notification received",
);
- if old.first().number() > latest_stored {
- debug!(
- target: "optimism::exex",
- first_block_number = old.first().number(),
- latest_stored = latest_stored,
- "Fork block number is greater than latest stored, skipping",
- );
- return Ok(());
- }
-
- collector.unwind_history(old.first().block_with_parent())?;
+ engine_handle.unwind(old.first().block_with_parent())?;
Ok(())
}
}
@@ -647,16 +411,12 @@ mod tests {
use reth_ethereum_primitives::{Block, Receipt};
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_optimism_trie::{
- BlockStateDiff, OpProofsProviderRO, OpProofsProviderRw, OpProofsStorage, OpProofsStore,
- db::MdbxProofsStorage,
+ BlockStateDiff, OpProofsProviderRO, OpProofsProviderRw, OpProofsStore,
+ db::MdbxProofsStorageV2, engine::EngineHandle,
};
-
- fn get_latest<S: OpProofsStore>(proofs: &OpProofsStorage<S>) -> Option<(u64, B256)> {
- proofs.provider_ro().expect("provider_ro").get_latest_block_number().expect("get latest")
- }
use reth_primitives_traits::RecoveredBlock;
use reth_trie::{HashedPostStateSorted, LazyTrieData, updates::TrieUpdatesSorted};
- use std::{collections::BTreeMap, default::Default, sync::Arc, time::Duration};
+ use std::{collections::BTreeMap, default::Default, sync::Arc};
// -------------------------------------------------------------------------
// Helpers: deterministic blocks and deterministic Chain with precomputed updates
@@ -717,25 +477,23 @@ mod tests {
}
// Init_storage to the genesis block
- fn init_storage<S: OpProofsStore>(storage: OpProofsStorage<S>) {
+ fn init_storage<S: OpProofsStore>(storage: S) {
let genesis_block = NumHash::new(0, b256(0x00));
- let provider_rw = storage.provider_rw().expect("provider_rw");
- provider_rw
- .set_earliest_block_number(genesis_block.number, genesis_block.hash)
+ let rw = storage.provider_rw().expect("provider rw");
+ rw.set_earliest_block_number(genesis_block.number, genesis_block.hash)
.expect("set earliest");
- provider_rw
- .store_trie_updates(
- BlockWithParent::new(genesis_block.hash, genesis_block),
- BlockStateDiff::default(),
- )
- .expect("store trie update");
- provider_rw.commit().expect("commit");
+ rw.store_trie_updates(
+ BlockWithParent::new(genesis_block.hash, genesis_block),
+ BlockStateDiff::default(),
+ )
+ .expect("store trie update");
+ rw.commit().expect("commit");
}
// Initialize exex with config
fn build_test_exex<NodeT, Store>(
ctx: ExExContext<NodeT>,
- storage: OpProofsStorage<Store>,
+ storage: Store,
) -> OpProofsExEx<NodeT, Store>
where
NodeT: FullNodeComponents,
@@ -743,7 +501,6 @@ mod tests {
{
OpProofsExEx::builder(ctx, storage)
.with_proofs_history_window(20)
- .with_proofs_history_prune_interval(Duration::from_secs(3600))
.with_verification_interval(1000)
.build()
}
@@ -752,30 +509,38 @@ mod tests {
async fn handle_notification_chain_committed() {
// MDBX proofs storage
let dir = tempdir_path();
- let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env"));
- let proofs: OpProofsStorage<Arc<MdbxProofsStorage>> = store.clone().into();
+ let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env"));
- init_storage(proofs.clone());
+ init_storage(store.clone());
let (ctx, _handle) =
reth_exex_test_utils::test_exex_context().await.expect("exex test context");
- let collector = LiveTrieCollector::new(
+ let pruner = OpProofStoragePruner::new(store.clone(), ctx.components.provider.clone(), 20);
+ let engine_handle = EngineHandle::spawn_with_thresholds(
ctx.components.components.evm_config.clone(),
ctx.components.provider.clone(),
- &proofs,
+ store.clone(),
+ pruner,
+ 1,
+ 2,
);
- let exex = build_test_exex(ctx, proofs.clone());
+ let exex = build_test_exex(ctx, store.clone());
// Notification: chain committed block 1
let new_chain = Arc::new(mk_chain_with_updates(1, 1, None));
let notif = ExExNotification::ChainCommitted { new: new_chain };
- let (sync_target_tx, _) = tokio::sync::watch::channel(0u64);
-
- exex.handle_notification(notif, &collector, &sync_target_tx).expect("handle chain commit");
+ exex.handle_notification(notif, &engine_handle).expect("handle chain commit");
- let latest = get_latest(&proofs).expect("ok").0;
+ engine_handle.flush();
+ let latest = store
+ .provider_ro()
+ .expect("provider ro")
+ .get_latest_block_number()
+ .expect("get latest block")
+ .expect("ok")
+ .0;
assert_eq!(latest, 1);
}
@@ -783,39 +548,53 @@ mod tests {
async fn handle_notification_chain_committed_skips_already_processed() {
// MDBX proofs storage
let dir = tempdir_path();
- let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env"));
- let proofs: OpProofsStorage<Arc<MdbxProofsStorage>> = store.clone().into();
+ let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env"));
- init_storage(proofs.clone());
+ init_storage(store.clone());
let (ctx, _handle) =
reth_exex_test_utils::test_exex_context().await.expect("exex test context");
- let collector = LiveTrieCollector::new(
+ let pruner = OpProofStoragePruner::new(store.clone(), ctx.components.provider.clone(), 20);
+ let engine_handle = EngineHandle::spawn_with_thresholds(
ctx.components.components.evm_config.clone(),
ctx.components.provider.clone(),
- &proofs,
+ store.clone(),
+ pruner,
+ 1,
+ 2,
);
- let exex = build_test_exex(ctx, proofs.clone());
+ let exex = build_test_exex(ctx, store.clone());
- let (sync_target_tx, _) = tokio::sync::watch::channel(0u64);
// Process blocks 1..5 sequentially to trigger real-time path (synchronous)
for i in 1..=5 {
let new_chain = Arc::new(mk_chain_with_updates(i, i, None));
let notif = ExExNotification::ChainCommitted { new: new_chain };
- exex.handle_notification(notif, &collector, &sync_target_tx)
- .expect("handle chain commit");
+ exex.handle_notification(notif, &engine_handle).expect("handle chain commit");
}
- let latest = get_latest(&proofs).expect("ok").0;
+ engine_handle.flush();
+ let latest = store
+ .provider_ro()
+ .expect("provider ro")
+ .get_latest_block_number()
+ .expect("get latest block")
+ .expect("ok")
+ .0;
assert_eq!(latest, 5);
// Try to handle already processed notification
let new_chain = Arc::new(mk_chain_with_updates(5, 5, Some(hash_for_num(10))));
let notif = ExExNotification::ChainCommitted { new: new_chain };
- exex.handle_notification(notif, &collector, &sync_target_tx).expect("handle chain commit");
- let latest = get_latest(&proofs).expect("ok");
+ exex.handle_notification(notif, &engine_handle).expect("handle chain commit");
+ engine_handle.flush();
+ let latest = store
+ .provider_ro()
+ .expect("provider ro")
+ .get_latest_block_number()
+ .expect("get latest block")
+ .expect("ok");
assert_eq!(latest.0, 5);
assert_eq!(latest.1, hash_for_num(5)); // block was not updated
}
@@ -824,32 +603,39 @@ mod tests {
async fn handle_notification_chain_reorged() {
// MDBX proofs storage
let dir = tempdir_path();
- let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env"));
- let proofs: OpProofsStorage<Arc<MdbxProofsStorage>> = store.clone().into();
+ let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env"));
- init_storage(proofs.clone());
+ init_storage(store.clone());
let (ctx, _handle) =
reth_exex_test_utils::test_exex_context().await.expect("exex test context");
- let collector = LiveTrieCollector::new(
+ let pruner = OpProofStoragePruner::new(store.clone(), ctx.components.provider.clone(), 20);
+ let engine_handle = EngineHandle::spawn_with_thresholds(
ctx.components.components.evm_config.clone(),
ctx.components.provider.clone(),
- &proofs,
+ store.clone(),
+ pruner,
+ 1,
+ 2,
);
- let exex = build_test_exex(ctx, proofs.clone());
-
- let (sync_target_tx, _) = tokio::sync::watch::channel(0u64);
+ let exex = build_test_exex(ctx, store.clone());
for i in 1..=10 {
let new_chain = Arc::new(mk_chain_with_updates(i, i, None));
let notif = ExExNotification::ChainCommitted { new: new_chain };
- exex.handle_notification(notif, &collector, &sync_target_tx)
- .expect("handle chain commit");
+ exex.handle_notification(notif, &engine_handle).expect("handle chain commit");
}
- let latest = get_latest(&proofs).expect("ok").0;
+ engine_handle.flush();
+ let latest = store
+ .provider_ro()
+ .expect("provider ro")
+ .get_latest_block_number()
+ .expect("get latest block")
+ .expect("ok")
+ .0;
assert_eq!(latest, 10);
// Now the tip is 10, and we want to reorg from block 6..12
@@ -859,9 +645,15 @@ mod tests {
// Notification: chain reorged 6..12
let notif = ExExNotification::ChainReorged { new: new_chain, old: old_chain };
- exex.handle_notification(notif, &collector, &sync_target_tx)
- .expect("handle chain re-orged");
- let latest = get_latest(&proofs).expect("ok").0;
+ exex.handle_notification(notif, &engine_handle).expect("handle chain re-orged");
+ engine_handle.flush();
+ let latest = store
+ .provider_ro()
+ .expect("provider ro")
+ .get_latest_block_number()
+ .expect("get latest block")
+ .expect("ok")
+ .0;
assert_eq!(latest, 12);
}
@@ -869,45 +661,60 @@ mod tests {
async fn handle_notification_chain_reorged_skips_beyond_stored_blocks() {
// MDBX proofs storage
let dir = tempdir_path();
- let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env"));
- let proofs: OpProofsStorage<Arc<MdbxProofsStorage>> = store.clone().into();
+ let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env"));
- init_storage(proofs.clone());
+ init_storage(store.clone());
let (ctx, _handle) =
reth_exex_test_utils::test_exex_context().await.expect("exex test context");
- let collector = LiveTrieCollector::new(
+ let pruner = OpProofStoragePruner::new(store.clone(), ctx.components.provider.clone(), 20);
+ let engine_handle = EngineHandle::spawn_with_thresholds(
ctx.components.components.evm_config.clone(),
ctx.components.provider.clone(),
- &proofs,
+ store.clone(),
+ pruner,
+ 1,
+ 2,
);
- let exex = build_test_exex(ctx, proofs.clone());
-
- let (sync_target_tx, _) = tokio::sync::watch::channel(0u64);
+ let exex = build_test_exex(ctx, store.clone());
for i in 1..=10 {
let new_chain = Arc::new(mk_chain_with_updates(i, i, None));
let notif = ExExNotification::ChainCommitted { new: new_chain };
- exex.handle_notification(notif, &collector, &sync_target_tx)
- .expect("handle chain commit");
+ exex.handle_notification(notif, &engine_handle).expect("handle chain commit");
}
- let latest = get_latest(&proofs).expect("ok").0;
+ engine_handle.flush();
+ let latest = store
+ .provider_ro()
+ .expect("provider ro")
+ .get_latest_block_number()
+ .expect("get latest block")
+ .expect("ok")
+ .0;
assert_eq!(latest, 10);
- // Now the tip is 10, and we want to reorg from block 12..15
+ // Now the tip is 10, and we want to reorg starting at block 12 (beyond stored tip).
+ // Both chains share the same fork point (block 11), so this is a valid reorg notification
+ // that starts beyond what we've indexed — the engine should skip it.
let old_chain = Arc::new(mk_chain_with_updates(12, 15, None));
- let new_chain = Arc::new(mk_chain_with_updates(10, 20, None));
+ let new_chain = Arc::new(mk_chain_with_updates(12, 20, None));
- // Notification: chain reorged 12..15
+ // Notification: chain reorged 12..20, fork at 11
let notif = ExExNotification::ChainReorged { new: new_chain, old: old_chain };
- exex.handle_notification(notif, &collector, &sync_target_tx)
- .expect("handle chain re-orged");
- let latest = get_latest(&proofs).expect("ok").0;
+ exex.handle_notification(notif, &engine_handle).expect("handle chain re-orged");
+ engine_handle.flush();
+ let latest = store
+ .provider_ro()
+ .expect("provider ro")
+ .get_latest_block_number()
+ .expect("get latest block")
+ .expect("ok")
+ .0;
assert_eq!(latest, 10);
}
@@ -915,33 +722,40 @@ mod tests {
async fn handle_notification_chain_reverted() {
// MDBX proofs storage
let dir = tempdir_path();
- let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env"));
- let proofs: OpProofsStorage<Arc<MdbxProofsStorage>> = store.clone().into();
+ let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env"));
- init_storage(proofs.clone());
+ init_storage(store.clone());
let (ctx, _handle) =
reth_exex_test_utils::test_exex_context().await.expect("exex test context");
- let collector = LiveTrieCollector::new(
+ let pruner = OpProofStoragePruner::new(store.clone(), ctx.components.provider.clone(), 20);
+ let engine_handle = EngineHandle::spawn_with_thresholds(
ctx.components.components.evm_config.clone(),
ctx.components.provider.clone(),
- &proofs,
+ store.clone(),
+ pruner,
+ 1,
+ 2,
);
- let exex = build_test_exex(ctx, proofs.clone());
-
- let (sync_target_tx, _) = tokio::sync::watch::channel(0u64);
+ let exex = build_test_exex(ctx, store.clone());
for i in 1..=10 {
let new_chain = Arc::new(mk_chain_with_updates(i, i, None));
let notif = ExExNotification::ChainCommitted { new: new_chain };
- exex.handle_notification(notif, &collector, &sync_target_tx)
- .expect("handle chain commit");
+ exex.handle_notification(notif, &engine_handle).expect("handle chain commit");
}
- let latest = get_latest(&proofs).expect("ok").0;
+ engine_handle.flush();
+ let latest = store
+ .provider_ro()
+ .expect("provider ro")
+ .get_latest_block_number()
+ .expect("get latest block")
+ .expect("ok")
+ .0;
assert_eq!(latest, 10);
// Now the tip is 10, and we want to revert from block 9..10
@@ -950,9 +764,15 @@ mod tests {
// Notification: chain reverted 9..10
let notif = ExExNotification::ChainReverted { old: old_chain };
- exex.handle_notification(notif, &collector, &sync_target_tx)
- .expect("handle chain reverted");
- let latest = get_latest(&proofs).expect("ok").0;
+ exex.handle_notification(notif, &engine_handle).expect("handle chain reverted");
+ engine_handle.flush();
+ let latest = store
+ .provider_ro()
+ .expect("provider ro")
+ .get_latest_block_number()
+ .expect("get latest block")
+ .expect("ok")
+ .0;
assert_eq!(latest, 8);
}
@@ -960,33 +780,40 @@ mod tests {
async fn handle_notification_chain_reverted_skips_beyond_stored_blocks() {
// MDBX proofs storage
let dir = tempdir_path();
- let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env"));
- let proofs: OpProofsStorage<Arc<MdbxProofsStorage>> = store.clone().into();
+ let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env"));
- init_storage(proofs.clone());
+ init_storage(store.clone());
let (ctx, _handle) =
reth_exex_test_utils::test_exex_context().await.expect("exex test context");
- let collector = LiveTrieCollector::new(
+ let pruner = OpProofStoragePruner::new(store.clone(), ctx.components.provider.clone(), 20);
+ let engine_handle = EngineHandle::spawn_with_thresholds(
ctx.components.components.evm_config.clone(),
ctx.components.provider.clone(),
- &proofs,
+ store.clone(),
+ pruner,
+ 1,
+ 2,
);
- let exex = build_test_exex(ctx, proofs.clone());
-
- let (sync_target_tx, _) = tokio::sync::watch::channel(0u64);
+ let exex = build_test_exex(ctx, store.clone());
for i in 1..=5 {
let new_chain = Arc::new(mk_chain_with_updates(i, i, None));
let notif = ExExNotification::ChainCommitted { new: new_chain };
- exex.handle_notification(notif, &collector, &sync_target_tx)
- .expect("handle chain commit");
+ exex.handle_notification(notif, &engine_handle).expect("handle chain commit");
}
- let latest = get_latest(&proofs).expect("ok").0;
+ engine_handle.flush();
+ let latest = store
+ .provider_ro()
+ .expect("provider ro")
+ .get_latest_block_number()
+ .expect("get latest block")
+ .expect("ok")
+ .0;
assert_eq!(latest, 5);
// The tip is 5; the revert notification covers blocks 9..10, beyond the stored tip
@@ -995,9 +822,15 @@ mod tests {
// Notification: chain reverted 9..10
let notif = ExExNotification::ChainReverted { old: old_chain };
- exex.handle_notification(notif, &collector, &sync_target_tx)
- .expect("handle chain reverted");
- let latest = get_latest(&proofs).expect("ok").0;
+ exex.handle_notification(notif, &engine_handle).expect("handle chain reverted");
+ engine_handle.flush();
+ let latest = store
+ .provider_ro()
+ .expect("provider ro")
+ .get_latest_block_number()
+ .expect("get latest block")
+ .expect("ok")
+ .0;
assert_eq!(latest, 5);
}
@@ -1005,13 +838,12 @@ mod tests {
async fn ensure_initialized_errors_on_storage_not_initialized() {
// MDBX proofs storage
let dir = tempdir_path();
- let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env"));
- let proofs: OpProofsStorage<Arc<MdbxProofsStorage>> = store.clone().into();
+ let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env"));
let (ctx, _handle) =
reth_exex_test_utils::test_exex_context().await.expect("exex test context");
- let exex = build_test_exex(ctx, proofs.clone());
+ let exex = build_test_exex(ctx, store.clone());
let _ = exex.ensure_initialized().expect_err("should return error");
}
@@ -1019,25 +851,24 @@ mod tests {
async fn ensure_initialized_errors_when_prune_exceeds_threshold() {
// MDBX proofs storage
let dir = tempdir_path();
- let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env"));
- let proofs: OpProofsStorage<Arc<MdbxProofsStorage>> = store.clone().into();
+ let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env"));
- init_storage(proofs.clone());
+ init_storage(store.clone());
for i in 1..1100 {
- let p = proofs.provider_rw().expect("provider_rw");
- p.store_trie_updates(
+ let rw = store.provider_rw().expect("provider rw");
+ rw.store_trie_updates(
BlockWithParent::new(hash_for_num(i - 1), BlockNumHash::new(i, hash_for_num(i))),
BlockStateDiff::default(),
)
.expect("store trie update");
- p.commit().expect("commit");
+ rw.commit().expect("commit");
}
let (ctx, _handle) =
reth_exex_test_utils::test_exex_context().await.expect("exex test context");
- let exex = build_test_exex(ctx, proofs.clone());
+ let exex = build_test_exex(ctx, store.clone());
let _ = exex.ensure_initialized().expect_err("should return error");
}
@@ -1045,15 +876,14 @@ mod tests {
async fn ensure_initialized_succeeds() {
// MDBX proofs storage
let dir = tempdir_path();
- let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env"));
- let proofs: OpProofsStorage<Arc<MdbxProofsStorage>> = store.clone().into();
+ let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env"));
- init_storage(proofs.clone());
+ init_storage(store.clone());
let (ctx, _handle) =
reth_exex_test_utils::test_exex_context().await.expect("exex test context");
- let exex = build_test_exex(ctx, proofs.clone());
+ let exex = build_test_exex(ctx, store.clone());
exex.ensure_initialized().expect("should not return error");
}
@@ -1061,71 +891,74 @@ mod tests {
async fn handle_notification_errors_on_empty_storage() {
// MDBX proofs storage
let dir = tempdir_path();
- let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env"));
- let proofs: OpProofsStorage<Arc<MdbxProofsStorage>> = store.clone().into();
+ let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env"));
let (ctx, _handle) =
reth_exex_test_utils::test_exex_context().await.expect("exex test context");
- let collector = LiveTrieCollector::new(
+ let pruner = OpProofStoragePruner::new(store.clone(), ctx.components.provider.clone(), 20);
+ let engine_handle = EngineHandle::spawn_with_thresholds(
ctx.components.components.evm_config.clone(),
ctx.components.provider.clone(),
- &proofs,
+ store.clone(),
+ pruner,
+ 1,
+ 2,
);
- let exex = build_test_exex(ctx, proofs.clone());
+ let exex = build_test_exex(ctx, store.clone());
// Any notification will do
let new_chain = Arc::new(mk_chain_with_updates(1, 5, None));
let notif = ExExNotification::ChainCommitted { new: new_chain };
- let (sync_target_tx, _) = tokio::sync::watch::channel(0u64);
- let err = exex.handle_notification(notif, &collector, &sync_target_tx).unwrap_err();
- assert_eq!(err.to_string(), "No blocks stored in proofs storage");
+ let err = exex.handle_notification(notif, &engine_handle).unwrap_err();
+ // Error now comes from the engine layer (storage not initialised).
+ assert_eq!(err.to_string(), "No blocks found");
}
#[tokio::test]
async fn handle_notification_schedules_async_on_gap() {
// MDBX proofs storage
let dir = tempdir_path();
- let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env"));
- let proofs: OpProofsStorage<Arc<MdbxProofsStorage>> = store.clone().into();
+ let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env"));
- init_storage(proofs.clone());
+ init_storage(store.clone());
let (ctx, _handle) =
reth_exex_test_utils::test_exex_context().await.expect("exex test context");
- let collector = LiveTrieCollector::new(
+ let pruner = OpProofStoragePruner::new(store.clone(), ctx.components.provider.clone(), 20);
+ let engine_handle = EngineHandle::spawn_with_thresholds(
ctx.components.components.evm_config.clone(),
ctx.components.provider.clone(),
- &proofs,
+ store.clone(),
+ pruner,
+ 1,
+ 2,
);
- let exex = build_test_exex(ctx, proofs.clone());
+
+ let exex = build_test_exex(ctx, store.clone());
// Notification: chain committed 5..10 (Blocks 1,2,3,4 are missing from storage)
let new_chain = Arc::new(mk_chain_with_updates(5, 10, None));
let notif = ExExNotification::ChainCommitted { new: new_chain };
- let (sync_target_tx, mut sync_target_rx) = tokio::sync::watch::channel(0u64);
-
- // Process notification
- exex.handle_notification(notif, &collector, &sync_target_tx)
+ // Process notification — should return immediately (gap detected, deferred to engine).
+ exex.handle_notification(notif, &engine_handle)
.expect("handle chain commit should return ok immediately");
- // Verify async signal was sent
- // The target in the channel should now be 10 (the tip of the new chain)
- assert_eq!(
- *sync_target_rx.borrow_and_update(),
- 10,
- "Should have scheduled sync to block 10"
- );
-
- // Verify Main Thread did NOT process it
- // Because we didn't spawn the actual worker thread in this test, storage should still be at
- // 0. This proves the 'handle_notification' returned instantly without doing the
- // heavy lifting.
- let latest = get_latest(&proofs).expect("ok").0;
+ // Verify the notification handler did NOT process blocks synchronously.
+ // The engine has a sync target set but no blocks in the provider, so its catch-up
+ // will error out without writing anything. Storage stays at block 0.
+ engine_handle.flush();
+ let latest = store
+ .provider_ro()
+ .expect("provider ro")
+ .get_latest_block_number()
+ .expect("get")
+ .expect("ok")
+ .0;
assert_eq!(latest, 0, "Main thread should not have processed the blocks synchronously");
}
}
diff --git a/rust/op-reth/crates/node/Cargo.toml b/rust/op-reth/crates/node/Cargo.toml
index de80e7caceb..5b0b0664a79 100644
--- a/rust/op-reth/crates/node/Cargo.toml
+++ b/rust/op-reth/crates/node/Cargo.toml
@@ -77,7 +77,6 @@ clap.workspace = true
serde.workspace = true
eyre.workspace = true
url.workspace = true
-humantime.workspace = true
futures-util.workspace = true
tracing.workspace = true
@@ -148,7 +147,8 @@ test-utils = [
"reth-stages-types/test-utils",
"reth-db-api/test-utils",
"reth-tasks/test-utils",
- "reth-optimism-exex/test-utils"
+ "reth-optimism-exex/test-utils",
+ "reth-optimism-trie/test-utils"
]
reth-codec = ["reth-optimism-primitives/reth-codec"]
diff --git a/rust/op-reth/crates/node/src/args.rs b/rust/op-reth/crates/node/src/args.rs
index 0d9d69e246b..793faefb1f4 100644
--- a/rust/op-reth/crates/node/src/args.rs
+++ b/rust/op-reth/crates/node/src/args.rs
@@ -4,7 +4,7 @@
use clap::builder::ArgPredicate;
use op_alloy_consensus::interop::SafetyLevel;
-use std::{path::PathBuf, time::Duration};
+use std::path::PathBuf;
use url::Url;
/// Storage schema version for the proofs-history database.
@@ -122,25 +122,6 @@ pub struct RollupArgs {
)]
pub proofs_history_window: u64,
- /// Interval between proof-storage prune runs. Accepts human-friendly durations
- /// like "100s", "5m", "1h". Defaults to 15s.
- ///
- /// - Shorter intervals prune smaller batches more often, so each prune run tends to be faster
- /// and the blocking pause for writes is shorter, at the cost of more frequent pauses.
- /// - Longer intervals prune larger batches less often, which reduces how often pruning runs,
- /// but each run can take longer and block writes for longer.
- ///
- /// A shorter interval is preferred so that prune
- /// runs stay small and don’t stall writes for too long.
- ///
- /// CLI: `--proofs-history.prune-interval 10m`
- #[arg(
- long = "proofs-history.prune-interval",
- value_name = "PROOFS_HISTORY_PRUNE_INTERVAL",
- default_value = "15s",
- value_parser = humantime::parse_duration
- )]
- pub proofs_history_prune_interval: Duration,
/// Verification interval: perform full block execution every N blocks for data integrity.
/// - 0: Disabled (Default) (always use fast path with pre-computed data from notifications)
/// - 1: Always verify (always execute blocks, slowest)
@@ -190,7 +171,6 @@ impl Default for RollupArgs {
proofs_history: false,
proofs_history_storage_path: None,
proofs_history_window: 1_296_000,
- proofs_history_prune_interval: Duration::from_secs(15),
proofs_history_verification_interval: 0,
proofs_history_storage_version: ProofsStorageVersion::V1,
}
diff --git a/rust/op-reth/crates/node/src/proof_history.rs b/rust/op-reth/crates/node/src/proof_history.rs
index 024561fcdc6..b5c460d67b5 100644
--- a/rust/op-reth/crates/node/src/proof_history.rs
+++ b/rust/op-reth/crates/node/src/proof_history.rs
@@ -71,12 +71,8 @@ where
let storage: OpProofsStorage> = mdbx.clone().into();
let storage_exec = storage.clone();
- let RollupArgs {
- proofs_history_window,
- proofs_history_prune_interval,
- proofs_history_verification_interval,
- ..
- } = args.clone();
+ let RollupArgs { proofs_history_window, proofs_history_verification_interval, .. } =
+ args.clone();
let handle = builder
.node(OpNode::new(args))
@@ -91,7 +87,6 @@ where
.install_exex("proofs-history", async move |exex_context| {
Ok(OpProofsExEx::builder(exex_context, storage_exec)
.with_proofs_history_window(proofs_history_window)
- .with_proofs_history_prune_interval(proofs_history_prune_interval)
.with_verification_interval(proofs_history_verification_interval)
.build()
.run()
diff --git a/rust/op-reth/crates/trie/Cargo.toml b/rust/op-reth/crates/trie/Cargo.toml
index 3791ae6816d..f652d5adaa8 100644
--- a/rust/op-reth/crates/trie/Cargo.toml
+++ b/rust/op-reth/crates/trie/Cargo.toml
@@ -56,6 +56,7 @@ crossbeam-channel.workspace = true
derive_more.workspace = true
[dev-dependencies]
+reth-optimism-trie = { path = ".", features = ["test-utils"] }
reth-codecs = { workspace = true, features = ["test-utils"] }
tempfile.workspace = true
tokio = { workspace = true, features = ["test-util", "rt-multi-thread", "macros"] }
@@ -81,6 +82,18 @@ eyre.workspace = true
serial_test.workspace = true
[features]
+test-utils = [
+ "reth-codecs/test-utils",
+ "reth-db/test-utils",
+ "reth-ethereum-primitives?/test-utils",
+ "reth-evm/test-utils",
+ "reth-primitives-traits/test-utils",
+ "reth-provider/test-utils",
+ "reth-revm/test-utils",
+ "reth-tasks/test-utils",
+ "reth-trie/test-utils",
+ "reth-trie-common/test-utils"
+]
serde-bincode-compat = [
"reth-trie-common/serde-bincode-compat",
"alloy-consensus/serde-bincode-compat",
diff --git a/rust/op-reth/crates/trie/src/engine/handle.rs b/rust/op-reth/crates/trie/src/engine/handle.rs
index 177c90e7cad..f50dc414cdb 100644
--- a/rust/op-reth/crates/trie/src/engine/handle.rs
+++ b/rust/op-reth/crates/trie/src/engine/handle.rs
@@ -4,6 +4,7 @@ use super::{
DEFAULT_BACKPRESSURE_THRESHOLD, DEFAULT_PERSISTENCE_THRESHOLD, EngineAction,
error::EngineError,
runner::Engine,
+ service_guard::ServiceGuard,
tasks::{ExecuteBlockTask, IndexBlockTask, ReorgTask, SyncToTask, UnwindTask},
};
use crate::{OpProofStoragePruner, OpProofsStore};
@@ -22,15 +23,10 @@ use tracing::error;
///
/// Every public method (except [`Self::sync_to`]) sends an engine action to the
/// engine thread and blocks on a one-shot reply channel.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct EngineHandle {
sender: Sender>,
-}
-
-impl Clone for EngineHandle {
- fn clone(&self) -> Self {
- Self { sender: self.sender.clone() }
- }
+ _service_guard: Arc,
}
impl EngineHandle {
@@ -87,7 +83,7 @@ impl EngineHandle
.with_persistence_threshold(persistence_threshold)
.with_backpressure_threshold(backpressure_threshold);
- thread::Builder::new()
+ let join_handle = thread::Builder::new()
.name("live-trie-collector".into())
.spawn(move || {
if let Err(panic) = panic::catch_unwind(panic::AssertUnwindSafe(|| engine.run())) {
@@ -101,7 +97,7 @@ impl EngineHandle
})
.expect("failed to spawn live-trie-collector thread");
- Self { sender: tx }
+ Self { sender: tx, _service_guard: Arc::new(ServiceGuard::new(join_handle)) }
}
fn send_and_recv(
@@ -164,7 +160,7 @@ impl EngineHandle
}
/// Block until any in-progress background persistence completes (test/utility only).
- #[cfg(test)]
+ #[cfg(any(test, feature = "test-utils"))]
pub fn flush(&self) {
use super::tasks::FlushTask;
let (reply_tx, reply_rx) = bounded(1);
diff --git a/rust/op-reth/crates/trie/src/engine/mod.rs b/rust/op-reth/crates/trie/src/engine/mod.rs
index 17c9e3f9176..fc24a514588 100644
--- a/rust/op-reth/crates/trie/src/engine/mod.rs
+++ b/rust/op-reth/crates/trie/src/engine/mod.rs
@@ -21,6 +21,7 @@ pub use handle::EngineHandle;
#[cfg(feature = "metrics")]
mod metrics;
mod runner;
+mod service_guard;
mod state;
/// Default number of blocks to keep in memory before persisting.
@@ -32,6 +33,14 @@ const DEFAULT_BACKPRESSURE_THRESHOLD: u64 = 10;
/// Default timeout for waiting on a persistence save/unwind operation (in seconds).
const DEFAULT_PERSISTENCE_TIMEOUT_SECS: u64 = 60;
+/// How long the engine waits with a non-empty memory buffer before flushing it even if the
+/// persistence threshold has not been reached.
+///
+/// Without this, a paused chain (e.g. fault-proof tests that freeze the sequencer) would leave
+/// buffered blocks unpersisted indefinitely, breaking the proofs RPC's strict "is this block
+/// persisted?" check.
+const IDLE_FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
+
/// Messages sent from [`EngineHandle`] to the engine thread.
enum EngineAction {
/// Execute a block via the EVM and index the resulting trie diff.
@@ -43,7 +52,7 @@ enum EngineAction {
/// Unwind indexed data back to a given block.
Unwind(tasks::UnwindTask),
/// Block the caller until any in-flight persistence completes.
- #[cfg(test)]
+ #[cfg(any(test, feature = "test-utils"))]
Flush(tasks::FlushTask),
/// Update the sync catch-up target (fire-and-forget).
SyncTo(tasks::SyncToTask),
@@ -69,7 +78,7 @@ impl EngineAction {
Self::IndexBlock(task) => task.execute(state),
Self::Reorg(task) => task.execute(state),
Self::Unwind(task) => task.execute(state),
- #[cfg(test)]
+ #[cfg(any(test, feature = "test-utils"))]
Self::Flush(task) => task.execute(state),
Self::SyncTo(task) => task.execute(state),
}
diff --git a/rust/op-reth/crates/trie/src/engine/persistence/handle.rs b/rust/op-reth/crates/trie/src/engine/persistence/handle.rs
index 5ca9aafe303..b0030e3a1b6 100644
--- a/rust/op-reth/crates/trie/src/engine/persistence/handle.rs
+++ b/rust/op-reth/crates/trie/src/engine/persistence/handle.rs
@@ -1,6 +1,8 @@
//! Handle and action enum for the persistence service.
-use super::{error::PersistenceError, service::PersistenceService};
+use super::{
+ super::service_guard::ServiceGuard, error::PersistenceError, service::PersistenceService,
+};
use crate::{BlockStateDiff, OpProofsStore, prune::OpProofStoragePruner};
use alloy_eips::eip1898::BlockWithParent;
use crossbeam_channel::Sender;
@@ -29,14 +31,10 @@ pub enum PersistenceAction {
#[derive(Debug, Clone)]
pub struct PersistenceHandle {
sender: Sender,
+ _service_guard: Arc,
}
impl PersistenceHandle {
- /// Create a new handle.
- pub const fn new(sender: Sender) -> Self {
- Self { sender }
- }
-
/// Spawn the service in a new thread and return a handle.
pub fn spawn(pruner: OpProofStoragePruner, storage: S) -> Self
where
@@ -46,12 +44,12 @@ impl PersistenceHandle {
let (tx, rx) = crossbeam_channel::bounded(2);
let service = PersistenceService::new(pruner, storage, rx);
- thread::Builder::new()
+ let join_handle = thread::Builder::new()
.name("Live Trie Persistence".into())
.spawn(move || service.run())
.expect("failed to spawn live trie persistence thread");
- Self::new(tx)
+ Self { sender: tx, _service_guard: Arc::new(ServiceGuard::new(join_handle)) }
}
/// Send a save request.
diff --git a/rust/op-reth/crates/trie/src/engine/runner.rs b/rust/op-reth/crates/trie/src/engine/runner.rs
index cf52dedb4be..0bf22570a25 100644
--- a/rust/op-reth/crates/trie/src/engine/runner.rs
+++ b/rust/op-reth/crates/trie/src/engine/runner.rs
@@ -2,7 +2,7 @@
use super::{
DEFAULT_BACKPRESSURE_THRESHOLD, DEFAULT_PERSISTENCE_THRESHOLD, EngineAction,
- error::EngineError, state::EngineState as State,
+ IDLE_FLUSH_INTERVAL, error::EngineError, state::EngineState as State,
};
use crate::{OpProofStoragePruner, OpProofsStore};
use crossbeam_channel::Receiver;
@@ -108,19 +108,23 @@ where
super::tasks::execute_block(&block, &mut self.state)
}
- /// Process one event from the action, persistence, or sync channel.
+ /// Process one event from the action, persistence, sync, or idle-flush channel.
///
- /// Three receivers compete in a single `select!`:
+ /// Four receivers compete in a single `select!`:
/// - **action**: a new [`EngineAction`] from a caller, or [`crossbeam_channel::never`] while
/// backpressure is active — callers naturally block in their bounded `send` until the
/// in-flight save completes and memory is pruned.
/// - **persistence**: signals a completed background save.
/// - **sync**: a zero-duration timer that fires immediately when the engine is behind its sync
/// target and not under backpressure; [`crossbeam_channel::never`] otherwise.
+ /// - **idle-flush**: fires after [`IDLE_FLUSH_INTERVAL`] when memory holds buffered blocks but
+ /// the persistence threshold hasn't been reached and no save is in flight. Keeps a paused
+ /// chain from leaving buffered blocks invisible to the proofs RPC indefinitely.
///
/// Returns [`ControlFlow::Break`] when the action channel disconnects.
fn process_next_event(&mut self) -> ControlFlow<()> {
let backpressure = self.backpressure_active();
+ let save_in_flight = self.state.persistence.in_flight.is_some();
let persist_rx =
self.state.persistence.in_flight.clone().unwrap_or_else(crossbeam_channel::never);
@@ -136,6 +140,14 @@ where
crossbeam_channel::never()
};
+ // Arm the idle-flush timer only when there's buffered work that nothing else will flush.
+ let idle_flush_rx: Receiver<std::time::Instant> =
+ if !self.state.memory.is_empty() && !save_in_flight && !backpressure {
+ crossbeam_channel::after(IDLE_FLUSH_INTERVAL)
+ } else {
+ crossbeam_channel::never()
+ };
+
crossbeam_channel::select! {
recv(incoming_rx) -> msg => match msg {
Ok(action) => action.execute(&mut self.state),
@@ -145,6 +157,9 @@ where
recv(sync_rx) -> _ => if let Err(err) = self.advance_sync() {
error!(target: "live-trie::engine", ?err, "Sync step failed");
},
+ recv(idle_flush_rx) -> _ => if let Err(e) = self.state.advance_persistence() {
+ error!(target: "live-trie::engine", ?e, "Idle flush failed");
+ },
}
ControlFlow::Continue(())
}
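The conditionally-armed idle-flush timer above can be sketched outside crossbeam using only `std::sync::mpsc`: arm a receive timeout only when there is buffered work, flush on threshold or on timeout, and drain on disconnect. This is an illustrative analogy, not the engine's code — `run_collector`, the `u32` items, and the millisecond interval are all invented for the sketch, and `recv_timeout` stands in for the patch's `crossbeam_channel::after` arm of the `select!`.

```rust
use std::{sync::mpsc, time::Duration};

const IDLE_FLUSH_INTERVAL: Duration = Duration::from_millis(50);

/// Buffers items until a threshold is hit, but also flushes after an idle
/// period so a paused producer never leaves items buffered indefinitely.
fn run_collector(rx: mpsc::Receiver<u32>, threshold: usize) -> Vec<Vec<u32>> {
    let mut buffer = Vec::new();
    let mut flushes = Vec::new();
    loop {
        // Only arm the idle timeout when there is buffered work to flush;
        // with an empty buffer, block indefinitely like `never()`.
        let result = if buffer.is_empty() {
            rx.recv().map_err(|_| mpsc::RecvTimeoutError::Disconnected)
        } else {
            rx.recv_timeout(IDLE_FLUSH_INTERVAL)
        };
        match result {
            Ok(item) => {
                buffer.push(item);
                if buffer.len() >= threshold {
                    flushes.push(std::mem::take(&mut buffer)); // threshold flush
                }
            }
            Err(mpsc::RecvTimeoutError::Timeout) => {
                flushes.push(std::mem::take(&mut buffer)); // idle flush
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                if !buffer.is_empty() {
                    flushes.push(buffer); // final drain
                }
                return flushes;
            }
        }
    }
}
```

As in the patch, the key design point is that the timer is re-created on every loop iteration, so it never fires while a save is already scheduled or the buffer is empty.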
diff --git a/rust/op-reth/crates/trie/src/engine/service_guard.rs b/rust/op-reth/crates/trie/src/engine/service_guard.rs
new file mode 100644
index 00000000000..8231a1c81a3
--- /dev/null
+++ b/rust/op-reth/crates/trie/src/engine/service_guard.rs
@@ -0,0 +1,26 @@
+//! Generic guard that joins a service thread on drop.
+
+use std::{fmt, thread::JoinHandle};
+
+/// Joins the wrapped thread when dropped. `None` is allowed for test/mock construction.
+pub(super) struct ServiceGuard(Option<JoinHandle<()>>);
+
+impl ServiceGuard {
+ pub(super) const fn new(handle: JoinHandle<()>) -> Self {
+ Self(Some(handle))
+ }
+}
+
+impl fmt::Debug for ServiceGuard {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_tuple("ServiceGuard").field(&self.0.as_ref().map(|_| "...")).finish()
+ }
+}
+
+impl Drop for ServiceGuard {
+ fn drop(&mut self) {
+ if let Some(join_handle) = self.0.take() {
+ let _ = join_handle.join();
+ }
+ }
+}
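The combination used here — a join-on-drop guard held behind an `Arc` inside a cloneable handle — means the service thread is joined exactly once, when the last handle clone is dropped. A minimal self-contained sketch of the pattern (the `Guard`, `Handle`, and `spawn_service` names are ours, and the worker body is a stand-in for the real engine loop):

```rust
use std::{
    sync::{Arc, mpsc},
    thread::{self, JoinHandle},
};

/// Joins the worker thread when dropped (mirrors `ServiceGuard`).
struct Guard(Option<JoinHandle<()>>);

impl Drop for Guard {
    fn drop(&mut self) {
        if let Some(handle) = self.0.take() {
            let _ = handle.join();
        }
    }
}

/// Cloneable handle; all clones share one guard via `Arc`.
#[derive(Clone)]
struct Handle {
    sender: mpsc::Sender<u32>,
    _guard: Arc<Guard>, // joined once, on the last clone's drop
}

fn spawn_service() -> (Handle, mpsc::Receiver<u32>) {
    let (tx, service_rx) = mpsc::channel();
    let (out_tx, out_rx) = mpsc::channel();
    let join = thread::spawn(move || {
        // Run until every sender (i.e. every Handle clone) is gone,
        // like the engine loop breaking on channel disconnect.
        for msg in service_rx {
            let _ = out_tx.send(msg + 1);
        }
    });
    (Handle { sender: tx, _guard: Arc::new(Guard(Some(join))) }, out_rx)
}
```

Dropping the handles closes the channel, which ends the worker loop, which lets the guard's `join` return — the same shutdown ordering the patch relies on for the engine and persistence threads.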
diff --git a/rust/op-reth/crates/trie/src/engine/tasks/execute_block.rs b/rust/op-reth/crates/trie/src/engine/tasks/execute_block.rs
index cfc0db4b4e3..d0604d5b0d5 100644
--- a/rust/op-reth/crates/trie/src/engine/tasks/execute_block.rs
+++ b/rust/op-reth/crates/trie/src/engine/tasks/execute_block.rs
@@ -7,12 +7,12 @@ use crossbeam_channel::Sender;
use reth_evm::{ConfigureEvm, execute::Executor};
use reth_primitives_traits::{AlloyBlockHeader, NodePrimitives, RecoveredBlock};
use reth_provider::{
- BlockHashReader, BlockReader, DatabaseProviderFactory, HashedPostStateProvider,
+ BlockHashReader, BlockReader, DatabaseProviderFactory, HashedPostStateProvider, ProviderError,
StateProviderFactory, StateReader, StateRootProvider,
};
use reth_revm::database::StateProviderDatabase;
use std::time::Instant;
-use tracing::{debug, info};
+use tracing::{debug, info, warn};
pub(crate) struct ExecuteBlockTask {
pub(crate) block: RecoveredBlock,
@@ -87,8 +87,24 @@ where
let block_ref =
BlockWithParent::new(block.parent_hash(), NumHash::new(block.number(), block.hash()));
+ let parent_state = match state.provider.state_by_block_hash(block.parent_hash()) {
+ Ok(p) => p,
+ Err(ProviderError::StateForHashNotFound(hash)) => {
+ // Likely a transient reorg race: reth no longer has state for what we believe is the
+ // parent. Skip gracefully; subsequent ChainCommitted/ChainReorged notifications will
+ // resync us.
+ warn!(
+ block_number = block.number(),
+ parent_hash = ?hash,
+ "Parent state not available in reth; skipping execute_block",
+ );
+ return Ok(());
+ }
+ Err(e) => return Err(e.into()),
+ };
+
let inner_provider = OpProofsStateProviderRef::new(
- state.provider.state_by_block_hash(block.parent_hash())?,
+ parent_state,
state.storage.provider_ro()?,
parent_block_number,
);
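The change above treats exactly one error variant as recoverable and propagates the rest. The shape of that match is worth isolating; in this sketch `ProviderError` and `state_by_block_hash` only mirror the reth names — their bodies are invented for illustration.

```rust
/// Hypothetical stand-in for reth's `ProviderError`.
#[derive(Debug, PartialEq)]
enum ProviderError {
    StateForHashNotFound(u64),
    Database(String),
}

fn state_by_block_hash(hash: u64) -> Result<&'static str, ProviderError> {
    match hash {
        0xdead => Err(ProviderError::StateForHashNotFound(hash)),
        0 => Err(ProviderError::Database("corrupt".into())),
        _ => Ok("parent state"),
    }
}

/// Skips the one error considered transient; every other error stays fatal.
fn execute_block(parent_hash: u64) -> Result<Option<&'static str>, ProviderError> {
    let parent_state = match state_by_block_hash(parent_hash) {
        Ok(state) => state,
        // Reorg race: the parent vanished from the provider. A later
        // notification will resync, so skip instead of failing the engine.
        Err(ProviderError::StateForHashNotFound(_)) => return Ok(None),
        Err(other) => return Err(other),
    };
    Ok(Some(parent_state))
}
```

Matching on the concrete variant (rather than, say, swallowing all provider errors) keeps genuine database failures loud while letting the known race degrade to a logged skip.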
diff --git a/rust/op-reth/crates/trie/src/engine/tasks/mod.rs b/rust/op-reth/crates/trie/src/engine/tasks/mod.rs
index 84c9b8c6488..9e9e39759c0 100644
--- a/rust/op-reth/crates/trie/src/engine/tasks/mod.rs
+++ b/rust/op-reth/crates/trie/src/engine/tasks/mod.rs
@@ -5,7 +5,7 @@
//! the reply. The engine dispatcher is a thin match with no business logic.
mod execute_block;
-#[cfg(test)]
+#[cfg(any(test, feature = "test-utils"))]
mod flush;
mod index_block;
mod reorg;
@@ -13,7 +13,7 @@ mod sync_to;
mod unwind;
pub(super) use execute_block::{ExecuteBlockTask, run as execute_block};
-#[cfg(test)]
+#[cfg(any(test, feature = "test-utils"))]
pub(super) use flush::FlushTask;
pub(super) use index_block::IndexBlockTask;
pub(super) use reorg::ReorgTask;
diff --git a/rust/op-reth/crates/trie/src/lib.rs b/rust/op-reth/crates/trie/src/lib.rs
index 16b75c20a54..7ef1ea48fa4 100644
--- a/rust/op-reth/crates/trie/src/lib.rs
+++ b/rust/op-reth/crates/trie/src/lib.rs
@@ -50,8 +50,6 @@ pub mod proof;
pub mod provider;
-pub mod live;
-
pub mod engine;
pub use engine::EngineHandle;
diff --git a/rust/op-reth/crates/trie/src/live.rs b/rust/op-reth/crates/trie/src/live.rs
deleted file mode 100644
index ce7cf6c895e..00000000000
--- a/rust/op-reth/crates/trie/src/live.rs
+++ /dev/null
@@ -1,234 +0,0 @@
-//! Live trie collector for external proofs storage.
-
-use crate::{
- BlockStateDiff, OpProofsStorage, OpProofsStorageError, OpProofsStore,
- api::{OpProofsProviderRO, OpProofsProviderRw, OperationDurations},
- provider::OpProofsStateProviderRef,
-};
-use alloy_eips::{BlockNumHash, NumHash, eip1898::BlockWithParent};
-use derive_more::Constructor;
-use reth_evm::{ConfigureEvm, execute::Executor};
-use reth_primitives_traits::{AlloyBlockHeader, BlockTy, RecoveredBlock};
-use reth_provider::{
- DatabaseProviderFactory, HashedPostStateProvider, StateProviderFactory, StateReader,
- StateRootProvider,
-};
-use reth_revm::database::StateProviderDatabase;
-use reth_trie_common::{HashedPostStateSorted, updates::TrieUpdatesSorted};
-use std::{sync::Arc, time::Instant};
-use tracing::info;
-
-/// Live trie collector for external proofs storage.
-#[derive(Debug, Constructor)]
-pub struct LiveTrieCollector<'tx, Evm, Provider, PreimageStore>
-where
- Evm: ConfigureEvm,
- Provider: StateReader + DatabaseProviderFactory + StateProviderFactory,
-{
- evm_config: Evm,
- provider: Provider,
- storage: &'tx OpProofsStorage,
-}
-
-impl<'tx, Evm, Provider, Store> LiveTrieCollector<'tx, Evm, Provider, Store>
-where
- Evm: ConfigureEvm,
- Provider: StateReader + DatabaseProviderFactory + StateProviderFactory,
- Store: 'tx + OpProofsStore + Clone + 'static,
-{
- /// Execute a block and store the updates in the storage.
- pub fn execute_and_store_block_updates(
- &self,
- block: &RecoveredBlock<BlockTy<Evm::Primitives>>,
- ) -> Result<(), OpProofsStorageError> {
- let mut operation_durations = OperationDurations::default();
-
- let start = Instant::now();
- // ensure that we have the state of the parent block
- let provider_ro = self.storage.provider_ro()?;
- let (Some((earliest, _)), Some((latest, _))) =
- (provider_ro.get_earliest_block_number()?, provider_ro.get_latest_block_number()?)
- else {
- return Err(OpProofsStorageError::NoBlocksFound);
- };
-
- let parent_block_number = block.number() - 1;
- if parent_block_number < earliest {
- return Err(OpProofsStorageError::UnknownParent);
- }
-
- if parent_block_number > latest {
- return Err(OpProofsStorageError::MissingParentBlock {
- block_number: block.number(),
- parent_block_number,
- latest_block_number: latest,
- });
- }
-
- let block_ref =
- BlockWithParent::new(block.parent_hash(), NumHash::new(block.number(), block.hash()));
-
- // TODO: should we check block hash here?
-
- let state_provider = OpProofsStateProviderRef::new(
- self.provider.state_by_block_hash(block.parent_hash())?,
- self.storage.provider_ro()?,
- parent_block_number,
- );
-
- let db = StateProviderDatabase::new(&state_provider);
- let block_executor = self.evm_config.batch_executor(db);
-
- let execution_result = block_executor.execute(&(*block).clone())?;
-
- operation_durations.execution_duration_seconds = start.elapsed();
-
- let hashed_state = state_provider.hashed_post_state(&execution_result.state);
- let (state_root, trie_updates) =
- state_provider.state_root_with_updates(hashed_state.clone())?;
-
- operation_durations.state_root_duration_seconds =
- start.elapsed() - operation_durations.execution_duration_seconds;
-
- if state_root != block.state_root() {
- return Err(OpProofsStorageError::StateRootMismatch {
- block_number: block.number(),
- current_state_hash: state_root,
- expected_state_hash: block.state_root(),
- });
- }
-
- let provider_rw = self.storage.provider_rw()?;
- let update_result = provider_rw.store_trie_updates(
- block_ref,
- BlockStateDiff {
- sorted_trie_updates: trie_updates.into_sorted(),
- sorted_post_state: hashed_state.into_sorted(),
- },
- )?;
- provider_rw.commit()?;
-
- operation_durations.total_duration_seconds = start.elapsed();
- operation_durations.write_duration_seconds = operation_durations.total_duration_seconds -
- operation_durations.state_root_duration_seconds -
- operation_durations.execution_duration_seconds;
-
- #[cfg(feature = "metrics")]
- {
- let block_metrics = self.storage.metrics().block_metrics();
- block_metrics.record_operation_durations(&operation_durations);
- block_metrics.increment_write_counts(&update_result);
- }
-
- info!(
- block_number = block.number(),
- ?operation_durations,
- ?update_result,
- "Block executed and trie updates stored successfully",
- );
-
- Ok(())
- }
-
- /// Store trie updates for a given block.
- pub fn store_block_updates(
- &self,
- block: BlockWithParent,
- sorted_trie_updates: TrieUpdatesSorted,
- sorted_post_state: HashedPostStateSorted,
- ) -> Result<(), OpProofsStorageError> {
- let start = Instant::now();
- let mut operation_durations = OperationDurations::default();
-
- let provider_rw = self.storage.provider_rw()?;
- let storage_result = provider_rw
- .store_trie_updates(block, BlockStateDiff { sorted_trie_updates, sorted_post_state })?;
- provider_rw.commit()?;
-
- let write_duration = start.elapsed();
- operation_durations.total_duration_seconds = write_duration;
- operation_durations.write_duration_seconds = write_duration;
-
- #[cfg(feature = "metrics")]
- {
- let block_metrics = self.storage.metrics().block_metrics();
- block_metrics.record_operation_durations(&operation_durations);
- block_metrics.increment_write_counts(&storage_result);
- }
-
- info!(
- block_number = block.block.number,
- ?operation_durations,
- ?storage_result,
- "Trie updates stored successfully",
- );
-
- Ok(())
- }
-
- /// Handles chain reorganizations by replacing block updates after a common ancestor.
- ///
- /// This method removes all block updates after the latest common ancestor (the block before
- /// the first block in `new_blocks`) and replaces them with the updates from the provided new
- /// chain.
- ///
- /// # Arguments
- ///
- /// * `new_blocks` - A vector of references to `RecoveredBlock` instances representing the new
- /// blocks to be added to the trie storage.
- pub fn unwind_and_store_block_updates(
- &self,
- block_updates: Vec<(BlockWithParent, Arc<TrieUpdatesSorted>, Arc<HashedPostStateSorted>)>,
- ) -> Result<(), OpProofsStorageError> {
- if block_updates.is_empty() {
- return Ok(());
- }
-
- let start = Instant::now();
- let mut operation_durations = OperationDurations::default();
- let first = &block_updates[0].0;
- let latest_common_block =
- BlockNumHash::new(first.block.number.saturating_sub(1), first.parent);
- let mut block_trie_updates: Vec<(BlockWithParent, BlockStateDiff)> =
- Vec::with_capacity(block_updates.len());
-
- for (block, trie_updates, hashed_state) in &block_updates {
- block_trie_updates.push((
- *block,
- BlockStateDiff {
- sorted_trie_updates: (**trie_updates).clone(),
- sorted_post_state: (**hashed_state).clone(),
- },
- ));
- }
-
- let provider_rw = self.storage.provider_rw()?;
- provider_rw.replace_updates(latest_common_block, block_trie_updates)?;
- provider_rw.commit()?;
- let write_duration = start.elapsed();
- operation_durations.total_duration_seconds = write_duration;
- operation_durations.write_duration_seconds = write_duration;
-
- #[cfg(feature = "metrics")]
- {
- let block_metrics = self.storage.metrics().block_metrics();
- block_metrics.record_operation_durations(&operation_durations);
- }
-
- info!(
- start_block_number = block_updates.first().map(|(b, _, _)| b.block.number),
- end_block_number = block_updates.last().map(|(b, _, _)| b.block.number),
- ?operation_durations,
- "Trie updates rewound and stored successfully",
- );
- Ok(())
- }
-
- /// Remove account, storage and trie updates from historical storage for all blocks from
- /// the specified block (inclusive).
- pub fn unwind_history(&self, to: BlockWithParent) -> Result<(), OpProofsStorageError> {
- let provider_rw = self.storage.provider_rw()?;
- provider_rw.unwind_history(to)?;
- provider_rw.commit()
- }
-}
diff --git a/rust/op-reth/crates/trie/src/metrics.rs b/rust/op-reth/crates/trie/src/metrics.rs
index 3d42a07c1ed..27f71ba0d4d 100644
--- a/rust/op-reth/crates/trie/src/metrics.rs
+++ b/rust/op-reth/crates/trie/src/metrics.rs
@@ -4,14 +4,14 @@ use crate::{
BlockStateDiff, OpProofsStorageResult, OpProofsStore,
api::{
InitialStateAnchor, OpProofsInitProvider, OpProofsProviderRO, OpProofsProviderRw,
- OperationDurations, WriteCounts,
+ WriteCounts,
},
cursor,
};
use alloy_eips::{BlockNumHash, eip1898::BlockWithParent};
use alloy_primitives::{B256, U256, map::HashMap};
use derive_more::Constructor;
-use metrics::{Counter, Gauge, Histogram};
+use metrics::{Gauge, Histogram};
use reth_db::DatabaseError;
use reth_metrics::Metrics;
use reth_primitives_traits::Account;
@@ -28,8 +28,8 @@ use std::{
};
use strum::{EnumCount, EnumIter, IntoEnumIterator};
-/// Alias for [`OpProofsStorageWithMetrics`].
-pub type OpProofsStorage = OpProofsStorageWithMetrics;
+/// Alias for [`OpProofsStoreWithMetrics`].
+pub type OpProofsStorage = OpProofsStoreWithMetrics;
/// Alias for [`TrieCursor`](cursor::OpProofsTrieCursor) with metrics layer.
pub type OpProofsTrieCursor = cursor::OpProofsTrieCursor>;
@@ -87,13 +87,23 @@ impl StorageOperation {
}
}
+/// Metrics tracking the range of blocks available for proof generation.
+#[derive(Metrics, Clone)]
+#[metrics(scope = "optimism_trie.proof_window")]
+pub struct ProofWindowMetrics {
+ /// Earliest block number available in the proof window.
+ pub earliest: Gauge,
+ /// Latest block number available in the proof window.
+ pub latest: Gauge,
+}
+
/// Metrics for storage operations.
#[derive(Debug)]
pub struct StorageMetrics {
/// Cache of operation metrics handles, keyed by (operation, context)
operations: HashMap,
- /// Block-level metrics
- block_metrics: BlockMetrics,
+ /// Proof window metrics
+ pub proof_window: ProofWindowMetrics,
}
impl StorageMetrics {
@@ -101,7 +111,7 @@ impl StorageMetrics {
pub fn new() -> Self {
Self {
operations: Self::generate_operation_handles(),
- block_metrics: BlockMetrics::new_with_labels(&[] as &[(&str, &str)]),
+ proof_window: ProofWindowMetrics::new_with_labels(&[] as &[(&str, &str)]),
}
}
@@ -139,11 +149,6 @@ impl StorageMetrics {
result
}
- /// Get block metrics for recording high-level timing.
- pub const fn block_metrics(&self) -> &BlockMetrics {
- &self.block_metrics
- }
-
/// Record a pre-measured duration for an operation.
pub fn record_duration(&self, operation: StorageOperation, duration: Duration) {
if let Some(metrics) = self.operations.get(&operation) {
@@ -201,52 +206,6 @@ impl OperationMetrics {
}
}
-/// High-level block processing metrics.
-#[derive(Metrics, Clone)]
-#[metrics(scope = "optimism_trie.block")]
-pub struct BlockMetrics {
- /// Total time to process a block (end-to-end) in seconds
- pub total_duration_seconds: Histogram,
- /// Time spent executing the block (EVM) in seconds
- pub execution_duration_seconds: Histogram,
- /// Time spent calculating state root in seconds
- pub state_root_duration_seconds: Histogram,
- /// Time spent writing trie updates to storage in seconds
- pub write_duration_seconds: Histogram,
- /// Number of trie updates written
- pub account_trie_updates_written_total: Counter,
- /// Number of storage trie updates written
- pub storage_trie_updates_written_total: Counter,
- /// Number of hashed accounts written
- pub hashed_accounts_written_total: Counter,
- /// Number of hashed storages written
- pub hashed_storages_written_total: Counter,
- /// Earliest block number that the proofs storage has stored.
- pub earliest_number: Gauge,
- /// Latest block number that the proofs storage has stored.
- pub latest_number: Gauge,
-}
-
-impl BlockMetrics {
- /// Record operation durations for the processing of a block.
- pub fn record_operation_durations(&self, durations: &OperationDurations) {
- self.total_duration_seconds.record(durations.total_duration_seconds);
- self.execution_duration_seconds.record(durations.execution_duration_seconds);
- self.state_root_duration_seconds.record(durations.state_root_duration_seconds);
- self.write_duration_seconds.record(durations.write_duration_seconds);
- }
-
- /// Increment write counts of historical trie updates for a single block.
- pub fn increment_write_counts(&self, counts: &WriteCounts) {
- self.account_trie_updates_written_total
- .increment(counts.account_trie_updates_written_total);
- self.storage_trie_updates_written_total
- .increment(counts.storage_trie_updates_written_total);
- self.hashed_accounts_written_total.increment(counts.hashed_accounts_written_total);
- self.hashed_storages_written_total.increment(counts.hashed_storages_written_total);
- }
-}
-
/// Wrapper for [`TrieCursor`] that records metrics.
#[derive(Debug, Constructor, Clone)]
pub struct OpProofsTrieCursorWithMetrics {
@@ -336,12 +295,12 @@ impl HashedStorageCursor for OpProofsHashedCursorWithMet
/// Wrapper around [`OpProofsStore`] type that records metrics for all operations.
#[derive(Debug, Clone)]
-pub struct OpProofsStorageWithMetrics {
+pub struct OpProofsStoreWithMetrics {
storage: S,
metrics: Arc,
}
-impl OpProofsStorageWithMetrics {
+impl OpProofsStoreWithMetrics {
/// Initializes new [`StorageMetrics`] and wraps given storage instance.
pub fn new(storage: S) -> Self {
Self { storage, metrics: Arc::new(StorageMetrics::default()) }
@@ -358,7 +317,7 @@ impl OpProofsStorageWithMetrics {
}
}
-impl OpProofsStore for OpProofsStorageWithMetrics
+impl OpProofsStore for OpProofsStoreWithMetrics
where
S: OpProofsStore,
{
@@ -422,7 +381,7 @@ impl OpProofsProviderRO for OpProofsProviderROWithMetrics
fn get_earliest_block_number(&self) -> OpProofsStorageResult