feat(worker): make upload_finisher source-of-truth for commit uploads (#756)
thomasrockhu-codecov wants to merge 19 commits into main from
Conversation
Ensure upload_finisher reconstructs all mergeable uploads for a commit rather than relying only on callback payload, and stop processor-side finisher enqueueing now that finisher input is source-of-truth. Made-with: Cursor
Update upload state transitions so successful finisher merges write MERGED state, and align finisher test expectations with the new terminal state. Made-with: Cursor
Drop old per-commit concurrency limiter test coverage that no longer matches upload_finisher behavior and was failing test module import. Made-with: Cursor
Codecov Report
✅ All modified and coverable lines are covered by tests.

Additional details and impacted files:
@@ Coverage Diff @@
##             main     #756      +/-   ##
==========================================
- Coverage   92.26%   92.26%   -0.01%
==========================================
  Files        1304     1304
  Lines       47925    47923       -2
  Branches     1628     1628
==========================================
- Hits        44218    44214       -4
- Misses       3398     3400       +2
  Partials      309      309
Use reconstructed results as the base and let callback payload win on duplicate upload IDs so stale reconstruction cannot overwrite successful processor outcomes. Made-with: Cursor
Use the full processed set instead of random sampling so finisher reconstruction cannot miss uploads and produce incomplete merged reports. Made-with: Cursor
drazisil-codecov left a comment:
Approved with a question.
Treat only merged and error uploads as already completed and rename the guard variable to all_already_merged for clearer intent. Made-with: Cursor
Treat processed uploads as already finalized in finisher idempotency checks to avoid duplicate merges during mixed-version deployments. Made-with: Cursor
Guard update_uploads against malformed unsuccessful processing results by treating missing error payloads as unknown_processing instead of crashing with UnboundLocalError. Made-with: Cursor
Add a regression test for unsuccessful processing results without an error dict to ensure we persist UNKNOWN_PROCESSING instead of crashing. Made-with: Cursor
…ibilities Aggregate follow-up task scheduling through an enum-driven helper, extract finisher gate operations into a dedicated module, move remaining upload counting into ProcessingState, drop sweep-attempt caps, rely on DB reconstruction as source of truth, and remove UploadFlow wiring from processor/finisher paths. Made-with: Cursor
Force-pushed b2da3b8 to c3ff0f9
Use followup-type countdown mapping in a single scheduler and process uploads in batches of 10 with early continuation when time budget is near exhaustion, then re-read merge-ready uploads from ProcessingState. Made-with: Cursor
Persist processed-state visibility before finisher enqueue, remove redundant merged transition in finisher, and update idempotency-era test expectations to match state-driven reconstruction behavior. Made-with: Cursor
Pass db_session into ProcessingState usage in upload scheduling and align finisher unit setup with state-reconstruction behavior so worker CI paths continue to run under DB-backed processing state. Made-with: Cursor
Apply ruff formatting output required by the CI Run Lint hook for touched worker files. Made-with: Cursor
Return the updated master report from batch merges and stop rolling back before final save so staged upload state transitions are retained through commit. Made-with: Cursor
| "reportid": str(upload.report.external_id), | ||
| }, | ||
| "successful": bool(has_report), | ||
| "upload_id": upload.id_, |
Reconstruction loses error status when intermediate report exists
Medium Severity
The _reconstruct_processing_results method determines upload success by checking whether an intermediate report exists in Redis (redis_connection.exists(report_key)). However, in processing.py, when processing_result.error is truthy but processing_result.report is also present, both the error is recorded AND the intermediate report is saved. The original callback payload correctly set successful=False in that case, but the reconstruction path now sees the intermediate report exists and marks the upload as successful=True. This causes uploads with processing errors to be incorrectly merged as successful instead of being marked as errors.
Additional Locations (1)
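One direction the fix could take, sketched under the assumption that the processor records a separate error marker alongside the intermediate report; the key layouts, the `error_key` marker, and `FakeRedis` are hypothetical stand-ins, not the worker's real code:

```python
class FakeRedis:
    """Minimal stand-in for a Redis connection with string keys."""

    def __init__(self, keys):
        self.keys = set(keys)

    def exists(self, key):
        return key in self.keys


def reconstruct_upload_result(redis, upload_id):
    report_key = f"uploads/{upload_id}/intermediate_report"  # assumed layout
    error_key = f"uploads/{upload_id}/processing_error"      # hypothetical marker
    has_report = redis.exists(report_key)
    has_error = redis.exists(error_key)
    # An upload with a recorded error stays unsuccessful even when an
    # intermediate report was also saved (the error-plus-report case).
    return {"upload_id": upload_id, "successful": bool(has_report) and not has_error}
```

With this shape, reconstruction can no longer flip an errored upload to successful just because a partial report exists.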
| "commit_yaml": {}, | ||
| "trigger": "cron", | ||
| } | ||
| ) |
Stuck uploads cron passes empty YAML configuration
Medium Severity
The StuckUploadsCheckTask triggers the merger with commit_yaml: {}, an empty dict. The merger uses this YAML for report merging configuration including coverage precision, rounding, carry-forward session clearing, and notification settings. Commits with custom YAML configuration (e.g., coverage.precision, flag carry-forward rules) would be merged with default settings instead, potentially producing incorrect coverage values or failing to clear stale carry-forward sessions.
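A minimal sketch of the fix direction, assuming the worker exposes some way to resolve a commit's effective YAML; `load_commit_yaml` is a hypothetical stand-in for that resolution service:

```python
def build_merger_kwargs(commitid, load_commit_yaml):
    # Resolve the commit's effective YAML instead of hard-coding an empty
    # dict, so precision, rounding, and carry-forward settings are honored.
    commit_yaml = load_commit_yaml(commitid) or {}
    return {
        "commitid": commitid,
        "commit_yaml": commit_yaml,
        "trigger": "cron",
    }
```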
        merge_result,
    )

    return master_report, processing_results
Unused diff parameter in merge_batch function signature
Low Severity
The merge_batch function accepts a diff parameter that is never used within the function body. The diff is instead applied later in save_and_commit. This dead parameter is misleading — a reader might expect the diff to be applied during merging, and a future maintainer could accidentally remove the save_and_commit diff application thinking it's handled in merge_batch.
    # this is a noop in normal cases, but relevant for task retries:
    state.mark_uploads_as_processing([upload_id])
Bug: A call is made to state.mark_uploads_as_processing(), but this method was removed from the refactored ProcessingState class, which will cause a runtime AttributeError.
Severity: CRITICAL
Suggested Fix
Remove the call to the non-existent mark_uploads_as_processing method from processing.py and upload.py. The refactoring of ProcessingState to be database-backed made this state-tracking call obsolete, and its removal from the callers will align the code with the new design.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not valid.
Location: apps/worker/services/processing/processing.py#L51-L52
Potential issue: Code in `processing.py` and `upload.py` calls the method
`state.mark_uploads_as_processing()`. However, this method was removed from the
refactored `ProcessingState` class, which is now database-backed instead of
Redis-backed. As a result, any attempt to process an upload will raise an
`AttributeError` because the method does not exist on the `ProcessingState` object. This
will cause the task to crash and completely block the upload processing pipeline.
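The suggested fix is simply to delete the obsolete call. If a transitional guard were wanted during mixed-version deploys instead, one sketch (purely illustrative, not the worker's code) is to probe for the legacy method before calling it:

```python
def mark_processing_if_supported(state, upload_id):
    # Tolerate both the legacy Redis-backed ProcessingState (which had
    # mark_uploads_as_processing) and the new DB-backed one (which does not),
    # avoiding the AttributeError described above.
    mark = getattr(state, "mark_uploads_as_processing", None)
    if callable(mark):
        mark([upload_id])  # legacy Redis-backed state tracking
        return True
    return False  # DB-backed state: tracking happens via Upload.state_id
```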
| "continuation_needed": remaining_processed_uploads > 0, | ||
| "processing_results": merged_processing_results, | ||
| "upload_ids": merged_upload_ids, | ||
| } |
Inverted time check causes merge loop to never execute
High Severity
The time-budget condition in _process_reports_with_lock is inverted. On the first iteration, time.monotonic() - merge_start_time is ~0, which is always <= FINISHER_CONTINUATION_BUFFER_SECONDS (10). This causes an immediate return before any merging work is done. The defined constant FINISHER_MERGE_TIME_BUDGET_SECONDS (200) is never referenced in the check. The correct condition likely needs to check whether elapsed time has exceeded the budget minus the buffer, e.g., >= FINISHER_MERGE_TIME_BUDGET_SECONDS - FINISHER_CONTINUATION_BUFFER_SECONDS. As written, the finisher will never merge any reports and will instead spin in an infinite self-scheduling continuation loop.
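The corrected condition the comment proposes can be sketched as follows; the constant names and values are taken from the comment itself, and the helper is a stand-in rather than the actual finisher code:

```python
import time

FINISHER_MERGE_TIME_BUDGET_SECONDS = 200
FINISHER_CONTINUATION_BUFFER_SECONDS = 10


def should_continue_later(merge_start_time, now=None):
    # Schedule a continuation only once elapsed time approaches the budget
    # minus the buffer. On the first iteration elapsed is ~0, so merging
    # proceeds instead of returning immediately.
    elapsed = (time.monotonic() if now is None else now) - merge_start_time
    return elapsed >= FINISHER_MERGE_TIME_BUDGET_SECONDS - FINISHER_CONTINUATION_BUFFER_SECONDS
```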
Reapply no-chord coverage scheduling in UploadTask, restore ProcessingState APIs used by callers/tests, bring back finisher gate existence helper, and keep finisher merge-loop helper improvements with corrected continuation timing. Made-with: Cursor
| "run_impl: Saving combined report", | ||
| extra={"processing_results": current_batch}, | ||
| ) | ||
| save_and_commit( | ||
| db_session, | ||
| report_service, | ||
| commit, | ||
| report, | ||
| pending_upload_ids, | ||
| diff, | ||
| ) |
Bug: The report processing loop may apply the same diff multiple times when handling uploads in batches, as the report is reloaded from storage on each iteration.
Severity: MEDIUM
Suggested Fix
To prevent the diff from being applied multiple times, load the master report from storage once before the batch processing loop begins. Pass this report object through each iteration, merging new data and only applying the diff once before the final save after the loop completes.
Prompt for AI Agent
Location: apps/worker/tasks/upload_finisher.py#L415-L442
Potential issue: In the batch processing loop within `_process_reports_with_lock`, the
master report is reloaded from storage on each iteration. The `save_and_commit` function
is then called, which applies a diff to this report before saving it. Consequently, if a
commit's uploads are processed in multiple batches, the same diff will be applied
multiple times—once for each batch. This repeated application could lead to data
corruption or incorrect report states if the `apply_diff` operation is not idempotent.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
    log.info("run_impl: Cleaning up intermediate reports")
    cleanup_intermediate_reports(upload_ids)
    merged_processing_results.extend(current_batch)
    merged_upload_ids.extend(pending_upload_ids)
Diff applied multiple times in multi-batch finisher loop
High Severity
In _process_reports_with_lock, each loop iteration calls perform_report_merging (which reloads the master report from storage — already with the diff applied from the previous save_and_commit) and then save_and_commit applies the diff again. For commits with more than FINISHER_UPLOAD_MERGE_BATCH_SIZE (10) uploads, the diff is applied once per batch, corrupting coverage line annotation in the final report. The upload_merger.py counterpart correctly accumulates master_report in-memory across batches and applies the diff only once at the end.
Additional Locations (2)
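The single-apply pattern the comment attributes to upload_merger.py can be sketched like this, with all callables (`merge_batch_fn`, `apply_diff_fn`, `save_fn`) as hypothetical stand-ins for the worker's real merge and save functions:

```python
def merge_all_batches(batches, merge_batch_fn, apply_diff_fn, save_fn, diff):
    master_report = None
    for batch in batches:
        # Merge into the running in-memory report; never reload from storage,
        # which would pick up a copy that already had the diff applied.
        master_report = merge_batch_fn(master_report, batch)
    if master_report is not None:
        apply_diff_fn(master_report, diff)  # applied once, not once per batch
        save_fn(master_report)
    return master_report
```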
    state.mark_upload_as_processed(id)

    # we have only processed 8 out of 9. we want to do a batched merge
    # We now reconstruct all processed uploads in one pass.
ProcessingState test suite broken by API change
Medium Severity
ProcessingState.__init__ was changed in this PR to require db_session as a mandatory third argument, but test_processing_state.py still instantiates ProcessingState(1234, uuid4().hex) everywhere — including in the test_batch_merging_many_uploads test that was modified in this very diff. Every test in the file will raise TypeError at instantiation. Additionally, the TestProcessingStateEmptyListGuards class and the parametrized tests still assert Redis calls (mock_redis.sadd, mock_redis.srem) that are no longer made by the DB-backed implementation.
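A stub illustrating the signature change described above; this is not the real class, only a shape that shows why every two-argument instantiation in the test file now raises TypeError:

```python
class ProcessingState:
    # The refactor makes db_session a mandatory third argument.
    def __init__(self, repoid, commit_sha, db_session):
        self.repoid = repoid
        self.commit_sha = commit_sha
        self.db_session = db_session
```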


Summary
- Make upload_finisher rebuild its input from commit processing state and merge it with the callback payload by upload_id
- Include merged in the finisher idempotency terminal states

Test plan
- ruff check --fix apps/worker/tasks/upload_finisher.py apps/worker/tasks/tests/unit/test_upload_finisher_task.py apps/worker/services/processing/processing.py apps/worker/services/tests/test_processing.py
- ruff format apps/worker/tasks/upload_finisher.py apps/worker/tasks/tests/unit/test_upload_finisher_task.py apps/worker/services/processing/processing.py apps/worker/services/tests/test_processing.py

Made with Cursor
Note
High Risk
Reworks the core upload pipeline (state tracking, task scheduling, and merge/finish behavior) and adds new cron/merge task paths, which could impact report correctness, notification timing, and lock/idempotency under concurrency and retries.
Overview
Makes upload merging/finishing commit-state-driven and gate-controlled.
ProcessingState is migrated from Redis sets to database Upload.state_id queries/updates, and update_uploads() now marks successful uploads as MERGED (with safer handling for missing error payloads).

Changes task orchestration for coverage uploads. Coverage processing no longer uses a Celery chord to call the finisher; processors mark uploads PROCESSED and attempt to enqueue upload_finisher via a new Redis finisher gate (single-run per commit with TTL). The finisher is refactored to always reconstruct batches from commit state, run time-budgeted/batched merges with self-scheduled continuation/sweep followups, and to clear the gate on terminal outcomes.

Adds recovery paths. Introduces a StuckUploadsCheckTask cron to detect commits with long-stale PROCESSED uploads and trigger a new upload_merger task (with its own gate/metrics), providing watchdog/sweep/continuation behavior and post-processing (notify/PR comparison/timeseries/cache invalidation) when fully complete.

Written by Cursor Bugbot for commit c48148b. This will update automatically on new commits.
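The "single-run per commit with TTL" gate described in the overview maps naturally onto Redis SET-with-NX-and-EX semantics. A hedged sketch, with the key layout, TTL value, and FakeRedis stand-in all assumed rather than taken from the actual implementation:

```python
class FakeRedis:
    """In-memory stand-in mimicking redis-py's set(nx=..., ex=...) contract."""

    def __init__(self):
        self.store = {}

    def set(self, key, value, nx=False, ex=None):
        if nx and key in self.store:
            return None  # mirrors redis-py returning None when NX fails
        self.store[key] = value
        return True


def try_acquire_finisher_gate(redis, repoid, commitid, ttl_seconds=300):
    # Only one caller per commit acquires the gate; the TTL bounds how long
    # a crashed finisher can block re-entry.
    key = f"finisher_gate/{repoid}/{commitid}"  # hypothetical key layout
    return bool(redis.set(key, "1", nx=True, ex=ttl_seconds))
```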