Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions engine/src/main/java/org/apache/hop/workflow/Workflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,13 @@ private Result executeFromStart(
res.setResult(false);
}

// Toolbar / API stop sets the engine's stopped flag; the last completed action can still
// return a successful Result with stopped=false. Propagate stop into Result so consumers
// (e.g. transactional workflow commit/rollback) behave correctly.
if (isStopped() && res != null) {
res.setStopped(true);
}

return res;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,10 @@ public Result startExecution() {
// All fine? Commit!
//
try {
if (result.isResult() && !result.isStopped() && result.getNrErrors() == 0) {
if (result.isResult()
&& !result.isStopped()
&& !workflow.isStopped()
&& result.getNrErrors() == 0) {
try {
database.commit(true);
workflow
Expand Down
104 changes: 103 additions & 1 deletion engine/src/test/java/org/apache/hop/workflow/WorkflowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,26 @@
package org.apache.hop.workflow;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hop.core.HopEnvironment;
import org.apache.hop.core.Result;
import org.apache.hop.core.annotations.Action;
import org.apache.hop.core.exception.HopException;
import org.apache.hop.core.exception.HopPluginException;
import org.apache.hop.core.exception.HopRuntimeException;
import org.apache.hop.core.logging.HopLogStore;
import org.apache.hop.core.logging.LogLevel;
import org.apache.hop.core.plugins.ActionPluginType;
import org.apache.hop.core.plugins.PluginRegistry;
import org.apache.hop.workflow.action.ActionBase;
import org.apache.hop.workflow.action.ActionMeta;
import org.apache.hop.workflow.action.IAction;
import org.apache.hop.workflow.actions.dummy.ActionDummy;
import org.apache.hop.workflow.actions.start.ActionStart;
import org.apache.hop.workflow.engine.IWorkflowEngine;
import org.apache.hop.workflow.engines.local.LocalWorkflowEngine;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -90,9 +102,51 @@ public void run() {
}
}

/** Used by {@link ActionBlockingForWorkflowStopTest} to coordinate with the test thread. */
private static volatile CountDownLatch blockingEntered;

private static volatile CountDownLatch blockingRelease;

@Action(id = "ActionBlockingForWorkflowStopTest", name = "Blocking action for workflow stop test")
public static class ActionBlockingForWorkflowStopTest extends ActionBase implements IAction {

public ActionBlockingForWorkflowStopTest() {
super("", "", "ActionBlockingForWorkflowStopTest");
}

@Override
public Result execute(Result prevResult, int nr) throws HopException {
CountDownLatch entered = blockingEntered;
CountDownLatch release = blockingRelease;
if (entered != null) {
entered.countDown();
}
if (release != null) {
try {
if (!release.await(60, TimeUnit.SECONDS)) {
throw new HopException("Timed out waiting in blocking test action");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new HopException(e);
}
}
Result r = prevResult == null ? new Result() : prevResult.clone();
r.setResult(true);
r.setNrErrors(0);
r.setStopped(false);
return r;
}
}

@BeforeAll
static void beforeClass() throws HopException {
static void beforeClass() throws HopException, HopPluginException {
HopEnvironment.init();
PluginRegistry.getInstance()
.registerPluginClass(
ActionBlockingForWorkflowStopTest.class.getName(),
ActionPluginType.class,
Action.class);
}

@BeforeEach
Expand Down Expand Up @@ -137,4 +191,52 @@ private void startThreads(Runnable run1, Runnable run2, CountDownLatch start)
thread1.join();
thread2.join();
}

/**
* When stop is requested while an action is running, that action can still finish with a
* successful Result that has stopped=false. The workflow result must still report stopped so
* transactional handling can roll back.
*/
@Test
void workflowResultReflectsStopWhenStoppedDuringActionExecution() throws Exception {
blockingEntered = new CountDownLatch(1);
blockingRelease = new CountDownLatch(1);
try {
WorkflowMeta meta = new WorkflowMeta();
meta.setName("workflow-stop-result-test");

ActionStart start = new ActionStart("START");
ActionMeta startMeta = new ActionMeta(start);
meta.addAction(startMeta);

ActionBlockingForWorkflowStopTest blocking = new ActionBlockingForWorkflowStopTest();
blocking.setName("blocking");
ActionMeta blockingMeta = new ActionMeta(blocking);
meta.addAction(blockingMeta);

ActionDummy neverRun = new ActionDummy();
neverRun.setName("never-run");
ActionMeta neverRunMeta = new ActionMeta(neverRun);
meta.addAction(neverRunMeta);

meta.addWorkflowHop(new WorkflowHopMeta(startMeta, blockingMeta));
WorkflowHopMeta hopToNext = new WorkflowHopMeta(blockingMeta, neverRunMeta);
hopToNext.setUnconditional();
meta.addWorkflowHop(hopToNext);

LocalWorkflowEngine engine = new LocalWorkflowEngine(meta);
engine.setLogLevel(LogLevel.MINIMAL);

Thread runner = new Thread(engine::startExecution);
runner.start();
assertTrue(blockingEntered.await(30, TimeUnit.SECONDS));
engine.stopExecution();
blockingRelease.countDown();
runner.join(60000);
assertTrue(engine.getResult().isStopped(), "Result should reflect user stop for rollback");
} finally {
blockingEntered = null;
blockingRelease = null;
}
}
}
Loading