refactor: parallelize collect_event for tipset range (#6881)
Conversation
Walkthrough
Adds concurrent per-tipset event collection via a new `collect_events_for_tipsets` helper.
Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Handler as EthEventHandler
    participant Tasks as FuturesOrdered(TaskQueue)
    participant DB as Blockstore/DB
    Client->>Handler: get_events_for_parsed_filter(range)
    Handler->>Tasks: spawn collect_events for each Tipset
    Tasks->>DB: collect_events(Tipset) [concurrent]
    DB-->>Tasks: Vec<CollectedEvent> (per task)
    Tasks-->>Handler: task results (ordered/completed)
    Handler->>Handler: merge results, enforce max_filter_results
    Handler-->>Client: combined Vec<CollectedEvent>
```
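The fan-out/merge flow in the diagram can be sketched in plain Rust. This is an illustrative stand-in, not the PR's code: `std::thread` replaces the async `FuturesOrdered` queue, and bare `u64` IDs replace `CollectedEvent`, but the ordering guarantee is the same — results are merged in tipset order regardless of which task finishes first.

```rust
use std::thread;

// Hypothetical per-tipset work: pretend each epoch yields two event IDs.
fn collect_events_for_epoch(epoch: u64) -> Vec<u64> {
    vec![epoch * 10, epoch * 10 + 1]
}

// Fan out one task per tipset, then merge in spawn (tipset) order,
// like draining FuturesOrdered: completion order does not matter,
// output order is deterministic.
fn collect_range(epochs: &[u64]) -> Vec<u64> {
    let handles: Vec<_> = epochs
        .iter()
        .map(|&epoch| thread::spawn(move || collect_events_for_epoch(epoch)))
        .collect();
    let mut collected = Vec::new();
    for handle in handles {
        collected.extend(handle.join().expect("task panicked"));
    }
    collected
}

fn main() {
    println!("{:?}", collect_range(&[100, 101, 102]));
}
```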
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Suggested reviewers
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/rpc/methods/eth/filter/mod.rs (1)
415-425: Add tipset context to task failures.
This new async boundary will otherwise surface a range error without saying which tipset failed. Wrapping the await with tipset context keeps the failure actionable.
Suggested fix
```diff
 for tipset in max_tipset
     .chain(&ctx.store())
     .take_while(|ts| ts.epoch() >= *range.start())
 {
     tasks.push_back(async move {
+        let epoch = tipset.epoch();
         let mut collected_events = vec![];
         Self::collect_events(
             ctx,
             &tipset,
             Some(pf),
             skip_event,
             &mut collected_events,
         )
-        .await?;
+        .await
+        .with_context(|| format!("collecting events for tipset {epoch}"))?;
         anyhow::Ok(collected_events)
     });
 }
```
As per coding guidelines, use `anyhow::Result<T>` for most operations and add context with `.context()` when errors occur.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/rpc/methods/eth/filter/mod.rs` around lines 415 - 425, The task closure pushed via tasks.push_back that calls Self::collect_events currently returns raw errors without tipset info; modify the awaited call to add context on failure (use anyhow::Result and .context()) so failures include which tipset failed—wrap the .await result of Self::collect_events(...) with .context(format!("collect_events failed for tipset {:?}", tipset)) (or equivalent) inside the async move block so any error surfaced from collect_events includes the tipset identifier.
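The `.with_context(...)` pattern suggested above relies on `anyhow`; the same idea in dependency-free Rust looks like the sketch below. The names (`collect_events`, the epoch `666` failure case) are illustrative stand-ins, not the PR's code. The point is that the error carries the failing epoch once it crosses the task boundary.

```rust
// Hypothetical fallible per-tipset collection (stand-in for Self::collect_events).
fn collect_events(epoch: u64) -> Result<Vec<u64>, String> {
    if epoch == 666 {
        Err("lookup failed".to_string())
    } else {
        Ok(vec![epoch])
    }
}

// Mirror anyhow's .with_context(): prepend the failing tipset's epoch
// so the error stays actionable after it surfaces from the task queue.
fn collect_with_context(epoch: u64) -> Result<Vec<u64>, String> {
    collect_events(epoch)
        .map_err(|e| format!("collecting events for tipset at epoch {epoch}: {e}"))
}

fn main() {
    match collect_with_context(666) {
        Ok(events) => println!("ok: {events:?}"),
        Err(e) => println!("error: {e}"),
    }
}
```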
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/rpc/methods/eth/filter/mod.rs`:
- Around line 410-429: The parallel tipset loop can accumulate more than the
global max_filter_results because each task enforces the cap locally; modify the
drain loop that iterates over tasks.try_next() to enforce the global cap when
extending collected_events from each task: after receiving events from
Self::collect_events, append only up to (max_filter_results -
collected_events.len()) items and break out when the global cap is reached. Also
bound concurrent in-flight tipset tasks (the FuturesOrdered producer over
max_tipset.chain(&ctx.store())) by adding a semaphore or a limited buffer so you
don’t spawn tasks for the entire range at once.
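The cap-enforcement half of the fix above can be sketched as follows. This is a simplified, synchronous model (per-task results arrive as plain `Vec`s in tipset order, `max_filter_results` is a parameter), not the PR's actual `tasks.try_next()` drain loop.

```rust
// Merge ordered per-task results while enforcing a global cap,
// mirroring the suggested drain loop over tasks.try_next().
fn merge_with_cap(task_results: Vec<Vec<u64>>, max_filter_results: usize) -> Vec<u64> {
    let mut collected = Vec::new();
    for events in task_results {
        // Append only up to the remaining global budget...
        let remaining = max_filter_results - collected.len();
        collected.extend(events.into_iter().take(remaining));
        // ...and stop draining once the cap is reached.
        if collected.len() == max_filter_results {
            break;
        }
    }
    collected
}

fn main() {
    let per_tipset = vec![vec![1, 2], vec![3, 4], vec![5]];
    println!("{:?}", merge_with_cap(per_tipset, 3));
}
```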
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: c72a743d-348e-4aae-92a2-9c4da8c30951
📒 Files selected for processing (1)
src/rpc/methods/eth/filter/mod.rs
Codecov Report: ❌ Patch coverage is
Additional details and impacted files
... and 7 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
🧹 Nitpick comments (3)
src/rpc/methods/eth/filter/mod.rs (3)
266-272: Narrow or document this new helper API.
`collect_events_for_tipsets` looks like an internal implementation detail in this file, but it is exposed as `pub` and has no rustdoc. If it is not meant for external callers, please keep it private; otherwise add a doc comment before exporting it.
As per coding guidelines, `**/*.rs`: Document public functions and structs with doc comments.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/rpc/methods/eth/filter/mod.rs` around lines 266 - 272, The function collect_events_for_tipsets is currently public without documentation; either mark it private (remove pub) if it's an internal helper, or add a Rust doc comment describing its purpose, parameters (ctx, tipsets, spec, skip_event, collected_events), behavior, return type, and thread-safety assumptions before exporting it so it conforms to the project's public API docs requirement for collect_events_for_tipsets.
275-278: Add tipset context before propagating task errors.
A bare error from `collect_events` is harder to triage once this runs through a concurrent queue, because the failing tipset is no longer obvious at the call site.
Suggested change
```diff
 for tipset in tipsets {
     tasks.push_back(async move {
         let mut events = vec![];
-        Self::collect_events(ctx, &tipset, spec, skip_event, &mut events).await?;
+        Self::collect_events(ctx, &tipset, spec, skip_event, &mut events)
+            .await
+            .with_context(|| {
+                format!("collecting events for tipset at epoch {}", tipset.epoch())
+            })?;
         anyhow::Ok(events)
     });
 }
```
As per coding guidelines, `**/*.rs`: Use `anyhow::Result<T>` for most operations and add context with `.context()` when errors occur.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/rpc/methods/eth/filter/mod.rs` around lines 275 - 278, The task closure pushing to tasks (the async block calling Self::collect_events) currently returns raw errors from collect_events, making it hard to know which tipset failed; update the error propagation to add context that includes the tipset info before returning (use anyhow::Context/.context()). Specifically wrap the await call to Self::collect_events(ctx, &tipset, spec, skip_event, &mut events).await with .context(format!(...)) or similar so the resulting anyhow::Result from the closure includes the tipset identifier, ensuring the closure still returns anyhow::Result<Vec<...>> but with added context for easier triage.
420-449: Please add a regression test for the concurrent Range path.
This branch changed the collection strategy while keeping the same external contract. A focused test spanning multiple tipsets should assert stable event ordering/no duplication and merged `max_filter_results` enforcement so this path stays Lotus-compatible as it evolves.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/rpc/methods/eth/filter/mod.rs` around lines 420 - 449, Add a regression test that exercises the ParsedFilterTipsets::Range branch and the concurrent collection behavior of Self::collect_events_for_tipsets: build a fake chain with multiple sequential tipsets (including a heaviest tipset to trigger the heaviest_epoch logic and ResolveNullTipset::TakeOlder path), install events across tipsets, then call the RPC/filter-range code path to collect events; assert the returned events have stable deterministic ordering, no duplicates, and that the merged max_filter_results limit is enforced across tipsets. Ensure the test hits the code that computes max_height (including the -1 heaviest case) and uses ctx.chain_index().tipset_by_height(...) so the new Range collection strategy is exercised end-to-end.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 183e8980-3221-4597-b51d-948c6d8e7786
📒 Files selected for processing (1)
src/rpc/methods/eth/filter/mod.rs
♻️ Duplicate comments (1)
src/rpc/methods/eth/filter/mod.rs (1)
371-375: ⚠️ Potential issue | 🟠 Major: Reject the max + 1th event here.
Because this `ensure!` runs before `push`, `len() == max_filter_results` still passes and the next event is appended. The Range path re-checks during merge, but the `Hash`/`Key` branches in this file and the direct `EthEventHandler::collect_events` caller in `src/rpc/methods/eth.rs:1311` can still return one more event than configured, which breaks the max-results contract.
Suggested fix
```diff
 let ce = CollectedEvent {
     entries,
     emitter_addr: resolved,
     event_idx,
     reverted: false,
     height,
     tipset_key: tipset_key.clone(),
     msg_idx: msg_idx as u64,
     msg_cid: message.cid(),
 };
 ensure!(
-    collected_events.len() <= max_filter_results,
+    collected_events.len() < max_filter_results,
     "filter matches too many events (maximum {max_filter_results} allowed), try a more restricted filter"
 );
 collected_events.push(ce);
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/rpc/methods/eth/filter/mod.rs` around lines 371 - 375, The ensure! check allows one extra event because it runs before pushing; update the guard in the collector so it rejects the (max+1)th item—either change the condition to require collected_events.len() < max_filter_results or use collected_events.len().saturating_add(1) <= max_filter_results (or move the check to after push and test > max_filter_results) in the block that contains ensure! and collected_events.push(ce); make this change in the same code that implements EthEventHandler::collect_events and the Hash/Key/Range branches so the max_filter_results contract is enforced everywhere.
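The off-by-one is easy to demonstrate in isolation. The standalone sketch below is not the PR's code (it models `ensure!` as a boolean guard on a plain `Vec`), but it shows that a pre-push `len() <= max` check admits `max + 1` items, while `len() < max` enforces the cap:

```rust
// Pre-push guard as currently written: `<=` admits one item past the cap.
fn push_with_lte_guard(events: &mut Vec<u64>, ev: u64, max: usize) -> bool {
    if events.len() <= max {
        events.push(ev);
        return true;
    }
    false
}

// Corrected guard: `<` rejects the (max + 1)th item.
fn push_with_lt_guard(events: &mut Vec<u64>, ev: u64, max: usize) -> bool {
    if events.len() < max {
        events.push(ev);
        return true;
    }
    false
}

fn main() {
    let (mut buggy, mut fixed) = (Vec::new(), Vec::new());
    for ev in 0..5 {
        push_with_lte_guard(&mut buggy, ev, 3);
        push_with_lt_guard(&mut fixed, ev, 3);
    }
    // The `<=` guard collects 4 events; the `<` guard stops at 3.
    println!("lte: {}, lt: {}", buggy.len(), fixed.len());
}
```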
🧹 Nitpick comments (1)
src/rpc/methods/eth/filter/mod.rs (1)
275-279: Add tipset context to per-task failures.
If one branch fails, `try_next()` only surfaces the inner error, so you lose which tipset caused the failure. Wrapping the await with the epoch keeps range-scan failures diagnosable.
Suggested fix
```diff
 for tipset in tipsets {
+    let epoch = tipset.epoch();
     tasks.push_back(async move {
         let mut events = vec![];
-        Self::collect_events(ctx, &tipset, spec, skip_event, &mut events).await?;
+        Self::collect_events(ctx, &tipset, spec, skip_event, &mut events)
+            .await
+            .with_context(|| format!("collecting events for epoch {epoch}"))?;
         anyhow::Ok(events)
     });
 }
```
As per coding guidelines, "Use `anyhow::Result` for most operations and add context with `.context()` when errors occur".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/rpc/methods/eth/filter/mod.rs` around lines 275 - 279, The per-task closure pushing to tasks uses Self::collect_events(ctx, &tipset, spec, skip_event, &mut events).await but loses which tipset failed; wrap the await result in an anyhow context that includes the tipset identifier (e.g., epoch or cid) so failures report the tipset that caused them. Update the async closure (the task created in tasks.push_back with Self::collect_events) to call .context(...) on the awaited Result to append the tipset context before returning anyhow::Ok(events).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 5328e85e-5ee8-488a-b197-292d426bbad4
📒 Files selected for processing (1)
src/rpc/methods/eth/filter/mod.rs
Summary of changes
To mitigate #6879
(Perf is still worse than Lotus due to lack of SQL index)
Test result with https://github.com/hugomrdias/foxer
Command: `bun --filter foc-api dev`
Changes introduced in this pull request:
Reference issue to close (if applicable)
Closes
Other information and links
Change checklist
Outside contributions
Summary by CodeRabbit