From 5a1c9b8bc8b21c0b0dc7d131130a3f17767fb51e Mon Sep 17 00:00:00 2001 From: Hans Van Akelyen Date: Fri, 17 Apr 2026 13:59:02 +0200 Subject: [PATCH] revert commit when manually stopping workflow in transactional setting, fixes #4681 --- .../org/apache/hop/workflow/Workflow.java | 7 ++ .../engines/local/LocalWorkflowEngine.java | 5 +- .../org/apache/hop/workflow/WorkflowTest.java | 104 +++++++++++++++++- 3 files changed, 114 insertions(+), 2 deletions(-) diff --git a/engine/src/main/java/org/apache/hop/workflow/Workflow.java b/engine/src/main/java/org/apache/hop/workflow/Workflow.java index 6db1dbff841..689d044e191 100644 --- a/engine/src/main/java/org/apache/hop/workflow/Workflow.java +++ b/engine/src/main/java/org/apache/hop/workflow/Workflow.java @@ -997,6 +997,13 @@ private Result executeFromStart( res.setResult(false); } + // Toolbar / API stop sets the engine's stopped flag; the last completed action can still + // return a successful Result with stopped=false. Propagate stop into Result so consumers + // (e.g. transactional workflow commit/rollback) behave correctly. + if (isStopped() && res != null) { + res.setStopped(true); + } + return res; } diff --git a/engine/src/main/java/org/apache/hop/workflow/engines/local/LocalWorkflowEngine.java b/engine/src/main/java/org/apache/hop/workflow/engines/local/LocalWorkflowEngine.java index 96091eca78e..61f32968a37 100644 --- a/engine/src/main/java/org/apache/hop/workflow/engines/local/LocalWorkflowEngine.java +++ b/engine/src/main/java/org/apache/hop/workflow/engines/local/LocalWorkflowEngine.java @@ -139,7 +139,10 @@ public Result startExecution() { // All fine? Commit! // try { - if (result.isResult() && !result.isStopped() && result.getNrErrors() == 0) { + if (result.isResult() + && !result.isStopped() + && !workflow.isStopped() + && result.getNrErrors() == 0) { try { database.commit(true); workflow diff --git a/engine/src/test/java/org/apache/hop/workflow/WorkflowTest.java b/engine/src/test/java/org/apache/hop/workflow/WorkflowTest.java index 2361800f6de..1fae0e3faae 100644 --- a/engine/src/test/java/org/apache/hop/workflow/WorkflowTest.java +++ b/engine/src/test/java/org/apache/hop/workflow/WorkflowTest.java @@ -18,14 +18,26 @@ package org.apache.hop.workflow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.hop.core.HopEnvironment; +import org.apache.hop.core.Result; +import org.apache.hop.core.annotations.Action; import org.apache.hop.core.exception.HopException; +import org.apache.hop.core.exception.HopPluginException; import org.apache.hop.core.exception.HopRuntimeException; import org.apache.hop.core.logging.HopLogStore; import org.apache.hop.core.logging.LogLevel; +import org.apache.hop.core.plugins.ActionPluginType; +import org.apache.hop.core.plugins.PluginRegistry; +import org.apache.hop.workflow.action.ActionBase; +import org.apache.hop.workflow.action.ActionMeta; +import org.apache.hop.workflow.action.IAction; +import org.apache.hop.workflow.actions.dummy.ActionDummy; +import org.apache.hop.workflow.actions.start.ActionStart; import org.apache.hop.workflow.engine.IWorkflowEngine; import org.apache.hop.workflow.engines.local.LocalWorkflowEngine; import org.junit.jupiter.api.BeforeAll; @@ -90,9 +102,51 @@ public void run() { } } + /** Used by {@link ActionBlockingForWorkflowStopTest} to coordinate with the test thread. */ + private static volatile CountDownLatch blockingEntered; + + private static volatile CountDownLatch blockingRelease; + + @Action(id = "ActionBlockingForWorkflowStopTest", name = "Blocking action for workflow stop test") + public static class ActionBlockingForWorkflowStopTest extends ActionBase implements IAction { + + public ActionBlockingForWorkflowStopTest() { + super("", "", "ActionBlockingForWorkflowStopTest"); + } + + @Override + public Result execute(Result prevResult, int nr) throws HopException { + CountDownLatch entered = blockingEntered; + CountDownLatch release = blockingRelease; + if (entered != null) { + entered.countDown(); + } + if (release != null) { + try { + if (!release.await(60, TimeUnit.SECONDS)) { + throw new HopException("Timed out waiting in blocking test action"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new HopException(e); + } + } + Result r = prevResult == null ? new Result() : prevResult.clone(); + r.setResult(true); + r.setNrErrors(0); + r.setStopped(false); + return r; + } + } + @BeforeAll - static void beforeClass() throws HopException { + static void beforeClass() throws HopException, HopPluginException { HopEnvironment.init(); + PluginRegistry.getInstance() + .registerPluginClass( + ActionBlockingForWorkflowStopTest.class.getName(), + ActionPluginType.class, + Action.class); } @BeforeEach @@ -137,4 +191,52 @@ private void startThreads(Runnable run1, Runnable run2, CountDownLatch start) thread1.join(); thread2.join(); } + + /** + * When stop is requested while an action is running, that action can still finish with a + * successful Result that has stopped=false. The workflow result must still report stopped so + * transactional handling can roll back. + */ + @Test + void workflowResultReflectsStopWhenStoppedDuringActionExecution() throws Exception { + blockingEntered = new CountDownLatch(1); + blockingRelease = new CountDownLatch(1); + try { + WorkflowMeta meta = new WorkflowMeta(); + meta.setName("workflow-stop-result-test"); + + ActionStart start = new ActionStart("START"); + ActionMeta startMeta = new ActionMeta(start); + meta.addAction(startMeta); + + ActionBlockingForWorkflowStopTest blocking = new ActionBlockingForWorkflowStopTest(); + blocking.setName("blocking"); + ActionMeta blockingMeta = new ActionMeta(blocking); + meta.addAction(blockingMeta); + + ActionDummy neverRun = new ActionDummy(); + neverRun.setName("never-run"); + ActionMeta neverRunMeta = new ActionMeta(neverRun); + meta.addAction(neverRunMeta); + + meta.addWorkflowHop(new WorkflowHopMeta(startMeta, blockingMeta)); + WorkflowHopMeta hopToNext = new WorkflowHopMeta(blockingMeta, neverRunMeta); + hopToNext.setUnconditional(); + meta.addWorkflowHop(hopToNext); + + LocalWorkflowEngine engine = new LocalWorkflowEngine(meta); + engine.setLogLevel(LogLevel.MINIMAL); + + Thread runner = new Thread(engine::startExecution); + runner.start(); + assertTrue(blockingEntered.await(30, TimeUnit.SECONDS)); + engine.stopExecution(); + blockingRelease.countDown(); + runner.join(60000); + assertTrue(engine.getResult().isStopped(), "Result should reflect user stop for rollback"); + } finally { + blockingEntered = null; + blockingRelease = null; + } + } }