diff --git a/crates/bridges/erigon-bridge/src/bridge.rs b/crates/bridges/erigon-bridge/src/bridge.rs index 8857c56..ed38be0 100644 --- a/crates/bridges/erigon-bridge/src/bridge.rs +++ b/crates/bridges/erigon-bridge/src/bridge.rs @@ -564,8 +564,8 @@ impl FlightBridge for ErigonFlightBridge { .await?, ) } - QueryMode::Live { from_block, .. } => { - info!("Creating live stream from block {:?}", from_block); + QueryMode::Live => { + info!("Creating live stream from current head"); let receiver = match stream_type { StreamType::Blocks => self.streaming_service.subscribe_blocks(), StreamType::Transactions => self.streaming_service.subscribe_transactions(), @@ -580,17 +580,6 @@ impl FlightBridge for ErigonFlightBridge { } }) } - QueryMode::Hybrid { - historical_start, - then_follow, - } => { - info!( - "Creating hybrid stream starting at block {}, follow: {}", - historical_start, then_follow - ); - // TODO: Implement hybrid mode (historical then live) - return Err(Status::unimplemented("Hybrid mode not yet implemented")); - } }; let schema = Self::get_schema_for_type(stream_type); diff --git a/crates/phaser-bridge/src/client.rs b/crates/phaser-bridge/src/client.rs index 856f230..2e19edb 100644 --- a/crates/phaser-bridge/src/client.rs +++ b/crates/phaser-bridge/src/client.rs @@ -68,7 +68,12 @@ impl FlightBridgeClient { } } else { // TCP connection - Channel::from_shared(endpoint.clone())?.connect().await? + let uri = if endpoint.starts_with("http://") || endpoint.starts_with("https://") { + endpoint.clone() + } else { + format!("http://{}", endpoint) + }; + Channel::from_shared(uri)?.connect().await? 
}; let client = FlightClient::new(channel); diff --git a/crates/phaser-bridge/src/descriptors.rs b/crates/phaser-bridge/src/descriptors.rs index 4472c98..6c68d48 100644 --- a/crates/phaser-bridge/src/descriptors.rs +++ b/crates/phaser-bridge/src/descriptors.rs @@ -45,71 +45,16 @@ impl BlockchainDescriptor { } } - /// Create a descriptor for live subscription - pub fn live(stream_type: StreamType, from_block: Option) -> Self { + /// Create a descriptor for live subscription (starts from current head) + pub fn live(stream_type: StreamType) -> Self { Self { stream_type, chain_id: None, - query_mode: QueryMode::Live { - from_block, - buffer_size: 100, - }, + query_mode: QueryMode::Live, subscription_options: None, include_reorgs: false, } } - - /// Create a hybrid descriptor (historical then live) - pub fn hybrid(stream_type: StreamType, historical_start: u64) -> Self { - Self { - stream_type, - chain_id: None, - query_mode: QueryMode::Hybrid { - historical_start, - then_follow: true, - }, - subscription_options: None, - include_reorgs: false, - } - } - - /// For backward compatibility - create from old-style parameters - pub fn from_legacy( - stream_type: StreamType, - chain_id: Option, - start_block: Option, - end_block: Option, - follow_head: bool, - include_reorgs: bool, - ) -> Self { - let query_mode = match (start_block, end_block, follow_head) { - (Some(start), Some(end), false) => QueryMode::Historical { start, end }, - (Some(start), None, true) => QueryMode::Hybrid { - historical_start: start, - then_follow: true, - }, - (None, None, true) | (_, _, true) => QueryMode::Live { - from_block: start_block, - buffer_size: 100, - }, - (Some(start), None, false) => QueryMode::Historical { - start, - end: u64::MAX, - }, - _ => QueryMode::Live { - from_block: None, - buffer_size: 100, - }, - }; - - Self { - stream_type, - chain_id, - query_mode, - subscription_options: None, - include_reorgs, - } - } } impl BlockchainDescriptor { diff --git 
a/crates/phaser-bridge/src/subscription.rs b/crates/phaser-bridge/src/subscription.rs index 0fc4644..cd49578 100644 --- a/crates/phaser-bridge/src/subscription.rs +++ b/crates/phaser-bridge/src/subscription.rs @@ -79,16 +79,8 @@ pub enum ControlAction { pub enum QueryMode { /// Query historical data between start and end blocks Historical { start: u64, end: u64 }, - /// Subscribe to live data, optionally starting from a specific block - Live { - from_block: Option, - buffer_size: u32, - }, - /// Query historical then transition to live - Hybrid { - historical_start: u64, - then_follow: bool, - }, + /// Subscribe to live data from current head + Live, } /// Data availability information for query planning diff --git a/crates/phaser-query/proto/admin/sync.proto b/crates/phaser-query/proto/admin/sync.proto index 42cfcf2..f1a2d71 100644 --- a/crates/phaser-query/proto/admin/sync.proto +++ b/crates/phaser-query/proto/admin/sync.proto @@ -18,6 +18,9 @@ service SyncService { // Stream real-time progress updates for a sync job rpc StreamSyncProgress(SyncProgressRequest) returns (stream SyncProgressUpdate); + + // Analyze gaps in existing data without starting a sync job + rpc AnalyzeGaps(AnalyzeGapsRequest) returns (AnalyzeGapsResponse); } message SyncRequest { @@ -43,6 +46,44 @@ message SyncResponse { // Whether the request was accepted bool accepted = 3; + + // Gap analysis - existing data overview + GapAnalysis gap_analysis = 4; +} + +message GapAnalysis { + // Total segments in the requested range + uint64 total_segments = 1; + + // Number of complete segments (all data types present) + uint64 complete_segments = 2; + + // Number of missing or incomplete segments + uint64 missing_segments = 3; + + // Completion percentage (0-100) + double completion_percentage = 4; + + // Number of stale temp files cleaned + uint64 cleaned_temp_files = 5; + + // Segments that need syncing + repeated uint64 segments_to_sync = 6; + + // Incomplete segments with details + repeated 
IncompleteSegment incomplete_details = 7; +} + +message IncompleteSegment { + // Segment number + uint64 segment_num = 1; + + // Block range for this segment + uint64 from_block = 2; + uint64 to_block = 3; + + // Missing data types (e.g., "blocks", "txs", "logs") + repeated string missing_data_types = 4; } message SyncStatusRequest { @@ -81,6 +122,9 @@ message SyncStatusResponse { // Block range uint64 from_block = 10; uint64 to_block = 11; + + // Gap analysis from when job started + GapAnalysis gap_analysis = 12; } message ListSyncJobsRequest { @@ -154,6 +198,31 @@ message WorkerProgress { // Number of parquet files created uint32 files_created = 9; + + // When this worker started (Unix timestamp in seconds) + int64 started_at = 10; +} + +message AnalyzeGapsRequest { + // Chain ID to analyze + uint64 chain_id = 1; + + // Bridge name + string bridge_name = 2; + + // Starting block number (inclusive) + uint64 from_block = 3; + + // Ending block number (inclusive) + uint64 to_block = 4; +} + +message AnalyzeGapsResponse { + // Gap analysis for the requested range + GapAnalysis gap_analysis = 1; + + // Message summarizing the analysis + string message = 2; } enum SyncStatus { diff --git a/crates/phaser-query/src/bin/phaser-cli.rs b/crates/phaser-query/src/bin/phaser-cli.rs index 0eefb2f..23276e7 100644 --- a/crates/phaser-query/src/bin/phaser-cli.rs +++ b/crates/phaser-query/src/bin/phaser-cli.rs @@ -58,6 +58,29 @@ enum Commands { /// Job ID to cancel job_id: String, }, + /// Analyze gaps in existing data without starting a sync + Analyze { + /// Chain ID + #[clap(short, long)] + chain_id: u64, + + /// Bridge name + #[clap(short, long)] + bridge: String, + + /// Starting block number (inclusive) + #[clap(short, long)] + from: u64, + + /// Ending block number (inclusive) + #[clap(short, long)] + to: u64, + }, + /// Stream live progress updates for a sync job + Progress { + /// Job ID to monitor + job_id: String, + }, } #[tokio::main] @@ -87,6 +110,47 @@ async fn main() 
-> Result<()> { println!("✓ Sync job started"); println!(" Job ID: {}", resp.job_id); println!(" {}", resp.message); + + // Show gap analysis if available + if let Some(gap) = resp.gap_analysis { + println!("\nGap Analysis:"); + if gap.cleaned_temp_files > 0 { + println!(" Cleaned {} stale temp files", gap.cleaned_temp_files); + } + println!(" Total segments: {}", gap.total_segments); + println!( + " Complete: {} ({:.1}%)", + gap.complete_segments, gap.completion_percentage + ); + println!(" Missing: {}", gap.missing_segments); + + if !gap.incomplete_details.is_empty() && gap.incomplete_details.len() <= 10 { + println!("\n Incomplete segments:"); + for detail in &gap.incomplete_details { + println!( + " Segment {} (blocks {}-{}): missing {}", + detail.segment_num, + detail.from_block, + detail.to_block, + detail.missing_data_types.join(", ") + ); + } + } else if gap.incomplete_details.len() > 10 { + println!( + "\n {} incomplete segments (showing first 5):", + gap.incomplete_details.len() + ); + for detail in gap.incomplete_details.iter().take(5) { + println!( + " Segment {} (blocks {}-{}): missing {}", + detail.segment_num, + detail.from_block, + detail.to_block, + detail.missing_data_types.join(", ") + ); + } + } + } } else { println!("✗ Sync job rejected"); println!(" {}", resp.message); @@ -277,7 +341,140 @@ async fn main() -> Result<()> { println!("✗ {}", resp.message); } } + Commands::Analyze { + chain_id, + bridge, + from, + to, + } => { + let request = tonic::Request::new(AnalyzeGapsRequest { + chain_id, + bridge_name: bridge, + from_block: from, + to_block: to, + }); + + let response = client.analyze_gaps(request).await?; + let resp = response.into_inner(); + + println!("{}", resp.message); + + if let Some(gap) = resp.gap_analysis { + println!("\nGap Analysis:"); + if gap.cleaned_temp_files > 0 { + println!(" Cleaned {} stale temp files", gap.cleaned_temp_files); + } + println!(" Total segments: {}", gap.total_segments); + println!( + " Complete: {} 
({:.1}%)", + gap.complete_segments, gap.completion_percentage + ); + println!(" Missing: {}", gap.missing_segments); + + if !gap.incomplete_details.is_empty() { + println!("\n Incomplete segments:"); + for detail in &gap.incomplete_details { + println!( + " Segment {} (blocks {}-{}): missing {}", + detail.segment_num, + detail.from_block, + detail.to_block, + detail.missing_data_types.join(", ") + ); + } + } + + if gap.missing_segments > 0 && gap.segments_to_sync.len() <= 20 { + println!("\n Segments to sync: {:?}", gap.segments_to_sync); + } + } + } + Commands::Progress { job_id } => { + use futures::StreamExt; + + let request = tonic::Request::new(SyncProgressRequest { + job_id: job_id.clone(), + }); + + let mut stream = client.stream_sync_progress(request).await?.into_inner(); + + println!("Streaming progress for job {}...\n", job_id); + + while let Some(update) = stream.next().await { + let update = update?; + + let status_str = match SyncStatus::try_from(update.status) { + Ok(SyncStatus::Pending) => "PENDING", + Ok(SyncStatus::Running) => "RUNNING", + Ok(SyncStatus::Completed) => "COMPLETED", + Ok(SyncStatus::Failed) => "FAILED", + Ok(SyncStatus::Cancelled) => "CANCELLED", + _ => "UNKNOWN", + }; + + let percent = if update.total_blocks > 0 { + (update.total_blocks_synced as f64 / update.total_blocks as f64) * 100.0 + } else { + 0.0 + }; + + println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"); + println!( + "Status: {} | Progress: {}/{} blocks ({:.1}%)", + status_str, update.total_blocks_synced, update.total_blocks, percent + ); + println!( + "Rate: {:.1} blocks/sec | Bytes: {:.2} GB", + update.overall_rate, + update.total_bytes_written as f64 / 1_000_000_000.0 + ); + + if !update.workers.is_empty() { + println!("\nActive Workers: {}", update.workers.len()); + for worker in &update.workers { + let elapsed = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64 + - worker.started_at; + + println!( + " 
Worker {}: {} | blocks {}-{} | {:.1} blocks/sec | {} elapsed", + worker.worker_id, + worker.stage, + worker.from_block, + worker.to_block, + worker.rate, + format_duration(elapsed as u64) + ); + } + } + + println!(); + + // Check if job is finished + if status_str == "COMPLETED" || status_str == "FAILED" || status_str == "CANCELLED" + { + println!("Job finished with status: {}", status_str); + break; + } + } + } } Ok(()) } + +fn format_duration(seconds: u64) -> String { + let hours = seconds / 3600; + let minutes = (seconds % 3600) / 60; + let secs = seconds % 60; + + if hours > 0 { + format!("{}h{}m{}s", hours, minutes, secs) + } else if minutes > 0 { + format!("{}m{}s", minutes, secs) + } else { + format!("{}s", secs) + } +} diff --git a/crates/phaser-query/src/bin/phaser-query.rs b/crates/phaser-query/src/bin/phaser-query.rs index 3da7cfe..f26f9a4 100644 --- a/crates/phaser-query/src/bin/phaser-query.rs +++ b/crates/phaser-query/src/bin/phaser-query.rs @@ -1,7 +1,11 @@ use anyhow::Result; use clap::Parser; -use phaser_query::{streaming_with_writer::StreamingServiceWithWriter, PhaserConfig, PhaserQuery}; +use phaser_query::{ + streaming_with_writer::StreamingServiceWithWriter, LiveStreamingState, PhaserConfig, + PhaserQuery, +}; use std::path::PathBuf; +use std::sync::Arc; use tracing::{error, info}; use tracing_subscriber; @@ -85,6 +89,9 @@ async fn main() -> Result<()> { let phaser = PhaserQuery::new(config.clone()).await?; info!("Initialized phaser-query with catalog"); + // Create shared live streaming state + let live_state = Arc::new(LiveStreamingState::new()); + // Start services based on flags let mut handles = vec![]; @@ -115,9 +122,12 @@ async fn main() -> Result<()> { let config_clone = config.clone(); let catalog = phaser.catalog.clone(); let bridge_clone = bridge.clone(); + let live_state_clone = live_state.clone(); let handle = tokio::spawn(async move { - if let Err(e) = start_streaming_service(config_clone, catalog, bridge_clone).await { + if let 
Err(e) = + start_streaming_service(config_clone, catalog, bridge_clone, live_state_clone).await + { error!("Streaming service error: {}", e); } }); @@ -147,9 +157,12 @@ async fn main() -> Result<()> { let config_clone = config.clone(); let catalog = phaser.catalog.clone(); let bridge_clone = bridge.clone(); + let live_state_clone = live_state.clone(); let handle = tokio::spawn(async move { - if let Err(e) = start_trie_streaming_service(config_clone, catalog, bridge_clone).await + if let Err(e) = + start_trie_streaming_service(config_clone, catalog, bridge_clone, live_state_clone) + .await { error!("Trie streaming service error: {}", e); } @@ -182,9 +195,10 @@ async fn main() -> Result<()> { ); let config_clone = config.clone(); + let live_state_clone = live_state.clone(); let handle = tokio::spawn(async move { - if let Err(e) = start_sync_admin_server(config_clone).await { + if let Err(e) = start_sync_admin_server(config_clone, live_state_clone).await { error!("Sync admin server error: {}", e); } }); @@ -213,6 +227,7 @@ async fn start_streaming_service( config: PhaserConfig, catalog: std::sync::Arc, bridge: phaser_query::BridgeConfig, + live_state: Arc, ) -> Result<()> { let data_dir = config.bridge_data_dir(bridge.chain_id, &bridge.name); @@ -221,29 +236,14 @@ async fn start_streaming_service( data_dir.clone(), config.max_file_size_mb, config.segment_size, + bridge.chain_id, + bridge.name.clone(), + live_state, ) .await?; info!("Connected to bridge, starting streaming to {:?}", data_dir); - // Start streaming with periodic index updates - let catalog_clone = catalog.clone(); - let config_clone = config.clone(); - - tokio::spawn(async move { - loop { - // Re-index every 60 seconds - tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; - - info!("Updating indexes..."); - if let Err(e) = - phaser_query::indexer::build_indexes(&catalog_clone, &config_clone).await - { - error!("Failed to update indexes: {}", e); - } - } - }); - 
service.start_streaming().await?; Ok(()) @@ -253,6 +253,7 @@ async fn start_trie_streaming_service( config: PhaserConfig, catalog: std::sync::Arc, bridge: phaser_query::BridgeConfig, + live_state: Arc, ) -> Result<()> { let data_dir = config.bridge_data_dir(bridge.chain_id, &bridge.name); @@ -262,6 +263,9 @@ async fn start_trie_streaming_service( data_dir, config.max_file_size_mb, config.segment_size, + bridge.chain_id, + bridge.name.clone(), + live_state, ) .await?; @@ -288,10 +292,13 @@ async fn start_rpc_server( Ok(()) } -async fn start_sync_admin_server(config: PhaserConfig) -> Result<()> { +async fn start_sync_admin_server( + config: PhaserConfig, + live_state: Arc, +) -> Result<()> { use phaser_query::sync::SyncServer; - let server = SyncServer::new(std::sync::Arc::new(config.clone())); + let server = SyncServer::new(Arc::new(config.clone()), live_state); server.start(config.sync_admin_port).await?; Ok(()) diff --git a/crates/phaser-query/src/generated/phaser.admin.rs b/crates/phaser-query/src/generated/phaser.admin.rs index ed10f1d..50a6fe1 100644 --- a/crates/phaser-query/src/generated/phaser.admin.rs +++ b/crates/phaser-query/src/generated/phaser.admin.rs @@ -25,6 +25,47 @@ pub struct SyncResponse { /// Whether the request was accepted #[prost(bool, tag = "3")] pub accepted: bool, + /// Gap analysis - existing data overview + #[prost(message, optional, tag = "4")] + pub gap_analysis: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GapAnalysis { + /// Total segments in the requested range + #[prost(uint64, tag = "1")] + pub total_segments: u64, + /// Number of complete segments (all data types present) + #[prost(uint64, tag = "2")] + pub complete_segments: u64, + /// Number of missing or incomplete segments + #[prost(uint64, tag = "3")] + pub missing_segments: u64, + /// Completion percentage (0-100) + #[prost(double, tag = "4")] + pub completion_percentage: f64, + /// Number of stale temp files cleaned + 
#[prost(uint64, tag = "5")] + pub cleaned_temp_files: u64, + /// Segments that need syncing + #[prost(uint64, repeated, tag = "6")] + pub segments_to_sync: ::prost::alloc::vec::Vec, + /// Incomplete segments with details + #[prost(message, repeated, tag = "7")] + pub incomplete_details: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct IncompleteSegment { + /// Segment number + #[prost(uint64, tag = "1")] + pub segment_num: u64, + /// Block range for this segment + #[prost(uint64, tag = "2")] + pub from_block: u64, + #[prost(uint64, tag = "3")] + pub to_block: u64, + /// Missing data types (e.g., "blocks", "txs", "logs") + #[prost(string, repeated, tag = "4")] + pub missing_data_types: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct SyncStatusRequest { @@ -66,6 +107,9 @@ pub struct SyncStatusResponse { pub from_block: u64, #[prost(uint64, tag = "11")] pub to_block: u64, + /// Gap analysis from when job started + #[prost(message, optional, tag = "12")] + pub gap_analysis: ::core::option::Option, } #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ListSyncJobsRequest { @@ -152,6 +196,33 @@ pub struct WorkerProgress { /// Number of parquet files created #[prost(uint32, tag = "9")] pub files_created: u32, + /// When this worker started (Unix timestamp in seconds) + #[prost(int64, tag = "10")] + pub started_at: i64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AnalyzeGapsRequest { + /// Chain ID to analyze + #[prost(uint64, tag = "1")] + pub chain_id: u64, + /// Bridge name + #[prost(string, tag = "2")] + pub bridge_name: ::prost::alloc::string::String, + /// Starting block number (inclusive) + #[prost(uint64, tag = "3")] + pub from_block: u64, + /// Ending block number (inclusive) + #[prost(uint64, tag = "4")] + pub to_block: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AnalyzeGapsResponse { + /// Gap analysis 
for the requested range + #[prost(message, optional, tag = "1")] + pub gap_analysis: ::core::option::Option, + /// Message summarizing the analysis + #[prost(string, tag = "2")] + pub message: ::prost::alloc::string::String, } #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] @@ -407,6 +478,31 @@ pub mod sync_service_client { ); self.inner.server_streaming(req, path, codec).await } + /// Analyze gaps in existing data without starting a sync job + pub async fn analyze_gaps( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/phaser.admin.SyncService/AnalyzeGaps", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("phaser.admin.SyncService", "AnalyzeGaps")); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. 
@@ -465,6 +561,14 @@ pub mod sync_service_server { tonic::Response, tonic::Status, >; + /// Analyze gaps in existing data without starting a sync job + async fn analyze_gaps( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } /// Admin service for managing historical data synchronization #[derive(Debug)] @@ -768,6 +872,51 @@ pub mod sync_service_server { }; Box::pin(fut) } + "/phaser.admin.SyncService/AnalyzeGaps" => { + #[allow(non_camel_case_types)] + struct AnalyzeGapsSvc(pub Arc); + impl< + T: SyncService, + > tonic::server::UnaryService + for AnalyzeGapsSvc { + type Response = super::AnalyzeGapsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::analyze_gaps(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = AnalyzeGapsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { let mut response = http::Response::new( diff --git a/crates/phaser-query/src/lib.rs b/crates/phaser-query/src/lib.rs index a4dbbc6..dc1e18f 100644 --- a/crates/phaser-query/src/lib.rs +++ b/crates/phaser-query/src/lib.rs @@ -16,6 +16,65 @@ use serde::{Deserialize, Serialize}; use 
std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; +use tokio::sync::RwLock; + +/// Shared state tracking live streaming boundaries per chain/bridge +/// This allows the sync service to know where live streaming has started +#[derive(Debug, Clone)] +pub struct LiveStreamingState { + /// Map from (chain_id, bridge_name) to the current block number being streamed + /// Set when the first block is received by the streaming service + boundaries: Arc>>, +} + +impl LiveStreamingState { + pub fn new() -> Self { + Self { + boundaries: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Set the live streaming boundary for a chain/bridge + pub async fn set_boundary(&self, chain_id: u64, bridge_name: &str, block_number: u64) { + let mut boundaries = self.boundaries.write().await; + boundaries.insert((chain_id, bridge_name.to_string()), block_number); + } + + /// Get the live streaming boundary for a chain/bridge + /// Returns None if live streaming hasn't started yet for this chain/bridge + pub async fn get_boundary(&self, chain_id: u64, bridge_name: &str) -> Option { + let boundaries = self.boundaries.read().await; + boundaries + .get(&(chain_id, bridge_name.to_string())) + .copied() + } + + /// Wait for live streaming to initialize (with timeout) + /// Returns the boundary block number if initialized within timeout + pub async fn wait_for_boundary( + &self, + chain_id: u64, + bridge_name: &str, + timeout_secs: u64, + ) -> Option { + use tokio::time::{sleep, Duration}; + + let start = std::time::Instant::now(); + while start.elapsed() < Duration::from_secs(timeout_secs) { + if let Some(boundary) = self.get_boundary(chain_id, bridge_name).await { + return Some(boundary); + } + sleep(Duration::from_millis(100)).await; + } + None + } +} + +impl Default for LiveStreamingState { + fn default() -> Self { + Self::new() + } +} /// Main phaser-query service that combines RPC and SQL interfaces pub struct PhaserQuery { diff --git 
a/crates/phaser-query/src/parquet_writer.rs b/crates/phaser-query/src/parquet_writer.rs index 89ddab2..1119011 100644 --- a/crates/phaser-query/src/parquet_writer.rs +++ b/crates/phaser-query/src/parquet_writer.rs @@ -28,8 +28,6 @@ struct CurrentFile { row_count: usize, start_block: u64, end_block: u64, - segment_start: u64, - segment_end: u64, } impl ParquetWriter { @@ -125,18 +123,13 @@ impl ParquetWriter { } fn start_new_file(&mut self, block_num: u64, schema: arrow_schema::SchemaRef) -> Result<()> { - // Calculate segment boundaries - let segment_start = (block_num / self.segment_size) * self.segment_size; - let segment_end = segment_start + self.segment_size - 1; - - // Create temporary filename showing the actual range we'll write - // Format: {topic}_from_{actual_start}_to_{segment_end}.parquet.tmp - // Live sync starts mid-segment and writes to segment boundary - // Historical sync starts at segment boundary - let filename = format!( - "{}_from_{}_to_{}.parquet.tmp", - self.data_type, block_num, segment_end - ); + // Create temporary filename - will be renamed with actual range when finalized + // Format: {data_type}_temp_{timestamp}.parquet.tmp + let timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis(); + let filename = format!("{}_{}.parquet.tmp", self.data_type, timestamp); let temp_path = self.data_dir.join(filename); info!( @@ -156,8 +149,6 @@ impl ParquetWriter { row_count: 0, start_block: block_num, end_block: block_num, - segment_start, - segment_end, }); Ok(()) @@ -190,6 +181,12 @@ impl ParquetWriter { builder = builder.set_compression(Compression::SNAPPY); } + // Always enable statistics for the block number column (_block_num) + // This allows us to query min/max block ranges from parquet metadata + // without reading the entire file + builder = + builder.set_column_statistics_enabled("_block_num".into(), EnabledStatistics::Page); + Ok(builder.build()) } @@ -235,14 +232,10 @@ impl 
ParquetWriter { current.writer.close()?; // Build final filename with actual block range - // Format: {topic}_{segment_start}-{segment_end}_from_{actual_start}_to_{actual_end}.parquet + // Format: {data_type}_from_{start}_to_{end}.parquet let final_filename = format!( - "{}_{}-{}_from_{}_to_{}.parquet", - self.data_type, - current.segment_start, - current.segment_end, - current.start_block, - current.end_block + "{}_from_{}_to_{}.parquet", + self.data_type, current.start_block, current.end_block ); let final_path = self.data_dir.join(final_filename); diff --git a/crates/phaser-query/src/streaming_with_writer.rs b/crates/phaser-query/src/streaming_with_writer.rs index 1299438..637c5ec 100644 --- a/crates/phaser-query/src/streaming_with_writer.rs +++ b/crates/phaser-query/src/streaming_with_writer.rs @@ -21,6 +21,9 @@ pub struct StreamingServiceWithWriter { max_file_size_mb: u64, segment_size: u64, db: Option>, + chain_id: u64, + bridge_name: String, + live_state: Option>, } impl StreamingServiceWithWriter { @@ -29,6 +32,9 @@ impl StreamingServiceWithWriter { data_dir: PathBuf, max_file_size_mb: u64, segment_size: u64, + chain_id: u64, + bridge_name: String, + live_state: Arc, ) -> Result { let mut bridges = Vec::new(); @@ -47,6 +53,9 @@ impl StreamingServiceWithWriter { max_file_size_mb, segment_size, db: None, + chain_id, + bridge_name, + live_state: Some(live_state), }) } @@ -93,33 +102,54 @@ impl StreamingServiceWithWriter { + Unpin + 'static, sender: mpsc::Sender, + live_state: Option>, + chain_id: u64, + bridge_name: String, ) { let stream_name = format!("{:?}", stream_type).to_lowercase(); + let mut first_block_received = false; tokio::spawn(async move { while let Some(batch_result) = stream.next().await { match batch_result { Ok(batch) => { // Special logging for blocks to show block number - if matches!(stream_type, StreamType::Blocks) { + let block_number = if matches!(stream_type, StreamType::Blocks) { + let block_num = batch + .column(0) + .as_any() + 
.downcast_ref::() + .and_then(|a| if a.len() > 0 { Some(a.value(0)) } else { None }) + .unwrap_or(0); + info!( "Received {} batch with {} rows, block #{}", stream_name, batch.num_rows(), - batch - .column(0) - .as_any() - .downcast_ref::() - .and_then(|a| if a.len() > 0 { Some(a.value(0)) } else { None }) - .unwrap_or(0) + block_num ); + + // Set live streaming boundary on first block + if !first_block_received && block_num > 0 { + if let Some(ref state) = live_state { + info!( + "Live streaming started for chain {} bridge '{}' at block {}", + chain_id, bridge_name, block_num + ); + state.set_boundary(chain_id, &bridge_name, block_num).await; + first_block_received = true; + } + } + + Some(block_num) } else { info!( "Received {} batch with {} rows", stream_name, batch.num_rows() ); - } + None + }; if let Err(e) = sender.send(batch).await { error!("Failed to send {} batch to writer: {}", stream_name, e); @@ -176,22 +206,43 @@ impl StreamingServiceWithWriter { } // Subscribe to blocks - let blocks_descriptor = BlockchainDescriptor::live(StreamType::Blocks, None); + let blocks_descriptor = BlockchainDescriptor::live(StreamType::Blocks); info!("Subscribing to blocks from bridge"); let blocks_stream = bridge.subscribe(&blocks_descriptor).await?; - Self::spawn_stream_processor(StreamType::Blocks, blocks_stream, blocks_tx.clone()); + Self::spawn_stream_processor( + StreamType::Blocks, + blocks_stream, + blocks_tx.clone(), + self.live_state.clone(), + self.chain_id, + self.bridge_name.clone(), + ); // Subscribe to transactions - let txs_descriptor = BlockchainDescriptor::live(StreamType::Transactions, None); + let txs_descriptor = BlockchainDescriptor::live(StreamType::Transactions); info!("Subscribing to transactions from bridge"); let txs_stream = bridge.subscribe(&txs_descriptor).await?; - Self::spawn_stream_processor(StreamType::Transactions, txs_stream, txs_tx.clone()); + Self::spawn_stream_processor( + StreamType::Transactions, + txs_stream, + txs_tx.clone(), + 
self.live_state.clone(), + self.chain_id, + self.bridge_name.clone(), + ); // Subscribe to logs - let logs_descriptor = BlockchainDescriptor::live(StreamType::Logs, None); + let logs_descriptor = BlockchainDescriptor::live(StreamType::Logs); info!("Subscribing to logs from bridge"); let logs_stream = bridge.subscribe(&logs_descriptor).await?; - Self::spawn_stream_processor(StreamType::Logs, logs_stream, logs_tx.clone()); + Self::spawn_stream_processor( + StreamType::Logs, + logs_stream, + logs_tx.clone(), + self.live_state.clone(), + self.chain_id, + self.bridge_name.clone(), + ); } Ok(()) @@ -280,13 +331,20 @@ impl StreamingServiceWithWriter { } // Subscribe to trie stream - let trie_descriptor = BlockchainDescriptor::live(StreamType::Trie, None); + let trie_descriptor = BlockchainDescriptor::live(StreamType::Trie); info!("Subscribing to trie data from bridge"); match bridge.subscribe(&trie_descriptor).await { Ok(trie_stream) => { info!("Successfully subscribed to trie stream"); - Self::spawn_stream_processor(StreamType::Trie, trie_stream, trie_tx.clone()); + Self::spawn_stream_processor( + StreamType::Trie, + trie_stream, + trie_tx.clone(), + self.live_state.clone(), + self.chain_id, + self.bridge_name.clone(), + ); } Err(e) => { error!("Failed to subscribe to trie stream: {}", e); diff --git a/crates/phaser-query/src/sync/data_scanner.rs b/crates/phaser-query/src/sync/data_scanner.rs index b5bfe7e..879bc3f 100644 --- a/crates/phaser-query/src/sync/data_scanner.rs +++ b/crates/phaser-query/src/sync/data_scanner.rs @@ -1,5 +1,7 @@ use anyhow::{Context, Result}; -use std::fs; +use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use parquet::file::statistics::Statistics; +use std::fs::{self, File}; use std::path::{Path, PathBuf}; use tracing::{debug, info, warn}; @@ -10,6 +12,37 @@ pub struct BlockRange { pub end: u64, } +/// Analysis of what segments need syncing +#[derive(Debug, Clone)] +pub struct GapAnalysis { + pub total_segments: u64, + pub 
complete_segments: Vec, + pub missing_segments: Vec, + pub incomplete_segments: Vec<(u64, Vec)>, // (segment_num, missing data types) + pub cleaned_temp_files: usize, +} + +impl GapAnalysis { + pub fn complete_count(&self) -> usize { + self.complete_segments.len() + } + + pub fn missing_count(&self) -> usize { + self.missing_segments.len() + } + + pub fn completion_percentage(&self) -> f64 { + if self.total_segments == 0 { + return 100.0; + } + (self.complete_count() as f64 / self.total_segments as f64) * 100.0 + } + + pub fn needs_sync(&self) -> bool { + !self.missing_segments.is_empty() + } +} + /// Scanner for detecting existing blockchain data pub struct DataScanner { data_dir: PathBuf, @@ -53,6 +86,85 @@ impl DataScanner { Ok(ranges) } + /// Read block range from Parquet file statistics + /// This reads only the metadata (file footer), not the actual data + fn read_block_range_from_parquet(&self, path: &Path) -> Result> { + // Only read finalized parquet files (not .tmp files) for statistics + if path.extension().and_then(|s| s.to_str()) != Some("parquet") { + return Ok(None); + } + + let file = match File::open(path) { + Ok(f) => f, + Err(e) => { + warn!("Failed to open parquet file {:?}: {}", path, e); + return Ok(None); + } + }; + + let builder = match ParquetRecordBatchReaderBuilder::try_new(file) { + Ok(b) => b, + Err(e) => { + warn!("Failed to read parquet metadata from {:?}: {}", path, e); + return Ok(None); + } + }; + + let parquet_metadata = builder.metadata(); + let arrow_schema = builder.schema(); + + // Find the _block_num column index + let block_num_col_idx = match arrow_schema.column_with_name("_block_num") { + Some((idx, _field)) => idx, + None => { + debug!("No _block_num column found in {:?}", path); + return Ok(None); + } + }; + + // Iterate through row groups and collect min/max statistics + let mut overall_min: Option = None; + let mut overall_max: Option = None; + + for row_group_idx in 0..parquet_metadata.num_row_groups() { + let 
row_group_metadata = parquet_metadata.row_group(row_group_idx); + + if block_num_col_idx < row_group_metadata.num_columns() { + let column_metadata = row_group_metadata.column(block_num_col_idx); + + if let Some(stats) = column_metadata.statistics() { + // Parquet stores UInt64 as Int64 at the physical level + // We need to reinterpret the bytes + if let Statistics::Int64(int_stats) = stats { + if let (Some(&min_val), Some(&max_val)) = + (int_stats.min_opt(), int_stats.max_opt()) + { + // Reinterpret as unsigned + let min_u64 = min_val as u64; + let max_u64 = max_val as u64; + + overall_min = Some(overall_min.map_or(min_u64, |m| m.min(min_u64))); + overall_max = Some(overall_max.map_or(max_u64, |m| m.max(max_u64))); + } + } else { + debug!( + "Unexpected statistics type for _block_num column in {:?}", + path + ); + } + } + } + } + + if let (Some(start), Some(end)) = (overall_min, overall_max) { + debug!("Read block range from {:?}: {}-{}", path, start, end); + Ok(Some(BlockRange { start, end })) + } else { + debug!("No statistics found for _block_num in {:?}", path); + Ok(None) + } + } + /// Parse filename to extract block range fn parse_filename(&self, path: &Path) -> Result> { let filename = match path.file_name().and_then(|n| n.to_str()) { @@ -100,51 +212,74 @@ impl DataScanner { Ok(None) } - /// Find the first gap in block coverage or the start of live sync data + /// Find where live streaming data starts by detecting temp files /// Returns the block number where historical sync can safely backfill up to pub fn find_historical_boundary(&self, segment_size: u64) -> Result> { - let ranges = self.scan_existing_ranges()?; - - if ranges.is_empty() { + if !self.data_dir.exists() { info!("No existing data found, historical sync can start from genesis"); return Ok(None); } - // Check for the first gap or find where continuous data starts - let mut expected_start = 0u64; + debug!( + "Scanning for live streaming temp files in {:?}", + self.data_dir + ); - for range in 
&ranges { - if range.start > expected_start { - // Found a gap! Historical sync should backfill up to range.start - 1 - info!( - "Found gap in data: blocks {} to {} are missing. Historical sync can backfill up to {}", - expected_start, - range.start - 1, - range.start - 1 - ); - return Ok(Some(range.start - 1)); + // Find the lowest block number in temp files (indicates live streaming start) + let mut min_temp_block = None; + let mut temp_files_found = 0; + + for entry in fs::read_dir(&self.data_dir).context("Failed to read data directory")? { + let entry = entry?; + let path = entry.path(); + + if !path.is_file() { + continue; + } + + let filename = match path.file_name().and_then(|n| n.to_str()) { + Some(name) => name, + None => continue, + }; + + // Only look at temp files + if !filename.ends_with(".parquet.tmp") { + continue; } - // Update expected start to after this range - expected_start = range.end + 1; + temp_files_found += 1; + debug!("Found temp file: {}", filename); + + // Parse the block range from temp file + if let Some(range) = self.parse_filename(&path)? 
{ + debug!("Parsed range: {}-{}", range.start, range.end); + min_temp_block = + Some(min_temp_block.map_or(range.start, |min: u64| min.min(range.start))); + } else { + debug!("Failed to parse range from: {}", filename); + } } - // No gaps found, but we should find the last contiguous segment boundary - // The last range.end might be in the middle of a segment - // Round down to the previous segment boundary - if let Some(last_range) = ranges.last() { - let last_segment_boundary = (last_range.end / segment_size) * segment_size; - if last_segment_boundary > 0 { + debug!( + "Total temp files found: {}, min_temp_block: {:?}", + temp_files_found, min_temp_block + ); + + if let Some(min_temp) = min_temp_block { + // Round down to segment boundary + let segment_boundary = (min_temp / segment_size) * segment_size; + if segment_boundary > 0 { + let boundary = segment_boundary.saturating_sub(1); info!( - "Found continuous data up to block {}. Historical sync can backfill up to segment boundary {}", - last_range.end, - last_segment_boundary - 1 + "Found live streaming temp files starting at block {}. Historical sync can backfill up to {}", + min_temp, + boundary ); - return Ok(Some(last_segment_boundary - 1)); + return Ok(Some(boundary)); } } - info!("No clear boundary found for historical sync"); + info!("No live streaming temp files found, no boundary needed"); Ok(None) } @@ -173,29 +308,104 @@ impl DataScanner { Ok(summary) } - /// Find and clean incomplete segments, returning which ones need to be synced - /// This method: - /// 1. Finds .tmp files (incomplete segments) - /// 2. Deletes them (they're partial/corrupted) - /// 3. 
Returns segment numbers that need to be synced - pub fn find_missing_segments( + /// Clean temp files that conflict with segments we're about to sync + /// Only removes .parquet.tmp files for segments in the provided list + /// This prevents deleting active live streaming temp files + pub fn clean_conflicting_temp_files( + &self, + segments: &[u64], + segment_size: u64, + ) -> Result { + if !self.data_dir.exists() { + return Ok(0); + } + + let mut cleaned_count = 0; + + for entry in fs::read_dir(&self.data_dir)? { + let entry = entry?; + let path = entry.path(); + let filename = entry.file_name(); + let filename_str = filename.to_string_lossy(); + + // Only process temp files + if !filename_str.ends_with(".parquet.tmp") { + continue; + } + + // Parse the temp file to see what segment it covers + if let Some(range) = self.parse_filename(&path)? { + let file_segment = range.start / segment_size; + + // Only delete if this temp file is for a segment we're about to sync + if segments.contains(&file_segment) { + info!( + "Cleaning conflicting temp file for segment {} (blocks {}-{}): {}", + file_segment, + range.start, + range.end, + path.display() + ); + + if let Err(e) = fs::remove_file(&path) { + warn!("Failed to remove temp file {}: {}", path.display(), e); + } else { + cleaned_count += 1; + } + } else { + debug!( + "Preserving non-conflicting temp file for segment {}: {}", + file_segment, + path.display() + ); + } + } + } + + if cleaned_count > 0 { + info!("Cleaned {} conflicting temp files", cleaned_count); + } + + Ok(cleaned_count) + } + + /// Analyze sync range and find gaps + /// Returns detailed analysis of what needs syncing + pub fn analyze_sync_range( &self, from_block: u64, to_block: u64, segment_size: u64, - ) -> Result> { + ) -> Result { // Calculate total segments in the requested range let first_segment = from_block / segment_size; let last_segment = to_block / segment_size; + let total_segments = last_segment - first_segment + 1; + + info!( + 
"Analyzing sync range: blocks {}-{} ({} segments)", + from_block, to_block, total_segments + ); let mut missing_segments = Vec::new(); + let mut complete_segments = Vec::new(); + let mut incomplete_segments = Vec::new(); // For detailed logging if !self.data_dir.exists() { - // No data directory means all segments are missing + info!( + "Data directory doesn't exist - all {} segments need syncing", + total_segments + ); for segment_num in first_segment..=last_segment { missing_segments.push(segment_num); } - return Ok(missing_segments); + return Ok(GapAnalysis { + total_segments, + complete_segments: Vec::new(), + missing_segments, + incomplete_segments: Vec::new(), + cleaned_temp_files: 0, + }); } for segment_num in first_segment..=last_segment { @@ -209,48 +419,206 @@ impl DataScanner { self.has_completed_segment("transactions", segment_start, segment_end)?; let logs_complete = self.has_completed_segment("logs", segment_start, segment_end)?; - if !blocks_complete || !txs_complete || !logs_complete { + if blocks_complete && txs_complete && logs_complete { + complete_segments.push(segment_num); + debug!( + "Segment {} (blocks {}-{}) is complete", + segment_num, segment_start, segment_end + ); + } else { + // Track what's missing for better logging + let mut missing_parts = Vec::new(); + if !blocks_complete { + missing_parts.push("blocks"); + } + if !txs_complete { + missing_parts.push("txs"); + } + if !logs_complete { + missing_parts.push("logs"); + } + + info!( + "Segment {} (blocks {}-{}) incomplete - missing: {}", + segment_num, + segment_start, + segment_end, + missing_parts.join(", ") + ); + // Clean any temp files for this segment self.clean_temp_files_for_segment(segment_start, segment_end)?; missing_segments.push(segment_num); + incomplete_segments.push(( + segment_num, + missing_parts.into_iter().map(|s| s.to_string()).collect(), + )); } } - info!( - "Found {} missing segments out of {} total segments in range {}-{}", - missing_segments.len(), - 
(last_segment - first_segment + 1), - from_block, - to_block - ); + // Summary of gap analysis + if complete_segments.is_empty() { + info!("No existing segments found - full sync required"); + } else { + info!( + "Found {} complete segments that overlap with requested range:", + complete_segments.len() + ); + // Log ranges of complete segments + let mut ranges = Vec::new(); + let mut range_start = None; + let mut range_end = None; + + for &seg in &complete_segments { + if range_start.is_none() { + range_start = Some(seg); + range_end = Some(seg); + } else if range_end == Some(seg - 1) { + // Consecutive + range_end = Some(seg); + } else { + // Gap found, log previous range + if let (Some(start), Some(end)) = (range_start, range_end) { + let block_start = start * segment_size; + let block_end = (end + 1) * segment_size - 1; + ranges.push(format!( + " segments {}-{} (blocks {}-{})", + start, end, block_start, block_end + )); + } + range_start = Some(seg); + range_end = Some(seg); + } + } + // Log final range + if let (Some(start), Some(end)) = (range_start, range_end) { + let block_start = start * segment_size; + let block_end = (end + 1) * segment_size - 1; + ranges.push(format!( + " segments {}-{} (blocks {}-{})", + start, end, block_start, block_end + )); + } - Ok(missing_segments) + for range in ranges { + info!("{}", range); + } + } + + if missing_segments.is_empty() { + info!( + "All {} segments already synced - nothing to do", + total_segments + ); + } else { + info!( + "Need to sync {} missing segments ({}% of range)", + missing_segments.len(), + (missing_segments.len() as f64 / total_segments as f64 * 100.0) as u32 + ); + } + + Ok(GapAnalysis { + total_segments, + complete_segments, + missing_segments, + incomplete_segments, + cleaned_temp_files: 0, // Will be filled in by caller + }) + } + + /// Legacy method - kept for backward compatibility + /// Use analyze_sync_range() for detailed analysis + pub fn find_missing_segments( + &self, + from_block: u64, + 
to_block: u64, + segment_size: u64, + ) -> Result> { + let analysis = self.analyze_sync_range(from_block, to_block, segment_size)?; + Ok(analysis.missing_segments) } - /// Check if a completed parquet file exists for a specific segment + /// Check if completed parquet file(s) cover a specific segment + /// Now uses Parquet statistics instead of filename parsing + /// One or more files can cover a segment fn has_completed_segment( &self, data_type: &str, segment_start: u64, segment_end: u64, ) -> Result { + // Collect all ranges from completed files for this data type + let mut ranges = Vec::new(); + for entry in fs::read_dir(&self.data_dir)? { let entry = entry?; + let path = entry.path(); let filename = entry.file_name(); let filename_str = filename.to_string_lossy(); - // Look for completed files matching this segment - // Format: {data_type}_{segment_start}-{segment_end}_from_*_to_*.parquet - if filename_str.starts_with(&format!( - "{}_{}-{}_from_", - data_type, segment_start, segment_end - )) && filename_str.ends_with(".parquet") - && !filename_str.ends_with(".parquet.tmp") - { + // Look for completed parquet files for this data type + // Support both old and new formats: + // - Old: {data_type}_{segment_start}-{segment_end}_from_*_to_*.parquet + // - New: {data_type}_from_*_to_*.parquet + let matches_format = (filename_str.starts_with(&format!("{}_from_", data_type)) + || filename_str.starts_with(&format!("{}_", data_type))) + && filename_str.ends_with(".parquet") + && !filename_str.ends_with(".parquet.tmp"); + + if matches_format { + // Read block range from statistics + if let Some(range) = self.read_block_range_from_parquet(&path)? 
{ + // Only consider ranges that overlap with this segment + if range.start <= segment_end && range.end >= segment_start { + debug!( + "File {} covers blocks {}-{} (overlaps segment {}-{})", + filename_str, range.start, range.end, segment_start, segment_end + ); + ranges.push(range); + } + } + } + } + + if ranges.is_empty() { + return Ok(false); + } + + // Sort ranges by start block + ranges.sort_by_key(|r| r.start); + + // Check if the union of ranges covers [segment_start, segment_end] + let mut covered_up_to = segment_start.saturating_sub(1); + + for range in ranges { + // If there's a gap, segment is not complete + if range.start > covered_up_to + 1 { + debug!( + "Gap found for segment {}-{}: covered up to {}, next range starts at {}", + segment_start, segment_end, covered_up_to, range.start + ); + return Ok(false); + } + + // Extend coverage + covered_up_to = covered_up_to.max(range.end); + + // If we've covered the entire segment, we're done + if covered_up_to >= segment_end { + debug!( + "Segment {}-{} fully covered (up to {})", + segment_start, segment_end, covered_up_to + ); return Ok(true); } } + // Check if we covered the entire segment + debug!( + "Segment {}-{} incomplete: only covered up to {}", + segment_start, segment_end, covered_up_to + ); Ok(false) } diff --git a/crates/phaser-query/src/sync/service.rs b/crates/phaser-query/src/sync/service.rs index 939a0d0..645a7aa 100644 --- a/crates/phaser-query/src/sync/service.rs +++ b/crates/phaser-query/src/sync/service.rs @@ -11,6 +11,37 @@ use tonic::{Request, Response, Status}; use tracing::{error, info}; use uuid::Uuid; +use crate::proto::admin::{GapAnalysis as ProtoGapAnalysis, IncompleteSegment}; +use crate::sync::data_scanner::GapAnalysis as DataGapAnalysis; + +/// Convert internal GapAnalysis to proto +fn gap_analysis_to_proto(analysis: &DataGapAnalysis, segment_size: u64) -> ProtoGapAnalysis { + let incomplete_details = analysis + .incomplete_segments + .iter() + .map(|(segment_num, 
missing_types)| { + let from_block = segment_num * segment_size; + let to_block = from_block + segment_size - 1; + IncompleteSegment { + segment_num: *segment_num, + from_block, + to_block, + missing_data_types: missing_types.clone(), + } + }) + .collect(); + + ProtoGapAnalysis { + total_segments: analysis.total_segments, + complete_segments: analysis.complete_count() as u64, + missing_segments: analysis.missing_count() as u64, + completion_percentage: analysis.completion_percentage(), + cleaned_temp_files: analysis.cleaned_temp_files as u64, + segments_to_sync: analysis.missing_segments.clone(), + incomplete_details, + } +} + /// Job state for tracking sync progress #[derive(Debug, Clone)] struct SyncJobState { @@ -25,19 +56,22 @@ struct SyncJobState { active_workers: u32, error: Option, progress_tracker: ProgressTracker, + gap_analysis: Option, } /// Server implementation for the sync admin service pub struct SyncServer { config: Arc, jobs: Arc>>, + live_state: Arc, } impl SyncServer { - pub fn new(config: Arc) -> Self { + pub fn new(config: Arc, live_state: Arc) -> Self { Self { config, jobs: Arc::new(RwLock::new(HashMap::new())), + live_state, } } @@ -67,6 +101,7 @@ impl SyncServer { from_block: u64, to_block: u64, progress_tracker: ProgressTracker, + historical_boundary: Option, ) -> Result<()> { // Update status to RUNNING { @@ -88,13 +123,45 @@ impl SyncServer { let data_dir = config.bridge_data_dir(chain_id, &bridge_name); let scanner = DataScanner::new(data_dir.clone()); - let missing_segments = scanner - .find_missing_segments(from_block, to_block, segment_size) - .map_err(|e| anyhow::anyhow!("Failed to find missing segments: {}", e))?; + // Use boundary from LiveStreamingState (already computed in start_sync) + // This is more reliable than scanning temp files + + // Analyze what needs syncing + let mut analysis = scanner + .analyze_sync_range(from_block, to_block, segment_size) + .map_err(|e| anyhow::anyhow!("Failed to analyze sync range: {}", e))?; + + 
// Filter out segments >= live sync boundary to avoid cleaning active live streaming temp files + let segments_to_clean: Vec = if let Some(boundary) = historical_boundary { + let live_segment = (boundary + 1) / segment_size; + analysis + .missing_segments + .iter() + .filter(|&seg| *seg < live_segment) + .copied() + .collect() + } else { + analysis.missing_segments.clone() + }; - let total_segments = missing_segments.len() as u64; + // Clean only temp files that conflict with segments we're about to sync (excluding live sync segments) + info!("Cleaning conflicting temp files in {:?}", data_dir); + let cleaned_count = scanner + .clean_conflicting_temp_files(&segments_to_clean, segment_size) + .map_err(|e| anyhow::anyhow!("Failed to clean temp files: {}", e))?; - if total_segments == 0 { + analysis.cleaned_temp_files = cleaned_count; + + // Log summary for CLI/API consumers + info!( + "Gap analysis: {}/{} segments complete ({:.1}%), {} need syncing", + analysis.complete_count(), + analysis.total_segments, + analysis.completion_percentage(), + analysis.missing_count() + ); + + if !analysis.needs_sync() { info!( "All segments already synced for range {}-{}", from_block, to_block @@ -109,6 +176,9 @@ impl SyncServer { return Ok(()); } + let missing_segments = analysis.missing_segments; + let total_segments = missing_segments.len() as u64; + info!( "Found {} segments to sync ({} blocks per segment)", total_segments, segment_size @@ -162,7 +232,7 @@ impl SyncServer { worker_id, segment_num, segment_from, segment_to ); - // Create and run worker for this segment with timeout + // Create and run worker for this segment let mut worker = SyncWorker::new( worker_id, bridge_endpoint.clone(), @@ -176,16 +246,11 @@ impl SyncServer { ) .with_progress_tracker(progress_tracker.clone()); - // Add 10 minute timeout per segment - let result = - tokio::time::timeout(std::time::Duration::from_secs(600), worker.run()) - .await; - - match result { - Ok(Ok(())) => { + match 
worker.run().await { + Ok(()) => { info!("Worker {} completed segment {}", worker_id, segment_num); } - Ok(Err(e)) => { + Err(e) => { error!( "Worker {} failed on segment {}: {}", worker_id, segment_num, e @@ -195,14 +260,6 @@ impl SyncServer { // Continue to next segment instead of stopping worker } - Err(_) => { - error!( - "Worker {} timeout on segment {} after 10 minutes", - worker_id, segment_num - ); - worker_errors += 1; - failed_segments.lock().await.push(segment_num); - } } } @@ -304,31 +361,31 @@ impl SyncService for SyncServer { )); } - // Get data directory for this bridge to scan for existing data - let data_dir = self.config.bridge_data_dir(req.chain_id, &req.bridge_name); - let scanner = DataScanner::new(data_dir); + // Wait for live streaming to initialize (if it's running) + // This gives us the exact boundary where historical sync should stop + info!("Waiting for live streaming boundary (timeout: 10 seconds)..."); + let historical_boundary = self + .live_state + .wait_for_boundary(req.chain_id, &req.bridge_name, 10) + .await; - // Find where live sync data starts (if any) - let historical_boundary = scanner - .find_historical_boundary(self.config.segment_size) - .map_err(|e| Status::internal(format!("Failed to scan existing data: {}", e)))?; + // Determine final to_block based on live streaming boundary + let to_block = if let Some(boundary_block) = historical_boundary { + // Live streaming has started - historical sync goes right up to where it started + let safe_boundary = boundary_block.saturating_sub(1); - // Determine final to_block - let to_block = if let Some(boundary) = historical_boundary { - // Live sync data detected, ensure we don't overlap - if req.to_block > boundary { + if req.to_block > safe_boundary { info!( - "Live sync detected at block {}. Adjusting to_block from {} to {}", - boundary + 1, - req.to_block, - boundary + "Live streaming detected at block {}. 
Adjusting to_block from {} to {}", + boundary_block, req.to_block, safe_boundary ); - boundary + safe_boundary } else { req.to_block } } else { - // No live sync data, use requested to_block + // No live streaming detected - use full requested range + info!("No live streaming boundary detected, using full requested range"); req.to_block }; @@ -346,6 +403,36 @@ impl SyncService for SyncServer { // Generate job ID let job_id = Uuid::new_v4().to_string(); + // Perform gap analysis before starting job + let data_dir = self.config.bridge_data_dir(req.chain_id, &req.bridge_name); + let scanner = DataScanner::new(data_dir.clone()); + + // Analyze what needs syncing + let mut gap_analysis = scanner + .analyze_sync_range(req.from_block, to_block, self.config.segment_size) + .map_err(|e| Status::internal(format!("Failed to analyze sync range: {}", e)))?; + + // Filter out segments >= live sync boundary to avoid cleaning active live streaming temp files + let segments_to_clean: Vec = if let Some(boundary_block) = historical_boundary { + let live_segment = boundary_block / self.config.segment_size; + gap_analysis + .missing_segments + .iter() + .filter(|&seg| *seg < live_segment) + .copied() + .collect() + } else { + // No live streaming - safe to clean all missing segments + gap_analysis.missing_segments.clone() + }; + + // Clean only temp files that conflict with segments we're about to sync (excluding live sync segments) + let cleaned_count = scanner + .clean_conflicting_temp_files(&segments_to_clean, self.config.segment_size) + .map_err(|e| Status::internal(format!("Failed to clean temp files: {}", e)))?; + + gap_analysis.cleaned_temp_files = cleaned_count; + // Create progress tracker let progress_tracker = Arc::new(RwLock::new(HashMap::new())); @@ -362,6 +449,7 @@ impl SyncService for SyncServer { active_workers: 0, error: None, progress_tracker: progress_tracker.clone(), + gap_analysis: Some(gap_analysis.clone()), }; // Store job state @@ -390,6 +478,7 @@ impl 
SyncService for SyncServer { from_block, to_block, progress_tracker, + historical_boundary, ) .await { @@ -399,13 +488,26 @@ impl SyncService for SyncServer { info!("Created sync job {}", job_id); - Ok(Response::new(SyncResponse { - job_id, - message: format!( + let message = if to_block != req.to_block { + format!( + "Sync job created for blocks {}-{} on chain {} via bridge '{}' (adjusted from {} to avoid live sync overlap)", + req.from_block, to_block, req.chain_id, req.bridge_name, req.to_block + ) + } else { + format!( "Sync job created for blocks {}-{} on chain {} via bridge '{}'", req.from_block, to_block, req.chain_id, req.bridge_name - ), + ) + }; + + Ok(Response::new(SyncResponse { + job_id, + message, accepted: true, + gap_analysis: Some(gap_analysis_to_proto( + &gap_analysis, + self.config.segment_size, + )), })) } @@ -464,6 +566,10 @@ impl SyncService for SyncServer { bridge_name: job.bridge_name.clone(), from_block: job.from_block, to_block: job.to_block, + gap_analysis: job + .gap_analysis + .as_ref() + .map(|ga| gap_analysis_to_proto(ga, self.config.segment_size)), })) } @@ -497,6 +603,10 @@ impl SyncService for SyncServer { bridge_name: job.bridge_name.clone(), from_block: job.from_block, to_block: job.to_block, + gap_analysis: job + .gap_analysis + .as_ref() + .map(|ga| gap_analysis_to_proto(ga, self.config.segment_size)), }) .collect(); @@ -534,6 +644,80 @@ impl SyncService for SyncServer { })) } + async fn analyze_gaps( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + // Verify bridge is configured + let _bridge = self + .config + .get_bridge(req.chain_id, &req.bridge_name) + .ok_or_else(|| { + Status::not_found(format!( + "Bridge '{}' for chain {} not found in configuration", + req.bridge_name, req.chain_id + )) + })?; + + // Perform gap analysis + let data_dir = self.config.bridge_data_dir(req.chain_id, &req.bridge_name); + let scanner = DataScanner::new(data_dir.clone()); + + // Analyze what needs 
syncing + let mut gap_analysis = scanner + .analyze_sync_range(req.from_block, req.to_block, self.config.segment_size) + .map_err(|e| Status::internal(format!("Failed to analyze sync range: {}", e)))?; + + // Get historical boundary from LiveStreamingState to avoid cleaning live streaming temp files + let historical_boundary = self + .live_state + .get_boundary(req.chain_id, &req.bridge_name) + .await; + + // Filter out segments >= live sync boundary to avoid cleaning active live streaming temp files + let segments_to_clean: Vec = if let Some(boundary_block) = historical_boundary { + let live_segment = boundary_block / self.config.segment_size; + gap_analysis + .missing_segments + .iter() + .filter(|&seg| *seg < live_segment) + .copied() + .collect() + } else { + // No live streaming - safe to clean all missing segments + gap_analysis.missing_segments.clone() + }; + + // Clean only temp files that conflict with segments we're analyzing (excluding live sync segments) + let cleaned_count = scanner + .clean_conflicting_temp_files(&segments_to_clean, self.config.segment_size) + .map_err(|e| Status::internal(format!("Failed to clean temp files: {}", e)))?; + + gap_analysis.cleaned_temp_files = cleaned_count; + + let message = if gap_analysis.needs_sync() { + format!( + "{}/{} segments complete ({:.1}%), {} need syncing", + gap_analysis.complete_count(), + gap_analysis.total_segments, + gap_analysis.completion_percentage(), + gap_analysis.missing_count() + ) + } else { + format!("All {} segments complete", gap_analysis.total_segments) + }; + + Ok(Response::new(AnalyzeGapsResponse { + gap_analysis: Some(gap_analysis_to_proto( + &gap_analysis, + self.config.segment_size, + )), + message, + })) + } + type StreamSyncProgressStream = std::pin::Pin> + Send>>; @@ -562,6 +746,38 @@ impl SyncService for SyncServer { let update = { let jobs_lock = jobs.read().await; if let Some(job) = jobs_lock.get(&job_id) { + // Read worker progress + let progress_lock = 
job.progress_tracker.read().await; + let workers: Vec = progress_lock + .values() + .map(|p| { + let started_at = p.started_at + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64; + + let elapsed = p.started_at.elapsed().unwrap_or_default().as_secs(); + let rate = if elapsed > 0 { + p.blocks_processed as f64 / elapsed as f64 + } else { + 0.0 + }; + + WorkerProgress { + worker_id: p.worker_id, + stage: p.current_phase.clone(), + from_block: p.from_block, + to_block: p.to_block, + current_block: p.current_block, + blocks_processed: p.blocks_processed, + rate, + bytes_written: p.bytes_written, + files_created: p.files_created, + started_at, + } + }) + .collect(); + Some(SyncProgressUpdate { job_id: job.job_id.clone(), status: job.status, @@ -569,7 +785,7 @@ impl SyncService for SyncServer { .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs() as i64, - workers: vec![], // TODO: Actual worker progress + workers, total_blocks_synced: job.blocks_synced, total_blocks: job.to_block - job.from_block + 1, overall_rate: 0.0, diff --git a/crates/phaser-query/src/sync/worker.rs b/crates/phaser-query/src/sync/worker.rs index 8631720..99dd3b9 100644 --- a/crates/phaser-query/src/sync/worker.rs +++ b/crates/phaser-query/src/sync/worker.rs @@ -20,6 +20,11 @@ pub struct WorkerProgress { pub blocks_completed: bool, pub transactions_completed: bool, pub logs_completed: bool, + pub started_at: std::time::SystemTime, + pub current_block: u64, + pub blocks_processed: u64, + pub bytes_written: u64, + pub files_created: u32, } pub type ProgressTracker = Arc>>; @@ -75,9 +80,20 @@ impl SyncWorker { blocks_done: bool, txs_done: bool, logs_done: bool, + current_block: u64, + blocks_processed: u64, + bytes_written: u64, + files_created: u32, ) { if let Some(tracker) = &self.progress_tracker { let mut tracker_lock = tracker.write().await; + + // Get existing started_at or use current time for new worker + let started_at = tracker_lock + 
.get(&self.worker_id) + .map(|p| p.started_at) + .unwrap_or_else(std::time::SystemTime::now); + tracker_lock.insert( self.worker_id, WorkerProgress { @@ -88,6 +104,11 @@ impl SyncWorker { blocks_completed: blocks_done, transactions_completed: txs_done, logs_completed: logs_done, + started_at, + current_block, + blocks_processed, + bytes_written, + files_created, }, ); } @@ -100,7 +121,8 @@ impl SyncWorker { ); // Initialize progress - self.update_progress("blocks", false, false, false).await; + self.update_progress("blocks", false, false, false, self.from_block, 0, 0, 0) + .await; // Connect to bridge via Arrow Flight let mut client = FlightBridgeClient::connect(self.bridge_endpoint.clone()) @@ -110,21 +132,50 @@ impl SyncWorker { info!("Worker {} connected to bridge", self.worker_id); // Sync blocks, transactions, and logs - self.sync_blocks(&mut client).await?; - self.update_progress("transactions", true, false, false) - .await; - - self.sync_transactions(&mut client).await?; - self.update_progress("logs", true, true, false).await; - - self.sync_logs(&mut client).await?; - self.update_progress("completed", true, true, true).await; + let (blocks_processed, blocks_bytes) = self.sync_blocks(&mut client).await?; + self.update_progress( + "transactions", + true, + false, + false, + self.to_block, + blocks_processed, + blocks_bytes, + 1, + ) + .await; + + let (txs_processed, txs_bytes) = self.sync_transactions(&mut client).await?; + self.update_progress( + "logs", + true, + true, + false, + self.to_block, + blocks_processed + txs_processed, + blocks_bytes + txs_bytes, + 2, + ) + .await; + + let (logs_processed, logs_bytes) = self.sync_logs(&mut client).await?; + self.update_progress( + "completed", + true, + true, + true, + self.to_block, + blocks_processed + txs_processed + logs_processed, + blocks_bytes + txs_bytes + logs_bytes, + 3, + ) + .await; info!("Worker {} completed sync successfully", self.worker_id); Ok(()) } - async fn sync_blocks(&mut self, client: &mut 
FlightBridgeClient) -> Result<()> { + async fn sync_blocks(&mut self, client: &mut FlightBridgeClient) -> Result<(u64, u64)> { info!( "Worker {} syncing blocks {}-{}", self.worker_id, self.from_block, self.to_block @@ -148,7 +199,8 @@ impl SyncWorker { .await .context("Failed to subscribe to block stream")?; - let mut blocks_processed = 0u64; + let mut batches_processed = 0u64; + let mut bytes_written = 0u64; while let Some(batch_result) = stream.next().await { let batch = batch_result.context("Failed to receive block batch")?; @@ -158,25 +210,28 @@ impl SyncWorker { batch.num_rows() ); + let batch_bytes = batch.get_array_memory_size() as u64; + bytes_written += batch_bytes; + // Write Arrow RecordBatch directly to parquet writer .write_batch(batch) .await .context("Failed to write block batch")?; - blocks_processed += 1; + batches_processed += 1; } writer.finalize_current_file()?; info!( - "Worker {} completed block sync ({} batches)", - self.worker_id, blocks_processed + "Worker {} completed block sync ({} batches, {} bytes)", + self.worker_id, batches_processed, bytes_written ); - Ok(()) + Ok((batches_processed, bytes_written)) } - async fn sync_transactions(&mut self, client: &mut FlightBridgeClient) -> Result<()> { + async fn sync_transactions(&mut self, client: &mut FlightBridgeClient) -> Result<(u64, u64)> { info!( "Worker {} syncing transactions {}-{}", self.worker_id, self.from_block, self.to_block @@ -204,6 +259,7 @@ impl SyncWorker { .context("Failed to subscribe to transaction stream")?; let mut batches_processed = 0u64; + let mut bytes_written = 0u64; while let Some(batch_result) = stream.next().await { let batch = batch_result.context("Failed to receive transaction batch")?; @@ -213,6 +269,9 @@ impl SyncWorker { batch.num_rows() ); + let batch_bytes = batch.get_array_memory_size() as u64; + bytes_written += batch_bytes; + // Write Arrow RecordBatch directly to parquet writer .write_batch(batch) @@ -225,13 +284,13 @@ impl SyncWorker { 
writer.finalize_current_file()?; info!( - "Worker {} completed transaction sync ({} batches)", - self.worker_id, batches_processed + "Worker {} completed transaction sync ({} batches, {} bytes)", + self.worker_id, batches_processed, bytes_written ); - Ok(()) + Ok((batches_processed, bytes_written)) } - async fn sync_logs(&mut self, client: &mut FlightBridgeClient) -> Result<()> { + async fn sync_logs(&mut self, client: &mut FlightBridgeClient) -> Result<(u64, u64)> { info!( "Worker {} syncing logs {}-{}", self.worker_id, self.from_block, self.to_block @@ -256,6 +315,7 @@ impl SyncWorker { .context("Failed to subscribe to log stream")?; let mut batches_processed = 0u64; + let mut bytes_written = 0u64; while let Some(batch_result) = stream.next().await { let batch = batch_result.context("Failed to receive log batch")?; @@ -265,6 +325,9 @@ impl SyncWorker { batch.num_rows() ); + let batch_bytes = batch.get_array_memory_size() as u64; + bytes_written += batch_bytes; + // Write Arrow RecordBatch directly to parquet writer .write_batch(batch) @@ -277,9 +340,9 @@ impl SyncWorker { writer.finalize_current_file()?; info!( - "Worker {} completed log sync ({} batches)", - self.worker_id, batches_processed + "Worker {} completed log sync ({} batches, {} bytes)", + self.worker_id, batches_processed, bytes_written ); - Ok(()) + Ok((batches_processed, bytes_written)) } } diff --git a/docs/parquet-files.md b/docs/parquet-files.md new file mode 100644 index 0000000..5e633b7 --- /dev/null +++ b/docs/parquet-files.md @@ -0,0 +1,238 @@ +# Parquet File Format and Naming + +## Overview + +Phaser uses Parquet files to store blockchain data (blocks, transactions, logs). Files are organized by data type and block range, with metadata stored in Parquet column statistics for efficient querying. 
+
+## Filename Convention
+
+### Final Files
+```
+{data_type}_from_{start_block}_to_{end_block}.parquet
+```
+
+Examples:
+- `blocks_from_0_to_99999.parquet`
+- `transactions_from_100000_to_199999.parquet`
+- `logs_from_200000_to_299999.parquet`
+
+**Key points:**
+- `data_type`: One of `blocks`, `transactions`, or `logs`
+- `start_block` and `end_block`: The actual block range contained in the file
+- Range is **inclusive** on both ends
+- No segment information in filename - segments are a logical concept for parallel processing
+
+### Temporary Files During Writing
+```
+{data_type}_{timestamp}.parquet.tmp
+```
+
+Examples:
+- `blocks_1733160000123.parquet.tmp`
+- `transactions_1733160001456.parquet.tmp`
+
+**Key points:**
+- `.tmp` extension indicates file is actively being written
+- Timestamp (Unix epoch milliseconds) ensures uniqueness
+- Renamed to final format when file is finalized
+- Temporary files are cleaned up on worker restart if incomplete
+
+## File Metadata
+
+### Block Range Statistics
+
+Every Parquet file has **column-level statistics** enabled for the `_block_num` column:
+
+```rust
+// In ParquetWriter
+builder.set_column_statistics_enabled("_block_num".into(), EnabledStatistics::Page);
+```
+
+This stores min/max block numbers in the Parquet file footer, allowing:
+- **Fast gap detection**: Read block ranges without opening files
+- **Efficient queries**: Skip files outside requested range
+- **Resume logic**: Determine what data exists without scanning contents
+
+Reading statistics:
+```rust
+let file = File::open(path)?;
+let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
+let metadata = builder.metadata();
+
+// Read min/max from row group statistics
+for row_group in metadata.row_groups() {
+    let column_stats = row_group.column(block_num_col_idx).statistics();
+    // Statistics are stored as Int64 at Parquet physical level
+    if let Some(Statistics::Int64(int_stats)) = column_stats {
+        let min_block = int_stats.min_opt().map(|&v| v as 
u64); + let max_block = int_stats.max_opt().map(|&v| v as u64); + } +} +``` + +**Note**: Parquet stores UInt64 as Int64 at the physical level, so we cast when reading. + +## File Creation Process + +### 1. Start New File +```rust +ParquetWriter::start_new_file(block_num, schema) +``` +- Creates temporary file with timestamp +- Initializes ArrowWriter with schema and WriterProperties +- Tracks `start_block` (set to first block number written) + +### 2. Write Batches +```rust +ParquetWriter::write_batch(batch) +``` +- Writes RecordBatch to current file +- Updates `end_block` to latest block written +- Checks if file should be finalized (segment boundary or size limit) + +### 3. Finalize File +```rust +ParquetWriter::finalize_current_file() +``` +- Closes writer (flushes all data and writes footer with statistics) +- Renames `.tmp` file to final format with actual block range +- Logs completion with row count and block range + +### 4. Finalization Triggers + +Files are finalized and a new file started when: +1. **Segment boundary crossed**: `(block_num / segment_size) != (start_block / segment_size)` +2. **Size limit exceeded**: File size >= `max_file_size_bytes` + +This means: +- Multiple files can cover a single segment +- Files never span segment boundaries +- File sizes are bounded + +## Multiple Files Per Segment + +Segments are logical units for parallel processing (e.g., 500K blocks). 
Physical files can be smaller: + +``` +Segment 0 (blocks 0-499,999): + - blocks_from_0_to_249999.parquet + - blocks_from_250000_to_499999.parquet + +Segment 1 (blocks 500,000-999,999): + - blocks_from_500000_to_999999.parquet +``` + +Gap detection checks if the **union** of file ranges covers the segment: +```rust +// Sort files by start block +ranges.sort_by_key(|r| r.start); + +// Check continuous coverage +let mut covered_up_to = segment_start - 1; +for range in ranges { + if range.start > covered_up_to + 1 { + return false; // Gap found + } + covered_up_to = covered_up_to.max(range.end); +} + +covered_up_to >= segment_end // Segment complete if covered to end +``` + +## Configuration + +Parquet writing is configured per data directory: + +```yaml +# config.yaml +max_file_size_mb: 500 # Maximum file size before finalization +segment_size: 500000 # Logical segment size for parallelization + +parquet: + default_compression: "zstd" + row_group_size_mb: 128 + column_options: + data: + compression: "zstd" + encoding: "plain" + _block_num: + statistics: "page" # Always enabled regardless of config +``` + +**Note**: Statistics for `_block_num` are **always** enabled at the Page level, regardless of config, as they are essential for gap detection and query optimization. + +## Historical vs Live Streaming + +### Historical Sync +- Fetches specific block ranges via Historical mode +- Writes to Parquet files with known boundaries +- Multiple workers can write different segments in parallel +- Files are immediately finalized when range complete + +### Live Streaming +- Subscribes to current head via Live mode +- Writes to Parquet files as blocks arrive +- Sets `LiveStreamingState` boundary when first block received +- Files are finalized at segment boundaries or size limits +- Creates `.tmp` files that are renamed when finalized + +The boundary between historical and live data is tracked in `LiveStreamingState` to ensure no gaps or overlaps. 
+ +## Cleanup and Recovery + +### Worker Restart +When a sync worker restarts: +1. Scans existing `.parquet` files to find completed ranges +2. Deletes orphaned `.parquet.tmp` files from previous run +3. Identifies gaps in segment coverage +4. Resumes from gaps, not from beginning + +### Interrupted Writes +If a write is interrupted: +- `.tmp` file remains on disk +- On restart, worker detects gap and re-fetches that range +- Old `.tmp` file is deleted before starting new worker +- No corrupted data in final `.parquet` files + +## Example: File Lifecycle + +``` +1. Worker starts segment 0 (blocks 0-499,999) + Create: blocks_1733160000123.parquet.tmp + +2. Write blocks 0-99,999 + Update: end_block = 99999 + +3. Write blocks 100,000-199,999 + Update: end_block = 199999 + +4. Block 200,000 arrives, size limit reached + Close writer, rename to: blocks_from_0_to_199999.parquet + Create: blocks_1733160005678.parquet.tmp + +5. Write blocks 200,000-299,999 + Update: end_block = 299999 + +6. Write blocks 300,000-399,999 + Update: end_block = 399999 + +7. Write blocks 400,000-499,999 + Update: end_block = 499999 + +8. Block 500,000 arrives, segment boundary crossed + Close writer, rename to: blocks_from_200000_to_499999.parquet + +Result: Segment 0 covered by 2 files +``` + +## Querying Files + +To find data for a specific block range: + +1. **List files** in data directory +2. **Read Parquet metadata** for each file (cheap - just footer) +3. **Check block range** from `_block_num` statistics +4. **Skip files** outside query range +5. **Open relevant files** and apply additional filters + +This allows efficient queries without scanning all files.