Skip to content

Commit

Permalink
Compute lock owner and lock expiration time if job does not have one …
Browse files Browse the repository at this point in the history
…when locking job scope
  • Loading branch information
filiphr committed Nov 14, 2023
1 parent c130d98 commit a3ae8a5
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,14 @@ protected void handleJobDeleteInternal(Job job) {
@Override
protected void lockJobScopeInternal(Job job) {
CaseInstanceEntityManager caseInstanceEntityManager = cmmnEngineConfiguration.getCaseInstanceEntityManager();
String lockOwner;
Date lockExpirationTime;
String lockOwner = null;
Date lockExpirationTime = null;

if (job instanceof JobInfoEntity) {
lockOwner = ((JobInfoEntity) job).getLockOwner();
lockExpirationTime = ((JobInfoEntity) job).getLockExpirationTime();
} else {
}
if (lockOwner == null || lockExpirationTime == null) {
int lockMillis = cmmnEngineConfiguration.getAsyncExecutor().getAsyncJobLockTimeInMillis();
GregorianCalendar lockCal = new GregorianCalendar();
lockCal.setTime(cmmnEngineConfiguration.getClock().getCurrentTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,15 @@ protected void lockJobScopeInternal(Job job) {
ExecutionEntityManager executionEntityManager = getExecutionEntityManager();
ExecutionEntity execution = executionEntityManager.findById(job.getExecutionId());
if (execution != null) {
String lockOwner;
Date lockExpirationTime;
String lockOwner = null;
Date lockExpirationTime = null;

if (job instanceof JobInfoEntity) {
lockOwner = ((JobInfoEntity) job).getLockOwner();
lockExpirationTime = ((JobInfoEntity) job).getLockExpirationTime();
} else {
}

if (lockOwner == null || lockExpirationTime == null) {
int lockMillis = processEngineConfiguration.getAsyncExecutor().getAsyncJobLockTimeInMillis();
GregorianCalendar lockCal = new GregorianCalendar();
lockCal.setTime(processEngineConfiguration.getClock().getCurrentTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import org.flowable.engine.history.HistoricActivityInstance;
import org.flowable.engine.impl.test.HistoryTestHelper;
import org.flowable.engine.impl.test.PluggableFlowableTestCase;
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.engine.test.Deployment;
import org.flowable.job.service.impl.asyncexecutor.AsyncJobExecutorConfiguration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;

Expand Down Expand Up @@ -57,4 +60,35 @@ public void testExclusiveJobs() {

}

@Test
@Deployment
public void testParallelGatewayExclusiveJobs() {
ProcessInstance processInstance = runtimeService.createProcessInstanceBuilder()
.processDefinitionKey("parallelExclusiveServiceTasks")
.variable("counter", 0L)
.start();

assertThat(runtimeService.getVariable(processInstance.getId(), "counter"))
.isEqualTo(0L);

AsyncJobExecutorConfiguration asyncExecutorConfiguration = processEngineConfiguration.getAsyncExecutorConfiguration();
boolean originalGlobalAcquireLockEnabled = asyncExecutorConfiguration.isGlobalAcquireLockEnabled();
CollectingAsyncRunnableExecutionExceptionHandler executionExceptionHandler = new CollectingAsyncRunnableExecutionExceptionHandler();
try {
asyncExecutorConfiguration.setGlobalAcquireLockEnabled(true);
processEngineConfiguration.getJobServiceConfiguration()
.getAsyncRunnableExecutionExceptionHandlers()
.add(0, executionExceptionHandler);
waitForJobExecutorToProcessAllJobs(15000, 200);
} finally {
asyncExecutorConfiguration.setGlobalAcquireLockEnabled(originalGlobalAcquireLockEnabled);
processEngineConfiguration.getJobServiceConfiguration()
.getAsyncRunnableExecutionExceptionHandlers()
.remove(executionExceptionHandler);
}

assertThat(executionExceptionHandler.getExceptions()).isEmpty();
assertThat(runtimeService.getVariable(processInstance.getId(), "counter"))
.isEqualTo(3L);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.flowable.engine.test.bpmn.async;

import java.util.ArrayList;
import java.util.Collection;

import org.flowable.job.api.JobInfo;
import org.flowable.job.service.JobServiceConfiguration;
import org.flowable.job.service.impl.asyncexecutor.AsyncRunnableExecutionExceptionHandler;

/**
* @author Filip Hrisafov
*/
public class CollectingAsyncRunnableExecutionExceptionHandler implements AsyncRunnableExecutionExceptionHandler {

protected final Collection<Throwable> exceptions = new ArrayList<>();

@Override
public boolean handleException(JobServiceConfiguration jobServiceConfiguration, JobInfo job, Throwable exception) {
exceptions.add(exception);
return false;
}

public Collection<Throwable> getExceptions() {
return exceptions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public void testHistoryJobBulkUpdate() {
return null;
});

for (Job job : managementService.createJobQuery().list()) {
for (HistoryJob job : managementService.createHistoryJobQuery().list()) {
assertThat(((HistoryJobEntity) job).getLockOwner()).isEqualTo("test");
assertThat(((HistoryJobEntity) job).getLockExpirationTime()).isNotNull();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:flowable="http://flowable.org/bpmn"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC"
xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" xmlns:design="http://flowable.org/design" typeLanguage="http://www.w3.org/2001/XMLSchema"
expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://flowable.org/test" design:palette="flowable-engage-process-palette"
xsi:schemaLocation="http://www.omg.org/spec/BPMN/20100524/MODEL http://www.omg.org/spec/BPMN/2.0/20100501/BPMN20.xsd">
<process id="parallelExclusiveServiceTasks" name="Parallel Exclusive Service Tasks" isExecutable="true" flowable:candidateStarterGroups="flowableUser">
<extensionElements>
<design:stencilid><![CDATA[BPMNDiagram]]></design:stencilid>
<design:creationdate><![CDATA[2023-11-13T15:20:20.369Z]]></design:creationdate>
<design:modificationdate><![CDATA[2023-11-13T15:29:47.381Z]]></design:modificationdate>
</extensionElements>
<scriptTask id="bpmnTask_3" name="Script Task" scriptFormat="groovy" flowable:async="true" flowable:exclusive="true">
<script>
<![CDATA[
Thread.sleep(500);
execution.setVariable("counter", counter + 1);]]>
</script>
</scriptTask>
<scriptTask id="bpmnTask_5" name="Script Task" scriptFormat="groovy" flowable:async="true" flowable:exclusive="true">
<script>
<![CDATA[
Thread.sleep(500);
execution.setVariable("counter", counter + 1);]]>
</script>
</scriptTask>
<scriptTask id="bpmnTask_7" name="Script Task" scriptFormat="groovy" flowable:async="true" flowable:exclusive="true">
<script>
<![CDATA[
Thread.sleep(500);
execution.setVariable("counter", counter + 1);]]>
</script>
</scriptTask>
<userTask id="bpmnTask_13" name="User task" flowable:assignee="${initiator}" flowable:formFieldValidation="false">
<extensionElements>
<flowable:task-candidates-type><![CDATA[all]]></flowable:task-candidates-type>
<design:stencilid><![CDATA[FormTask]]></design:stencilid>
<design:stencilsuperid><![CDATA[Task]]></design:stencilsuperid>
</extensionElements>
</userTask>
<parallelGateway id="bpmnGateway_1">
<extensionElements>
<design:stencilid><![CDATA[ParallelGateway]]></design:stencilid>
</extensionElements>
</parallelGateway>
<parallelGateway id="bpmnGateway_9">
<extensionElements>
<design:stencilid><![CDATA[ParallelGateway]]></design:stencilid>
</extensionElements>
</parallelGateway>
<startEvent id="startnoneevent1" flowable:initiator="initiator" flowable:formFieldValidation="false">
<extensionElements>
<flowable:work-form-field-validation><![CDATA[false]]></flowable:work-form-field-validation>
<design:stencilid><![CDATA[StartNoneEvent]]></design:stencilid>
</extensionElements>
</startEvent>
<endEvent id="bpmnEndEvent_15">
<extensionElements>
<design:stencilid><![CDATA[EndNoneEvent]]></design:stencilid>
</extensionElements>
</endEvent>
<sequenceFlow id="bpmnSequenceFlow_10" sourceRef="bpmnTask_3" targetRef="bpmnGateway_9">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
<sequenceFlow id="bpmnSequenceFlow_11" sourceRef="bpmnTask_5" targetRef="bpmnGateway_9">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
<sequenceFlow id="bpmnSequenceFlow_12" sourceRef="bpmnTask_7" targetRef="bpmnGateway_9">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
<sequenceFlow id="bpmnSequenceFlow_16" sourceRef="bpmnTask_13" targetRef="bpmnEndEvent_15">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
<sequenceFlow id="bpmnSequenceFlow_4" sourceRef="bpmnGateway_1" targetRef="bpmnTask_3">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
<sequenceFlow id="bpmnSequenceFlow_6" sourceRef="bpmnGateway_1" targetRef="bpmnTask_5">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
<sequenceFlow id="bpmnSequenceFlow_8" sourceRef="bpmnGateway_1" targetRef="bpmnTask_7">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
<sequenceFlow id="bpmnSequenceFlow_14" sourceRef="bpmnGateway_9" targetRef="bpmnTask_13">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
<sequenceFlow id="bpmnSequenceFlow_2" sourceRef="startnoneevent1" targetRef="bpmnGateway_1">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
</process>
<bpmndi:BPMNDiagram id="BPMNDiagram_parallelExclusiveServiceTasks">
<bpmndi:BPMNPlane bpmnElement="parallelExclusiveServiceTasks" id="BPMNPlane_parallelExclusiveServiceTasks">
<bpmndi:BPMNShape bpmnElement="bpmnTask_3" id="BPMNShape_bpmnTask_3">
<omgdc:Bounds height="80.0" width="100.0" x="549.0" y="111.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="bpmnTask_5" id="BPMNShape_bpmnTask_5">
<omgdc:Bounds height="80.0" width="100.0" x="549.0" y="236.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="bpmnTask_7" id="BPMNShape_bpmnTask_7">
<omgdc:Bounds height="80.0" width="100.0" x="549.0" y="371.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="bpmnTask_13" id="BPMNShape_bpmnTask_13">
<omgdc:Bounds height="80.0" width="100.0" x="839.0" y="236.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="bpmnGateway_1" id="BPMNShape_bpmnGateway_1">
<omgdc:Bounds height="40.0" width="40.0" x="409.0" y="256.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="bpmnGateway_9" id="BPMNShape_bpmnGateway_9">
<omgdc:Bounds height="40.0" width="40.0" x="749.0" y="256.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="startnoneevent1" id="BPMNShape_startnoneevent1">
<omgdc:Bounds height="30.0" width="30.0" x="317.0" y="261.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="bpmnEndEvent_15" id="BPMNShape_bpmnEndEvent_15">
<omgdc:Bounds height="28.0" width="28.0" x="989.0" y="262.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_11" id="BPMNEdge_bpmnSequenceFlow_11" flowable:sourceDockerX="50.0" flowable:sourceDockerY="40.0"
flowable:targetDockerX="20.0" flowable:targetDockerY="20.0">
<omgdi:waypoint x="649.0" y="276.0"></omgdi:waypoint>
<omgdi:waypoint x="749.0" y="276.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_10" id="BPMNEdge_bpmnSequenceFlow_10" flowable:sourceDockerX="50.0" flowable:sourceDockerY="40.0"
flowable:targetDockerX="20.0" flowable:targetDockerY="20.0">
<omgdi:waypoint x="649.0" y="151.0"></omgdi:waypoint>
<omgdi:waypoint x="769.0" y="151.0"></omgdi:waypoint>
<omgdi:waypoint x="769.0" y="256.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_12" id="BPMNEdge_bpmnSequenceFlow_12" flowable:sourceDockerX="50.0" flowable:sourceDockerY="40.0"
flowable:targetDockerX="20.0" flowable:targetDockerY="20.0">
<omgdi:waypoint x="649.0" y="411.0"></omgdi:waypoint>
<omgdi:waypoint x="769.0" y="411.0"></omgdi:waypoint>
<omgdi:waypoint x="769.0" y="296.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_14" id="BPMNEdge_bpmnSequenceFlow_14" flowable:sourceDockerX="20.0" flowable:sourceDockerY="20.0"
flowable:targetDockerX="50.0" flowable:targetDockerY="40.0">
<omgdi:waypoint x="789.0" y="276.0"></omgdi:waypoint>
<omgdi:waypoint x="839.0" y="276.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_16" id="BPMNEdge_bpmnSequenceFlow_16" flowable:sourceDockerX="50.0" flowable:sourceDockerY="40.0"
flowable:targetDockerX="14.0" flowable:targetDockerY="14.0">
<omgdi:waypoint x="939.0" y="276.0"></omgdi:waypoint>
<omgdi:waypoint x="989.0" y="276.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_4" id="BPMNEdge_bpmnSequenceFlow_4" flowable:sourceDockerX="20.0" flowable:sourceDockerY="20.0"
flowable:targetDockerX="50.0" flowable:targetDockerY="40.0">
<omgdi:waypoint x="429.0" y="256.0"></omgdi:waypoint>
<omgdi:waypoint x="429.0" y="151.0"></omgdi:waypoint>
<omgdi:waypoint x="549.0" y="151.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_2" id="BPMNEdge_bpmnSequenceFlow_2" flowable:sourceDockerX="15.0" flowable:sourceDockerY="15.0"
flowable:targetDockerX="20.0" flowable:targetDockerY="20.0">
<omgdi:waypoint x="347.0" y="276.0"></omgdi:waypoint>
<omgdi:waypoint x="409.0" y="276.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_8" id="BPMNEdge_bpmnSequenceFlow_8" flowable:sourceDockerX="20.0" flowable:sourceDockerY="20.0"
flowable:targetDockerX="50.0" flowable:targetDockerY="40.0">
<omgdi:waypoint x="429.0" y="296.0"></omgdi:waypoint>
<omgdi:waypoint x="429.0" y="411.0"></omgdi:waypoint>
<omgdi:waypoint x="549.0" y="411.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_6" id="BPMNEdge_bpmnSequenceFlow_6" flowable:sourceDockerX="20.0" flowable:sourceDockerY="20.0"
flowable:targetDockerX="50.0" flowable:targetDockerY="40.0">
<omgdi:waypoint x="449.0" y="276.0"></omgdi:waypoint>
<omgdi:waypoint x="549.0" y="276.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</definitions>

0 comments on commit a3ae8a5

Please sign in to comment.