Skip to content
This repository has been archived by the owner on May 7, 2020. It is now read-only.

Commit

Permalink
use classic scheduler for one-time jobs (#3776)
Browse files Browse the repository at this point in the history
The (cron) expression based scheduler is still used for the "daily" jobs
which will be executed every midnight. Those "daily" jobs however create
jobs for all kinds of events during the current day, which will have to
be executed only once. For those, it does not really make sense to convert
a timestamp to a cron expression, which the scheduler again with lots of
effort boils down again to a timestamp. Therefore they are now simply
executed with a calculated delay directly using the regular scheduler.

Signed-off-by: Simon Kaufmann <simon.kfm@googlemail.com>
  • Loading branch information
sjsf authored and maggu2810 committed Jun 30, 2017
1 parent d3de694 commit 1c0be53
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 81 deletions.
Expand Up @@ -7,41 +7,36 @@
*/
package org.eclipse.smarthome.binding.astro.handler;

import static java.util.stream.Collectors.toList;
import static org.eclipse.smarthome.core.scheduler.CronHelper.*;
import static org.eclipse.smarthome.core.thing.ThingStatus.*;
import static org.eclipse.smarthome.core.thing.type.ChannelKind.TRIGGER;
import static org.eclipse.smarthome.core.types.RefreshType.REFRESH;

import java.lang.invoke.MethodHandles;
import java.text.ParseException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateFormatUtils;
import org.eclipse.smarthome.binding.astro.internal.config.AstroChannelConfig;
import org.eclipse.smarthome.binding.astro.internal.config.AstroThingConfig;
import org.eclipse.smarthome.binding.astro.internal.job.Job;
import org.eclipse.smarthome.binding.astro.internal.job.PositionalJob;
import org.eclipse.smarthome.binding.astro.internal.model.Planet;
import org.eclipse.smarthome.binding.astro.internal.util.PropertyUtils;
import org.eclipse.smarthome.core.scheduler.CronExpression;
import org.eclipse.smarthome.core.scheduler.Expression;
import org.eclipse.smarthome.core.scheduler.ExpressionThreadPoolManager;
import org.eclipse.smarthome.core.scheduler.ExpressionThreadPoolManager.ExpressionThreadPoolExecutor;
import org.eclipse.smarthome.core.thing.Channel;
import org.eclipse.smarthome.core.thing.ChannelUID;
import org.eclipse.smarthome.core.thing.Thing;
import org.eclipse.smarthome.core.thing.ThingUID;
import org.eclipse.smarthome.core.thing.binding.BaseThingHandler;
import org.eclipse.smarthome.core.types.Command;
import org.eclipse.smarthome.core.types.State;
Expand All @@ -56,24 +51,23 @@
*/
public abstract class AstroThingHandler extends BaseThingHandler {

private static final String DAILY_MIDNIGHT = "0 0 0 * * ? *";

/** Logger Instance */
protected final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

/** Maximum number of scheduled jobs */
private static final int MAX_SCHEDULED_JOBS = 200;

/** Scheduler to schedule jobs */
private final ExpressionThreadPoolExecutor scheduledExecutor;

private int linkedPositionalChannels = 0;
protected AstroThingConfig thingConfig;
private final Lock monitor = new ReentrantLock();
private final Queue<Job> scheduledJobs;
private Job dailyJob;
private final Set<ScheduledFuture<?>> scheduledFutures = new HashSet<>();

public AstroThingHandler(Thing thing) {
super(thing);
scheduledExecutor = ExpressionThreadPoolManager.getExpressionScheduledPool("astro");
scheduledJobs = new LinkedBlockingQueue<>(MAX_SCHEDULED_JOBS);
}

@Override
Expand Down Expand Up @@ -174,40 +168,24 @@ private void restartJobs() {
stopJobs();
if (getThing().getStatus() == ONLINE) {
String thingUID = getThing().getUID().toString();
String typeId = getThing().getThingTypeUID().getId();
if (scheduledExecutor == null) {
logger.warn("Thread Pool Executor is not available");
return;
}
// Daily Job
Job dailyJob = getDailyJob();
if (dailyJob == null) {
logger.error("Daily job instance is not available");
return;
}
Expression midNightExpression = new CronExpression(DAILY_MIDNIGHT);
if (addJobToQueue(dailyJob)) {
scheduledExecutor.schedule(dailyJob, midNightExpression);
logger.info("Scheduled astro job-daily-{} at midnight for thing {}", typeId, thingUID);
}
dailyJob = getDailyJob();
scheduledExecutor.schedule(dailyJob, new CronExpression(DAILY_MIDNIGHT));
logger.debug("Scheduled {} at midnight", dailyJob);
// Execute daily startup job immediately
dailyJob.run();

// Repeat scheduled job every configured seconds
LocalDateTime currentTime = LocalDateTime.now();
LocalDateTime futureTimeWithInterval = currentTime.plusSeconds(thingConfig.getInterval());
Date start = Date.from(futureTimeWithInterval.atZone(ZoneId.systemDefault()).toInstant());
// Repeat positional job every configured seconds
if (isPositionalChannelLinked()) {
Expression expression = new CronExpression(
createCronForRepeatEverySeconds(thingConfig.getInterval()), start);
Job positionalJob = new PositionalJob(thingUID);
if (addJobToQueue(positionalJob)) {
scheduledExecutor.schedule(positionalJob, expression);
logger.info("Scheduled astro job-positional with interval of {} seconds for thing {}",
thingConfig.getInterval(), thingUID);
}
// Execute positional startup job immediately
positionalJob.run();
ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(positionalJob, 0,
thingConfig.getInterval(), TimeUnit.SECONDS);
scheduledFutures.add(future);
logger.info("Scheduled {} every {} seconds", positionalJob, thingConfig.getInterval());
}
}
} catch (ParseException ex) {
Expand All @@ -221,34 +199,28 @@ private void restartJobs() {
* Stops all jobs for this thing.
*/
private void stopJobs() {
ThingUID thingUID = getThing().getUID();
logger.debug("Stopping scheduled jobs for thing {}", getThing().getUID());
monitor.lock();
try {
if (scheduledExecutor != null) {
List<Job> jobsToRemove = scheduledJobs.stream().filter(this::isJobAssociatedWithThing)
.collect(toList());
if (jobsToRemove.isEmpty()) {
return;
if (dailyJob != null) {
scheduledExecutor.remove(dailyJob);
}
dailyJob = null;
}
for (ScheduledFuture<?> future : scheduledFutures) {
if (!future.isDone()) {
future.cancel(true);
}
logger.debug("Stopping Scheduled Jobs for thing {}", thingUID);
Consumer<Job> removalFromExecutor = scheduledExecutor::remove;
Consumer<Job> removalFromQueue = scheduledJobs::remove;
jobsToRemove.forEach(removalFromExecutor.andThen(removalFromQueue));
logger.debug("Stopped {} Scheduled Jobs for thing {}", jobsToRemove.size(), thingUID);
}
scheduledFutures.clear();
} catch (Exception ex) {
logger.error("{}", ex.getMessage(), ex);
} finally {
monitor.unlock();
}
}

private boolean isJobAssociatedWithThing(Job job) {
String thingUID = getThing().getUID().getAsString();
String jobThingUID = job.getThingUID();
return Objects.equals(thingUID, jobThingUID);
}

@Override
public void channelLinked(ChannelUID channelUID) {
linkedChannelChange(channelUID, 1);
Expand Down Expand Up @@ -297,23 +269,25 @@ public void triggerEvent(String channelId, String event) {
triggerChannel(getThing().getChannel(channelId).getUID(), event);
}

/**
* Returns the scheduler for the Astro jobs
*/
public ExpressionThreadPoolExecutor getScheduler() {
return scheduledExecutor;
}

/**
* Adds the provided {@link Job} to the queue (cannot be {@code null})
*
* @return {@code true} if the {@code job} is added to the queue, otherwise {@code false}
*/
public boolean addJobToQueue(Job job) {
if (job != null) {
return scheduledJobs.add(job);
public void schedule(Job job, Calendar eventAt) {
long sleepTime;
monitor.lock();
try {
sleepTime = eventAt.getTimeInMillis() - new Date().getTime();
ScheduledFuture<?> future = scheduler.schedule(job, sleepTime, TimeUnit.MILLISECONDS);
scheduledFutures.add(future);
} finally {
monitor.unlock();
}
if (logger.isDebugEnabled()) {
String formattedDate = DateFormatUtils.ISO_DATETIME_FORMAT.format(eventAt);
logger.debug("Scheduled {} in {}ms (at {})", job, sleepTime, formattedDate);
}
return false;
}

/**
Expand Down
Expand Up @@ -8,10 +8,8 @@
package org.eclipse.smarthome.binding.astro.internal.job;

import static java.util.Objects.isNull;
import static org.apache.commons.lang.time.DateFormatUtils.ISO_DATETIME_FORMAT;
import static org.eclipse.smarthome.binding.astro.AstroBindingConstants.*;
import static org.eclipse.smarthome.binding.astro.internal.util.DateTimeUtils.*;
import static org.eclipse.smarthome.core.scheduler.CronHelper.createCronFromCalendar;

import java.lang.invoke.MethodHandles;
import java.util.Calendar;
Expand All @@ -21,9 +19,6 @@
import org.eclipse.smarthome.binding.astro.internal.model.Planet;
import org.eclipse.smarthome.binding.astro.internal.model.Range;
import org.eclipse.smarthome.binding.astro.internal.model.SunPhaseName;
import org.eclipse.smarthome.core.scheduler.CronExpression;
import org.eclipse.smarthome.core.scheduler.Expression;
import org.eclipse.smarthome.core.scheduler.ExpressionThreadPoolManager.ExpressionThreadPoolExecutor;
import org.eclipse.smarthome.core.thing.Channel;
import org.eclipse.smarthome.core.thing.Thing;
import org.eclipse.smarthome.core.thing.binding.ThingHandler;
Expand Down Expand Up @@ -52,15 +47,7 @@ public static void schedule(String thingUID, AstroThingHandler astroHandler, Job
try {
Calendar today = Calendar.getInstance();
if (isSameDay(eventAt, today) && isTimeGreaterEquals(eventAt, today)) {
ExpressionThreadPoolExecutor executor = astroHandler.getScheduler();
if (executor != null) {
Expression cron = new CronExpression(createCronFromCalendar(eventAt));
if (astroHandler.addJobToQueue(job)) {
executor.schedule(job, cron);
String formattedDate = ISO_DATETIME_FORMAT.format(eventAt);
logger.debug("Scheduled astro job for thing {} at {}", thingUID, formattedDate);
}
}
astroHandler.schedule(job, eventAt);
}
} catch (Exception ex) {
logger.error("{}", ex.getMessage(), ex);
Expand Down

0 comments on commit 1c0be53

Please sign in to comment.