From dcfe79b77fb903fa829cd08234d903381933dbfb Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Wed, 1 Oct 2025 17:56:39 -0700 Subject: [PATCH 1/4] Add historical log streaming support to erigon-bridge Implements historical log data streaming using Erigon's Logs RPC. Completes the bridge's ability to serve all three data types (blocks, transactions, logs) in both historical and live modes. --- crates/bridges/erigon-bridge/src/bridge.rs | 77 ++++++++++++++++++++-- 1 file changed, 70 insertions(+), 7 deletions(-) diff --git a/crates/bridges/erigon-bridge/src/bridge.rs b/crates/bridges/erigon-bridge/src/bridge.rs index d5087fc..8857c56 100644 --- a/crates/bridges/erigon-bridge/src/bridge.rs +++ b/crates/bridges/erigon-bridge/src/bridge.rs @@ -292,13 +292,76 @@ impl ErigonFlightBridge { } } StreamType::Logs => { - error!("Historical log streaming not yet implemented"); - yield Err(arrow_flight::error::FlightError::ExternalError( - Box::new(std::io::Error::new( - std::io::ErrorKind::Unsupported, - "Historical log streaming not yet implemented" - )) - )); + // For logs, we need to execute blocks to generate receipts + let mut client_guard = blockdata_client.lock().await; + + // First get blocks to extract timestamps + let mut timestamps = HashMap::new(); + let mut block_stream = match client_guard.stream_blocks(start, end, 100).await { + Ok(s) => s, + Err(e) => { + error!("Failed to start block stream for timestamps: {}", e); + yield Err(arrow_flight::error::FlightError::ExternalError( + Box::new(std::io::Error::new(std::io::ErrorKind::Other, e.to_string())) + )); + return; + } + }; + + // Collect timestamps + while let Some(batch_result) = block_stream.message().await.transpose() { + if let Ok(block_batch) = batch_result { + use alloy_consensus::Header; + use alloy_rlp::Decodable; + for block in &block_batch.blocks { + // Decode RLP header to get timestamp + if let Ok(header) = Header::decode(&mut block.rlp_header.as_slice()) { + timestamps.insert(block.block_number, header.timestamp as i64); + } + } + } + } + + // Now execute blocks to get receipts (which contain logs) + info!("Executing blocks {}-{} to generate receipts/logs (this may be slow)", start, end); + let mut receipt_stream = match client_guard.execute_blocks(start, end, 100).await { + Ok(s) => s, + Err(e) => { + error!("Failed to start receipt stream: {}", e); + yield Err(arrow_flight::error::FlightError::ExternalError( + Box::new(std::io::Error::new(std::io::ErrorKind::Other, e.to_string())) + )); + return; + } + }; + drop(client_guard); + + while let Some(batch_result) = receipt_stream.message().await.transpose() { + match batch_result { + Ok(receipt_batch) => { + match BlockDataConverter::receipts_to_logs_arrow(receipt_batch, ×tamps) { + Ok(record_batch) => { + debug!("Converted receipt batch to {} log rows", record_batch.num_rows()); + yield Ok(record_batch); + } + Err(e) => { + error!("Failed to convert receipt batch: {}", e); + yield Err(arrow_flight::error::FlightError::ExternalError( + Box::new(std::io::Error::new(std::io::ErrorKind::Other, e.to_string())) + )); + break; + } + } + } + Err(e) => { + error!("Failed to receive receipt batch: {}", e); + yield Err(arrow_flight::error::FlightError::ExternalError( + Box::new(std::io::Error::new(std::io::ErrorKind::Other, e.to_string())) + )); + break; + } + } + } } StreamType::Trie => { error!("Historical trie streaming not supported"); From 83b0075b0cc0948672d68af9c6dde84c3015f0c0 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Wed, 1 Oct 2025 17:56:49 -0700 Subject: [PATCH 2/4] Implement contiguity-aware resumable sync with comprehensive tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds DataScanner module that: - Detects existing parquet files and finds gaps in block coverage - Identifies historical/live sync boundaries to prevent overlap - Enables resumable sync by cleaning failed historical temp files - Preserves active live sync temp files using modification time checks - Validates filename ranges to reject corrupted data Sync improvements: - Workers continue on failure, tracking failed segments for retry - 10-minute timeout per segment prevents hung workers - Temp filenames show actual block ranges being written - Three-phase sync per segment: blocks → transactions → logs Includes 8 comprehensive tests covering edge cases: - Live vs historical temp file detection - Gap finding and boundary calculation - Temp file cleanup with race condition protection - Complete vs incomplete segment detection --- crates/phaser-query/Cargo.toml | 3 + crates/phaser-query/src/parquet_writer.rs | 14 +- crates/phaser-query/src/sync/data_scanner.rs | 505 +++++++++++++++++++ crates/phaser-query/src/sync/mod.rs | 2 + crates/phaser-query/src/sync/service.rs | 258 ++++++++-- crates/phaser-query/src/sync/worker.rs | 118 ++++- 6 files changed, 851 insertions(+), 49 deletions(-) create mode 100644 crates/phaser-query/src/sync/data_scanner.rs diff --git a/crates/phaser-query/Cargo.toml b/crates/phaser-query/Cargo.toml index 7e2e621..ed978dc 100644 --- a/crates/phaser-query/Cargo.toml +++ b/crates/phaser-query/Cargo.toml @@ -54,3 +54,6 @@ alloy-rlp.workspace = true [build-dependencies] tonic-build = "0.13" + +[dev-dependencies] +tempfile = "3.10" diff --git a/crates/phaser-query/src/parquet_writer.rs b/crates/phaser-query/src/parquet_writer.rs index dc9adc2..89ddab2 100644 --- a/crates/phaser-query/src/parquet_writer.rs +++ b/crates/phaser-query/src/parquet_writer.rs @@ -69,7 +69,7 @@ impl ParquetWriter { .as_any() .downcast_ref::() { - if array.len() > 0 { + if !array.is_empty() { array.value(0) } else { return Ok(()); // Skip empty batch @@ -129,12 +129,13 @@ impl ParquetWriter { let segment_start = (block_num / self.segment_size) * self.segment_size; let segment_end = segment_start + self.segment_size - 1; - // Create temporary filename with .tmp extension - // Format: {topic}_{segment_start}-{segment_end}_from_{block}_to_{block}.parquet.tmp - // Final rename will update the actual range + // Create temporary filename showing the actual range we'll write + // Format: {topic}_from_{actual_start}_to_{segment_end}.parquet.tmp + // Live sync starts mid-segment and writes to segment boundary + // Historical sync starts at segment boundary let filename = format!( - "{}_{}-{}_from_{}_to_{}.parquet.tmp", - self.data_type, segment_start, segment_end, block_num, block_num + "{}_from_{}_to_{}.parquet.tmp", + self.data_type, block_num, segment_end ); let temp_path = self.data_dir.join(filename); @@ -288,7 +289,6 @@ fn parse_encoding(s: &str) -> Encoding { match s.to_lowercase().as_str() { "plain" => Encoding::PLAIN, "rle" => Encoding::RLE, - "bit_packed" => Encoding::BIT_PACKED, "delta_binary_packed" => Encoding::DELTA_BINARY_PACKED, "delta_length_byte_array" => Encoding::DELTA_LENGTH_BYTE_ARRAY, "delta_byte_array" => Encoding::DELTA_BYTE_ARRAY, diff --git a/crates/phaser-query/src/sync/data_scanner.rs b/crates/phaser-query/src/sync/data_scanner.rs new file mode 100644 index 0000000..b5bfe7e --- /dev/null +++ b/crates/phaser-query/src/sync/data_scanner.rs @@ -0,0 +1,505 @@ +use anyhow::{Context, Result}; +use std::fs; +use std::path::{Path, PathBuf}; +use tracing::{debug, info, warn}; + +/// Represents a block range that has been synced +#[derive(Debug, Clone)] +pub struct BlockRange { + pub start: u64, + pub end: u64, +} + +/// Scanner for detecting existing blockchain data +pub struct DataScanner { + data_dir: PathBuf, +} + +impl DataScanner { + pub fn new(data_dir: PathBuf) -> Self { + Self { data_dir } + } + + /// Scans parquet files to find existing block ranges + pub fn scan_existing_ranges(&self) -> Result> { + let mut ranges = Vec::new(); + + if !self.data_dir.exists() { + return Ok(ranges); + } + + for entry in fs::read_dir(&self.data_dir).context("Failed to read data directory")? { + let entry = entry?; + let path = entry.path(); + + if !path.is_file() { + continue; + } + + // Parse both .parquet and .parquet.tmp files + if let Some(range) = self.parse_filename(&path)? { + ranges.push(range); + } + } + + // Sort ranges by start block + ranges.sort_by_key(|r| r.start); + + debug!( + "Found {} existing block ranges in {:?}", + ranges.len(), + self.data_dir + ); + Ok(ranges) + } + + /// Parse filename to extract block range + fn parse_filename(&self, path: &Path) -> Result> { + let filename = match path.file_name().and_then(|n| n.to_str()) { + Some(name) => name, + None => return Ok(None), + }; + + // Skip non-parquet files + if !filename.contains(".parquet") { + return Ok(None); + } + + // Parse filenames like: + // Finalized: blocks_0-499999_from_0_to_499999.parquet + // Temp: blocks_from_0_to_499999.parquet.tmp + // Temp (live): blocks_from_485689_to_499999.parquet.tmp + + // Check if this contains the actual block range (both .tmp and finalized files) + if filename.contains("_from_") && filename.contains("_to_") { + if let Some(from_idx) = filename.find("_from_") { + if let Some(to_idx) = filename.find("_to_") { + let start_str = &filename[from_idx + 6..to_idx]; + let end_part = &filename[to_idx + 4..]; + // Remove .parquet or .parquet.tmp extension + let end_str = end_part + .trim_end_matches(".parquet.tmp") + .trim_end_matches(".parquet"); + + if let (Ok(start), Ok(end)) = (start_str.parse::(), end_str.parse::()) + { + // Validate range is sensible + if start <= end { + return Ok(Some(BlockRange { start, end })); + } else { + warn!( + "Invalid block range in filename {}: start {} > end {}", + filename, start, end + ); + } + } + } + } + } + + Ok(None) + } + + /// Find the first gap in block coverage or the start of live sync data + /// Returns the block number where historical sync can safely backfill up to + pub fn find_historical_boundary(&self, segment_size: u64) -> Result> { + let ranges = self.scan_existing_ranges()?; + + if ranges.is_empty() { + info!("No existing data found, historical sync can start from genesis"); + return Ok(None); + } + + // Check for the first gap or find where continuous data starts + let mut expected_start = 0u64; + + for range in &ranges { + if range.start > expected_start { + // Found a gap! Historical sync should backfill up to range.start - 1 + info!( + "Found gap in data: blocks {} to {} are missing. Historical sync can backfill up to {}", + expected_start, + range.start - 1, + range.start - 1 + ); + return Ok(Some(range.start - 1)); + } + + // Update expected start to after this range + expected_start = range.end + 1; + } + + // No gaps found, but we should find the last contiguous segment boundary + // The last range.end might be in the middle of a segment + // Round down to the previous segment boundary + if let Some(last_range) = ranges.last() { + let last_segment_boundary = (last_range.end / segment_size) * segment_size; + if last_segment_boundary > 0 { + info!( + "Found continuous data up to block {}. Historical sync can backfill up to segment boundary {}", + last_range.end, + last_segment_boundary - 1 + ); + return Ok(Some(last_segment_boundary - 1)); + } + } + + info!("No clear boundary found for historical sync"); + Ok(None) + } + + /// Get a summary of data coverage + pub fn get_coverage_summary(&self) -> Result { + let ranges = self.scan_existing_ranges()?; + + if ranges.is_empty() { + return Ok("No data found".to_string()); + } + + let mut summary = String::new(); + summary.push_str(&format!("Found {} block ranges:\n", ranges.len())); + + for (i, range) in ranges.iter().enumerate() { + let blocks = range.end - range.start + 1; + summary.push_str(&format!( + " {}. Blocks {} to {} ({} blocks)\n", + i + 1, + range.start, + range.end, + blocks + )); + } + + Ok(summary) + } + + /// Find and clean incomplete segments, returning which ones need to be synced + /// This method: + /// 1. Finds .tmp files (incomplete segments) + /// 2. Deletes them (they're partial/corrupted) + /// 3. Returns segment numbers that need to be synced + pub fn find_missing_segments( + &self, + from_block: u64, + to_block: u64, + segment_size: u64, + ) -> Result> { + // Calculate total segments in the requested range + let first_segment = from_block / segment_size; + let last_segment = to_block / segment_size; + + let mut missing_segments = Vec::new(); + + if !self.data_dir.exists() { + // No data directory means all segments are missing + for segment_num in first_segment..=last_segment { + missing_segments.push(segment_num); + } + return Ok(missing_segments); + } + + for segment_num in first_segment..=last_segment { + let segment_start = segment_num * segment_size; + let segment_end = segment_start + segment_size - 1; + + // Check for completed segment files (all three data types must exist) + let blocks_complete = + self.has_completed_segment("blocks", segment_start, segment_end)?; + let txs_complete = + self.has_completed_segment("transactions", segment_start, segment_end)?; + let logs_complete = self.has_completed_segment("logs", segment_start, segment_end)?; + + if !blocks_complete || !txs_complete || !logs_complete { + // Clean any temp files for this segment + self.clean_temp_files_for_segment(segment_start, segment_end)?; + missing_segments.push(segment_num); + } + } + + info!( + "Found {} missing segments out of {} total segments in range {}-{}", + missing_segments.len(), + (last_segment - first_segment + 1), + from_block, + to_block + ); + + Ok(missing_segments) + } + + /// Check if a completed parquet file exists for a specific segment + fn has_completed_segment( + &self, + data_type: &str, + segment_start: u64, + segment_end: u64, + ) -> Result { + for entry in fs::read_dir(&self.data_dir)? { + let entry = entry?; + let filename = entry.file_name(); + let filename_str = filename.to_string_lossy(); + + // Look for completed files matching this segment + // Format: {data_type}_{segment_start}-{segment_end}_from_*_to_*.parquet + if filename_str.starts_with(&format!( + "{}_{}-{}_from_", + data_type, segment_start, segment_end + )) && filename_str.ends_with(".parquet") + && !filename_str.ends_with(".parquet.tmp") + { + return Ok(true); + } + } + + Ok(false) + } + + /// Clean temp files for a specific segment + /// Only removes temp files from failed HISTORICAL syncs (starting at segment boundary) + /// Preserves temp files from LIVE sync (starting mid-segment) + fn clean_temp_files_for_segment(&self, segment_start: u64, segment_end: u64) -> Result<()> { + use std::time::SystemTime; + + for entry in fs::read_dir(&self.data_dir)? { + let entry = entry?; + let path = entry.path(); + let filename = entry.file_name(); + let filename_str = filename.to_string_lossy(); + + // Look for temp files for this segment: {type}_from_{X}_to_{segment_end}.parquet.tmp + if filename_str.ends_with(&format!("_to_{}.parquet.tmp", segment_end)) { + // Parse to get the start block + if let Some(range) = self.parse_filename(&path)? { + // Only clean if this starts at the segment boundary (failed historical sync) + // Skip if it starts mid-segment (active live sync) + if range.start == segment_start { + // Check if file is recently modified (within 5 seconds) + // to avoid race condition with active writes + if let Ok(metadata) = fs::metadata(&path) { + if let Ok(modified) = metadata.modified() { + if let Ok(elapsed) = SystemTime::now().duration_since(modified) { + if elapsed.as_secs() < 5 { + debug!( + "Skipping recently modified temp file ({}s old): {}", + elapsed.as_secs(), + path.display() + ); + continue; + } + } + } + } + + info!( + "Cleaning incomplete historical sync temp file: {}", + path.display() + ); + fs::remove_file(&path)?; + } else { + debug!( + "Skipping live sync temp file (starts at {}): {}", + range.start, + path.display() + ); + } + } + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs::File; + use tempfile::TempDir; + + #[test] + fn test_parse_finalized_filename() { + let temp_dir = TempDir::new().unwrap(); + let scanner = DataScanner::new(temp_dir.path().to_path_buf()); + + // Create a test file + let test_path = temp_dir + .path() + .join("blocks_0-499999_from_0_to_46200.parquet"); + File::create(&test_path).unwrap(); + + let range = scanner.parse_filename(&test_path).unwrap(); + assert!(range.is_some()); + let range = range.unwrap(); + assert_eq!(range.start, 0); + assert_eq!(range.end, 46200); + } + + #[test] + fn test_find_gap() { + let temp_dir = TempDir::new().unwrap(); + let scanner = DataScanner::new(temp_dir.path().to_path_buf()); + + // Create files with a gap + File::create( + temp_dir + .path() + .join("blocks_0-499999_from_0_to_499999.parquet"), + ) + .unwrap(); + File::create( + temp_dir + .path() + .join("blocks_500000-999999_from_500000_to_999999.parquet"), + ) + .unwrap(); + // Gap here! Missing 1000000-1499999 + File::create( + temp_dir + .path() + .join("blocks_1500000-1999999_from_1500000_to_1999999.parquet"), + ) + .unwrap(); + + let boundary = scanner.find_historical_boundary(500000).unwrap(); + assert_eq!(boundary, Some(1499999)); + } + + #[test] + fn test_parse_live_sync_temp_file() { + let temp_dir = TempDir::new().unwrap(); + let scanner = DataScanner::new(temp_dir.path().to_path_buf()); + + // Live sync temp file starts mid-segment + let test_path = temp_dir + .path() + .join("blocks_from_23485689_to_23499999.parquet.tmp"); + File::create(&test_path).unwrap(); + + let range = scanner.parse_filename(&test_path).unwrap(); + assert!(range.is_some()); + let range = range.unwrap(); + assert_eq!(range.start, 23485689); // Mid-segment + assert_eq!(range.end, 23499999); + } + + #[test] + fn test_parse_historical_temp_file() { + let temp_dir = TempDir::new().unwrap(); + let scanner = DataScanner::new(temp_dir.path().to_path_buf()); + + // Historical sync temp file starts at segment boundary + let test_path = temp_dir.path().join("blocks_from_0_to_499999.parquet.tmp"); + File::create(&test_path).unwrap(); + + let range = scanner.parse_filename(&test_path).unwrap(); + assert!(range.is_some()); + let range = range.unwrap(); + assert_eq!(range.start, 0); // Segment boundary + assert_eq!(range.end, 499999); + } + + #[test] + fn test_invalid_range_rejected() { + let temp_dir = TempDir::new().unwrap(); + let scanner = DataScanner::new(temp_dir.path().to_path_buf()); + + // Invalid: start > end + let test_path = temp_dir.path().join("blocks_from_1000_to_500.parquet.tmp"); + File::create(&test_path).unwrap(); + + let range = scanner.parse_filename(&test_path).unwrap(); + // Should return None due to validation + assert!(range.is_none()); + } + + #[test] + fn test_clean_preserves_live_sync() { + use std::io::Write; + use std::thread; + + let temp_dir = TempDir::new().unwrap(); + let scanner = DataScanner::new(temp_dir.path().to_path_buf()); + + // Create historical temp file (at segment boundary) + let hist_path = temp_dir.path().join("blocks_from_0_to_499999.parquet.tmp"); + File::create(&hist_path).unwrap(); + + // Wait 6 seconds so historical file is considered "stale" + // (modification time check requires >5 seconds) + thread::sleep(std::time::Duration::from_secs(6)); + + // Create live sync temp file (mid-segment) - this will be recent + let live_path = temp_dir + .path() + .join("blocks_from_485689_to_499999.parquet.tmp"); + let mut live_file = File::create(&live_path).unwrap(); + live_file.write_all(b"live data").unwrap(); + + // Clean should remove historical but preserve live + let missing = scanner.find_missing_segments(0, 499999, 500000).unwrap(); + + // Historical temp should be deleted (was stale) + assert!(!hist_path.exists()); + + // Live sync temp should still exist (mid-segment + recent) + assert!(live_path.exists()); + + // Segment 0 should be in missing (because historical temp was removed) + assert_eq!(missing, vec![0]); + } + + #[test] + fn test_completed_segment_detection() { + let temp_dir = TempDir::new().unwrap(); + let scanner = DataScanner::new(temp_dir.path().to_path_buf()); + + // Create all three data types for segment 0 + File::create( + temp_dir + .path() + .join("blocks_0-499999_from_0_to_499999.parquet"), + ) + .unwrap(); + File::create( + temp_dir + .path() + .join("transactions_0-499999_from_0_to_499999.parquet"), + ) + .unwrap(); + File::create( + temp_dir + .path() + .join("logs_0-499999_from_0_to_499999.parquet"), + ) + .unwrap(); + + // Segment 0 should not be in missing segments + let missing = scanner.find_missing_segments(0, 499999, 500000).unwrap(); + assert!(missing.is_empty()); + } + + #[test] + fn test_incomplete_segment_detection() { + let temp_dir = TempDir::new().unwrap(); + let scanner = DataScanner::new(temp_dir.path().to_path_buf()); + + // Create only blocks and transactions (missing logs) + File::create( + temp_dir + .path() + .join("blocks_0-499999_from_0_to_499999.parquet"), + ) + .unwrap(); + File::create( + temp_dir + .path() + .join("transactions_0-499999_from_0_to_499999.parquet"), + ) + .unwrap(); + // logs missing! + + // Segment 0 should be in missing segments (incomplete) + let missing = scanner.find_missing_segments(0, 499999, 500000).unwrap(); + assert_eq!(missing, vec![0]); + } +} diff --git a/crates/phaser-query/src/sync/mod.rs b/crates/phaser-query/src/sync/mod.rs index 6b143d4..6622ff9 100644 --- a/crates/phaser-query/src/sync/mod.rs +++ b/crates/phaser-query/src/sync/mod.rs @@ -1,5 +1,7 @@ +mod data_scanner; mod service; mod worker; +pub use data_scanner::DataScanner; pub use service::SyncServer; pub use worker::SyncWorker; diff --git a/crates/phaser-query/src/sync/service.rs b/crates/phaser-query/src/sync/service.rs index 8c4d9c5..939a0d0 100644 --- a/crates/phaser-query/src/sync/service.rs +++ b/crates/phaser-query/src/sync/service.rs @@ -1,6 +1,7 @@ use crate::proto::admin::sync_service_server::{SyncService, SyncServiceServer}; use crate::proto::admin::*; -use crate::sync::worker::SyncWorker; +use crate::sync::data_scanner::DataScanner; +use crate::sync::worker::{ProgressTracker, SyncWorker}; use crate::PhaserConfig; use anyhow::Result; use std::collections::HashMap; @@ -23,6 +24,7 @@ struct SyncJobState { blocks_synced: u64, active_workers: u32, error: Option, + progress_tracker: ProgressTracker, } /// Server implementation for the sync admin service @@ -64,6 +66,7 @@ impl SyncServer { bridge_endpoint: String, from_block: u64, to_block: u64, + progress_tracker: ProgressTracker, ) -> Result<()> { // Update status to RUNNING { @@ -79,38 +82,140 @@ impl SyncServer { job_id, config.sync_parallelism, from_block, to_block ); - // Calculate block range per worker - let total_blocks = to_block - from_block + 1; - let blocks_per_worker = - (total_blocks + config.sync_parallelism as u64 - 1) / config.sync_parallelism as u64; + let segment_size = config.segment_size; - // Get data directory for this bridge + // Find which segments need to be synced (supports resume) let data_dir = config.bridge_data_dir(chain_id, &bridge_name); + let scanner = DataScanner::new(data_dir.clone()); - // Spawn workers - let mut worker_handles = vec![]; - for worker_id in 0..config.sync_parallelism { - let worker_from = from_block + (worker_id as u64 * blocks_per_worker); - let worker_to = std::cmp::min(worker_from + blocks_per_worker - 1, to_block); + let missing_segments = scanner + .find_missing_segments(from_block, to_block, segment_size) + .map_err(|e| anyhow::anyhow!("Failed to find missing segments: {}", e))?; - // Skip if this worker has no blocks to process - if worker_from > to_block { - break; - } + let total_segments = missing_segments.len() as u64; - let mut worker = SyncWorker::new( - worker_id, - bridge_endpoint.clone(), - data_dir.clone(), - worker_from, - worker_to, - config.segment_size, - config.max_file_size_mb, - 1000, // batch_size - config.parquet.clone(), + if total_segments == 0 { + info!( + "All segments already synced for range {}-{}", + from_block, to_block ); + // Mark job as complete + let mut jobs_lock = jobs.write().await; + if let Some(job) = jobs_lock.get_mut(&job_id) { + job.status = SyncStatus::Completed as i32; + job.blocks_synced = to_block - from_block + 1; + job.current_block = to_block; + } + return Ok(()); + } + + info!( + "Found {} segments to sync ({} blocks per segment)", + total_segments, segment_size + ); + + // Convert missing segments to a shared queue + let segment_queue = Arc::new(tokio::sync::Mutex::new(missing_segments)); + + // Spawn worker tasks that pull segments from the queue + let mut worker_handles = vec![]; + let num_workers = std::cmp::min(config.sync_parallelism as u64, total_segments) as u32; + let max_file_size_mb = config.max_file_size_mb; + + // Track failed segments for potential retry + let failed_segments = Arc::new(tokio::sync::Mutex::new(Vec::new())); + + for worker_id in 0..num_workers { + let bridge_endpoint = bridge_endpoint.clone(); + let data_dir = data_dir.clone(); + let segment_queue = segment_queue.clone(); + let progress_tracker = progress_tracker.clone(); + let parquet_config = config.parquet.clone(); + let failed_segments = failed_segments.clone(); + + let handle = tokio::spawn(async move { + let mut worker_errors = 0u32; + + loop { + // Get next segment to process + let segment_num = { + let mut queue = segment_queue.lock().await; + if queue.is_empty() { + info!( + "Worker {} completed all assigned segments (errors: {})", + worker_id, worker_errors + ); + break; + } + queue.remove(0) + }; + + // Calculate block range for this segment + let segment_from = segment_num * segment_size; + let segment_to = segment_from + segment_size - 1; + + // Ensure we don't go past the requested to_block + let segment_to = std::cmp::min(segment_to, to_block); + + info!( + "Worker {} processing segment {} (blocks {}-{})", + worker_id, segment_num, segment_from, segment_to + ); + + // Create and run worker for this segment with timeout + let mut worker = SyncWorker::new( + worker_id, + bridge_endpoint.clone(), + data_dir.clone(), + segment_from, + segment_to, + segment_size, + max_file_size_mb, + 1000, // batch_size + parquet_config.clone(), + ) + .with_progress_tracker(progress_tracker.clone()); + + // Add 10 minute timeout per segment + let result = + tokio::time::timeout(std::time::Duration::from_secs(600), worker.run()) + .await; + + match result { + Ok(Ok(())) => { + info!("Worker {} completed segment {}", worker_id, segment_num); + } + Ok(Err(e)) => { + error!( + "Worker {} failed on segment {}: {}", + worker_id, segment_num, e + ); + worker_errors += 1; + failed_segments.lock().await.push(segment_num); + + // Continue to next segment instead of stopping worker + } + Err(_) => { + error!( + "Worker {} timeout on segment {} after 10 minutes", + worker_id, segment_num + ); + worker_errors += 1; + failed_segments.lock().await.push(segment_num); + } + } + } - let handle = tokio::spawn(async move { worker.run().await }); + if worker_errors > 0 { + Err(anyhow::anyhow!( + "Worker {} had {} errors", + worker_id, + worker_errors + )) + } else { + Ok(()) + } + }); worker_handles.push(handle); } @@ -122,21 +227,38 @@ impl SyncServer { for (idx, handle) in worker_handles.into_iter().enumerate() { match handle.await { Ok(Ok(())) => { - info!("Worker {} completed successfully", idx); + info!("Worker {} finished all segments", idx); } Ok(Err(e)) => { error!("Worker {} failed: {}", idx, e); has_error = true; - error_msg = format!("Worker {} failed: {}", idx, e); + if error_msg.is_empty() { + error_msg = format!("Worker {} failed: {}", idx, e); + } } Err(e) => { error!("Worker {} panicked: {}", idx, e); has_error = true; - error_msg = format!("Worker {} panicked: {}", idx, e); + if error_msg.is_empty() { + error_msg = format!("Worker {} panicked: {}", idx, e); + } } } } + // Check for failed segments + let failed = failed_segments.lock().await; + if !failed.is_empty() { + error!( + "Sync job {} had {} failed segments: {:?}", + job_id, + failed.len(), + failed + ); + has_error = true; + error_msg = format!("{} segments failed: {:?}", failed.len(), failed); + } + // Update final status { let mut jobs_lock = jobs.write().await; @@ -146,7 +268,7 @@ impl SyncServer { job.error = Some(error_msg); } else { job.status = SyncStatus::Completed as i32; - job.blocks_synced = total_blocks; + job.blocks_synced = to_block - from_block + 1; job.current_block = to_block; } job.active_workers = 0; @@ -182,6 +304,34 @@ impl SyncService for SyncServer { )); } + // Get data directory for this bridge to scan for existing data + let data_dir = self.config.bridge_data_dir(req.chain_id, &req.bridge_name); + let scanner = DataScanner::new(data_dir); + + // Find where live sync data starts (if any) + let historical_boundary = scanner + .find_historical_boundary(self.config.segment_size) + .map_err(|e| Status::internal(format!("Failed to scan existing data: {}", e)))?; + + // Determine final to_block + let to_block = if let Some(boundary) = historical_boundary { + // Live sync data detected, ensure we don't overlap + if req.to_block > boundary { + info!( + "Live sync detected at block {}. Adjusting to_block from {} to {}", + boundary + 1, + req.to_block, + boundary + ); + boundary + } else { + req.to_block + } + } else { + // No live sync data, use requested to_block + req.to_block + }; + // Check if bridge is configured let bridge = self .config @@ -196,18 +346,22 @@ impl SyncService for SyncServer { // Generate job ID let job_id = Uuid::new_v4().to_string(); + // Create progress tracker + let progress_tracker = Arc::new(RwLock::new(HashMap::new())); + // Create job state let job_state = SyncJobState { job_id: job_id.clone(), chain_id: req.chain_id, bridge_name: req.bridge_name.clone(), from_block: req.from_block, - to_block: req.to_block, + to_block, status: SyncStatus::Pending as i32, current_block: req.from_block, blocks_synced: 0, active_workers: 0, error: None, + progress_tracker: progress_tracker.clone(), }; // Store job state @@ -224,7 +378,6 @@ impl SyncService for SyncServer { let bridge_name = req.bridge_name.clone(); let chain_id = req.chain_id; let from_block = req.from_block; - let to_block = req.to_block; tokio::spawn(async move { if let Err(e) = Self::run_sync_job( @@ -236,6 +389,7 @@ impl SyncService for SyncServer { bridge_endpoint, from_block, to_block, + progress_tracker, ) .await { @@ -249,7 +403,7 @@ impl SyncService for SyncServer { job_id, message: format!( "Sync job created for blocks {}-{} on chain {} via bridge '{}'", - req.from_block, req.to_block, req.chain_id, req.bridge_name + req.from_block, to_block, req.chain_id, req.bridge_name ), accepted: true, })) @@ -266,12 +420,44 @@ impl SyncService for SyncServer { .get(&req.job_id) .ok_or_else(|| Status::not_found(format!("Job {} not found", req.job_id)))?; + // Aggregate worker progress + let progress = job.progress_tracker.read().await; + let total_blocks = job.to_block - job.from_block + 1; + + // Count completed items from all workers across all 3 data types + let mut blocks_synced = 0u64; + let mut max_completed_block = job.from_block; + + for worker in progress.values() { + let worker_blocks = worker.to_block - worker.from_block + 1; + + // Calculate completion for this worker + // Each worker processes 3 data types: blocks, transactions, logs + // Progress is sum of completed phases + let phase_progress = if worker.logs_completed { + worker_blocks // All 3 phases complete + } else if worker.transactions_completed { + (worker_blocks * 2) / 3 // 2 of 3 phases complete + } else if worker.blocks_completed { + worker_blocks / 3 // 1 of 3 phases complete + } else { + 0 + }; + + blocks_synced += phase_progress; + + // Track highest fully completed segment (all 3 data types done) + if worker.logs_completed && worker.to_block > max_completed_block { + max_completed_block = worker.to_block; + } + } + Ok(Response::new(SyncStatusResponse { job_id: job.job_id.clone(), status: job.status, - current_block: job.current_block, - total_blocks: job.to_block - job.from_block + 1, - blocks_synced: job.blocks_synced, + current_block: max_completed_block, + total_blocks, + blocks_synced, error: job.error.clone().unwrap_or_default(), active_workers: job.active_workers, chain_id: job.chain_id, diff --git a/crates/phaser-query/src/sync/worker.rs b/crates/phaser-query/src/sync/worker.rs index ac1b30e..8631720 100644 --- a/crates/phaser-query/src/sync/worker.rs +++ b/crates/phaser-query/src/sync/worker.rs @@ -4,9 +4,26 @@ use anyhow::{Context, Result}; use futures::StreamExt; use phaser_bridge::client::FlightBridgeClient; use phaser_bridge::descriptors::{BlockchainDescriptor, StreamType}; +use std::collections::HashMap; use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::RwLock; use tracing::{debug, info}; +/// Worker progress tracking +#[derive(Debug, Clone)] +pub struct WorkerProgress { + pub worker_id: u32, + pub from_block: u64, + pub to_block: u64, + pub current_phase: String, + pub blocks_completed: bool, + pub transactions_completed: bool, + pub logs_completed: bool, +} + +pub type ProgressTracker = Arc>>; + /// A worker that syncs a specific block range from erigon-bridge pub struct SyncWorker { worker_id: u32, @@ -16,8 +33,9 @@ pub struct SyncWorker { to_block: u64, segment_size: u64, max_file_size_mb: u64, - batch_size: u32, + _batch_size: u32, parquet_config: Option, + progress_tracker: Option, } impl SyncWorker { @@ -40,8 +58,38 @@ impl SyncWorker { to_block, segment_size, max_file_size_mb, - batch_size, + _batch_size: batch_size, parquet_config, + progress_tracker: None, + } + } + + pub fn with_progress_tracker(mut self, tracker: ProgressTracker) -> Self { + self.progress_tracker = Some(tracker); + self + } + + async fn update_progress( + &self, + phase: &str, + blocks_done: bool, + txs_done: bool, + logs_done: bool, + ) { + if let Some(tracker) = &self.progress_tracker { + let mut tracker_lock = tracker.write().await; + tracker_lock.insert( + self.worker_id, + WorkerProgress { + worker_id: self.worker_id, + from_block: self.from_block, + to_block: self.to_block, + current_phase: phase.to_string(), + blocks_completed: blocks_done, + transactions_completed: txs_done, + logs_completed: logs_done, + }, + ); } } @@ -51,6 +99,9 @@ impl SyncWorker { self.worker_id, self.from_block, self.to_block, self.bridge_endpoint ); + // Initialize progress + self.update_progress("blocks", false, false, false).await; + // Connect to bridge via Arrow Flight let mut client = FlightBridgeClient::connect(self.bridge_endpoint.clone()) .await @@ -60,9 +111,14 @@ impl SyncWorker { // Sync blocks, transactions, and logs self.sync_blocks(&mut client).await?; + self.update_progress("transactions", true, false, false) + .await; + self.sync_transactions(&mut client).await?; - // Note: logs require ExecuteBlocks which is not yet implemented via Flight - // self.sync_logs(&mut client).await?; + self.update_progress("logs", true, true, false).await; + + self.sync_logs(&mut client).await?; + self.update_progress("completed", true, true, true).await; info!("Worker {} completed sync successfully", self.worker_id); Ok(()) @@ -109,8 +165,6 @@ impl SyncWorker { .context("Failed to write block batch")?; blocks_processed += 1; - - // TODO: Report progress } writer.finalize_current_file()?; @@ -176,4 +230,56 @@ impl SyncWorker { ); Ok(()) } + + async fn sync_logs(&mut self, client: &mut FlightBridgeClient) -> Result<()> { + info!( + "Worker {} syncing logs {}-{}", + self.worker_id, self.from_block, self.to_block + ); + + let mut writer = ParquetWriter::with_config( + self.data_dir.clone(), + self.max_file_size_mb, + self.segment_size, + "logs".to_string(), + self.parquet_config.clone(), + )?; + + // Create historical query descriptor for logs (uses ExecuteBlocks) + let descriptor = + BlockchainDescriptor::historical(StreamType::Logs, self.from_block, self.to_block); + + // Subscribe to the log stream (returns Arrow RecordBatches) + let mut stream = client + .subscribe(&descriptor) + .await + .context("Failed to subscribe to log stream")?; + + let mut batches_processed = 0u64; + while let Some(batch_result) = stream.next().await { + let batch = batch_result.context("Failed to receive log batch")?; + + debug!( + "Worker {} received log batch with {} rows", + self.worker_id, + batch.num_rows() + ); + + // Write Arrow RecordBatch directly to parquet + writer + .write_batch(batch) + .await + .context("Failed to write log batch")?; + + batches_processed += 1; + } + + writer.finalize_current_file()?; + + info!( + "Worker {} completed log sync ({} batches)", + self.worker_id, batches_processed + ); + Ok(()) + } } From bb6b61cbf327a1bfd5fe0b89780c7372a2972c86 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Wed, 1 Oct 2025 17:56:49 -0700 Subject: [PATCH 3/4] Fix progress tracking to account for all three data types Progress now correctly represents work across blocks, transactions, and logs (3 phases per segment). Updated CLI display to clarify that progress includes all data types. --- crates/phaser-query/src/bin/phaser-cli.rs | 48 ++++++++++++++++------- 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/crates/phaser-query/src/bin/phaser-cli.rs b/crates/phaser-query/src/bin/phaser-cli.rs index 29c267c..0eefb2f 100644 --- a/crates/phaser-query/src/bin/phaser-cli.rs +++ b/crates/phaser-query/src/bin/phaser-cli.rs @@ -116,11 +116,22 @@ async fn main() -> Result<()> { status.chain_id, status.bridge_name ); println!("Blocks: {}-{}", status.from_block, status.to_block); + + // Progress includes blocks + transactions + logs (3 phases per segment) + let percent = if status.total_blocks > 0 { + (status.blocks_synced as f64 / status.total_blocks as f64) * 100.0 + } else { + 0.0 + }; println!( - "Progress: {}/{} blocks", - status.blocks_synced, status.total_blocks + "Progress: {}/{} blocks ({:.1}% - includes blocks+txs+logs)", + status.blocks_synced, status.total_blocks, percent ); - println!("Current block: {}", status.current_block); + + // Only show highest completed block if it's meaningful (> from_block) + if status.current_block > status.from_block { + println!("Highest completed: block {}", status.current_block); + } println!("Active workers: {}", status.active_workers); if !status.error.is_empty() { @@ -170,17 +181,20 @@ async fn main() -> Result<()> { println!("Status: {}", status_str); println!("Chain: {} / Bridge: {}", job.chain_id, job.bridge_name); println!("Blocks: {}-{}", job.from_block, job.to_block); + + let percent = if job.total_blocks > 0 { + (job.blocks_synced as f64 / job.total_blocks as f64) * 100.0 + } else { + 0.0 + }; println!( - "Progress: {}/{} blocks ({:.1}%)", - job.blocks_synced, - job.total_blocks, - if job.total_blocks > 0 { - (job.blocks_synced as f64 / job.total_blocks as f64) * 100.0 - } else { - 0.0 - } + "Progress: {}/{} blocks ({:.1}% - includes blocks+txs+logs)", + job.blocks_synced, job.total_blocks, percent ); - println!("Current block: {}", job.current_block); + + if job.current_block > job.from_block { + println!("Highest completed: block {}", job.current_block); + } println!("Active workers: {}", job.active_workers); if !job.error.is_empty() { println!("Error: {}", job.error); @@ -231,9 +245,15 @@ async fn main() -> Result<()> { println!(" Status: {}", status_str); println!(" Chain: {} / Bridge: {}", job.chain_id, job.bridge_name); println!(" Blocks: {}-{}", job.from_block, job.to_block); + + let percent = if job.total_blocks > 0 { + (job.blocks_synced as f64 / job.total_blocks as f64) * 100.0 + } else { + 0.0 + }; println!( - " Progress: {}/{} blocks", - job.blocks_synced, job.total_blocks + " Progress: {}/{} blocks ({:.1}%)", + job.blocks_synced, job.total_blocks, percent ); println!(" Workers: {}", job.active_workers); if !job.error.is_empty() { From 42ce67b0c1af7d401cc3a769f8ef3af37a903e16 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Wed, 1 Oct 2025 17:56:49 -0700 Subject: [PATCH 4/4] Fix clippy warnings and remove unused code - Remove deprecated BIT_PACKED parquet encoding - Change ToString to Display implementation for StreamType - Remove unused struct fields (endpoint, query, batch_size) - Replace .get(0) with .first() for clarity - Prefix intentionally unused parameters with underscore - Fix useless format!() calls --- .gitignore | 1 + Cargo.lock | 1 + crates/phaser-bridge/src/bridge.rs | 2 +- crates/phaser-bridge/src/client.rs | 9 ++------- crates/phaser-bridge/src/descriptors.rs | 12 ++++++------ crates/phaser-query/src/buffer_manager.rs | 10 +++++----- crates/phaser-query/src/erigon_client.rs | 11 +++-------- crates/phaser-query/src/indexer.rs | 6 +++--- crates/phaser-query/src/sql.rs | 2 +- crates/schemas/evm/common/src/log.rs | 2 +- crates/schemas/evm/common/src/rpc_conversions.rs | 4 ++-- 11 files changed, 26 insertions(+), 34 deletions(-) diff --git a/.gitignore b/.gitignore index 34100eb..841301e 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ test-data/streaming test-data/rocksdb erigon-bridge.log commit-changes.sh +*config.yaml diff --git a/Cargo.lock b/Cargo.lock index 26d2b09..ea8a980 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3785,6 +3785,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "tempfile", "thiserror 1.0.69", "tokio", "tokio-stream", diff --git a/crates/phaser-bridge/src/bridge.rs b/crates/phaser-bridge/src/bridge.rs index f996ef0..7f9112b 100644 --- a/crates/phaser-bridge/src/bridge.rs +++ b/crates/phaser-bridge/src/bridge.rs @@ -8,7 +8,7 @@ use futures::Stream; use std::pin::Pin; use tonic::{Request, Response, Status, Streaming}; -use crate::descriptors::{BlockchainDescriptor, BridgeInfo}; +use crate::descriptors::BridgeInfo; /// Capabilities that a bridge can advertise #[derive(Debug, Clone)] diff --git a/crates/phaser-bridge/src/client.rs b/crates/phaser-bridge/src/client.rs index 783c293..856f230 100644 --- a/crates/phaser-bridge/src/client.rs +++ b/crates/phaser-bridge/src/client.rs @@ -10,7 +10,6 @@ use crate::descriptors::{BlockchainDescriptor, BridgeInfo}; /// Client for connecting to blockchain data bridges pub struct FlightBridgeClient { client: FlightClient, - endpoint: String, info: Option, } @@ -33,7 +32,7 @@ impl FlightBridgeClient { info!("Connecting via Unix domain socket: {}", path); // For Unix domain sockets, we need a special URI format - let uri = format!("http://[::]:50051"); // dummy URI for unix socket + let uri = "http://[::]:50051".to_string(); // dummy URI for unix socket // Use tonic's built-in Unix socket support use tonic::transport::Uri; @@ -74,11 +73,7 @@ impl FlightBridgeClient { let client = FlightClient::new(channel); - Ok(Self { - client, - endpoint, - info: None, - }) + Ok(Self { client, info: None }) } /// Get bridge information diff --git a/crates/phaser-bridge/src/descriptors.rs b/crates/phaser-bridge/src/descriptors.rs index c7dcbd8..4472c98 100644 --- a/crates/phaser-bridge/src/descriptors.rs +++ b/crates/phaser-bridge/src/descriptors.rs @@ -12,13 +12,13 @@ pub enum StreamType { Trie, // Raw trie nodes for state reconstruction } -impl ToString for StreamType { - fn to_string(&self) -> String { +impl std::fmt::Display for StreamType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - StreamType::Blocks => "blocks".to_string(), - StreamType::Transactions => "transactions".to_string(), - StreamType::Logs => "logs".to_string(), - StreamType::Trie => "trie".to_string(), + StreamType::Blocks => write!(f, "blocks"), + StreamType::Transactions => write!(f, "transactions"), + StreamType::Logs => write!(f, "logs"), + StreamType::Trie => write!(f, "trie"), } } } diff --git a/crates/phaser-query/src/buffer_manager.rs b/crates/phaser-query/src/buffer_manager.rs index e047294..03b39e4 100644 --- a/crates/phaser-query/src/buffer_manager.rs +++ b/crates/phaser-query/src/buffer_manager.rs @@ -271,14 +271,14 @@ impl CfToParquetBuffer { use std::io::Cursor; let cursor = Cursor::new(bytes); - let reader = StreamReader::try_new(cursor, None)?; + let mut reader = StreamReader::try_new(cursor, None)?; // Read the first (and only) batch - for batch in reader { - return Ok(batch?); + if let Some(batch_result) = reader.next() { + Ok(batch_result?) + } else { + Err(anyhow!("No batch found in serialized data")) } - - Err(anyhow!("No batch found in serialized data")) } /// Force flush any remaining items (for shutdown) diff --git a/crates/phaser-query/src/erigon_client.rs b/crates/phaser-query/src/erigon_client.rs index 063aba8..3122fda 100644 --- a/crates/phaser-query/src/erigon_client.rs +++ b/crates/phaser-query/src/erigon_client.rs @@ -1,18 +1,13 @@ use anyhow::Result; use futures::StreamExt; use tonic::transport::Channel; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info}; -use crate::proto::types::{H160, H256}; -use crate::proto::{ - BlockReply, BlockRequest, EthbackendClient, Event, LogsFilterRequest, SubscribeLogsReply, - SubscribeReply, SubscribeRequest, -}; +use crate::proto::{BlockRequest, EthbackendClient, Event, SubscribeRequest}; /// Client for connecting to Erigon's gRPC interface pub struct ErigonClient { client: EthbackendClient, - endpoint: String, } impl ErigonClient { @@ -37,7 +32,7 @@ impl ErigonClient { info!("Successfully connected to Erigon"); - Ok(Self { client, endpoint }) + Ok(Self { client }) } /// Test the connection by fetching the client version diff --git a/crates/phaser-query/src/indexer.rs b/crates/phaser-query/src/indexer.rs index fe51d7e..a4b910b 100644 --- a/crates/phaser-query/src/indexer.rs +++ b/crates/phaser-query/src/indexer.rs @@ -1,5 +1,5 @@ use crate::catalog::RocksDbCatalog; -use crate::index::{BlockPointer, FileInfo, LogPointer, TransactionPointer}; +use crate::index::FileInfo; use crate::PhaserConfig; use anyhow::Result; use std::fs; @@ -117,7 +117,7 @@ async fn index_block_file(catalog: &RocksDbCatalog, path: &PathBuf) -> Result<() } /// Index a transaction Parquet file -async fn index_transaction_file(catalog: &RocksDbCatalog, path: &PathBuf) -> Result<()> { +async fn index_transaction_file(_catalog: &RocksDbCatalog, path: &PathBuf) -> Result<()> { debug!("Indexing transaction file: {:?}", path); // TODO: Similar to block indexing @@ -128,7 +128,7 @@ async fn index_transaction_file(catalog: &RocksDbCatalog, path: &PathBuf) -> Res } /// Index a log Parquet file -async fn index_log_file(catalog: &RocksDbCatalog, path: &PathBuf) -> Result<()> { +async fn index_log_file(_catalog: &RocksDbCatalog, path: &PathBuf) -> Result<()> { debug!("Indexing log file: {:?}", path); // TODO: Similar to block indexing diff --git a/crates/phaser-query/src/sql.rs b/crates/phaser-query/src/sql.rs index 000e20b..36888df 100644 --- a/crates/phaser-query/src/sql.rs +++ b/crates/phaser-query/src/sql.rs @@ -12,7 +12,7 @@ pub struct SqlServer { #[derive(Debug, Deserialize)] struct SqlQuery { - query: String, + _query: String, } #[derive(Debug, Serialize)] diff --git a/crates/schemas/evm/common/src/log.rs b/crates/schemas/evm/common/src/log.rs index e6bb36f..69e1016 100644 --- a/crates/schemas/evm/common/src/log.rs +++ b/crates/schemas/evm/common/src/log.rs @@ -58,7 +58,7 @@ pub struct LogContext { impl From for LogRecord { fn from(ctx: LogContext) -> Self { // Extract topics (up to 4) - let topic0 = ctx.topics.get(0).cloned(); + let topic0 = ctx.topics.first().cloned(); let topic1 = ctx.topics.get(1).cloned(); let topic2 = ctx.topics.get(2).cloned(); let topic3 = ctx.topics.get(3).cloned(); diff --git a/crates/schemas/evm/common/src/rpc_conversions.rs b/crates/schemas/evm/common/src/rpc_conversions.rs index 67c6ec3..c58e694 100644 --- a/crates/schemas/evm/common/src/rpc_conversions.rs +++ b/crates/schemas/evm/common/src/rpc_conversions.rs @@ -5,7 +5,7 @@ use crate::error::{EvmCommonError, Result}; use crate::log::LogRecord; use crate::transaction::{TransactionContext, TransactionRecord}; use alloy_consensus::Header as ConsensusHeader; -use alloy_network::{AnyHeader, AnyRpcBlock, AnyTxEnvelope, BlockResponse, TransactionResponse}; +use alloy_network::{AnyHeader, AnyRpcBlock, AnyTxEnvelope, TransactionResponse}; use alloy_rpc_types_eth::{Header as RpcHeader, Log as RpcLog}; use arrow_array::RecordBatch; use typed_arrow::prelude::BuildRows; @@ -122,7 +122,7 @@ pub fn convert_rpc_logs( for log in logs { // Extract topics (up to 4) let topics = log.topics(); - let topic0 = topics.get(0).map(|t| (*t).into()); + let topic0 = topics.first().map(|t| (*t).into()); let topic1 = topics.get(1).map(|t| (*t).into()); let topic2 = topics.get(2).map(|t| (*t).into()); let topic3 = topics.get(3).map(|t| (*t).into());