From 2b9f95dac5995cc4559f511673969c2a0301d871 Mon Sep 17 00:00:00 2001
From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Sat, 2 May 2026 12:19:00 -0700
Subject: [PATCH] test(amber): drop stale Thread.sleep(1000) after COMPLETED state (#4680)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### What changes were proposed in this PR?

Remove the `Thread.sleep(1000)` workaround inside the result-reading block of `DataProcessingSpec` and `ReconfigurationSpec`. The sleep was annotated with a TODO calling it a workaround for "the issue of reporting `completed` status too early". The current engine path is synchronous end-to-end, so the workaround is dead code.

```diff
 .map(terminalOpId => {
-  //TODO: remove the delay after fixing the issue of reporting "completed" status too early.
-  Thread.sleep(1000)
   val uri = getResultUriByLogicalPortId(...)
   ...
 })
```

The synchronous path that emits `COMPLETED`:

1. `DataProcessor.outputOneTuple` handles `FinalizePort` → `outputManager.closeOutputStorageWriterIfNeeded(portId)`
2. `closeOutputStorageWriterIfNeeded` puts a terminate signal on the writer thread's queue and `join()`s it
3. The writer thread, on terminate, calls `IcebergTableWriter.close()` → `flushBuffer()` → `table.newAppend().appendFile(dataFile).commit()`
4. Only then does `DataProcessor` fire `portCompleted` to the controller
5. After all ports complete, controller emits `ExecutionStateUpdate(COMPLETED)` to the client
6. Read side does `IcebergDocument.get()` → `seekToUsableFile()` → `table.refresh()` before scan

### Any related issues, documentation, discussions?

Closes #4679.

### How was this PR tested?

Ran the two specs together with `-T 1` (sequential) on a clean local checkout, 5 consecutive runs:

```
sbt 'WorkflowExecutionService / Test / testOnly \
  org.apache.texera.amber.engine.e2e.DataProcessingSpec \
  org.apache.texera.amber.engine.e2e.ReconfigurationSpec'

run 1: 21/21 passed in 39.2 s
run 2: 21/21 passed in 39.5 s
run 3: 21/21 passed in 43.2 s
run 4: 21/21 passed in 37.3 s
run 5: 21/21 passed in 36.9 s
```

Compared to the same two specs with the sleep on the same machine:

| spec | before | after | delta |
|---|---|---|---|
| DataProcessingSpec | 36.5 s | 16.9 s | -19.6 s |
| ReconfigurationSpec | 27.2 s | 18.7 s | -8.5 s |

DataProcessingSpec saves more than 16 × 1 s because the sleep was inside a per-terminal-port `.map(...)` block, so workflows with multiple terminal ports paid the cost more than once per test.

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.7, 1M context)

Co-authored-by: Claude Opus 4.7 (1M context)
---
 .../org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala | 2 --
 .../apache/texera/amber/engine/e2e/ReconfigurationSpec.scala    | 2 --
 2 files changed, 4 deletions(-)

diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
index 69ee9c6a5fb..f93909f53f5 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
@@ -129,8 +129,6 @@ class DataProcessingSpec
           uri.nonEmpty
         })
         .map(terminalOpId => {
-          //TODO: remove the delay after fixing the issue of reporting "completed" status too early.
-          Thread.sleep(1000)
           val uri = getResultUriByLogicalPortId(
             workflowContext.executionId,
             terminalOpId,
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
index 92dfba19de1..a125c1be004 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -132,8 +132,6 @@ class ReconfigurationSpec
           uri.nonEmpty
         })
         .map(terminalOpId => {
-          //TODO: remove the delay after fixing the issue of reporting "completed" status too early.
-          Thread.sleep(1000)
           val uri = getResultUriByLogicalPortId(
             workflow.context.executionId,
             terminalOpId,