Skip to content

Commit

Permalink
[kie-roadmap-119] Scheduled jobs do not keep configured execution time (
Browse files Browse the repository at this point in the history
kiegroup#2292)

* [kie-roadmap-119] Scheduled jobs do not keep configured execution time

* [kie-roadmap-119] Make sure job gets rescheduled even if execution time exceeds nextRun value
  • Loading branch information
martinweiler authored and fjtirado committed Sep 15, 2023
1 parent d9bde80 commit f32c104
Show file tree
Hide file tree
Showing 5 changed files with 344 additions and 10 deletions.
65 changes: 65 additions & 0 deletions jbpm-services/jbpm-executor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,28 @@
<artifactId>hornetq-jms-server</artifactId>
<scope>test</scope>
</dependency>

<!-- byteman deps for testing -->
<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman-submit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman-install</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman-bmunit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -219,6 +241,49 @@
</testResources>

<plugins>
<plugin>
<!-- We need to fork always as we have tests which are configured by system properties when a JVM starts -->
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<reuseForks>false</reuseForks>
<!-- Properties for Byteman which allows it to run on IBM Java as well -->
<argLine>
-javaagent:${project.build.directory}/agents/byteman.jar=port:9091,boot:${project.build.directory}/agents/byteman.jar
-Dfile.encoding=${project.build.sourceEncoding}
${jacoco.agent.line}
</argLine>
<systemPropertyVariables>
<org.jboss.byteman.contrib.bmunit.agent.inhibit>true</org.jboss.byteman.contrib.bmunit.agent.inhibit>
<org.jboss.byteman.allow.config.update>true</org.jboss.byteman.allow.config.update>
</systemPropertyVariables>
</configuration>
</plugin>

<!-- Byteman jar is copied to target/agents directory so Java is able to bootstrap it -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-agent</id>
<phase>process-test-classes</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman</artifactId>
<outputDirectory>${project.build.directory}/agents</outputDirectory>
<destFileName>byteman.jar</destFileName>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@
* <li>DateFormat - date format for further date related params - if not given yyyy-MM-dd is used (pattern of SimpleDateFormat class)</li>
* <li>EmfName - name of entity manager factory to be used for queries (valid persistence unit name)</li>
* <li>SingleRun - indicates if execution should be single run only (true|false)</li>
* <li>RepeatMode - defines how the next execution time is calculated
* <ul>
* <li>FIXED - Scheduled time + NextRun parameter value</li>
* <li>INTERVAL (or not present) - End of execution + NextRun parameter value (default)</li>
* </ul>
* </li>
* <li>NextRun - provides next execution time (valid time expression e.g. 1d, 5h, etc)</li>
* <li>OlderThan - indicates what logs should be deleted - older than given date</li>
* <li>OlderThanPeriod - indicated what logs should be deleted older than given time expression (valid time expression e.g. 1d, 5h, etc)</li>
Expand Down Expand Up @@ -125,15 +131,6 @@ public ExecutionResults execute(CommandContext ctx) throws Exception {
emfName = "org.jbpm.domain";
}
nextScheduleTimeAdd = TimeUnit.DAYS.toMillis(1);
String singleRun = (String) ctx.getData("SingleRun");
if ("true".equalsIgnoreCase(singleRun)) {
// disable rescheduling
nextScheduleTimeAdd = -1;
}
String nextRun = (String) ctx.getData("NextRun");
if (nextRun != null) {
nextScheduleTimeAdd = DateTimeUtils.parseDateAsDuration(nextRun);
}

// get hold of persistence and create instance of audit service
EntityManagerFactory emf = EntityManagerFactoryManager.get().getOrCreate(emfName);
Expand Down Expand Up @@ -268,11 +265,40 @@ public ExecutionResults execute(CommandContext ctx) throws Exception {
executionResults.setData("ProcessInstanceLogRemoved", piLogsRemoved);
}
}
calculateNextRun(ctx);
executionResults.setData("BAMLogRemoved", 0L);
return executionResults;
}

private static boolean mightBeMore (int deleted, int recordPerTransaction) {
return recordPerTransaction > 0 && deleted >= recordPerTransaction;
}

private void calculateNextRun(CommandContext ctx) {
String singleRun = (String) ctx.getData("SingleRun");
if ("true".equalsIgnoreCase(singleRun)) {
// disable rescheduling
nextScheduleTimeAdd = -1;
return;
}

String nextRun = (String) ctx.getData("NextRun");
if (nextRun != null) {
// if calculated next execution time is negative, schedule next run for immediate execution
nextScheduleTimeAdd = Math.max(DateTimeUtils.parseDateAsDuration(nextRun) - calculateExecutionTimeInMillis(ctx), 100);
}
}

private long calculateExecutionTimeInMillis(CommandContext ctx) {
long executionTimeInMillis = 0;
String repeatMode = (String) ctx.getData("RepeatMode");
if( "fixed".equalsIgnoreCase(repeatMode) ) {
// return diff between scheduled time and actual time
Date scheduledExecutionTime = (Date) ctx.getData("scheduledExecutionTime");
executionTimeInMillis = Instant.now().minus(scheduledExecutionTime.toInstant().toEpochMilli(), ChronoUnit.MILLIS).toEpochMilli();
logger.debug("Calculated execution time {}ms, based on scheduled execution time {}", executionTimeInMillis, scheduledExecutionTime);
}
// no calculation required for interval (or empty) mode
return executionTimeInMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@ public void executeGivenJob(RequestInfo request) {
}
// add class loader so internally classes can be created with valid (kjar) deployment
ctx.setData("ClassLoader", cl);


// add scheduled execution time
ctx.setData("scheduledExecutionTime", request.getTime());

cmd = classCacheManager.findCommand(request.getCommandName(), cl);
// increment execution counter directly to cover both success and failure paths
request.setExecutions(request.getExecutions() + 1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jbpm.executor;

import java.util.Arrays;
import java.util.List;
import java.util.UUID;

import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

import org.jboss.byteman.contrib.bmunit.BMScript;
import org.jboss.byteman.contrib.bmunit.BMUnitConfig;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.jbpm.executor.impl.ExecutorServiceImpl;
import org.jbpm.executor.test.CountDownAsyncJobListener;
import org.jbpm.test.util.ExecutorTestUtil;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.kie.api.executor.CommandContext;
import org.kie.api.executor.ExecutorService;
import org.kie.api.executor.RequestInfo;
import org.kie.api.executor.STATUS;
import org.kie.api.runtime.query.QueryContext;
import org.kie.test.util.db.PoolingDataSourceWrapper;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@RunWith(BMUnitRunner.class)
@BMUnitConfig(loadDirectory = "target/test-classes")
public class LogCleanupCommandTest {

private PoolingDataSourceWrapper pds;

protected ExecutorService executorService;
protected EntityManagerFactory emf = null;

@Before
public void setUp() {
pds = ExecutorTestUtil.setupPoolingDataSource();
emf = Persistence.createEntityManagerFactory("org.jbpm.executor");

executorService = ExecutorServiceFactory.newExecutorService(emf);

executorService.init();
executorService.setThreadPoolSize(1);
executorService.setInterval(3);
}

@After
public void tearDown() {
executorService.clearAllErrors();
executorService.clearAllRequests();

executorService.destroy();
if (emf != null) {
emf.close();
}
pds.close();
}

protected CountDownAsyncJobListener configureListener(int threads) {
CountDownAsyncJobListener countDownListener = new CountDownAsyncJobListener(threads);
((ExecutorServiceImpl) executorService).addAsyncJobListener(countDownListener);

return countDownListener;
}


@Test(timeout=10000)
@BMScript(value = "byteman-scripts/simulateSlowLogCleanupCommand.btm")
public void logCleanupSingleRunTest() throws InterruptedException {
CountDownAsyncJobListener countDownListener = configureListener(1);

CommandContext ctxCMD = new CommandContext();
ctxCMD.setData("businessKey", UUID.randomUUID().toString());
ctxCMD.setData("SingleRun", "true");
ctxCMD.setData("EmfName", "org.jbpm.executor");
ctxCMD.setData("SkipProcessLog", "true");
ctxCMD.setData("SkipTaskLog", "true");
executorService.scheduleRequest("org.jbpm.executor.commands.LogCleanupCommand", ctxCMD);

countDownListener.waitTillCompleted();

List<RequestInfo> rescheduled = executorService.getRequestsByBusinessKey((String)ctxCMD.getData("businessKey"), Arrays.asList(STATUS.QUEUED), new QueryContext());
assertEquals(0, rescheduled.size());

List<RequestInfo> inErrorRequests = executorService.getInErrorRequests(new QueryContext());
assertEquals(0, inErrorRequests.size());
List<RequestInfo> queuedRequests = executorService.getQueuedRequests(new QueryContext());
assertEquals(0, queuedRequests.size());
List<RequestInfo> executedRequests = executorService.getCompletedRequests(new QueryContext());
assertEquals(1, executedRequests.size());
}

@Test(timeout=10000)
@BMScript(value = "byteman-scripts/simulateSlowLogCleanupCommand.btm")
public void logCleanupNextRunIntervalTest() throws InterruptedException {
CountDownAsyncJobListener countDownListener = configureListener(1);

CommandContext ctxCMD = new CommandContext();
ctxCMD.setData("businessKey", UUID.randomUUID().toString());
ctxCMD.setData("NextRun", "10s");
ctxCMD.setData("EmfName", "org.jbpm.executor");
ctxCMD.setData("SkipProcessLog", "true");
ctxCMD.setData("SkipTaskLog", "true");
executorService.scheduleRequest("org.jbpm.executor.commands.LogCleanupCommand", ctxCMD);

countDownListener.waitTillCompleted();

List<RequestInfo> rescheduled = executorService.getRequestsByBusinessKey((String)ctxCMD.getData("businessKey"), Arrays.asList(STATUS.QUEUED), new QueryContext());
assertEquals(1, rescheduled.size());

List<RequestInfo> inErrorRequests = executorService.getInErrorRequests(new QueryContext());
assertEquals(0, inErrorRequests.size());
List<RequestInfo> queuedRequests = executorService.getQueuedRequests(new QueryContext());
assertEquals(1, queuedRequests.size());
List<RequestInfo> executedRequests = executorService.getCompletedRequests(new QueryContext());
assertEquals(1, executedRequests.size());

executorService.cancelRequest(queuedRequests.get(0).getId());

long firstExecution = executedRequests.get(0).getTime().getTime();
long nextExecution = queuedRequests.get(0).getTime().getTime();

// time difference between first and second should be at least 11 seconds (10s next interval + artificial slowdown from byteman)
long diff = nextExecution - firstExecution;
assertTrue(diff > 11000);
}

@Test(timeout=10000)
@BMScript(value = "byteman-scripts/simulateSlowLogCleanupCommand.btm")
public void logCleanupNextRunFixedTest() throws InterruptedException {
CountDownAsyncJobListener countDownListener = configureListener(1);

CommandContext ctxCMD = new CommandContext();
ctxCMD.setData("businessKey", UUID.randomUUID().toString());
ctxCMD.setData("NextRun", "10s");
ctxCMD.setData("EmfName", "org.jbpm.executor");
ctxCMD.setData("SkipProcessLog", "true");
ctxCMD.setData("SkipTaskLog", "true");
ctxCMD.setData("RepeatMode", "fixed");
executorService.scheduleRequest("org.jbpm.executor.commands.LogCleanupCommand", ctxCMD);

countDownListener.waitTillCompleted();

List<RequestInfo> rescheduled = executorService.getRequestsByBusinessKey((String)ctxCMD.getData("businessKey"), Arrays.asList(STATUS.QUEUED), new QueryContext());
assertEquals(1, rescheduled.size());

List<RequestInfo> inErrorRequests = executorService.getInErrorRequests(new QueryContext());
assertEquals(0, inErrorRequests.size());
List<RequestInfo> queuedRequests = executorService.getQueuedRequests(new QueryContext());
assertEquals(1, queuedRequests.size());
List<RequestInfo> executedRequests = executorService.getCompletedRequests(new QueryContext());
assertEquals(1, executedRequests.size());

executorService.cancelRequest(queuedRequests.get(0).getId());

long firstExecution = executedRequests.get(0).getTime().getTime();
long nextExecution = queuedRequests.get(0).getTime().getTime();

// time difference between first and second should be less than 11 seconds (10s next interval, regardless of artifial slowdown by byteman)
long diff = nextExecution - firstExecution;
assertTrue(diff < 11000);
}

@Test(timeout=10000)
@BMScript(value = "byteman-scripts/simulateSlowLogCleanupCommand.btm")
public void logCleanupNextRunFixedIntervalTooSmallTest() throws InterruptedException {
CountDownAsyncJobListener countDownListener = configureListener(1);
// this delay will be invoked 3 times during the test, resulting in an overall delay of 2s
System.setProperty("byteman.jpaaudit.sleep", "666");

CommandContext ctxCMD = new CommandContext();
ctxCMD.setData("businessKey", UUID.randomUUID().toString());
ctxCMD.setData("NextRun", "1s");
ctxCMD.setData("EmfName", "org.jbpm.executor");
ctxCMD.setData("SkipProcessLog", "true");
ctxCMD.setData("SkipTaskLog", "true");
ctxCMD.setData("RepeatMode", "fixed");
executorService.scheduleRequest("org.jbpm.executor.commands.LogCleanupCommand", ctxCMD);

countDownListener.waitTillCompleted();

List<RequestInfo> rescheduled = executorService.getRequestsByBusinessKey((String)ctxCMD.getData("businessKey"), Arrays.asList(STATUS.QUEUED), new QueryContext());
// check if the job has been rescheduled
assertEquals(1, rescheduled.size());

List<RequestInfo> inErrorRequests = executorService.getInErrorRequests(new QueryContext());
assertEquals(0, inErrorRequests.size());
List<RequestInfo> queuedRequests = executorService.getQueuedRequests(new QueryContext());
assertEquals(1, queuedRequests.size());
List<RequestInfo> executedRequests = executorService.getCompletedRequests(new QueryContext());
assertEquals(1, executedRequests.size());

executorService.cancelRequest(queuedRequests.get(0).getId());

long firstExecution = executedRequests.get(0).getTime().getTime();
long nextExecution = queuedRequests.get(0).getTime().getTime();

// time difference between first and second should be around 2 seconds (2nd job rescheduled immediately)
long diff = nextExecution - firstExecution;
assertTrue(diff >= 2000 && diff < 3000);
System.clearProperty("byteman.jpaaudit.sleep");
}
}

0 comments on commit f32c104

Please sign in to comment.