diff --git a/.gitignore b/.gitignore index 34100eb..841301e 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ test-data/streaming test-data/rocksdb erigon-bridge.log commit-changes.sh +*config.yaml diff --git a/Cargo.lock b/Cargo.lock index 26d2b09..ea8a980 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3785,6 +3785,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "tempfile", "thiserror 1.0.69", "tokio", "tokio-stream", diff --git a/crates/bridges/erigon-bridge/src/bridge.rs b/crates/bridges/erigon-bridge/src/bridge.rs index d5087fc..8857c56 100644 --- a/crates/bridges/erigon-bridge/src/bridge.rs +++ b/crates/bridges/erigon-bridge/src/bridge.rs @@ -292,13 +292,76 @@ impl ErigonFlightBridge { } } StreamType::Logs => { - error!("Historical log streaming not yet implemented"); - yield Err(arrow_flight::error::FlightError::ExternalError( - Box::new(std::io::Error::new( - std::io::ErrorKind::Unsupported, - "Historical log streaming not yet implemented" - )) - )); + // For logs, we need to execute blocks to generate receipts + let mut client_guard = blockdata_client.lock().await; + + // First get blocks to extract timestamps + let mut timestamps = HashMap::new(); + let mut block_stream = match client_guard.stream_blocks(start, end, 100).await { + Ok(s) => s, + Err(e) => { + error!("Failed to start block stream for timestamps: {}", e); + yield Err(arrow_flight::error::FlightError::ExternalError( + Box::new(std::io::Error::new(std::io::ErrorKind::Other, e.to_string())) + )); + return; + } + }; + + // Collect timestamps + while let Some(batch_result) = block_stream.message().await.transpose() { + if let Ok(block_batch) = batch_result { + use alloy_consensus::Header; + use alloy_rlp::Decodable; + for block in &block_batch.blocks { + // Decode RLP header to get timestamp + if let Ok(header) = Header::decode(&mut block.rlp_header.as_slice()) { + timestamps.insert(block.block_number, header.timestamp as i64); + } + } + } + } + + // Now execute blocks to get 
receipts (which contain logs) + info!("Executing blocks {}-{} to generate receipts/logs (this may be slow)", start, end); + let mut receipt_stream = match client_guard.execute_blocks(start, end, 100).await { + Ok(s) => s, + Err(e) => { + error!("Failed to start receipt stream: {}", e); + yield Err(arrow_flight::error::FlightError::ExternalError( + Box::new(std::io::Error::new(std::io::ErrorKind::Other, e.to_string())) + )); + return; + } + }; + drop(client_guard); + + while let Some(batch_result) = receipt_stream.message().await.transpose() { + match batch_result { + Ok(receipt_batch) => { + match BlockDataConverter::receipts_to_logs_arrow(receipt_batch, &timestamps) { + Ok(record_batch) => { + debug!("Converted receipt batch to {} log rows", record_batch.num_rows()); + yield Ok(record_batch); + } + Err(e) => { + error!("Failed to convert receipt batch: {}", e); + yield Err(arrow_flight::error::FlightError::ExternalError( + Box::new(std::io::Error::new(std::io::ErrorKind::Other, e.to_string())) + )); + break; + } + } + } + Err(e) => { + error!("Failed to receive receipt batch: {}", e); + yield Err(arrow_flight::error::FlightError::ExternalError( + Box::new(std::io::Error::new(std::io::ErrorKind::Other, e.to_string())) + )); + break; + } + } + } } StreamType::Trie => { error!("Historical trie streaming not supported"); diff --git a/crates/phaser-bridge/src/bridge.rs b/crates/phaser-bridge/src/bridge.rs index f996ef0..7f9112b 100644 --- a/crates/phaser-bridge/src/bridge.rs +++ b/crates/phaser-bridge/src/bridge.rs @@ -8,7 +8,7 @@ use futures::Stream; use std::pin::Pin; use tonic::{Request, Response, Status, Streaming}; -use crate::descriptors::{BlockchainDescriptor, BridgeInfo}; +use crate::descriptors::BridgeInfo; /// Capabilities that a bridge can advertise #[derive(Debug, Clone)] diff --git a/crates/phaser-bridge/src/client.rs b/crates/phaser-bridge/src/client.rs index 783c293..856f230 100644 --- a/crates/phaser-bridge/src/client.rs +++
b/crates/phaser-bridge/src/client.rs @@ -10,7 +10,6 @@ use crate::descriptors::{BlockchainDescriptor, BridgeInfo}; /// Client for connecting to blockchain data bridges pub struct FlightBridgeClient { client: FlightClient, - endpoint: String, info: Option, } @@ -33,7 +32,7 @@ impl FlightBridgeClient { info!("Connecting via Unix domain socket: {}", path); // For Unix domain sockets, we need a special URI format - let uri = format!("http://[::]:50051"); // dummy URI for unix socket + let uri = "http://[::]:50051".to_string(); // dummy URI for unix socket // Use tonic's built-in Unix socket support use tonic::transport::Uri; @@ -74,11 +73,7 @@ impl FlightBridgeClient { let client = FlightClient::new(channel); - Ok(Self { - client, - endpoint, - info: None, - }) + Ok(Self { client, info: None }) } /// Get bridge information diff --git a/crates/phaser-bridge/src/descriptors.rs b/crates/phaser-bridge/src/descriptors.rs index c7dcbd8..4472c98 100644 --- a/crates/phaser-bridge/src/descriptors.rs +++ b/crates/phaser-bridge/src/descriptors.rs @@ -12,13 +12,13 @@ pub enum StreamType { Trie, // Raw trie nodes for state reconstruction } -impl ToString for StreamType { - fn to_string(&self) -> String { +impl std::fmt::Display for StreamType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - StreamType::Blocks => "blocks".to_string(), - StreamType::Transactions => "transactions".to_string(), - StreamType::Logs => "logs".to_string(), - StreamType::Trie => "trie".to_string(), + StreamType::Blocks => write!(f, "blocks"), + StreamType::Transactions => write!(f, "transactions"), + StreamType::Logs => write!(f, "logs"), + StreamType::Trie => write!(f, "trie"), } } } diff --git a/crates/phaser-query/Cargo.toml b/crates/phaser-query/Cargo.toml index 7e2e621..ed978dc 100644 --- a/crates/phaser-query/Cargo.toml +++ b/crates/phaser-query/Cargo.toml @@ -54,3 +54,6 @@ alloy-rlp.workspace = true [build-dependencies] tonic-build = "0.13" + +[dev-dependencies] 
+tempfile = "3.10" diff --git a/crates/phaser-query/src/bin/phaser-cli.rs b/crates/phaser-query/src/bin/phaser-cli.rs index 29c267c..0eefb2f 100644 --- a/crates/phaser-query/src/bin/phaser-cli.rs +++ b/crates/phaser-query/src/bin/phaser-cli.rs @@ -116,11 +116,22 @@ async fn main() -> Result<()> { status.chain_id, status.bridge_name ); println!("Blocks: {}-{}", status.from_block, status.to_block); + + // Progress includes blocks + transactions + logs (3 phases per segment) + let percent = if status.total_blocks > 0 { + (status.blocks_synced as f64 / status.total_blocks as f64) * 100.0 + } else { + 0.0 + }; println!( - "Progress: {}/{} blocks", - status.blocks_synced, status.total_blocks + "Progress: {}/{} blocks ({:.1}% - includes blocks+txs+logs)", + status.blocks_synced, status.total_blocks, percent ); - println!("Current block: {}", status.current_block); + + // Only show highest completed block if it's meaningful (> from_block) + if status.current_block > status.from_block { + println!("Highest completed: block {}", status.current_block); + } println!("Active workers: {}", status.active_workers); if !status.error.is_empty() { @@ -170,17 +181,20 @@ async fn main() -> Result<()> { println!("Status: {}", status_str); println!("Chain: {} / Bridge: {}", job.chain_id, job.bridge_name); println!("Blocks: {}-{}", job.from_block, job.to_block); + + let percent = if job.total_blocks > 0 { + (job.blocks_synced as f64 / job.total_blocks as f64) * 100.0 + } else { + 0.0 + }; println!( - "Progress: {}/{} blocks ({:.1}%)", - job.blocks_synced, - job.total_blocks, - if job.total_blocks > 0 { - (job.blocks_synced as f64 / job.total_blocks as f64) * 100.0 - } else { - 0.0 - } + "Progress: {}/{} blocks ({:.1}% - includes blocks+txs+logs)", + job.blocks_synced, job.total_blocks, percent ); - println!("Current block: {}", job.current_block); + + if job.current_block > job.from_block { + println!("Highest completed: block {}", job.current_block); + } println!("Active workers: {}", 
job.active_workers); if !job.error.is_empty() { println!("Error: {}", job.error); @@ -231,9 +245,15 @@ async fn main() -> Result<()> { println!(" Status: {}", status_str); println!(" Chain: {} / Bridge: {}", job.chain_id, job.bridge_name); println!(" Blocks: {}-{}", job.from_block, job.to_block); + + let percent = if job.total_blocks > 0 { + (job.blocks_synced as f64 / job.total_blocks as f64) * 100.0 + } else { + 0.0 + }; println!( - " Progress: {}/{} blocks", - job.blocks_synced, job.total_blocks + " Progress: {}/{} blocks ({:.1}%)", + job.blocks_synced, job.total_blocks, percent ); println!(" Workers: {}", job.active_workers); if !job.error.is_empty() { diff --git a/crates/phaser-query/src/buffer_manager.rs b/crates/phaser-query/src/buffer_manager.rs index e047294..03b39e4 100644 --- a/crates/phaser-query/src/buffer_manager.rs +++ b/crates/phaser-query/src/buffer_manager.rs @@ -271,14 +271,14 @@ impl CfToParquetBuffer { use std::io::Cursor; let cursor = Cursor::new(bytes); - let reader = StreamReader::try_new(cursor, None)?; + let mut reader = StreamReader::try_new(cursor, None)?; // Read the first (and only) batch - for batch in reader { - return Ok(batch?); + if let Some(batch_result) = reader.next() { + Ok(batch_result?) 
+ } else { + Err(anyhow!("No batch found in serialized data")) } - - Err(anyhow!("No batch found in serialized data")) } /// Force flush any remaining items (for shutdown) diff --git a/crates/phaser-query/src/erigon_client.rs b/crates/phaser-query/src/erigon_client.rs index 063aba8..3122fda 100644 --- a/crates/phaser-query/src/erigon_client.rs +++ b/crates/phaser-query/src/erigon_client.rs @@ -1,18 +1,13 @@ use anyhow::Result; use futures::StreamExt; use tonic::transport::Channel; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info}; -use crate::proto::types::{H160, H256}; -use crate::proto::{ - BlockReply, BlockRequest, EthbackendClient, Event, LogsFilterRequest, SubscribeLogsReply, - SubscribeReply, SubscribeRequest, -}; +use crate::proto::{BlockRequest, EthbackendClient, Event, SubscribeRequest}; /// Client for connecting to Erigon's gRPC interface pub struct ErigonClient { client: EthbackendClient, - endpoint: String, } impl ErigonClient { @@ -37,7 +32,7 @@ impl ErigonClient { info!("Successfully connected to Erigon"); - Ok(Self { client, endpoint }) + Ok(Self { client }) } /// Test the connection by fetching the client version diff --git a/crates/phaser-query/src/indexer.rs b/crates/phaser-query/src/indexer.rs index fe51d7e..a4b910b 100644 --- a/crates/phaser-query/src/indexer.rs +++ b/crates/phaser-query/src/indexer.rs @@ -1,5 +1,5 @@ use crate::catalog::RocksDbCatalog; -use crate::index::{BlockPointer, FileInfo, LogPointer, TransactionPointer}; +use crate::index::FileInfo; use crate::PhaserConfig; use anyhow::Result; use std::fs; @@ -117,7 +117,7 @@ async fn index_block_file(catalog: &RocksDbCatalog, path: &PathBuf) -> Result<() } /// Index a transaction Parquet file -async fn index_transaction_file(catalog: &RocksDbCatalog, path: &PathBuf) -> Result<()> { +async fn index_transaction_file(_catalog: &RocksDbCatalog, path: &PathBuf) -> Result<()> { debug!("Indexing transaction file: {:?}", path); // TODO: Similar to block indexing @@ 
-128,7 +128,7 @@ async fn index_transaction_file(catalog: &RocksDbCatalog, path: &PathBuf) -> Res } /// Index a log Parquet file -async fn index_log_file(catalog: &RocksDbCatalog, path: &PathBuf) -> Result<()> { +async fn index_log_file(_catalog: &RocksDbCatalog, path: &PathBuf) -> Result<()> { debug!("Indexing log file: {:?}", path); // TODO: Similar to block indexing diff --git a/crates/phaser-query/src/parquet_writer.rs b/crates/phaser-query/src/parquet_writer.rs index dc9adc2..89ddab2 100644 --- a/crates/phaser-query/src/parquet_writer.rs +++ b/crates/phaser-query/src/parquet_writer.rs @@ -69,7 +69,7 @@ impl ParquetWriter { .as_any() .downcast_ref::() { - if array.len() > 0 { + if !array.is_empty() { array.value(0) } else { return Ok(()); // Skip empty batch @@ -129,12 +129,13 @@ impl ParquetWriter { let segment_start = (block_num / self.segment_size) * self.segment_size; let segment_end = segment_start + self.segment_size - 1; - // Create temporary filename with .tmp extension - // Format: {topic}_{segment_start}-{segment_end}_from_{block}_to_{block}.parquet.tmp - // Final rename will update the actual range + // Create temporary filename showing the actual range we'll write + // Format: {topic}_from_{actual_start}_to_{segment_end}.parquet.tmp + // Live sync starts mid-segment and writes to segment boundary + // Historical sync starts at segment boundary let filename = format!( - "{}_{}-{}_from_{}_to_{}.parquet.tmp", - self.data_type, segment_start, segment_end, block_num, block_num + "{}_from_{}_to_{}.parquet.tmp", + self.data_type, block_num, segment_end ); let temp_path = self.data_dir.join(filename); @@ -288,7 +289,6 @@ fn parse_encoding(s: &str) -> Encoding { match s.to_lowercase().as_str() { "plain" => Encoding::PLAIN, "rle" => Encoding::RLE, - "bit_packed" => Encoding::BIT_PACKED, "delta_binary_packed" => Encoding::DELTA_BINARY_PACKED, "delta_length_byte_array" => Encoding::DELTA_LENGTH_BYTE_ARRAY, "delta_byte_array" => Encoding::DELTA_BYTE_ARRAY, diff 
--git a/crates/phaser-query/src/sql.rs b/crates/phaser-query/src/sql.rs index 000e20b..36888df 100644 --- a/crates/phaser-query/src/sql.rs +++ b/crates/phaser-query/src/sql.rs @@ -12,7 +12,7 @@ pub struct SqlServer { #[derive(Debug, Deserialize)] struct SqlQuery { - query: String, + _query: String, } #[derive(Debug, Serialize)] diff --git a/crates/phaser-query/src/sync/data_scanner.rs b/crates/phaser-query/src/sync/data_scanner.rs new file mode 100644 index 0000000..b5bfe7e --- /dev/null +++ b/crates/phaser-query/src/sync/data_scanner.rs @@ -0,0 +1,505 @@ +use anyhow::{Context, Result}; +use std::fs; +use std::path::{Path, PathBuf}; +use tracing::{debug, info, warn}; + +/// Represents a block range that has been synced +#[derive(Debug, Clone)] +pub struct BlockRange { + pub start: u64, + pub end: u64, +} + +/// Scanner for detecting existing blockchain data +pub struct DataScanner { + data_dir: PathBuf, +} + +impl DataScanner { + pub fn new(data_dir: PathBuf) -> Self { + Self { data_dir } + } + + /// Scans parquet files to find existing block ranges + pub fn scan_existing_ranges(&self) -> Result<Vec<BlockRange>> { + let mut ranges = Vec::new(); + + if !self.data_dir.exists() { + return Ok(ranges); + } + + for entry in fs::read_dir(&self.data_dir).context("Failed to read data directory")? { + let entry = entry?; + let path = entry.path(); + + if !path.is_file() { + continue; + } + + // Parse both .parquet and .parquet.tmp files + if let Some(range) = self.parse_filename(&path)?
{ + ranges.push(range); + } + } + + // Sort ranges by start block + ranges.sort_by_key(|r| r.start); + + debug!( + "Found {} existing block ranges in {:?}", + ranges.len(), + self.data_dir + ); + Ok(ranges) + } + + /// Parse filename to extract block range + fn parse_filename(&self, path: &Path) -> Result<Option<BlockRange>> { + let filename = match path.file_name().and_then(|n| n.to_str()) { + Some(name) => name, + None => return Ok(None), + }; + + // Skip non-parquet files + if !filename.contains(".parquet") { + return Ok(None); + } + + // Parse filenames like: + // Finalized: blocks_0-499999_from_0_to_499999.parquet + // Temp: blocks_from_0_to_499999.parquet.tmp + // Temp (live): blocks_from_485689_to_499999.parquet.tmp + + // Check if this contains the actual block range (both .tmp and finalized files) + if filename.contains("_from_") && filename.contains("_to_") { + if let Some(from_idx) = filename.find("_from_") { + if let Some(to_idx) = filename.find("_to_") { + let start_str = &filename[from_idx + 6..to_idx]; + let end_part = &filename[to_idx + 4..]; + // Remove .parquet or .parquet.tmp extension + let end_str = end_part + .trim_end_matches(".parquet.tmp") + .trim_end_matches(".parquet"); + + if let (Ok(start), Ok(end)) = (start_str.parse::<u64>(), end_str.parse::<u64>()) + { + // Validate range is sensible + if start <= end { + return Ok(Some(BlockRange { start, end })); + } else { + warn!( + "Invalid block range in filename {}: start {} > end {}", + filename, start, end + ); + } + } + } + } + } + + Ok(None) + } + + /// Find the first gap in block coverage or the start of live sync data + /// Returns the block number where historical sync can safely backfill up to + pub fn find_historical_boundary(&self, segment_size: u64) -> Result<Option<u64>> { + let ranges = self.scan_existing_ranges()?; + + if ranges.is_empty() { + info!("No existing data found, historical sync can start from genesis"); + return Ok(None); + } + + // Check for the first gap or find where continuous data starts + let mut
expected_start = 0u64; + + for range in &ranges { + if range.start > expected_start { + // Found a gap! Historical sync should backfill up to range.start - 1 + info!( + "Found gap in data: blocks {} to {} are missing. Historical sync can backfill up to {}", + expected_start, + range.start - 1, + range.start - 1 + ); + return Ok(Some(range.start - 1)); + } + + // Update expected start to after this range + expected_start = range.end + 1; + } + + // No gaps found, but we should find the last contiguous segment boundary + // The last range.end might be in the middle of a segment + // Round down to the previous segment boundary + if let Some(last_range) = ranges.last() { + let last_segment_boundary = (last_range.end / segment_size) * segment_size; + if last_segment_boundary > 0 { + info!( + "Found continuous data up to block {}. Historical sync can backfill up to segment boundary {}", + last_range.end, + last_segment_boundary - 1 + ); + return Ok(Some(last_segment_boundary - 1)); + } + } + + info!("No clear boundary found for historical sync"); + Ok(None) + } + + /// Get a summary of data coverage + pub fn get_coverage_summary(&self) -> Result<String> { + let ranges = self.scan_existing_ranges()?; + + if ranges.is_empty() { + return Ok("No data found".to_string()); + } + + let mut summary = String::new(); + summary.push_str(&format!("Found {} block ranges:\n", ranges.len())); + + for (i, range) in ranges.iter().enumerate() { + let blocks = range.end - range.start + 1; + summary.push_str(&format!( + " {}. Blocks {} to {} ({} blocks)\n", + i + 1, + range.start, + range.end, + blocks + )); + } + + Ok(summary) + } + + /// Find and clean incomplete segments, returning which ones need to be synced + /// This method: + /// 1. Finds .tmp files (incomplete segments) + /// 2. Deletes them (they're partial/corrupted) + /// 3.
Returns segment numbers that need to be synced + pub fn find_missing_segments( + &self, + from_block: u64, + to_block: u64, + segment_size: u64, + ) -> Result<Vec<u64>> { + // Calculate total segments in the requested range + let first_segment = from_block / segment_size; + let last_segment = to_block / segment_size; + + let mut missing_segments = Vec::new(); + + if !self.data_dir.exists() { + // No data directory means all segments are missing + for segment_num in first_segment..=last_segment { + missing_segments.push(segment_num); + } + return Ok(missing_segments); + } + + for segment_num in first_segment..=last_segment { + let segment_start = segment_num * segment_size; + let segment_end = segment_start + segment_size - 1; + + // Check for completed segment files (all three data types must exist) + let blocks_complete = + self.has_completed_segment("blocks", segment_start, segment_end)?; + let txs_complete = + self.has_completed_segment("transactions", segment_start, segment_end)?; + let logs_complete = self.has_completed_segment("logs", segment_start, segment_end)?; + + if !blocks_complete || !txs_complete || !logs_complete { + // Clean any temp files for this segment + self.clean_temp_files_for_segment(segment_start, segment_end)?; + missing_segments.push(segment_num); + } + } + + info!( + "Found {} missing segments out of {} total segments in range {}-{}", + missing_segments.len(), + (last_segment - first_segment + 1), + from_block, + to_block + ); + + Ok(missing_segments) + } + + /// Check if a completed parquet file exists for a specific segment + fn has_completed_segment( + &self, + data_type: &str, + segment_start: u64, + segment_end: u64, + ) -> Result<bool> { + for entry in fs::read_dir(&self.data_dir)?
{ + let entry = entry?; + let filename = entry.file_name(); + let filename_str = filename.to_string_lossy(); + + // Look for completed files matching this segment + // Format: {data_type}_{segment_start}-{segment_end}_from_*_to_*.parquet + if filename_str.starts_with(&format!( + "{}_{}-{}_from_", + data_type, segment_start, segment_end + )) && filename_str.ends_with(".parquet") + && !filename_str.ends_with(".parquet.tmp") + { + return Ok(true); + } + } + + Ok(false) + } + + /// Clean temp files for a specific segment + /// Only removes temp files from failed HISTORICAL syncs (starting at segment boundary) + /// Preserves temp files from LIVE sync (starting mid-segment) + fn clean_temp_files_for_segment(&self, segment_start: u64, segment_end: u64) -> Result<()> { + use std::time::SystemTime; + + for entry in fs::read_dir(&self.data_dir)? { + let entry = entry?; + let path = entry.path(); + let filename = entry.file_name(); + let filename_str = filename.to_string_lossy(); + + // Look for temp files for this segment: {type}_from_{X}_to_{segment_end}.parquet.tmp + if filename_str.ends_with(&format!("_to_{}.parquet.tmp", segment_end)) { + // Parse to get the start block + if let Some(range) = self.parse_filename(&path)? 
{ + // Only clean if this starts at the segment boundary (failed historical sync) + // Skip if it starts mid-segment (active live sync) + if range.start == segment_start { + // Check if file is recently modified (within 5 seconds) + // to avoid race condition with active writes + if let Ok(metadata) = fs::metadata(&path) { + if let Ok(modified) = metadata.modified() { + if let Ok(elapsed) = SystemTime::now().duration_since(modified) { + if elapsed.as_secs() < 5 { + debug!( + "Skipping recently modified temp file ({}s old): {}", + elapsed.as_secs(), + path.display() + ); + continue; + } + } + } + } + + info!( + "Cleaning incomplete historical sync temp file: {}", + path.display() + ); + fs::remove_file(&path)?; + } else { + debug!( + "Skipping live sync temp file (starts at {}): {}", + range.start, + path.display() + ); + } + } + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs::File; + use tempfile::TempDir; + + #[test] + fn test_parse_finalized_filename() { + let temp_dir = TempDir::new().unwrap(); + let scanner = DataScanner::new(temp_dir.path().to_path_buf()); + + // Create a test file + let test_path = temp_dir + .path() + .join("blocks_0-499999_from_0_to_46200.parquet"); + File::create(&test_path).unwrap(); + + let range = scanner.parse_filename(&test_path).unwrap(); + assert!(range.is_some()); + let range = range.unwrap(); + assert_eq!(range.start, 0); + assert_eq!(range.end, 46200); + } + + #[test] + fn test_find_gap() { + let temp_dir = TempDir::new().unwrap(); + let scanner = DataScanner::new(temp_dir.path().to_path_buf()); + + // Create files with a gap + File::create( + temp_dir + .path() + .join("blocks_0-499999_from_0_to_499999.parquet"), + ) + .unwrap(); + File::create( + temp_dir + .path() + .join("blocks_500000-999999_from_500000_to_999999.parquet"), + ) + .unwrap(); + // Gap here! 
Missing 1000000-1499999 + File::create( + temp_dir + .path() + .join("blocks_1500000-1999999_from_1500000_to_1999999.parquet"), + ) + .unwrap(); + + let boundary = scanner.find_historical_boundary(500000).unwrap(); + assert_eq!(boundary, Some(1499999)); + } + + #[test] + fn test_parse_live_sync_temp_file() { + let temp_dir = TempDir::new().unwrap(); + let scanner = DataScanner::new(temp_dir.path().to_path_buf()); + + // Live sync temp file starts mid-segment + let test_path = temp_dir + .path() + .join("blocks_from_23485689_to_23499999.parquet.tmp"); + File::create(&test_path).unwrap(); + + let range = scanner.parse_filename(&test_path).unwrap(); + assert!(range.is_some()); + let range = range.unwrap(); + assert_eq!(range.start, 23485689); // Mid-segment + assert_eq!(range.end, 23499999); + } + + #[test] + fn test_parse_historical_temp_file() { + let temp_dir = TempDir::new().unwrap(); + let scanner = DataScanner::new(temp_dir.path().to_path_buf()); + + // Historical sync temp file starts at segment boundary + let test_path = temp_dir.path().join("blocks_from_0_to_499999.parquet.tmp"); + File::create(&test_path).unwrap(); + + let range = scanner.parse_filename(&test_path).unwrap(); + assert!(range.is_some()); + let range = range.unwrap(); + assert_eq!(range.start, 0); // Segment boundary + assert_eq!(range.end, 499999); + } + + #[test] + fn test_invalid_range_rejected() { + let temp_dir = TempDir::new().unwrap(); + let scanner = DataScanner::new(temp_dir.path().to_path_buf()); + + // Invalid: start > end + let test_path = temp_dir.path().join("blocks_from_1000_to_500.parquet.tmp"); + File::create(&test_path).unwrap(); + + let range = scanner.parse_filename(&test_path).unwrap(); + // Should return None due to validation + assert!(range.is_none()); + } + + #[test] + fn test_clean_preserves_live_sync() { + use std::io::Write; + use std::thread; + + let temp_dir = TempDir::new().unwrap(); + let scanner = DataScanner::new(temp_dir.path().to_path_buf()); + + // Create 
historical temp file (at segment boundary) + let hist_path = temp_dir.path().join("blocks_from_0_to_499999.parquet.tmp"); + File::create(&hist_path).unwrap(); + + // Wait 6 seconds so historical file is considered "stale" + // (modification time check requires >5 seconds) + thread::sleep(std::time::Duration::from_secs(6)); + + // Create live sync temp file (mid-segment) - this will be recent + let live_path = temp_dir + .path() + .join("blocks_from_485689_to_499999.parquet.tmp"); + let mut live_file = File::create(&live_path).unwrap(); + live_file.write_all(b"live data").unwrap(); + + // Clean should remove historical but preserve live + let missing = scanner.find_missing_segments(0, 499999, 500000).unwrap(); + + // Historical temp should be deleted (was stale) + assert!(!hist_path.exists()); + + // Live sync temp should still exist (mid-segment + recent) + assert!(live_path.exists()); + + // Segment 0 should be in missing (because historical temp was removed) + assert_eq!(missing, vec![0]); + } + + #[test] + fn test_completed_segment_detection() { + let temp_dir = TempDir::new().unwrap(); + let scanner = DataScanner::new(temp_dir.path().to_path_buf()); + + // Create all three data types for segment 0 + File::create( + temp_dir + .path() + .join("blocks_0-499999_from_0_to_499999.parquet"), + ) + .unwrap(); + File::create( + temp_dir + .path() + .join("transactions_0-499999_from_0_to_499999.parquet"), + ) + .unwrap(); + File::create( + temp_dir + .path() + .join("logs_0-499999_from_0_to_499999.parquet"), + ) + .unwrap(); + + // Segment 0 should not be in missing segments + let missing = scanner.find_missing_segments(0, 499999, 500000).unwrap(); + assert!(missing.is_empty()); + } + + #[test] + fn test_incomplete_segment_detection() { + let temp_dir = TempDir::new().unwrap(); + let scanner = DataScanner::new(temp_dir.path().to_path_buf()); + + // Create only blocks and transactions (missing logs) + File::create( + temp_dir + .path() + 
.join("blocks_0-499999_from_0_to_499999.parquet"), + ) + .unwrap(); + File::create( + temp_dir + .path() + .join("transactions_0-499999_from_0_to_499999.parquet"), + ) + .unwrap(); + // logs missing! + + // Segment 0 should be in missing segments (incomplete) + let missing = scanner.find_missing_segments(0, 499999, 500000).unwrap(); + assert_eq!(missing, vec![0]); + } +} diff --git a/crates/phaser-query/src/sync/mod.rs b/crates/phaser-query/src/sync/mod.rs index 6b143d4..6622ff9 100644 --- a/crates/phaser-query/src/sync/mod.rs +++ b/crates/phaser-query/src/sync/mod.rs @@ -1,5 +1,7 @@ +mod data_scanner; mod service; mod worker; +pub use data_scanner::DataScanner; pub use service::SyncServer; pub use worker::SyncWorker; diff --git a/crates/phaser-query/src/sync/service.rs b/crates/phaser-query/src/sync/service.rs index 8c4d9c5..939a0d0 100644 --- a/crates/phaser-query/src/sync/service.rs +++ b/crates/phaser-query/src/sync/service.rs @@ -1,6 +1,7 @@ use crate::proto::admin::sync_service_server::{SyncService, SyncServiceServer}; use crate::proto::admin::*; -use crate::sync::worker::SyncWorker; +use crate::sync::data_scanner::DataScanner; +use crate::sync::worker::{ProgressTracker, SyncWorker}; use crate::PhaserConfig; use anyhow::Result; use std::collections::HashMap; @@ -23,6 +24,7 @@ struct SyncJobState { blocks_synced: u64, active_workers: u32, error: Option<String>, + progress_tracker: ProgressTracker, } /// Server implementation for the sync admin service @@ -64,6 +66,7 @@ impl SyncServer { bridge_endpoint: String, from_block: u64, to_block: u64, + progress_tracker: ProgressTracker, ) -> Result<()> { // Update status to RUNNING { @@ -79,38 +82,140 @@ impl SyncServer { job_id, config.sync_parallelism, from_block, to_block ); - // Calculate block range per worker - let total_blocks = to_block - from_block + 1; - let blocks_per_worker = - (total_blocks + config.sync_parallelism as u64 - 1) / config.sync_parallelism as u64; + let segment_size = config.segment_size; - // Get
data directory for this bridge + // Find which segments need to be synced (supports resume) let data_dir = config.bridge_data_dir(chain_id, &bridge_name); + let scanner = DataScanner::new(data_dir.clone()); - // Spawn workers - let mut worker_handles = vec![]; - for worker_id in 0..config.sync_parallelism { - let worker_from = from_block + (worker_id as u64 * blocks_per_worker); - let worker_to = std::cmp::min(worker_from + blocks_per_worker - 1, to_block); + let missing_segments = scanner + .find_missing_segments(from_block, to_block, segment_size) + .map_err(|e| anyhow::anyhow!("Failed to find missing segments: {}", e))?; - // Skip if this worker has no blocks to process - if worker_from > to_block { - break; - } + let total_segments = missing_segments.len() as u64; - let mut worker = SyncWorker::new( - worker_id, - bridge_endpoint.clone(), - data_dir.clone(), - worker_from, - worker_to, - config.segment_size, - config.max_file_size_mb, - 1000, // batch_size - config.parquet.clone(), + if total_segments == 0 { + info!( + "All segments already synced for range {}-{}", + from_block, to_block ); + // Mark job as complete + let mut jobs_lock = jobs.write().await; + if let Some(job) = jobs_lock.get_mut(&job_id) { + job.status = SyncStatus::Completed as i32; + job.blocks_synced = to_block - from_block + 1; + job.current_block = to_block; + } + return Ok(()); + } + + info!( + "Found {} segments to sync ({} blocks per segment)", + total_segments, segment_size + ); + + // Convert missing segments to a shared queue + let segment_queue = Arc::new(tokio::sync::Mutex::new(missing_segments)); + + // Spawn worker tasks that pull segments from the queue + let mut worker_handles = vec![]; + let num_workers = std::cmp::min(config.sync_parallelism as u64, total_segments) as u32; + let max_file_size_mb = config.max_file_size_mb; + + // Track failed segments for potential retry + let failed_segments = Arc::new(tokio::sync::Mutex::new(Vec::new())); + + for worker_id in 0..num_workers 
{ + let bridge_endpoint = bridge_endpoint.clone(); + let data_dir = data_dir.clone(); + let segment_queue = segment_queue.clone(); + let progress_tracker = progress_tracker.clone(); + let parquet_config = config.parquet.clone(); + let failed_segments = failed_segments.clone(); + + let handle = tokio::spawn(async move { + let mut worker_errors = 0u32; + + loop { + // Get next segment to process + let segment_num = { + let mut queue = segment_queue.lock().await; + if queue.is_empty() { + info!( + "Worker {} completed all assigned segments (errors: {})", + worker_id, worker_errors + ); + break; + } + queue.remove(0) + }; + + // Calculate block range for this segment + let segment_from = segment_num * segment_size; + let segment_to = segment_from + segment_size - 1; + + // Ensure we don't go past the requested to_block + let segment_to = std::cmp::min(segment_to, to_block); + + info!( + "Worker {} processing segment {} (blocks {}-{})", + worker_id, segment_num, segment_from, segment_to + ); + + // Create and run worker for this segment with timeout + let mut worker = SyncWorker::new( + worker_id, + bridge_endpoint.clone(), + data_dir.clone(), + segment_from, + segment_to, + segment_size, + max_file_size_mb, + 1000, // batch_size + parquet_config.clone(), + ) + .with_progress_tracker(progress_tracker.clone()); + + // Add 10 minute timeout per segment + let result = + tokio::time::timeout(std::time::Duration::from_secs(600), worker.run()) + .await; + + match result { + Ok(Ok(())) => { + info!("Worker {} completed segment {}", worker_id, segment_num); + } + Ok(Err(e)) => { + error!( + "Worker {} failed on segment {}: {}", + worker_id, segment_num, e + ); + worker_errors += 1; + failed_segments.lock().await.push(segment_num); + + // Continue to next segment instead of stopping worker + } + Err(_) => { + error!( + "Worker {} timeout on segment {} after 10 minutes", + worker_id, segment_num + ); + worker_errors += 1; + failed_segments.lock().await.push(segment_num); + } + } 
+ } - let handle = tokio::spawn(async move { worker.run().await }); + if worker_errors > 0 { + Err(anyhow::anyhow!( + "Worker {} had {} errors", + worker_id, + worker_errors + )) + } else { + Ok(()) + } + }); worker_handles.push(handle); } @@ -122,21 +227,38 @@ impl SyncServer { for (idx, handle) in worker_handles.into_iter().enumerate() { match handle.await { Ok(Ok(())) => { - info!("Worker {} completed successfully", idx); + info!("Worker {} finished all segments", idx); } Ok(Err(e)) => { error!("Worker {} failed: {}", idx, e); has_error = true; - error_msg = format!("Worker {} failed: {}", idx, e); + if error_msg.is_empty() { + error_msg = format!("Worker {} failed: {}", idx, e); + } } Err(e) => { error!("Worker {} panicked: {}", idx, e); has_error = true; - error_msg = format!("Worker {} panicked: {}", idx, e); + if error_msg.is_empty() { + error_msg = format!("Worker {} panicked: {}", idx, e); + } } } } + // Check for failed segments + let failed = failed_segments.lock().await; + if !failed.is_empty() { + error!( + "Sync job {} had {} failed segments: {:?}", + job_id, + failed.len(), + failed + ); + has_error = true; + error_msg = format!("{} segments failed: {:?}", failed.len(), failed); + } + // Update final status { let mut jobs_lock = jobs.write().await; @@ -146,7 +268,7 @@ impl SyncServer { job.error = Some(error_msg); } else { job.status = SyncStatus::Completed as i32; - job.blocks_synced = total_blocks; + job.blocks_synced = to_block - from_block + 1; job.current_block = to_block; } job.active_workers = 0; @@ -182,6 +304,34 @@ impl SyncService for SyncServer { )); } + // Get data directory for this bridge to scan for existing data + let data_dir = self.config.bridge_data_dir(req.chain_id, &req.bridge_name); + let scanner = DataScanner::new(data_dir); + + // Find where live sync data starts (if any) + let historical_boundary = scanner + .find_historical_boundary(self.config.segment_size) + .map_err(|e| Status::internal(format!("Failed to scan existing 
data: {}", e)))?; + + // Determine final to_block + let to_block = if let Some(boundary) = historical_boundary { + // Live sync data detected, ensure we don't overlap + if req.to_block > boundary { + info!( + "Live sync detected at block {}. Adjusting to_block from {} to {}", + boundary + 1, + req.to_block, + boundary + ); + boundary + } else { + req.to_block + } + } else { + // No live sync data, use requested to_block + req.to_block + }; + // Check if bridge is configured let bridge = self .config @@ -196,18 +346,22 @@ impl SyncService for SyncServer { // Generate job ID let job_id = Uuid::new_v4().to_string(); + // Create progress tracker + let progress_tracker = Arc::new(RwLock::new(HashMap::new())); + // Create job state let job_state = SyncJobState { job_id: job_id.clone(), chain_id: req.chain_id, bridge_name: req.bridge_name.clone(), from_block: req.from_block, - to_block: req.to_block, + to_block, status: SyncStatus::Pending as i32, current_block: req.from_block, blocks_synced: 0, active_workers: 0, error: None, + progress_tracker: progress_tracker.clone(), }; // Store job state @@ -224,7 +378,6 @@ impl SyncService for SyncServer { let bridge_name = req.bridge_name.clone(); let chain_id = req.chain_id; let from_block = req.from_block; - let to_block = req.to_block; tokio::spawn(async move { if let Err(e) = Self::run_sync_job( @@ -236,6 +389,7 @@ impl SyncService for SyncServer { bridge_endpoint, from_block, to_block, + progress_tracker, ) .await { @@ -249,7 +403,7 @@ impl SyncService for SyncServer { job_id, message: format!( "Sync job created for blocks {}-{} on chain {} via bridge '{}'", - req.from_block, req.to_block, req.chain_id, req.bridge_name + req.from_block, to_block, req.chain_id, req.bridge_name ), accepted: true, })) @@ -266,12 +420,44 @@ impl SyncService for SyncServer { .get(&req.job_id) .ok_or_else(|| Status::not_found(format!("Job {} not found", req.job_id)))?; + // Aggregate worker progress + let progress = 
job.progress_tracker.read().await; + let total_blocks = job.to_block - job.from_block + 1; + + // Count completed items from all workers across all 3 data types + let mut blocks_synced = 0u64; + let mut max_completed_block = job.from_block; + + for worker in progress.values() { + let worker_blocks = worker.to_block - worker.from_block + 1; + + // Calculate completion for this worker + // Each worker processes 3 data types: blocks, transactions, logs + // Progress is sum of completed phases + let phase_progress = if worker.logs_completed { + worker_blocks // All 3 phases complete + } else if worker.transactions_completed { + (worker_blocks * 2) / 3 // 2 of 3 phases complete + } else if worker.blocks_completed { + worker_blocks / 3 // 1 of 3 phases complete + } else { + 0 + }; + + blocks_synced += phase_progress; + + // Track highest fully completed segment (all 3 data types done) + if worker.logs_completed && worker.to_block > max_completed_block { + max_completed_block = worker.to_block; + } + } + Ok(Response::new(SyncStatusResponse { job_id: job.job_id.clone(), status: job.status, - current_block: job.current_block, - total_blocks: job.to_block - job.from_block + 1, - blocks_synced: job.blocks_synced, + current_block: max_completed_block, + total_blocks, + blocks_synced, error: job.error.clone().unwrap_or_default(), active_workers: job.active_workers, chain_id: job.chain_id, diff --git a/crates/phaser-query/src/sync/worker.rs b/crates/phaser-query/src/sync/worker.rs index ac1b30e..8631720 100644 --- a/crates/phaser-query/src/sync/worker.rs +++ b/crates/phaser-query/src/sync/worker.rs @@ -4,9 +4,26 @@ use anyhow::{Context, Result}; use futures::StreamExt; use phaser_bridge::client::FlightBridgeClient; use phaser_bridge::descriptors::{BlockchainDescriptor, StreamType}; +use std::collections::HashMap; use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::RwLock; use tracing::{debug, info}; +/// Worker progress tracking +#[derive(Debug, Clone)] +pub struct 
WorkerProgress { + pub worker_id: u32, + pub from_block: u64, + pub to_block: u64, + pub current_phase: String, + pub blocks_completed: bool, + pub transactions_completed: bool, + pub logs_completed: bool, +} + +pub type ProgressTracker = Arc>>; + /// A worker that syncs a specific block range from erigon-bridge pub struct SyncWorker { worker_id: u32, @@ -16,8 +33,9 @@ pub struct SyncWorker { to_block: u64, segment_size: u64, max_file_size_mb: u64, - batch_size: u32, + _batch_size: u32, parquet_config: Option, + progress_tracker: Option, } impl SyncWorker { @@ -40,8 +58,38 @@ impl SyncWorker { to_block, segment_size, max_file_size_mb, - batch_size, + _batch_size: batch_size, parquet_config, + progress_tracker: None, + } + } + + pub fn with_progress_tracker(mut self, tracker: ProgressTracker) -> Self { + self.progress_tracker = Some(tracker); + self + } + + async fn update_progress( + &self, + phase: &str, + blocks_done: bool, + txs_done: bool, + logs_done: bool, + ) { + if let Some(tracker) = &self.progress_tracker { + let mut tracker_lock = tracker.write().await; + tracker_lock.insert( + self.worker_id, + WorkerProgress { + worker_id: self.worker_id, + from_block: self.from_block, + to_block: self.to_block, + current_phase: phase.to_string(), + blocks_completed: blocks_done, + transactions_completed: txs_done, + logs_completed: logs_done, + }, + ); } } @@ -51,6 +99,9 @@ impl SyncWorker { self.worker_id, self.from_block, self.to_block, self.bridge_endpoint ); + // Initialize progress + self.update_progress("blocks", false, false, false).await; + // Connect to bridge via Arrow Flight let mut client = FlightBridgeClient::connect(self.bridge_endpoint.clone()) .await @@ -60,9 +111,14 @@ impl SyncWorker { // Sync blocks, transactions, and logs self.sync_blocks(&mut client).await?; + self.update_progress("transactions", true, false, false) + .await; + self.sync_transactions(&mut client).await?; - // Note: logs require ExecuteBlocks which is not yet implemented via Flight 
- // self.sync_logs(&mut client).await?; + self.update_progress("logs", true, true, false).await; + + self.sync_logs(&mut client).await?; + self.update_progress("completed", true, true, true).await; info!("Worker {} completed sync successfully", self.worker_id); Ok(()) @@ -109,8 +165,6 @@ impl SyncWorker { .context("Failed to write block batch")?; blocks_processed += 1; - - // TODO: Report progress } writer.finalize_current_file()?; @@ -176,4 +230,56 @@ impl SyncWorker { ); Ok(()) } + + async fn sync_logs(&mut self, client: &mut FlightBridgeClient) -> Result<()> { + info!( + "Worker {} syncing logs {}-{}", + self.worker_id, self.from_block, self.to_block + ); + + let mut writer = ParquetWriter::with_config( + self.data_dir.clone(), + self.max_file_size_mb, + self.segment_size, + "logs".to_string(), + self.parquet_config.clone(), + )?; + + // Create historical query descriptor for logs (uses ExecuteBlocks) + let descriptor = + BlockchainDescriptor::historical(StreamType::Logs, self.from_block, self.to_block); + + // Subscribe to the log stream (returns Arrow RecordBatches) + let mut stream = client + .subscribe(&descriptor) + .await + .context("Failed to subscribe to log stream")?; + + let mut batches_processed = 0u64; + while let Some(batch_result) = stream.next().await { + let batch = batch_result.context("Failed to receive log batch")?; + + debug!( + "Worker {} received log batch with {} rows", + self.worker_id, + batch.num_rows() + ); + + // Write Arrow RecordBatch directly to parquet + writer + .write_batch(batch) + .await + .context("Failed to write log batch")?; + + batches_processed += 1; + } + + writer.finalize_current_file()?; + + info!( + "Worker {} completed log sync ({} batches)", + self.worker_id, batches_processed + ); + Ok(()) + } } diff --git a/crates/schemas/evm/common/src/log.rs b/crates/schemas/evm/common/src/log.rs index e6bb36f..69e1016 100644 --- a/crates/schemas/evm/common/src/log.rs +++ b/crates/schemas/evm/common/src/log.rs @@ -58,7 +58,7 
@@ pub struct LogContext { impl From for LogRecord { fn from(ctx: LogContext) -> Self { // Extract topics (up to 4) - let topic0 = ctx.topics.get(0).cloned(); + let topic0 = ctx.topics.first().cloned(); let topic1 = ctx.topics.get(1).cloned(); let topic2 = ctx.topics.get(2).cloned(); let topic3 = ctx.topics.get(3).cloned(); diff --git a/crates/schemas/evm/common/src/rpc_conversions.rs b/crates/schemas/evm/common/src/rpc_conversions.rs index 67c6ec3..c58e694 100644 --- a/crates/schemas/evm/common/src/rpc_conversions.rs +++ b/crates/schemas/evm/common/src/rpc_conversions.rs @@ -5,7 +5,7 @@ use crate::error::{EvmCommonError, Result}; use crate::log::LogRecord; use crate::transaction::{TransactionContext, TransactionRecord}; use alloy_consensus::Header as ConsensusHeader; -use alloy_network::{AnyHeader, AnyRpcBlock, AnyTxEnvelope, BlockResponse, TransactionResponse}; +use alloy_network::{AnyHeader, AnyRpcBlock, AnyTxEnvelope, TransactionResponse}; use alloy_rpc_types_eth::{Header as RpcHeader, Log as RpcLog}; use arrow_array::RecordBatch; use typed_arrow::prelude::BuildRows; @@ -122,7 +122,7 @@ pub fn convert_rpc_logs( for log in logs { // Extract topics (up to 4) let topics = log.topics(); - let topic0 = topics.get(0).map(|t| (*t).into()); + let topic0 = topics.first().map(|t| (*t).into()); let topic1 = topics.get(1).map(|t| (*t).into()); let topic2 = topics.get(2).map(|t| (*t).into()); let topic3 = topics.get(3).map(|t| (*t).into());