116 changes: 116 additions & 0 deletions docs/developing-bridges.md
@@ -0,0 +1,116 @@
# Developing Phaser Bridges

## Overview

Bridges translate blockchain node protocols into Arrow Flight streams. Bridges may buffer data for batching or validation (e.g., Merkle trie construction), but they do not cache historical data; caching is phaser-query's responsibility.

**Architecture**: Node → Bridge (convert) → Arrow Flight → phaser-query (buffer/write) → Parquet

**Responsibilities**:
- Bridge: Protocol translation (stateless aside from transient batching buffers)
- phaser-query: Buffering, gap detection, Parquet writing

## Implementation

### FlightBridge Trait

Implement the `FlightBridge` trait from [`../crates/phaser-bridge/src/bridge.rs`](../crates/phaser-bridge/src/bridge.rs). Core methods:

- `get_info()` - Return bridge metadata (name, node type, chain ID, capabilities)
- `get_capabilities()` - Advertise supported features (historical, streaming, validation)
- `do_get()` - Stream data for a ticket (decode descriptor, return Arrow Flight stream)
- `health_check()` - Verify node connectivity

See [`../crates/bridges/evm/erigon-bridge/src/bridge.rs`](../crates/bridges/evm/erigon-bridge/src/bridge.rs) for complete implementation.
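
The trait surface can be sketched roughly as follows. This is a simplified stand-in, not the real definition: the actual trait in `phaser-bridge` is async and uses Arrow Flight request/response types, and `do_get()` is omitted here because it returns a stream.

```rust
// Hypothetical, simplified shape of the FlightBridge trait. BridgeInfo and
// Capabilities below are illustrative stand-ins for the real types.
pub struct BridgeInfo {
    pub name: String,
    pub chain_id: u64,
}

pub struct Capabilities {
    pub historical: bool,
    pub streaming: bool,
    pub validation: bool,
}

pub trait FlightBridge {
    /// Bridge metadata: name, node type, chain ID.
    fn get_info(&self) -> BridgeInfo;
    /// Advertised features.
    fn get_capabilities(&self) -> Capabilities;
    /// Verify node connectivity.
    fn health_check(&self) -> Result<(), String>;
}

/// Minimal implementation used only to exercise the trait.
pub struct DummyBridge;

impl FlightBridge for DummyBridge {
    fn get_info(&self) -> BridgeInfo {
        BridgeInfo { name: "dummy".to_string(), chain_id: 1 }
    }
    fn get_capabilities(&self) -> Capabilities {
        Capabilities { historical: true, streaming: false, validation: true }
    }
    fn health_check(&self) -> Result<(), String> {
        Ok(())
    }
}

fn main() {
    let bridge = DummyBridge;
    assert!(bridge.health_check().is_ok());
    println!("{} (chain {})", bridge.get_info().name, bridge.get_info().chain_id);
}
```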

### Arrow Schemas

Define Arrow schemas for each data type. For EVM chains, use `evm-common` crate schemas. For other chains, define custom schemas.

**Critical**: Include a `_block_num` column (UInt64) for gap detection and query optimization.

Schema references:
- `evm_common::block_arrow_schema()`
- `evm_common::transaction_arrow_schema()`
- `evm_common::log_arrow_schema()`

### Data Conversion

Create a converter that decodes node-specific formats (RLP, JSON, protobuf) into Arrow RecordBatches. Pattern:

1. Decode node format to intermediate representation
2. Map to typed records (using `typed-arrow` or manual builders)
3. Build RecordBatch from Arrow arrays

See [`../crates/bridges/evm/erigon-bridge/src/converter.rs`](../crates/bridges/evm/erigon-bridge/src/converter.rs) for RLP → Arrow conversion example.
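
The three steps above can be sketched with hypothetical types; a real converter would decode RLP/JSON/protobuf and build arrow-rs arrays, but the shape of the pipeline is the same.

```rust
/// Step 1 output: intermediate representation decoded from the node format.
struct DecodedBlock {
    number: u64,
    tx_count: u32,
}

/// Step 1: decode the node format (here: a toy "number:tx_count" string
/// standing in for RLP or protobuf).
fn decode(raw: &str) -> Option<DecodedBlock> {
    let (num, txs) = raw.split_once(':')?;
    Some(DecodedBlock {
        number: num.parse().ok()?,
        tx_count: txs.parse().ok()?,
    })
}

/// Steps 2-3: map decoded records into per-column vectors, mirroring how a
/// RecordBatch is built column-by-column from Arrow arrays.
fn build_columns(blocks: &[DecodedBlock]) -> (Vec<u64>, Vec<u32>) {
    let block_num: Vec<u64> = blocks.iter().map(|b| b.number).collect();
    let tx_count: Vec<u32> = blocks.iter().map(|b| b.tx_count).collect();
    (block_num, tx_count)
}

fn main() {
    let raw = ["100:2", "101:0"];
    let decoded: Vec<DecodedBlock> = raw.iter().filter_map(|r| decode(r)).collect();
    let (block_num, tx_count) = build_columns(&decoded);
    println!("{:?} {:?}", block_num, tx_count);
}
```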

### Query Modes

Support two modes via `BlockchainDescriptor`:

- **Historical** - Fetch specific block ranges from the node
- **Live** - Subscribe to new data as it arrives

See [`../crates/phaser-bridge/src/subscription.rs`](../crates/phaser-bridge/src/subscription.rs) for query mode definitions.
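
A minimal sketch of the two modes, assuming a simplified enum (the real definitions carry more fields and live in the subscription module):

```rust
// Hypothetical shape of the two query modes.
#[derive(Debug, PartialEq)]
enum QueryMode {
    /// Fetch a specific, inclusive block range from the node.
    Historical { start: u64, end: u64 },
    /// Subscribe to new data as it arrives.
    Live,
}

/// Illustrative dispatch: historical requests are bounded, live ones are not.
fn block_count(mode: &QueryMode) -> Option<u64> {
    match mode {
        QueryMode::Historical { start, end } => Some(end - start + 1),
        QueryMode::Live => None,
    }
}

fn main() {
    assert_eq!(block_count(&QueryMode::Historical { start: 0, end: 99 }), Some(100));
    assert_eq!(block_count(&QueryMode::Live), None);
    println!("ok");
}
```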

### Validation

Implement two-stage validation:

- **Ingestion** - Validate raw data from the node (RLP encoding, JSON structure)
- **Conversion** - Validate the Arrow RecordBatch after conversion (schema compliance, field constraints)

Make validation configurable via `ValidationStage` enum. See [`../crates/phaser-bridge/src/descriptors.rs#L69-L80`](../crates/phaser-bridge/src/descriptors.rs#L69-L80).
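
One way to make the stages configurable is a gate per stage; the variant names below are assumptions, and the actual `ValidationStage` enum in `descriptors.rs` may differ:

```rust
// Sketch of configurable two-stage validation.
#[derive(Clone, Copy, PartialEq)]
enum ValidationStage {
    None,
    Ingestion,
    Conversion,
    Both,
}

impl ValidationStage {
    /// Should raw node data be validated before conversion?
    fn validate_ingestion(self) -> bool {
        matches!(self, ValidationStage::Ingestion | ValidationStage::Both)
    }
    /// Should the RecordBatch be validated after conversion?
    fn validate_conversion(self) -> bool {
        matches!(self, ValidationStage::Conversion | ValidationStage::Both)
    }
}

fn main() {
    let stage = ValidationStage::Both;
    assert!(stage.validate_ingestion() && stage.validate_conversion());
    assert!(!ValidationStage::None.validate_ingestion());
    println!("ok");
}
```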

### Streaming

Implement `do_get()` to return Arrow Flight stream. Respect client preferences:
- `batch_size_hint` - Blocks per batch
- `max_message_bytes` - Maximum message size
- `compression` - gRPC compression (none, gzip, zstd)

See [`../crates/bridges/evm/erigon-bridge/src/streaming_service.rs`](../crates/bridges/evm/erigon-bridge/src/streaming_service.rs) for live streaming implementation.
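
Honoring both `batch_size_hint` and `max_message_bytes` amounts to capping the hint so an estimated batch never exceeds the byte limit. The sketch below assumes a per-block size estimate, which the real service would derive from observed batch sizes:

```rust
// Cap the client's batch_size_hint so an estimated message stays under
// max_message_bytes. est_bytes_per_block is an assumed size estimate.
fn effective_batch_size(
    batch_size_hint: u64,
    max_message_bytes: u64,
    est_bytes_per_block: u64,
) -> u64 {
    let cap = (max_message_bytes / est_bytes_per_block.max(1)).max(1);
    batch_size_hint.clamp(1, cap)
}

fn main() {
    // A 1 MiB limit with ~2 KiB blocks caps a 1000-block hint at 512 blocks.
    assert_eq!(effective_batch_size(1000, 1024 * 1024, 2048), 512);
    // A small hint is respected as-is.
    assert_eq!(effective_batch_size(64, 1024 * 1024, 2048), 64);
    println!("ok");
}
```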

## Configuration

Bridges run as standalone services and connect via Unix socket or TCP:

```bash
# Unix socket (recommended for local phaser-query)
erigon-bridge --erigon-grpc localhost:9090 \
--ipc-path /var/run/erigon-bridge.sock \
--chain-id 1

# TCP (for remote phaser-query)
erigon-bridge --erigon-grpc localhost:9090 \
--flight-addr 0.0.0.0:8091 \
--chain-id 1
```

Configure the bridge in the phaser-query `config.yaml`:

```yaml
bridges:
- name: erigon
chain_id: 1
endpoint: /var/run/erigon-bridge.sock # or localhost:8091
protocol: grpc
```

## Performance

**Batch Size** - Larger batches improve throughput but increase memory usage. Respect the client's `batch_size_hint` and `max_message_bytes`.

**Compression** - Enable gRPC compression: zstd for the best ratio, gzip for speed.

**Connection Pooling** - Use connection pools for concurrent historical queries; a single connection is sufficient for live streaming.

**Validation** - Ingestion validation is cheap (a decode check); conversion validation costs more (schema plus constraint checks). Make both configurable.

## Reference Implementation

See erigon-bridge for complete example:
- [`../crates/bridges/evm/erigon-bridge/src/bridge.rs`](../crates/bridges/evm/erigon-bridge/src/bridge.rs) - FlightBridge implementation
- [`../crates/bridges/evm/erigon-bridge/src/converter.rs`](../crates/bridges/evm/erigon-bridge/src/converter.rs) - RLP to Arrow conversion
- [`../crates/bridges/evm/erigon-bridge/src/streaming_service.rs`](../crates/bridges/evm/erigon-bridge/src/streaming_service.rs) - Live streaming
- [`../crates/bridges/evm/erigon-bridge/src/client.rs`](../crates/bridges/evm/erigon-bridge/src/client.rs) - Node client wrapper
191 changes: 46 additions & 145 deletions docs/parquet-files.md
@@ -8,19 +8,22 @@ Phaser uses Parquet files to store blockchain data (blocks, transactions, logs).

### Final Files
```
{data_type}_from_{start_block}_to_{end_block}.parquet
{data_type}_from_{segment_start}_to_{segment_end}_{sequence}.parquet
```

Examples:
- `blocks_from_0_to_99999.parquet`
- `transactions_from_100000_to_199999.parquet`
- `logs_from_200000_to_299999.parquet`
- `blocks_from_0_to_499999_0.parquet`
- `transactions_from_500000_to_999999_0.parquet`
- `transactions_from_500000_to_999999_1.parquet`
- `logs_from_1000000_to_1499999_0.parquet`

**Key points:**
- `data_type`: One of `blocks`, `transactions`, or `logs`
- `start_block` and `end_block`: The actual block range contained in the file
- `segment_start` and `segment_end`: The 500K block segment this file belongs to
- `sequence`: Incrementing number starting from 0 for file rotations within a segment
- Range is **inclusive** on both ends
- No segment information in filename - segments are a logical concept for parallel processing
- Files are named by segment boundaries, not actual data ranges
- Multiple files can exist for the same segment (indicated by sequence number)
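
The naming scheme above can be sketched as a single format call; the function and parameter names here mirror the pattern, not the actual writer code:

```rust
// Build a final file name from the segment boundaries and sequence number.
fn final_file_name(data_type: &str, segment_start: u64, segment_end: u64, sequence: u32) -> String {
    format!("{data_type}_from_{segment_start}_to_{segment_end}_{sequence}.parquet")
}

fn main() {
    assert_eq!(final_file_name("blocks", 0, 499_999, 0), "blocks_from_0_to_499999_0.parquet");
    assert_eq!(
        final_file_name("transactions", 500_000, 999_999, 1),
        "transactions_from_500000_to_999999_1.parquet"
    );
    println!("ok");
}
```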

### Temporary Files During Writing
```
@@ -45,54 +48,24 @@ Phaser stores two types of metadata in Parquet files:

### Phaser Metadata Format

Each file contains a compact, versioned metadata structure in the Parquet file footer:

```rust
pub struct PhaserMetadata {
pub version: u8, // Format version (currently 1)
pub segment_start: u64, // Full segment range start (e.g., 0)
pub segment_end: u64, // Full segment range end (e.g., 499999)
pub responsibility_start: u64, // This file's responsibility start (e.g., 1)
pub responsibility_end: u64, // This file's responsibility end (e.g., 499999)
pub data_start: u64, // Actual first block with data (e.g., 46147)
pub data_end: u64, // Actual last block with data (e.g., 499998)
pub data_type: String, // "blocks", "transactions", or "logs"
}
```
Each file contains metadata with three range types:

**Three types of ranges:**
- **Segment range**: The full 500K block segment this file belongs to
- **Responsibility range**: The block range this file is responsible for covering
- **Data range**: The actual blocks that have data (may skip empty blocks)

The metadata is:
- Encoded with bincode (compact binary format)
- Base64-encoded for storage in Parquet key-value metadata
- Stored in a single `phaser.meta` key
- Updated in-place without rewriting data (see below)

Reading metadata:
```rust
use phaser_parquet_metadata::PhaserMetadata;

let file = File::open(path)?;
let reader = SerializedFileReader::new(file)?;
let metadata = reader.metadata();

if let Some(kv_metadata) = metadata.file_metadata().key_value_metadata() {
if let Ok(Some(phaser_meta)) = PhaserMetadata::from_key_value_metadata(kv_metadata) {
println!("Segment: {}-{}", phaser_meta.segment_start, phaser_meta.segment_end);
println!("Data: {}-{}", phaser_meta.data_start, phaser_meta.data_end);
}
}
```
Metadata storage:
- Encoded with bincode, base64-encoded
- Stored in `phaser.meta` key-value metadata
- Can be updated in-place without rewriting file data

See [`../crates/phaser-parquet-metadata/src/lib.rs`](../crates/phaser-parquet-metadata/src/lib.rs) for `PhaserMetadata` struct definition and read/write methods.
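
The three range types nest: data inside responsibility inside segment. A minimal invariant check, with a struct that stands in for the real `PhaserMetadata` fields:

```rust
// Illustrative nesting check for the three range types (all inclusive).
struct Ranges {
    segment: (u64, u64),
    responsibility: (u64, u64),
    data: (u64, u64),
}

/// The data range must sit inside the responsibility range, which must sit
/// inside the segment range.
fn ranges_nested(r: &Ranges) -> bool {
    r.segment.0 <= r.responsibility.0
        && r.responsibility.1 <= r.segment.1
        && r.responsibility.0 <= r.data.0
        && r.data.1 <= r.responsibility.1
}

fn main() {
    // Example: segment 0-499999, file responsible for 1-499999, first
    // actual data at block 46147 (empty early blocks skipped).
    let r = Ranges {
        segment: (0, 499_999),
        responsibility: (1, 499_999),
        data: (46_147, 499_998),
    };
    assert!(ranges_nested(&r));
    println!("ok");
}
```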

### In-Place Metadata Updates

Metadata can be updated **without rewriting the entire file** using the `parquet-meta` CLI tool or the library:
Metadata can be updated without rewriting file data using the `parquet-meta` CLI tool:

```bash
# Update metadata on an existing file
parquet-meta fix-meta transactions_from_1_to_499999.parquet \
--segment-start 0 \
--segment-end 499999 \
@@ -102,124 +75,48 @@ parquet-meta fix-meta transactions_from_1_to_499999.parquet \
--infer # Infer data_start/data_end from statistics
```

This works by:
1. Reading the existing Parquet footer (small, at end of file)
2. Updating the key-value metadata
3. Serializing the new footer using `ParquetMetaDataWriter`
4. Truncating the file to remove the old footer
5. Appending the new footer

**Benefits:**
- No data copying (preserves all row groups and compression)
- Works on multi-terabyte files in milliseconds
- Safe: detects corrupted footers and fails before writing
- Preserves file structure: row groups, column indexes, compression settings

**Safety:**
- Validates PAR1 magic bytes before updating
- Fails if footer is corrupted (detected during read)
- Uses atomic file operations where possible
- Tested with files containing thousands of row groups
This works by reading the footer, updating the key-value metadata, truncating the old footer, and appending a new one. No data is copied, so it works on multi-terabyte files in milliseconds.
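
The truncate-and-append step can be demonstrated in miniature: the data region is untouched, only the tail of the file changes. Real code parses the Parquet footer to find its length; here `old_footer_len` is simply known, and the file contents are toy bytes.

```rust
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::Path;

// Replace the last old_footer_len bytes of the file with new_footer,
// leaving everything before them (the "row groups") in place.
fn rewrite_footer(path: &Path, old_footer_len: u64, new_footer: &[u8]) -> std::io::Result<()> {
    let len = fs::metadata(path)?.len();
    let f = OpenOptions::new().write(true).open(path)?;
    f.set_len(len - old_footer_len)?; // drop the old footer
    let mut f = OpenOptions::new().append(true).open(path)?;
    f.write_all(new_footer) // append the new one
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("phaser_footer_demo.bin");
    fs::write(&path, b"ROWGROUPS|OLDFOOTER")?;
    rewrite_footer(&path, 9, b"NEWFOOT")?;
    assert_eq!(fs::read(&path)?, b"ROWGROUPS|NEWFOOT".to_vec());
    println!("ok");
    fs::remove_file(&path)
}
```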

### Block Range Statistics
See [`../crates/phaser-parquet-metadata/src/lib.rs`](../crates/phaser-parquet-metadata/src/lib.rs) for implementation.

Every Parquet file has **column-level statistics** enabled for the `_block_num` column:

```rust
// In ParquetWriter
builder.set_column_statistics_enabled("_block_num".into(), EnabledStatistics::Page);
```
### Block Range Statistics

This stores min/max block numbers in the Parquet file footer, allowing:
- **Fast gap detection**: Read block ranges without opening files
- **Efficient queries**: Skip files outside requested range
- **Resume logic**: Determine what data exists without scanning contents

Reading statistics:
```rust
let file = File::open(path)?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
let metadata = builder.metadata();

// Read min/max from row group statistics
for row_group in metadata.row_groups() {
let column_stats = row_group.column(block_num_col_idx).statistics();
// Statistics are stored as Int64 at Parquet physical level
if let Statistics::Int64(int_stats) = stats {
let min_block = int_stats.min_opt().map(|&v| v as u64);
let max_block = int_stats.max_opt().map(|&v| v as u64);
}
}
```
Column-level statistics are enabled for the `_block_num` column, storing min/max block numbers in the file footer. This enables fast gap detection and query optimization without opening files.

**Note**: Parquet stores UInt64 as Int64 at the physical level, so we cast when reading.
See [`../crates/phaser-query/src/parquet_writer.rs`](../crates/phaser-query/src/parquet_writer.rs) for statistics configuration.
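
File pruning then reduces to an overlap test against the footer's min/max, sketched below. Parquet stores the UInt64 column as Int64 at the physical level, hence the cast when reading statistics:

```rust
// A file needs to be read only if its [min, max] block range (from the
// _block_num column statistics) overlaps the inclusive query range.
fn file_overlaps(query: (u64, u64), stat_min: i64, stat_max: i64) -> bool {
    let (min, max) = (stat_min as u64, stat_max as u64);
    max >= query.0 && min <= query.1
}

fn main() {
    // Query inside the file's range: read it.
    assert!(file_overlaps((150_000, 160_000), 100_000, 199_999));
    // Query entirely past the file's range: skip it.
    assert!(!file_overlaps((500_000, 600_000), 100_000, 199_999));
    println!("ok");
}
```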

## File Creation Process

### 1. Start New File
```rust
ParquetWriter::start_new_file(block_num, schema)
```
- Creates temporary file with timestamp
- Initializes ArrowWriter with schema and WriterProperties
- Tracks `start_block` (set to first block number written)
Files are created in three stages:

### 2. Write Batches
```rust
ParquetWriter::write_batch(batch)
```
- Writes RecordBatch to current file
- Updates `end_block` to latest block written
- Checks if file should be finalized (segment boundary or size limit)
1. **Start** - Create `.tmp` file, initialize writer, track starting block
2. **Write** - Append batches, update end block, check finalization triggers
3. **Finalize** - Close writer, rename to final format, update metadata

### 3. Finalize File
```rust
ParquetWriter::finalize_current_file()
```
- Closes writer (flushes all data and writes footer with statistics)
- Renames `.tmp` file to final format with actual block range
- Logs completion with row count and block range
Files finalize when:
- Segment boundary crossed
- Size limit exceeded

### 4. Finalization Triggers
This ensures files never span segments and sizes stay bounded.
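
The two triggers can be sketched as one predicate; `segment_size` and `max_file_size_bytes` here stand for the configured values:

```rust
// Finalize when the incoming block crosses a segment boundary or the
// current file has reached the size limit.
fn should_finalize(
    start_block: u64,
    incoming_block: u64,
    segment_size: u64,
    file_size_bytes: u64,
    max_file_size_bytes: u64,
) -> bool {
    let crossed = incoming_block / segment_size != start_block / segment_size;
    crossed || file_size_bytes >= max_file_size_bytes
}

fn main() {
    // Block 500,000 crosses out of segment 0 (segment size 500K).
    assert!(should_finalize(0, 500_000, 500_000, 0, u64::MAX));
    // Same segment, under the size limit: keep writing.
    assert!(!should_finalize(0, 499_999, 500_000, 1024, 1 << 30));
    println!("ok");
}
```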

Files are finalized and a new file started when:
1. **Segment boundary crossed**: `(block_num / segment_size) != (start_block / segment_size)`
2. **Size limit exceeded**: File size >= `max_file_size_bytes`

This means:
- Multiple files can cover a single segment
- Files never span segment boundaries
- File sizes are bounded
See [`../crates/phaser-query/src/parquet_writer.rs`](../crates/phaser-query/src/parquet_writer.rs) for implementation.

## Multiple Files Per Segment

Segments are logical units for parallel processing (e.g., 500K blocks). Physical files can be smaller:
Segments are logical units for parallel processing (e.g., 500K blocks). When files reach the size limit before the segment ends, they rotate, creating multiple files with incrementing sequence numbers:

```
Segment 0 (blocks 0-499,999):
- blocks_from_0_to_249999.parquet
- blocks_from_250000_to_499999.parquet
- blocks_from_0_to_499999_0.parquet (sequence 0)
- blocks_from_0_to_499999_1.parquet (sequence 1, if rotated)

Segment 1 (blocks 500,000-999,999):
- blocks_from_500000_to_999999.parquet
- blocks_from_500000_to_999999_0.parquet
```

Gap detection checks if the **union** of file ranges covers the segment:
```rust
// Sort files by start block
ranges.sort_by_key(|r| r.start);

// Check continuous coverage
let mut covered_up_to = segment_start - 1;
for range in ranges {
if range.start > covered_up_to + 1 {
return false; // Gap found
}
covered_up_to = covered_up_to.max(range.end);
}

covered_up_to >= segment_end // Segment complete if covered to end
```
Gap detection uses responsibility ranges from metadata to verify continuous coverage across all files in a segment.

See [`../crates/phaser-query/src/sync/data_scanner.rs`](../crates/phaser-query/src/sync/data_scanner.rs) for gap detection implementation.
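
The union-coverage check can be sketched as follows, operating on the inclusive responsibility ranges read from each file's metadata (a simplified stand-in for the actual scanner logic):

```rust
// A segment is complete when the sorted responsibility ranges cover it
// continuously from segment_start through segment_end, with no gaps.
fn segment_complete(segment_start: u64, segment_end: u64, mut ranges: Vec<(u64, u64)>) -> bool {
    ranges.sort_by_key(|r| r.0);
    let mut next_needed = segment_start;
    for (start, end) in ranges {
        if start > next_needed {
            return false; // gap before this file's range
        }
        if end >= next_needed {
            next_needed = end + 1; // extend continuous coverage
        }
        if next_needed > segment_end {
            return true; // covered through the segment end
        }
    }
    false
}

fn main() {
    // Two files with contiguous responsibility ranges cover segment 0.
    assert!(segment_complete(0, 499_999, vec![(0, 199_999), (200_000, 499_999)]));
    // A missing block 200,000 leaves a gap.
    assert!(!segment_complete(0, 499_999, vec![(0, 199_999), (200_001, 499_999)]));
    println!("ok");
}
```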

## Configuration

@@ -279,7 +176,7 @@ If a write is interrupted:
## Example: File Lifecycle

```
1. Worker starts segment 0 (blocks 0-499,999)
1. Worker starts segment 0 (blocks 0-499,999), sequence 0
Create: blocks_1733160000123.parquet.tmp

2. Write blocks 0-99,999
@@ -289,7 +186,9 @@ If a write is interrupted:
Update: end_block = 199999

4. Block 200,000 arrives, size limit reached
Close writer, rename to: blocks_from_0_to_199999.parquet
Close writer, rename to: blocks_from_0_to_499999_0.parquet
Metadata: segment 0-499999, responsibility 0-199999, data 0-199999
Increment sequence to 1
Create: blocks_1733160005678.parquet.tmp

5. Write blocks 200,000-299,999
@@ -302,9 +201,11 @@ If a write is interrupted:
Update: end_block = 499999

8. Block 500,000 arrives, segment boundary crossed
Close writer, rename to: blocks_from_200000_to_499999.parquet
Close writer, rename to: blocks_from_0_to_499999_1.parquet
Metadata: segment 0-499999, responsibility 200000-499999, data 200000-499999
Reset sequence to 0 for next segment

Result: Segment 0 covered by 2 files
Result: Segment 0 covered by 2 files with contiguous responsibility ranges
```

## Querying Files