Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/payload/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ enum Mutation<P: Platform> {
/// should not be discarded or reordered. An example of this would be placing
/// a barrier after applying sequencer transactions to ensure that they do
/// not get reordered by pipelines. Another example would be placing a barrier
/// after every commited flashblock, to ensure that any steps in the pipeline
/// do not modify the commited state of the payload in process.
/// after every committed flashblock, to ensure that any steps in the pipeline
/// do not modify the committed state of the payload in process.
///
/// If there are multiple barriers in the history, the last one is considered
/// as the beginning of the staging history.
Expand Down
2 changes: 1 addition & 1 deletion src/payload/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ impl<P: Platform> IntoExecutable<P, Variant<1>>
}

/// Signature-recovered individual transactions are always infallibly
/// convertable into an executable.
/// convertible into an executable.
impl<P: Platform> IntoExecutable<P, Variant<2>>
for Recovered<types::Transaction<P>>
{
Expand Down
4 changes: 2 additions & 2 deletions src/payload/ext/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub trait BlockExt<P: Platform>: super::sealed::Sealed {
/// Address of the fees recipient for the block.
fn coinbase(&self) -> Address;

/// Returns the balance of the given address at the begining of the block
/// Returns the balance of the given address at the beginning of the block
/// before any transactions are executed.
fn balance_of(&self, address: Address) -> ProviderResult<U256>;
}
Expand Down Expand Up @@ -101,7 +101,7 @@ impl<P: Platform> BlockExt<P> for BlockContext<P> {
self.attributes().suggested_fee_recipient()
}

/// Returns the balance of the given address at the begining of the block
/// Returns the balance of the given address at the beginning of the block
/// before any transactions are executed.
fn balance_of(&self, address: Address) -> ProviderResult<U256> {
self
Expand Down
5 changes: 3 additions & 2 deletions src/pipelines/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use {
/// pipeline.
///
/// Notes:
/// - In a pipeline with nested pipelines, the top-level pipline's event bus is
/// responsible for handling all events of all contained pipelines and steps.
/// - In a pipeline with nested pipelines, the top-level pipeline's event bus
/// is responsible for handling all events of all contained pipelines and
/// steps.
#[derive(Default, Debug)]
pub(super) struct EventsBus<P: Platform> {
publishers: DashMap<TypeId, Sender<Arc<dyn Any + Send + Sync>>>,
Expand Down
14 changes: 7 additions & 7 deletions src/pipelines/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type PipelineOutput<P: Platform> =

/// This type is responsible for executing a single run of a pipeline.
///
/// It's execution is driven by the future poll that it implements. Each call to
/// The execution is driven by the future poll that it implements. Each call to
/// `poll` will run one step of the pipeline at a time, or parts of a step if
/// the step is async and needs many polls before it completes. The executor
/// future will resolve when the whole pipeline has been executed, or when an
Expand Down Expand Up @@ -123,7 +123,7 @@ impl<P: Platform, Provider: traits::ProviderBounds<P>>
impl<P: Platform, Provider: traits::ProviderBounds<P>>
PipelineExecutor<P, Provider>
{
/// This method creates a future that encapsulates the execution an an async
/// This method creates a future that encapsulates the execution as an async
/// step. The created future will be held inside `Cursor::StepInProgress` and
/// polled until it resolves.
///
Expand Down Expand Up @@ -243,7 +243,7 @@ impl<P: Platform, Provider: traits::ProviderBounds<P>>

async move {
// invoke the `after_job` method of each step in the pipeline
// if any of them failes we fail the pipeline execution, othwerwise
// if any of them fails, we fail the pipeline execution, otherwise
// we return the output of the pipeline.
for step in pipeline.iter_steps() {
let navi = step.navigator(&pipeline).expect(
Expand Down Expand Up @@ -279,7 +279,7 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let executor = self.get_mut();

// The executor has not ran any steps yet, it is invoking the `before_job`
// The executor has not run any steps yet, it is invoking the `before_job`
// method of each step in the pipeline.
if let Cursor::Initializing(ref mut future) = executor.cursor {
if let Poll::Ready(output) = future.as_mut().poll_unpin(cx) {
Expand Down Expand Up @@ -404,7 +404,7 @@ enum Cursor<P: Platform> {

/// The pipeline is currently initializing all steps for a new payload job.
///
/// This happens once before any step is executed and it calls the
/// This happens once before any step is executed, and it calls the
/// `before_job` method of each step in the pipeline.
Initializing(
Pin<
Expand All @@ -419,8 +419,8 @@ enum Cursor<P: Platform> {
Finalizing(Pin<Box<dyn Future<Output = PipelineOutput<P>> + Send>>),

/// The pipeline is currently preparing to execute the next step.
/// We are in this state only for a brief moment inside the `poll` method
/// and it will never be seen by the `run_step` method.
/// We are in this state only for a brief moment inside the `poll` method, and
/// it will never be seen by the `run_step` method.
PreparingStep,
}

Expand Down
24 changes: 12 additions & 12 deletions src/pipelines/exec/navi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl StepPath {
self.depth() == 1
}

/// Returns `true` the the path is pointing to a prologue of a pipeline.
/// Returns `true` if the path is pointing to a prologue of a pipeline.
pub(crate) fn is_prologue(&self) -> bool {
self.leaf() == PROLOGUE_INDEX
}
Expand Down Expand Up @@ -183,7 +183,7 @@ impl StepPath {
/// Given two paths, where one is an ancestor of the other, returns the
/// intermediate paths between them.
///
/// If the one othe paths is not an ancestor of the other an empty vector is
/// If the other path is not an ancestor of the other an empty vector is
/// returned.
///
/// if the paths are equal, an empty vector is returned.
Expand Down Expand Up @@ -426,11 +426,11 @@ impl<'a, P: Platform> StepNavigator<'a, P> {
let is_last = position + 1 >= enclosing_pipeline.steps().len();

match (self.behavior(), is_last) {
(Behavior::Loop, true) => {
(Loop, true) => {
// we are the last step in a loop pipeline, go to first step.
Self(self.0.replace_leaf(STEP0_INDEX), self.1.clone()).enter()
}
(Behavior::Once, true) => {
(Once, true) => {
// we are last step in a non-loop pipeline, this is the end of a
// single iteration loop.
self.after_loop()
Expand Down Expand Up @@ -469,7 +469,7 @@ impl<P: Platform> StepNavigator<'_, P> {
fn behavior(&self) -> Behavior {
// top-level pipelines are always `Once`.
if self.0.is_toplevel() {
return Behavior::Once;
return Once;
}

// to identify the behavior of the pipeline that contains the current step
Expand Down Expand Up @@ -560,7 +560,7 @@ impl<P: Platform> StepNavigator<'_, P> {
}
}

/// Finds the next step to run afer the prologue of the current pipeline.
/// Finds the next step to run after the prologue of the current pipeline.
fn after_prologue(self) -> Option<Self> {
if self.pipeline().steps().is_empty() {
// no steps, go to epilogue.
Expand All @@ -583,8 +583,8 @@ impl<P: Platform> StepNavigator<'_, P> {
// is last step in the enclosing pipeline?
if step_index + 1 >= enclosing_pipeline.steps().len() {
match ancestor.behavior() {
Behavior::Loop => ancestor.after_prologue(),
Behavior::Once => ancestor.after_loop(),
Loop => ancestor.after_prologue(),
Once => ancestor.after_loop(),
}
} else {
// there are more items in the enclosing pipeline, so we can just
Expand Down Expand Up @@ -681,7 +681,7 @@ mod test {
Pipeline::<Ethereum>::default().with_step(Step1),
StepPath::step0(),
// name autogenerated from source location
vec![format!("navi_{}", (line!() - 3))]
vec![format!("navi_{}", line!() - 3)]
);

// one step with prologue
Expand Down Expand Up @@ -773,17 +773,17 @@ mod test {
.with_step(Step2)
.with_step(Step3);
let navigator = StepNavigator::entrypoint(&pipeline).unwrap();
assert_eq!(navigator.behavior(), Behavior::Once);
assert_eq!(navigator.behavior(), Once);

let pipeline =
Pipeline::<Ethereum>::default().with_pipeline(Loop, (Step1,));
let navigator = StepNavigator::entrypoint(&pipeline).unwrap();
assert_eq!(navigator.behavior(), Behavior::Loop);
assert_eq!(navigator.behavior(), Loop);

let pipeline =
Pipeline::<Ethereum>::default().with_pipeline(Once, (Step1,));
let navigator = StepNavigator::entrypoint(&pipeline).unwrap();
assert_eq!(navigator.behavior(), Behavior::Once);
assert_eq!(navigator.behavior(), Once);
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion src/pipelines/exec/scope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use {
/// `PipelineExecutor::advance_cursor`.
///
/// Scopes manage:
/// - The metrics name for each pipeline and its nested pipelines
/// - The metrics name for each pipeline and its nested pipelines
/// - Limits calculation and renewal for pipeline steps.
pub(crate) struct RootScope<P: Platform> {
root: RwLock<Scope<P>>,
Expand Down
16 changes: 8 additions & 8 deletions src/pipelines/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use {
///
/// This is a long-running job that will be polled by the CL node until it is
/// resolved. The job future must resolve within 1 second from the moment
/// [`PayloadJob::resolve_kind`] is called with [`PaylodKind::Earliest`].
/// [`PayloadJob::resolve_kind`] is called with [`PayloadKind::Earliest`].
pub(super) struct PayloadJob<P, Provider>
where
P: Platform,
Expand Down Expand Up @@ -96,7 +96,7 @@ where
// protocol.
PayloadKind::Earliest => {
debug!(
"Resolving earliest payload for job {}",
"Resolving the earliest payload for job {}",
self.block.attributes().payload_id()
);
(self.fut.clone(), KeepPayloadJobAlive::No)
Expand All @@ -120,7 +120,7 @@ where

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// When the payload job future is polled, we begin executing the pipeline
// production future immediatelly as well, so that the time between the
// production future immediately as well, so that the time between the
// creation of the job and the call to `resolve_kind` is utilized for the
// pipeline execution.
//
Expand All @@ -134,15 +134,15 @@ where
return Poll::Ready(Err(e));
}

// On happy paths or in-progress pipielines, keep the future alive. Reth
// On happy paths or in-progress pipelines, keep the future alive. Reth
// will drop it when a payload is resolved.
Poll::Pending
}
}

/// This future wraps the `PipelineExecutor` and is used to poll the
/// internal executor of the pipeline. Once this future is resolved, it
/// can be polled again and will return copie of the resolved payload.
/// can be polled again and will return copy of the resolved payload.
pub(super) struct ExecutorFuture<P, Provider>
where
P: Platform,
Expand All @@ -154,8 +154,8 @@ where
}

/// This enum allows us to wrap the `PipelineExecutor` future
/// and cache the result of the execution. Also it makes the executor future
/// clonable, so that many copies of the future could be returned from
/// and cache the result of the execution. Also, it makes the executor future
/// cloneable, so that many copies of the future could be returned from
/// `resolve_kind`.
///
/// Whenever any of the copies of the future is polled, it will poll the
Expand Down Expand Up @@ -271,7 +271,7 @@ where
}
}

/// We want this to be clonable because the `resolve_kind` method could
/// We want this to be cloneable because the `resolve_kind` method could
/// potentially return multiple copies of the future, and we want all of them to
/// resolve with the same result at the same time.
impl<P, Provider> Clone for ExecutorFuture<P, Provider>
Expand Down
32 changes: 2 additions & 30 deletions src/pipelines/limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub trait ScopedLimits<P: Platform>: Send + Sync + 'static {
fn create(&self, payload: &Checkpoint<P>, enclosing: &Limits) -> Limits;
}

/// Convinience trait that allows API users to either use a `ScopedLimits` type
/// Convenience trait that allows API users to either use a `ScopedLimits` type
/// or a concrete limits value in [`Pipeline::with_limits`].
pub trait IntoScopedLimits<P: Platform, Marker = ()> {
/// Convert the type into a limits factory.
Expand Down Expand Up @@ -105,35 +105,7 @@ impl<T> From<Zero> for ScaleOp<T> {

impl<P: Platform> ScopedLimits<P> for Scaled {
fn create(&self, _: &Checkpoint<P>, enclosing: &Limits) -> Limits {
let mut limits = *enclosing;

if let Some(ref op) = self.gas {
limits.gas_limit = op.apply(limits.gas_limit);
}

if let Some(ref op) = self.deadline {
limits.deadline = op.apply(limits.deadline);
}

if let Some(ref op) = self.max_txs {
limits.max_transactions = op.apply(limits.max_transactions);
}

if let Some(ref op) = self.max_blob_count {
limits.blob_params = limits.blob_params.map(|params| BlobParams {
max_blob_count: op.apply(params.max_blob_count),
..params
});
}

if let Some(ref op) = self.max_blobs_per_tx {
limits.blob_params = limits.blob_params.map(|params| BlobParams {
max_blobs_per_tx: op.apply(params.max_blobs_per_tx),
..params
});
}

limits
self.from(enclosing)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/pipelines/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub(super) struct Payload {
pub fees_total: Counter,

/// The time given by the EL for the payload job to complete.
/// This can be also interpretted as the block time.
/// This can be also interpreted as the block time.
pub job_deadline: Gauge,

/// The latest block number produced by the payload builder.
Expand Down
4 changes: 2 additions & 2 deletions src/pipelines/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl<P: Platform> Pipeline<P> {

/// Observability
impl<P: Platform> Pipeline<P> {
/// Returns true if the pipieline has no steps, prologue or epilogue.
/// Returns true if the pipeline has no steps, prologue or epilogue.
pub fn is_empty(&self) -> bool {
self.prologue.is_none() && self.epilogue.is_none() && self.steps.is_empty()
}
Expand Down Expand Up @@ -220,7 +220,7 @@ impl<P: Platform> core::fmt::Debug for StepOrPipeline<P> {
}
}

/// This trait is used to enable various syntatic sugar for defining nested
/// This trait is used to enable various syntactic sugar for defining nested
/// pipelines
pub trait IntoPipeline<P: Platform, Marker = ()> {
#[track_caller]
Expand Down
6 changes: 3 additions & 3 deletions src/pipelines/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use {

/// This type is the bridge between Reth's payload builder API and the
/// pipelines API. It will take a pipeline instance and turn it into what
/// eventually becomes a Paylod Job Generator. They payload Job Generator
/// eventually becomes a Payload Job Generator. The payload Job Generator
/// will be responsible for creating new [`PayloadJob`] instances
/// whenever a new payload request comes in from the CL Node.
pub(super) struct PipelineServiceBuilder<P: Platform> {
Expand Down Expand Up @@ -133,8 +133,8 @@ where
}

/// This type is stored inside the [`PayloadBuilderService`] type in Reth.
/// There's one instance of this type per node and it is instantiated during the
/// node startup inside `spawn_payload_builder_service`.
/// There's one instance of this type per node, and it is instantiated during
/// the node startup inside `spawn_payload_builder_service`.
///
/// The responsibility of this type is to respond to new payload requests when
/// FCU calls come from the CL Node. Each FCU call will generate a new
Expand Down
2 changes: 1 addition & 1 deletion src/pipelines/step/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl<P: Platform> StepContext<P> {
}
}

/// Access to the state of the chain at the begining of block that we are
/// Access to the state of the chain at the beginning of block that we are
/// building. This state does not include any changes made by the pipeline
/// during the payload building process. It does however include changes
/// applied by platform-specific [`BlockBuilder::apply_pre_execution_changes`]
Expand Down
2 changes: 1 addition & 1 deletion src/pipelines/step/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl<P: Platform> StepInstance<P> {
// `PipelineServiceBuilder::spawn_payload_builder_service` for a
// given instance of a `Pipeline<P>`. The service builder owns the
// pipeline instance and is guaranteed to be the only one calling
// the setup function. Also `setup` is a synchronous function.
// the setup function. Also, `setup` is a synchronous function.
#[expect(invalid_reference_casting)]
let step = unsafe {
let ptr = core::ptr::from_ref(step);
Expand Down
2 changes: 1 addition & 1 deletion src/pipelines/step/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub use {context::StepContext, reth::payload::builder::PayloadBuilderError};
/// can be generic over the platform they run on or specialized for a specific
/// platform.
///
/// The instance of the step is long-lived and it's lifetime is equal to the
/// The instance of the step is long-lived, and it's lifetime is equal to the
/// lifetime of the pipeline it is part of. All invocations of the step will
/// repeatedly call into the `step` async function on the same instance.
///
Expand Down
2 changes: 1 addition & 1 deletion src/pipelines/step/name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ fn short_type_name(full_name: &str) -> String {
// parameters) separated by the characters that we try to find below.
// Then, each individual typename is shortened to its last path component.
//
// Note: Instead of `find`, `split_inclusive` would be nice but it's still
// Note: Instead of `find`, `split_inclusive` would be nice, but it's still
// unstable...
let mut remainder = full_name;
while let Some(index) =
Expand Down
Loading
Loading