diff --git a/crates/phaser-query/Cargo.toml b/crates/phaser-query/Cargo.toml index ed978dc..bdb0961 100644 --- a/crates/phaser-query/Cargo.toml +++ b/crates/phaser-query/Cargo.toml @@ -51,6 +51,9 @@ uuid = { version = "1.18", features = ["v4"] } tokio-stream = "0.1" alloy-consensus.workspace = true alloy-rlp.workspace = true +chrono = "0.4.42" +ratatui = "0.29.0" +crossterm = "0.29.0" [build-dependencies] tonic-build = "0.13" diff --git a/crates/phaser-query/src/bin/phaser-cli.rs b/crates/phaser-query/src/bin/phaser-cli.rs index 23276e7..0caf45a 100644 --- a/crates/phaser-query/src/bin/phaser-cli.rs +++ b/crates/phaser-query/src/bin/phaser-cli.rs @@ -1,10 +1,39 @@ use anyhow::Result; use clap::{Parser, Subcommand}; +use serde::{Deserialize, Serialize}; // Use the generated admin proto from phaser_query use phaser_query::proto::admin::sync_service_client::SyncServiceClient; use phaser_query::proto::admin::*; +#[derive(Debug, Serialize, Deserialize)] +struct ProgressOutput { + job_id: String, + status: String, + timestamp: String, + total_blocks_synced: u64, + total_blocks: u64, + overall_rate: f64, + total_bytes_written: u64, + workers: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +struct WorkerOutput { + worker_id: u32, + stage: String, + from_block: u64, + to_block: u64, + current_block: u64, + blocks_processed: u64, + rate: f64, + bytes_written: u64, + files_created: u32, + started_at: String, + #[serde(skip_serializing_if = "Option::is_none")] + elapsed: Option, +} + #[derive(Parser, Debug)] #[clap(author, version, about = "CLI for phaser-query admin operations", long_about = None)] struct Args { @@ -80,9 +109,275 @@ enum Commands { Progress { /// Job ID to monitor job_id: String, + + /// Output as JSON (one line per update) + #[clap(long, conflicts_with = "tui")] + json: bool, + + /// Output as TUI (default, overwrites same lines) + #[clap(long, conflicts_with = "json")] + tui: bool, }, } +async fn run_progress_json(mut client: SyncServiceClient, job_id: &str) -> Result<()> { + use futures::StreamExt; + use std::collections::HashMap; + + let request = tonic::Request::new(SyncProgressRequest { + job_id: job_id.to_string(), + }); + + let mut stream = client.stream_sync_progress(request).await?.into_inner(); + let mut completed_workers: HashMap = HashMap::new(); + + while let Some(update) = stream.next().await { + let update = update?; + + let status_str = match SyncStatus::try_from(update.status) { + Ok(SyncStatus::Pending) => "PENDING", + Ok(SyncStatus::Running) => "RUNNING", + Ok(SyncStatus::Completed) => "COMPLETED", + Ok(SyncStatus::Failed) => "FAILED", + Ok(SyncStatus::Cancelled) => "CANCELLED", + _ => "UNKNOWN", + }; + + let timestamp = chrono::DateTime::from_timestamp(update.timestamp, 0) + .map(|dt| dt.to_rfc3339()) + .unwrap_or_else(|| update.timestamp.to_string()); + + let workers = update.workers.iter().map(|w| { + let started_at = chrono::DateTime::from_timestamp(w.started_at, 0) + .map(|dt| dt.to_rfc3339()) + .unwrap_or_else(|| w.started_at.to_string()); + + let elapsed = if w.stage == "completed" { + if !completed_workers.contains_key(&w.worker_id) { + let elapsed_secs = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64 + - w.started_at; + completed_workers.insert(w.worker_id, elapsed_secs as u64); + } + Some(format_duration(completed_workers[&w.worker_id])) + } else { + let elapsed_secs = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64 + - w.started_at; + Some(format_duration(elapsed_secs as u64)) + }; + + WorkerOutput { + worker_id: w.worker_id, + stage: w.stage.clone(), + from_block: w.from_block, + to_block: w.to_block, + current_block: w.current_block, + blocks_processed: w.blocks_processed, + rate: w.rate, + bytes_written: w.bytes_written, + files_created: w.files_created, + started_at, + elapsed, + } + }).collect(); + + let output = ProgressOutput { + job_id: update.job_id.clone(), + status: status_str.to_string(), + timestamp, + total_blocks_synced: update.total_blocks_synced, + total_blocks: update.total_blocks, + overall_rate: update.overall_rate, + total_bytes_written: update.total_bytes_written, + workers, + }; + + println!("{}", serde_json::to_string(&output)?); + } + + Ok(()) +} + +async fn run_progress_tui(mut client: SyncServiceClient, job_id: &str) -> Result<()> { + use futures::StreamExt; + use ratatui::prelude::*; + use ratatui::widgets::{Block, Borders, Gauge, Paragraph, Row, Table}; + use crossterm::{ + event::{self, Event, KeyCode}, + terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, + ExecutableCommand, + }; + use std::io::stdout; + use std::time::Duration; + use std::collections::HashMap; + + // Setup terminal + enable_raw_mode()?; + stdout().execute(EnterAlternateScreen)?; + let mut terminal = Terminal::new(CrosstermBackend::new(stdout()))?; + + let request = tonic::Request::new(SyncProgressRequest { + job_id: job_id.to_string(), + }); + + let mut stream = client.stream_sync_progress(request).await?.into_inner(); + let mut completed_workers: HashMap = HashMap::new(); + let mut last_update: Option = None; + let mut should_quit = false; + + while !should_quit { + // Check for keyboard input + if event::poll(Duration::from_millis(100))? { + if let Event::Key(key) = event::read()? { + if key.code == KeyCode::Char('q') { + should_quit = true; + } + } + } + + // Try to get new update + if let Ok(Some(Ok(update))) = tokio::time::timeout( + Duration::from_millis(100), + stream.next() + ).await { + last_update = Some(update); + } + + // Render UI + if let Some(ref update) = last_update { + let status_str = match SyncStatus::try_from(update.status) { + Ok(SyncStatus::Pending) => "PENDING", + Ok(SyncStatus::Running) => "RUNNING", + Ok(SyncStatus::Completed) => "COMPLETED", + Ok(SyncStatus::Failed) => "FAILED", + Ok(SyncStatus::Cancelled) => "CANCELLED", + _ => "UNKNOWN", + }; + + // Update completed workers tracking + for worker in &update.workers { + if worker.stage == "completed" && !completed_workers.contains_key(&worker.worker_id) { + let elapsed = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64 + - worker.started_at; + completed_workers.insert(worker.worker_id, elapsed as u64); + } + } + + terminal.draw(|f| { + let chunks = Layout::default() + .direction(Direction::Vertical) + .margin(1) + .constraints([ + Constraint::Length(3), + Constraint::Length(3), + Constraint::Min(5), + Constraint::Length(1), + ]) + .split(f.area()); + + // Job status + let status_text = format!( + "Job: {} | Status: {} | Total: {}/{} blocks", + job_id, status_str, update.total_blocks_synced, update.total_blocks + ); + let status = Paragraph::new(status_text) + .block(Block::default().borders(Borders::ALL).title("Status")); + f.render_widget(status, chunks[0]); + + // Overall progress bar + let percent = if update.total_blocks > 0 { + (update.total_blocks_synced as f64 / update.total_blocks as f64) * 100.0 + } else { + 0.0 + }; + let gauge = Gauge::default() + .block(Block::default().borders(Borders::ALL).title("Overall Progress")) + .gauge_style(Style::default().fg(Color::Green)) + .percent(percent as u16) + .label(format!("{:.1}% | {:.1} blocks/sec | {:.2} GB", + percent, + update.overall_rate, + update.total_bytes_written as f64 / 1_000_000_000.0)); + f.render_widget(gauge, chunks[1]); + + // Workers table + let headers = Row::new(vec!["ID", "Stage", "Blocks", "Current", "Rate", "Elapsed"]) + .style(Style::default().fg(Color::Yellow)); + + let rows: Vec = update.workers.iter().map(|w| { + let blocks_range = format!("{}-{}", w.from_block, w.to_block); + let current = if w.stage == "completed" { + "✓".to_string() + } else { + w.current_block.to_string() + }; + + let elapsed = if w.stage == "completed" { + format_duration(completed_workers[&w.worker_id]) + } else { + let e = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64 + - w.started_at; + format_duration(e as u64) + }; + + Row::new(vec![ + w.worker_id.to_string(), + w.stage.clone(), + blocks_range, + current, + format!("{:.1}/s", w.rate), + elapsed, + ]) + }).collect(); + + let table = Table::new( + rows, + [ + Constraint::Length(4), + Constraint::Length(15), + Constraint::Length(25), + Constraint::Length(12), + Constraint::Length(10), + Constraint::Length(10), + ] + ) + .header(headers) + .block(Block::default().borders(Borders::ALL).title("Workers")); + + f.render_widget(table, chunks[2]); + + // Footer + let footer = Paragraph::new("Press 'q' to quit"); + f.render_widget(footer, chunks[3]); + })?; + + // Check if job finished + if matches!(status_str, "COMPLETED" | "FAILED" | "CANCELLED") { + // Wait a bit so user can see final state + std::thread::sleep(Duration::from_secs(2)); + should_quit = true; + } + } + } + + // Cleanup terminal + disable_raw_mode()?; + stdout().execute(LeaveAlternateScreen)?; + + Ok(()) +} + #[tokio::main] async fn main() -> Result<()> { let args = Args::parse(); @@ -389,75 +684,13 @@ async fn main() -> Result<()> { } } } - Commands::Progress { job_id } => { - use futures::StreamExt; - - let request = tonic::Request::new(SyncProgressRequest { - job_id: job_id.clone(), - }); - - let mut stream = client.stream_sync_progress(request).await?.into_inner(); - - println!("Streaming progress for job {}...\n", job_id); - - while let Some(update) = stream.next().await { - let update = update?; - - let status_str = match SyncStatus::try_from(update.status) { - Ok(SyncStatus::Pending) => "PENDING", - Ok(SyncStatus::Running) => "RUNNING", - Ok(SyncStatus::Completed) => "COMPLETED", - Ok(SyncStatus::Failed) => "FAILED", - Ok(SyncStatus::Cancelled) => "CANCELLED", - _ => "UNKNOWN", - }; - - let percent = if update.total_blocks > 0 { - (update.total_blocks_synced as f64 / update.total_blocks as f64) * 100.0 - } else { - 0.0 - }; - - println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"); - println!( - "Status: {} | Progress: {}/{} blocks ({:.1}%)", - status_str, update.total_blocks_synced, update.total_blocks, percent - ); - println!( - "Rate: {:.1} blocks/sec | Bytes: {:.2} GB", - update.overall_rate, - update.total_bytes_written as f64 / 1_000_000_000.0 - ); - - if !update.workers.is_empty() { - println!("\nActive Workers: {}", update.workers.len()); - for worker in &update.workers { - let elapsed = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs() as i64 - - worker.started_at; - - println!( - " Worker {}: {} | blocks {}-{} | {:.1} blocks/sec | {} elapsed", - worker.worker_id, - worker.stage, - worker.from_block, - worker.to_block, - worker.rate, - format_duration(elapsed as u64) - ); - } - } - - println!(); - - // Check if job is finished - if status_str == "COMPLETED" || status_str == "FAILED" || status_str == "CANCELLED" - { - println!("Job finished with status: {}", status_str); - break; - } + Commands::Progress { job_id, json, tui } => { + if json { + // JSON mode + run_progress_json(client, &job_id).await?; + } else { + // TUI mode (default) + run_progress_tui(client, &job_id).await?; } } }