test(amber): drop stale Thread.sleep(1000) after COMPLETED state#4680
Merged
Yicong-Huang merged 1 commit intoMay 2, 2026
Merged
Conversation
The sleep was a workaround for an early engine version where the COMPLETED state could fire before the sink's iceberg commit landed. Today's path is fully synchronous: closeOutputStorageWriterIfNeeded joins the writer thread, the writer's close() commits the iceberg table, and only then does the worker emit portCompleted. Local 5x sequential runs of the two specs pass 21/21 with the sleep removed. Closes apache#4679. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
63889d3 to
9f8ad9b
Compare
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #4680 +/- ##
============================================
+ Coverage 46.17% 53.02% +6.84%
+ Complexity 1999 1995 -4
============================================
Files 1013 817 -196
Lines 38165 24630 -13535
Branches 3712 1921 -1791
============================================
- Hits 17623 13060 -4563
+ Misses 19769 10912 -8857
+ Partials 773 658 -115
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Yicong-Huang
added a commit
that referenced
this pull request
May 3, 2026
…#4874) ### What changes were proposed in this PR? Backport [#4680](#4680) (commit `598ed0f7f037b96ebe5549759158e46ba736b372` on `main`) onto `release/v1.1.0-incubating`. Removes the dead `Thread.sleep(1000)` + TODO inside the result-reading block in `ReconfigurationSpec.scala` and `DataProcessingSpec.scala`, matching what already landed on `main`. ### Any related issues, documentation, discussions? Backports #4680. Unblocks the backport leg of #4871. ### How was this PR tested? Cherry-pick of an already-reviewed and merged commit; applies cleanly. CI on this PR exercises the change against the release branch's full Python matrix. ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Claude Code (claude-opus-4-7) Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this PR?
Remove the
Thread.sleep(1000)workaround inside the result-reading block ofDataProcessingSpecandReconfigurationSpec. The sleep was annotated with a TODO calling it a workaround for "the issue of reportingcompletedstatus too early" — the current engine path is synchronous end-to-end, so the workaround is dead code..map(terminalOpId => { - //TODO: remove the delay after fixing the issue of reporting "completed" status too early. - Thread.sleep(1000) val uri = getResultUriByLogicalPortId(...) ... })The synchronous path that emits
COMPLETED:DataProcessor.outputOneTuplehandlesFinalizePort→outputManager.closeOutputStorageWriterIfNeeded(portId)closeOutputStorageWriterIfNeededputs a terminate signal on the writer thread's queue andjoin()s itIcebergTableWriter.close()→flushBuffer()→table.newAppend().appendFile(dataFile).commit()DataProcessorfireportCompletedto the controllerExecutionStateUpdate(COMPLETED)to the clientIcebergDocument.get()→seekToUsableFile()→table.refresh()before scanAny related issues, documentation, discussions?
Closes #4679.
How was this PR tested?
Ran the two specs together with
-T 1(sequential) on a clean local checkout, 20 consecutive runs, 21/21 tests passing every run, 0 flakes:Run wall-clock distribution: min 35.6 s, mean 39.5 s, max 45.9 s.
Compared to the same two specs with the sleep on the same machine:
DataProcessingSpec saves more than 16 × 1 s because the sleep was inside a per-terminal-port
.map(...)block — workflows with multiple terminal ports paid the cost more than once per test.Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7, 1M context)