From 75fbd2f05219db070b17f387a8e282708ef318d4 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 2 Oct 2025 12:25:55 -0700 Subject: [PATCH 1/7] Reorganize EVM bridges under crates/bridges/evm/ MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Moved bridges to better namespace structure: - crates/bridges/erigon-bridge → crates/bridges/evm/erigon-bridge - crates/bridges/jsonrpc-bridge → crates/bridges/evm/jsonrpc-bridge Benefits: - Clear EVM-specific namespace for future multi-chain support - Consistent location for all EVM-related bridges - Better organization as we add more bridge types Also updated to use workspace dependencies: - Added erigon-bridge, jsonrpc-bridge, phaser-bridge, evm-common to workspace.dependencies in root Cargo.toml - Updated all crates to use `{ workspace = true }` instead of relative paths - Cleaner dependency management across the workspace No functional changes - pure reorganization. --- Cargo.lock | 320 +++++++++++++++++- Cargo.toml | 10 +- commit-reorganization.sh | 36 ++ .../{ => evm}/erigon-bridge/Cargo.toml | 4 +- .../bridges/{ => evm}/erigon-bridge/build.rs | 0 .../proto/customized-erigon/blockdata.proto | 0 .../proto/customized-erigon/trie.proto | 0 .../erigon-bridge/proto/remote/bor.proto | 0 .../proto/remote/ethbackend.proto | 0 .../erigon-bridge/proto/types/types.proto | 0 .../erigon-bridge/src/blockdata_client.rs | 0 .../erigon-bridge/src/blockdata_converter.rs | 0 .../{ => evm}/erigon-bridge/src/bridge.rs | 0 .../{ => evm}/erigon-bridge/src/client.rs | 0 .../{ => evm}/erigon-bridge/src/converter.rs | 0 .../{ => evm}/erigon-bridge/src/error.rs | 0 .../erigon-bridge/src/generated/custom.rs | 0 .../erigon-bridge/src/generated/remote.rs | 0 .../erigon-bridge/src/generated/types.rs | 0 .../{ => evm}/erigon-bridge/src/lib.rs | 0 .../{ => evm}/erigon-bridge/src/main.rs | 0 .../{ => evm}/erigon-bridge/src/proto.rs | 0 .../erigon-bridge/src/streaming_service.rs | 0 .../erigon-bridge/src/trie_client.rs | 0 .../erigon-bridge/src/trie_converter.rs | 0 .../{ => evm}/jsonrpc-bridge/Cargo.toml | 4 +- .../{ => evm}/jsonrpc-bridge/src/bridge.rs | 0 .../{ => evm}/jsonrpc-bridge/src/client.rs | 0 .../{ => evm}/jsonrpc-bridge/src/converter.rs | 0 .../{ => evm}/jsonrpc-bridge/src/error.rs | 0 .../{ => evm}/jsonrpc-bridge/src/lib.rs | 0 .../{ => evm}/jsonrpc-bridge/src/main.rs | 0 .../{ => evm}/jsonrpc-bridge/src/streaming.rs | 0 crates/phaser-query/Cargo.toml | 6 +- 34 files changed, 360 insertions(+), 20 deletions(-) create mode 100644 commit-reorganization.sh rename crates/bridges/{ => evm}/erigon-bridge/Cargo.toml (92%) rename crates/bridges/{ => evm}/erigon-bridge/build.rs (100%) rename crates/bridges/{ => evm}/erigon-bridge/proto/customized-erigon/blockdata.proto (100%) rename crates/bridges/{ => evm}/erigon-bridge/proto/customized-erigon/trie.proto (100%) rename crates/bridges/{ => evm}/erigon-bridge/proto/remote/bor.proto (100%) rename crates/bridges/{ => evm}/erigon-bridge/proto/remote/ethbackend.proto (100%) rename crates/bridges/{ => evm}/erigon-bridge/proto/types/types.proto (100%) rename crates/bridges/{ => evm}/erigon-bridge/src/blockdata_client.rs (100%) rename crates/bridges/{ => evm}/erigon-bridge/src/blockdata_converter.rs (100%) rename crates/bridges/{ => evm}/erigon-bridge/src/bridge.rs (100%) rename crates/bridges/{ => evm}/erigon-bridge/src/client.rs (100%) rename crates/bridges/{ => evm}/erigon-bridge/src/converter.rs (100%) rename crates/bridges/{ => evm}/erigon-bridge/src/error.rs (100%) rename crates/bridges/{ => evm}/erigon-bridge/src/generated/custom.rs (100%) rename crates/bridges/{ => evm}/erigon-bridge/src/generated/remote.rs (100%) rename crates/bridges/{ => evm}/erigon-bridge/src/generated/types.rs (100%) rename crates/bridges/{ => evm}/erigon-bridge/src/lib.rs (100%) rename crates/bridges/{ => evm}/erigon-bridge/src/main.rs (100%) rename crates/bridges/{ => evm}/erigon-bridge/src/proto.rs (100%) rename crates/bridges/{ => evm}/erigon-bridge/src/streaming_service.rs (100%) rename crates/bridges/{ => evm}/erigon-bridge/src/trie_client.rs (100%) rename crates/bridges/{ => evm}/erigon-bridge/src/trie_converter.rs (100%) rename crates/bridges/{ => evm}/jsonrpc-bridge/Cargo.toml (91%) rename crates/bridges/{ => evm}/jsonrpc-bridge/src/bridge.rs (100%) rename crates/bridges/{ => evm}/jsonrpc-bridge/src/client.rs (100%) rename crates/bridges/{ => evm}/jsonrpc-bridge/src/converter.rs (100%) rename crates/bridges/{ => evm}/jsonrpc-bridge/src/error.rs (100%) rename crates/bridges/{ => evm}/jsonrpc-bridge/src/lib.rs (100%) rename crates/bridges/{ => evm}/jsonrpc-bridge/src/main.rs (100%) rename crates/bridges/{ => evm}/jsonrpc-bridge/src/streaming.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index ea8a980..fceb379 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -95,7 +95,7 @@ checksum = "f3008b4f680adca5a81fad5f6cdbb561cca0cee7e97050756c2c1f3e41d2103c" dependencies = [ "alloy-primitives", "num_enum", - "strum", + "strum 0.27.2", ] [[package]] @@ -388,7 +388,7 @@ dependencies = [ "either", "futures", "futures-utils-wasm", - "lru", + "lru 0.13.0", "parking_lot", "pin-project", "reqwest", @@ -537,7 +537,7 @@ dependencies = [ "derive_more 2.0.1", "rand 0.8.5", "serde", - "strum", + "strum 0.27.2", ] [[package]] @@ -802,7 +802,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "614d998c2f0e95079fdc8798cb48b9ea985dab225ed02005f724e66788aaf614" dependencies = [ "alloy-primitives", - "darling", + "darling 0.21.3", "proc-macro2", "quote", "syn 2.0.106", @@ -1657,6 +1657,21 @@ dependencies = [ "serde", ] +[[package]] +name = "cassowary" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df8670b8c7b9dae1793364eafadf7239c40d669904660c5960d74cfd80b46a53" + +[[package]] +name = "castaway" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dec551ab6e7578819132c713a93c022a05d60159dc86e7a7050223577484c55a" +dependencies = [ + "rustversion", +] + [[package]] name = "cc" version = "1.2.37" @@ -1761,6 +1776,20 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "compact_str" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b79c4069c6cad78e2e0cdfcbd26275770669fb39fd308a752dc110e83b9af32" +dependencies = [ + "castaway", + "cfg-if", + "itoa", + "rustversion", + "ryu", + "static_assertions", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -1828,6 +1857,15 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "convert_case" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb402b8d4c85569410425650ce3eddc7d698ed96d39a73f941b08fb63082f1e7" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -1873,6 +1911,49 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crossterm" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "829d955a0bb380ef178a640b91779e3987da38c9aea133b20614cfed8cdea9c6" +dependencies = [ + "bitflags 2.9.4", + "crossterm_winapi", + "mio", + "parking_lot", + "rustix 0.38.44", + "signal-hook", + "signal-hook-mio", + "winapi", +] + +[[package]] +name = "crossterm" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8b9f2e4c67f833b660cdb0a3523065869fb35570177239812ed4c905aeff87b" +dependencies = [ + "bitflags 2.9.4", + "crossterm_winapi", + "derive_more 2.0.1", + "document-features", + "mio", + "parking_lot", + "rustix 1.1.2", + "signal-hook", + "signal-hook-mio", + "winapi", +] + +[[package]] +name = "crossterm_winapi" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b" +dependencies = [ + "winapi", +] + [[package]] name = "crunchy" version = "0.2.4" @@ -1922,14 +2003,38 @@ dependencies = [ "memchr", ] +[[package]] +name = "darling" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" +dependencies = [ + "darling_core 0.20.11", + "darling_macro 0.20.11", +] + [[package]] name = "darling" version = "0.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.21.3", + "darling_macro 0.21.3", +] + +[[package]] +name = "darling_core" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.106", ] [[package]] @@ -1947,13 +2052,24 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "darling_macro" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" +dependencies = [ + "darling_core 0.20.11", + "quote", + "syn 2.0.106", +] + [[package]] name = "darling_macro" version = "0.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" dependencies = [ - "darling_core", + "darling_core 0.21.3", "quote", "syn 2.0.106", ] @@ -2045,6 +2161,7 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" dependencies = [ + "convert_case", "proc-macro2", "quote", "syn 2.0.106", @@ -2089,6 +2206,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aac81fa3e28d21450aa4d2ac065992ba96a1d7303efbce51a95f4fd175b67562" +[[package]] +name = "document-features" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95249b50c6c185bee49034bcb378a49dc2b5dff0be90ff6616d31d64febab05d" +dependencies = [ + "litrs", +] + [[package]] name = "dunce" version = "1.0.5" @@ -2900,6 +3026,25 @@ dependencies = [ "serde_core", ] +[[package]] +name = "indoc" +version = "2.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c7245a08504955605670dbf141fceab975f15ca21570696aebe9d2e71576bd" + +[[package]] +name = "instability" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435d80800b936787d62688c927b6490e887c7ef5ff9ce922c6c6050fca75eb9a" +dependencies = [ + "darling 0.20.11", + "indoc", + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "integer-encoding" version = "3.0.4" @@ -3285,6 +3430,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linux-raw-sys" +version = "0.4.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" + [[package]] name = "linux-raw-sys" version = "0.11.0" @@ -3297,6 +3448,12 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956" +[[package]] +name = "litrs" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5e54036fe321fd421e10d732f155734c4e4afd610dd556d9a82833ab3ee0bed" + [[package]] name = "lock_api" version = "0.4.13" @@ -3313,6 +3470,15 @@ version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.5", +] + [[package]] name = "lru" version = "0.13.0" @@ -3413,6 +3579,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" dependencies = [ "libc", + "log", "wasi 0.11.1+wasi-snapshot-preview1", "windows-sys 0.59.0", ] @@ -3772,7 +3939,9 @@ dependencies = [ "async-trait", "axum 0.7.9", "bincode", + "chrono", "clap", + "crossterm 0.29.0", "erigon-bridge", "evm-common", "futures", @@ -3781,6 +3950,7 @@ dependencies = [ "phaser-bridge", "prost", "prost-types", + "ratatui", "rocksdb", "serde", "serde_json", @@ -4156,6 +4326,27 @@ dependencies = [ "rand_core 0.9.3", ] +[[package]] +name = "ratatui" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eabd94c2f37801c20583fc49dd5cd6b0ba68c716787c2dd6ed18571e1e63117b" +dependencies = [ + "bitflags 2.9.4", + "cassowary", + "compact_str", + "crossterm 0.28.1", + "indoc", + "instability", + "itertools 0.13.0", + "lru 0.12.5", + "paste", + "strum 0.26.3", + "unicode-segmentation", + "unicode-truncate", + "unicode-width 0.2.0", +] + [[package]] name = "recvmsg" version = "1.0.0" @@ -4383,6 +4574,19 @@ dependencies = [ "semver 1.0.27", ] +[[package]] +name = "rustix" +version = "0.38.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" +dependencies = [ + "bitflags 2.9.4", + "errno", + "libc", + "linux-raw-sys 0.4.15", + "windows-sys 0.59.0", +] + [[package]] name = "rustix" version = "1.1.2" @@ -4392,7 +4596,7 @@ dependencies = [ "bitflags 2.9.4", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.11.0", "windows-sys 0.60.2", ] @@ -4649,7 +4853,7 @@ version = "3.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "327ada00f7d64abaac1e55a6911e90cf665aa051b9a561c7006c157f4633135e" dependencies = [ - "darling", + "darling 0.21.3", "proc-macro2", "quote", "syn 2.0.106", @@ -4735,6 +4939,27 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d881a16cf4426aa584979d30bd82cb33429027e42122b169753d6ef1085ed6e2" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-mio" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34db1a06d485c9142248b7a054f034b349b212551f3dfd19c94d45a754a217cd" +dependencies = [ + "libc", + "mio", + "signal-hook", +] + [[package]] name = "signal-hook-registry" version = "1.4.6" @@ -4845,13 +5070,35 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" +dependencies = [ + "strum_macros 0.26.4", +] + [[package]] name = "strum" version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af23d6f6c1a224baef9d3f61e287d2761385a5b88fdab4eb4c6f11aeb54c4bcf" dependencies = [ - "strum_macros", + "strum_macros 0.27.2", +] + +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.106", ] [[package]] @@ -4941,7 +5188,7 @@ dependencies = [ "fastrand", "getrandom 0.3.3", "once_cell", - "rustix", + "rustix 1.1.2", "windows-sys 0.60.2", ] @@ -5452,6 +5699,35 @@ version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f63a545481291138910575129486daeaf8ac54aee4387fe7906919f7830c7d9d" +[[package]] +name = "unicode-segmentation" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" + +[[package]] +name = "unicode-truncate" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3644627a5af5fa321c95b9b235a72fd24cd29c648c2c379431e6628655627bf" +dependencies = [ + "itertools 0.13.0", + "unicode-segmentation", + "unicode-width 0.1.14", +] + +[[package]] +name = "unicode-width" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" + +[[package]] +name = "unicode-width" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" + [[package]] name = "unicode-xid" version = "0.2.6" @@ -5701,6 +5977,28 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd7cf3379ca1aac9eea11fba24fd7e315d621f8dfe35c8d7d2be8b793726e07d" +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-core" version = "0.62.0" diff --git a/Cargo.toml b/Cargo.toml index 49089a4..48cf16a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,12 +3,18 @@ resolver = "2" members = [ "crates/phaser-query", "crates/phaser-bridge", - "crates/bridges/erigon-bridge", - "crates/bridges/jsonrpc-bridge", + "crates/bridges/evm/erigon-bridge", + "crates/bridges/evm/jsonrpc-bridge", "crates/schemas/evm/common", ] [workspace.dependencies] +# Internal crates +erigon-bridge = { path = "crates/bridges/evm/erigon-bridge" } +jsonrpc-bridge = { path = "crates/bridges/evm/jsonrpc-bridge" } +phaser-bridge = { path = "crates/phaser-bridge" } +evm-common = { path = "crates/schemas/evm/common" } + # External dependencies tokio = { version = "1.41", features = ["full"] } async-trait = "0.1" diff --git a/commit-reorganization.sh b/commit-reorganization.sh new file mode 100644 index 0000000..18b94ba --- /dev/null +++ b/commit-reorganization.sh @@ -0,0 +1,36 @@ +#!/bin/bash +set -e + +# Add all changes (will detect moves properly) +git add -A + +# Create commit +git commit -m "$(cat <<'EOF' +Reorganize EVM bridges under crates/bridges/evm/ + +Moved bridges to better namespace structure: +- crates/bridges/erigon-bridge → crates/bridges/evm/erigon-bridge +- crates/bridges/jsonrpc-bridge → crates/bridges/evm/jsonrpc-bridge + +Benefits: +- Clear EVM-specific namespace for future multi-chain support +- Consistent location for all EVM-related bridges +- Better organization as we add more bridge types + +Also updated to use workspace dependencies: +- Added erigon-bridge, jsonrpc-bridge, phaser-bridge, evm-common to + workspace.dependencies in root Cargo.toml +- Updated all crates to use `{ workspace = true }` instead of relative paths +- Cleaner dependency management across the workspace + +No functional changes - pure reorganization. +EOF +)" + +echo "" +echo "✓ Reorganization committed successfully!" +echo "" +echo "New structure:" +echo " crates/bridges/evm/" +echo " ├── erigon-bridge/" +echo " └── jsonrpc-bridge/" diff --git a/crates/bridges/erigon-bridge/Cargo.toml b/crates/bridges/evm/erigon-bridge/Cargo.toml similarity index 92% rename from crates/bridges/erigon-bridge/Cargo.toml rename to crates/bridges/evm/erigon-bridge/Cargo.toml index 4a81ee0..e845440 100644 --- a/crates/bridges/erigon-bridge/Cargo.toml +++ b/crates/bridges/evm/erigon-bridge/Cargo.toml @@ -13,8 +13,8 @@ path = "src/main.rs" [dependencies] # Local dependencies -phaser-bridge = { path = "../../phaser-bridge" } -evm-common = { path = "../../schemas/evm/common" } +phaser-bridge = { workspace = true } +evm-common = { workspace = true } # Typed Arrow support typed-arrow = { workspace = true } diff --git a/crates/bridges/erigon-bridge/build.rs b/crates/bridges/evm/erigon-bridge/build.rs similarity index 100% rename from crates/bridges/erigon-bridge/build.rs rename to crates/bridges/evm/erigon-bridge/build.rs diff --git a/crates/bridges/erigon-bridge/proto/customized-erigon/blockdata.proto b/crates/bridges/evm/erigon-bridge/proto/customized-erigon/blockdata.proto similarity index 100% rename from crates/bridges/erigon-bridge/proto/customized-erigon/blockdata.proto rename to crates/bridges/evm/erigon-bridge/proto/customized-erigon/blockdata.proto diff --git a/crates/bridges/erigon-bridge/proto/customized-erigon/trie.proto b/crates/bridges/evm/erigon-bridge/proto/customized-erigon/trie.proto similarity index 100% rename from crates/bridges/erigon-bridge/proto/customized-erigon/trie.proto rename to crates/bridges/evm/erigon-bridge/proto/customized-erigon/trie.proto diff --git a/crates/bridges/erigon-bridge/proto/remote/bor.proto b/crates/bridges/evm/erigon-bridge/proto/remote/bor.proto similarity index 100% rename from crates/bridges/erigon-bridge/proto/remote/bor.proto rename to crates/bridges/evm/erigon-bridge/proto/remote/bor.proto diff --git a/crates/bridges/erigon-bridge/proto/remote/ethbackend.proto b/crates/bridges/evm/erigon-bridge/proto/remote/ethbackend.proto similarity index 100% rename from crates/bridges/erigon-bridge/proto/remote/ethbackend.proto rename to crates/bridges/evm/erigon-bridge/proto/remote/ethbackend.proto diff --git a/crates/bridges/erigon-bridge/proto/types/types.proto b/crates/bridges/evm/erigon-bridge/proto/types/types.proto similarity index 100% rename from crates/bridges/erigon-bridge/proto/types/types.proto rename to crates/bridges/evm/erigon-bridge/proto/types/types.proto diff --git a/crates/bridges/erigon-bridge/src/blockdata_client.rs b/crates/bridges/evm/erigon-bridge/src/blockdata_client.rs similarity index 100% rename from crates/bridges/erigon-bridge/src/blockdata_client.rs rename to crates/bridges/evm/erigon-bridge/src/blockdata_client.rs diff --git a/crates/bridges/erigon-bridge/src/blockdata_converter.rs b/crates/bridges/evm/erigon-bridge/src/blockdata_converter.rs similarity index 100% rename from crates/bridges/erigon-bridge/src/blockdata_converter.rs rename to crates/bridges/evm/erigon-bridge/src/blockdata_converter.rs diff --git a/crates/bridges/erigon-bridge/src/bridge.rs b/crates/bridges/evm/erigon-bridge/src/bridge.rs similarity index 100% rename from crates/bridges/erigon-bridge/src/bridge.rs rename to crates/bridges/evm/erigon-bridge/src/bridge.rs diff --git a/crates/bridges/erigon-bridge/src/client.rs b/crates/bridges/evm/erigon-bridge/src/client.rs similarity index 100% rename from crates/bridges/erigon-bridge/src/client.rs rename to crates/bridges/evm/erigon-bridge/src/client.rs diff --git a/crates/bridges/erigon-bridge/src/converter.rs b/crates/bridges/evm/erigon-bridge/src/converter.rs similarity index 100% rename from crates/bridges/erigon-bridge/src/converter.rs rename to crates/bridges/evm/erigon-bridge/src/converter.rs diff --git a/crates/bridges/erigon-bridge/src/error.rs b/crates/bridges/evm/erigon-bridge/src/error.rs similarity index 100% rename from crates/bridges/erigon-bridge/src/error.rs rename to crates/bridges/evm/erigon-bridge/src/error.rs diff --git a/crates/bridges/erigon-bridge/src/generated/custom.rs b/crates/bridges/evm/erigon-bridge/src/generated/custom.rs similarity index 100% rename from crates/bridges/erigon-bridge/src/generated/custom.rs rename to crates/bridges/evm/erigon-bridge/src/generated/custom.rs diff --git a/crates/bridges/erigon-bridge/src/generated/remote.rs b/crates/bridges/evm/erigon-bridge/src/generated/remote.rs similarity index 100% rename from crates/bridges/erigon-bridge/src/generated/remote.rs rename to crates/bridges/evm/erigon-bridge/src/generated/remote.rs diff --git a/crates/bridges/erigon-bridge/src/generated/types.rs b/crates/bridges/evm/erigon-bridge/src/generated/types.rs similarity index 100% rename from crates/bridges/erigon-bridge/src/generated/types.rs rename to crates/bridges/evm/erigon-bridge/src/generated/types.rs diff --git a/crates/bridges/erigon-bridge/src/lib.rs b/crates/bridges/evm/erigon-bridge/src/lib.rs similarity index 100% rename from crates/bridges/erigon-bridge/src/lib.rs rename to crates/bridges/evm/erigon-bridge/src/lib.rs diff --git a/crates/bridges/erigon-bridge/src/main.rs b/crates/bridges/evm/erigon-bridge/src/main.rs similarity index 100% rename from crates/bridges/erigon-bridge/src/main.rs rename to crates/bridges/evm/erigon-bridge/src/main.rs diff --git a/crates/bridges/erigon-bridge/src/proto.rs b/crates/bridges/evm/erigon-bridge/src/proto.rs similarity index 100% rename from crates/bridges/erigon-bridge/src/proto.rs rename to crates/bridges/evm/erigon-bridge/src/proto.rs diff --git a/crates/bridges/erigon-bridge/src/streaming_service.rs b/crates/bridges/evm/erigon-bridge/src/streaming_service.rs similarity index 100% rename from crates/bridges/erigon-bridge/src/streaming_service.rs rename to crates/bridges/evm/erigon-bridge/src/streaming_service.rs diff --git a/crates/bridges/erigon-bridge/src/trie_client.rs b/crates/bridges/evm/erigon-bridge/src/trie_client.rs similarity index 100% rename from crates/bridges/erigon-bridge/src/trie_client.rs rename to crates/bridges/evm/erigon-bridge/src/trie_client.rs diff --git a/crates/bridges/erigon-bridge/src/trie_converter.rs b/crates/bridges/evm/erigon-bridge/src/trie_converter.rs similarity index 100% rename from crates/bridges/erigon-bridge/src/trie_converter.rs rename to crates/bridges/evm/erigon-bridge/src/trie_converter.rs diff --git a/crates/bridges/jsonrpc-bridge/Cargo.toml b/crates/bridges/evm/jsonrpc-bridge/Cargo.toml similarity index 91% rename from crates/bridges/jsonrpc-bridge/Cargo.toml rename to crates/bridges/evm/jsonrpc-bridge/Cargo.toml index b98e3e7..0000b3e 100644 --- a/crates/bridges/jsonrpc-bridge/Cargo.toml +++ b/crates/bridges/evm/jsonrpc-bridge/Cargo.toml @@ -13,8 +13,8 @@ path = "src/main.rs" [dependencies] # Bridge framework -phaser-bridge = { path = "../../phaser-bridge" } -evm-common = { path = "../../schemas/evm/common" } +phaser-bridge = { workspace = true } +evm-common = { workspace = true } # Alloy - all from workspace alloy = { workspace = true } diff --git a/crates/bridges/jsonrpc-bridge/src/bridge.rs b/crates/bridges/evm/jsonrpc-bridge/src/bridge.rs similarity index 100% rename from crates/bridges/jsonrpc-bridge/src/bridge.rs rename to crates/bridges/evm/jsonrpc-bridge/src/bridge.rs diff --git a/crates/bridges/jsonrpc-bridge/src/client.rs b/crates/bridges/evm/jsonrpc-bridge/src/client.rs similarity index 100% rename from crates/bridges/jsonrpc-bridge/src/client.rs rename to crates/bridges/evm/jsonrpc-bridge/src/client.rs diff --git a/crates/bridges/jsonrpc-bridge/src/converter.rs b/crates/bridges/evm/jsonrpc-bridge/src/converter.rs similarity index 100% rename from crates/bridges/jsonrpc-bridge/src/converter.rs rename to crates/bridges/evm/jsonrpc-bridge/src/converter.rs diff --git a/crates/bridges/jsonrpc-bridge/src/error.rs b/crates/bridges/evm/jsonrpc-bridge/src/error.rs similarity index 100% rename from crates/bridges/jsonrpc-bridge/src/error.rs rename to crates/bridges/evm/jsonrpc-bridge/src/error.rs diff --git a/crates/bridges/jsonrpc-bridge/src/lib.rs b/crates/bridges/evm/jsonrpc-bridge/src/lib.rs similarity index 100% rename from crates/bridges/jsonrpc-bridge/src/lib.rs rename to crates/bridges/evm/jsonrpc-bridge/src/lib.rs diff --git a/crates/bridges/jsonrpc-bridge/src/main.rs b/crates/bridges/evm/jsonrpc-bridge/src/main.rs similarity index 100% rename from crates/bridges/jsonrpc-bridge/src/main.rs rename to crates/bridges/evm/jsonrpc-bridge/src/main.rs diff --git a/crates/bridges/jsonrpc-bridge/src/streaming.rs b/crates/bridges/evm/jsonrpc-bridge/src/streaming.rs similarity index 100% rename from crates/bridges/jsonrpc-bridge/src/streaming.rs rename to crates/bridges/evm/jsonrpc-bridge/src/streaming.rs diff --git a/crates/phaser-query/Cargo.toml b/crates/phaser-query/Cargo.toml index bdb0961..2657e99 100644 --- a/crates/phaser-query/Cargo.toml +++ b/crates/phaser-query/Cargo.toml @@ -13,9 +13,9 @@ path = "src/bin/phaser-cli.rs" [dependencies] # Local dependencies -phaser-bridge = { path = "../phaser-bridge" } -erigon-bridge = { path = "../bridges/erigon-bridge" } -evm-common = { path = "../schemas/evm/common" } +phaser-bridge = { workspace = true } +erigon-bridge = { workspace = true } +evm-common = { workspace = true } # Typed Arrow support typed-arrow = { workspace = true } From b3b83311333b2f6e4f911901dc88ed09e7edecb1 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 2 Oct 2025 20:23:21 -0700 Subject: [PATCH 2/7] Update alloy provider builder API calls Migrate from deprecated .on_* methods to new .connect_* methods: - on_ws -> connect_ws - on_http -> connect_http - on_ipc -> connect_ipc --- crates/bridges/evm/jsonrpc-bridge/src/client.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/bridges/evm/jsonrpc-bridge/src/client.rs b/crates/bridges/evm/jsonrpc-bridge/src/client.rs index b9c528b..c53aabb 100644 --- a/crates/bridges/evm/jsonrpc-bridge/src/client.rs +++ b/crates/bridges/evm/jsonrpc-bridge/src/client.rs @@ -32,7 +32,7 @@ impl JsonRpcClient { Arc::new( ProviderBuilder::new() .network::() - .on_ws(alloy::transports::ws::WsConnect::new(url)) + .connect_ws(alloy::transports::ws::WsConnect::new(url)) .await?, ) } else if url.ends_with(".ipc") || url.starts_with("/") { @@ -40,7 +40,7 @@ impl JsonRpcClient { Arc::new( ProviderBuilder::new() .network::() - .on_ipc(alloy::transports::ipc::IpcConnect::new(url.to_string())) + .connect_ipc(alloy::transports::ipc::IpcConnect::new(url.to_string())) .await?, ) } else { @@ -48,7 +48,7 @@ impl JsonRpcClient { Arc::new( ProviderBuilder::new() .network::() - .on_http(url.parse()?), + .connect_http(url.parse()?), ) }; From 453e82561936748ed063495a2bae5bf8bd8e9a45 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 2 Oct 2025 20:23:26 -0700 Subject: [PATCH 3/7] Add historical query support to jsonrpc-bridge Implement create_historical_stream to fetch specific block ranges via batch requests. Update do_get to handle both QueryMode::Historical and QueryMode::Live. Add max_batch_size configuration and update capabilities to include data type support (blocks/transactions/logs). Historical mode fetches blocks in batches and converts them to Arrow RecordBatches, while live mode continues using broadcast channels. --- .../bridges/evm/jsonrpc-bridge/src/bridge.rs | 205 +++++++++++++++--- 1 file changed, 179 insertions(+), 26 deletions(-) diff --git a/crates/bridges/evm/jsonrpc-bridge/src/bridge.rs b/crates/bridges/evm/jsonrpc-bridge/src/bridge.rs index 1f5b82c..3895471 100644 --- a/crates/bridges/evm/jsonrpc-bridge/src/bridge.rs +++ b/crates/bridges/evm/jsonrpc-bridge/src/bridge.rs @@ -11,6 +11,7 @@ use futures::{stream, Stream, StreamExt}; use phaser_bridge::{ bridge::{BridgeCapabilities, FlightBridge}, descriptors::{BridgeInfo, StreamType}, + subscription::QueryMode, }; use std::pin::Pin; use std::sync::Arc; @@ -23,6 +24,7 @@ pub struct JsonRpcFlightBridge { chain_id: u64, streaming_service: Arc, node_url: String, + max_batch_size: usize, } impl JsonRpcFlightBridge { @@ -61,21 +63,31 @@ impl JsonRpcFlightBridge { chain_id, streaming_service, node_url, + max_batch_size: 1000, // Default batch size, matches BridgeCapabilities }) } /// Get bridge information pub fn bridge_info(&self) -> BridgeInfo { + let mut capabilities = vec![ + "blocks".to_string(), + "transactions".to_string(), + "logs".to_string(), + ]; + + if self.client.supports_subscriptions() { + capabilities.push("streaming".to_string()); + capabilities.push("subscriptions".to_string()); + } else { + capabilities.push("polling".to_string()); + } + BridgeInfo { name: "jsonrpc-bridge".to_string(), node_type: "json-rpc".to_string(), version: env!("CARGO_PKG_VERSION").to_string(), chain_id: self.chain_id, - capabilities: if self.client.supports_subscriptions() { - vec!["streaming".to_string(), "subscriptions".to_string()] - } else { - vec!["polling".to_string()] - }, + capabilities, current_block: 0, // Would need to query this oldest_block: 0, // Would need to query this } @@ -106,6 +118,128 @@ impl JsonRpcFlightBridge { )), } } + + /// Create a stream for historical data (specific block range) + fn create_historical_stream( + &self, + stream_type: StreamType, + start_block: u64, + end_block: u64, + ) -> impl Stream> + Send { + let client = self.client.clone(); + let batch_size = self.max_batch_size as u64; + + async_stream::stream! { + use alloy::eips::BlockNumberOrTag; + use alloy_rpc_types_eth::Filter; + use arrow::compute::concat_batches; + use tracing::debug; + + info!("Fetching historical {:?} from block {} to {} (batch size: {})", + stream_type, start_block, end_block, batch_size); + + let mut current_block = start_block; + + while current_block <= end_block { + let batch_end = std::cmp::min(current_block + batch_size - 1, end_block); + let batch_count = (batch_end - current_block + 1) as usize; + + debug!("Fetching batch: blocks {} to {} ({} blocks)", current_block, batch_end, batch_count); + + // Collect RecordBatches for this batch + let mut record_batches = Vec::new(); + + for block_num in current_block..=batch_end { + // Fetch block with transactions + let block = match client.get_block_with_txs(BlockNumberOrTag::Number(block_num)).await { + Ok(Some(block)) => block, + Ok(None) => { + error!("Block #{} not found", block_num); + continue; + } + Err(e) => { + error!("Failed to fetch block #{}: {}", block_num, e); + yield Err(Status::internal(format!("Failed to fetch block {}: {}", block_num, e))); + continue; + } + }; + + match stream_type { + StreamType::Blocks => { + // Convert block header to RecordBatch + match evm_common::rpc_conversions::convert_any_header(&block.header) { + Ok(batch) => record_batches.push(batch), + Err(e) => { + error!("Failed to convert block header #{}: {}", block_num, e); + yield Err(Status::internal(format!("Conversion error: {}", e))); + } + } + } + StreamType::Transactions => { + // Convert transactions (if any) + if !block.transactions.is_empty() { + match JsonRpcConverter::convert_transactions(&block) { + Ok(batch) => record_batches.push(batch), + Err(e) => { + error!("Failed to convert transactions for block #{}: {}", block_num, e); + yield Err(Status::internal(format!("Conversion error: {}", e))); + } + } + } + } + StreamType::Logs => { + // Fetch and convert logs + let filter = Filter::new().from_block(block_num).to_block(block_num); + + match client.get_logs(filter).await { + Ok(logs) if !logs.is_empty() => { + let block_hash = block.header.hash; + match JsonRpcConverter::convert_logs(&logs, block_num, block_hash, block.header.timestamp) { + Ok(batch) => record_batches.push(batch), + Err(e) => { + error!("Failed to convert logs for block #{}: {}", block_num, e); + yield Err(Status::internal(format!("Conversion error: {}", e))); + } + } + } + Ok(_) => { + // No logs in this block, skip + } + Err(e) => { + error!("Failed to fetch logs for block #{}: {}", block_num, e); + yield Err(Status::internal(format!("Failed to fetch logs: {}", e))); + } + } + } + StreamType::Trie => { + yield Err(Status::unimplemented("Trie streaming not supported via JSON-RPC")); + return; + } + } + } + + // If we collected any batches, concatenate and yield them + if !record_batches.is_empty() { + let schema = record_batches[0].schema(); + match concat_batches(&schema, &record_batches) { + Ok(combined_batch) => { + debug!("Yielding combined batch with {} rows for blocks {} to {}", + combined_batch.num_rows(), current_block, batch_end); + yield Ok(combined_batch); + } + Err(e) => { + error!("Failed to concatenate batches: {}", e); + yield Err(Status::internal(format!("Failed to concatenate batches: {}", e))); + } + } + } + + current_block = batch_end + 1; + } + + info!("Completed historical {:?} query for blocks {} to {}", stream_type, start_block, end_block); + } + } } #[async_trait] @@ -117,10 +251,10 @@ impl FlightBridge for JsonRpcFlightBridge { async fn get_capabilities(&self) -> std::result::Result { Ok(BridgeCapabilities { supports_historical: true, // Can fetch historical blocks via JSON-RPC - supports_streaming: self.client.supports_subscriptions(), + supports_streaming: true, // Always support streaming (HTTP uses polling, WS/IPC use subscriptions) supports_reorg_notifications: false, supports_filters: true, - max_batch_size: 1000, + max_batch_size: self.max_batch_size, }) } @@ -234,35 +368,54 @@ impl FlightBridge for JsonRpcFlightBridge { })?; let stream_type = blockchain_desc.stream_type; - info!("Processing do_get for {:?}", stream_type); - - // Get the appropriate receiver - let receiver = match stream_type { - StreamType::Blocks => self.streaming_service.subscribe_blocks(), - StreamType::Transactions => self.streaming_service.subscribe_transactions(), - StreamType::Logs => self.streaming_service.subscribe_logs(), - StreamType::Trie => { - return Err(Status::unimplemented( - "Trie streaming not supported via JSON-RPC", - )) - } - }; + let query_mode = blockchain_desc.query_mode.clone(); + info!( + "Processing do_get for {:?} in {:?} mode", + stream_type, query_mode + ); // Get schema for the stream type let schema = Self::get_schema_for_type(stream_type)?; - // Create stream from the broadcast receiver - let batch_stream = async_stream::stream! { - let mut rx = receiver; - while let Ok(batch) = rx.recv().await { - yield Ok(batch); + // Branch based on query mode + let batch_stream: Pin< + Box> + Send>, + > = match query_mode { + QueryMode::Historical { start, end } => { + // Historical query - fetch specific block range + Box::pin(self.create_historical_stream(stream_type, start, end)) + } + QueryMode::Live => { + // Live streaming - subscribe to broadcast channels + let receiver = match stream_type { + StreamType::Blocks => self.streaming_service.subscribe_blocks(), + StreamType::Transactions => self.streaming_service.subscribe_transactions(), + StreamType::Logs => self.streaming_service.subscribe_logs(), + StreamType::Trie => { + return Err(Status::unimplemented( + "Trie streaming not supported via JSON-RPC", + )) + } + }; + + Box::pin(async_stream::stream! { + let mut rx = receiver; + while let Ok(batch) = rx.recv().await { + yield Ok(batch); + } + }) } }; + // Convert Status errors to FlightError for the encoder + let flight_batch_stream = batch_stream.map(|result| { + result.map_err(|status| arrow_flight::error::FlightError::Tonic(Box::new(status))) + }); + // Encode as Flight data let encoder = FlightDataEncoderBuilder::new() .with_schema(schema) - .build(batch_stream); + .build(flight_batch_stream); let flight_stream = encoder.map(|result| { result.map_err(|e| { From cb9d96cdbc40293bf00a56c8102abb8e30c68955 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 2 Oct 2025 20:23:26 -0700 Subject: [PATCH 4/7] Remove unused AnyHeader import from converter --- crates/bridges/evm/jsonrpc-bridge/src/converter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bridges/evm/jsonrpc-bridge/src/converter.rs b/crates/bridges/evm/jsonrpc-bridge/src/converter.rs index c67b38f..be8a029 100644 --- a/crates/bridges/evm/jsonrpc-bridge/src/converter.rs +++ b/crates/bridges/evm/jsonrpc-bridge/src/converter.rs @@ -1,4 +1,4 @@ -use alloy::network::{AnyHeader, AnyRpcBlock}; +use alloy::network::AnyRpcBlock; use alloy_rpc_types_eth::{Header as RpcHeader, Log}; use anyhow::Result; use arrow::datatypes::Schema; From c90347ad8658ddd8435cb36dbe9e2bef46e5e91c Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 2 Oct 2025 20:23:26 -0700 Subject: [PATCH 5/7] Track first and last block in parquet writer batches Extract both first and last block numbers from each batch to accurately track the block range being written. Update temp filenames to include the starting block number for better debugging. --- crates/phaser-query/src/parquet_writer.rs | 29 +++++++++++++---------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/crates/phaser-query/src/parquet_writer.rs b/crates/phaser-query/src/parquet_writer.rs index 1119011..bf97745 100644 --- a/crates/phaser-query/src/parquet_writer.rs +++ b/crates/phaser-query/src/parquet_writer.rs @@ -61,14 +61,16 @@ impl ParquetWriter { } pub async fn write_batch(&mut self, batch: RecordBatch) -> Result<()> { - // Extract block number from the batch (assuming _block_num is first column) - let block_num = if let Some(array) = batch + // Extract first and last block numbers from the batch (assuming _block_num is first column) + let (first_block, last_block) = if let Some(array) = batch .column(0) .as_any() .downcast_ref::() { if !array.is_empty() { - array.value(0) + let first = array.value(0); + let last = array.value(array.len() - 1); + (first, last) } else { return Ok(()); // Skip empty batch } @@ -77,29 +79,30 @@ impl ParquetWriter { return Ok(()); }; - // Check if we need to start a new file - if self.should_start_new_file(block_num)? { + // Check if we need to start a new file (use first block for boundary check) + if self.should_start_new_file(first_block)? { self.finalize_current_file()?; - self.start_new_file(block_num, batch.schema())?; + self.start_new_file(first_block, batch.schema())?; } // Initialize file if needed if self.current_file.is_none() { - self.start_new_file(block_num, batch.schema())?; + self.start_new_file(first_block, batch.schema())?; } // Write the batch if let Some(ref mut current) = self.current_file { current.writer.write(&batch)?; current.row_count += batch.num_rows(); - current.end_block = block_num; + current.end_block = last_block; // Track the last block we've written debug!( - "Wrote batch with {} rows to {}, total rows: {}, block: {}", + "Wrote batch with {} rows to {}, total rows: {}, blocks: {}-{}", batch.num_rows(), current.temp_path.display(), current.row_count, - block_num + first_block, + last_block ); } @@ -123,13 +126,13 @@ impl ParquetWriter { } fn start_new_file(&mut self, block_num: u64, schema: arrow_schema::SchemaRef) -> Result<()> { - // Create temporary filename - will be renamed with actual range when finalized - // Format: {data_type}_temp_{timestamp}.parquet.tmp + // Create temporary filename with start block - will be renamed with actual range when finalized + // Format: {data_type}_from_{start}_{timestamp}.parquet.tmp let timestamp = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_millis(); - let filename = format!("{}_{}.parquet.tmp", self.data_type, timestamp); + let filename = format!("{}_from_{}_{}.parquet.tmp", self.data_type, block_num, timestamp); let temp_path = self.data_dir.join(filename); info!( From e79cf813e168a0a9a76e3f01626ad51490082ea9 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 2 Oct 2025 20:23:26 -0700 Subject: [PATCH 6/7] Fix segment boundary calculation in sync service Correctly calculate segment_to as (segment_num + 1) * segment_size - 1 and clamp segment_from to respect the requested from_block parameter. --- crates/phaser-query/src/sync/service.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/phaser-query/src/sync/service.rs b/crates/phaser-query/src/sync/service.rs index 645a7aa..456583b 100644 --- a/crates/phaser-query/src/sync/service.rs +++ b/crates/phaser-query/src/sync/service.rs @@ -222,9 +222,10 @@ impl SyncServer { // Calculate block range for this segment let segment_from = segment_num * segment_size; - let segment_to = segment_from + segment_size - 1; + let segment_to = (segment_num + 1) * segment_size - 1; - // Ensure we don't go past the requested to_block + // Clamp to the requested range (both start and end) + let segment_from = std::cmp::max(segment_from, from_block); let segment_to = std::cmp::min(segment_to, to_block); info!( From 7231e3bffbca404e71d154f6d685cf6597cd75e6 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 2 Oct 2025 20:23:26 -0700 Subject: [PATCH 7/7] Format phaser-cli and remove default endpoint Remove default endpoint value to require explicit configuration. Apply cargo fmt formatting throughout. --- crates/phaser-query/src/bin/phaser-cli.rs | 172 ++++++++++++---------- 1 file changed, 96 insertions(+), 76 deletions(-) diff --git a/crates/phaser-query/src/bin/phaser-cli.rs b/crates/phaser-query/src/bin/phaser-cli.rs index 0caf45a..e4f4e48 100644 --- a/crates/phaser-query/src/bin/phaser-cli.rs +++ b/crates/phaser-query/src/bin/phaser-cli.rs @@ -38,7 +38,7 @@ struct WorkerOutput { #[clap(author, version, about = "CLI for phaser-query admin operations", long_about = None)] struct Args { /// Admin gRPC endpoint - #[clap(short, long, default_value = "http://localhost:9090")] + #[clap(short, long)] endpoint: String, #[clap(subcommand)] @@ -120,7 +120,10 @@ enum Commands { }, } -async fn run_progress_json(mut client: SyncServiceClient, job_id: &str) -> Result<()> { +async fn run_progress_json( + mut client: SyncServiceClient, + job_id: &str, +) -> Result<()> { use futures::StreamExt; use std::collections::HashMap; @@ -147,44 +150,48 @@ async fn run_progress_json(mut client: SyncServiceClient, job_id: &str) -> Result<()> { - use futures::StreamExt; - use ratatui::prelude::*; - use ratatui::widgets::{Block, Borders, Gauge, Paragraph, Row, Table}; +async fn run_progress_tui( + mut client: SyncServiceClient, + job_id: &str, +) -> Result<()> { use crossterm::{ event::{self, Event, KeyCode}, terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, ExecutableCommand, }; + use futures::StreamExt; + use ratatui::prelude::*; + use ratatui::widgets::{Block, Borders, Gauge, Paragraph, Row, Table}; + use std::collections::HashMap; use std::io::stdout; use std::time::Duration; - use std::collections::HashMap; // Setup terminal enable_raw_mode()?; @@ -241,10 +251,9 @@ async fn run_progress_tui(mut client: SyncServiceClient = update.workers.iter().map(|w| { - let blocks_range = format!("{}-{}", w.from_block, w.to_block); - let current = if w.stage == "completed" { - "✓".to_string() - } else { - w.current_block.to_string() - }; - - let elapsed = if w.stage == "completed" { - format_duration(completed_workers[&w.worker_id]) - } else { - let e = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs() as i64 - - w.started_at; - format_duration(e as u64) - }; - - Row::new(vec![ - w.worker_id.to_string(), - w.stage.clone(), - blocks_range, - current, - format!("{:.1}/s", w.rate), - elapsed, - ]) - }).collect(); + let rows: Vec = update + .workers + .iter() + .map(|w| { + let blocks_range = format!("{}-{}", w.from_block, w.to_block); + let current = if w.stage == "completed" { + "✓".to_string() + } else { + w.current_block.to_string() + }; + + let elapsed = if w.stage == "completed" { + format_duration(completed_workers[&w.worker_id]) + } else { + let e = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64 + - w.started_at; + format_duration(e as u64) + }; + + Row::new(vec![ + w.worker_id.to_string(), + w.stage.clone(), + blocks_range, + current, + format!("{:.1}/s", w.rate), + elapsed, + ]) + }) + .collect(); let table = Table::new( rows, @@ -350,7 +370,7 @@ async fn run_progress_tui(mut client: SyncServiceClient