Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ exclude = [".github/"]

[workspace.lints.rust]
type_alias_bounds = "allow"
unreachable_pub = "warn"

[workspace.lints.clippy]
tabs_in_doc_comments = "allow"
Expand Down
2 changes: 1 addition & 1 deletion examples/custom-bundle-type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ struct CustomBundleType {
}

impl CustomBundleType {
pub fn with_min_profit(min_profit: U256) -> Self {
fn with_min_profit(min_profit: U256) -> Self {
Self {
txs: Vec::new(),
reverting_txs: Vec::new(),
Expand Down
10 changes: 6 additions & 4 deletions src/pipelines/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,25 @@ use {
/// - In a pipeline with nested pipelines, the top-level pipline's event bus is
/// responsible for handling all events of all contained pipelines and steps.
#[derive(Default, Debug)]
pub struct EventsBus<P: Platform> {
pub(super) struct EventsBus<P: Platform> {
publishers: DashMap<TypeId, Sender<Arc<dyn Any + Send + Sync>>>,
phantom: PhantomData<P>,
}

impl<P: Platform> EventsBus<P> {
/// Publishes an event of type `E` to all current subscribers.
/// Each subscriber will receive a clone of the event.
pub fn publish<E>(&self, event: E)
pub(super) fn publish<E>(&self, event: E)
where
E: Clone + Any + Send + Sync + 'static,
{
let _ = self.sender::<E>().send(Arc::new(event));
}

/// Returns a stream that yields events of type `E`.
pub fn subscribe<E>(&self) -> impl Stream<Item = E> + Send + Sync + 'static
pub(super) fn subscribe<E>(
&self,
) -> impl Stream<Item = E> + Send + Sync + 'static
where
E: Clone + Any + Send + Sync + 'static,
{
Expand Down Expand Up @@ -63,7 +65,7 @@ impl<P: Platform> EventsBus<P> {
}

/// System events emitted by the pipeline itself.
pub mod system_events {
pub(super) mod system_events {
use {
super::*,
derive_more::{Deref, From, Into},
Expand Down
4 changes: 2 additions & 2 deletions src/pipelines/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl<P: Platform, Provider: traits::ProviderBounds<P>>
PipelineExecutor<P, Provider>
{
/// Begins the execution of a pipeline for a new block/payload job.
pub fn run(
pub(super) fn run(
pipeline: Arc<Pipeline<P>>,
block: BlockContext<P>,
service: Arc<ServiceContext<P, Provider>>,
Expand Down Expand Up @@ -114,7 +114,7 @@ impl<P: Platform, Provider: traits::ProviderBounds<P>>
}

/// Returns the payload id for which we are building a payload.
pub fn payload_id(&self) -> PayloadId {
pub(super) fn payload_id(&self) -> PayloadId {
self.block.payload_id()
}
}
Expand Down
22 changes: 11 additions & 11 deletions src/pipelines/exec/navi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl StepPath {
/// If the step path points at a nested pipeline, this method will create a
/// navigator that points to the first executable step starting from the
/// nested pipeline.
pub fn navigator<'a, P: Platform>(
pub(crate) fn navigator<'a, P: Platform>(
&self,
root: &'a Pipeline<P>,
) -> Option<StepNavigator<'a, P>> {
Expand Down Expand Up @@ -67,25 +67,25 @@ impl StepPath {
///
/// When this path points to an item, this value is the number of pipelines
/// that contain the item starting from the top-level pipeline.
pub fn depth(&self) -> usize {
pub(crate) fn depth(&self) -> usize {
self.0.len()
}

/// Returns `true` if the path points to a step in a top-level pipeline.
/// This means that this path is inside a pipeline that has no parents.
///
/// In other words, it checks if the path is a single element path.
pub fn is_toplevel(&self) -> bool {
pub(crate) fn is_toplevel(&self) -> bool {
self.depth() == 1
}

/// Returns `true` the the path is pointing to a prologue of a pipeline.
pub fn is_prologue(&self) -> bool {
pub(crate) fn is_prologue(&self) -> bool {
self.leaf() == PROLOGUE_INDEX
}

/// Returns `true` if the path is pointing to an epilogue of a pipeline.
pub fn is_epilogue(&self) -> bool {
pub(crate) fn is_epilogue(&self) -> bool {
self.leaf() == EPILOGUE_INDEX
}
}
Expand Down Expand Up @@ -324,7 +324,7 @@ impl<'a, P: Platform> StepNavigator<'a, P> {
/// deeper into the nested pipeline to find the first executable item.
///
/// In empty pipelines, this will return None.
pub fn entrypoint(pipeline: &'a Pipeline<P>) -> Option<Self> {
pub(crate) fn entrypoint(pipeline: &'a Pipeline<P>) -> Option<Self> {
if pipeline.is_empty() {
return None;
}
Expand All @@ -351,7 +351,7 @@ impl<'a, P: Platform> StepNavigator<'a, P> {

/// Returns a reference to the instance of the step that this path is
/// currently pointing to.
pub fn instance(&self) -> &Arc<StepInstance<P>> {
pub(crate) fn instance(&self) -> &Arc<StepInstance<P>> {
let step_index = self.0.leaf();
let enclosing_pipeline = self.pipeline();

Expand Down Expand Up @@ -380,15 +380,15 @@ impl<'a, P: Platform> StepNavigator<'a, P> {

/// Returns a reference to the immediate enclosing pipeline that contains the
/// current step.
pub fn pipeline(&self) -> &'a Pipeline<P> {
pub(crate) fn pipeline(&self) -> &'a Pipeline<P> {
self.1.last().expect(
"StepNavigator should always have at least one enclosing pipeline",
)
}

/// Returns a reference to the top-level pipeline that contains the
/// current step.
pub fn root_pipeline(&self) -> &'a Pipeline<P> {
pub(crate) fn root_pipeline(&self) -> &'a Pipeline<P> {
self.1.first().expect(
"StepNavigator should always have at least one enclosing pipeline",
)
Expand All @@ -398,7 +398,7 @@ impl<'a, P: Platform> StepNavigator<'a, P> {
/// step's execution returns `ControlFlow::Ok`.
///
/// Returns `None` if there are no more steps to execute in the pipeline.
pub fn next_ok(self) -> Option<Self> {
pub(crate) fn next_ok(self) -> Option<Self> {
if self.is_epilogue() {
// the loop is over.
return self.next_in_parent();
Expand Down Expand Up @@ -447,7 +447,7 @@ impl<'a, P: Platform> StepNavigator<'a, P> {
/// step's execution returns `ControlFlow::Break`.
///
/// Returns `None` if there are no more steps to execute in the pipeline.
pub fn next_break(self) -> Option<Self> {
pub(crate) fn next_break(self) -> Option<Self> {
if self.is_epilogue() {
// the loop is over.
return self.next_in_parent();
Expand Down
31 changes: 17 additions & 14 deletions src/pipelines/exec/scope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,17 @@ use {
/// Scopes manage:
/// - The metrics name for each pipeline and its nested pipelines
/// - Limits calculation and renewal for pipeline steps.
pub struct RootScope<P: Platform> {
pub(crate) struct RootScope<P: Platform> {
root: RwLock<Scope<P>>,
current: RefCell<StepPath>,
}

impl<P: Platform> RootScope<P> {
/// Initialize all scopes in a given top-level pipeline.
pub fn new(pipeline: &Pipeline<P>, init_checkpoint: &Checkpoint<P>) -> Self {
pub(crate) fn new(
pipeline: &Pipeline<P>,
init_checkpoint: &Checkpoint<P>,
) -> Self {
let current = RefCell::new(StepPath::empty());
let root = Scope::rooted_at(pipeline, init_checkpoint);
let root = RwLock::new(root);
Expand All @@ -79,7 +82,7 @@ impl<P: Platform> RootScope<P> {
}

/// Given a path to a step in the pipeline, returns its current limits.
pub fn limits_of(&self, step_path: &StepPath) -> Option<Limits> {
pub(crate) fn limits_of(&self, step_path: &StepPath) -> Option<Limits> {
self
.root
.read()
Expand All @@ -88,7 +91,7 @@ impl<P: Platform> RootScope<P> {
}

/// Returns the instant when the scope was last entered.
pub fn entered_at(&self, step_path: &StepPath) -> Option<Instant> {
pub(crate) fn entered_at(&self, step_path: &StepPath) -> Option<Instant> {
self
.root
.read()
Expand All @@ -100,7 +103,7 @@ impl<P: Platform> RootScope<P> {
/// It detects if the next step is in a different scope and enters and leaves
/// scopes accordingly. This will leave and enter all intermediate scopes
/// between the previous and next steps.
pub fn switch_context(
pub(crate) fn switch_context(
&self,
next_step: &StepPath,
checkpoint: &Checkpoint<P>,
Expand Down Expand Up @@ -132,18 +135,18 @@ impl<P: Platform> RootScope<P> {
}
}

pub fn enter(&self, checkpoint: &Checkpoint<P>) {
pub(crate) fn enter(&self, checkpoint: &Checkpoint<P>) {
let mut root = self.root.write();
let limits = root.limits;
root.enter(checkpoint, &limits);
}

pub fn leave(&self) {
pub(crate) fn leave(&self) {
let mut root = self.root.write();
root.leave();
}

pub fn is_active(&self) -> bool {
pub(crate) fn is_active(&self) -> bool {
self.root.read().is_active()
}
}
Expand All @@ -165,7 +168,7 @@ fn scope_of(step: &StepPath) -> StepPath {
/// execution. All steps in a pipeline run within the scopes of the pipelines
/// that contain it. When a scope is active, then all its parent scopes are
/// active as well.
pub struct Scope<P: Platform> {
pub(crate) struct Scope<P: Platform> {
limits: Limits,
metrics: Metrics,
limits_factory: Option<Arc<dyn ScopedLimits<P>>>,
Expand All @@ -178,23 +181,23 @@ pub struct Scope<P: Platform> {
impl<P: Platform> Scope<P> {
/// When a scope is active it means that one of its steps (or in its nested
/// scopes) is currently being executed,
pub const fn is_active(&self) -> bool {
pub(crate) const fn is_active(&self) -> bool {
self.entered_at.is_some()
}

/// Returns the elapsed time since the scope was entered.
/// This will only return a value if the scope is currently active.
pub fn elapsed(&self) -> Option<Duration> {
pub(crate) fn elapsed(&self) -> Option<Duration> {
self.entered_at.map(|start| start.elapsed())
}

/// Returns when the scope was entered most recently.
pub fn started_at(&self) -> Option<Instant> {
pub(crate) fn started_at(&self) -> Option<Instant> {
self.entered_at
}

/// Returns the payload limits for steps running within the current scope.
pub const fn limits(&self) -> &Limits {
pub(crate) const fn limits(&self) -> &Limits {
&self.limits
}
}
Expand Down Expand Up @@ -353,7 +356,7 @@ unsafe impl<P: Platform> Send for Scope<P> {}
unsafe impl<P: Platform> Sync for Scope<P> {}

#[derive(MetricsSet)]
pub struct Metrics {
pub(crate) struct Metrics {
/// Histogram of the number of iterations.
pub iter_count_histogram: Histogram,

Expand Down
2 changes: 1 addition & 1 deletion src/pipelines/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct Frame<'a, P: Platform> {
}

impl<'a, P: Platform> StepPathIter<'a, P> {
pub fn new(pipeline: &'a Pipeline<P>) -> Self {
pub(crate) fn new(pipeline: &'a Pipeline<P>) -> Self {
Self {
stack: vec![Frame {
pipeline,
Expand Down
8 changes: 4 additions & 4 deletions src/pipelines/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use {
/// This is a long-running job that will be polled by the CL node until it is
/// resolved. The job future must resolve within 1 second from the moment
/// [`PayloadJob::resolve_kind`] is called with [`PaylodKind::Earliest`].
pub struct PayloadJob<P, Provider>
pub(super) struct PayloadJob<P, Provider>
where
P: Platform,
Provider: traits::ProviderBounds<P>,
Expand All @@ -45,7 +45,7 @@ where
P: Platform,
Provider: traits::ProviderBounds<P>,
{
pub fn new(
pub(super) fn new(
pipeline: &Arc<Pipeline<P>>,
block: BlockContext<P>,
service: &Arc<ServiceContext<P, Provider>>,
Expand Down Expand Up @@ -143,7 +143,7 @@ where
/// This future wraps the `PipelineExecutor` and is used to poll the
/// internal executor of the pipeline. Once this future is resolved, it
/// can be polled again and will return copie of the resolved payload.
pub struct ExecutorFuture<P, Provider>
pub(super) struct ExecutorFuture<P, Provider>
where
P: Platform,
Provider: traits::ProviderBounds<P>,
Expand Down Expand Up @@ -175,7 +175,7 @@ where
P: Platform,
Provider: traits::ProviderBounds<P>,
{
pub fn new(executor: PipelineExecutor<P, Provider>) -> Self {
pub(super) fn new(executor: PipelineExecutor<P, Provider>) -> Self {
Self {
started_at: Instant::now(),
payload_id: executor.payload_id(),
Expand Down
14 changes: 7 additions & 7 deletions src/pipelines/limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,38 +317,38 @@ mod tests {

impl TestStep {
#[allow(dead_code)]
pub fn break_after(self, iterations: u32) -> Self {
fn break_after(self, iterations: u32) -> Self {
self.break_after.store(iterations, Ordering::Relaxed);
self
}

pub fn expect_gas_limit(mut self, gas_limit: u64) -> Self {
fn expect_gas_limit(mut self, gas_limit: u64) -> Self {
self.expected_gas_limit = Some(gas_limit);
self
}

pub fn expect_deadline(mut self, deadline: Duration) -> Self {
fn expect_deadline(mut self, deadline: Duration) -> Self {
self.expected_deadline = Some(deadline);
self
}

pub fn expect_minimum_iterations(mut self, iterations: u32) -> Self {
fn expect_minimum_iterations(mut self, iterations: u32) -> Self {
self.minimum_iterations = Some(iterations);
self
}

pub fn expect_maximum_iterations(mut self, iterations: u32) -> Self {
fn expect_maximum_iterations(mut self, iterations: u32) -> Self {
self.maximum_iterations = Some(iterations);
self
}

pub fn sleep_on_step(mut self, duration: Duration) -> Self {
fn sleep_on_step(mut self, duration: Duration) -> Self {
self.sleep_on_step = Some(duration);
self
}

#[track_caller]
pub fn must_run(mut self) -> Self {
fn must_run(mut self) -> Self {
self.must_run = true;
self
}
Expand Down
2 changes: 1 addition & 1 deletion src/pipelines/macros/src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod set;

pub use set::metrics_set_derive;
pub(crate) use set::metrics_set_derive;

fn rblib_path() -> proc_macro2::TokenStream {
use {
Expand Down
2 changes: 1 addition & 1 deletion src/pipelines/macros/src/metrics/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use {
/// before registering.
/// - Adds a simple Debug impl (non-exhaustive) similar to metrics-derive style.
#[allow(clippy::too_many_lines)]
pub fn metrics_set_derive(input: TokenStream) -> TokenStream {
pub(crate) fn metrics_set_derive(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as DeriveInput);
let ident = &input.ident;

Expand Down
2 changes: 1 addition & 1 deletion src/pipelines/macros/src/variants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use {
/// ```
///
/// This will generate implementations for tuples of size 1 through 10.
pub fn impl_into_pipeline_steps(input: TokenStream) -> TokenStream {
pub(crate) fn impl_into_pipeline_steps(input: TokenStream) -> TokenStream {
let count = parse_macro_input!(input as LitInt);
let count_value = count
.base10_parse::<usize>()
Expand Down
Loading
Loading