Skip to content
This repository has been archived by the owner on May 7, 2020. It is now read-only.

Synchronous Execution of Astro Startup Jobs and minor fixes #3746

Merged
merged 1 commit into from Jun 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -21,8 +21,6 @@
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -71,11 +69,9 @@ public abstract class AstroThingHandler extends BaseThingHandler {
protected AstroThingConfig thingConfig;
private final Lock monitor = new ReentrantLock();
private final Queue<Job> scheduledJobs;
private final List<Future<?>> futures;

public AstroThingHandler(Thing thing) {
super(thing);
futures = new CopyOnWriteArrayList<>();
scheduledExecutor = ExpressionThreadPoolManager.getExpressionScheduledPool("astro");
scheduledJobs = new LinkedBlockingQueue<>(MAX_SCHEDULED_JOBS);
}
Expand Down Expand Up @@ -194,8 +190,8 @@ private void restartJobs() {
scheduledExecutor.schedule(dailyJob, midNightExpression);
logger.info("Scheduled astro job-daily-{} at midnight for thing {}", typeId, thingUID);
}
// Execute startup job immediately
futures.add(scheduler.submit(dailyJob));
// Execute daily startup job immediately
dailyJob.run();

// Repeat scheduled job every configured seconds
LocalDateTime currentTime = LocalDateTime.now();
Expand All @@ -210,8 +206,8 @@ private void restartJobs() {
logger.info("Scheduled astro job-positional with interval of {} seconds for thing {}",
thingConfig.getInterval(), thingUID);
}
// Execute positional job immediately
futures.add(scheduler.submit(positionalJob));
// Execute positional startup job immediately
positionalJob.run();
}
}
} catch (ParseException ex) {
Expand All @@ -228,7 +224,6 @@ private void stopJobs() {
ThingUID thingUID = getThing().getUID();
monitor.lock();
try {
cancelFutures();
if (scheduledExecutor != null) {
List<Job> jobsToRemove = scheduledJobs.stream().filter(this::isJobAssociatedWithThing)
.collect(toList());
Expand All @@ -248,27 +243,6 @@ private void stopJobs() {
}
}

/**
* Cancels all submitted futures
*/
private void cancelFutures() {
ThingUID thingUID = getThing().getUID();
monitor.lock();
try {
if (futures.isEmpty()) {
return;
}
logger.debug("Cancelling futures for thing {}", thingUID);
futures.forEach(f -> f.cancel(true));
futures.clear();
logger.debug("Cancelled futures for thing {}", thingUID);
} catch (Exception ex) {
logger.error("{}", ex.getMessage(), ex);
} finally {
monitor.unlock();
}
}

private boolean isJobAssociatedWithThing(Job job) {
String thingUID = getThing().getUID().getAsString();
String jobThingUID = job.getThingUID();
Expand Down
Expand Up @@ -21,36 +21,6 @@ public String getThingUID() {
return thingUID;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (thingUID == null ? 0 : thingUID.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
AbstractJob other = (AbstractJob) obj;
if (thingUID == null) {
if (other.thingUID != null) {
return false;
}
} else if (!thingUID.equals(other.thingUID)) {
return false;
}
return true;
}

/**
* Ensures the truth of an expression involving one or more parameters to the
* calling method.
Expand Down