Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions crates/op-rbuilder/src/builders/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::{

/// Container type that holds all necessities to build a new payload.
#[derive(Debug)]
pub struct OpPayloadBuilderCtx {
pub struct OpPayloadBuilderCtx<ExtraCtx: Debug + Default = ()> {
/// The type that knows how to perform system calls and configure the evm.
pub evm_config: OpEvmConfig,
/// The DA config for the payload builder
Expand All @@ -68,9 +68,11 @@ pub struct OpPayloadBuilderCtx {
pub builder_signer: Option<Signer>,
/// The metrics for the builder
pub metrics: Arc<OpRBuilderMetrics>,
/// Extra context for the payload builder
pub extra_ctx: ExtraCtx,
}

impl OpPayloadBuilderCtx {
impl<ExtraCtx: Debug + Default> OpPayloadBuilderCtx<ExtraCtx> {
/// Returns the parent block the payload will be build on.
pub fn parent(&self) -> &SealedHeader {
&self.config.parent_header
Expand Down Expand Up @@ -195,7 +197,7 @@ impl OpPayloadBuilderCtx {
}
}

impl OpPayloadBuilderCtx {
impl<ExtraCtx: Debug + Default> OpPayloadBuilderCtx<ExtraCtx> {
/// Constructs a receipt for the given transaction.
fn build_receipt<E: Evm>(
&self,
Expand Down
117 changes: 76 additions & 41 deletions crates/op-rbuilder/src/builders/flashblocks/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,43 @@ struct ExtraExecutionInfo {
pub last_flashblock_index: usize,
}

#[derive(Debug, Default)]
struct FlashblocksExtraCtx {
/// Current flashblock index
pub flashblock_index: u64,
/// Target flashblock count
pub target_flashblock_count: u64,
}

impl OpPayloadBuilderCtx<FlashblocksExtraCtx> {
/// Returns the current flashblock index
pub fn flashblock_index(&self) -> u64 {
self.extra_ctx.flashblock_index
}

/// Returns the target flashblock count
pub fn target_flashblock_count(&self) -> u64 {
self.extra_ctx.target_flashblock_count
}

/// Increments the flashblock index
pub fn increment_flashblock_index(&mut self) -> u64 {
self.extra_ctx.flashblock_index += 1;
self.extra_ctx.flashblock_index
}

/// Sets the target flashblock count
pub fn set_target_flashblock_count(&mut self, target_flashblock_count: u64) -> u64 {
self.extra_ctx.target_flashblock_count = target_flashblock_count;
self.extra_ctx.target_flashblock_count
}

/// Returns if the flashblock is the last one
pub fn is_last_flashblock(&self) -> bool {
self.flashblock_index() == self.target_flashblock_count() - 1
}
}

/// Optimism's payload builder
#[derive(Debug, Clone)]
pub struct OpPayloadBuilder<Pool, Client, BT> {
Expand Down Expand Up @@ -199,7 +236,7 @@ where
.next_evm_env(&config.parent_header, &block_env_attributes)
.map_err(PayloadBuilderError::other)?;

let mut ctx = OpPayloadBuilderCtx {
let mut ctx = OpPayloadBuilderCtx::<FlashblocksExtraCtx> {
evm_config: self.evm_config.clone(),
chain_spec: self.client.chain_spec(),
config,
Expand All @@ -210,6 +247,10 @@ where
da_config: self.config.da_config.clone(),
builder_signer: self.config.builder_signer,
metrics: Default::default(),
extra_ctx: FlashblocksExtraCtx {
flashblock_index: 0,
target_flashblock_count: self.config.flashblocks_per_block(),
},
};

let state_provider = self.client.state_by_block_hash(ctx.parent().hash())?;
Expand Down Expand Up @@ -281,27 +322,28 @@ where
// We adjust our flashblocks timings based on time_drift if dynamic adjustment enable
let (flashblocks_per_block, first_flashblock_offset) =
self.calculate_flashblocks(timestamp);
ctx.set_target_flashblock_count(flashblocks_per_block);
info!(
target: "payload_builder",
message = "Performed flashblocks timing derivation",
flashblocks_per_block,
flashblocks_per_block = ctx.target_flashblock_count(),
first_flashblock_offset = first_flashblock_offset.as_millis(),
flashblocks_interval = self.config.specific.interval.as_millis(),
);
ctx.metrics.reduced_flashblocks_number.record(
self.config
.flashblocks_per_block()
.saturating_sub(flashblocks_per_block) as f64,
.saturating_sub(ctx.target_flashblock_count()) as f64,
);
ctx.metrics
.first_flashblock_time_offset
.record(first_flashblock_offset.as_millis() as f64);
let gas_per_batch = ctx.block_gas_limit() / flashblocks_per_block;
let gas_per_batch = ctx.block_gas_limit() / ctx.target_flashblock_count();
let mut total_gas_per_batch = gas_per_batch;
let da_per_batch = ctx
.da_config
.max_da_block_size()
.map(|da_limit| da_limit / flashblocks_per_block);
.map(|da_limit| da_limit / ctx.target_flashblock_count());
// Check that builder tx won't affect fb limit too much
if let Some(da_limit) = da_per_batch {
// We error if we can't insert any tx aside from builder tx in flashblock
Expand All @@ -317,10 +359,6 @@ where
*da_limit = da_limit.saturating_sub(builder_tx_da_size);
}

// TODO: we should account for a case when we will issue only 1 flashblock
let last_flashblock = flashblocks_per_block.saturating_sub(1);

let mut flashblock_count = 0;
// This channel coordinates flashblock building
let (fb_cancel_token_rx, mut fb_cancel_token_tx) =
mpsc::channel((self.config.flashblocks_per_block() + 1) as usize);
Expand Down Expand Up @@ -354,11 +392,11 @@ where
// execute_best_transaction without cancelling parent token
ctx.cancel = cancel_token;
// TODO: remove this
if flashblock_count >= flashblocks_per_block {
if ctx.flashblock_index() >= ctx.target_flashblock_count() {
info!(
target: "payload_builder",
target = flashblocks_per_block,
flashblock_count = flashblock_count,
target = ctx.target_flashblock_count(),
flashblock_count = ctx.flashblock_index(),
block_number = ctx.block_number(),
"Skipping flashblock reached target",
);
Expand All @@ -368,7 +406,7 @@ where
info!(
target: "payload_builder",
block_number = ctx.block_number(),
flashblock_count = flashblock_count,
flashblock_count = ctx.flashblock_index(),
target_gas = total_gas_per_batch,
gas_used = info.cumulative_gas_used,
target_da = total_da_per_batch.unwrap_or(0),
Expand All @@ -377,13 +415,14 @@ where
);
let flashblock_build_start_time = Instant::now();
let state = StateProviderDatabase::new(&state_provider);
invoke_on_last_flashblock(flashblock_count, last_flashblock, || {
// If it is the last flashblock, we need to account for the builder tx
if ctx.is_last_flashblock() {
total_gas_per_batch = total_gas_per_batch.saturating_sub(builder_tx_gas);
// saturating sub just in case, we will log an error if da_limit too small for builder_tx_da_size
if let Some(da_limit) = total_da_per_batch.as_mut() {
*da_limit = da_limit.saturating_sub(builder_tx_da_size);
}
});
}
let mut db = State::builder()
.with_database(state)
.with_bundle_update()
Expand Down Expand Up @@ -416,12 +455,14 @@ where
// Caution: this assume that block cancel token only cancelled when new FCU is received
if block_cancel.is_cancelled() {
ctx.metrics.block_built_success.increment(1);
ctx.metrics.flashblock_count.record(flashblock_count as f64);
ctx.metrics
.flashblock_count
.record(ctx.flashblock_index() as f64);
debug!(
target: "payload_builder",
message = "Payload building complete, job cancelled during execution"
);
span.record("flashblock_count", flashblock_count);
span.record("flashblock_count", ctx.flashblock_index());
return Ok(());
}

Expand All @@ -434,9 +475,9 @@ where
.set(payload_tx_simulation_time);

// If it is the last flashblocks, add the builder txn to the block if enabled
invoke_on_last_flashblock(flashblock_count, last_flashblock, || {
if ctx.is_last_flashblock() {
ctx.add_builder_tx(&mut info, &mut db, builder_tx_gas, message.clone());
});
};

let total_block_built_duration = Instant::now();
let build_result = build_block(db, &ctx, &mut info);
Expand All @@ -453,12 +494,12 @@ where
Err(err) => {
// Track invalid/bad block
ctx.metrics.invalid_blocks_count.increment(1);
error!(target: "payload_builder", "Failed to build block {}, flashblock {}: {}", ctx.block_number(), flashblock_count, err);
error!(target: "payload_builder", "Failed to build block {}, flashblock {}: {}", ctx.block_number(), ctx.flashblock_index(), err);
// Return the error
return Err(err);
}
Ok((new_payload, mut fb_payload, new_bundle_state)) => {
fb_payload.index = flashblock_count + 1; // we do this because the fallback block is index 0
fb_payload.index = ctx.increment_flashblock_index(); // fallback block is index 0, so we need to increment here
fb_payload.base = None;

// We check that child_job got cancelled before sending flashblock.
Expand Down Expand Up @@ -493,11 +534,11 @@ where
error!("Builder end up in faulty invariant, if da_per_batch is set then total_da_per_batch must be set");
}
}
flashblock_count += 1;

info!(
target: "payload_builder",
message = "Flashblock built",
?flashblock_count,
flashblock_count = ctx.flashblock_index(),
current_gas = info.cumulative_gas_used,
current_da = info.cumulative_da_bytes_used,
target_flashblocks = flashblocks_per_block,
Expand All @@ -508,17 +549,19 @@ where
None => {
// Exit loop if channel closed or cancelled
ctx.metrics.block_built_success.increment(1);
ctx.metrics.flashblock_count.record(flashblock_count as f64);
ctx.metrics
.flashblock_count
.record(ctx.flashblock_index() as f64);
ctx.metrics
.missing_flashblocks_count
.record(flashblocks_per_block.saturating_sub(flashblock_count) as f64);
.record(flashblocks_per_block.saturating_sub(ctx.flashblock_index()) as f64);
debug!(
target: "payload_builder",
message = "Payload building complete, channel closed or job cancelled",
missing_falshblocks = flashblocks_per_block.saturating_sub(flashblock_count),
missing_falshblocks = flashblocks_per_block.saturating_sub(ctx.flashblock_index()),
reduced_flashblocks = self.config.flashblocks_per_block().saturating_sub(flashblocks_per_block),
);
span.record("flashblock_count", flashblock_count);
span.record("flashblock_count", ctx.flashblock_index());
return Ok(());
}
}
Expand Down Expand Up @@ -660,12 +703,13 @@ struct FlashblocksMetadata {
block_number: u64,
}

fn execute_pre_steps<DB>(
fn execute_pre_steps<DB, ExtraCtx>(
state: &mut State<DB>,
ctx: &OpPayloadBuilderCtx,
ctx: &OpPayloadBuilderCtx<ExtraCtx>,
) -> Result<ExecutionInfo<ExtraExecutionInfo>, PayloadBuilderError>
where
DB: Database<Error = ProviderError> + std::fmt::Debug,
ExtraCtx: std::fmt::Debug + Default,
{
// 1. apply pre-execution changes
ctx.evm_config
Expand All @@ -679,14 +723,15 @@ where
Ok(info)
}

fn build_block<DB, P>(
fn build_block<DB, P, ExtraCtx>(
mut state: State<DB>,
ctx: &OpPayloadBuilderCtx,
ctx: &OpPayloadBuilderCtx<ExtraCtx>,
info: &mut ExecutionInfo<ExtraExecutionInfo>,
) -> Result<(OpBuiltPayload, FlashblocksPayloadV1, BundleState), PayloadBuilderError>
where
DB: Database<Error = ProviderError> + AsRef<P>,
P: StateRootProvider + HashedPostStateProvider + StorageRootProvider,
ExtraCtx: std::fmt::Debug + Default,
{
// TODO: We must run this only once per block, but we are running it on every flashblock
// merge all transitions into bundle state, this would apply the withdrawal balance changes
Expand Down Expand Up @@ -897,13 +942,3 @@ where
new_bundle,
))
}

pub fn invoke_on_last_flashblock<F: FnOnce()>(
current_flashblock: u64,
flashblock_limit: u64,
fun: F,
) {
if current_flashblock == flashblock_limit {
fun()
}
}
1 change: 1 addition & 0 deletions crates/op-rbuilder/src/builders/standard/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ where
cancel,
builder_signer: self.config.builder_signer,
metrics: self.metrics.clone(),
extra_ctx: Default::default(),
};

let builder = OpBuilder::new(best);
Expand Down
Loading