Skip to content

Commit

Permalink
Add ability to name timers / scheduled jobs (openhab#2911)
Browse files Browse the repository at this point in the history
Signed-off-by: Jan N. Klug <github@klug.nrw>
  • Loading branch information
J-N-K committed Jun 18, 2022
1 parent e295833 commit c2ec97c
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.time.ZonedDateTime;

import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.xtext.xbase.XExpression;
import org.eclipse.xtext.xbase.lib.Procedures.Procedure0;
import org.eclipse.xtext.xbase.lib.Procedures.Procedure1;
Expand Down Expand Up @@ -76,11 +77,25 @@ public static Object callScript(String scriptName) throws ScriptExecutionExcepti
* @throws ScriptExecutionException if an error occurs during the execution
*/
public static Timer createTimer(ZonedDateTime instant, Procedure0 closure) {
return createTimer(null, instant, closure);
}

/**
* Schedules a block of code for later execution.
*
* @param identifier an optional identifier
* @param instant the point in time when the code should be executed
* @param closure the code block to execute
*
* @return a handle to the created timer, so that it can be canceled or rescheduled
* @throws ScriptExecutionException if an error occurs during the execution
*/
public static Timer createTimer(@Nullable String identifier, ZonedDateTime instant, Procedure0 closure) {
Scheduler scheduler = ScriptServiceUtil.getScheduler();

return new TimerImpl(scheduler, instant, () -> {
closure.apply();
});
}, identifier);
}

/**
Expand All @@ -94,11 +109,25 @@ public static Timer createTimer(ZonedDateTime instant, Procedure0 closure) {
* @throws ScriptExecutionException if an error occurs during the execution
*/
public static Timer createTimerWithArgument(ZonedDateTime instant, Object arg1, Procedure1<Object> closure) {
return createTimerWithArgument(null, instant, arg1, closure);
}

/**
* Schedules a block of code (with argument) for later execution
*
* @param identifier an optional identifier
* @param instant the point in time when the code should be executed
* @param arg1 the argument to pass to the code block
* @param closure the code block to execute
*
* @return a handle to the created timer, so that it can be canceled or rescheduled
* @throws ScriptExecutionException if an error occurs during the execution
*/
public static Timer createTimerWithArgument(@Nullable String identifier, ZonedDateTime instant, Object arg1, Procedure1<Object> closure) {
Scheduler scheduler = ScriptServiceUtil.getScheduler();

return new TimerImpl(scheduler, instant, () -> {
closure.apply(arg1);
});
}, identifier);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,20 @@ public class TimerImpl implements Timer {
private final Scheduler scheduler;
private final ZonedDateTime startTime;
private final SchedulerRunnable runnable;
private final @Nullable String identifier;
private ScheduledCompletableFuture<?> future;

public TimerImpl(Scheduler scheduler, ZonedDateTime startTime, SchedulerRunnable runnable) {
this(scheduler, startTime, runnable, null);
}

public TimerImpl(Scheduler scheduler, ZonedDateTime startTime, SchedulerRunnable runnable, @Nullable String identifier) {
this.scheduler = scheduler;
this.startTime = startTime;
this.runnable = runnable;
this.identifier = identifier;

future = scheduler.schedule(runnable, startTime.toInstant());
future = scheduler.schedule(runnable, identifier, startTime.toInstant());
}

@Override
Expand Down
9 changes: 9 additions & 0 deletions bundles/org.openhab.core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@

<name>openHAB Core :: Bundles :: Core</name>

<dependencies>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<pluginManagement>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.CompletableFuture;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.scheduler.ScheduledCompletableFuture;
import org.openhab.core.scheduler.Scheduler;
import org.openhab.core.scheduler.SchedulerRunnable;
Expand Down Expand Up @@ -96,6 +97,12 @@ public <T> ScheduledCompletableFuture<T> schedule(SchedulerRunnable runnable, Te
return add(delegate.schedule(runnable, temporalAdjuster));
}

@Override
public <T> ScheduledCompletableFuture<T> schedule(SchedulerRunnable runnable, @Nullable String identifier,
TemporalAdjuster temporalAdjuster) {
return add(delegate.schedule(runnable, identifier, temporalAdjuster));
}

private <T> ScheduledCompletableFuture<T> add(ScheduledCompletableFuture<T> t) {
synchronized (scheduledJobs) {
scheduledJobs.add(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.time.temporal.TemporalAdjuster;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -67,7 +68,7 @@ public ScheduledCompletableFuture<Instant> after(Duration duration) {

@Override
public <T> ScheduledCompletableFuture<T> after(Callable<T> callable, Duration duration) {
return afterInternal(new ScheduledCompletableFutureOnce<>(duration), callable);
return afterInternal(new ScheduledCompletableFutureOnce<>(null, duration), callable);
}

private <T> ScheduledCompletableFutureOnce<T> afterInternal(ScheduledCompletableFutureOnce<T> deferred,
Expand All @@ -89,7 +90,8 @@ private <T> ScheduledCompletableFutureOnce<T> afterInternal(ScheduledCompletable
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.warn("Scheduled job failed and stopped", e);
logger.warn("Scheduled job '{}' failed and stopped",
Objects.requireNonNullElse(deferred.identifier, "<unknown>"), e);
deferred.completeExceptionally(e);
}
}, duration, TimeUnit.MILLISECONDS);
Expand All @@ -114,7 +116,7 @@ public <T> ScheduledCompletableFuture<T> before(CompletableFuture<T> promise, Du
runnable.run();
}
};
final ScheduledCompletableFutureOnce<T> wrappedPromise = new ScheduledCompletableFutureOnce<>(timeout);
final ScheduledCompletableFutureOnce<T> wrappedPromise = new ScheduledCompletableFutureOnce<>(null, timeout);
Callable<T> callable = () -> {
wrappedPromise.completeExceptionally(new TimeoutException());
return null;
Expand Down Expand Up @@ -144,7 +146,7 @@ public ScheduledCompletableFuture<Instant> at(Instant instant) {
@Override
public <T> ScheduledCompletableFuture<T> at(Callable<T> callable, Instant instant) {
return atInternal(
new ScheduledCompletableFutureOnce<>(ZonedDateTime.ofInstant(instant, ZoneId.systemDefault())),
new ScheduledCompletableFutureOnce<>(null, ZonedDateTime.ofInstant(instant, ZoneId.systemDefault())),
callable);
}

Expand All @@ -155,25 +157,30 @@ private <T> ScheduledCompletableFuture<T> atInternal(ScheduledCompletableFutureO

@Override
public <T> ScheduledCompletableFuture<T> schedule(SchedulerRunnable runnable, TemporalAdjuster temporalAdjuster) {
final ScheduledCompletableFutureRecurring<T> schedule = new ScheduledCompletableFutureRecurring<>(
ZonedDateTime.now());
return schedule(runnable, null, temporalAdjuster);
}

schedule(schedule, runnable, temporalAdjuster);
@Override
public <T> ScheduledCompletableFuture<T> schedule(SchedulerRunnable runnable, @Nullable String identifier,
TemporalAdjuster temporalAdjuster) {
final ScheduledCompletableFutureRecurring<T> schedule = new ScheduledCompletableFutureRecurring<>(identifier,
ZonedDateTime.now());
schedule(schedule, runnable, identifier, temporalAdjuster);
return schedule;
}

private <T> void schedule(ScheduledCompletableFutureRecurring<T> recurringSchedule, SchedulerRunnable runnable,
TemporalAdjuster temporalAdjuster) {
@Nullable String identifier, TemporalAdjuster temporalAdjuster) {
final Temporal newTime = recurringSchedule.getScheduledTime().with(temporalAdjuster);
final ScheduledCompletableFutureOnce<T> deferred = new ScheduledCompletableFutureOnce<>(
final ScheduledCompletableFutureOnce<T> deferred = new ScheduledCompletableFutureOnce<>(identifier,
ZonedDateTime.from(newTime));

deferred.thenAccept(v -> {
if (temporalAdjuster instanceof SchedulerTemporalAdjuster) {
final SchedulerTemporalAdjuster schedulerTemporalAdjuster = (SchedulerTemporalAdjuster) temporalAdjuster;

if (!schedulerTemporalAdjuster.isDone(newTime)) {
schedule(recurringSchedule, runnable, temporalAdjuster);
schedule(recurringSchedule, runnable, identifier, temporalAdjuster);
return;
}
}
Expand All @@ -195,9 +202,10 @@ private <T> void schedule(ScheduledCompletableFutureRecurring<T> recurringSchedu
*/
private static class ScheduledCompletableFutureRecurring<T> extends ScheduledCompletableFutureOnce<T> {
private @Nullable volatile ScheduledCompletableFuture<T> scheduledPromise;
private @Nullable String identifier;

public ScheduledCompletableFutureRecurring(ZonedDateTime scheduledTime) {
super(scheduledTime);
public ScheduledCompletableFutureRecurring(@Nullable String identifier, ZonedDateTime scheduledTime) {
super(identifier, scheduledTime);
exceptionally(e -> {
synchronized (this) {
if (e instanceof CancellationException) {
Expand Down Expand Up @@ -245,12 +253,14 @@ public ZonedDateTime getScheduledTime() {
private static class ScheduledCompletableFutureOnce<T> extends CompletableFuture<T>
implements ScheduledCompletableFuture<T> {
private ZonedDateTime scheduledTime;
private @Nullable String identifier;

public ScheduledCompletableFutureOnce(Duration duration) {
this(ZonedDateTime.now().plusNanos(duration.toNanos()));
public ScheduledCompletableFutureOnce(@Nullable String identifier, Duration duration) {
this(identifier, ZonedDateTime.now().plusNanos(duration.toNanos()));
}

public ScheduledCompletableFutureOnce(ZonedDateTime scheduledTime) {
public ScheduledCompletableFutureOnce(@Nullable String identifier, ZonedDateTime scheduledTime) {
this.identifier = identifier;
this.scheduledTime = scheduledTime;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,17 @@ public interface Scheduler {
* @return A {@link ScheduledCompletableFuture}
*/
<T> ScheduledCompletableFuture<T> schedule(SchedulerRunnable callable, TemporalAdjuster temporalAdjuster);

/**
* Schedules the callable once or repeating using the temporalAdjuster to determine the
* time the callable should run. Runs until the job is cancelled or if the temporalAdjuster
* method {@link SchedulerTemporalAdjuster#isDone()) returns true.
*
* @param callable Provides the result
* @param an optional identifier for this job
* @param temporalAdjuster the temperalAdjuster to return the time the callable should run
* @return A {@link ScheduledCompletableFuture}
*/
<T> ScheduledCompletableFuture<T> schedule(SchedulerRunnable callable, @Nullable String identifier,
TemporalAdjuster temporalAdjuster);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
import org.openhab.core.scheduler.ScheduledCompletableFuture;
import org.openhab.core.scheduler.SchedulerRunnable;
import org.openhab.core.scheduler.SchedulerTemporalAdjuster;
import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;

/**
* Test class for {@link SchedulerImpl}.
Expand Down Expand Up @@ -222,24 +228,41 @@ public void testScheduleCancel() throws InterruptedException {
@Test
@Timeout(value = 15, unit = TimeUnit.SECONDS)
public void testScheduleException() throws InterruptedException {
// add logging interceptor
Logger logger = (Logger) LoggerFactory.getLogger(SchedulerImpl.class);
logger.setLevel(Level.DEBUG);
ListAppender<ILoggingEvent> listAppender = new ListAppender<>();
listAppender.start();
logger.addAppender(listAppender);

Semaphore s = new Semaphore(0);
TestSchedulerWithCounter temporalAdjuster = new TestSchedulerWithCounter();
SchedulerRunnable runnable = () -> {
// Pass a exception not very likely thrown by the scheduler it self to avoid missing real exceptions.
// Pass an exception not very likely thrown by the scheduler itself to avoid missing real exceptions.
throw new FileNotFoundException("testBeforeTimeoutException");
};

ScheduledCompletableFuture<@Nullable Void> schedule = scheduler.schedule(runnable, temporalAdjuster);
ScheduledCompletableFuture<@Nullable Void> schedule = scheduler.schedule(runnable, "myScheduledJob",
temporalAdjuster);
schedule.getPromise().exceptionally(e -> {
if (e instanceof FileNotFoundException) {
s.release();
}
return null;
});

s.acquire(1);
Thread.sleep(1000); // wait a little longer to see if not more are scheduled.
Thread.sleep(300); // wait a little longer to see if not more are scheduled.

logger.detachAppender(listAppender);

assertEquals(0, s.availablePermits(), "Scheduler should not have released more after cancel");
assertEquals(0, temporalAdjuster.getCount(), "Scheduler should have run 0 time");

assertEquals(1, listAppender.list.size());
ILoggingEvent loggingEvent = listAppender.list.get(0);
assertEquals(Level.WARN, loggingEvent.getLevel());
assertEquals("Scheduled job 'myScheduledJob' failed and stopped", loggingEvent.getFormattedMessage());
}

@Test
Expand Down

0 comments on commit c2ec97c

Please sign in to comment.