Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ test-data/streaming
test-data/rocksdb
erigon-bridge.log
commit-changes.sh
*config.yaml
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 70 additions & 7 deletions crates/bridges/erigon-bridge/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,76 @@ impl ErigonFlightBridge {
}
}
StreamType::Logs => {
error!("Historical log streaming not yet implemented");
yield Err(arrow_flight::error::FlightError::ExternalError(
Box::new(std::io::Error::new(
std::io::ErrorKind::Unsupported,
"Historical log streaming not yet implemented"
))
));
// For logs, we need to execute blocks to generate receipts
let mut client_guard = blockdata_client.lock().await;

// First get blocks to extract timestamps
let mut timestamps = HashMap::new();
let mut block_stream = match client_guard.stream_blocks(start, end, 100).await {
Ok(s) => s,
Err(e) => {
error!("Failed to start block stream for timestamps: {}", e);
yield Err(arrow_flight::error::FlightError::ExternalError(
Box::new(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))
));
return;
}
};

// Collect timestamps
while let Some(batch_result) = block_stream.message().await.transpose() {
if let Ok(block_batch) = batch_result {
use alloy_consensus::Header;
use alloy_rlp::Decodable;
for block in &block_batch.blocks {
// Decode RLP header to get timestamp
if let Ok(header) = Header::decode(&mut block.rlp_header.as_slice()) {
timestamps.insert(block.block_number, header.timestamp as i64);
}
}
}
}

// Now execute blocks to get receipts (which contain logs)
info!("Executing blocks {}-{} to generate receipts/logs (this may be slow)", start, end);
let mut receipt_stream = match client_guard.execute_blocks(start, end, 100).await {
Ok(s) => s,
Err(e) => {
error!("Failed to start receipt stream: {}", e);
yield Err(arrow_flight::error::FlightError::ExternalError(
Box::new(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))
));
return;
}
};
drop(client_guard);

while let Some(batch_result) = receipt_stream.message().await.transpose() {
match batch_result {
Ok(receipt_batch) => {
match BlockDataConverter::receipts_to_logs_arrow(receipt_batch, &timestamps) {
Ok(record_batch) => {
debug!("Converted receipt batch to {} log rows", record_batch.num_rows());
yield Ok(record_batch);
}
Err(e) => {
error!("Failed to convert receipt batch: {}", e);
yield Err(arrow_flight::error::FlightError::ExternalError(
Box::new(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))
));
break;
}
}
}
Err(e) => {
error!("Failed to receive receipt batch: {}", e);
yield Err(arrow_flight::error::FlightError::ExternalError(
Box::new(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))
));
break;
}
}
}
}
StreamType::Trie => {
error!("Historical trie streaming not supported");
Expand Down
2 changes: 1 addition & 1 deletion crates/phaser-bridge/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use futures::Stream;
use std::pin::Pin;
use tonic::{Request, Response, Status, Streaming};

use crate::descriptors::{BlockchainDescriptor, BridgeInfo};
use crate::descriptors::BridgeInfo;

/// Capabilities that a bridge can advertise
#[derive(Debug, Clone)]
Expand Down
9 changes: 2 additions & 7 deletions crates/phaser-bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::descriptors::{BlockchainDescriptor, BridgeInfo};
/// Client for connecting to blockchain data bridges
pub struct FlightBridgeClient {
client: FlightClient,
endpoint: String,
info: Option<BridgeInfo>,
}

Expand All @@ -33,7 +32,7 @@ impl FlightBridgeClient {
info!("Connecting via Unix domain socket: {}", path);

// For Unix domain sockets, we need a special URI format
let uri = format!("http://[::]:50051"); // dummy URI for unix socket
let uri = "http://[::]:50051".to_string(); // dummy URI for unix socket

// Use tonic's built-in Unix socket support
use tonic::transport::Uri;
Expand Down Expand Up @@ -74,11 +73,7 @@ impl FlightBridgeClient {

let client = FlightClient::new(channel);

Ok(Self {
client,
endpoint,
info: None,
})
Ok(Self { client, info: None })
}

/// Get bridge information
Expand Down
12 changes: 6 additions & 6 deletions crates/phaser-bridge/src/descriptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ pub enum StreamType {
Trie, // Raw trie nodes for state reconstruction
}

impl ToString for StreamType {
fn to_string(&self) -> String {
impl std::fmt::Display for StreamType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
StreamType::Blocks => "blocks".to_string(),
StreamType::Transactions => "transactions".to_string(),
StreamType::Logs => "logs".to_string(),
StreamType::Trie => "trie".to_string(),
StreamType::Blocks => write!(f, "blocks"),
StreamType::Transactions => write!(f, "transactions"),
StreamType::Logs => write!(f, "logs"),
StreamType::Trie => write!(f, "trie"),
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/phaser-query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,6 @@ alloy-rlp.workspace = true

[build-dependencies]
tonic-build = "0.13"

[dev-dependencies]
tempfile = "3.10"
48 changes: 34 additions & 14 deletions crates/phaser-query/src/bin/phaser-cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,22 @@ async fn main() -> Result<()> {
status.chain_id, status.bridge_name
);
println!("Blocks: {}-{}", status.from_block, status.to_block);

// Progress includes blocks + transactions + logs (3 phases per segment)
let percent = if status.total_blocks > 0 {
(status.blocks_synced as f64 / status.total_blocks as f64) * 100.0
} else {
0.0
};
println!(
"Progress: {}/{} blocks",
status.blocks_synced, status.total_blocks
"Progress: {}/{} blocks ({:.1}% - includes blocks+txs+logs)",
status.blocks_synced, status.total_blocks, percent
);
println!("Current block: {}", status.current_block);

// Only show highest completed block if it's meaningful (> from_block)
if status.current_block > status.from_block {
println!("Highest completed: block {}", status.current_block);
}
println!("Active workers: {}", status.active_workers);

if !status.error.is_empty() {
Expand Down Expand Up @@ -170,17 +181,20 @@ async fn main() -> Result<()> {
println!("Status: {}", status_str);
println!("Chain: {} / Bridge: {}", job.chain_id, job.bridge_name);
println!("Blocks: {}-{}", job.from_block, job.to_block);

let percent = if job.total_blocks > 0 {
(job.blocks_synced as f64 / job.total_blocks as f64) * 100.0
} else {
0.0
};
println!(
"Progress: {}/{} blocks ({:.1}%)",
job.blocks_synced,
job.total_blocks,
if job.total_blocks > 0 {
(job.blocks_synced as f64 / job.total_blocks as f64) * 100.0
} else {
0.0
}
"Progress: {}/{} blocks ({:.1}% - includes blocks+txs+logs)",
job.blocks_synced, job.total_blocks, percent
);
println!("Current block: {}", job.current_block);

if job.current_block > job.from_block {
println!("Highest completed: block {}", job.current_block);
}
println!("Active workers: {}", job.active_workers);
if !job.error.is_empty() {
println!("Error: {}", job.error);
Expand Down Expand Up @@ -231,9 +245,15 @@ async fn main() -> Result<()> {
println!(" Status: {}", status_str);
println!(" Chain: {} / Bridge: {}", job.chain_id, job.bridge_name);
println!(" Blocks: {}-{}", job.from_block, job.to_block);

let percent = if job.total_blocks > 0 {
(job.blocks_synced as f64 / job.total_blocks as f64) * 100.0
} else {
0.0
};
println!(
" Progress: {}/{} blocks",
job.blocks_synced, job.total_blocks
" Progress: {}/{} blocks ({:.1}%)",
job.blocks_synced, job.total_blocks, percent
);
println!(" Workers: {}", job.active_workers);
if !job.error.is_empty() {
Expand Down
10 changes: 5 additions & 5 deletions crates/phaser-query/src/buffer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,14 @@ impl CfToParquetBuffer {
use std::io::Cursor;

let cursor = Cursor::new(bytes);
let reader = StreamReader::try_new(cursor, None)?;
let mut reader = StreamReader::try_new(cursor, None)?;

// Read the first (and only) batch
for batch in reader {
return Ok(batch?);
if let Some(batch_result) = reader.next() {
Ok(batch_result?)
} else {
Err(anyhow!("No batch found in serialized data"))
}

Err(anyhow!("No batch found in serialized data"))
}

/// Force flush any remaining items (for shutdown)
Expand Down
11 changes: 3 additions & 8 deletions crates/phaser-query/src/erigon_client.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
use anyhow::Result;
use futures::StreamExt;
use tonic::transport::Channel;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info};

use crate::proto::types::{H160, H256};
use crate::proto::{
BlockReply, BlockRequest, EthbackendClient, Event, LogsFilterRequest, SubscribeLogsReply,
SubscribeReply, SubscribeRequest,
};
use crate::proto::{BlockRequest, EthbackendClient, Event, SubscribeRequest};

/// Client for connecting to Erigon's gRPC interface
pub struct ErigonClient {
client: EthbackendClient<Channel>,
endpoint: String,
}

impl ErigonClient {
Expand All @@ -37,7 +32,7 @@ impl ErigonClient {

info!("Successfully connected to Erigon");

Ok(Self { client, endpoint })
Ok(Self { client })
}

/// Test the connection by fetching the client version
Expand Down
6 changes: 3 additions & 3 deletions crates/phaser-query/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::catalog::RocksDbCatalog;
use crate::index::{BlockPointer, FileInfo, LogPointer, TransactionPointer};
use crate::index::FileInfo;
use crate::PhaserConfig;
use anyhow::Result;
use std::fs;
Expand Down Expand Up @@ -117,7 +117,7 @@ async fn index_block_file(catalog: &RocksDbCatalog, path: &PathBuf) -> Result<()
}

/// Index a transaction Parquet file
async fn index_transaction_file(catalog: &RocksDbCatalog, path: &PathBuf) -> Result<()> {
async fn index_transaction_file(_catalog: &RocksDbCatalog, path: &PathBuf) -> Result<()> {
debug!("Indexing transaction file: {:?}", path);

// TODO: Similar to block indexing
Expand All @@ -128,7 +128,7 @@ async fn index_transaction_file(catalog: &RocksDbCatalog, path: &PathBuf) -> Res
}

/// Index a log Parquet file
async fn index_log_file(catalog: &RocksDbCatalog, path: &PathBuf) -> Result<()> {
async fn index_log_file(_catalog: &RocksDbCatalog, path: &PathBuf) -> Result<()> {
debug!("Indexing log file: {:?}", path);

// TODO: Similar to block indexing
Expand Down
14 changes: 7 additions & 7 deletions crates/phaser-query/src/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl ParquetWriter {
.as_any()
.downcast_ref::<arrow_array::UInt64Array>()
{
if array.len() > 0 {
if !array.is_empty() {
array.value(0)
} else {
return Ok(()); // Skip empty batch
Expand Down Expand Up @@ -129,12 +129,13 @@ impl ParquetWriter {
let segment_start = (block_num / self.segment_size) * self.segment_size;
let segment_end = segment_start + self.segment_size - 1;

// Create temporary filename with .tmp extension
// Format: {topic}_{segment_start}-{segment_end}_from_{block}_to_{block}.parquet.tmp
// Final rename will update the actual range
// Create temporary filename showing the actual range we'll write
// Format: {topic}_from_{actual_start}_to_{segment_end}.parquet.tmp
// Live sync starts mid-segment and writes to segment boundary
// Historical sync starts at segment boundary
let filename = format!(
"{}_{}-{}_from_{}_to_{}.parquet.tmp",
self.data_type, segment_start, segment_end, block_num, block_num
"{}_from_{}_to_{}.parquet.tmp",
self.data_type, block_num, segment_end
);
let temp_path = self.data_dir.join(filename);

Expand Down Expand Up @@ -288,7 +289,6 @@ fn parse_encoding(s: &str) -> Encoding {
match s.to_lowercase().as_str() {
"plain" => Encoding::PLAIN,
"rle" => Encoding::RLE,
"bit_packed" => Encoding::BIT_PACKED,
"delta_binary_packed" => Encoding::DELTA_BINARY_PACKED,
"delta_length_byte_array" => Encoding::DELTA_LENGTH_BYTE_ARRAY,
"delta_byte_array" => Encoding::DELTA_BYTE_ARRAY,
Expand Down
2 changes: 1 addition & 1 deletion crates/phaser-query/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct SqlServer {

#[derive(Debug, Deserialize)]
struct SqlQuery {
query: String,
_query: String,
}

#[derive(Debug, Serialize)]
Expand Down
Loading