diff --git a/docs/developing-bridges.md b/docs/developing-bridges.md new file mode 100644 index 0000000..f863ffc --- /dev/null +++ b/docs/developing-bridges.md @@ -0,0 +1,116 @@ +# Developing Phaser Bridges + +## Overview + +Bridges translate blockchain node protocols into Arrow Flight streams. Bridges may buffer data for batching or validation (e.g., Merkle trie construction), but don't cache historical data - that's phaser-query's responsibility. + +**Architecture**: Node → Bridge (convert) → Arrow Flight → phaser-query (buffer/write) → Parquet + +**Responsibilities**: +- Bridge: Stateless protocol translation +- phaser-query: Buffering, gap detection, Parquet writing + +## Implementation + +### FlightBridge Trait + +Implement the `FlightBridge` trait from [`../crates/phaser-bridge/src/bridge.rs`](../crates/phaser-bridge/src/bridge.rs). Core methods: + +- `get_info()` - Return bridge metadata (name, node type, chain ID, capabilities) +- `get_capabilities()` - Advertise supported features (historical, streaming, validation) +- `do_get()` - Stream data for a ticket (decode descriptor, return Arrow Flight stream) +- `health_check()` - Verify node connectivity + +See [`../crates/bridges/evm/erigon-bridge/src/bridge.rs`](../crates/bridges/evm/erigon-bridge/src/bridge.rs) for a complete implementation. + +### Arrow Schemas + +Define Arrow schemas for each data type. For EVM chains, use `evm-common` crate schemas. For other chains, define custom schemas. + +**Critical**: Include `_block_num` column (UInt64) for gap detection and query optimization. + +Schema references: +- `evm_common::block_arrow_schema()` +- `evm_common::transaction_arrow_schema()` +- `evm_common::log_arrow_schema()` + +### Data Conversion + +Create a converter that decodes node-specific formats (RLP, JSON, protobuf) to Arrow RecordBatches. Pattern: + +1. Decode node format to intermediate representation +2. Map to typed records (using `typed-arrow` or manual builders) +3. 
Build RecordBatch from Arrow arrays + +See [`../crates/bridges/evm/erigon-bridge/src/converter.rs`](../crates/bridges/evm/erigon-bridge/src/converter.rs) for an RLP → Arrow conversion example. + +### Query Modes + +Support two modes via `BlockchainDescriptor`: + +**Historical** - Fetch specific block ranges from node +**Live** - Subscribe to new data as it arrives + +See [`../crates/phaser-bridge/src/subscription.rs`](../crates/phaser-bridge/src/subscription.rs) for query mode definitions. + +### Validation + +Implement two-stage validation: + +**Ingestion** - Validate raw data from node (RLP encoding, JSON structure) +**Conversion** - Validate Arrow RecordBatch after conversion (schema compliance, field constraints) + +Make validation configurable via `ValidationStage` enum. See [`../crates/phaser-bridge/src/descriptors.rs#L69-L80`](../crates/phaser-bridge/src/descriptors.rs#L69-L80). + +### Streaming + +Implement `do_get()` to return an Arrow Flight stream. Respect client preferences: +- `batch_size_hint` - Blocks per batch +- `max_message_bytes` - Maximum message size +- `compression` - gRPC compression (none, gzip, zstd) + +See [`../crates/bridges/evm/erigon-bridge/src/streaming_service.rs`](../crates/bridges/evm/erigon-bridge/src/streaming_service.rs) for live streaming implementation. + +## Configuration + +Bridges run as standalone services and connect via Unix socket or TCP: + +```bash +# Unix socket (recommended for local phaser-query) +erigon-bridge --erigon-grpc localhost:9090 \ + --ipc-path /var/run/erigon-bridge.sock \ + --chain-id 1 + +# TCP (for remote phaser-query) +erigon-bridge --erigon-grpc localhost:9090 \ + --flight-addr 0.0.0.0:8091 \ + --chain-id 1 +``` + +Configure in phaser-query config.yaml: + +```yaml +bridges: + - name: erigon + chain_id: 1 + endpoint: /var/run/erigon-bridge.sock # or localhost:8091 + protocol: grpc +``` + +## Performance + +**Batch Size** - Larger batches improve throughput but increase memory. 
Respect client's `batch_size_hint` and `max_message_bytes`. + +**Compression** - Enable gRPC compression. ZSTD for best ratio, gzip for speed. + +**Connection Pooling** - Use pools for concurrent historical queries. A single connection is sufficient for live streaming. + +**Validation** - Ingestion validation is cheap (decode check). Conversion validation is costlier (schema + constraints). Make configurable. + +## Reference Implementation + +See erigon-bridge for a complete example: +- [`../crates/bridges/evm/erigon-bridge/src/bridge.rs`](../crates/bridges/evm/erigon-bridge/src/bridge.rs) - FlightBridge implementation +- [`../crates/bridges/evm/erigon-bridge/src/converter.rs`](../crates/bridges/evm/erigon-bridge/src/converter.rs) - RLP to Arrow conversion +- [`../crates/bridges/evm/erigon-bridge/src/streaming_service.rs`](../crates/bridges/evm/erigon-bridge/src/streaming_service.rs) - Live streaming +- [`../crates/bridges/evm/erigon-bridge/src/client.rs`](../crates/bridges/evm/erigon-bridge/src/client.rs) - Node client wrapper diff --git a/docs/parquet-files.md b/docs/parquet-files.md index 8d261ee..30b82b4 100644 --- a/docs/parquet-files.md +++ b/docs/parquet-files.md @@ -8,19 +8,22 @@ Phaser uses Parquet files to store blockchain data (blocks, transactions, logs). 
### Final Files ``` -{data_type}_from_{start_block}_to_{end_block}.parquet +{data_type}_from_{segment_start}_to_{segment_end}_{sequence}.parquet ``` Examples: -- `blocks_from_0_to_99999.parquet` -- `transactions_from_100000_to_199999.parquet` -- `logs_from_200000_to_299999.parquet` +- `blocks_from_0_to_499999_0.parquet` +- `transactions_from_500000_to_999999_0.parquet` +- `transactions_from_500000_to_999999_1.parquet` +- `logs_from_1000000_to_1499999_0.parquet` **Key points:** - `data_type`: One of `blocks`, `transactions`, or `logs` -- `start_block` and `end_block`: The actual block range contained in the file +- `segment_start` and `segment_end`: The 500K block segment this file belongs to +- `sequence`: Incrementing number starting from 0 for file rotations within a segment - Range is **inclusive** on both ends -- No segment information in filename - segments are a logical concept for parallel processing +- Files are named by segment boundaries, not actual data ranges +- Multiple files can exist for the same segment (indicated by sequence number) ### Temporary Files During Writing ``` @@ -45,54 +48,24 @@ Phaser stores two types of metadata in Parquet files: ### Phaser Metadata Format -Each file contains a compact, versioned metadata structure in the Parquet file footer: - -```rust -pub struct PhaserMetadata { - pub version: u8, // Format version (currently 1) - pub segment_start: u64, // Full segment range start (e.g., 0) - pub segment_end: u64, // Full segment range end (e.g., 499999) - pub responsibility_start: u64, // This file's responsibility start (e.g., 1) - pub responsibility_end: u64, // This file's responsibility end (e.g., 499999) - pub data_start: u64, // Actual first block with data (e.g., 46147) - pub data_end: u64, // Actual last block with data (e.g., 499998) - pub data_type: String, // "blocks", "transactions", or "logs" -} -``` +Each file contains metadata with three range types: -**Three types of ranges:** - **Segment range**: The full 500K 
block segment this file belongs to - **Responsibility range**: The block range this file is responsible for covering - **Data range**: The actual blocks that have data (may skip empty blocks) -The metadata is: -- Encoded with bincode (compact binary format) -- Base64-encoded for storage in Parquet key-value metadata -- Stored in a single `phaser.meta` key -- Updated in-place without rewriting data (see below) - -Reading metadata: -```rust -use phaser_parquet_metadata::PhaserMetadata; - -let file = File::open(path)?; -let reader = SerializedFileReader::new(file)?; -let metadata = reader.metadata(); - -if let Some(kv_metadata) = metadata.file_metadata().key_value_metadata() { - if let Ok(Some(phaser_meta)) = PhaserMetadata::from_key_value_metadata(kv_metadata) { - println!("Segment: {}-{}", phaser_meta.segment_start, phaser_meta.segment_end); - println!("Data: {}-{}", phaser_meta.data_start, phaser_meta.data_end); - } -} -``` +Metadata storage: +- Encoded with bincode, base64-encoded +- Stored in `phaser.meta` key-value metadata +- Can be updated in-place without rewriting file data + +See [`../crates/phaser-parquet-metadata/src/lib.rs`](../crates/phaser-parquet-metadata/src/lib.rs) for `PhaserMetadata` struct definition and read/write methods. ### In-Place Metadata Updates -Metadata can be updated **without rewriting the entire file** using the `parquet-meta` CLI tool or the library: +Metadata can be updated without rewriting file data using the `parquet-meta` CLI tool: ```bash -# Update metadata on an existing file parquet-meta fix-meta transactions_from_1_to_499999.parquet \ --segment-start 0 \ --segment-end 499999 \ @@ -102,124 +75,48 @@ parquet-meta fix-meta transactions_from_1_to_499999.parquet \ --infer # Infer data_start/data_end from statistics ``` -This works by: -1. Reading the existing Parquet footer (small, at end of file) -2. Updating the key-value metadata -3. Serializing the new footer using `ParquetMetaDataWriter` -4. 
Truncating the file to remove the old footer -5. Appending the new footer - -**Benefits:** -- No data copying (preserves all row groups and compression) -- Works on multi-terabyte files in milliseconds -- Safe: detects corrupted footers and fails before writing -- Preserves file structure: row groups, column indexes, compression settings - -**Safety:** -- Validates PAR1 magic bytes before updating -- Fails if footer is corrupted (detected during read) -- Uses atomic file operations where possible -- Tested with files containing thousands of row groups +Works by reading footer, updating key-value metadata, truncating old footer, appending new footer. No data copying - works on multi-terabyte files in milliseconds. -### Block Range Statistics +See [`../crates/phaser-parquet-metadata/src/lib.rs`](../crates/phaser-parquet-metadata/src/lib.rs) for implementation. -Every Parquet file has **column-level statistics** enabled for the `_block_num` column: - -```rust -// In ParquetWriter -builder.set_column_statistics_enabled("_block_num".into(), EnabledStatistics::Page); -``` +### Block Range Statistics -This stores min/max block numbers in the Parquet file footer, allowing: -- **Fast gap detection**: Read block ranges without opening files -- **Efficient queries**: Skip files outside requested range -- **Resume logic**: Determine what data exists without scanning contents - -Reading statistics: -```rust -let file = File::open(path)?; -let builder = ParquetRecordBatchReaderBuilder::try_new(file)?; -let metadata = builder.metadata(); - -// Read min/max from row group statistics -for row_group in metadata.row_groups() { - let column_stats = row_group.column(block_num_col_idx).statistics(); - // Statistics are stored as Int64 at Parquet physical level - if let Statistics::Int64(int_stats) = stats { - let min_block = int_stats.min_opt().map(|&v| v as u64); - let max_block = int_stats.max_opt().map(|&v| v as u64); - } -} -``` +Column-level statistics are enabled for `_block_num` 
column, storing min/max block numbers in file footer. This enables fast gap detection and query optimization without opening files. -**Note**: Parquet stores UInt64 as Int64 at the physical level, so we cast when reading. +See [`../crates/phaser-query/src/parquet_writer.rs`](../crates/phaser-query/src/parquet_writer.rs) for statistics configuration. ## File Creation Process -### 1. Start New File -```rust -ParquetWriter::start_new_file(block_num, schema) -``` -- Creates temporary file with timestamp -- Initializes ArrowWriter with schema and WriterProperties -- Tracks `start_block` (set to first block number written) +Files are created in three stages: -### 2. Write Batches -```rust -ParquetWriter::write_batch(batch) -``` -- Writes RecordBatch to current file -- Updates `end_block` to latest block written -- Checks if file should be finalized (segment boundary or size limit) +1. **Start** - Create `.tmp` file, initialize writer, track starting block +2. **Write** - Append batches, update end block, check finalization triggers +3. **Finalize** - Close writer, rename to final format, update metadata -### 3. Finalize File -```rust -ParquetWriter::finalize_current_file() -``` -- Closes writer (flushes all data and writes footer with statistics) -- Renames `.tmp` file to final format with actual block range -- Logs completion with row count and block range +Files finalize when: +- Segment boundary crossed +- Size limit exceeded -### 4. Finalization Triggers +This ensures files never span segments and sizes stay bounded. -Files are finalized and a new file started when: -1. **Segment boundary crossed**: `(block_num / segment_size) != (start_block / segment_size)` -2. **Size limit exceeded**: File size >= `max_file_size_bytes` - -This means: -- Multiple files can cover a single segment -- Files never span segment boundaries -- File sizes are bounded +See [`../crates/phaser-query/src/parquet_writer.rs`](../crates/phaser-query/src/parquet_writer.rs) for implementation. 
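The two finalization triggers above reduce to a small predicate; a minimal sketch in Rust (the function name and parameters are illustrative, not the actual `ParquetWriter` API):

```rust
/// Decide whether the current file must be finalized before writing
/// `block_num`. Mirrors the two triggers described above: crossing a
/// segment boundary, or exceeding the size limit.
fn should_finalize(
    start_block: u64,
    block_num: u64,
    file_size_bytes: u64,
    segment_size: u64,
    max_file_size_bytes: u64,
) -> bool {
    // A file never spans segments: compare the segment index of the
    // incoming block with the segment index of the file's first block.
    let crosses_segment = block_num / segment_size != start_block / segment_size;
    // Bound file sizes independently of segment progress.
    let too_large = file_size_bytes >= max_file_size_bytes;
    crosses_segment || too_large
}

fn main() {
    let seg = 500_000;
    let max = 1_073_741_824; // 1 GiB
    // Block 500,000 crosses the boundary of the segment starting at block 0.
    assert!(should_finalize(200_000, 500_000, 0, seg, max));
    // Same segment, under the size limit: keep writing.
    assert!(!should_finalize(200_000, 499_999, 0, seg, max));
    // Size limit reached mid-segment: rotate within the segment.
    assert!(should_finalize(0, 100_000, max, seg, max));
}
```

Because the segment check compares integer segment indexes rather than a running counter, it stays correct even when the writer skips empty blocks.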
## Multiple Files Per Segment -Segments are logical units for parallel processing (e.g., 500K blocks). Physical files can be smaller: +Segments are logical units for parallel processing (e.g., 500K blocks). When files reach the size limit before the segment ends, they rotate creating multiple files with incrementing sequence numbers: ``` Segment 0 (blocks 0-499,999): - - blocks_from_0_to_249999.parquet - - blocks_from_250000_to_499999.parquet + - blocks_from_0_to_499999_0.parquet (sequence 0) + - blocks_from_0_to_499999_1.parquet (sequence 1, if rotated) Segment 1 (blocks 500,000-999,999): - - blocks_from_500000_to_999999.parquet + - blocks_from_500000_to_999999_0.parquet ``` -Gap detection checks if the **union** of file ranges covers the segment: -```rust -// Sort files by start block -ranges.sort_by_key(|r| r.start); - -// Check continuous coverage -let mut covered_up_to = segment_start - 1; -for range in ranges { - if range.start > covered_up_to + 1 { - return false; // Gap found - } - covered_up_to = covered_up_to.max(range.end); -} - -covered_up_to >= segment_end // Segment complete if covered to end -``` +Gap detection uses responsibility ranges from metadata to verify continuous coverage across all files in a segment. + +See [`../crates/phaser-query/src/sync/data_scanner.rs`](../crates/phaser-query/src/sync/data_scanner.rs) for gap detection implementation. ## Configuration @@ -279,7 +176,7 @@ If a write is interrupted: ## Example: File Lifecycle ``` -1. Worker starts segment 0 (blocks 0-499,999) +1. Worker starts segment 0 (blocks 0-499,999), sequence 0 Create: blocks_1733160000123.parquet.tmp 2. Write blocks 0-99,999 @@ -289,7 +186,9 @@ If a write is interrupted: Update: end_block = 199999 4. 
Block 200,000 arrives, size limit reached - Close writer, rename to: blocks_from_0_to_199999.parquet + Close writer, rename to: blocks_from_0_to_499999_0.parquet + Metadata: segment 0-499999, responsibility 0-199999, data 0-199999 + Increment sequence to 1 Create: blocks_1733160005678.parquet.tmp 5. Write blocks 200,000-299,999 @@ -302,9 +201,11 @@ If a write is interrupted: Update: end_block = 499999 8. Block 500,000 arrives, segment boundary crossed - Close writer, rename to: blocks_from_200000_to_499999.parquet + Close writer, rename to: blocks_from_0_to_499999_1.parquet + Metadata: segment 0-499999, responsibility 200000-499999, data 200000-499999 + Reset sequence to 0 for next segment -Result: Segment 0 covered by 2 files +Result: Segment 0 covered by 2 files with contiguous responsibility ranges ``` ## Querying Files diff --git a/docs/using-bridges.md b/docs/using-bridges.md new file mode 100644 index 0000000..58d9872 --- /dev/null +++ b/docs/using-bridges.md @@ -0,0 +1,170 @@ +# Using Phaser Bridges + +Phaser bridges translate blockchain node protocols into Arrow Flight streams for efficient data sync. + +## Available Bridges + +### Erigon Bridge + +Connects to Erigon nodes via gRPC to stream EVM blockchain data. + +**Supports:** +- Blocks, transactions, logs +- Historical queries (specific block ranges) +- Live streaming (chain head subscription) +- Validation at ingestion and conversion stages + +**Requirements:** +- Erigon node with gRPC enabled +- Network access to Erigon gRPC port (default: 9090) + +**Connection:** +```yaml +# config.yaml +bridges: + - name: erigon + chain_id: 1 + endpoint: localhost:9090 + protocol: grpc +``` + +Start the bridge: +```bash +erigon-bridge --erigon-grpc localhost:9090 +``` + +The bridge listens on a Unix socket or TCP port for phaser-query to connect. + +### JSON-RPC Bridge + +Connects to standard Ethereum JSON-RPC endpoints. 
+ +**Supports:** +- Blocks, transactions, logs +- Historical queries only (no live streaming) +- Works with any JSON-RPC compatible node (Geth, Nethermind, etc.) + +**Requirements:** +- JSON-RPC endpoint (HTTP or WebSocket) +- Archive node for historical data access + +**Connection:** +```yaml +bridges: + - name: rpc + chain_id: 1 + endpoint: https://eth-mainnet.example.com + protocol: jsonrpc +``` + +Start the bridge: +```bash +jsonrpc-bridge --rpc-url https://eth-mainnet.example.com +``` + +## Data Schemas + +All bridges provide data in standardized Arrow schemas: + +### Blocks +- `number` (UInt64) - Block number +- `hash` (Binary) - Block hash +- `parent_hash` (Binary) +- `timestamp` (UInt64) +- `gas_used` (UInt64) +- `gas_limit` (UInt64) +- Additional chain-specific fields + +### Transactions +- `hash` (Binary) +- `block_number` (UInt64) +- `from` (Binary) - Sender address +- `to` (Binary) - Recipient address +- `value` (Binary) - Amount (U256 as bytes) +- `gas` (UInt64) +- `gas_price` (Binary) +- `nonce` (UInt64) +- `input` (Binary) - Call data + +### Logs +- `address` (Binary) - Contract address +- `block_number` (UInt64) +- `transaction_hash` (Binary) +- `log_index` (UInt32) +- `topics` (List) - Indexed event parameters +- `data` (Binary) - Non-indexed parameters + +## Validation + +Bridges can validate data at two stages: + +**Ingestion** - Validates raw data from the node +- Checks RLP encoding +- Verifies structural integrity +- Minimal performance impact + +**Conversion** - Validates after Arrow conversion +- Ensures schema compliance +- Checks field constraints +- Higher accuracy, slight performance cost + +Configure in phaser-query: +```yaml +validation: both # none, ingestion, conversion, both +``` + +## Performance Tuning + +### Batch Size + +Controls how many blocks are fetched per request: +```yaml +batch_size_hint: 100 # Default +``` + +Larger batches improve throughput but increase memory usage. 
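Since per-batch memory is roughly `batch size × average block size`, the hint can be clamped so encoded batches stay under the message cap; a hedged sketch (the helper and the average-size figure are assumptions, not part of the bridge API):

```rust
/// Clamp a client's batch size hint so an encoded batch is unlikely to
/// exceed the Flight message cap. `avg_block_bytes` is a deployment-
/// specific estimate, not something the bridge measures here.
fn effective_batch_size(hint: u64, max_message_bytes: u64, avg_block_bytes: u64) -> u64 {
    // Largest batch that fits the cap, never less than one block.
    let cap = (max_message_bytes / avg_block_bytes).max(1);
    hint.min(cap).max(1)
}

fn main() {
    // 100-block hint, 32 MiB cap, ~200 KiB per block: the hint fits as-is.
    assert_eq!(effective_batch_size(100, 32 * 1024 * 1024, 200 * 1024), 100);
    // Very large blocks (~8 MiB): the cap wins, shrinking the batch to 4.
    assert_eq!(effective_batch_size(100, 32 * 1024 * 1024, 8 * 1024 * 1024), 4);
}
```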
+ +### Compression + +Enable compression for network transfer: +```yaml +compression: zstd # none, gzip, zstd +``` + +ZSTD offers best compression ratio. Gzip is faster. + +### Message Size + +Maximum message size for large batches: +```yaml +max_message_bytes: 33554432 # 32MB default +``` + +Increase for blocks with many transactions. + +## Monitoring + +Bridges expose metrics on :9091/metrics: + +- `bridge_blocks_served` - Total blocks streamed +- `bridge_requests_total` - Request count by type +- `bridge_request_duration_seconds` - Request latency +- `bridge_errors_total` - Error count by type + +## Troubleshooting + +**Bridge fails to connect to node:** +- Verify node gRPC/RPC endpoint is accessible +- Check firewall rules +- Confirm node is synced + +**Data validation errors:** +- Check node is not corrupt or desynced +- Try reducing validation level +- Verify schema compatibility with node version + +**Slow performance:** +- Increase batch size +- Enable compression +- Check network latency to node +- Verify node can handle request rate
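For transient connection failures, retrying with exponential backoff before giving up helps separate flaky networking from real misconfiguration; a minimal sketch (the helper is illustrative, not part of any bridge CLI):

```rust
use std::time::Duration;

/// Exponential backoff delays for connection retries, capped so a briefly
/// unreachable node doesn't stall reconnection for minutes.
fn backoff_delays(attempts: u32, base_ms: u64, cap_ms: u64) -> Vec<Duration> {
    (0..attempts)
        // Double the delay each attempt, clamped to the cap.
        .map(|i| Duration::from_millis((base_ms << i).min(cap_ms)))
        .collect()
}

fn main() {
    // 5 attempts starting at 100 ms, capped at 1 s: 100, 200, 400, 800, 1000.
    let delays = backoff_delays(5, 100, 1000);
    assert_eq!(delays[0], Duration::from_millis(100));
    assert_eq!(delays[4], Duration::from_millis(1000));
}
```

If the endpoint is still unreachable after the capped retries, treat it as a configuration or firewall problem rather than a transient one.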