Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
320 changes: 309 additions & 11 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@ resolver = "2"
members = [
"crates/phaser-query",
"crates/phaser-bridge",
"crates/bridges/erigon-bridge",
"crates/bridges/jsonrpc-bridge",
"crates/bridges/evm/erigon-bridge",
"crates/bridges/evm/jsonrpc-bridge",
"crates/schemas/evm/common",
]

[workspace.dependencies]
# Internal crates
erigon-bridge = { path = "crates/bridges/evm/erigon-bridge" }
jsonrpc-bridge = { path = "crates/bridges/evm/jsonrpc-bridge" }
phaser-bridge = { path = "crates/phaser-bridge" }
evm-common = { path = "crates/schemas/evm/common" }

# External dependencies
tokio = { version = "1.41", features = ["full"] }
async-trait = "0.1"
Expand Down
36 changes: 36 additions & 0 deletions commit-reorganization.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!/bin/bash
# Commit the EVM bridge reorganization as a single, move-aware commit.
set -e

# Stage everything in one pass so git can pair deletions/additions up as renames.
git add -A

# Commit with the message supplied on stdin (same text, same cleanup mode as -m).
git commit -F - <<'EOF'
Reorganize EVM bridges under crates/bridges/evm/

Moved bridges to better namespace structure:
- crates/bridges/erigon-bridge → crates/bridges/evm/erigon-bridge
- crates/bridges/jsonrpc-bridge → crates/bridges/evm/jsonrpc-bridge

Benefits:
- Clear EVM-specific namespace for future multi-chain support
- Consistent location for all EVM-related bridges
- Better organization as we add more bridge types

Also updated to use workspace dependencies:
- Added erigon-bridge, jsonrpc-bridge, phaser-bridge, evm-common to
workspace.dependencies in root Cargo.toml
- Updated all crates to use `{ workspace = true }` instead of relative paths
- Cleaner dependency management across the workspace

No functional changes - pure reorganization.
EOF

# Print the success banner and the resulting layout.
cat <<'EOF'

✓ Reorganization committed successfully!

New structure:
 crates/bridges/evm/
 ├── erigon-bridge/
 └── jsonrpc-bridge/
EOF
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ path = "src/main.rs"

[dependencies]
# Local dependencies
phaser-bridge = { path = "../../phaser-bridge" }
evm-common = { path = "../../schemas/evm/common" }
phaser-bridge = { workspace = true }
evm-common = { workspace = true }

# Typed Arrow support
typed-arrow = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ path = "src/main.rs"

[dependencies]
# Bridge framework
phaser-bridge = { path = "../../phaser-bridge" }
evm-common = { path = "../../schemas/evm/common" }
phaser-bridge = { workspace = true }
evm-common = { workspace = true }

# Alloy - all from workspace
alloy = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use futures::{stream, Stream, StreamExt};
use phaser_bridge::{
bridge::{BridgeCapabilities, FlightBridge},
descriptors::{BridgeInfo, StreamType},
subscription::QueryMode,
};
use std::pin::Pin;
use std::sync::Arc;
Expand All @@ -23,6 +24,7 @@ pub struct JsonRpcFlightBridge {
chain_id: u64,
streaming_service: Arc<StreamingService>,
node_url: String,
max_batch_size: usize,
}

impl JsonRpcFlightBridge {
Expand Down Expand Up @@ -61,21 +63,31 @@ impl JsonRpcFlightBridge {
chain_id,
streaming_service,
node_url,
max_batch_size: 1000, // Default batch size, matches BridgeCapabilities
})
}

/// Get bridge information
pub fn bridge_info(&self) -> BridgeInfo {
let mut capabilities = vec![
"blocks".to_string(),
"transactions".to_string(),
"logs".to_string(),
];

if self.client.supports_subscriptions() {
capabilities.push("streaming".to_string());
capabilities.push("subscriptions".to_string());
} else {
capabilities.push("polling".to_string());
}

BridgeInfo {
name: "jsonrpc-bridge".to_string(),
node_type: "json-rpc".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
chain_id: self.chain_id,
capabilities: if self.client.supports_subscriptions() {
vec!["streaming".to_string(), "subscriptions".to_string()]
} else {
vec!["polling".to_string()]
},
capabilities,
current_block: 0, // Would need to query this
oldest_block: 0, // Would need to query this
}
Expand Down Expand Up @@ -106,6 +118,128 @@ impl JsonRpcFlightBridge {
)),
}
}

/// Create a stream for historical data (specific block range)
///
/// Walks the inclusive range `[start_block, end_block]` in chunks of at most
/// `max_batch_size` blocks, converts each chunk into Arrow `RecordBatch`es,
/// and yields ONE concatenated batch per chunk. Per-block failures are
/// reported as `Err(Status)` items on the stream and the scan then continues
/// with the next block (the stream does not terminate on them); only the
/// `Trie` stream type ends the stream early.
fn create_historical_stream(
&self,
stream_type: StreamType,
start_block: u64,
end_block: u64,
) -> impl Stream<Item = Result<arrow::record_batch::RecordBatch, Status>> + Send {
// Capture owned copies up front so the generated stream does not borrow
// `self` (it must be Send and outlive this call).
let client = self.client.clone();
let batch_size = self.max_batch_size as u64;

async_stream::stream! {
use alloy::eips::BlockNumberOrTag;
use alloy_rpc_types_eth::Filter;
use arrow::compute::concat_batches;
use tracing::debug;

info!("Fetching historical {:?} from block {} to {} (batch size: {})",
stream_type, start_block, end_block, batch_size);

let mut current_block = start_block;

while current_block <= end_block {
// Inclusive end of this chunk, clamped to the requested range.
let batch_end = std::cmp::min(current_block + batch_size - 1, end_block);
let batch_count = (batch_end - current_block + 1) as usize;

debug!("Fetching batch: blocks {} to {} ({} blocks)", current_block, batch_end, batch_count);

// Collect RecordBatches for this batch
let mut record_batches = Vec::new();

for block_num in current_block..=batch_end {
// Fetch block with transactions
let block = match client.get_block_with_txs(BlockNumberOrTag::Number(block_num)).await {
Ok(Some(block)) => block,
// Missing block: logged and silently skipped — NOTE(review): no
// error item is emitted for gaps; confirm callers expect that.
Ok(None) => {
error!("Block #{} not found", block_num);
continue;
}
// RPC failure: surface it as a stream item, then keep scanning.
Err(e) => {
error!("Failed to fetch block #{}: {}", block_num, e);
yield Err(Status::internal(format!("Failed to fetch block {}: {}", block_num, e)));
continue;
}
};

match stream_type {
StreamType::Blocks => {
// Convert block header to RecordBatch
match evm_common::rpc_conversions::convert_any_header(&block.header) {
Ok(batch) => record_batches.push(batch),
Err(e) => {
error!("Failed to convert block header #{}: {}", block_num, e);
yield Err(Status::internal(format!("Conversion error: {}", e)));
}
}
}
StreamType::Transactions => {
// Convert transactions (if any)
if !block.transactions.is_empty() {
match JsonRpcConverter::convert_transactions(&block) {
Ok(batch) => record_batches.push(batch),
Err(e) => {
error!("Failed to convert transactions for block #{}: {}", block_num, e);
yield Err(Status::internal(format!("Conversion error: {}", e)));
}
}
}
}
StreamType::Logs => {
// Fetch and convert logs
// Logs are queried one block at a time so they stay aligned
// with the block loop (one eth_getLogs call per block).
let filter = Filter::new().from_block(block_num).to_block(block_num);

match client.get_logs(filter).await {
Ok(logs) if !logs.is_empty() => {
let block_hash = block.header.hash;
match JsonRpcConverter::convert_logs(&logs, block_num, block_hash, block.header.timestamp) {
Ok(batch) => record_batches.push(batch),
Err(e) => {
error!("Failed to convert logs for block #{}: {}", block_num, e);
yield Err(Status::internal(format!("Conversion error: {}", e)));
}
}
}
Ok(_) => {
// No logs in this block, skip
}
Err(e) => {
error!("Failed to fetch logs for block #{}: {}", block_num, e);
yield Err(Status::internal(format!("Failed to fetch logs: {}", e)));
}
}
}
StreamType::Trie => {
// Unsupported over JSON-RPC: emit one error item and end the stream.
yield Err(Status::unimplemented("Trie streaming not supported via JSON-RPC"));
return;
}
}
}

// If we collected any batches, concatenate and yield them
if !record_batches.is_empty() {
// Safe: non-empty was checked above, and every batch in a chunk
// comes from the same converter, so the first schema applies to all.
let schema = record_batches[0].schema();
match concat_batches(&schema, &record_batches) {
Ok(combined_batch) => {
debug!("Yielding combined batch with {} rows for blocks {} to {}",
combined_batch.num_rows(), current_block, batch_end);
yield Ok(combined_batch);
}
Err(e) => {
error!("Failed to concatenate batches: {}", e);
yield Err(Status::internal(format!("Failed to concatenate batches: {}", e)));
}
}
}

// Advance to the first block after this chunk.
current_block = batch_end + 1;
}

info!("Completed historical {:?} query for blocks {} to {}", stream_type, start_block, end_block);
}
}

#[async_trait]
Expand All @@ -117,10 +251,10 @@ impl FlightBridge for JsonRpcFlightBridge {
async fn get_capabilities(&self) -> std::result::Result<BridgeCapabilities, Status> {
Ok(BridgeCapabilities {
supports_historical: true, // Can fetch historical blocks via JSON-RPC
supports_streaming: self.client.supports_subscriptions(),
supports_streaming: true, // Always support streaming (HTTP uses polling, WS/IPC use subscriptions)
supports_reorg_notifications: false,
supports_filters: true,
max_batch_size: 1000,
max_batch_size: self.max_batch_size,
})
}

Expand Down Expand Up @@ -234,35 +368,54 @@ impl FlightBridge for JsonRpcFlightBridge {
})?;

let stream_type = blockchain_desc.stream_type;
info!("Processing do_get for {:?}", stream_type);

// Get the appropriate receiver
let receiver = match stream_type {
StreamType::Blocks => self.streaming_service.subscribe_blocks(),
StreamType::Transactions => self.streaming_service.subscribe_transactions(),
StreamType::Logs => self.streaming_service.subscribe_logs(),
StreamType::Trie => {
return Err(Status::unimplemented(
"Trie streaming not supported via JSON-RPC",
))
}
};
let query_mode = blockchain_desc.query_mode.clone();
info!(
"Processing do_get for {:?} in {:?} mode",
stream_type, query_mode
);

// Get schema for the stream type
let schema = Self::get_schema_for_type(stream_type)?;

// Create stream from the broadcast receiver
let batch_stream = async_stream::stream! {
let mut rx = receiver;
while let Ok(batch) = rx.recv().await {
yield Ok(batch);
// Branch based on query mode
let batch_stream: Pin<
Box<dyn Stream<Item = Result<arrow::record_batch::RecordBatch, Status>> + Send>,
> = match query_mode {
QueryMode::Historical { start, end } => {
// Historical query - fetch specific block range
Box::pin(self.create_historical_stream(stream_type, start, end))
}
QueryMode::Live => {
// Live streaming - subscribe to broadcast channels
let receiver = match stream_type {
StreamType::Blocks => self.streaming_service.subscribe_blocks(),
StreamType::Transactions => self.streaming_service.subscribe_transactions(),
StreamType::Logs => self.streaming_service.subscribe_logs(),
StreamType::Trie => {
return Err(Status::unimplemented(
"Trie streaming not supported via JSON-RPC",
))
}
};

Box::pin(async_stream::stream! {
let mut rx = receiver;
while let Ok(batch) = rx.recv().await {
yield Ok(batch);
}
})
}
};

// Convert Status errors to FlightError for the encoder
let flight_batch_stream = batch_stream.map(|result| {
result.map_err(|status| arrow_flight::error::FlightError::Tonic(Box::new(status)))
});

// Encode as Flight data
let encoder = FlightDataEncoderBuilder::new()
.with_schema(schema)
.build(batch_stream);
.build(flight_batch_stream);

let flight_stream = encoder.map(|result| {
result.map_err(|e| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,23 @@ impl JsonRpcClient {
Arc::new(
ProviderBuilder::new()
.network::<AnyNetwork>()
.on_ws(alloy::transports::ws::WsConnect::new(url))
.connect_ws(alloy::transports::ws::WsConnect::new(url))
.await?,
)
} else if url.ends_with(".ipc") || url.starts_with("/") {
info!("Using IPC transport with subscription support");
Arc::new(
ProviderBuilder::new()
.network::<AnyNetwork>()
.on_ipc(alloy::transports::ipc::IpcConnect::new(url.to_string()))
.connect_ipc(alloy::transports::ipc::IpcConnect::new(url.to_string()))
.await?,
)
} else {
info!("Using HTTP transport (no subscription support)");
Arc::new(
ProviderBuilder::new()
.network::<AnyNetwork>()
.on_http(url.parse()?),
.connect_http(url.parse()?),
)
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use alloy::network::{AnyHeader, AnyRpcBlock};
use alloy::network::AnyRpcBlock;
use alloy_rpc_types_eth::{Header as RpcHeader, Log};
use anyhow::Result;
use arrow::datatypes::Schema;
Expand Down
6 changes: 3 additions & 3 deletions crates/phaser-query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ path = "src/bin/phaser-cli.rs"

[dependencies]
# Local dependencies
phaser-bridge = { path = "../phaser-bridge" }
erigon-bridge = { path = "../bridges/erigon-bridge" }
evm-common = { path = "../schemas/evm/common" }
phaser-bridge = { workspace = true }
erigon-bridge = { workspace = true }
evm-common = { workspace = true }

# Typed Arrow support
typed-arrow = { workspace = true }
Expand Down
Loading