Skip to content

Commit

Permalink
feat(node-core, stages): stage preparation notification (paradigmxyz#…
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin authored and Ruteri committed Apr 17, 2024
1 parent 51857c1 commit 83fdd8a
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 3 deletions.
24 changes: 24 additions & 0 deletions crates/node-core/src/events/node.rs
Expand Up @@ -71,6 +71,30 @@ impl<DB> NodeState<DB> {
/// Processes an event emitted by the pipeline
fn handle_pipeline_event(&mut self, event: PipelineEvent) {
match event {
PipelineEvent::Prepare { pipeline_stages_progress, stage_id, checkpoint, target } => {
let checkpoint = checkpoint.unwrap_or_default();
let current_stage = CurrentStage {
stage_id,
eta: match &self.current_stage {
Some(current_stage) if current_stage.stage_id == stage_id => {
current_stage.eta
}
_ => Eta::default(),
},
checkpoint,
target,
};

info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
target = %OptionalField(target),
"Preparing stage",
);

self.current_stage = Some(current_stage);
}
PipelineEvent::Run { pipeline_stages_progress, stage_id, checkpoint, target } => {
let checkpoint = checkpoint.unwrap_or_default();
let current_stage = CurrentStage {
Expand Down
11 changes: 11 additions & 0 deletions crates/stages/src/pipeline/event.rs
Expand Up @@ -14,6 +14,17 @@ use std::fmt::{Display, Formatter};
/// - The pipeline will loop indefinitely unless a target block is set
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum PipelineEvent {
/// Emitted when a stage is about to be prepared for a run.
Prepare {
/// Pipeline stages progress.
pipeline_stages_progress: PipelineStagesProgress,
/// The stage that is about to be run.
stage_id: StageId,
/// The previous checkpoint of the stage.
checkpoint: Option<StageCheckpoint>,
/// The block number up to which the stage is running, if known.
target: Option<BlockNumber>,
},
/// Emitted when a stage is about to be run.
Run {
/// Pipeline stages progress.
Expand Down
76 changes: 76 additions & 0 deletions crates/stages/src/pipeline/mod.rs
Expand Up @@ -371,6 +371,16 @@ where

let exec_input = ExecInput { target, checkpoint: prev_checkpoint };

self.listeners.notify(PipelineEvent::Prepare {
pipeline_stages_progress: event::PipelineStagesProgress {
current: stage_index + 1,
total: total_stages,
},
stage_id,
checkpoint: prev_checkpoint,
target,
});

if let Err(err) = stage.execute_ready(exec_input).await {
self.listeners.notify(PipelineEvent::Error { stage_id });
match on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)? {
Expand Down Expand Up @@ -611,6 +621,12 @@ mod tests {
assert_eq!(
events.collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
Expand All @@ -622,6 +638,12 @@ mod tests {
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
Expand Down Expand Up @@ -683,6 +705,12 @@ mod tests {
events.collect::<Vec<PipelineEvent>>().await,
vec![
// Executing
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
stage_id: StageId::Other("A"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
stage_id: StageId::Other("A"),
Expand All @@ -694,6 +722,12 @@ mod tests {
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
stage_id: StageId::Other("B"),
Expand All @@ -705,6 +739,12 @@ mod tests {
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
stage_id: StageId::Other("C"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
stage_id: StageId::Other("C"),
Expand Down Expand Up @@ -797,6 +837,12 @@ mod tests {
events.collect::<Vec<PipelineEvent>>().await,
vec![
// Executing
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
Expand All @@ -808,6 +854,12 @@ mod tests {
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
Expand Down Expand Up @@ -896,6 +948,12 @@ mod tests {
assert_eq!(
events.collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
Expand All @@ -907,6 +965,12 @@ mod tests {
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
Expand All @@ -926,6 +990,12 @@ mod tests {
stage_id: StageId::Other("A"),
result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: Some(StageCheckpoint::new(0)),
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
Expand All @@ -937,6 +1007,12 @@ mod tests {
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
Expand Down
6 changes: 3 additions & 3 deletions crates/stages/src/stages/headers.rs
Expand Up @@ -120,7 +120,7 @@ where
for (index, header) in self.header_collector.iter()?.enumerate() {
let (_, header_buf) = header?;

if index > 0 && index % interval == 0 {
if index > 0 && index % interval == 0 && total_headers > 100 {
info!(target: "sync::stages::headers", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers");
}

Expand All @@ -146,7 +146,7 @@ where
writer.append_header(header, td, header_hash)?;
}

info!(target: "sync::stages::headers", total = total_headers, "Writing header hash index");
info!(target: "sync::stages::headers", total = total_headers, "Writing headers hash index");

let mut cursor_header_numbers = tx.cursor_write::<RawTable<tables::HeaderNumbers>>()?;
let mut first_sync = false;
Expand All @@ -166,7 +166,7 @@ where
for (index, hash_to_number) in self.hash_collector.iter()?.enumerate() {
let (hash, number) = hash_to_number?;

if index > 0 && index % interval == 0 {
if index > 0 && index % interval == 0 && total_headers > 100 {
info!(target: "sync::stages::headers", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
}

Expand Down

0 comments on commit 83fdd8a

Please sign in to comment.