Skip to content

Commit

Permalink
fix(engine): skip FailedJobListener for already re-acquired jobs
Browse files Browse the repository at this point in the history
related to CAM-14619

Co-authored-by: Tobias Metzke-Bernstein <tobias.metzke@camunda.com>
Co-authored-by: tasso94 <3015690+tasso94@users.noreply.github.com>
  • Loading branch information
3 people committed Sep 21, 2022
1 parent b1c0741 commit 7b5702e
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public FailedJobListener(CommandExecutor commandExecutor, JobFailureCollector jo
}

public Void execute(CommandContext commandContext) {
if (isJobReacquired(commandContext)) {
// skip failed listener if job has been already re-acquired
LOG.debugFailedJobListenerSkipped(jobFailureCollector.getJobId());
return null;
}

initTotalRetries(commandContext);

logJobFailure(commandContext);
Expand All @@ -56,6 +62,17 @@ public Void execute(CommandContext commandContext) {
return null;
}

/**
 * Checks whether the job whose failure is being handled has been acquired
 * again since this execution attempt started.
 *
 * <p>The check compares the lock expiration time of the job as currently
 * returned by the job manager with the one of the job instance held by the
 * {@code jobFailureCollector}. A differing value is taken as the sign of a
 * re-acquisition.
 *
 * @param commandContext the command context used to look up the persisted job
 * @return {@code true} if the persisted job carries a lock expiration time
 *         that differs from the collected job's; {@code false} if the two
 *         match, if there is no persisted job, if the persisted job has no
 *         lock expiration time, or if the collector holds no job to compare
 *         against
 */
protected boolean isJobReacquired(CommandContext commandContext) {
  // if persisted job's lockExpirationTime is different, then it's been already re-acquired
  JobEntity persistedJob = commandContext.getJobManager().findJobById(jobFailureCollector.getJobId());
  JobEntity job = jobFailureCollector.getJob();

  // A collector can be created from a job id alone, so it may not hold a job
  // instance. Without one there is nothing to compare against; report
  // "not re-acquired" so the listener proceeds instead of failing with a
  // NullPointerException.
  if (job == null) {
    return false;
  }

  if (persistedJob == null || persistedJob.getLockExpirationTime() == null) {
    return false;
  }
  return !persistedJob.getLockExpirationTime().equals(job.getLockExpirationTime());
}

/**
 * Sets {@code totalRetries} to the failed-job-listener max-retries value
 * read from the process engine configuration of the given command context.
 *
 * @param commandContext the command context providing the engine configuration
 */
private void initTotalRetries(CommandContext commandContext) {
  this.totalRetries = commandContext
      .getProcessEngineConfiguration()
      .getFailedJobListenerMaxRetries();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ public void infoJobExecutorDoesNotHandleBatchJobs(ProcessEngineConfigurationImpl
config.getBatchJobPriority());
}

/**
 * Logs, at debug level, that the failed job listener was skipped for the
 * given job because the job has already been re-acquired.
 *
 * @param jobId the id of the job, inserted into the log message
 */
public void debugFailedJobListenerSkipped(String jobId) {
  // NOTE(review): id "031" is also passed to exceptionMessage(...) in
  // jobExecutorPriorityRangeException - confirm whether the ids are meant to be unique.
  final String messageTemplate = "Failed job listener skipped for job {} because it's been already re-acquired";
  logDebug("031", messageTemplate, jobId);
}

/**
 * Builds the exception that reports an invalid job executor priority range
 * configuration.
 *
 * @param reason explanation inserted into the exception message
 * @return a new {@link ProcessEngineException} carrying the formatted message;
 *         the exception is returned, not thrown
 */
public ProcessEngineException jobExecutorPriorityRangeException(String reason) {
  final String message = exceptionMessage(
      "031",
      "Invalid configuration for job executor priority range. Reason: {}",
      reason);
  return new ProcessEngineException(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.camunda.bpm.engine.impl.management.UpdateJobDefinitionSuspensionStateBuilderImpl;
import org.camunda.bpm.engine.impl.management.UpdateJobSuspensionStateBuilderImpl;
import org.camunda.bpm.engine.impl.persistence.entity.JobEntity;
import org.camunda.bpm.engine.impl.util.ClockUtil;
import org.camunda.bpm.engine.management.JobDefinition;
import org.camunda.bpm.engine.repository.ProcessDefinition;
import org.camunda.bpm.engine.runtime.Job;
Expand Down Expand Up @@ -98,7 +99,8 @@ public void initServices() {
}

@After
public void deleteJobs() {
public void tearDown() {
ClockUtil.reset();
for(final Job job : managementService.createJobQuery().list()) {

processEngineConfiguration.getCommandExecutorTxRequired().execute(new Command<Void>() {
Expand Down Expand Up @@ -137,6 +139,51 @@ public void testCompetingJobExecutionDeleteJobDuringExecution() {
assertTrue(threadOne.exception instanceof OptimisticLockingException);
}

/**
 * Scenario test (related to CAM-14619): a job is executed by a first thread,
 * its lock times out, the job is acquired again and executed by a second
 * thread, and only then does the first execution finish.
 *
 * <p>Expected outcome: the first execution fails with an
 * {@link OptimisticLockingException}, the second execution finishes without
 * an exception, and no job is left afterwards.
 *
 * <p>The order of the thread hand-offs below is essential to the scenario;
 * do not reorder the steps.
 */
@Test
public void shouldCompleteTimeoutRetryWhenTimeoutedJobCompletesInbetween() {
// given a simple process with an async-before service task, which yields exactly one job
testRule.deploy(Bpmn
.createExecutableProcess("process")
.startEvent()
.serviceTask("task")
.camundaAsyncBefore()
.camundaExpression("${true}")
.endEvent()
.done());
runtimeService.startProcessInstanceByKey("process");
Job currentJob = managementService.createJobQuery().singleResult();

// and the job is executed by a first thread until just before its command context is closed
JobExecutionThread threadOne = new JobExecutionThread(currentJob.getId());
threadOne.startAndWaitUntilControlIsReturned();

// and the lock expires in the meantime: move the clock past the job executor's lock time (plus 10 seconds)
ClockUtil.offset((long) engineRule.getProcessEngineConfiguration().getJobExecutor().getLockTimeInMillis() + 10_000L);

// and the job is acquired again; the acquisition is run to completion
JobAcquisitionThread acquisitionThread = new JobAcquisitionThread();
acquisitionThread.startAndWaitUntilControlIsReturned();
acquisitionThread.proceedAndWaitTillDone();

// and the same job is executed again by a second thread until just before its command context is closed
JobExecutionThread threadTwo = new JobExecutionThread(currentJob.getId());
threadTwo.startAndWaitUntilControlIsReturned();

// and the first execution finishes
threadOne.proceedAndWaitTillDone();

// when the second execution finishes as well
threadTwo.proceedAndWaitTillDone();

// then the first execution has failed with an optimistic locking error on the job deletion,
// the second execution has raised no exception, and no job remains
assertThat(threadOne.exception)
.isInstanceOf(OptimisticLockingException.class)
.hasMessageContaining("DELETE MessageEntity")
.hasMessageContaining("Entity was updated by another transaction concurrently");
assertThat(threadTwo.exception).isNull();
assertThat(managementService.createJobQuery().count()).isEqualTo(0L);
}

@Test
@Deployment
public void testCompetingJobExecutionDefaultRetryStrategy() {
Expand Down Expand Up @@ -444,7 +491,8 @@ public synchronized void startAndWaitUntilControlIsReturned() {
public void run() {
try {
JobFailureCollector jobFailureCollector = new JobFailureCollector(jobId);
ExecuteJobHelper.executeJob(jobId, processEngineConfiguration.getCommandExecutorTxRequired(),jobFailureCollector, new ControlledCommand<Void>(activeThread, new ExecuteJobsCmd(jobId, jobFailureCollector)));
ExecuteJobHelper.executeJob(jobId, processEngineConfiguration.getCommandExecutorTxRequired(),jobFailureCollector,
new ControlledCommand<>(activeThread, new ExecuteJobsCmd(jobId, jobFailureCollector)));

}
catch (OptimisticLockingException e) {
Expand All @@ -467,7 +515,7 @@ public void run() {
try {
JobExecutor jobExecutor = processEngineConfiguration.getJobExecutor();
acquiredJobs = processEngineConfiguration.getCommandExecutorTxRequired()
.execute(new ControlledCommand<AcquiredJobs>(activeThread, new AcquireJobsCmd(jobExecutor)));
.execute(new ControlledCommand<>(activeThread, new AcquireJobsCmd(jobExecutor)));

} catch (OptimisticLockingException e) {
this.exception = e;
Expand Down Expand Up @@ -572,5 +620,4 @@ public void run() {
}
}
}

}

0 comments on commit 7b5702e

Please sign in to comment.