From 6118fcb6819737293c7daf0d4e3759732e5e6d66 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 27 May 2026 14:41:43 -0400 Subject: [PATCH 1/3] Disable cache on java prism vr gradle task. --- runners/prism/java/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index c89974cb6ea5..9357515f36c2 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -185,6 +185,7 @@ def sickbayTests = [ def createPrismValidatesRunnerTask = { name, environmentType -> Task vrTask = tasks.create(name: name, type: Test, group: "Verification") { description "PrismRunner Java $environmentType ValidatesRunner suite" + outputs.upToDateWhen { false } classpath = configurations.validatesRunner var prismBuildTask = dependsOn(':runners:prism:build') From 06d4cbbc6d6f1af05ebd0b1dd7d6cfc9e90b96e8 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 28 May 2026 12:35:53 -0400 Subject: [PATCH 2/3] Fix race conditions in Prism ElementManager Cond Broadcasts Ensures 'refreshCond.Broadcast()' is fully synchronized with its mutex lock 'refreshCond.L' during pipeline termination and processing-time timer scheduling ('wakeUpAt'). This avoids race conditions causing lost wake-up signals and indefinite test hangs. --- .../beam/runners/prism/internal/engine/elementmanager.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index f720be20e375..3949d1af3248 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -387,8 +387,11 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context. em.pendingElements.Wait() slog.Debug("no more pending elements: terminating pipeline") cancelFn(fmt.Errorf("elementManager out of elements, cleaning up")) - // Ensure the watermark evaluation goroutine exits. + // Ensure the watermark evaluation goroutine exits by locking the mutex + // before broadcasting, preventing a lost wake-up signal. + em.refreshCond.L.Lock() em.refreshCond.Broadcast() + em.refreshCond.L.Unlock() }() // Watermark evaluation goroutine. go func() { @@ -2496,7 +2499,9 @@ func (em *ElementManager) wakeUpAt(t mtime.Time) { // only create this goroutine if we have real-time clock enabled (also implying the pipeline does not have TestStream). go func(fireAt time.Time) { time.AfterFunc(time.Until(fireAt), func() { + em.refreshCond.L.Lock() em.refreshCond.Broadcast() + em.refreshCond.L.Unlock() }) }(t.ToTime()) } From abe1f00cf885fdf5787b20c87831a64f5bbb47a6 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 28 May 2026 13:55:05 -0400 Subject: [PATCH 3/3] Trigger post commit python versions test. --- .github/trigger_files/beam_PostCommit_Python_Versions.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Versions.json b/.github/trigger_files/beam_PostCommit_Python_Versions.json index 9cc78c7d1c6c..8b2c8c445c1f 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Versions.json +++ b/.github/trigger_files/beam_PostCommit_Python_Versions.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "revision": 4 + "revision": 5 }