
Improve resumability #4

Merged

dwerner merged 12 commits into main from improve-resumability on Oct 2, 2025

Conversation

dwerner (Collaborator) commented Oct 2, 2025

This PR primarily improves resumability for phaser-query, and along the way simplifies the bridge protocol.

dwerner added 12 commits on October 2, 2025 at 07:34

- Add analyze_sync_range() to identify complete vs missing segments
- Add GapAnalysis struct with completion metrics
- Add AnalyzeGaps RPC endpoint for standalone gap queries
- Display gap analysis in sync job responses and CLI
- Support resume by scanning existing parquet files
- Clean temp files lazily when starting sync jobs
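
The gap analysis above is essentially range arithmetic over segments. The names `analyze_sync_range()` and `GapAnalysis` follow the commit message, but the sketch below is an assumption: the real implementation scans existing Parquet files to decide which segments are complete, which is elided here.

```rust
/// Hypothetical sketch of the gap analysis; field names follow the
/// commit message, the layout and logic are assumptions.
#[derive(Debug, PartialEq)]
struct GapAnalysis {
    missing: Vec<(u64, u64)>, // inclusive block ranges still to sync
    completion_pct: f64,
}

fn analyze_sync_range(start: u64, end: u64, segment_size: u64, complete: &[u64]) -> GapAnalysis {
    let mut missing = Vec::new();
    let mut covered_blocks = 0u64;
    let mut seg_start = start;
    while seg_start <= end {
        let seg_end = (seg_start + segment_size - 1).min(end);
        let seg_id = seg_start / segment_size;
        if complete.contains(&seg_id) {
            covered_blocks += seg_end - seg_start + 1;
        } else {
            missing.push((seg_start, seg_end));
        }
        seg_start = seg_end + 1;
    }
    let total = end - start + 1;
    GapAnalysis {
        missing,
        completion_pct: 100.0 * covered_blocks as f64 / total as f64,
    }
}

fn main() {
    // Blocks 0..=299 in segments of 100; segment 1 (blocks 100..=199) already synced.
    let g = analyze_sync_range(0, 299, 100, &[1]);
    assert_eq!(g.missing, vec![(0, 99), (200, 299)]);
    println!("{:?}", g);
}
```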
Remove the 10-minute worker timeout that was prematurely killing workers
during transaction/log sync phases. Workers should run as long as needed
to complete their segments.

Fix temp file cleanup to only remove files for segments we're about to sync,
preventing deletion of active live streaming temp files. The age-based cleanup
was incorrectly deleting files created by the live streaming writer.

Changes:
- Remove tokio::time::timeout wrapper from worker execution
- Replace clean_stale_temp_files() with clean_conflicting_temp_files()
- Only clean temp files for specific segments being synced
- Preserve live streaming temp files in non-overlapping segments
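
The cleanup rule above reduces to a range-overlap test: a temp file is only deleted if its block range intersects a segment about to be synced. The function name matches the commit; the signature and in-memory form below are assumptions (the real code operates on files on disk).

```rust
/// Two inclusive block ranges overlap iff each starts before the other ends.
fn overlaps(a: (u64, u64), b: (u64, u64)) -> bool {
    a.0 <= b.1 && b.0 <= a.1
}

/// Return the temp files (name, block range) that conflict with segments
/// we are about to sync; only these are safe to delete. Live streaming
/// temp files in non-overlapping ranges are left alone.
fn clean_conflicting_temp_files<'a>(
    temp_files: &'a [(&'a str, (u64, u64))],
    segments_to_sync: &[(u64, u64)],
) -> Vec<&'a str> {
    temp_files
        .iter()
        .filter(|(_, range)| segments_to_sync.iter().any(|seg| overlaps(*range, *seg)))
        .map(|(name, _)| *name)
        .collect()
}

fn main() {
    let temps = [("blocks_tmp_a", (0, 99)), ("live_tmp", (900, 999))];
    // Historical sync covers 0..=199; the live writer's file (900..=999) survives.
    let doomed = clean_conflicting_temp_files(&temps, &[(0, 199)]);
    assert_eq!(doomed, vec!["blocks_tmp_a"]);
}
```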
Track detailed progress for each worker including start time, current block,
blocks processed, bytes written, and files created. Calculate per-worker rates
and elapsed time for visibility into sync performance.

Changes:
- Add started_at field to WorkerProgress proto and struct
- Track current_block, blocks_processed, bytes_written, files_created
- Return metrics from sync_blocks/transactions/logs methods
- Calculate rate as blocks_processed / elapsed_seconds
- Populate worker progress in StreamSyncProgress responses
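
The metrics listed above might be shaped roughly like the struct below. Field names follow the commit message; the exact types and the rate calculation are assumptions (the real fields live in a proto definition).

```rust
/// Illustrative shape of the per-worker progress metrics.
#[derive(Debug)]
struct WorkerProgress {
    started_at_secs: u64, // unix timestamp when the worker started
    current_block: u64,
    blocks_processed: u64,
    bytes_written: u64,
    files_created: u32,
}

impl WorkerProgress {
    /// Rate as blocks_processed / elapsed_seconds, guarding division by zero.
    fn rate(&self, now_secs: u64) -> f64 {
        let elapsed = now_secs.saturating_sub(self.started_at_secs);
        if elapsed == 0 {
            0.0
        } else {
            self.blocks_processed as f64 / elapsed as f64
        }
    }
}

fn main() {
    let w = WorkerProgress {
        started_at_secs: 1000,
        current_block: 5400,
        blocks_processed: 400,
        bytes_written: 1 << 20,
        files_created: 2,
    };
    assert_eq!(w.rate(1100), 4.0); // 400 blocks over 100 s
}
```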
Add a new 'progress' command that streams live updates for a sync job,
showing overall progress and per-worker details with timing information.

Output includes:
- Job status and overall completion percentage
- Blocks/second rate and total bytes written
- Per-worker stage, block range, rate, and elapsed time
- Human-readable duration formatting (e.g., 4m18s)
- Updates every second until job completes
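
The human-readable duration formatting mentioned above (e.g. 258 s becomes 4m18s) can be sketched as below; the exact rounding and unit rules used by the CLI may differ.

```rust
/// A minimal sketch of compact duration formatting, assuming h/m/s units.
fn format_duration(total_secs: u64) -> String {
    let (h, m, s) = (total_secs / 3600, (total_secs % 3600) / 60, total_secs % 60);
    if h > 0 {
        format!("{}h{}m{}s", h, m, s)
    } else if m > 0 {
        format!("{}m{}s", m, s)
    } else {
        format!("{}s", s)
    }
}

fn main() {
    assert_eq!(format_duration(258), "4m18s");
    assert_eq!(format_duration(3725), "1h2m5s");
    assert_eq!(format_duration(42), "42s");
}
```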
Changes to parquet_writer.rs:
- Enable Page-level statistics for _block_num column to allow querying
  min/max block ranges from Parquet metadata without reading file data
- Simplify temp filename to use timestamp instead of segment info
- Simplify final filename format to {type}_from_{start}_to_{end}.parquet
- Remove segment boundary tracking from CurrentFile struct

This allows querying block ranges efficiently and supports multiple files
per segment since we now track actual block ranges rather than segment IDs.
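
The simplified final filename format is given literally in the commit message; a helper producing it might look like the sketch below (the function name is hypothetical).

```rust
/// Build the final filename in the {type}_from_{start}_to_{end}.parquet
/// format described above; the helper name is an assumption.
fn final_filename(file_type: &str, start_block: u64, end_block: u64) -> String {
    format!("{}_from_{}_to_{}.parquet", file_type, start_block, end_block)
}

fn main() {
    assert_eq!(
        final_filename("blocks", 1000, 1999),
        "blocks_from_1000_to_1999.parquet"
    );
}
```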
Changes to data_scanner.rs:
- Add read_block_range_from_parquet() to extract min/max block numbers
  from Parquet column statistics without reading entire file
- Update has_completed_segment() to check if union of file ranges covers
  segment, supporting multiple files per segment
- Handle UInt64 stored as Int64 at Parquet physical level

This enables efficient gap detection by reading only metadata from Parquet
file footers, and supports flexible file sizes where multiple files can
cover a single segment.
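
The union-coverage check behind `has_completed_segment()` can be sketched as pure interval logic: a segment counts as complete if the merged block ranges of its files cover it with no holes. In the real code those ranges come from Parquet footer statistics; here they are passed in directly, and the function name below is an assumption.

```rust
/// A segment is covered if sorted file ranges chain across it without gaps.
fn segment_is_covered(mut file_ranges: Vec<(u64, u64)>, seg_start: u64, seg_end: u64) -> bool {
    file_ranges.sort();
    let mut next_needed = seg_start;
    for (lo, hi) in file_ranges {
        if lo > next_needed {
            return false; // hole before this file's range
        }
        if hi >= next_needed {
            next_needed = hi + 1;
        }
        if next_needed > seg_end {
            return true;
        }
    }
    false
}

fn main() {
    // Two files together cover segment 100..=299.
    assert!(segment_is_covered(vec![(100, 180), (181, 320)], 100, 299));
    // A hole at 181..=199 leaves the segment incomplete.
    assert!(!segment_is_covered(vec![(100, 180), (200, 320)], 100, 299));
}
```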

Changes to sync/service.rs:
- Add historical_boundary parameter to run_sync_job()
- Use live_state.get_boundary() instead of scanner.find_historical_boundary()
- Pass boundary from analyze_gaps() to spawned sync jobs

Benefits:
- Direct source of truth for when live streaming started
- No filesystem scanning needed
- No race conditions with temp files

The boundary is set when live streaming starts and used by historical
sync to determine where to stop, ensuring no gaps or overlaps between
historical and live data.
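
The handoff described above amounts to clamping the historical range just below the live boundary. The sketch below is an assumption about how `run_sync_job()` might use the `historical_boundary` parameter, not the actual sync/service.rs code.

```rust
/// Live streaming owns blocks >= boundary; historical sync stops one
/// block before it, so there is neither a gap nor an overlap.
fn historical_end(requested_end: u64, historical_boundary: Option<u64>) -> u64 {
    match historical_boundary {
        Some(boundary) => requested_end.min(boundary.saturating_sub(1)),
        None => requested_end,
    }
}

fn main() {
    // Live streaming started at block 5000, so historical sync stops at 4999.
    assert_eq!(historical_end(9000, Some(5000)), 4999);
    // No live boundary recorded: sync the full requested range.
    assert_eq!(historical_end(9000, None), 9000);
}
```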
Removed from QueryMode:
- from_block in Live mode - unused, would require stateful buffering
- buffer_size parameter - unused, broadcast channels hardcoded in bridges
- Hybrid mode - unnecessary, coordination happens in phaser-query
- from_legacy() function - unused dead code with u64::MAX bug

Remaining protocol:
- Historical { start, end } - explicit block range for historical sync
- Live - stream from current head (no parameters)

This keeps bridges stateless as per architecture. Historical/live
coordination is handled by phaser-query using LiveStreamingState.
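
The two remaining protocol variants listed above map naturally onto a Rust enum; the actual definition in the bridge crate may differ in derives and naming.

```rust
/// Sketch of the simplified, stateless query protocol.
#[derive(Debug, Clone, PartialEq)]
enum QueryMode {
    /// Explicit block range for historical sync.
    Historical { start: u64, end: u64 },
    /// Stream from the current head; deliberately parameterless.
    Live,
}

fn main() {
    let modes = [QueryMode::Historical { start: 0, end: 999 }, QueryMode::Live];
    assert_eq!(modes.len(), 2);
    assert_ne!(modes[0], modes[1]);
}
```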
Changes:
- Remove from_block argument from BlockchainDescriptor::live() calls
- Update streaming_with_writer.rs to use live() without parameters
- Update worker.rs to use simplified protocol
- Minor cleanup in CLI and main binary

All callers now use the simplified stateless Flight protocol:
- Historical queries specify explicit ranges
- Live subscriptions start from current head
- No unused parameters

Added docs/parquet-files.md documenting:
- Filename conventions (final and temporary)
- Block range metadata stored in Parquet statistics
- File creation and rotation process
- Multiple files per segment support
- Gap detection and recovery logic
- Historical vs live streaming differences
- Query optimization using metadata

This provides a comprehensive reference for how Parquet files are
organized, named, and used throughout the system.

This document described edge cases that have been addressed by:
- Using Parquet statistics for gap detection
- Using LiveStreamingState for boundary tracking
- Supporting multiple files per segment

The architecture has evolved beyond what this document described.
@dwerner dwerner marked this pull request as ready for review October 2, 2025 18:41
@dwerner dwerner merged commit e5ef0fb into main Oct 2, 2025
@dwerner dwerner deleted the improve-resumability branch October 2, 2025 18:42