Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 2 additions & 13 deletions crates/bridges/erigon-bridge/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,8 +564,8 @@ impl FlightBridge for ErigonFlightBridge {
.await?,
)
}
QueryMode::Live { from_block, .. } => {
info!("Creating live stream from block {:?}", from_block);
QueryMode::Live => {
info!("Creating live stream from current head");
let receiver = match stream_type {
StreamType::Blocks => self.streaming_service.subscribe_blocks(),
StreamType::Transactions => self.streaming_service.subscribe_transactions(),
Expand All @@ -580,17 +580,6 @@ impl FlightBridge for ErigonFlightBridge {
}
})
}
QueryMode::Hybrid {
historical_start,
then_follow,
} => {
info!(
"Creating hybrid stream starting at block {}, follow: {}",
historical_start, then_follow
);
// TODO: Implement hybrid mode (historical then live)
return Err(Status::unimplemented("Hybrid mode not yet implemented"));
}
};

let schema = Self::get_schema_for_type(stream_type);
Expand Down
7 changes: 6 additions & 1 deletion crates/phaser-bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ impl FlightBridgeClient {
}
} else {
// TCP connection
Channel::from_shared(endpoint.clone())?.connect().await?
let uri = if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
endpoint.clone()
} else {
format!("http://{}", endpoint)
};
Channel::from_shared(uri)?.connect().await?
};

let client = FlightClient::new(channel);
Expand Down
61 changes: 3 additions & 58 deletions crates/phaser-bridge/src/descriptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,71 +45,16 @@ impl BlockchainDescriptor {
}
}

/// Create a descriptor for live subscription
pub fn live(stream_type: StreamType, from_block: Option<u64>) -> Self {
/// Create a descriptor for live subscription (starts from current head)
pub fn live(stream_type: StreamType) -> Self {
Self {
stream_type,
chain_id: None,
query_mode: QueryMode::Live {
from_block,
buffer_size: 100,
},
query_mode: QueryMode::Live,
subscription_options: None,
include_reorgs: false,
}
}

/// Create a hybrid descriptor (historical then live)
pub fn hybrid(stream_type: StreamType, historical_start: u64) -> Self {
Self {
stream_type,
chain_id: None,
query_mode: QueryMode::Hybrid {
historical_start,
then_follow: true,
},
subscription_options: None,
include_reorgs: false,
}
}

/// For backward compatibility - create from old-style parameters
pub fn from_legacy(
stream_type: StreamType,
chain_id: Option<u64>,
start_block: Option<u64>,
end_block: Option<u64>,
follow_head: bool,
include_reorgs: bool,
) -> Self {
let query_mode = match (start_block, end_block, follow_head) {
(Some(start), Some(end), false) => QueryMode::Historical { start, end },
(Some(start), None, true) => QueryMode::Hybrid {
historical_start: start,
then_follow: true,
},
(None, None, true) | (_, _, true) => QueryMode::Live {
from_block: start_block,
buffer_size: 100,
},
(Some(start), None, false) => QueryMode::Historical {
start,
end: u64::MAX,
},
_ => QueryMode::Live {
from_block: None,
buffer_size: 100,
},
};

Self {
stream_type,
chain_id,
query_mode,
subscription_options: None,
include_reorgs,
}
}
}

impl BlockchainDescriptor {
Expand Down
12 changes: 2 additions & 10 deletions crates/phaser-bridge/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,8 @@ pub enum ControlAction {
pub enum QueryMode {
/// Query historical data between start and end blocks
Historical { start: u64, end: u64 },
/// Subscribe to live data, optionally starting from a specific block
Live {
from_block: Option<u64>,
buffer_size: u32,
},
/// Query historical then transition to live
Hybrid {
historical_start: u64,
then_follow: bool,
},
/// Subscribe to live data from current head
Live,
}

/// Data availability information for query planning
Expand Down
69 changes: 69 additions & 0 deletions crates/phaser-query/proto/admin/sync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ service SyncService {

// Stream real-time progress updates for a sync job
rpc StreamSyncProgress(SyncProgressRequest) returns (stream SyncProgressUpdate);

// Analyze gaps in existing data without starting a sync job
rpc AnalyzeGaps(AnalyzeGapsRequest) returns (AnalyzeGapsResponse);
}

message SyncRequest {
Expand All @@ -43,6 +46,44 @@ message SyncResponse {

// Whether the request was accepted
bool accepted = 3;

// Gap analysis - existing data overview
GapAnalysis gap_analysis = 4;
}

message GapAnalysis {
// Total segments in the requested range
uint64 total_segments = 1;

// Number of complete segments (all data types present)
uint64 complete_segments = 2;

// Number of missing or incomplete segments
uint64 missing_segments = 3;

// Completion percentage (0-100)
double completion_percentage = 4;

// Number of stale temp files cleaned
uint64 cleaned_temp_files = 5;

// Segments that need syncing
repeated uint64 segments_to_sync = 6;

// Incomplete segments with details
repeated IncompleteSegment incomplete_details = 7;
}

message IncompleteSegment {
// Segment number
uint64 segment_num = 1;

// Block range for this segment
uint64 from_block = 2;
uint64 to_block = 3;

// Missing data types (e.g., "blocks", "txs", "logs")
repeated string missing_data_types = 4;
}

message SyncStatusRequest {
Expand Down Expand Up @@ -81,6 +122,9 @@ message SyncStatusResponse {
// Block range
uint64 from_block = 10;
uint64 to_block = 11;

// Gap analysis from when job started
GapAnalysis gap_analysis = 12;
}

message ListSyncJobsRequest {
Expand Down Expand Up @@ -154,6 +198,31 @@ message WorkerProgress {

// Number of parquet files created
uint32 files_created = 9;

// When this worker started (Unix timestamp in seconds)
int64 started_at = 10;
}

message AnalyzeGapsRequest {
// Chain ID to analyze
uint64 chain_id = 1;

// Bridge name
string bridge_name = 2;

// Starting block number (inclusive)
uint64 from_block = 3;

// Ending block number (inclusive)
uint64 to_block = 4;
}

message AnalyzeGapsResponse {
// Gap analysis for the requested range
GapAnalysis gap_analysis = 1;

// Message summarizing the analysis
string message = 2;
}

enum SyncStatus {
Expand Down
Loading