feat: implement reconciliation service worker and monitoring dashboar…#345
godamongstmen897 wants to merge 5 commits into DXmakers:main
@godamongstmen897 Great news! 🎉 Based on an automated assessment of this PR, the linked Wave issue(s) no longer count against your application limits. You can now already apply to more issues while waiting for a review of this PR. Keep up the great work! 🚀
Pull request overview
Introduces a new Rust-based reconciliation/indexing worker service with Postgres checkpointing + Prometheus metrics, and updates the web landing page to a technical reconciliation monitoring dashboard.
Changes:
- Added `backend/reconciliation-service` crate (config, migrations, RPC client w/ retries, worker loop, health/metrics routes, Dockerfile).
- Updated Rust workspace dependencies to use rustls-based `reqwest`/`sqlx` plus Prometheus + JSON tracing subscriber.
- Added `ReconciliationDashboard` UI and replaced the web app landing page with this monitoring view.
Reviewed changes
Copilot reviewed 16 out of 17 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| backend/reconciliation-service/src/worker.rs | Worker loop + integration test exercising checkpoint recovery after RPC failures |
| backend/reconciliation-service/src/rpc.rs | JSON-RPC client with retry + exponential backoff |
| backend/reconciliation-service/src/routes.rs | /health and /metrics endpoints |
| backend/reconciliation-service/src/repository.rs | Postgres checkpoint + idempotent indexed event insertion |
| backend/reconciliation-service/src/models.rs | Sync snapshot/status models surfaced via health endpoint |
| backend/reconciliation-service/src/metrics.rs | Prometheus registry + counters/gauges + render helper |
| backend/reconciliation-service/src/main.rs | Service bootstrap (config, DB, migrations, tracing, worker spawn, Axum server) |
| backend/reconciliation-service/src/db.rs | DB connection and embedded migrations runner |
| backend/reconciliation-service/src/config.rs | Env-based configuration parsing (poll interval, retry policy, bind addr, etc.) |
| backend/reconciliation-service/migrations/20260428000001_initial.sql | Initial schema for checkpoints and indexed events |
| backend/reconciliation-service/Dockerfile | Container build/runtime setup for the new service |
| backend/reconciliation-service/Cargo.toml | Crate manifest for the reconciliation service |
| backend/reconciliation-service/prometheus.yml | Example Prometheus scrape config for local monitoring |
| apps/web/components/ReconciliationDashboard.tsx | New monitoring dashboard component (currently mock/live-updating UI) |
| apps/web/app/page.tsx | Replaces previous role-based landing page with reconciliation monitoring |
| Cargo.toml | Registers new crate in workspace and updates shared deps/features (rustls, prometheus, json tracing) |
| Cargo.lock | Lockfile updates reflecting new crate and dependency/feature changes |
| use crate::{db, repository::Repository, rpc::RetryConfig}; | ||
| use serde_json::json; | ||
| use std::time::Duration; | ||
| use tokio::time::sleep; |
The test module imports `tokio::time::sleep` but never uses it. CI runs `cargo clippy -- -D warnings`, so this unused import will fail the build. Remove the import or use it.
| use tokio::time::sleep; |
| .and(body_string_contains("\"sequence\": 1")) | ||
| .respond_with(ResponseTemplate::new(200).set_body_json(json!({"result": ledger1_payload, "error": null}))) | ||
| .mount(&mock_server) | ||
| .await; | ||
| // Ledger 2 will fail for the first run | ||
| Mock::given(method("POST")).and(body_string_contains("\"method\":\"getLedger\"")) | ||
| .and(body_string_contains("\"sequence\": 2")) |
These wiremock matchers look for substrings like `"sequence": 1` (with a space). The JSON body produced by serde_json/reqwest will be minified (e.g. `"sequence":1`), so the mock likely won’t match and the test will fail. Prefer a JSON-aware matcher (e.g. matching `params.sequence` via `body_json` / partial JSON) or change the substring so that it does not depend on whitespace.
| .and(body_string_contains("\"sequence\": 1")) | |
| .respond_with(ResponseTemplate::new(200).set_body_json(json!({"result": ledger1_payload, "error": null}))) | |
| .mount(&mock_server) | |
| .await; | |
| // Ledger 2 will fail for the first run | |
| Mock::given(method("POST")).and(body_string_contains("\"method\":\"getLedger\"")) | |
| .and(body_string_contains("\"sequence\": 2")) | |
| .and(body_string_contains("\"sequence\":1")) | |
| .respond_with(ResponseTemplate::new(200).set_body_json(json!({"result": ledger1_payload, "error": null}))) | |
| .mount(&mock_server) | |
| .await; | |
| // Ledger 2 will fail for the first run | |
| Mock::given(method("POST")).and(body_string_contains("\"method\":\"getLedger\"")) | |
| .and(body_string_contains("\"sequence\":2")) |
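The whitespace sensitivity described above can be reproduced without wiremock. The following is a minimal std-only sketch, assuming a request body of the compact shape that `serde_json` emits by default. Both `contains_ignoring_whitespace` and `MINIFIED_BODY` are hypothetical names introduced for illustration and are not part of the PR or of wiremock.

```rust
/// Hypothetical helper (not part of the PR or of wiremock): reports whether
/// `body` contains `needle` when ASCII whitespace is ignored on both sides.
pub fn contains_ignoring_whitespace(body: &str, needle: &str) -> bool {
    // Drop every ASCII whitespace character. This is naive: it would also
    // strip whitespace that appears inside JSON string values.
    fn strip(s: &str) -> String {
        s.chars().filter(|c| !c.is_ascii_whitespace()).collect()
    }
    strip(body).contains(strip(needle).as_str())
}

/// Example request body in compact form. The exact fields are an assumption
/// made for illustration, not copied from the service.
pub const MINIFIED_BODY: &str =
    r#"{"jsonrpc":"2.0","id":1,"method":"getLedger","params":{"sequence":1}}"#;
```

With this body, `MINIFIED_BODY.contains("\"sequence\": 1")` is false because of the space, while the whitespace-insensitive check succeeds. Note that any substring match on `"sequence":1` would also accept a body containing `"sequence":10`, which is one more reason a JSON-aware matcher may be the safer choice.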
| pub fn update_success(&mut self, last_processed_ledger: i64, latest_ledger: i64) { | ||
| self.last_processed_ledger = last_processed_ledger; | ||
| self.latest_ledger = latest_ledger; | ||
| self.ledger_lag = latest_ledger.saturating_sub(last_processed_ledger); | ||
| self.last_success_at = Some(Utc::now()); | ||
| self.last_error = None; | ||
| self.status = if self.ledger_lag == 0 { | ||
| SyncStatus::Synced | ||
| } else { | ||
| SyncStatus::CatchingUp | ||
| }; |
`ledger_lag` can become negative when `latest_ledger < last_processed_ledger` (you already warn about this case in the worker). `saturating_sub` on signed integers does not clamp to 0; it only saturates at the type's bounds when the subtraction would overflow, so an ordinary negative difference is returned unchanged. Since lag should be non-negative, clamp the result at 0 (or consider using an unsigned type for lag).
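A minimal sketch of the clamping suggested here, assuming lag stays an `i64`. The free function `ledger_lag` is a hypothetical helper used only for illustration; the PR computes the value inline.

```rust
/// Hypothetical helper: lag between the latest ledger reported by the RPC and
/// the last processed ledger, never negative.
pub fn ledger_lag(latest_ledger: i64, last_processed_ledger: i64) -> i64 {
    // saturating_sub only protects against i64 overflow; a plain negative
    // difference passes through, so max(0) is what enforces the floor.
    latest_ledger.saturating_sub(last_processed_ledger).max(0)
}
```

For example, `ledger_lag(10, 7)` is 3 and `ledger_lag(7, 10)` is 0, whereas `7i64.saturating_sub(10)` on its own is -3.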
| self.processed_ledgers_total.inc(); | ||
| self.checkpoint_ledger_gauge.set(checkpoint_ledger); | ||
| self.latest_ledger_gauge.set(latest_ledger); | ||
| self.ledger_lag_gauge.set(latest_ledger.saturating_sub(checkpoint_ledger)); |
`record_success` sets `ledger_lag_gauge` using `latest_ledger.saturating_sub(checkpoint_ledger)`, which can go negative if the RPC reports a lower ledger than the checkpoint. Since this metric represents lag, it should be clamped to 0 to avoid exporting negative lag values.
| self.ledger_lag_gauge.set(latest_ledger.saturating_sub(checkpoint_ledger)); | |
| self.ledger_lag_gauge | |
| .set(latest_ledger.saturating_sub(checkpoint_ledger).max(0)); |
| last_error: snapshot.last_error, | ||
| }; | ||
| (StatusCode::OK, Json(response)) |
`/health` always returns `StatusCode::OK` even when `healthy` is false (e.g., `SyncStatus::Degraded`). Elsewhere in this repo, unhealthy sync status endpoints return 503 Service Unavailable (see backend/src/routes/health.rs:137-141). Consider returning `SERVICE_UNAVAILABLE` when not healthy so load balancers/alerts can rely on HTTP status.
| (StatusCode::OK, Json(response)) | |
| let status = if healthy { | |
| StatusCode::OK | |
| } else { | |
| StatusCode::SERVICE_UNAVAILABLE | |
| }; | |
| (status, Json(response)) |
| </div> | ||
| <div className="bg-zinc-950 border border-zinc-800 rounded p-2 text-xs"> | ||
| <div className="text-zinc-400 mb-1">Last Success</div> | ||
| <div className="font-mono text-sm">{new Date().toISOString()}</div> |
This component renders non-deterministic values during the initial render (`seedThroughput`/`seedResources`/`seedEvents` use `Date.now()` and `Math.random()`, and the JSX renders `new Date().toISOString()`). When a client component is server-rendered, this commonly causes hydration mismatches because the server-rendered HTML won’t match the client’s first render. To avoid this, initialize with deterministic placeholders and populate random/time-based mock data in `useEffect` after mount (or render the timestamp with `suppressHydrationWarning`, or import the dashboard with `next/dynamic({ ssr: false })`).
| <div className="font-mono text-sm">{new Date().toISOString()}</div> | |
| <div className="font-mono text-sm" suppressHydrationWarning>{new Date().toISOString()}</div> |
| function onRestart() { | ||
| const ok = window.confirm("Are you sure you want to restart the indexer?"); | ||
| if (!ok) return; | ||
| setActionMsg("Restarting indexer..."); | ||
| setTimeout(() => setActionMsg("Indexer restarted"), 900); | ||
| setTimeout(() => setActionMsg(null), 1800); | ||
| } | ||
| function onRescan() { | ||
| const ok = window.confirm("Trigger ledger re-scan from checkpoint? This may re-process many ledgers."); | ||
| if (!ok) return; | ||
| setActionMsg("Starting ledger re-scan..."); | ||
| setTimeout(() => setActionMsg("Re-scan queued"), 1200); | ||
| setTimeout(() => setActionMsg(null), 2800); | ||
| } |
`onRestart`/`onRescan` schedule multiple `setTimeout` callbacks but don’t clear them on unmount. If the user navigates away quickly, React can warn about state updates on an unmounted component. Consider tracking timeout IDs (e.g., in a ref) and clearing them in a cleanup function.
| context.metrics.record_success(checkpoint, latest); | ||
`run_once` calls `metrics.record_success(checkpoint, latest)` and updates the snapshot even when no new ledgers are processed (the `latest <= checkpoint` branch). Because `record_success` increments `processed_ledgers_total`, the counter will grow on every poll interval even during idle periods, making the metric inaccurate. Consider splitting gauge updates from the counter (e.g., a separate method that only updates gauges/lag) or only incrementing when a ledger is actually processed.
| context.metrics.record_success(checkpoint, latest); |
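One way the split could look is sketched below. It uses std atomics as stand-ins for the Prometheus counter and gauges so the example has no external dependencies. The field names follow the excerpt above, while the `Metrics` struct layout and the `record_position` method are assumptions made for illustration, not the PR's actual code.

```rust
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};

/// Stand-in for the service's metrics. Real code would hold Prometheus
/// counter and gauge types; atomics keep this sketch dependency-free.
#[derive(Default)]
pub struct Metrics {
    pub processed_ledgers_total: AtomicU64,
    pub checkpoint_ledger_gauge: AtomicI64,
    pub latest_ledger_gauge: AtomicI64,
    pub ledger_lag_gauge: AtomicI64,
}

impl Metrics {
    /// Hypothetical method: refresh the gauges only. Safe to call on every
    /// poll, including idle polls where no ledger was processed.
    pub fn record_position(&self, checkpoint_ledger: i64, latest_ledger: i64) {
        self.checkpoint_ledger_gauge.store(checkpoint_ledger, Ordering::Relaxed);
        self.latest_ledger_gauge.store(latest_ledger, Ordering::Relaxed);
        // Clamp at zero so a lagging RPC node never produces negative lag.
        let lag = latest_ledger.saturating_sub(checkpoint_ledger).max(0);
        self.ledger_lag_gauge.store(lag, Ordering::Relaxed);
    }

    /// Count exactly one processed ledger, then refresh the gauges.
    pub fn record_success(&self, checkpoint_ledger: i64, latest_ledger: i64) {
        self.processed_ledgers_total.fetch_add(1, Ordering::Relaxed);
        self.record_position(checkpoint_ledger, latest_ledger);
    }
}
```

Under this arrangement the `latest <= checkpoint` branch would call only `record_position`, so `processed_ledgers_total` moves only when a ledger is actually processed.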
This pull request introduces a new backend reconciliation service, including its configuration, database migrations, metrics, and a technical monitoring dashboard in the frontend. It also updates dependencies to improve security and observability and refactors the web app's landing page to focus on reconciliation monitoring. The most important changes are summarized below.
Backend: Reconciliation Service Introduction
- Added the `reconciliation-service` Rust crate with all supporting files, including `Cargo.toml`, `Dockerfile`, configuration, database connection, migration scripts, Prometheus metrics integration, and the main service entrypoint. The service is designed for ledger checkpointing and event indexing, and it exposes metrics for monitoring.
Workspace and Dependency Updates
- Registered `backend/reconciliation-service` in the Rust workspace (`Cargo.toml`) and updated dependencies for improved security and observability, including switching to `rustls` for TLS, enabling Prometheus, and structured JSON logging.
Frontend: Reconciliation Dashboard
- Added the `ReconciliationDashboard` React component, which provides real-time monitoring of the reconciliation service, including throughput, resource usage, health status, and recent events, using mock data and technical UI elements.
- Updated the landing page (`page.tsx`) to display the `ReconciliationDashboard` instead of the previous role-based dashboard, focusing the UI on reconciliation monitoring.
Closes #195