Skip to content

Commit

Permalink
ARTEMIS-2926 Scheduled task executions are skipped randomly
Browse files Browse the repository at this point in the history
Make scheduled tasks more reliable when the
scheduledComponent.delay() method is used, and prevent
periodic tasks from being skipped even though they are on the correct timing
  • Loading branch information
franz1981 authored and clebertsuconic committed Oct 28, 2020
1 parent 647151b commit e2c1848
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.jboss.logging.Logger;
Expand All @@ -41,15 +42,13 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
/** initialDelay < 0 would mean no initial delay, use the period instead */
private long initialDelay;
private long period;
private long millisecondsPeriod;
private TimeUnit timeUnit;
private final Executor executor;
private volatile ScheduledFuture future;
private volatile boolean isStarted;
private ScheduledFuture future;
private final boolean onDemand;

long lastTime = 0;

private final AtomicInteger delayed = new AtomicInteger(0);
// The start/stop actions shouldn't interact concurrently with delay so it doesn't need to be volatile
private AtomicBoolean bookedForRunning;

/**
* It creates a scheduled component that can trigger {@link #run()} with a fixed {@code checkPeriod} on a configured {@code executor}.
Expand All @@ -73,6 +72,8 @@ public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorServ
this.period = checkPeriod;
this.timeUnit = timeUnit;
this.onDemand = onDemand;
this.bookedForRunning = new AtomicBoolean(false);
this.isStarted = false;
}

/**
Expand All @@ -89,12 +90,7 @@ public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorServ
long checkPeriod,
TimeUnit timeUnit,
boolean onDemand) {
this.executor = null;
this.scheduledExecutorService = scheduledExecutorService;
this.initialDelay = initialDelay;
this.period = checkPeriod;
this.timeUnit = timeUnit;
this.onDemand = onDemand;
this(scheduledExecutorService, null, initialDelay, checkPeriod, timeUnit, onDemand);
}

/**
Expand Down Expand Up @@ -150,11 +146,11 @@ public ActiveMQScheduledComponent(long checkPeriod, TimeUnit timeUnit, boolean o

@Override
public synchronized void start() {
if (future != null) {
if (isStarted) {
// already started
return;
}

isStarted = true;
if (scheduledExecutorService == null) {
scheduledExecutorService = new ScheduledThreadPoolExecutor(1, getThreadFactory());
startedOwnScheduler = true;
Expand All @@ -165,10 +161,9 @@ public synchronized void start() {
return;
}

this.millisecondsPeriod = timeUnit.convert(period, TimeUnit.MILLISECONDS);

if (period >= 0) {
future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, initialDelay >= 0 ? initialDelay : period, period, timeUnit);
final AtomicBoolean booked = this.bookedForRunning;
future = scheduledExecutorService.scheduleWithFixedDelay(() -> runForScheduler(booked), initialDelay >= 0 ? initialDelay : period, period, timeUnit);
} else {
logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period);
}
Expand All @@ -188,15 +183,26 @@ public ClassLoader run() {

}

public void delay() {
int value = delayed.incrementAndGet();
if (value > 10) {
delayed.decrementAndGet();
} else {
// We only schedule up to 10 periods upfront.
// this is to avoid a window where a current one would be running and a next one is coming.
// in theory just 2 would be enough. I'm using 10 as a precaution here.
scheduledExecutorService.schedule(runForScheduler, Math.min(period, period * value), timeUnit);
/**
 * Requests one additional execution, to happen after one period.
 * <p>
 * A delay request can succeed only if:
 * <ul>
 * <li>there is no other pending delay request
 * <li>there is no pending execution request
 * </ul>
 * <p>
 * When a delay request succeeds, it schedules a new execution to happen in {@link #getPeriod()}.<br>
 *
 * @return {@code true} if a new execution has been scheduled, {@code false} if an execution
 *         or another delay request was already booked
 * @throws RejectedExecutionException if the scheduler refuses the task; the booking is released
 *                                    before the exception is propagated
 */
public boolean delay() {
   final AtomicBoolean booked = this.bookedForRunning;
   // Only one booking at a time: losing the CAS means something is already pending.
   if (!booked.compareAndSet(false, true)) {
      return false;
   }
   try {
      scheduledExecutorService.schedule(() -> bookedRunForScheduler(booked), period, timeUnit);
      return true;
   } catch (RuntimeException e) {
      // Release the booking on ANY failure to schedule, not only on rejection
      // (e.g. a NullPointerException when the scheduler has not been created yet):
      // a flag left at true would make every later delay and periodic run be skipped.
      booked.set(false);
      throw e;
   }
}

Expand Down Expand Up @@ -261,7 +267,14 @@ public synchronized ActiveMQScheduledComponent setTimeUnit(TimeUnit timeUnit) {
}

@Override
public void stop() {
public synchronized void stop() {
if (!isStarted) {
return;
}
isStarted = false;
// Replace the existing one: a new periodic task or any new delay after stop
// won't interact with the previously running ones
this.bookedForRunning = new AtomicBoolean(false);
if (future != null) {
future.cancel(false);
future = null;
Expand All @@ -275,8 +288,8 @@ public void stop() {
}

@Override
public synchronized boolean isStarted() {
return future != null;
/**
 * Reports whether this component is currently started.
 * <p>
 * It reads the {@code isStarted} flag, which {@code start()} sets to {@code true} and
 * {@code stop()} sets back to {@code false}; this method itself is not synchronized.
 *
 * @return the current value of the {@code isStarted} flag
 */
public boolean isStarted() {
return isStarted;
}

// this will restart the scheduled component upon changes
Expand All @@ -287,35 +300,43 @@ private void restartIfNeeded() {
}
}

final Runnable runForExecutor = new Runnable() {
@Override
public void run() {
if (onDemand && delayed.get() > 0) {
delayed.decrementAndGet();
}
/**
 * Releases the booking and then executes {@link #run()}.
 *
 * @param booked the flag guarding this execution; it is expected to be {@code true} on entry
 */
private void runForExecutor(AtomicBoolean booked) {
   // Clearing the flag BEFORE running the task is deliberate. It re-enables:
   //  - a new delay request
   //  - the next periodic run request (in case of executor != null)
   // Clearing it after ActiveMQScheduledComponent.this.run() may look tempting, but:
   //  - it would change the semantics of "delay", i.e. a delay racing with the end of the task would not succeed
   //  - it would not prevent "slow tasks" from accumulating, because slowness cannot be measured inside the
   //    running method; it would just cause perfectly timed executions to be skipped too
   final boolean released = booked.compareAndSet(true, false);
   assert released;
   ActiveMQScheduledComponent.this.run();
}

if (!onDemand && lastTime > 0) {
if (System.currentTimeMillis() - lastTime < millisecondsPeriod) {
logger.trace("Execution ignored due to too many simultaneous executions, probably a previous delayed execution");
return;
private void bookedRunForScheduler(AtomicBoolean booked) {
assert booked.get();
if (executor != null) {
try {
executor.execute(() -> runForExecutor(booked));
} catch (RejectedExecutionException e) {
if (booked != null) {
booked.set(false);
}
throw e;
}

lastTime = System.currentTimeMillis();

ActiveMQScheduledComponent.this.run();
} else {
runForExecutor(booked);
}
};

final Runnable runForScheduler = new Runnable() {
@Override
public void run() {
if (executor != null) {
executor.execute(runForExecutor);
} else {
runForExecutor.run();
}
}

/**
 * Periodic-task entry point: tries to book the run by flipping {@code booked} from
 * {@code false} to {@code true} and, only when that succeeds, hands over to
 * {@code bookedRunForScheduler(booked)}. When the flag is already set the execution is skipped.
 *
 * @param booked the booking flag shared with the delay requests
 */
private void runForScheduler(AtomicBoolean booked) {
if (!booked.compareAndSet(false, true)) {
// let's skip this execution because there is:
// - a previously submitted period task yet to start -> executor is probably overbooked!
// - a pending delay request
return;
}
};
bookedRunForScheduler(booked);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ public void run() {
Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5);
}

/**
 * Verifies that a periodic component configured with a sub-millisecond period
 * (900 microseconds, expressed in nanoseconds) is still executed repeatedly:
 * at least two runs must be observed within 10 seconds.
 */
@Test
public void testSubMillisDelay() throws InterruptedException {
   final CountDownLatch triggered = new CountDownLatch(2);
   final long nsInterval = TimeUnit.MICROSECONDS.toNanos(900);
   final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(scheduledExecutorService, executorService, nsInterval, TimeUnit.NANOSECONDS, false) {

      @Override
      public void run() {
         triggered.countDown();
      }
   };
   local.start();
   try {
      Assert.assertTrue(triggered.await(10, TimeUnit.SECONDS));
   } finally {
      // Stop the component even when the assertion fails, so the periodic task
      // does not keep firing on the executors used by this test.
      local.stop();
   }
}

@Test
public void testVerifyInitialDelayChanged() {
final long initialDelay = 10;
Expand Down

0 comments on commit e2c1848

Please sign in to comment.