Conversation
> **Note: Reviews paused**
>
> It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings. Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 Walkthrough

Adds an in-memory HeadTracker with replay buffering, wires it into indexing and SSE delivery, propagates indexed_at through ingestion, exposes SSE_REPLAY_BUFFER_BLOCKS config, and updates /api/status to prefer the in-memory head before falling back to the DB.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Indexer
    participant HeadTracker
    participant Postgres
    participant SSEHandler
    participant Client
    Note over Postgres,HeadTracker: Bootstrap
    Postgres->>HeadTracker: bootstrap(pool, replay_capacity)
    HeadTracker->>HeadTracker: initialize latest & replay buffer
    Note over Indexer,Postgres: Commit flow
    Indexer->>HeadTracker: publish_committed_batch(blocks)
    HeadTracker->>HeadTracker: append to replay, update latest, trim buffer
    Indexer->>Postgres: copy_blocks(batch, indexed_at)
    Note over Client,SSEHandler: Client connects
    Client->>SSEHandler: GET /api/events
    SSEHandler->>HeadTracker: latest()
    HeadTracker-->>SSEHandler: latest (or none)
    SSEHandler->>Client: emit initial event if present
    loop New committed batches
        Indexer->>HeadTracker: publish_committed_batch(blocks)
        SSEHandler->>HeadTracker: replay_after(last_seen)
        HeadTracker-->>SSEHandler: ReplaySnapshot (blocks, buffer_start, buffer_end)
        alt client within replay tail
            SSEHandler->>Client: stream buffered blocks
        else client behind replay tail
            SSEHandler->>Client: send behind-tail warning and close
        end
    end
```
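The committed-batch flow in the diagram can be sketched as a single-threaded stand-in (a sketch only: the real HeadTracker is async behind a lock, and `Block` here is reduced to just a number; method names follow the diagram but signatures are illustrative):

```rust
use std::collections::VecDeque;

// Reduced stand-in for the indexed Block type.
#[derive(Clone, Debug, PartialEq)]
struct Block {
    number: u64,
}

// Single-threaded sketch of the replay-buffering head tracker.
struct HeadTracker {
    capacity: usize,
    replay: VecDeque<Block>,
}

impl HeadTracker {
    fn empty(capacity: usize) -> Self {
        Self { capacity, replay: VecDeque::new() }
    }

    // Append committed blocks: reject non-advancing numbers, keep the
    // latest head at the back of the deque, trim to capacity.
    fn publish_committed_batch(&mut self, blocks: &[Block]) {
        for block in blocks {
            if self.replay.back().map_or(false, |b| block.number <= b.number) {
                continue; // non-advancing block: ignore
            }
            self.replay.push_back(block.clone());
            while self.replay.len() > self.capacity {
                self.replay.pop_front();
            }
        }
    }

    fn latest(&self) -> Option<&Block> {
        self.replay.back()
    }

    // Blocks strictly after the cursor; a None cursor replays the whole buffer.
    fn replay_after(&self, cursor: Option<u64>) -> Vec<Block> {
        self.replay
            .iter()
            .filter(|b| cursor.map_or(true, |c| b.number > c))
            .cloned()
            .collect()
    }
}
```

With capacity 3, publishing blocks 1 through 5 leaves the buffer holding 3, 4, 5; a client at cursor 3 replays 4 and 5, and a non-advancing republish of block 2 is dropped.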
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
🧹 Nitpick comments (4)
backend/crates/atlas-server/src/config.rs (1)
`35-41`: **Consider adding an upper-bound validation.**

The validation ensures the buffer is > 0, but there's no upper limit. An extremely large value (e.g., `usize::MAX`) could cause excessive memory allocation when the HeadTracker stores blocks. Consider adding a reasonable ceiling.

💡 Optional: Add upper bound check

```diff
 if sse_replay_buffer_blocks == 0 {
     bail!("SSE_REPLAY_BUFFER_BLOCKS must be greater than 0");
 }
+if sse_replay_buffer_blocks > 100_000 {
+    bail!("SSE_REPLAY_BUFFER_BLOCKS exceeds maximum of 100000");
+}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-server/src/config.rs` around lines 35 - 41: The code parses SSE_REPLAY_BUFFER_BLOCKS into sse_replay_buffer_blocks and only checks for zero — add an upper-bound validation to avoid unbounded allocation (affecting HeadTracker's stored blocks). Define a reasonable ceiling constant (e.g., MAX_SSE_REPLAY_BUFFER_BLOCKS) and after parsing sse_replay_buffer_blocks check that it does not exceed this constant; if it does, return an error (bail! or Context) with a clear message like "SSE_REPLAY_BUFFER_BLOCKS must be between 1 and {MAX}". Keep the existing zero check and use the symbol SSE_REPLAY_BUFFER_BLOCKS and sse_replay_buffer_blocks to locate the change.

backend/crates/atlas-server/src/head.rs (2)
`112-185`: **Good test coverage for core HeadTracker behavior.**

The tests cover:
- Full buffer retrieval with empty cursor
- Buffer trimming on overflow
- Non-advancing block rejection
Consider adding a test for `clear()` behavior.

💡 Optional: Add test for clear()

```rust
#[tokio::test]
async fn clear_resets_state_to_empty() {
    let tracker = HeadTracker::empty(3);
    tracker.publish_committed_batch(&[sample_block(10)]).await;
    assert!(tracker.latest().await.is_some());

    tracker.clear().await;

    assert!(tracker.latest().await.is_none());
    let snapshot = tracker.replay_after(None).await;
    assert!(snapshot.blocks_after_cursor.is_empty());
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-server/src/head.rs` around lines 112 - 185, Add a test for HeadTracker::clear() to verify it resets state: create a tracker via HeadTracker::empty(...), publish_committed_batch(...) with at least one sample_block, assert tracker.latest().await is Some, call tracker.clear().await, then assert tracker.latest().await is None and that tracker.replay_after(None).await returns snapshot.blocks_after_cursor empty; reference functions HeadTracker::empty, publish_committed_batch, clear, latest, and replay_after when locating where to add the test.
`90-109`: **Consider performance of cloning blocks on every call.**

Both `latest()` and `replay_after()` clone Block instances on every call. For the status endpoint (potentially polled frequently) and SSE streaming, this could add overhead.

If profiling shows this is a bottleneck, consider returning `Arc<Block>` instead. However, for the current use case with sub-second polling intervals, the overhead is likely acceptable.

🤖 Prompt for AI Agents
Arc<Block>instead. However, for the current use case with sub-second polling intervals, the overhead is likely acceptable.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-server/src/head.rs` around lines 90 - 109, The current latest() and replay_after() implementations clone Block on each call which is costly; change the in-memory storage and APIs to use shared ownership (Arc<Block>) instead: update the internal collection state.replay to store Arc<Block> (e.g., VecDeque<Arc<Block>>), change latest() to return Option<Arc<Block>> and ReplaySnapshot.blocks_after_cursor to Vec<Arc<Block>>, and adjust replay_after() to collect and return Arc clones (cheap clones of Arc). Also update any code that constructs blocks to wrap them in Arc and update callers of latest, replay_after, and ReplaySnapshot to accept Arc<Block> instead of owned Block. Ensure imports (std::sync::Arc) are added and tests/consumers are updated accordingly.backend/crates/atlas-server/src/api/handlers/sse.rs (1)
`62-72`: **Edge case: behind-tail guard may behave unexpectedly when `buffer_start` is 0.**

When `buffer_start` is 0 (e.g., indexing from genesis), `buffer_start.saturating_sub(1)` evaluates to 0. The condition `cursor < 0` would never be true for valid block numbers (which are non-negative), so this is likely safe. However, consider adding a comment explaining this edge case for future maintainers.

💡 Optional: Add clarifying comment

```diff
 if let Some(buffer_start) = snapshot.buffer_start {
+    // When buffer_start is 0 (genesis), saturating_sub(1) returns 0,
+    // and cursor < 0 is never true for valid block numbers, so no false positive.
     if cursor < buffer_start.saturating_sub(1) {
         warn!(
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-server/src/api/handlers/sse.rs` around lines 62 - 72, The guard checking if cursor < buffer_start.saturating_sub(1) can be confusing when snapshot.buffer_start is 0 because saturating_sub(1) yields 0 and the comparison will never be true for non-negative cursors; add a short clarifying comment next to the buffer_start check (in the same block that uses snapshot.buffer_start, cursor, and buffer_end) explaining the zero-case (indexing from genesis) and why saturating_sub(1) returning 0 is intentional and safe, so future maintainers understand the edge-case reasoning without changing logic.
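The zero-case reasoning can be checked directly. The free function below mirrors the guard's comparison (the name `behind_tail` is illustrative, not from the handler):

```rust
// Mirrors the behind-tail guard: a client is too far behind when its
// cursor is older than the block just before the buffer's start.
fn behind_tail(cursor: u64, buffer_start: u64) -> bool {
    // saturating_sub clamps 0 - 1 to 0 instead of underflowing, so at
    // genesis (buffer_start == 0) the guard can never fire.
    cursor < buffer_start.saturating_sub(1)
}
```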
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@backend/crates/atlas-server/src/api/handlers/sse.rs`:
- Around line 62-72: The guard checking if cursor <
buffer_start.saturating_sub(1) can be confusing when snapshot.buffer_start is 0
because saturating_sub(1) yields 0 and the comparison will never be true for
non-negative cursors; add a short clarifying comment next to the buffer_start
check (in the same block that uses snapshot.buffer_start, cursor, and
buffer_end) explaining the zero-case (indexing from genesis) and why
saturating_sub(1) returning 0 is intentional and safe, so future maintainers
understand the edge-case reasoning without changing logic.
In `@backend/crates/atlas-server/src/config.rs`:
- Around line 35-41: The code parses SSE_REPLAY_BUFFER_BLOCKS into
sse_replay_buffer_blocks and only checks for zero — add an upper-bound
validation to avoid unbounded allocation (affecting HeadTracker's stored
blocks). Define a reasonable ceiling constant (e.g.,
MAX_SSE_REPLAY_BUFFER_BLOCKS) and after parsing sse_replay_buffer_blocks check
that it does not exceed this constant; if it does, return an error (bail! or
Context) with a clear message like "SSE_REPLAY_BUFFER_BLOCKS must be between 1
and {MAX}". Keep the existing zero check and use the symbol
SSE_REPLAY_BUFFER_BLOCKS and sse_replay_buffer_blocks to locate the change.
In `@backend/crates/atlas-server/src/head.rs`:
- Around line 112-185: Add a test for HeadTracker::clear() to verify it resets
state: create a tracker via HeadTracker::empty(...),
publish_committed_batch(...) with at least one sample_block, assert
tracker.latest().await is Some, call tracker.clear().await, then assert
tracker.latest().await is None and that tracker.replay_after(None).await returns
snapshot.blocks_after_cursor empty; reference functions HeadTracker::empty,
publish_committed_batch, clear, latest, and replay_after when locating where to
add the test.
- Around line 90-109: The current latest() and replay_after() implementations
clone Block on each call which is costly; change the in-memory storage and APIs
to use shared ownership (Arc<Block>) instead: update the internal collection
state.replay to store Arc<Block> (e.g., VecDeque<Arc<Block>>), change latest()
to return Option<Arc<Block>> and ReplaySnapshot.blocks_after_cursor to
Vec<Arc<Block>>, and adjust replay_after() to collect and return Arc clones
(cheap clones of Arc). Also update any code that constructs blocks to wrap them
in Arc and update callers of latest, replay_after, and ReplaySnapshot to accept
Arc<Block> instead of owned Block. Ensure imports (std::sync::Arc) are added and
tests/consumers are updated accordingly.
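The Arc<Block> suggestion above can be sketched with reduced types (the real Block and ReplaySnapshot carry more fields; `Replay` here is a hypothetical stand-in for the tracker's internal state):

```rust
use std::collections::VecDeque;
use std::sync::Arc;

#[derive(Debug)]
struct Block {
    number: u64,
}

// Replay storage holding Arc<Block>: handing a block to a caller bumps
// a refcount instead of deep-copying the struct.
struct Replay {
    buffer: VecDeque<Arc<Block>>,
}

impl Replay {
    fn latest(&self) -> Option<Arc<Block>> {
        self.buffer.back().cloned() // Arc clone, not Block clone
    }

    fn replay_after(&self, cursor: Option<u64>) -> Vec<Arc<Block>> {
        self.buffer
            .iter()
            .filter(|b| cursor.map_or(true, |c| b.number > c))
            .cloned()
            .collect()
    }
}
```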
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d9562577-d363-4be4-8870-fccddd64f70e
📒 Files selected for processing (14)
- .env.example
- backend/crates/atlas-server/Cargo.toml
- backend/crates/atlas-server/src/api/handlers/sse.rs
- backend/crates/atlas-server/src/api/handlers/status.rs
- backend/crates/atlas-server/src/api/mod.rs
- backend/crates/atlas-server/src/config.rs
- backend/crates/atlas-server/src/head.rs
- backend/crates/atlas-server/src/indexer/batch.rs
- backend/crates/atlas-server/src/indexer/copy.rs
- backend/crates/atlas-server/src/indexer/indexer.rs
- backend/crates/atlas-server/src/main.rs
- docs/API.md
- docs/ARCHITECTURE.md
- frontend/src/hooks/useBlockSSE.ts
Actionable comments posted: 2
🧹 Nitpick comments (2)
backend/crates/atlas-server/src/api/handlers/mod.rs (1)
`19-26`: **Add colocated tests for `get_latest_block`.**

This introduces new query/mapping logic but there is no `#[cfg(test)]` coverage in this file. Please add at least empty-table and latest-row mapping cases.

As per coding guidelines:

> Add unit tests for new logic in a #[cfg(test)] mod tests block in the same file, and run with cargo test --workspace.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-server/src/api/handlers/mod.rs` around lines 19 - 26: Add a #[cfg(test)] mod tests block in the same file containing async unit tests for get_latest_block that cover (1) empty-table returning Ok(None) and (2) latest-row mapping returning the most recent Block; in the tests, create a temporary/test Postgres connection (PgPool), ensure the blocks table schema matches the columns referenced by BLOCK_COLUMNS, seed rows for the latest-row case, call get_latest_block(&pool).await and assert results; use teardown/transaction rollback or per-test ephemeral schema to avoid cross-test state.

backend/crates/atlas-server/src/api/handlers/sse.rs (1)
`118-178`: **Add async tests for replay/tail behavior, not only JSON serialization.**

The new SSE control flow (replay, lag handling, behind-tail close) is untested here. Add focused `tokio::test` coverage for ordering and tail-disconnect paths.

As per coding guidelines:

> Add unit tests for new logic in a #[cfg(test)] mod tests block in the same file, and run with cargo test --workspace.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-server/src/api/handlers/sse.rs` around lines 118 - 178, Add async tokio tests in the existing #[cfg(test)] mod tests that exercise the SSE control flow (replay ordering, lag handling, and behind-tail disconnect) rather than only JSON serialization: create tokio::test functions that reuse sample_block and NewBlockEvent to feed a mock/source stream to the SSE handler, use an async client/stream consumer (e.g., futures::StreamExt) to collect and assert event ordering during replay, simulate lag and assert that the handler switches to tail behavior correctly, and simulate a behind-tail condition to assert the handler closes the connection; place these tests in the same tests module and name them to reflect replay_ordering and tail_disconnect so they run with cargo test --workspace.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/crates/atlas-server/src/api/handlers/sse.rs`:
- Around line 56-84: The loop currently skips head_tracker.replay_after(...)
when last_block_number is None, causing per-block events to be dropped; change
the flow so you always call head_tracker.replay_after(last_block_number) (i.e.,
pass None when cursor is None) and iterate snapshot.blocks_after_cursor
(updating last_block_number and emitting via block_to_event/yield Ok(event))
before falling back to head_tracker.latest(); only use latest() when replay
returns no blocks to emit. Ensure the checks around snapshot.buffer_start and
the canonical-refetch break remain applied to the replay result.
In `@backend/crates/atlas-server/src/api/handlers/status.rs`:
- Around line 25-30: The handler currently falls back to reading the latest row
via get_latest_block(&state.pool) to build ChainStatus, which touches the large
blocks path; change this to perform a single key-value lookup from the
indexer_state table (e.g., implement/use a get_indexer_state(&state.pool).await?
or run a single SELECT on indexer_state) and populate ChainStatus {
block_height, indexed_at } from that result so /api/status uses the low-latency
indexer_state read instead of get_latest_block; ensure the indexed_at is
converted to rfc3339 and errors are propagated exactly as before.
---
Nitpick comments:
In `@backend/crates/atlas-server/src/api/handlers/mod.rs`:
- Around line 19-26: Add a #[cfg(test)] mod tests block in the same file
containing async unit tests for get_latest_block that cover (1) empty-table
returning Ok(None) and (2) latest-row mapping returning the most recent Block;
in the tests, create a temporary/test Postgres connection (PgPool), ensure the
blocks table schema matches the columns referenced by BLOCK_COLUMNS, seed rows
for the latest-row case, call get_latest_block(&pool).await and assert results;
use teardown/transaction rollback or per-test ephemeral schema to avoid
cross-test state.
In `@backend/crates/atlas-server/src/api/handlers/sse.rs`:
- Around line 118-178: Add async tokio tests in the existing #[cfg(test)] mod
tests that exercise the SSE control flow (replay ordering, lag handling, and
behind-tail disconnect) rather than only JSON serialization: create tokio::test
functions that reuse sample_block and NewBlockEvent to feed a mock/source stream
to the SSE handler, use an async client/stream consumer (e.g.,
futures::StreamExt) to collect and assert event ordering during replay, simulate
lag and assert that the handler switches to tail behavior correctly, and
simulate a behind-tail condition to assert the handler closes the connection;
place these tests in the same tests module and name them to reflect
replay_ordering and tail_disconnect so they run with cargo test --workspace.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: bd3a28bd-d3e2-455b-a09e-8cb3237b14b6
📒 Files selected for processing (3)
- backend/crates/atlas-server/src/api/handlers/mod.rs
- backend/crates/atlas-server/src/api/handlers/sse.rs
- backend/crates/atlas-server/src/api/handlers/status.rs
Actionable comments posted: 3
♻️ Duplicate comments (2)
backend/crates/atlas-server/src/api/handlers/sse.rs (1)
`56-83`: ⚠️ Potential issue | 🟠 Major

**Always run replay before `latest()` fallback, including when no cursor exists.**

Line 57 currently gates replay behind `last_block_number`, so a `None` cursor skips `replay_after` and can collapse multiple committed blocks into a single `latest()` event. Also, Line 61 uses `cursor + 1`, which is avoidable overflow-prone arithmetic.

Proposed fix

```diff
 while let Ok(()) | Err(broadcast::error::RecvError::Lagged(_)) = rx.recv().await {
-    if let Some(cursor) = last_block_number {
-        let snapshot = head_tracker.replay_after(Some(cursor)).await;
+    let snapshot = head_tracker.replay_after(last_block_number).await;
+    if let Some(cursor) = last_block_number {
         if let Some(buffer_start) = snapshot.buffer_start {
-            if cursor + 1 < buffer_start {
+            if cursor < buffer_start.saturating_sub(1) {
                 warn!(
                     last_seen = cursor,
                     buffer_start,
                     buffer_end = ?snapshot.buffer_end,
                     "sse head-only: client fell behind replay tail; closing stream for canonical refetch"
                 );
                 break;
             }
         }
-
-        if !snapshot.blocks_after_cursor.is_empty() {
-            for block in snapshot.blocks_after_cursor {
-                last_block_number = Some(block.number);
-                if let Some(event) = block_to_event(block) {
-                    yield Ok(event);
-                }
-            }
-            continue;
-        }
     }
+    if !snapshot.blocks_after_cursor.is_empty() {
+        for block in snapshot.blocks_after_cursor {
+            last_block_number = Some(block.number);
+            if let Some(event) = block_to_event(block) {
+                yield Ok(event);
+            }
+        }
+        continue;
+    }
     match head_tracker.latest().await {
         Some(block) if last_block_number.is_none_or(|last_seen| block.number > last_seen) => {
             last_block_number = Some(block.number);
             if let Some(event) = block_to_event(block) {
                 yield Ok(event);
             }
         }
         Some(_) | None => {}
     }
 }
```

Based on learnings:
> Use GET /api/events endpoint for SSE live updates — publishes one new_block event per block in order, fed by the indexer's broadcast::Sender<()>; falls back to /api/status polling on disconnect.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-server/src/api/handlers/sse.rs` around lines 56 - 83: Always call head_tracker.replay_after with the current last_block_number (i.e. pass the Option directly: head_tracker.replay_after(last_block_number).await) before falling back to head_tracker.latest(), so replay runs even when last_block_number is None and you can deliver each block in snapshot.blocks_after_cursor; also avoid the overflow-prone cursor + 1 by comparing using buffer_start.saturating_sub(1) (e.g. if cursor < buffer_start.saturating_sub(1) { ... }) or by using a map-based check (last_block_number.map_or(false, |c| c < buffer_start.saturating_sub(1))). Ensure the replay handling (snapshot.buffer_start check, iterating snapshot.blocks_after_cursor and calling block_to_event, updating last_block_number) occurs for every loop iteration before executing head_tracker.latest().

backend/crates/atlas-server/src/api/handlers/status.rs (1)
`26-31`: ⚠️ Potential issue | 🟠 Major

**Use `indexer_state` for `/api/status` DB fallback instead of `get_latest_block`.**

On Line 26, the fallback still reads from the latest block path. This endpoint contract calls for a single key-value lookup from `indexer_state` to keep latency predictable.

As per coding guidelines:

> Use GET /api/status endpoint for navbar polling fallback — returns { block_height, indexed_at } via a single key-value lookup from indexer_state table (sub-ms latency).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-server/src/api/handlers/status.rs` around lines 26 - 31, The fallback currently calls get_latest_block(&state.pool) — replace that path with a single key-value lookup from the indexer_state table (do not query the blocks table). Query indexer_state via the existing DB pool (e.g. add or reuse a helper like get_indexer_state(&state.pool) or run a simple SELECT for the single row), extract block_height and indexed_at from that record, and return ChainStatus { block_height, indexed_at: Some(indexed_at.to_rfc3339()) } (handle the None case consistently). Update the code in the handler that references get_latest_block to use the indexer_state lookup to keep latency predictable.
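The resolution order the comment asks for (in-memory head first, then the indexer_state row, then an empty default) can be expressed as a pure function. All names below are illustrative stand-ins, not the handler's real API, with both sources reduced to a (height, rfc3339 timestamp) pair:

```rust
#[derive(Debug, PartialEq)]
struct ChainStatus {
    block_height: u64,
    indexed_at: Option<String>,
}

// head: what HeadTracker::latest() would yield; state_row: the single
// key-value row read from the indexer_state table.
fn resolve_status(
    head: Option<(u64, String)>,
    state_row: Option<(u64, String)>,
) -> ChainStatus {
    match head.or(state_row) {
        Some((block_height, ts)) => ChainStatus {
            block_height,
            indexed_at: Some(ts),
        },
        None => ChainStatus {
            block_height: 0,
            indexed_at: None,
        },
    }
}
```

`Option::or` gives the short-circuit priority for free: the DB row is only consulted when the in-memory head is absent.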
🧹 Nitpick comments (2)
backend/crates/atlas-common/src/types.rs (1)
`229-232`: **LGTM — constant correctly mirrors the `Block` struct.**

The column list matches the struct field order, and the usage with `sqlx::query_as::<_, Block>()` (per the relevant snippets) relies on `FromRow`'s name-based matching, so positional order is for documentation consistency rather than correctness.

Consider adding a test to guard against drift if the `Block` struct evolves:

🧪 Optional: Add a drift-detection test

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn block_columns_matches_struct_fields() {
        let expected_fields = [
            "number",
            "hash",
            "parent_hash",
            "timestamp",
            "gas_used",
            "gas_limit",
            "transaction_count",
            "indexed_at",
        ];
        let actual: Vec<&str> = BLOCK_COLUMNS.split(", ").collect();
        assert_eq!(actual, expected_fields, "BLOCK_COLUMNS drifted from Block struct fields");
    }
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-common/src/types.rs` around lines 229 - 232: Add a unit test that ensures the BLOCK_COLUMNS constant stays in sync with the Block struct field names to detect drift; implement a test (under #[cfg(test)] mod tests) that splits BLOCK_COLUMNS (e.g., BLOCK_COLUMNS.split(", ")) and compares the resulting Vec<&str> to the expected array of field names ["number","hash","parent_hash","timestamp","gas_used","gas_limit","transaction_count","indexed_at"] using assert_eq! so any divergence between BLOCK_COLUMNS and the Block struct is caught during CI.

backend/crates/atlas-server/src/indexer/indexer.rs (1)
`935-937`: **Consider adding tests for HeadTracker integration.**

The existing tests thoroughly cover `collect_block` behavior. However, the new HeadTracker interactions (`clear()` on reindex, `publish_committed_batch()` after commit) lack direct test coverage. Since `HeadTracker` is injected via constructor, you could add integration tests that verify:

- `clear()` is called when `config.reindex` is true
- `publish_committed_batch()` receives the correct blocks after a successful write

This can be deferred if HeadTracker has its own unit tests in `head.rs`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-server/src/indexer/indexer.rs` around lines 935 - 937, Add tests exercising HeadTracker integration by constructing the Indexer with a mock or test double for HeadTracker and asserting interactions: (1) for reindex path, call Indexer::new(...) with config.reindex = true, run the reindex flow that calls collect_block and ensure mock HeadTracker.clear() was invoked; (2) for committing path, perform a commit that triggers publish_committed_batch() and assert the mock received the expected blocks; target functions/types: Indexer::new, Indexer::collect_block, HeadTracker::clear, and HeadTracker::publish_committed_batch to locate where to inject the mock and verify calls.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/crates/atlas-server/src/api/handlers/sse.rs`:
- Around line 34-95: Add a #[cfg(test)] mod tests block in the same sse.rs file
and implement unit tests that exercise the SSE replay/tail logic around the
stream created in the file: specifically write tests that mock or stub
head_tracker.latest(), get_latest_block(), head_tracker.replay_after(), and the
rx.recv() broadcast path to validate (1) initial head seed yields an event via
block_to_event, (2) replay_after returns blocks_after_cursor and those are
yielded, (3) when cursor < buffer_start the stream breaks (behind-tail
behavior), and (4) when replay has no new blocks the latest() fallback yields
new head events; call the stream (or sse_response wrapper) and assert emitted
events and termination behavior to run under cargo test --workspace.
In `@backend/crates/atlas-server/src/api/handlers/status.rs`:
- Around line 18-37: Add a new #[cfg(test)] mod tests in the same file with unit
tests for get_status that cover all three branches: (1) head-tracker hit where
State(Arc<AppState>) has a head_tracker whose latest().await returns Some(block)
and asserts returned ChainStatus uses that block, (2) DB fallback where
head_tracker.latest().await returns None but get_latest_block(&state.pool).await
returns Some(block) and assert ChainStatus uses DB block, and (3) empty default
where both return None and assert block_height == 0 and indexed_at is None;
implement minimal test doubles/mocks for Arc<AppState> (or construct an AppState
with a test head_tracker and a test pool) and call get_status(State(state)) via
.await, asserting Json<ChainStatus> contents accordingly and running with cargo
test --workspace.
In `@frontend/src/hooks/useBlockSSE.ts`:
- Around line 98-116: The enqueue path can replay older SSE events and regress
state after polling advances height; add a monotonic guard by tracking the
highest-seen block number in a ref (e.g. lastSeenHeightRef) and ignore/return
early from enqueue if data.block.number <= lastSeenHeightRef.current; update
lastSeenHeightRef.current whenever you accept a new block (and also update it in
the polling/merge code paths referenced around the other block-handling logic
such as the drain/scheduleDrain use and the code at the other location that
enqueues events) so both SSE replay and polling use the same monotonic height
source and bps/log updates only run for strictly newer blocks.
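The monotonic guard described for the frontend hook is language-agnostic; sketched here in Rust (the hook itself is TypeScript and `lastSeenHeightRef` is the reviewer's suggested name — `MonotonicGuard` below is an illustrative stand-in):

```rust
// Tracks the highest block number accepted so far; replayed or stale
// events (number <= last seen) are dropped before they can regress state.
struct MonotonicGuard {
    last_seen: Option<u64>,
}

impl MonotonicGuard {
    fn new() -> Self {
        Self { last_seen: None }
    }

    // Returns true if the event should be processed, false if stale.
    fn accept(&mut self, number: u64) -> bool {
        if self.last_seen.map_or(false, |last| number <= last) {
            return false;
        }
        self.last_seen = Some(number);
        true
    }
}
```

The key point is that both the SSE enqueue path and the polling merge path consult the same guard, so whichever source advances the height first wins and the other's older data is ignored.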
---
Duplicate comments:
In `@backend/crates/atlas-server/src/api/handlers/sse.rs`:
- Around line 56-83: Always call head_tracker.replay_after with the current
last_block_number (i.e. pass the Option directly:
head_tracker.replay_after(last_block_number).await) before falling back to
head_tracker.latest(), so replay runs even when last_block_number is None and
you can deliver each block in snapshot.blocks_after_cursor; also avoid the
overflow-prone cursor + 1 by comparing using buffer_start.saturating_sub(1)
(e.g. if cursor < buffer_start.saturating_sub(1) { ... }) or by using a
map-based check (last_block_number.map_or(false, |c| c <
buffer_start.saturating_sub(1))). Ensure the replay handling
(snapshot.buffer_start check, iterating snapshot.blocks_after_cursor and calling
block_to_event, updating last_block_number) occurs for every loop iteration
before executing head_tracker.latest().
In `@backend/crates/atlas-server/src/api/handlers/status.rs`:
- Around line 26-31: The fallback currently calls get_latest_block(&state.pool)
— replace that path with a single key-value lookup from the indexer_state table
(do not query the blocks table). Query indexer_state via the existing DB pool
(e.g. add or reuse a helper like get_indexer_state(&state.pool) or run a simple
SELECT for the single row), extract block_height and indexed_at from that
record, and return ChainStatus { block_height, indexed_at:
Some(indexed_at.to_rfc3339()) } (handle the None case consistently). Update the
code in the handler that references get_latest_block to use the indexer_state
lookup to keep latency predictable.
---
Nitpick comments:
In `@backend/crates/atlas-common/src/types.rs`:
- Around line 229-232: Add a unit test that ensures the BLOCK_COLUMNS constant
stays in sync with the Block struct field names to detect drift; implement a
test (under #[cfg(test)] mod tests) that splits BLOCK_COLUMNS (e.g.,
BLOCK_COLUMNS.split(", ")) and compares the resulting Vec<&str> to the expected
array of field names
["number","hash","parent_hash","timestamp","gas_used","gas_limit","transaction_count","indexed_at"]
using assert_eq! so any divergence between BLOCK_COLUMNS and the Block struct is
caught during CI.
In `@backend/crates/atlas-server/src/indexer/indexer.rs`:
- Around line 935-937: Add tests exercising HeadTracker integration by
constructing the Indexer with a mock or test double for HeadTracker and
asserting interactions: (1) for reindex path, call Indexer::new(...) with
config.reindex = true, run the reindex flow that calls collect_block and ensure
mock HeadTracker.clear() was invoked; (2) for committing path, perform a commit
that triggers publish_committed_batch() and assert the mock received the
expected blocks; target functions/types: Indexer::new, Indexer::collect_block,
HeadTracker::clear, and HeadTracker::publish_committed_batch to locate where to
inject the mock and verify calls.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 7002300a-8f67-4c0e-9cc3-bdeafc7cd218
📒 Files selected for processing (11)
- backend/crates/atlas-common/src/types.rs
- backend/crates/atlas-server/src/api/handlers/mod.rs
- backend/crates/atlas-server/src/api/handlers/sse.rs
- backend/crates/atlas-server/src/api/handlers/status.rs
- backend/crates/atlas-server/src/head.rs
- backend/crates/atlas-server/src/indexer/indexer.rs
- frontend/src/api/status.ts
- frontend/src/components/Layout.tsx
- frontend/src/hooks/useBlockSSE.ts
- frontend/src/hooks/useLatestBlockHeight.ts
- frontend/src/hooks/useStats.ts
💤 Files with no reviewable changes (1)
- frontend/src/hooks/useLatestBlockHeight.ts
🚧 Files skipped from review as they are similar to previous changes (2)
- backend/crates/atlas-server/src/api/handlers/mod.rs
- backend/crates/atlas-server/src/head.rs
Actionable comments posted: 2
♻️ Duplicate comments (2)
backend/crates/atlas-server/src/api/handlers/sse.rs (1)
`34-95`: ⚠️ Potential issue | 🟠 Major

**Add stream-behavior tests for replay/tail branches in this file.**
The new SSE control flow (initial seed, replay emission, behind-tail break, latest fallback) is still untested here.
As per coding guidelines:
> backend/**/*.rs: Add unit tests for new logic in a #[cfg(test)] mod tests block in the same file, and run with cargo test --workspace.

Also applies to: 116-176
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-server/src/api/handlers/sse.rs` around lines 34 - 95: Add a #[cfg(test)] mod tests in the same sse.rs and add unit tests that exercise the async stream behavior produced by the async_stream::stream! block (the logic that uses head_tracker.latest(), get_latest_block(&pool), head_tracker.replay_after(last_block_number), rx.recv(), and block_to_event) covering: (1) initial seed when head_tracker.latest() returns a block, (2) initial seed via get_latest_block fallback, (3) replay emission when snapshot.blocks_after_cursor contains blocks, (4) the behind-tail branch where cursor + 1 < snapshot.buffer_start causes the loop to break, and (5) the latest fallback branch where head_tracker.latest() yields a newer block; implement lightweight test doubles/mocks for head_tracker, rx, and get_latest_block and assert that sse_response(stream) yields the expected events and stream termination, then run with cargo test --workspace.

backend/crates/atlas-server/src/api/handlers/status.rs (1)
17-43: ⚠️ Potential issue | 🟠 Major

Add same-file tests for the new status branching paths.

This file still lacks #[cfg(test)] coverage for the head-tracker hit, indexer_state fallback, and empty default branches. As per coding guidelines: backend/**/*.rs: Add unit tests for new logic in a #[cfg(test)] mod tests block in the same file, and run with cargo test --workspace.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-server/src/api/handlers/status.rs` around lines 17 - 43, Add a #[cfg(test)] mod tests block to this file and add unit tests exercising get_status's three branches: (1) head-tracker hit — create an Arc<AppState> whose head_tracker.latest() returns Some(mock block with number and indexed_at) and assert the returned ChainStatus matches; (2) indexer_state fallback — provide a test DB pool containing a row in indexer_state (key='last_indexed_block') and ensure head_tracker.latest() returns None, then call get_status and assert parsed block_height and indexed_at come from the row; (3) empty default — ensure head_tracker.latest() returns None and the indexer_state query returns None and assert ChainStatus has block_height 0 and indexed_at None; build AppState/State wrappers as in production, use async test harness (#[tokio::test]) and sqlx test helpers for the pool, and run with cargo test --workspace.
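The three branches the prompt asks to pin down can be reduced to a small pure function. The sketch below is hypothetical: `resolve_status` and the `(block_height, indexed_at)` tuples stand in for the real `AppState`, `Block`, and `ChainStatus` types, which are not shown in this review.

```rust
// Hypothetical sketch of get_status's fallback chain: prefer the in-memory
// head, fall back to the indexer_state row, and default to an empty status.
// (block_height, indexed_at-seconds) pairs stand in for the real types.
fn resolve_status(
    head: Option<(i64, i64)>,
    indexer_state_row: Option<(i64, i64)>,
) -> (i64, Option<i64>) {
    match head.or(indexer_state_row) {
        Some((height, indexed_at)) => (height, Some(indexed_at)),
        None => (0, None), // empty default branch
    }
}

fn main() {
    // Head-tracker hit wins over the DB row.
    assert_eq!(
        resolve_status(Some((120, 1_700_000_000)), Some((100, 1_600_000_000))),
        (120, Some(1_700_000_000))
    );
    // indexer_state fallback when the tracker is empty.
    assert_eq!(
        resolve_status(None, Some((100, 1_600_000_000))),
        (100, Some(1_600_000_000))
    );
    // Empty default: height 0, no timestamp.
    assert_eq!(resolve_status(None, None), (0, None));
    println!("ok");
}
```

Tests against the real handler would still need the `#[tokio::test]` harness and sqlx pool described in the prompt; this only illustrates the branch ordering.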
🧹 Nitpick comments (1)
backend/crates/atlas-server/src/head.rs (1)
96-113: Add a direct test for replay_after(Some(cursor)) across wrapped deque slices.

The partition_point path is the trickiest branch and should be explicitly tested (including a wrapped VecDeque) to lock in ordering/cursor semantics. As per coding guidelines: backend/**/*.rs: Add unit tests for new logic in a #[cfg(test)] mod tests block in the same file, and run with cargo test --workspace.

Also applies to: 124-215
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-server/src/head.rs` around lines 96 - 113, Add a unit test in a #[cfg(test)] mod tests block in the same file that explicitly exercises Head::replay_after(Some(cursor)) when state.replay is wrapped into two slices (i.e. a VecDeque that has wrapped-around contents) to ensure the partition_point branch returns the correct ordering and excludes blocks <= cursor; construct a Head (or its State) with a replay VecDeque whose internal layout spans the boundary (so as_slices() yields two non-empty slices) with known block.number values, call replay_after(Some(cursor)) for a cursor in the middle, and assert the returned ReplaySnapshot contains only the blocks > cursor in the correct order; run cargo test --workspace to verify.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/crates/atlas-server/src/api/handlers/status.rs`:
- Around line 26-27: The code currently selects indexer_state.value as text and
later uses unwrap_or(0), which silently coerces invalid/corrupt values to 0;
change the query that fetches the last indexed block (the sqlx::query_as call
that binds into row: Option<(String, chrono::DateTime<chrono::Utc>)>) to perform
an SQL-side cast of value to bigint (e.g. SELECT value::bigint AS value,
updated_at ...) so invalid data fails at query time, and remove the unwrap_or(0)
parsing fallback; apply the same change to the other occurrence that parses
indexer_state.value at the referenced lines (the second query/parse around lines
34-35) so both reads surface parse errors instead of returning 0.
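The comment recommends an SQL-side cast (`value::bigint`) so corrupt rows fail at query time. An equivalent application-side guard, shown here as a hedged sketch with a hypothetical helper name, is to parse strictly and return an error rather than coercing to 0:

```rust
// Hypothetical sketch: surface corrupt indexer_state.value instead of
// silently coercing it to 0 with unwrap_or(0).
fn parse_last_indexed_block(value: &str) -> Result<i64, String> {
    value
        .trim()
        .parse::<i64>()
        .map_err(|e| format!("corrupt indexer_state.value {value:?}: {e}"))
}

fn main() {
    // A valid row parses through unchanged.
    assert_eq!(parse_last_indexed_block("12345"), Ok(12345));
    // Corrupt data now produces an error instead of a silent 0.
    assert!(parse_last_indexed_block("not-a-number").is_err());
    println!("ok");
}
```

The SQL-side cast remains preferable when sqlx can decode `value::bigint` directly, since the failure then happens in one place for all readers.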
In `@backend/crates/atlas-server/src/config.rs`:
- Around line 35-41: Add a #[cfg(test)] mod tests block in the same config.rs
that exercises the SSE_REPLAY_BUFFER_BLOCKS parsing branch: write tests that (1)
run with no SSE_REPLAY_BUFFER_BLOCKS env var and assert the default (4096) is
used, (2) set SSE_REPLAY_BUFFER_BLOCKS to a non-numeric value and assert parse
failure, (3) set it to "0" and assert the function bails with the "must be
between 1 and 100000" error, (4) set it to "100001" and assert the same
out-of-range error, and (5) set it to a valid in-range value (e.g., "12345") and
assert it is accepted; each test should manipulate std::env::set_var/remove_var
around invoking the config-loading function that contains the
sse_replay_buffer_blocks parsing and assert the Result/returned config or error
matches expectations.
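The parsing branch those tests target can be sketched as a standalone function. This is an assumption-laden sketch, not the project's `Config::from_env`: it takes the raw env value as an `Option<&str>` so the branches can be exercised without mutating process environment in tests.

```rust
// Hypothetical sketch of the SSE_REPLAY_BUFFER_BLOCKS branch: default to
// 4096 when unset, reject non-numeric input, and bail when the value is
// outside 1..=100000.
fn parse_replay_buffer_blocks(raw: Option<&str>) -> Result<usize, String> {
    match raw {
        None => Ok(4096), // env var unset: use the default
        Some(s) => {
            let n: usize = s
                .parse()
                .map_err(|e| format!("SSE_REPLAY_BUFFER_BLOCKS: {e}"))?;
            if !(1..=100_000).contains(&n) {
                return Err("SSE_REPLAY_BUFFER_BLOCKS must be between 1 and 100000".into());
            }
            Ok(n)
        }
    }
}

fn main() {
    assert_eq!(parse_replay_buffer_blocks(None), Ok(4096));
    assert!(parse_replay_buffer_blocks(Some("abc")).is_err());
    assert!(parse_replay_buffer_blocks(Some("0")).is_err());
    assert!(parse_replay_buffer_blocks(Some("100001")).is_err());
    assert_eq!(parse_replay_buffer_blocks(Some("12345")), Ok(12345));
    println!("ok");
}
```

Factoring the env lookup out of the parse also sidesteps `std::env::set_var` in tests, which is process-global and race-prone under parallel test runs.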
---
Duplicate comments:
In `@backend/crates/atlas-server/src/api/handlers/sse.rs`:
- Around line 34-95: Add a #[cfg(test)] mod tests in the same sse.rs and add
unit tests that exercise the async stream behavior produced by the
async_stream::stream! block (the logic that uses head_tracker.latest(),
get_latest_block(&pool), head_tracker.replay_after(last_block_number),
rx.recv(), and block_to_event) covering: (1) initial seed when
head_tracker.latest() returns a block, (2) initial seed via get_latest_block
fallback, (3) replay emission when snapshot.blocks_after_cursor contains blocks,
(4) the behind-tail branch where cursor + 1 < snapshot.buffer_start causes the
loop to break, and (5) the latest fallback branch where head_tracker.latest()
yields a newer block; implement lightweight test doubles/mocks for head_tracker,
rx, and get_latest_block and assert that sse_response(stream) yields the
expected events and stream termination, then run with cargo test --workspace.
In `@backend/crates/atlas-server/src/api/handlers/status.rs`:
- Around line 17-43: Add a #[cfg(test)] mod tests block to this file and add
unit tests exercising get_status's three branches: (1) head-tracker hit — create
an Arc<AppState> whose head_tracker.latest() returns Some(mock block with number
and indexed_at) and assert the returned ChainStatus matches; (2) indexer_state
fallback — provide a test DB pool containing a row in indexer_state
(key='last_indexed_block') and ensure head_tracker.latest() returns None, then
call get_status and assert parsed block_height and indexed_at come from the row;
(3) empty default — ensure head_tracker.latest() returns None and the
indexer_state query returns None and assert ChainStatus has block_height 0 and
indexed_at None; build AppState/State wrappers as in production, use async test
harness (#[tokio::test]) and sqlx test helpers for the pool, and run with cargo
test --workspace.
---
Nitpick comments:
In `@backend/crates/atlas-server/src/head.rs`:
- Around line 96-113: Add a unit test in a #[cfg(test)] mod tests block in the
same file that explicitly exercises Head::replay_after(Some(cursor)) when
state.replay is wrapped into two slices (i.e. a VecDeque that has wrapped-around
contents) to ensure the partition_point branch returns the correct ordering and
excludes blocks <= cursor; construct a Head (or its State) with a replay
VecDeque whose internal layout spans the boundary (so as_slices() yields two
non-empty slices) with known block.number values, call
replay_after(Some(cursor)) for a cursor in the middle, and assert the returned
ReplaySnapshot contains only the blocks > cursor in the correct order; run cargo
test --workspace to verify.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 527e66e3-99f1-41ef-a290-e4acf8b64097
📒 Files selected for processing (4)
- backend/crates/atlas-server/src/api/handlers/sse.rs
- backend/crates/atlas-server/src/api/handlers/status.rs
- backend/crates/atlas-server/src/config.rs
- backend/crates/atlas-server/src/head.rs
pthmas
left a comment
Tested locally and everything is working. Only nit would be that the block table updates in the frontend are not as smooth as they used to be, but that was purely artificial and cosmetic so I am happy to leave it as is.
Actionable comments posted: 4
♻️ Duplicate comments (1)
backend/crates/atlas-server/src/api/handlers/sse.rs (1)
196-295: ⚠️ Potential issue | 🟡 Minor

The latest() fallback branch still isn't covered by tests.

The new tests exercise seed, replay, and behind-tail termination, but none drives the replay_after(...)-empty path on Lines 80-87 where the stream falls back to head_tracker.latest(). That branch is part of the new delivery flow and easy to regress. As per coding guidelines: backend/**/*.rs: Add unit tests for new logic in a #[cfg(test)] mod tests block in the same file, and run with cargo test --workspace.
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-server/src/api/handlers/sse.rs` around lines 196 - 295, Add a new #[tokio::test] in the same file (under the existing #[cfg(test)] mod tests) that drives the branch where replay_after(...) returns empty and the stream falls back to head_tracker.latest(): create a HeadTracker via HeadTracker::empty(...), publish an initial committed batch, create the stream with make_block_stream(dummy_pool(), tracker.clone(), rx), consume the initial seed, then trigger a broadcast without publishing new blocks (so tracker.replay_after(client_cursor) yields empty) and assert the next yielded event equals head_tracker.latest() (use HeadTracker::latest() to obtain expected value); this will cover the fallback branch referenced in replay_after -> latest().
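The per-wakeup choice among the three delivery branches can be modeled as a pure function, which is the easiest shape to test exhaustively. The sketch below is hypothetical: `decide`, `Delivery`, and the bare `u64` block numbers are stand-ins for the stream's real `ReplaySnapshot`/event machinery.

```rust
// Hypothetical sketch of the SSE stream's per-wakeup delivery decision:
// given the client's cursor and a replay snapshot, pick replay emission,
// behind-tail termination, or the latest-head fallback.
#[derive(Debug, PartialEq)]
enum Delivery {
    Replay(Vec<u64>),    // stream the buffered blocks after the cursor
    BehindTail,          // client fell behind the buffer; warn and close
    Latest(Option<u64>), // nothing to replay; fall back to the latest head
}

fn decide(
    cursor: u64,
    buffer_start: u64,
    blocks_after_cursor: Vec<u64>,
    latest: Option<u64>,
) -> Delivery {
    if cursor + 1 < buffer_start {
        // The next block the client needs has already left the buffer.
        Delivery::BehindTail
    } else if !blocks_after_cursor.is_empty() {
        Delivery::Replay(blocks_after_cursor)
    } else {
        Delivery::Latest(latest)
    }
}

fn main() {
    // Cursor within the buffer with pending blocks: replay them.
    assert_eq!(decide(10, 8, vec![11, 12], Some(12)), Delivery::Replay(vec![11, 12]));
    // Cursor older than the buffer's start: behind-tail, close the stream.
    assert_eq!(decide(3, 8, vec![], Some(12)), Delivery::BehindTail);
    // Nothing buffered after the cursor: fall back to the latest head.
    assert_eq!(decide(12, 8, vec![], Some(12)), Delivery::Latest(Some(12)));
    println!("ok");
}
```

If the handler's branch logic were extracted into a function like this, the untested `latest()` fallback would be covered without mocking the pool or the broadcast channel at all.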
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/crates/atlas-server/src/config.rs`:
- Around line 127-135: The rustfmt failure is caused by the assertion message
formatting in the SSE_REPLAY_BUFFER_BLOCKS test block; update the failing
assert! call that reads assert!(err.to_string().contains("must be between 1 and
100000"), "expected range error for {val}") to use Rust-style formatting (e.g.,
pass the message and placeholder args to assert! rather than a Python-style
"{val}"), leaving the logic intact around Config::from_env() and the
env::set_var("SSE_REPLAY_BUFFER_BLOCKS", val) loop; also ensure the following
non-numeric assertion remains unchanged.
In `@backend/crates/atlas-server/src/indexer/indexer.rs`:
- Around line 284-297: The code currently calls batch.materialize_blocks(...)
and head_tracker.publish_committed_batch(...) before persisting, which can
publish gaps or blocks that later roll back; change the flow so you only advance
the in-memory head after a successful write_batch(&mut copy_client, batch,
true).await, and publish only the contiguous committed prefix using the same
persisted indexed_at timestamp returned/confirmed by the write (e.g., compute
the contiguous prefix from batch excluding failed_blocks or use the DB write
result), then call
head_tracker.publish_committed_batch(committed_contiguous_blocks) and send on
block_events_tx; ensure publish_committed_batch is never called with a partial
non-contiguous batch or before the DB transaction succeeds.
---
Duplicate comments:
In `@backend/crates/atlas-server/src/api/handlers/sse.rs`:
- Around line 196-295: Add a new #[tokio::test] in the same file (under the
existing #[cfg(test)] mod tests) that drives the branch where replay_after(...)
returns empty and the stream falls back to head_tracker.latest(): create a
HeadTracker via HeadTracker::empty(...), publish an initial committed batch,
create the stream with make_block_stream(dummy_pool(), tracker.clone(), rx),
consume the initial seed, then trigger a broadcast without publishing new blocks
(so tracker.replay_after(client_cursor) yields empty) and assert the next
yielded event equals head_tracker.latest() (use HeadTracker::latest() to obtain
expected value); this will cover the fallback branch referenced in replay_after
-> latest().
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d023aad1-169d-4a09-bc7f-84f8cd25fe67
📒 Files selected for processing (5)
- backend/crates/atlas-server/src/api/handlers/sse.rs
- backend/crates/atlas-server/src/api/handlers/status.rs
- backend/crates/atlas-server/src/config.rs
- backend/crates/atlas-server/src/indexer/indexer.rs
- frontend/src/hooks/useBlockSSE.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/crates/atlas-server/src/api/handlers/status.rs
Overview
This PR is an attempt to simplify the SSE system based on frontend requirements.
Summary by CodeRabbit
New Features
Performance Improvements
Behavioral Changes
Frontend
Documentation