Skip to content

Commit

Permalink
SingleThreadedScheduler impl and test
Browse files Browse the repository at this point in the history
  • Loading branch information
keznikl committed Nov 6, 2013
1 parent 07036c8 commit a0c52ee
Show file tree
Hide file tree
Showing 6 changed files with 352 additions and 279 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private void stopTask(final Task task) {
TaskInfo ti = tasks.get(task);

if( ti != null && ti.state != States.RUNNING ){
task.setTriggerListener(null);
task.unsetTriggerListener();


ti.timer.cancel();
Expand Down
46 changes: 23 additions & 23 deletions jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/Scheduler.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
package cz.cuni.mff.d3s.deeco.scheduler;

import cz.cuni.mff.d3s.deeco.executor.ExecutionListener;
import cz.cuni.mff.d3s.deeco.executor.Executor;
import cz.cuni.mff.d3s.deeco.task.Task;


/**
* Interface Scheduler for LocalTimeScheduler(and others if needed)
*
* @author Andranik Muradyan <muradian@d3s.mff.cuni.cz>
*
*/
public interface Scheduler extends ExecutionListener {
public void start();
public void stop();
public void addTask( Task task );
public void removeTask( Task task );

public void executionFailed(Task task, Exception e);
public void executionCompleted( Task task );
public void setExecutor(Executor executor);

package cz.cuni.mff.d3s.deeco.scheduler;

import cz.cuni.mff.d3s.deeco.executor.ExecutionListener;
import cz.cuni.mff.d3s.deeco.executor.Executor;
import cz.cuni.mff.d3s.deeco.task.Task;


/**
* Interface Scheduler for LocalTimeScheduler(and others if needed)
*
* @author Andranik Muradyan <muradian@d3s.mff.cuni.cz>
*
*/
public interface Scheduler extends ExecutionListener {
public void start();
public void stop();
public void addTask( Task task );
public void removeTask( Task task );

public void executionFailed(Task task, Exception e);
public void executionCompleted( Task task );
public void setExecutor(Executor executor);

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,36 @@
/*
* parts taken from java.util.Timer
*
* Copyright 1999-2007 Sun Microsystems, Inc. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Sun designates this
* particular file as subject to the "Classpath" exception as provided
* by Sun in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
* CA 95054 USA or visit www.sun.com if you need additional information or
* have any questions.
*/
package cz.cuni.mff.d3s.deeco.scheduler;

import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.junit.internal.runners.statements.RunAfters;
import java.util.Set;

import cz.cuni.mff.d3s.deeco.executor.Executor;
import cz.cuni.mff.d3s.deeco.model.runtime.api.PeriodicTrigger;
Expand All @@ -19,7 +45,9 @@ public class SingleThreadedScheduler implements Scheduler {
* mostly important for postponing the Tasks' execution when the previous
* execution took longer than the period.
*/
Map<Task, SchedulerEvent> periodicEvents;
Map<Task, SchedulerEvent> periodicEvents = new HashMap<>();

Set<Task> allTasks = new HashSet<>();

/**
* The scheduler task queue. This data structure is shared with the scheduler
Expand Down Expand Up @@ -56,23 +84,24 @@ protected void finalize() throws Throwable {
*/
@Override
public void executionCompleted(Task task) {

if (task instanceof InvokeAndWaitTask)
task.notify();

synchronized (queue) {
SchedulerEvent sTask = periodicEvents.get(task);
SchedulerEvent event = periodicEvents.get(task);
// continue only for periodic tasks
if (sTask == null)
if (event == null)
return;

synchronized (sTask.lock) {
synchronized (event.lock) {
// if the periodic task execution took more than it remained till the next period
if (sTask.nextExecutionTime < System.currentTimeMillis()) {
queue.rescheduleTask(sTask, System.currentTimeMillis() + sTask.period);
if (event.nextExecutionTime < System.currentTimeMillis()) {
queue.rescheduleTask(event, System.currentTimeMillis() + event.period);
}
}
}

if (task instanceof InvokeAndWaitTask)
task.notify();

}

@Override
Expand Down Expand Up @@ -107,31 +136,45 @@ public void stop() {
* @throws IllegalArgumentException of a null task is passed as an argument.
*/
@Override
public void addTask(Task task) {
public void addTask(Task task) {
if (task == null)
throw new IllegalArgumentException("The task cannot be null");
synchronized (queue) {
if (!thread.newTasksMayBeScheduled)
throw new IllegalStateException(
"Scheduler already terminated.");

synchronized (allTasks) {
if (allTasks.contains(task))
return;

if (task.getPeriodicTrigger() != null) {
SchedulerEvent sTask = new SchedulerEvent(task, task.getPeriodicTrigger());
scheduleNow(sTask, task.getPeriodicTrigger().getPeriod());
periodicEvents.put(task, sTask);
}
}
task.setTriggerListener(new TaskTriggerListener() {
@Override
public void triggered(Task task, Trigger trigger) {
synchronized (queue) {
if (!thread.newTasksMayBeScheduled || !thread.tasksMayBeExecuted)
return;

scheduleNow(new SchedulerEvent(task, trigger), 0);
synchronized (queue) {
if (!thread.newTasksMayBeScheduled)
throw new IllegalStateException(
"Scheduler already terminated.");

if (task.getPeriodicTrigger() != null) {
SchedulerEvent sTask = new SchedulerEvent(task, task.getPeriodicTrigger());
scheduleNow(sTask, task.getPeriodicTrigger().getPeriod());
periodicEvents.put(task, sTask);
}
}
});
task.setTriggerListener(new TaskTriggerListener() {
@Override
public void triggered(Task task, Trigger trigger) {
synchronized (queue) {
if (!thread.newTasksMayBeScheduled || !thread.tasksMayBeExecuted)
return;

boolean isScheduled;
synchronized (allTasks) {
isScheduled = allTasks.contains(task);
}
if (isScheduled) {
scheduleNow(new SchedulerEvent(task, trigger), 0);
}
}
}
});

allTasks.add(task);
}
}

/**
Expand All @@ -152,12 +195,18 @@ private void scheduleNow(SchedulerEvent sTask, long period) {
*/
@Override
public void removeTask(Task task) {
task.unsetTriggerListener();
synchronized(queue) {
// cancel all the periodic/triggered schedules of the task
queue.cancelAll(task);
periodicEvents.remove(task);
}
synchronized (allTasks) {
if (!allTasks.contains(task))
return;

task.unsetTriggerListener();
synchronized(queue) {
// cancel all the periodic/triggered schedules of the task
queue.cancelAll(task);
periodicEvents.remove(task);
}
allTasks.remove(task);
}
}

@Override
Expand Down Expand Up @@ -199,7 +248,7 @@ class SchedulerThread extends Thread {
* temporarily stopped and all the scheduled tasks have to bee ignored. Note
* that this field is protected by queue's monitor!
*/
boolean tasksMayBeExecuted = true;
boolean tasksMayBeExecuted = false;

/**
* Our Scheduler's queue. We store this reference in preference to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import cz.cuni.mff.d3s.deeco.executor.Executor;

/**
* Factory for Scheduler implementation tests
*
*
* @author Jaroslav Keznikl <keznikl@d3s.mff.cuni.cz>
*
Expand Down
Loading

0 comments on commit a0c52ee

Please sign in to comment.