📝 Walkthrough

Adds a batching system and binary COPY path for bulk PostgreSQL ingestion: new tokio-postgres dependencies and new `fetcher`, `batch`, and `copy` modules.
Sequence Diagram

```mermaid
sequenceDiagram
    participant RPC as RPC Provider
    participant Fetcher as Fetcher
    participant Indexer as Indexer
    participant Batch as BlockBatch
    participant Copy as Copy Functions
    participant DB as PostgreSQL

    Note over Indexer: Startup
    Indexer->>DB: Load known ERC20 & NFT contracts
    Indexer->>DB: Get start block, ensure partitions

    Note over Indexer: Main loop
    Fetcher->>RPC: Batch fetch blocks + receipts
    RPC-->>Fetcher: Block data + receipts
    Fetcher-->>Indexer: Vec<FetchResult>

    loop For each fetched block
        Indexer->>Batch: collect_block (populate batch)
        Batch->>Batch: accumulate blocks, txs, logs, transfers, balances
    end

    Indexer->>Copy: write_batch(BlockBatch)
    Copy->>DB: create temp tables
    Copy->>DB: Binary COPY rows (blocks, transactions, logs, transfers)
    Copy->>DB: INSERT/UPSERT into target tables
    DB-->>Copy: confirmation
    Copy-->>Indexer: batch persisted
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Summary of Changes

Hello @pthmas, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces a fundamental shift in how the Atlas indexer interacts with its PostgreSQL database, moving from individual record inserts to an optimized batch-processing approach. By leveraging PostgreSQL's binary COPY functionality, the indexer can now process blocks significantly faster, improving overall data-ingestion rates and efficiency. This change involved substantial refactoring to collect data in memory before a single, consolidated write operation, enhancing both performance and transactional integrity.
Code Review
This pull request is a fantastic improvement, significantly boosting indexing performance by transitioning to batch inserts with PostgreSQL's binary COPY protocol. The refactoring of the indexer into more focused modules (fetcher, batch, copy) greatly enhances code clarity and maintainability. The overall architecture is solid, with a clear separation between data collection and database writing. My review includes a few suggestions to further refine the code for conciseness, maintainability, and security.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/crates/atlas-indexer/src/indexer.rs (2)
196-222: ⚠️ Potential issue | 🔴 Critical — Critical data loss: blocks after a mid-batch failure are silently dropped.

When a block in the middle of a batch fails, `next_to_process` gets stuck at the failed block number. All subsequently fetched blocks are inserted into `buffer` but never consumed by the `while let Some(data) = buffer.remove(&next_to_process)` loop. After the receive loop exits (`blocks_received == batch_size`), these buffered blocks are dropped. Since `current_block` advances to `end_block + 1` (line 313), these blocks are permanently skipped.

Example: a batch covers blocks 3–8 and block 5 fails → only blocks 3–4 are collected. Blocks 6–8 sit in `buffer` unconsumed, then dropped. The retry logic only handles block 5; blocks 6–8 are never written.

Proposed fix — drain remaining buffer after receive loop:

```diff
         }
     }
 
+    // Drain any remaining blocks from the reorder buffer that were
+    // stranded behind a failed block.
+    while let Some((&block_num, _)) = buffer.iter().next() {
+        if let Some(data) = buffer.remove(&block_num) {
+            Self::collect_block(&mut batch, &known_erc20, &known_nft, data);
+        }
+    }
+
     // Extract newly discovered contracts before consuming the batch.
```

This drains the buffer in ascending order (`BTreeMap` guarantees sorted keys), so blocks are still collected sequentially. The "gap" from the failed block means these blocks are collected out of strict sequence relative to the failed block, but since each block's data is self-contained and the failed block is retried separately, this is safe.

Alternatively, you could iterate `buffer` directly, since it's a `BTreeMap`:

```diff
+    for (_, data) in buffer {
+        Self::collect_block(&mut batch, &known_erc20, &known_nft, data);
+    }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-indexer/src/indexer.rs` around lines 196 - 222, The receive loop can leave later blocks in buffer unconsumed when a mid-batch block fails (blocks_received == batch_size), causing data loss; after the while blocks_received < batch_size loop (the match on result_rx.recv().await that handles FetchResult::Success and FetchResult::Error), drain the remaining entries from buffer in ascending order and call Self::collect_block for each so blocks > next_to_process are not dropped—use buffer's ordered nature (BTreeMap) to iterate/removing sequential keys starting at next_to_process until buffer is empty (or use while let Some(data) = buffer.remove(&next_to_process) in a loop) so buffered blocks are processed even when a gap exists from a failed block.
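To make the proposed drain step concrete, here is a minimal, self-contained sketch of the same pattern. `String` stands in for the indexer's per-block fetch data and the `collect` closure stands in for `Self::collect_block`; both names are illustrative, not the PR's actual types.

```rust
use std::collections::BTreeMap;

// Drain a reorder buffer in ascending key order, as the fix proposes.
// BTreeMap iterates keys in sorted order, so blocks stranded behind a
// failed block are still handed to the collector sequentially.
fn drain_buffer(buffer: &mut BTreeMap<u64, String>, collect: &mut dyn FnMut(u64, String)) {
    while let Some((&block_num, _)) = buffer.iter().next() {
        if let Some(data) = buffer.remove(&block_num) {
            collect(block_num, data);
        }
    }
}

fn main() {
    // Simulate blocks 6-8 stranded behind a failed block 5.
    let mut buffer = BTreeMap::new();
    buffer.insert(7, "block7".to_string());
    buffer.insert(6, "block6".to_string());
    buffer.insert(8, "block8".to_string());

    let mut collected = Vec::new();
    drain_buffer(&mut buffer, &mut |n, _| collected.push(n));

    assert_eq!(collected, vec![6, 7, 8]); // ascending order, nothing dropped
    assert!(buffer.is_empty());
    println!("drained: {:?}", collected);
}
```

Draining via repeated `remove` (rather than consuming the map) matches the reviewed code's shape, where `buffer` outlives the loop.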
710-744: ⚠️ Potential issue | 🟡 Minor — Partition 0 is indistinguishable from "uninitialized", causing repeated `pg_class` queries.

`current_max_partition` is initialized to `0`, and the check on line 723 (`if current_max == 0`) treats `0` as "first run", triggering a `pg_class` lookup. After partition 0 is created and stored, the next call sees `current_max == 0` again and re-queries. This repeats on every batch until partition 1 is created (block 10M).

A simple fix: use `u64::MAX` as the sentinel value, or wrap the value in `Option<u64>` / an `AtomicBool`, to distinguish "uninitialized" from "partition 0 exists".

Proposed fix using a sentinel:

```diff
-            current_max_partition: std::sync::atomic::AtomicU64::new(0),
+            current_max_partition: std::sync::atomic::AtomicU64::new(u64::MAX),
```

```diff
-        if current_max == 0 {
+        if current_max == u64::MAX {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-indexer/src/indexer.rs` around lines 710 - 744, The current_max_partition atomic uses 0 as both "uninitialized" and "partition 0 exists", causing repeated pg_class queries in ensure_partitions_exist; change the sentinel to u64::MAX (or use an Option/AtomicBool) so you can detect uninitialized state: initialize current_max_partition to u64::MAX, update the check in ensure_partitions_exist from `if current_max == 0` to `if current_max == u64::MAX` (or the chosen uninitialized marker), and when you read an existing max from the pg_class query store the real value with self.current_max_partition.store(max as u64, Ordering::Relaxed) so subsequent calls skip the pg_class lookup; ensure all logic that computes start_partition and the fast-path comparison `if partition_num <= current_max` handles the sentinel correctly (treat sentinel as "need to query/create").
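A standalone sketch of the sentinel idea, under the stated assumption that `u64::MAX` marks "not yet initialized". `PartitionTracker` and `needs_lookup` are invented names for illustration; in the real code the lookup would be the `pg_class` query.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// u64::MAX means "not yet initialized", so partition 0 becomes a
// distinct, valid cached value instead of colliding with the sentinel.
struct PartitionTracker {
    current_max_partition: AtomicU64,
}

impl PartitionTracker {
    fn new() -> Self {
        Self { current_max_partition: AtomicU64::new(u64::MAX) }
    }

    // True when the expensive catalog lookup would still be needed.
    fn needs_lookup(&self, partition_num: u64) -> bool {
        let current_max = self.current_max_partition.load(Ordering::Relaxed);
        current_max == u64::MAX || partition_num > current_max
    }

    fn record_max(&self, max: u64) {
        self.current_max_partition.store(max, Ordering::Relaxed);
    }
}

fn main() {
    let t = PartitionTracker::new();
    assert!(t.needs_lookup(0));  // uninitialized -> must query the catalog
    t.record_max(0);             // partition 0 now known
    assert!(!t.needs_lookup(0)); // no repeated catalog query for partition 0
    assert!(t.needs_lookup(1));  // a higher partition still triggers a query
    println!("sentinel distinguishes uninitialized from partition 0");
}
```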
🧹 Nitpick comments (4)
backend/crates/atlas-indexer/src/batch.rs (2)
192-201: Avoid cloning `BigDecimal` on every delta accumulation.

Line 197 clones `entry.delta` on every call. `BigDecimal` supports `AddAssign` (`+=`), which avoids an allocation.

Proposed fix:

```diff
-            entry.delta = entry.delta.clone() + delta;
+            entry.delta += delta;
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-indexer/src/batch.rs` around lines 192 - 201, The code in apply_balance_delta clones entry.delta then adds delta, causing an unnecessary allocation; replace the assignment "entry.delta = entry.delta.clone() + delta;" with an in-place addition using AddAssign (e.g. "entry.delta += delta;") to avoid cloning BigDecimal. Update the apply_balance_delta function to use entry.delta += delta; and keep the rest of the logic (BalanceDelta and last_block update) unchanged.
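The clone-vs-in-place point can be illustrated with a toy heap-backed type. `BigDecimal` itself comes from the bigdecimal crate; this `BigNum` and its base-256 digit layout are invented for the sketch, which only shows why `x += y` can mutate the existing buffer while `x = x.clone() + y` must first copy it.

```rust
use std::ops::{Add, AddAssign};

// Toy heap-backed numeric type standing in for BigDecimal: the digits
// live in a Vec, so cloning allocates a fresh buffer.
#[derive(Clone, Debug, PartialEq)]
struct BigNum(Vec<u8>); // little-endian base-256 digits, illustrative only

impl AddAssign for BigNum {
    fn add_assign(&mut self, rhs: BigNum) {
        // In-place ripple-carry add: mutates self's buffer, no clone.
        let mut carry = 0u16;
        for i in 0..rhs.0.len().max(self.0.len()) {
            if i >= self.0.len() {
                self.0.push(0);
            }
            let sum = self.0[i] as u16 + *rhs.0.get(i).unwrap_or(&0) as u16 + carry;
            self.0[i] = (sum & 0xff) as u8;
            carry = sum >> 8;
        }
        if carry != 0 {
            self.0.push(carry as u8);
        }
    }
}

impl Add for BigNum {
    type Output = BigNum;
    fn add(mut self, rhs: BigNum) -> BigNum {
        self += rhs; // reuse the in-place path
        self
    }
}

fn main() {
    let mut delta = BigNum(vec![250]);     // 250
    delta += BigNum(vec![10]);             // in place: no extra buffer clone
    assert_eq!(delta, BigNum(vec![4, 1])); // 260 = 4 + 1*256
    println!("{:?}", delta);
}
```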
27-111: Consider a `clear()` method instead of allocating a fresh `BlockBatch` each iteration.

Currently, a new `BlockBatch` with 40+ empty `Vec`s is allocated every batch cycle. Adding a `clear()` method that calls `.clear()` on each vec and map would allow reuse of the allocated capacity across iterations, reducing allocator pressure during sustained indexing.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-indexer/src/batch.rs` around lines 27 - 111, Implement a reusable clear method on BlockBatch (e.g., impl BlockBatch { pub fn clear(&mut self) { ... } }) that calls .clear() on every Vec field (b_numbers, b_hashes, ..., et_timestamps, el_datas, etc.) and .clear() on every HashMap/HashSet (addr_map, nft_token_map, balance_map, new_erc20, new_nft), and reset scalar state like last_block to 0 (or another appropriate sentinel); then replace allocations in the batch loop to call batch.clear() instead of creating a new BlockBatch to preserve capacity and reduce allocator churn.

backend/crates/atlas-indexer/src/indexer.rs (2)
267-274: Retried blocks bypass the main batch's `failed_blocks` tracking for write errors.

If `write_batch` fails for a retried mini-batch (line 272), the error propagates via `?` and terminates the entire `run()` method. This is probably acceptable for the outer retry to handle, but it means a transient DB error on a single retry could abort an otherwise successful batch. Consider catching the write error and pushing the block back into `still_failed` instead.

Proposed fix:

```diff
 Some(FetchResult::Success(fetched)) => {
     let mut mini_batch = BlockBatch::new();
     Self::collect_block(&mut mini_batch, &known_erc20, &known_nft, fetched);
     let new_erc20 = std::mem::take(&mut mini_batch.new_erc20);
     let new_nft = std::mem::take(&mut mini_batch.new_nft);
-    self.write_batch(&mut copy_client, mini_batch).await?;
-    known_erc20.extend(new_erc20);
-    known_nft.extend(new_nft);
+    match self.write_batch(&mut copy_client, mini_batch).await {
+        Ok(()) => {
+            known_erc20.extend(new_erc20);
+            known_nft.extend(new_nft);
+        }
+        Err(e) => {
+            tracing::warn!("Write failed for retried block {}: {}", block_num, e);
+            still_failed.push((block_num, format!("Write failed: {}", e)));
+            continue;
+        }
+    }
     tracing::info!("Block {} retry succeeded", block_num);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-indexer/src/indexer.rs` around lines 267 - 274, The retried mini-batch created in run() (built via BlockBatch::new() and populated by Self::collect_block) currently calls self.write_batch(...).await? which will propagate DB errors and abort the entire run; instead, change the call to handle the Result from write_batch (e.g. match or if let Err(e) = ...) and on error push the original fetched block back into the still_failed collection (the same failed-block tracking used by the main batch) and skip extending known_erc20/known_nft, logging the error; on success continue to take new_erc20/new_nft and extend known_erc20/known_nft as before. Ensure you reference the write_batch call, the mini_batch/new_erc20/new_nft handling, and still_failed update so retried write failures are recorded rather than propagated.
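The pattern the comment recommends, recording a per-item failure instead of propagating it with `?`, can be sketched in isolation. `write_block` is a fallible stand-in for the real `write_batch` call, and the hard-coded failure on block 5 exists only so the sketch has something to catch.

```rust
// Stand-in for the real write path; fails for one block to exercise
// the error-collection branch.
fn write_block(block_num: u64) -> Result<(), String> {
    if block_num == 5 {
        Err("transient DB error".to_string())
    } else {
        Ok(())
    }
}

// Record each write failure in `still_failed` and keep going, so one
// bad block no longer aborts the whole retry loop.
fn retry_blocks(blocks: &[u64]) -> (Vec<u64>, Vec<(u64, String)>) {
    let mut succeeded = Vec::new();
    let mut still_failed = Vec::new();
    for &block_num in blocks {
        match write_block(block_num) {
            Ok(()) => succeeded.push(block_num),
            Err(e) => still_failed.push((block_num, format!("Write failed: {}", e))),
        }
    }
    (succeeded, still_failed)
}

fn main() {
    let (ok, failed) = retry_blocks(&[4, 5, 6]);
    assert_eq!(ok, vec![4, 6]);       // the other retries still land
    assert_eq!(failed.len(), 1);      // only block 5 is re-recorded
    assert_eq!(failed[0].0, 5);
    println!("ok={:?} failed={:?}", ok, failed);
}
```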
296-310: SQL references `failed_blocks.retry_count` — verify this resolves to the existing row, not the table-name alias.

Line 302: `retry_count = failed_blocks.retry_count + 3` — in the `ON CONFLICT DO UPDATE` clause, `failed_blocks` here refers to the table. This is valid PostgreSQL syntax (the target table can be referenced by name in the `UPDATE SET` clause). However, adding 3 unconditionally on every batch is slightly misleading, since some of the 3 retry attempts may have succeeded for other blocks. This is minor and cosmetic, since the value represents "total attempts", not "total failures."
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-indexer/src/indexer.rs` around lines 296 - 310, The upsert is incrementing retry_count by +3 for every failed block and references the table name in the SET clause; change the UPDATE to increment the existing row's retry_count by 1 and use the EXCLUDED pseudo-row for the incoming error_message—i.e. in the SQL inside the loop that iterates over failed_blocks, set "error_message = EXCLUDED.error_message, retry_count = failed_blocks.retry_count + 1, last_failed_at = NOW()" (and keep the .bind(*block_num as i64) and .bind(error) calls), so each failed block increments attempts by one rather than adding 3 and uses EXCLUDED for the new error text.
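The upsert semantics the prompt asks for, incrementing the existing row's `retry_count` by one while taking the incoming (`EXCLUDED`-style) error message, can be modeled with an in-memory map. This is a behavioral sketch only; `FailedBlock` and `record_failure` are invented names, and the real fix is the SQL change described above.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
struct FailedBlock {
    error_message: String,
    retry_count: i64,
}

// Mirrors INSERT ... ON CONFLICT DO UPDATE SET
//   error_message = EXCLUDED.error_message,
//   retry_count   = failed_blocks.retry_count + 1
fn record_failure(table: &mut HashMap<i64, FailedBlock>, block_num: i64, error: &str) {
    table
        .entry(block_num)
        .and_modify(|row| {
            row.error_message = error.to_string(); // latest error wins
            row.retry_count += 1;                  // one per attempt, not +3
        })
        .or_insert(FailedBlock {
            error_message: error.to_string(),
            retry_count: 1,
        });
}

fn main() {
    let mut table = HashMap::new();
    record_failure(&mut table, 42, "timeout");
    record_failure(&mut table, 42, "connection reset");
    let row = &table[&42];
    assert_eq!(row.retry_count, 2);
    assert_eq!(row.error_message, "connection reset");
    println!("{:?}", row);
}
```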
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/crates/atlas-indexer/src/fetcher.rs`:
- Line 45: The debug log uses start_block + count as u64 - 1 which underflows
when count == 0; add a guard at the start of the function (or before the
tracing::debug) to early-return when count == 0, or compute the end block safely
using saturating_sub/checked_sub (e.g., let end = start_block + (count as
u64).saturating_sub(1)) and use that in the tracing::debug call so no underflow
occurs; reference the tracing::debug call and the variables start_block and
count to locate where to add the check or safe computation.
- Around line 190-207: The current receipts_result handling in fetcher.rs
silently returns Ok(Vec::new()) when response_map.get(&receipts_id) is None or
when the response has no "result" field; change this to treat those cases as
errors so the block is retried: when response_map.get(&receipts_id) is None or
when resp.get("result") is None, return Err with a descriptive message (or
propagate into a FetchResult::Error path used by the surrounding code) instead
of Ok(Vec::new()); update the match arm for receipts_result (referencing
response_map, receipts_id, and receipts_result) to map missing/absent-result
cases to an Err and ensure the caller converts that Err into a
FetchResult::Error for retrying the block.
In `@backend/crates/atlas-indexer/src/indexer.rs`:
- Around line 51-58: The COPY connection currently uses NoTls which can
downgrade security; update the tokio_postgres connect call that creates (mut
copy_client, connection) to use a TLS connector that matches the sqlx pool's
sslmode from self.config.database_url (e.g., parse the URL/sslmode and construct
a rustls/native-tls connector and pass it instead of NoTls), or use
tokio_postgres_rustls/tokio_native_tls to negotiate TLS like sqlx; ensure the
logic that parses the URL and chooses the TLS implementation is invoked before
creating the COPY connection so the COPY client uses the same TLS behavior as
the sqlx pool.
---
Outside diff comments:
In `@backend/crates/atlas-indexer/src/indexer.rs`:
- Around line 196-222: The receive loop can leave later blocks in buffer
unconsumed when a mid-batch block fails (blocks_received == batch_size), causing
data loss; after the while blocks_received < batch_size loop (the match on
result_rx.recv().await that handles FetchResult::Success and
FetchResult::Error), drain the remaining entries from buffer in ascending order
and call Self::collect_block for each so blocks > next_to_process are not
dropped—use buffer's ordered nature (BTreeMap) to iterate/removing sequential
keys starting at next_to_process until buffer is empty (or use while let
Some(data) = buffer.remove(&next_to_process) in a loop) so buffered blocks are
processed even when a gap exists from a failed block.
- Around line 710-744: The current_max_partition atomic uses 0 as both
"uninitialized" and "partition 0 exists", causing repeated pg_class queries in
ensure_partitions_exist; change the sentinel to u64::MAX (or use an
Option/AtomicBool) so you can detect uninitialized state: initialize
current_max_partition to u64::MAX, update the check in ensure_partitions_exist
from `if current_max == 0` to `if current_max == u64::MAX` (or the chosen
uninitialized marker), and when you read an existing max from the pg_class query
store the real value with self.current_max_partition.store(max as u64,
Ordering::Relaxed) so subsequent calls skip the pg_class lookup; ensure all
logic that computes start_partition and the fast-path comparison `if
partition_num <= current_max` handles the sentinel correctly (treat sentinel as
"need to query/create").
---
Nitpick comments:
In `@backend/crates/atlas-indexer/src/batch.rs`:
- Around line 192-201: The code in apply_balance_delta clones entry.delta then
adds delta, causing an unnecessary allocation; replace the assignment
"entry.delta = entry.delta.clone() + delta;" with an in-place addition using
AddAssign (e.g. "entry.delta += delta;") to avoid cloning BigDecimal. Update the
apply_balance_delta function to use entry.delta += delta; and keep the rest of
the logic (BalanceDelta and last_block update) unchanged.
- Around line 27-111: Implement a reusable clear method on BlockBatch (e.g.,
impl BlockBatch { pub fn clear(&mut self) { ... } }) that calls .clear() on
every Vec field (b_numbers, b_hashes, ..., et_timestamps, el_datas, etc.) and
.clear() on every HashMap/HashSet (addr_map, nft_token_map, balance_map,
new_erc20, new_nft), and reset scalar state like last_block to 0 (or another
appropriate sentinel); then replace allocations in the batch loop to call
batch.clear() instead of creating a new BlockBatch to preserve capacity and
reduce allocator churn.
In `@backend/crates/atlas-indexer/src/indexer.rs`:
- Around line 267-274: The retried mini-batch created in run() (built via
BlockBatch::new() and populated by Self::collect_block) currently calls
self.write_batch(...).await? which will propagate DB errors and abort the entire
run; instead, change the call to handle the Result from write_batch (e.g. match
or if let Err(e) = ...) and on error push the original fetched block back into
the still_failed collection (the same failed-block tracking used by the main
batch) and skip extending known_erc20/known_nft, logging the error; on success
continue to take new_erc20/new_nft and extend known_erc20/known_nft as before.
Ensure you reference the write_batch call, the mini_batch/new_erc20/new_nft
handling, and still_failed update so retried write failures are recorded rather
than propagated.
- Around line 296-310: The upsert is incrementing retry_count by +3 for every
failed block and references the table name in the SET clause; change the UPDATE
to increment the existing row's retry_count by 1 and use the EXCLUDED pseudo-row
for the incoming error_message—i.e. in the SQL inside the loop that iterates
over failed_blocks, set "error_message = EXCLUDED.error_message, retry_count =
failed_blocks.retry_count + 1, last_failed_at = NOW()" (and keep the
.bind(*block_num as i64) and .bind(error) calls), so each failed block
increments attempts by one rather than adding 3 and uses EXCLUDED for the new
error text.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
backend/crates/atlas-indexer/Cargo.toml
backend/crates/atlas-indexer/src/batch.rs
backend/crates/atlas-indexer/src/copy.rs
backend/crates/atlas-indexer/src/fetcher.rs
backend/crates/atlas-indexer/src/indexer.rs
backend/crates/atlas-indexer/src/main.rs
backend/migrations/20240107000001_nft_transfers_unique.sql
```rust
    count: usize,
    rate_limiter: &SharedRateLimiter,
) -> Vec<FetchResult> {
    tracing::debug!("Fetching batch: blocks {} to {}", start_block, start_block + count as u64 - 1);
```
Potential underflow when count == 0.
start_block + count as u64 - 1 will underflow (wrap around) if count is 0. Current callers always pass count >= 1, but this function is pub(crate) and has no guard.
Proposed fix — early-return guard:

```diff
 ) -> Vec<FetchResult> {
+    if count == 0 {
+        return Vec::new();
+    }
     tracing::debug!("Fetching batch: blocks {} to {}", start_block, start_block + count as u64 - 1);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/crates/atlas-indexer/src/fetcher.rs` at line 45, The debug log uses
start_block + count as u64 - 1 which underflows when count == 0; add a guard at
the start of the function (or before the tracing::debug) to early-return when
count == 0, or compute the end block safely using saturating_sub/checked_sub
(e.g., let end = start_block + (count as u64).saturating_sub(1)) and use that in
the tracing::debug call so no underflow occurs; reference the tracing::debug
call and the variables start_block and count to locate where to add the check or
safe computation.
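The failure mode and the guard can both be demonstrated standalone. `fetch_range` is an illustrative stand-in for the real fetch function (it returns a range instead of performing RPC calls), and the wrapping form makes the underflow observable without a debug-build panic.

```rust
// Returns None for an empty batch instead of computing a bogus range,
// mirroring the proposed early `return Vec::new()` guard.
fn fetch_range(start_block: u64, count: usize) -> Option<(u64, u64)> {
    if count == 0 {
        return None;
    }
    Some((start_block, start_block + count as u64 - 1))
}

fn main() {
    // The unguarded end-block expression, evaluated with explicit wrapping
    // semantics (a debug build of the unchecked `- 1` would panic instead):
    let start_block: u64 = 0;
    let count: usize = 0;
    let end = start_block.wrapping_add(count as u64).wrapping_sub(1);
    assert_eq!(end, u64::MAX); // wrapped past zero

    // With the guard, the degenerate batch never reaches the arithmetic.
    assert_eq!(fetch_range(0, 0), None);
    assert_eq!(fetch_range(3, 6), Some((3, 8))); // blocks 3..=8
    println!("count == 0 is handled before any arithmetic");
}
```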
Actionable comments posted: 2
♻️ Duplicate comments (1)
backend/crates/atlas-indexer/src/fetcher.rs (1)
44-45: Unresolved: underflow on `count == 0` still present (also at line 116).

`start_block + count as u64 - 1` wraps to `u64::MAX` when `count == 0`; the same expression reappears in the success-retry log at line 116. The guard proposed in the previous review has not been applied.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-indexer/src/fetcher.rs` around lines 44 - 45, The debug log that prints "blocks {} to {}" uses start_block + count as u64 - 1 which underflows when count == 0; change the end-block calculation to a safe form (e.g., compute end_block = start_block.saturating_add(count as u64).saturating_sub(1) or only format the range when count > 0) and update both occurrences of this expression (the initial tracing::debug call and the success-retry log) so they never produce u64::MAX for count == 0.
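The safe form named in the prompt can be sketched as a small helper. `end_block` is an illustrative name; the point is that saturating arithmetic clamps instead of wrapping, so `count == 0` can never produce `u64::MAX`.

```rust
// Safe end-block computation: saturating arithmetic never wraps.
fn end_block(start_block: u64, count: usize) -> u64 {
    start_block.saturating_add(count as u64).saturating_sub(1)
}

fn main() {
    assert_eq!(end_block(100, 5), 104); // normal range: blocks 100..=104
    assert_eq!(end_block(0, 0), 0);     // clamped at zero, no wrap to u64::MAX
    println!("end_block(100, 5) = {}", end_block(100, 5));
}
```

Note the clamped value for `count == 0` is still not a meaningful range end, so the alternative in the prompt (only formatting the range when `count > 0`) is arguably cleaner.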
🧹 Nitpick comments (4)
backend/crates/atlas-indexer/src/fetcher.rs (1)
156-164: `BTreeMap` is unnecessary here — prefer `HashMap`.

The response map is built and looked up by integer ID with no ordering requirement. `HashMap<u64, &serde_json::Value>` gives O(1) amortised access vs. O(log n) for `BTreeMap`.

♻️ Proposed change

```diff
-    let mut response_map: BTreeMap<u64, &serde_json::Value> = BTreeMap::new();
+    let mut response_map: HashMap<u64, &serde_json::Value> = HashMap::new();
```

The `BTreeMap` import can also be removed from the use statement at line 7 (or replaced with `HashMap` if `BTreeMap` isn't used elsewhere).

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-indexer/src/fetcher.rs` around lines 156 - 164, Replace the BTreeMap usage with a HashMap for response_map: change the type from BTreeMap<u64, &serde_json::Value> to HashMap<u64, &serde_json::Value>, construct a HashMap with capacity (e.g., HashMap::with_capacity(count) or default), and update the insert/lookups that use response_map; also remove or replace the BTreeMap import in the use statements so HashMap is imported instead. Keep the rest of the loop over batch_response and the id extraction logic unchanged.

backend/crates/atlas-indexer/src/batch.rs (1)
1-142: Clean columnar batch design — looks good overall.

The columnar `Vec` layout, `HashMap`-based deduplication for addresses and NFT tokens, and `BigDecimal` delta aggregation for balances are all well structured. `touch_addr`'s use of `entry().or_insert` + `min`/`|=`/`+=` is idiomatic and correct.

One minor suggestion: `AddrState`, `NftTokenState`, and `BalanceDelta` don't derive `Debug`. Adding `#[derive(Debug)]` to these three structs would make it easier to add structured tracing/log statements on the batch contents when diagnosing issues.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-indexer/src/batch.rs` around lines 1 - 142, Add Debug derives to the small state structs so they can be easily logged; update the definitions of AddrState, NftTokenState, and BalanceDelta by adding #[derive(Debug)] above each struct declaration (refer to the types AddrState, NftTokenState, and BalanceDelta in block.rs) so tracing/log statements can print their contents.

backend/crates/atlas-indexer/src/indexer.rs (1)
49-79: TLS-conditional path correctly addresses the previous `NoTls` downgrade concern.

The `connect_copy_client` helper now chooses TLS vs plain TCP based on `sslmode`, which resolves the reported security gap.

One optional hardening: `database_url.contains("sslmode=require")` is a substring search and would false-positive on edge cases (e.g., a password containing that literal string, or `sslmode=require` appearing in a fragment). Parsing the URL with a proper connection-string parser would be more robust.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-indexer/src/indexer.rs` around lines 49 - 79, The current needs_tls detection in connect_copy_client uses a substring search on database_url which can yield false positives; instead parse the connection string properly and read the sslmode setting to decide TLS. Update connect_copy_client to parse database_url with a proper parser (e.g., use tokio_postgres::Config::from_str or url::Url for URI forms, or postgres::config::Config::from_str for libpq-style strings), then inspect the parsed sslmode/query parameter to set needs_tls rather than using database_url.contains(...); keep the rest of the function (tls setup, MakeRustlsConnect usage, NoTls path, and spawn handling) unchanged.

backend/crates/atlas-indexer/Cargo.toml (1)
29-32: Upgrade `tokio-postgres-rustls` to 0.13 and configure the `rustls` crypto provider.

`tokio-postgres-rustls 0.12` is outdated; version 0.13.0 (released 2024-10-19) includes bug fixes and improved documentation. Upgrade to `"0.13"`.

Additionally, `rustls 0.23` defaults to the `aws-lc-rs` crypto provider (enabled by default), which requires a C build toolchain. Specify the `ring` provider instead to avoid this C dependency:

```toml
rustls = { version = "0.23", default-features = false, features = ["ring"] }
```

Alternatively, if you prefer to keep the default behavior, explicitly allow it in your build configuration for clarity.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-indexer/Cargo.toml` around lines 29 - 32, Update the dependency entries: bump tokio-postgres-rustls to "0.13" and change the rustls entry to explicitly disable default-features and enable the ring provider by setting rustls = { version = "0.23", default-features = false, features = ["ring"] } so the project avoids the aws-lc-rs C toolchain; adjust the Cargo.toml lines for tokio-postgres-rustls and rustls accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/crates/atlas-indexer/src/indexer.rs`:
- Around line 266-267: The current code silently ignores a potential panic from
the spawned task by doing `let _ = send_task.await`; change this to inspect the
`JoinError` and fail fast (or return an Err) so the caller doesn't hang waiting
in the `while blocks_received < batch_size` loop; for example, replace the
discard with matching the result of `send_task.await` and propagate or panic
with a descriptive message referencing `send_task` (so the panic is visible),
ensuring the code that waits on `blocks_received` / `batch_size` won't deadlock
if the send task panics.
- Around line 693-700: The upsert in write_batch unconditionally overwrites
indexer_state.last_indexed_block with last_block, allowing retries of earlier
blocks (e.g., mini_batch.last_block) to regress the checkpoint; change the
upsert so the stored value is monotonic by only updating when the incoming
last_block is greater than the current value (e.g., use a SQL expression that
sets value = GREATEST(existing_value, $1) or add a DO UPDATE ... SET value = $1
WHERE (existing_value::bigint) < ($1::bigint)), ensuring last_indexed_block
never decreases; update the INSERT/ON CONFLICT for indexer_state (key =
'last_indexed_block') in write_batch to perform this conditional/greater-than
update.
---
Duplicate comments:
In `@backend/crates/atlas-indexer/src/fetcher.rs`:
- Around line 44-45: The debug log that prints "blocks {} to {}" uses
start_block + count as u64 - 1 which underflows when count == 0; change the
end-block calculation to a safe form (e.g., compute end_block =
start_block.saturating_add(count as u64).saturating_sub(1) or only format the
range when count > 0) and update both occurrences of this expression (the
initial tracing::debug call and the success-retry log) so they never produce
u64::MAX for count == 0.
---
Nitpick comments:
In `@backend/crates/atlas-indexer/Cargo.toml`:
- Around line 29-32: Update the dependency entries: bump tokio-postgres-rustls
to "0.13" and change the rustls entry to explicitly disable default-features and
enable the ring provider by setting rustls = { version = "0.23",
default-features = false, features = ["ring"] } so the project avoids the
aws-lc-rs C toolchain; adjust the Cargo.toml lines for tokio-postgres-rustls and
rustls accordingly.
In `@backend/crates/atlas-indexer/src/batch.rs`:
- Around line 1-142: Add Debug derives to the small state structs so they can be
easily logged; update the definitions of AddrState, NftTokenState, and
BalanceDelta by adding #[derive(Debug)] above each struct declaration (refer to
the types AddrState, NftTokenState, and BalanceDelta in block.rs) so tracing/log
statements can print their contents.
In `@backend/crates/atlas-indexer/src/fetcher.rs`:
- Around line 156-164: Replace the BTreeMap usage with a HashMap for
response_map: change the type from BTreeMap<u64, &serde_json::Value> to
HashMap<u64, &serde_json::Value>, construct a HashMap with capacity (e.g.,
HashMap::with_capacity(count) or default), and update the insert/lookups that
use response_map; also remove or replace the BTreeMap import in the use
statements so HashMap is imported instead. Keep the rest of the loop over
batch_response and the id extraction logic unchanged.
In `@backend/crates/atlas-indexer/src/indexer.rs`:
- Around line 49-79: The current needs_tls detection in connect_copy_client uses
a substring search on database_url which can yield false positives; instead
parse the connection string properly and read the sslmode setting to decide TLS.
Update connect_copy_client to parse database_url with a proper parser (e.g., use
tokio_postgres::Config::from_str or url::Url for URI forms, or
postgres::config::Config::from_str for libpq-style strings), then inspect the
parsed sslmode/query parameter to set needs_tls rather than using
database_url.contains(...); keep the rest of the function (tls setup,
MakeRustlsConnect usage, NoTls path, and spawn handling) unchanged.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- backend/crates/atlas-indexer/Cargo.toml
- backend/crates/atlas-indexer/src/batch.rs
- backend/crates/atlas-indexer/src/fetcher.rs
- backend/crates/atlas-indexer/src/indexer.rs
```rust
// Wait for send task to complete
let _ = send_task.await;
```
There was a problem hiding this comment.
`send_task` panic is silently swallowed.
`let _ = send_task.await` discards a `JoinError` caused by a panic. If `send_task` panics mid-send, fewer work items than `batch_size` reach the workers, and the `while blocks_received < batch_size` loop below will stall waiting for results that never arrive.
🛡️ Proposed fix
```diff
- let _ = send_task.await;
+ if let Err(e) = send_task.await {
+     return Err(anyhow::anyhow!("send_task panicked: {:?}", e));
+ }
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
// Wait for send task to complete
if let Err(e) = send_task.await {
    return Err(anyhow::anyhow!("send_task panicked: {:?}", e));
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/crates/atlas-indexer/src/indexer.rs` around lines 266 - 267, The
current code silently ignores a potential panic from the spawned task by doing
`let _ = send_task.await`; change this to inspect the `JoinError` and fail fast
(or return an Err) so the caller doesn't hang waiting in the `while
blocks_received < batch_size` loop; for example, replace the discard with
matching the result of `send_task.await` and propagate or panic with a
descriptive message referencing `send_task` (so the panic is visible), ensuring
the code that waits on `blocks_received` / `batch_size` won't deadlock if the
send task panics.
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/crates/atlas-indexer/src/indexer.rs (1)
819-826: ⚠️ Potential issue | 🟡 Minor
`truncate_tables` omits `tx_hash_lookup` and `failed_blocks`, leaving stale data after reindex.
`tx_hash_lookup` inserts use `ON CONFLICT DO NOTHING`, so stale rows silently survive a reindex and ghost entries remain queryable. `failed_blocks` retains old `retry_count` values that accumulate incorrectly across reindexes.
🐛 Proposed fix

```diff
 sqlx::query(
-    "TRUNCATE blocks, transactions, addresses, nft_contracts, nft_tokens, nft_transfers,
-     erc20_contracts, erc20_transfers, erc20_balances, event_logs, proxy_contracts, indexer_state CASCADE"
+    "TRUNCATE blocks, transactions, addresses, nft_contracts, nft_tokens, nft_transfers,
+     erc20_contracts, erc20_transfers, erc20_balances, event_logs, proxy_contracts,
+     tx_hash_lookup, failed_blocks, indexer_state CASCADE"
 )
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-indexer/src/indexer.rs` around lines 819 - 826, The TRUNCATE SQL in async fn truncate_tables currently omits tx_hash_lookup and failed_blocks causing stale data; update the SQL string inside truncate_tables (the sqlx::query call) to include tx_hash_lookup and failed_blocks in the comma-separated table list so both tables are truncated with CASCADE before returning Ok(()). Ensure you modify the query associated with .execute(&self.pool).await? within truncate_tables.
♻️ Duplicate comments (1)
backend/crates/atlas-indexer/src/indexer.rs (1)
266-267: `send_task` panic still silently swallowed.
`let _ = send_task.await` discards any `JoinError` from a panic. This was flagged in a prior review and remains unaddressed.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-indexer/src/indexer.rs` around lines 266 - 267, The code currently swallows a potential panic by doing let _ = send_task.await; — change this to handle the JoinError from send_task.await instead of discarding it: await the JoinHandle, match or if let Err(e) = send_task.await and then either propagate the error (use ? or convert JoinError into the function's error type) or log the panic with context (e.g., tracing::error!("send_task panicked: {:?}", e)) and return an error; reference the send_task JoinHandle and its await call so the change replaces the ignored result with explicit error handling.
🧹 Nitpick comments (1)
backend/crates/atlas-indexer/src/indexer.rs (1)
56-78: Minor duplication in `connect_copy_client`: the connection-spawning block is identical in both branches.
♻️ Suggested collapse

```diff
 async fn connect_copy_client(database_url: &str) -> Result<Client> {
     let needs_tls = database_url.contains("sslmode=require")
         || database_url.contains("sslmode=verify-ca")
         || database_url.contains("sslmode=verify-full");
-    if needs_tls {
+    let client = if needs_tls {
         let mut root_store = rustls::RootCertStore::empty();
         root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
         let tls_config = rustls::ClientConfig::builder()
             .with_root_certificates(root_store)
             .with_no_client_auth();
         let tls = MakeRustlsConnect::new(tls_config);
-        let (client, connection) = tokio_postgres::connect(database_url, tls).await?;
-        tokio::spawn(async move {
-            if let Err(e) = connection.await {
-                tracing::error!("copy connection error: {}", e);
-            }
-        });
-        Ok(client)
+        let (client, conn) = tokio_postgres::connect(database_url, tls).await?;
+        (client, conn as _) // box as dyn Future if needed, or use macro below
     } else {
-        let (client, connection) = tokio_postgres::connect(database_url, NoTls).await?;
-        tokio::spawn(async move {
-            if let Err(e) = connection.await {
-                tracing::error!("copy connection error: {}", e);
-            }
-        });
-        Ok(client)
-    }
+        let (client, conn) = tokio_postgres::connect(database_url, NoTls).await?;
+        (client, conn as _)
+    };
+    // Alternatively keep a macro or helper:
+    // fn spawn_conn(conn: impl Future<Output=Result<(),_>> + Send + 'static) { ... }
+    Ok(client)
 }
```

Since `tokio_postgres::Connection` is not object-safe, the simplest approach is a shared inner helper that accepts the `(Client, Connection<…>)` pair and spawns the driver task:

```rust
fn drive_connection<S, T>(connection: tokio_postgres::Connection<S, T>)
where
    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            tracing::error!("copy connection error: {}", e);
        }
    });
}
```

Then each branch calls `drive_connection(connection)` followed by `return Ok(client)`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-indexer/src/indexer.rs` around lines 56 - 78, The duplicated connection-spawning logic in connect_copy_client should be extracted into a small helper like drive_connection that accepts the tokio_postgres::Connection<S, T> and spawns the driver task; implement drive_connection with the appropriate bounds (S: AsyncRead + AsyncWrite + Unpin + Send + 'static, T: AsyncRead + AsyncWrite + Unpin + Send + 'static) and the existing tokio::spawn(async move { if let Err(e) = connection.await { tracing::error!("copy connection error: {}", e); } }); then call drive_connection(connection) in both the TLS and NoTls branches and return Ok(client).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@backend/crates/atlas-indexer/src/indexer.rs`:
- Around line 819-826: The TRUNCATE SQL in async fn truncate_tables currently
omits tx_hash_lookup and failed_blocks causing stale data; update the SQL string
inside truncate_tables (the sqlx::query call) to include tx_hash_lookup and
failed_blocks in the comma-separated table list so both tables are truncated
with CASCADE before returning Ok(()). Ensure you modify the query associated
with .execute(&self.pool).await? within truncate_tables.
---
Duplicate comments:
In `@backend/crates/atlas-indexer/src/indexer.rs`:
- Around line 266-267: The code currently swallows a potential panic by doing
let _ = send_task.await; — change this to handle the JoinError from
send_task.await instead of discarding it: await the JoinHandle, match or if let
Err(e) = send_task.await and then either propagate the error (use ? or convert
JoinError into the function's error type) or log the panic with context (e.g.,
tracing::error!("send_task panicked: {:?}", e)) and return an error; reference
the send_task JoinHandle and its await call so the change replaces the ignored
result with explicit error handling.
---
Nitpick comments:
In `@backend/crates/atlas-indexer/src/indexer.rs`:
- Around line 56-78: The duplicated connection-spawning logic in
connect_copy_client should be extracted into a small helper like
drive_connection that accepts the tokio_postgres::Connection<S, T> and spawns
the driver task; implement drive_connection with the appropriate bounds (S:
AsyncRead + AsyncWrite + Unpin + Send + 'static, T: AsyncRead + AsyncWrite +
Unpin + Send + 'static) and the existing tokio::spawn(async move { if let Err(e)
= connection.await { tracing::error!("copy connection error: {}", e); } }); then
call drive_connection(connection) in both the TLS and NoTls branches and return
Ok(client).
Overview
On a slow single-threaded processor, this boosted the indexer from 400 to 5,000 blocks/s.
Summary by CodeRabbit
New Features
Refactor
Bug Fixes