From cc748717839f167f5b9e0e898c3b0f8c54095db9 Mon Sep 17 00:00:00 2001 From: Jaroslav Keznikl Date: Wed, 6 Nov 2013 15:37:26 +0100 Subject: [PATCH 1/5] single threaded scheduler with a queue (beta) --- .../scheduler/SingleThreadedScheduler.java | 521 ++++++++++++++++++ 1 file changed, 521 insertions(+) create mode 100644 jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java diff --git a/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java b/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java new file mode 100644 index 000000000..a829d50f2 --- /dev/null +++ b/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java @@ -0,0 +1,521 @@ +package cz.cuni.mff.d3s.deeco.scheduler; + +import java.util.Arrays; +import java.util.Date; +import java.util.Map; + +import cz.cuni.mff.d3s.deeco.executor.Executor; +import cz.cuni.mff.d3s.deeco.task.Task; +import cz.cuni.mff.d3s.deeco.task.TaskTriggerListener; + +public class SingleThreadedScheduler implements Scheduler { + Map periodicTasks; + + /** + * The scheduler task queue. This data structure is shared with the scheduler + * thread. The scheduler produces tasks, via its various schedule calls, + * and the scheduler thread consumes, executing scheduler tasks as appropriate, + * and removing them from the queue when they're obsolete. + */ + private TaskQueue queue = new TaskQueue(); + + /** + * The scheduler thread. + */ + private SchedulerThread thread = new SchedulerThread(queue); + + /** + * This object causes the scheduler's task execution thread to exit + * gracefully when there are no live references to the Scheduler object and no + * tasks in the scheduler queue. It is used in preference to a finalizer on + * Scheduler as such a finalizer would be susceptible to a subclass's + * finalizer forgetting to call it. + */ + private Object threadReaper = new Object() { + protected void finalize() throws Throwable { + synchronized(queue) { + thread.newTasksMayBeScheduled = false; + queue.notify(); // In case queue is empty. + } + } + }; + + @Override + public void executionCompleted(Task task) { + + synchronized (queue) { + SchedulerTask sTask = periodicTasks.get(task); + // continue only for periodic tasks + if (sTask == null) + return; + + // if the periodic task execution took more than it remained till the next period + if (sTask.nextExecutionTime < System.currentTimeMillis()) { + queue.rescheduleTask(sTask, System.currentTimeMillis() + sTask.period); + } + } + + } + + @Override + public void executionFailed(Task task, Exception e) { + executionCompleted(task); + } + + @Override + public void start() { + + if (!thread.isAlive()) + thread.start(); + + synchronized(queue) { + thread.tasksMayBeExecuted = true; + queue.notify(); // In case queue is empty. + } + } + + /** + * Temporarily stop the scheduler. + */ + @Override + public void stop() { + synchronized(queue) { + thread.tasksMayBeExecuted = false; + } + } + + /** + * @throws IllegalStateException if scheduler thread already terminated. + * @throws IllegalArgumentException of a null task is passed as an argument. + */ + @Override + public void addTask(Task task) { + if (task == null) + throw new IllegalArgumentException("The task cannot be null"); + synchronized (queue) { + if (!thread.newTasksMayBeScheduled) + throw new IllegalStateException( + "Scheduler already terminated."); + + if (task.getSchedulingPeriod() > 0) { + SchedulerTask sTask = new SchedulerTask(task); + scheduleNow(sTask, task.getSchedulingPeriod()); + periodicTasks.put(task, sTask); + } + } + task.setTriggerListener(new TaskTriggerListener() { + @Override + public void triggered(Task task) { + synchronized (queue) { + if (!thread.newTasksMayBeScheduled || !thread.tasksMayBeExecuted) + return; + + scheduleNow(new SchedulerTask(task), 0); + } + } + }); + } + + /** + * Note that this method has to be explicitly protected by queue's monitor! + */ + private void scheduleNow(SchedulerTask sTask, long period) { + sTask.nextExecutionTime = System.currentTimeMillis(); // start immediately + sTask.period = period; + sTask.state = SchedulerTask.SCHEDULED; + + queue.add(sTask); + if (queue.getMin() == sTask) + queue.notify(); + } + + /** + * + */ + @Override + public void removeTask(Task task) { + task.unsetTriggerListener(); + synchronized(queue) { + // remove all the periodic/triggered schedules of the task + queue.removeAll(task); + periodicTasks.remove(task); + } + } + + @Override + public void setExecutor(Executor executor) { + synchronized (thread.executorLock) { + thread.executor = executor; + } + } +} + + +/** + * This "helper class" implements the scheduler's task execution thread, which + * waits for tasks on the scheduler queue, executions them when they fire, + * reschedules repeating tasks, and removes cancelled tasks and spent + * non-repeating tasks from the queue. + */ +class SchedulerThread extends Thread { + /** + * This flag is set to false by the reaper to inform us that there + * are no more live references to our Scheduler object. Once this flag + * is true and there are no more tasks in our queue, there is no + * work left for us to do, so we terminate gracefully. Note that + * this field is protected by queue's monitor! + */ + boolean newTasksMayBeScheduled = true; + + + /** + * This flag is set to false by scheduler to inform us that the scheduler is + * temporarily stopped and all the scheduled tasks have to bee ignored. Note + * that this field is protected by queue's monitor! + */ + boolean tasksMayBeExecuted = true; + + /** + * Our Scheduler's queue. We store this reference in preference to + * a reference to the Scheduler so the reference graph remains acyclic. + * Otherwise, the Scheduler would never be garbage-collected and this + * thread would never go away. + */ + private TaskQueue queue; + + + Object executorLock = new Object(); + + Executor executor; + + SchedulerThread(TaskQueue queue) { + this.queue = queue; + } + + public void run() { + try { + mainLoop(); + } finally { + // Someone killed this Thread, behave as if Scheduler cancelled + synchronized(queue) { + newTasksMayBeScheduled = false; + queue.clear(); // Eliminate obsolete references + } + } + } + + /** + * The main scheduler loop. (See class comment.) + */ + private void mainLoop() { + while (true) { + try { + SchedulerTask task; + boolean taskFired; + synchronized(queue) { + // Wait for queue to become non-empty + while (queue.isEmpty() && newTasksMayBeScheduled) + queue.wait(); + if (queue.isEmpty()) + break; // Queue is empty and will forever remain; die + + // Queue nonempty; look at first evt and do the right thing + long currentTime, executionTime; + task = queue.getMin(); + synchronized(task.lock) { + if (task.state == SchedulerTask.CANCELLED) { + queue.removeMin(); + continue; // No action required, poll queue again + } + currentTime = System.currentTimeMillis(); + executionTime = task.nextExecutionTime; + if (taskFired = (executionTime<=currentTime)) { + if (task.period == 0) { // Non-repeating, remove + queue.removeMin(); + task.state = SchedulerTask.EXECUTED; + } else { // Repeating task, reschedule + queue.rescheduleMin(executionTime + task.period); + } + } + } + if (!taskFired) // Task hasn't yet fired; wait + queue.wait(executionTime - currentTime); + + // make sure the fire task can be executed + taskFired = taskFired && tasksMayBeExecuted; + } + + + if (taskFired) { // Task fired; run it + synchronized (executorLock) { + executor.execute(task.executable); + } + } + } catch(InterruptedException e) { + } + } + } +} + +/** + * This class represents a scheduler task queue: a priority queue of SchedulerTasks, + * ordered on nextExecutionTime. Each Scheduler object has one of these, which it + * shares with its SchedulerThread. Internally this class uses a heap, which + * offers log(n) performance for the add, removeMin and rescheduleMin + * operations, and constant time performance for the getMin operation. + */ +class TaskQueue { + /** + * Priority queue represented as a balanced binary heap: the two children + * of queue[n] are queue[2*n] and queue[2*n+1]. The priority queue is + * ordered on the nextExecutionTime field: The SchedulerTask with the lowest + * nextExecutionTime is in queue[1] (assuming the queue is nonempty). For + * each node n in the heap, and each descendant of n, d, + * n.nextExecutionTime <= d.nextExecutionTime. + */ + private SchedulerTask[] queue = new SchedulerTask[128]; + + /** + * The number of tasks in the priority queue. (The tasks are stored in + * queue[1] up to queue[size]). + */ + private int size = 0; + + /** + * Returns the number of tasks currently on the queue. + */ + int size() { + return size; + } + + + /** + * Adds a new task to the priority queue. + */ + void add(SchedulerTask task) { + // Grow backing store if necessary + if (size + 1 == queue.length) + queue = Arrays.copyOf(queue, 2*queue.length); + + queue[++size] = task; + fixUp(size); + } + + /** + * Return the "head task" of the priority queue. (The head task is an + * task with the lowest nextExecutionTime.) + */ + SchedulerTask getMin() { + return queue[1]; + } + + /** + * Return the ith task in the priority queue, where i ranges from 1 (the + * head task, which is returned by getMin) to the number of tasks on the + * queue, inclusive. + */ + SchedulerTask get(int i) { + return queue[i]; + } + + /** + * Remove the head task from the priority queue. + */ + void removeMin() { + queue[1] = queue[size]; + queue[size--] = null; // Drop extra reference to prevent memory leak + fixDown(1); + } + + /** + * Removes all the scheduler tasks holding the given executable task from + * queue. There can be many of them due to multiple triggers firing at a + * rapid succession. Recall that queue is one-based, so 1 <= i <= size. + */ + void removeAll(Task executable) { + int i = 1; + while (i <= size) { + for (;i <= size; ++i) { + if (queue[i].executable.equals(executable)) + break; + } + + // no more occurences found + if (i > size) + return; + + queue[i] = queue[size]; + queue[size--] = null; // Drop extra ref to prevent memory leak + + // if it wasn't the last element + if (i <= size) { + fixDown(i); + fixUp(i); + } + } + } + + /** + * Sets the nextExecutionTime associated with the head task to the + * specified value, and adjusts priority queue accordingly. + */ + void rescheduleMin(long newTime) { + queue[1].nextExecutionTime = newTime; + fixDown(1); + } + + /** + * Sets the nextExecutionTime associated with the given scheduler task to + * the specified value, and adjusts priority queue accordingly. + */ + public void rescheduleTask(SchedulerTask task, long newTime) { + int i = 1; + for (;i <= size; ++i) { + if (queue[i].equals(task)) + break; + } + // no more occurences found + if (i > size) + return; + + assert queue[i].nextExecutionTime <= newTime; + + queue[i].nextExecutionTime = newTime; + fixDown(i); + } + + /** + * Returns true if the priority queue contains no elements. + */ + boolean isEmpty() { + return size==0; + } + + /** + * Removes all elements from the priority queue. + */ + void clear() { + // Null out task references to prevent memory leak + for (int i=1; i<=size; i++) + queue[i] = null; + + size = 0; + } + + /** + * Establishes the heap invariant (described above) assuming the heap + * satisfies the invariant except possibly for the leaf-node indexed by k + * (which may have a nextExecutionTime less than its parent's). + * + * This method functions by "promoting" queue[k] up the hierarchy + * (by swapping it with its parent) repeatedly until queue[k]'s + * nextExecutionTime is greater than or equal to that of its parent. + */ + private void fixUp(int k) { + while (k > 1) { + int j = k >> 1; + if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime) + break; + SchedulerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; + k = j; + } + } + + /** + * Establishes the heap invariant (described above) in the subtree + * rooted at k, which is assumed to satisfy the heap invariant except + * possibly for node k itself (which may have a nextExecutionTime greater + * than its children's). + * + * This method functions by "demoting" queue[k] down the hierarchy + * (by swapping it with its smaller child) repeatedly until queue[k]'s + * nextExecutionTime is less than or equal to those of its children. + */ + private void fixDown(int k) { + int j; + while ((j = k << 1) <= size && j > 0) { + if (j < size && + queue[j].nextExecutionTime > queue[j+1].nextExecutionTime) + j++; // j indexes smallest kid + if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime) + break; + SchedulerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; + k = j; + } + } + + /** + * Establishes the heap invariant (described above) in the entire tree, + * assuming nothing about the order of the elements prior to the call. + */ + void heapify() { + for (int i = size/2; i >= 1; i--) + fixDown(i); + } +} + + +class SchedulerTask { + /** + * This object is used to control access to the SchedulerTask internals. + */ + final Object lock = new Object(); + + /** + * The state of this task, chosen from the constants below. + */ + int state = VIRGIN; + + /** + * This task has not yet been scheduled. + */ + static final int VIRGIN = 0; + + /** + * This task is scheduled for execution. If it is a non-repeating task, + * it has not yet been executed. + */ + static final int SCHEDULED = 1; + + /** + * This non-repeating task has already executed (or is currently + * executing) and has not been cancelled. + */ + static final int EXECUTED = 2; + + /** + * This task has been cancelled (with a call to SchedulerTask.cancel). + */ + static final int CANCELLED = 3; + + /** + * Next execution time for this task in the format returned by + * System.currentTimeMillis, assuming this task is scheduled for execution. + * For repeating tasks, this field is updated prior to each task execution. + */ + long nextExecutionTime; + + /** + * Period in milliseconds for repeating tasks. A positive value indicates + * fixed-rate execution. A value of 0 indicates a non-repeating task. + */ + long period = 0; + + /** + * The actual task to be executed. + */ + Task executable; + + + /** + * Creates a new scheduler task. + */ + protected SchedulerTask(Task task) { + this.executable = task; + } + + + +} + + From ec299956efa3e8496296acd040e7072adcffadf3 Mon Sep 17 00:00:00 2001 From: Jaroslav Keznikl Date: Wed, 6 Nov 2013 16:35:44 +0100 Subject: [PATCH 2/5] worikng on the single threaded scheduler --- .../scheduler/SingleThreadedScheduler.java | 136 ++++++++++-------- 1 file changed, 76 insertions(+), 60 deletions(-) diff --git a/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java b/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java index a829d50f2..b48dc994b 100644 --- a/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java +++ b/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java @@ -9,7 +9,12 @@ import cz.cuni.mff.d3s.deeco.task.TaskTriggerListener; public class SingleThreadedScheduler implements Scheduler { - Map periodicTasks; + /** + * Map capturing the SchedulerTaskWrappers for periodic Tasks. These are + * mostly important for postponing the Tasks' execution when the previous + * execution took longer than the period. + */ + Map periodicTasks; /** * The scheduler task queue. This data structure is shared with the scheduler @@ -40,18 +45,24 @@ protected void finalize() throws Throwable { } }; + /** + * If the completed task is periodic and the completion time is greater than the intended start of the next period, + * then the next period is moved. + */ @Override public void executionCompleted(Task task) { synchronized (queue) { - SchedulerTask sTask = periodicTasks.get(task); + SchedulerTaskWrapper sTask = periodicTasks.get(task); // continue only for periodic tasks if (sTask == null) return; - // if the periodic task execution took more than it remained till the next period - if (sTask.nextExecutionTime < System.currentTimeMillis()) { - queue.rescheduleTask(sTask, System.currentTimeMillis() + sTask.period); + synchronized (sTask.lock) { + // if the periodic task execution took more than it remained till the next period + if (sTask.nextExecutionTime < System.currentTimeMillis()) { + queue.rescheduleTask(sTask, System.currentTimeMillis() + sTask.period); + } } } @@ -98,7 +109,7 @@ public void addTask(Task task) { "Scheduler already terminated."); if (task.getSchedulingPeriod() > 0) { - SchedulerTask sTask = new SchedulerTask(task); + SchedulerTaskWrapper sTask = new SchedulerTaskWrapper(task); scheduleNow(sTask, task.getSchedulingPeriod()); periodicTasks.put(task, sTask); } @@ -110,7 +121,7 @@ public void triggered(Task task) { if (!thread.newTasksMayBeScheduled || !thread.tasksMayBeExecuted) return; - scheduleNow(new SchedulerTask(task), 0); + scheduleNow(new SchedulerTaskWrapper(task), 0); } } }); @@ -119,10 +130,10 @@ public void triggered(Task task) { /** * Note that this method has to be explicitly protected by queue's monitor! */ - private void scheduleNow(SchedulerTask sTask, long period) { + private void scheduleNow(SchedulerTaskWrapper sTask, long period) { sTask.nextExecutionTime = System.currentTimeMillis(); // start immediately sTask.period = period; - sTask.state = SchedulerTask.SCHEDULED; + sTask.state = SchedulerTaskWrapper.SCHEDULED; queue.add(sTask); if (queue.getMin() == sTask) @@ -136,8 +147,8 @@ private void scheduleNow(SchedulerTask sTask, long period) { public void removeTask(Task task) { task.unsetTriggerListener(); synchronized(queue) { - // remove all the periodic/triggered schedules of the task - queue.removeAll(task); + // cancel all the periodic/triggered schedules of the task + queue.cancelAll(task); periodicTasks.remove(task); } } @@ -210,36 +221,50 @@ public void run() { private void mainLoop() { while (true) { try { - SchedulerTask task; + SchedulerTaskWrapper task; boolean taskFired; synchronized(queue) { // Wait for queue to become non-empty while (queue.isEmpty() && newTasksMayBeScheduled) queue.wait(); - if (queue.isEmpty()) + if (queue.isEmpty() && !newTasksMayBeScheduled) break; // Queue is empty and will forever remain; die // Queue nonempty; look at first evt and do the right thing long currentTime, executionTime; task = queue.getMin(); + synchronized(task.lock) { - if (task.state == SchedulerTask.CANCELLED) { + if (task.state == SchedulerTaskWrapper.CANCELLED) { queue.removeMin(); continue; // No action required, poll queue again - } + } + currentTime = System.currentTimeMillis(); executionTime = task.nextExecutionTime; - if (taskFired = (executionTime<=currentTime)) { - if (task.period == 0) { // Non-repeating, remove - queue.removeMin(); - task.state = SchedulerTask.EXECUTED; - } else { // Repeating task, reschedule - queue.rescheduleMin(executionTime + task.period); - } + taskFired = (executionTime<=currentTime); + + if (!taskFired) { // Task hasn't yet fired; wait + queue.wait(executionTime - currentTime); + + // restart, since the current task might have been + // deleted, or some other task that has to be + // scheduled earlier might have been added, or all + // tasks might have been deleted + if (queue.getMin() != task) + continue; } + + assert taskFired; + + if (task.period == 0) { // Non-repeating, remove + queue.removeMin(); + task.state = SchedulerTaskWrapper.EXECUTED; + } else { // Repeating task, reschedule + queue.rescheduleMin(executionTime + task.period); + } } - if (!taskFired) // Task hasn't yet fired; wait - queue.wait(executionTime - currentTime); + // make sure the fire task can be executed taskFired = taskFired && tasksMayBeExecuted; @@ -268,12 +293,12 @@ class TaskQueue { /** * Priority queue represented as a balanced binary heap: the two children * of queue[n] are queue[2*n] and queue[2*n+1]. The priority queue is - * ordered on the nextExecutionTime field: The SchedulerTask with the lowest + * ordered on the nextExecutionTime field: The SchedulerTaskWrapper with the lowest * nextExecutionTime is in queue[1] (assuming the queue is nonempty). For * each node n in the heap, and each descendant of n, d, * n.nextExecutionTime <= d.nextExecutionTime. */ - private SchedulerTask[] queue = new SchedulerTask[128]; + private SchedulerTaskWrapper[] queue = new SchedulerTaskWrapper[128]; /** * The number of tasks in the priority queue. (The tasks are stored in @@ -292,7 +317,7 @@ int size() { /** * Adds a new task to the priority queue. */ - void add(SchedulerTask task) { + void add(SchedulerTaskWrapper task) { // Grow backing store if necessary if (size + 1 == queue.length) queue = Arrays.copyOf(queue, 2*queue.length); @@ -305,8 +330,11 @@ void add(SchedulerTask task) { * Return the "head task" of the priority queue. (The head task is an * task with the lowest nextExecutionTime.) */ - SchedulerTask getMin() { - return queue[1]; + SchedulerTaskWrapper getMin() { + if (size > 0) + return queue[1]; + else + return null; } /** @@ -314,7 +342,7 @@ SchedulerTask getMin() { * head task, which is returned by getMin) to the number of tasks on the * queue, inclusive. */ - SchedulerTask get(int i) { + SchedulerTaskWrapper get(int i) { return queue[i]; } @@ -328,31 +356,19 @@ void removeMin() { } /** - * Removes all the scheduler tasks holding the given executable task from + * Cancels all the scheduler tasks holding the given executable task from * queue. There can be many of them due to multiple triggers firing at a - * rapid succession. Recall that queue is one-based, so 1 <= i <= size. + * rapid succession. The cancelled tasks will be removed automatically by + * the SchedulerThreat. */ - void removeAll(Task executable) { - int i = 1; - while (i <= size) { - for (;i <= size; ++i) { - if (queue[i].executable.equals(executable)) - break; - } - - // no more occurences found - if (i > size) - return; - - queue[i] = queue[size]; - queue[size--] = null; // Drop extra ref to prevent memory leak - - // if it wasn't the last element - if (i <= size) { - fixDown(i); - fixUp(i); - } - } + void cancelAll(Task executable) { + for (int i=1; i <= size; ++i) { + synchronized(queue[i].lock) { + if (queue[i].executable.equals(executable)) { + queue[i].state = SchedulerTaskWrapper.CANCELLED; + } + } + } } /** @@ -368,7 +384,7 @@ void rescheduleMin(long newTime) { * Sets the nextExecutionTime associated with the given scheduler task to * the specified value, and adjusts priority queue accordingly. */ - public void rescheduleTask(SchedulerTask task, long newTime) { + public void rescheduleTask(SchedulerTaskWrapper task, long newTime) { int i = 1; for (;i <= size; ++i) { if (queue[i].equals(task)) @@ -416,7 +432,7 @@ private void fixUp(int k) { int j = k >> 1; if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime) break; - SchedulerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; + SchedulerTaskWrapper tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; k = j; } } @@ -439,7 +455,7 @@ private void fixDown(int k) { j++; // j indexes smallest kid if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime) break; - SchedulerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; + SchedulerTaskWrapper tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; k = j; } } @@ -455,9 +471,9 @@ void heapify() { } -class SchedulerTask { +class SchedulerTaskWrapper { /** - * This object is used to control access to the SchedulerTask internals. + * This object is used to control access to the SchedulerTaskWrapper internals. */ final Object lock = new Object(); @@ -484,7 +500,7 @@ class SchedulerTask { static final int EXECUTED = 2; /** - * This task has been cancelled (with a call to SchedulerTask.cancel). + * This task has been cancelled (with a call to SchedulerTaskWrapper.cancel). */ static final int CANCELLED = 3; @@ -510,7 +526,7 @@ class SchedulerTask { /** * Creates a new scheduler task. */ - protected SchedulerTask(Task task) { + protected SchedulerTaskWrapper(Task task) { this.executable = task; } From 2f0e0eb12df512f3891605fb258c010341c4497a Mon Sep 17 00:00:00 2001 From: Jaroslav Keznikl Date: Wed, 6 Nov 2013 17:23:25 +0100 Subject: [PATCH 3/5] Improved impl. of the SingleTreadScheduler --- .../scheduler/SingleThreadedScheduler.java | 142 +++++++++++------- 1 file changed, 84 insertions(+), 58 deletions(-) diff --git a/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java b/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java index b48dc994b..bb2cebfbc 100644 --- a/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java +++ b/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java @@ -5,6 +5,7 @@ import java.util.Map; import cz.cuni.mff.d3s.deeco.executor.Executor; +import cz.cuni.mff.d3s.deeco.model.runtime.api.Trigger; import cz.cuni.mff.d3s.deeco.task.Task; import cz.cuni.mff.d3s.deeco.task.TaskTriggerListener; @@ -14,7 +15,7 @@ public class SingleThreadedScheduler implements Scheduler { * mostly important for postponing the Tasks' execution when the previous * execution took longer than the period. */ - Map periodicTasks; + Map periodicEvents; /** * The scheduler task queue. This data structure is shared with the scheduler @@ -53,7 +54,7 @@ protected void finalize() throws Throwable { public void executionCompleted(Task task) { synchronized (queue) { - SchedulerTaskWrapper sTask = periodicTasks.get(task); + SchedulerEvent sTask = periodicEvents.get(task); // continue only for periodic tasks if (sTask == null) return; @@ -108,20 +109,20 @@ public void addTask(Task task) { throw new IllegalStateException( "Scheduler already terminated."); - if (task.getSchedulingPeriod() > 0) { - SchedulerTaskWrapper sTask = new SchedulerTaskWrapper(task); - scheduleNow(sTask, task.getSchedulingPeriod()); - periodicTasks.put(task, sTask); + if (task.getPeriodicTrigger() != null) { + SchedulerEvent sTask = new SchedulerEvent(task, task.getPeriodicTrigger()); + scheduleNow(sTask, task.getPeriodicTrigger().getPeriod()); + periodicEvents.put(task, sTask); } } - task.setTriggerListener(new TaskTriggerListener() { + task.setTriggerListener(new TaskTriggerListener() { @Override - public void triggered(Task task) { + public void triggered(Task task, Trigger trigger) { synchronized (queue) { if (!thread.newTasksMayBeScheduled || !thread.tasksMayBeExecuted) return; - scheduleNow(new SchedulerTaskWrapper(task), 0); + scheduleNow(new SchedulerEvent(task, trigger), 0); } } }); @@ -130,10 +131,10 @@ public void triggered(Task task) { /** * Note that this method has to be explicitly protected by queue's monitor! */ - private void scheduleNow(SchedulerTaskWrapper sTask, long period) { + private void scheduleNow(SchedulerEvent sTask, long period) { sTask.nextExecutionTime = System.currentTimeMillis(); // start immediately sTask.period = period; - sTask.state = SchedulerTaskWrapper.SCHEDULED; + sTask.state = SchedulerEvent.SCHEDULED; queue.add(sTask); if (queue.getMin() == sTask) @@ -149,7 +150,7 @@ public void removeTask(Task task) { synchronized(queue) { // cancel all the periodic/triggered schedules of the task queue.cancelAll(task); - periodicTasks.remove(task); + periodicEvents.remove(task); } } @@ -215,14 +216,48 @@ public void run() { } } + /** + * Wait on the queue till the nextExecutionTime of the event, or till + * the event is removed from the top of the queue (either by some new event + * that has to be executed sooner, or by removing the current event). + * + * @throws InterruptedException + * if the thread gets interrupted during waiting + */ + private boolean waitTaskFired(SchedulerEvent event) throws InterruptedException { + while (true) { + long currentTime = System.currentTimeMillis(); + long executionTime = event.nextExecutionTime; + boolean taskFired = (executionTime<=currentTime); + + if (!taskFired) { // Task hasn't yet fired; wait + queue.wait(executionTime - currentTime); + + // restart the main loop, since the current event might have been + // deleted, or some other event that has to be + // scheduled earlier might have been added, or all + // tasks might have been deleted + if (queue.getMin() != event) + return false; + + // check taskFired again, i.e., that we were + // waken up by waiting long enough (and not by + // some other notify, e.g., when adding a task) + } else { + // we have waited long enough + return true; + } + } + } + /** * The main scheduler loop. (See class comment.) */ private void mainLoop() { - while (true) { + while (true) { try { - SchedulerTaskWrapper task; - boolean taskFired; + SchedulerEvent event; + boolean canExecute; synchronized(queue) { // Wait for queue to become non-empty while (queue.isEmpty() && newTasksMayBeScheduled) @@ -230,50 +265,35 @@ private void mainLoop() { if (queue.isEmpty() && !newTasksMayBeScheduled) break; // Queue is empty and will forever remain; die - // Queue nonempty; look at first evt and do the right thing - long currentTime, executionTime; - task = queue.getMin(); + // Queue nonempty; look at first event and do the right thing + event = queue.getMin(); - synchronized(task.lock) { - if (task.state == SchedulerTaskWrapper.CANCELLED) { + synchronized(event.lock) { + if (event.state == SchedulerEvent.CANCELLED) { queue.removeMin(); continue; // No action required, poll queue again } + + canExecute = waitTaskFired(event); - currentTime = System.currentTimeMillis(); - executionTime = task.nextExecutionTime; - taskFired = (executionTime<=currentTime); - - if (!taskFired) { // Task hasn't yet fired; wait - queue.wait(executionTime - currentTime); - - // restart, since the current task might have been - // deleted, or some other task that has to be - // scheduled earlier might have been added, or all - // tasks might have been deleted - if (queue.getMin() != task) - continue; - } - - assert taskFired; + if (!canExecute) + continue; // The event cannot continue with execution, poll queue again - if (task.period == 0) { // Non-repeating, remove + if (event.period == 0) { // Non-repeating, remove queue.removeMin(); - task.state = SchedulerTaskWrapper.EXECUTED; + event.state = SchedulerEvent.EXECUTED; } else { // Repeating task, reschedule - queue.rescheduleMin(executionTime + task.period); + queue.rescheduleMin(event.nextExecutionTime + event.period); } } - // make sure the fire task can be executed - taskFired = taskFired && tasksMayBeExecuted; - } - + canExecute = canExecute && tasksMayBeExecuted; + } - if (taskFired) { // Task fired; run it + if (canExecute) { // Task fired and can execute; run it synchronized (executorLock) { - executor.execute(task.executable); + executor.execute(event.executable, event.trigger); } } } catch(InterruptedException e) { @@ -293,12 +313,12 @@ class TaskQueue { /** * Priority queue represented as a balanced binary heap: the two children * of queue[n] are queue[2*n] and queue[2*n+1]. The priority queue is - * ordered on the nextExecutionTime field: The SchedulerTaskWrapper with the lowest + * ordered on the nextExecutionTime field: The SchedulerEvent with the lowest * nextExecutionTime is in queue[1] (assuming the queue is nonempty). For * each node n in the heap, and each descendant of n, d, * n.nextExecutionTime <= d.nextExecutionTime. */ - private SchedulerTaskWrapper[] queue = new SchedulerTaskWrapper[128]; + private SchedulerEvent[] queue = new SchedulerEvent[128]; /** * The number of tasks in the priority queue. (The tasks are stored in @@ -317,7 +337,7 @@ int size() { /** * Adds a new task to the priority queue. */ - void add(SchedulerTaskWrapper task) { + void add(SchedulerEvent task) { // Grow backing store if necessary if (size + 1 == queue.length) queue = Arrays.copyOf(queue, 2*queue.length); @@ -330,7 +350,7 @@ void add(SchedulerTaskWrapper task) { * Return the "head task" of the priority queue. (The head task is an * task with the lowest nextExecutionTime.) */ - SchedulerTaskWrapper getMin() { + SchedulerEvent getMin() { if (size > 0) return queue[1]; else @@ -342,7 +362,7 @@ SchedulerTaskWrapper getMin() { * head task, which is returned by getMin) to the number of tasks on the * queue, inclusive. */ - SchedulerTaskWrapper get(int i) { + SchedulerEvent get(int i) { return queue[i]; } @@ -365,7 +385,7 @@ void cancelAll(Task executable) { for (int i=1; i <= size; ++i) { synchronized(queue[i].lock) { if (queue[i].executable.equals(executable)) { - queue[i].state = SchedulerTaskWrapper.CANCELLED; + queue[i].state = SchedulerEvent.CANCELLED; } } } @@ -384,7 +404,7 @@ void rescheduleMin(long newTime) { * Sets the nextExecutionTime associated with the given scheduler task to * the specified value, and adjusts priority queue accordingly. */ - public void rescheduleTask(SchedulerTaskWrapper task, long newTime) { + public void rescheduleTask(SchedulerEvent task, long newTime) { int i = 1; for (;i <= size; ++i) { if (queue[i].equals(task)) @@ -432,7 +452,7 @@ private void fixUp(int k) { int j = k >> 1; if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime) break; - SchedulerTaskWrapper tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; + SchedulerEvent tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; k = j; } } @@ -455,7 +475,7 @@ private void fixDown(int k) { j++; // j indexes smallest kid if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime) break; - SchedulerTaskWrapper tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; + SchedulerEvent tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; k = j; } } @@ -471,9 +491,9 @@ void heapify() { } -class SchedulerTaskWrapper { +class SchedulerEvent { /** - * This object is used to control access to the SchedulerTaskWrapper internals. + * This object is used to control access to the SchedulerEvent internals. */ final Object lock = new Object(); @@ -500,7 +520,7 @@ class SchedulerTaskWrapper { static final int EXECUTED = 2; /** - * This task has been cancelled (with a call to SchedulerTaskWrapper.cancel). + * This task has been cancelled (with a call to SchedulerEvent.cancel). */ static final int CANCELLED = 3; @@ -522,12 +542,18 @@ class SchedulerTaskWrapper { */ Task executable; + /** + * The trigger associated with this event. + */ + Trigger trigger; + /** * Creates a new scheduler task. */ - protected SchedulerTaskWrapper(Task task) { + protected SchedulerEvent(Task task, Trigger trigger) { this.executable = task; + this.trigger = trigger; } From 07036c878028c98656d9bfac8c3dc25ca3e27cb4 Mon Sep 17 00:00:00 2001 From: Jaroslav Keznikl Date: Wed, 6 Nov 2013 17:47:31 +0100 Subject: [PATCH 4/5] invokeAndWait --- .../scheduler/SingleThreadedScheduler.java | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java b/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java index bb2cebfbc..45af39222 100644 --- a/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java +++ b/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java @@ -4,9 +4,13 @@ import java.util.Date; import java.util.Map; +import org.junit.internal.runners.statements.RunAfters; + import cz.cuni.mff.d3s.deeco.executor.Executor; +import cz.cuni.mff.d3s.deeco.model.runtime.api.PeriodicTrigger; import cz.cuni.mff.d3s.deeco.model.runtime.api.Trigger; import cz.cuni.mff.d3s.deeco.task.Task; +import cz.cuni.mff.d3s.deeco.task.TaskInvocationException; import cz.cuni.mff.d3s.deeco.task.TaskTriggerListener; public class SingleThreadedScheduler implements Scheduler { @@ -67,6 +71,8 @@ public void executionCompleted(Task task) { } } + if (task instanceof InvokeAndWaitTask) + task.notify(); } @Override @@ -160,9 +166,17 @@ public void setExecutor(Executor executor) { thread.executor = executor; } } + + public void invokeAndWait(Runnable doRun) throws InterruptedException { + InvokeAndWaitTask task = new InvokeAndWaitTask(this, doRun); + addTask(task); + task.wait(); + } } + + /** * This "helper class" implements the scheduler's task execution thread, which * waits for tasks on the scheduler queue, executions them when they fire, @@ -560,4 +574,39 @@ protected SchedulerEvent(Task task, Trigger trigger) { } +/** + * Ad-hoc tasks for one-time execution of a runnable within the context of the scheduler thread. + * + * @author Jaroslav Keznikl + * + */ +class InvokeAndWaitTask extends Task { + + Runnable runnable; + + public InvokeAndWaitTask(Scheduler scheduler, Runnable runnable) { + super(scheduler); + this.runnable = runnable; + } + @Override + public void invoke(Trigger trigger) throws TaskInvocationException { + if (runnable != null) { + runnable.run(); + } + } + + @Override + protected void registerTriggers() { + } + + @Override + protected void unregisterTriggers() { + } + + @Override + public PeriodicTrigger getPeriodicTrigger() { + return null; + } + +} \ No newline at end of file From a0c52ee7e7062b3fc4fc1624d6b7ee79a75837dc Mon Sep 17 00:00:00 2001 From: Jaroslav Keznikl Date: Wed, 6 Nov 2013 18:21:37 +0100 Subject: [PATCH 5/5] SingleThreadedScheduler impl and test --- .../deeco/scheduler/LocalTimeScheduler.java | 2 +- .../mff/d3s/deeco/scheduler/Scheduler.java | 46 +- .../scheduler/SingleThreadedScheduler.java | 127 +++-- .../scheduler/LocalTimeSchedulerTest.java | 2 +- .../d3s/deeco/scheduler/SchedulerTest.java | 435 +++++++++--------- .../SingleThreadedSchedulerTest.java | 19 + 6 files changed, 352 insertions(+), 279 deletions(-) create mode 100644 jdeeco-core/test/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedSchedulerTest.java diff --git a/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/LocalTimeScheduler.java b/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/LocalTimeScheduler.java index f3fa2a015..693165a5a 100644 --- a/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/LocalTimeScheduler.java +++ b/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/LocalTimeScheduler.java @@ -114,7 +114,7 @@ private void stopTask(final Task task) { TaskInfo ti = tasks.get(task); if( ti != null && ti.state != States.RUNNING ){ - task.setTriggerListener(null); + task.unsetTriggerListener(); ti.timer.cancel(); diff --git a/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/Scheduler.java b/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/Scheduler.java index 198b81edf..497990771 100644 --- a/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/Scheduler.java +++ b/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/Scheduler.java @@ -1,24 +1,24 @@ -package cz.cuni.mff.d3s.deeco.scheduler; - -import cz.cuni.mff.d3s.deeco.executor.ExecutionListener; -import cz.cuni.mff.d3s.deeco.executor.Executor; -import cz.cuni.mff.d3s.deeco.task.Task; - - -/** - * Interface Scheduler for LocalTimeScheduler(and others if needed) - * - * @author Andranik Muradyan - * - */ -public interface Scheduler extends ExecutionListener { - public void start(); - public void stop(); - public void addTask( Task task ); - public void removeTask( Task task ); - - public void executionFailed(Task task, Exception e); - public void executionCompleted( Task task ); - public void setExecutor(Executor executor); - +package cz.cuni.mff.d3s.deeco.scheduler; + +import cz.cuni.mff.d3s.deeco.executor.ExecutionListener; +import cz.cuni.mff.d3s.deeco.executor.Executor; +import cz.cuni.mff.d3s.deeco.task.Task; + + +/** + * Interface Scheduler for LocalTimeScheduler(and others if needed) + * + * @author Andranik Muradyan + * + */ +public interface Scheduler extends ExecutionListener { + public void start(); + public void stop(); + public void addTask( Task task ); + public void removeTask( Task task ); + + public void executionFailed(Task task, Exception e); + public void executionCompleted( Task task ); + public void setExecutor(Executor executor); + } \ No newline at end of file diff --git a/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java b/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java index 45af39222..064405deb 100644 --- a/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java +++ b/jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedScheduler.java @@ -1,10 +1,36 @@ +/* + * parts taken from java.util.Timer + * + * Copyright 1999-2007 Sun Microsystems, Inc. All Rights Reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Sun designates this + * particular file as subject to the "Classpath" exception as provided + * by Sun in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + */ package cz.cuni.mff.d3s.deeco.scheduler; import java.util.Arrays; -import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; - -import org.junit.internal.runners.statements.RunAfters; +import java.util.Set; import cz.cuni.mff.d3s.deeco.executor.Executor; import cz.cuni.mff.d3s.deeco.model.runtime.api.PeriodicTrigger; @@ -19,7 +45,9 @@ public class SingleThreadedScheduler implements Scheduler { * mostly important for postponing the Tasks' execution when the previous * execution took longer than the period. */ - Map periodicEvents; + Map periodicEvents = new HashMap<>(); + + Set allTasks = new HashSet<>(); /** * The scheduler task queue. This data structure is shared with the scheduler @@ -56,23 +84,24 @@ protected void finalize() throws Throwable { */ @Override public void executionCompleted(Task task) { - + if (task instanceof InvokeAndWaitTask) + task.notify(); + synchronized (queue) { - SchedulerEvent sTask = periodicEvents.get(task); + SchedulerEvent event = periodicEvents.get(task); // continue only for periodic tasks - if (sTask == null) + if (event == null) return; - synchronized (sTask.lock) { + synchronized (event.lock) { // if the periodic task execution took more than it remained till the next period - if (sTask.nextExecutionTime < System.currentTimeMillis()) { - queue.rescheduleTask(sTask, System.currentTimeMillis() + sTask.period); + if (event.nextExecutionTime < System.currentTimeMillis()) { + queue.rescheduleTask(event, System.currentTimeMillis() + event.period); } } } - if (task instanceof InvokeAndWaitTask) - task.notify(); + } @Override @@ -107,31 +136,45 @@ public void stop() { * @throws IllegalArgumentException of a null task is passed as an argument. */ @Override - public void addTask(Task task) { + public void addTask(Task task) { if (task == null) throw new IllegalArgumentException("The task cannot be null"); - synchronized (queue) { - if (!thread.newTasksMayBeScheduled) - throw new IllegalStateException( - "Scheduler already terminated."); + + synchronized (allTasks) { + if (allTasks.contains(task)) + return; - if (task.getPeriodicTrigger() != null) { - SchedulerEvent sTask = new SchedulerEvent(task, task.getPeriodicTrigger()); - scheduleNow(sTask, task.getPeriodicTrigger().getPeriod()); - periodicEvents.put(task, sTask); - } - } - task.setTriggerListener(new TaskTriggerListener() { - @Override - public void triggered(Task task, Trigger trigger) { - synchronized (queue) { - if (!thread.newTasksMayBeScheduled || !thread.tasksMayBeExecuted) - return; - - scheduleNow(new SchedulerEvent(task, trigger), 0); + synchronized (queue) { + if (!thread.newTasksMayBeScheduled) + throw new IllegalStateException( + "Scheduler already terminated."); + + if (task.getPeriodicTrigger() != null) { + SchedulerEvent sTask = new SchedulerEvent(task, task.getPeriodicTrigger()); + scheduleNow(sTask, task.getPeriodicTrigger().getPeriod()); + periodicEvents.put(task, sTask); } } - }); + task.setTriggerListener(new TaskTriggerListener() { + @Override + public void triggered(Task task, Trigger trigger) { + synchronized (queue) { + if (!thread.newTasksMayBeScheduled || !thread.tasksMayBeExecuted) + return; + + boolean isScheduled; + synchronized (allTasks) { + isScheduled = allTasks.contains(task); + } + if (isScheduled) { + scheduleNow(new SchedulerEvent(task, trigger), 0); + } + } + } + }); + + allTasks.add(task); + } } /** @@ -152,12 +195,18 @@ private void scheduleNow(SchedulerEvent sTask, long period) { */ @Override public void removeTask(Task task) { - task.unsetTriggerListener(); - synchronized(queue) { - // cancel all the periodic/triggered schedules of the task - queue.cancelAll(task); - periodicEvents.remove(task); - } + synchronized (allTasks) { + if (!allTasks.contains(task)) + return; + + task.unsetTriggerListener(); + synchronized(queue) { + // cancel all the periodic/triggered schedules of the task + queue.cancelAll(task); + periodicEvents.remove(task); + } + allTasks.remove(task); + } } @Override @@ -199,7 +248,7 @@ class SchedulerThread extends Thread { * temporarily stopped and all the scheduled tasks have to bee ignored. Note * that this field is protected by queue's monitor! */ - boolean tasksMayBeExecuted = true; + boolean tasksMayBeExecuted = false; /** * Our Scheduler's queue. We store this reference in preference to diff --git a/jdeeco-core/test/cz/cuni/mff/d3s/deeco/scheduler/LocalTimeSchedulerTest.java b/jdeeco-core/test/cz/cuni/mff/d3s/deeco/scheduler/LocalTimeSchedulerTest.java index 095f3382d..14b3b87c5 100644 --- a/jdeeco-core/test/cz/cuni/mff/d3s/deeco/scheduler/LocalTimeSchedulerTest.java +++ b/jdeeco-core/test/cz/cuni/mff/d3s/deeco/scheduler/LocalTimeSchedulerTest.java @@ -3,7 +3,7 @@ import cz.cuni.mff.d3s.deeco.executor.Executor; /** - * Factory for Scheduler implementation tests + * * * @author Jaroslav Keznikl * diff --git a/jdeeco-core/test/cz/cuni/mff/d3s/deeco/scheduler/SchedulerTest.java b/jdeeco-core/test/cz/cuni/mff/d3s/deeco/scheduler/SchedulerTest.java index 729f32da3..9bfd81c94 100644 --- a/jdeeco-core/test/cz/cuni/mff/d3s/deeco/scheduler/SchedulerTest.java +++ b/jdeeco-core/test/cz/cuni/mff/d3s/deeco/scheduler/SchedulerTest.java @@ -1,25 +1,26 @@ -package cz.cuni.mff.d3s.deeco.scheduler; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import cz.cuni.mff.d3s.deeco.executor.Executor; -import cz.cuni.mff.d3s.deeco.model.runtime.api.PeriodicTrigger; -import cz.cuni.mff.d3s.deeco.model.runtime.api.Trigger; -import cz.cuni.mff.d3s.deeco.task.Task; -import cz.cuni.mff.d3s.deeco.task.TaskTriggerListener; - +package cz.cuni.mff.d3s.deeco.scheduler; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import cz.cuni.mff.d3s.deeco.executor.Executor; +import cz.cuni.mff.d3s.deeco.model.runtime.api.PeriodicTrigger; +import cz.cuni.mff.d3s.deeco.model.runtime.api.Trigger; +import cz.cuni.mff.d3s.deeco.task.Task; +import cz.cuni.mff.d3s.deeco.task.TaskTriggerListener; + /** * Scheduler test suite * @@ -27,199 +28,203 @@ * @author Andranik Muradyan * * */ -public abstract class SchedulerTest { - - protected Scheduler tested; - protected Executor executor; - protected TaskTriggerListener testListener; - - protected abstract Scheduler setUpTested(Executor executor2); - - - @Before - public void setUp() throws Exception{ - executor = mock(Executor.class); - tested = setUpTested(executor); - testListener = mock(TaskTriggerListener.class); - } - - @After - public void tearDown() throws Exception{ - if (tested!=null) - tested.stop(); - } - - // TODO TB: Shouldn't we have also a test that tests repeated execution? For instnce to start the scheduler, let it run for 2 secs and see that a periodic - // task with a period of 100 ms got scheduled approx. 200 times? - - @Test - public void testPeriodicTaskScheduledWhenSchedulerStarted() throws InterruptedException { - Task t = mock(Task.class); - PeriodicTrigger p = mock(PeriodicTrigger.class); - when(p.getPeriod()).thenReturn(11L); - when(t.getPeriodicTrigger()).thenReturn(p); - - // WHEN a periodic task is added to a new (stopped) scheduler - tested.addTask(t); - // THEN the task is not scheduled - verify(executor, timeout(10).never()).execute(t, p); - - // WHEN the scheduler is started, runs for a while (longer than the - // period) and then stopped - tested.start(); - Thread.sleep(10); - tested.stop(); - // THEN the task gets eventually scheduled - verify(executor, atLeastOnce()).execute(t, p); - - reset(executor); - - // WHEN the running scheduler is stopped a bit longer (FIXME TB: not sure what it means) - // THEN the task is no longer scheduled - verify(executor, timeout(10).never()).execute(t, p); - - } - - @Test - public void testPeriodicTaskAutomaticallyScheduledWhenAddedToRunningScheduler() throws InterruptedException { - Task t = mock(Task.class); - PeriodicTrigger p = mock(PeriodicTrigger.class); - when(p.getPeriod()).thenReturn(11L); - when(t.getPeriodicTrigger()).thenReturn(p); - tested.start(); - - // WHEN a task is added to a running scheduler - tested.addTask(t); - // THEN it gets eventually scheduled - verify(executor, timeout(10).atLeastOnce()).execute(t, p); - } - - @Test - public void testPeriodicTaskNotScheduledWhenRemovedRunningScheduler() throws InterruptedException { - Task t = mock(Task.class); - PeriodicTrigger p = mock(PeriodicTrigger.class); - when(p.getPeriod()).thenReturn(11L); - when(t.getPeriodicTrigger()).thenReturn(p); - - tested.addTask(t); - tested.start(); - - // WHEN a task is removed from a running scheduler - tested.removeTask(t); - // THEN it gets eventually un-scheduled - reset(executor); - verify(executor, timeout(10).never()).execute(t, p); - } - - @Test - public void testTriggeredTaskScheduledOnlyWhenTriggered() throws InterruptedException { - Task t = createTriggeredTask(); - Trigger tr = mock(Trigger.class); - - // WHEN a triggered task is added to a stopped scheduler and the trigger is triggered - tested.addTask(t); - testListener.triggered(t, tr); - // THEN the process in not scheduled - verify(executor, never()).execute(t, tr); - - // WHEN the scheduler is started with a registered triggered task - tested.start(); - // THEN it is not scheduled if no trigger is triggered - verify(executor, timeout(10).never()).execute(t, tr); - - // WHEN the corresponding trigger is triggered - testListener.triggered(t, tr); - // THEN the process is scheduled (exactly once) - verify(executor, times(1)).execute(t, tr); - - // WHEN the scheduler is stopped and the trigger is triggered - reset(executor); - tested.stop(); - testListener.triggered(t, tr); - // THEN the process in not scheduled anymore - verify(executor, never()).execute(t, tr); - - // WHEN the task is removed from a running scheduler and the trigger is triggered - tested.start(); - tested.removeTask(t); - testListener.triggered(t, tr); - // THEN the process in not scheduled - verify(executor, never()).execute(t, tr); - } - - @Test - public void testTriggerListenerRegisteredAfterAddWhenRunning() { - Task t = mock(Task.class); - - tested.start(); - - // WHEN a task is added to a running scheduler - tested.addTask(t); - // THEN the scheduler registers a trigger listener for the task - verify(t, times(1)).setTriggerListener(any(TaskTriggerListener.class)); - - // WHEN repeating the action - reset(t); - tested.addTask(t); - // THEN nothing happens anymore - verify(t, never()).setTriggerListener(any(TaskTriggerListener.class)); - } - - @Test - public void testTriggerListenerUnregisteredAfterRemoveWhenRunning() { - Task t = mock(Task.class); - tested.start(); - tested.addTask(t); - - // WHEN a task is removed from a running scheduler - tested.removeTask(t); - - // THEN the scheduler unregisters its trigger listener for the task - verify(t, times(1)).setTriggerListener(null); - - // WHEN repeating the action - reset(t); - tested.removeTask(t); - // THEN nothing happens anymore - verify(t, never()).setTriggerListener(null); - } - - @Test - public void testTriggerListenerRegisteredAfterStartWhenAdded() { - Task t = mock(Task.class); - tested.addTask(t); - - // WHEN a scheduler with a single added task is started - tested.start(); - // THEN the scheduler registers a trigger listener for the task - verify(t, times(1)).setTriggerListener(any(TaskTriggerListener.class)); - - // WHEN repeating the action - reset(t); - tested.start(); - // THEN nothing happens anymore - verify(t, never()).setTriggerListener(any(TaskTriggerListener.class)); - } - - @Test - public void testTriggerListenerUnregisteredAfterStopWhenAdded() { - Task t = mock(Task.class); - tested.start(); - tested.addTask(t); - - // WHEN a scheduler with a single added task is stopped - tested.stop(); - - // THEN the scheduler unregisters its trigger listener for the task - verify(t, times(1)).setTriggerListener(null); - - // WHEN repeating the action - reset(t); - tested.stop(); - // THEN nothing happens anymore - verify(t, never()).setTriggerListener(null); - } - +public abstract class SchedulerTest { + + protected Scheduler tested; + protected Executor executor; + protected TaskTriggerListener testListener; + + protected abstract Scheduler setUpTested(Executor executor2); + + + @Before + public void setUp() throws Exception{ + executor = mock(Executor.class); + tested = setUpTested(executor); + testListener = mock(TaskTriggerListener.class); + } + + @After + public void tearDown() throws Exception{ + if (tested!=null) + tested.stop(); + } + + // TODO TB: Shouldn't we have also a test that tests repeated execution? For instnce to start the scheduler, let it run for 2 secs and see that a periodic + // task with a period of 100 ms got scheduled approx. 200 times? + + @Test + public void testPeriodicTaskScheduledWhenSchedulerStarted() throws InterruptedException { + Task t = mock(Task.class); + PeriodicTrigger p = mock(PeriodicTrigger.class); + when(p.getPeriod()).thenReturn(11L); + when(t.getPeriodicTrigger()).thenReturn(p); + + // WHEN a periodic task is added to a new (stopped) scheduler + tested.addTask(t); + // THEN the task is not scheduled + verify(executor, timeout(10).never()).execute(t, p); + + // WHEN the scheduler is started, runs for a while (longer than the + // period) and then stopped + tested.start(); + Thread.sleep(10); + tested.stop(); + // THEN the task gets eventually scheduled + verify(executor, atLeastOnce()).execute(t, p); + + reset(executor); + + // WHEN the running scheduler is stopped a bit longer (FIXME TB: not sure what it means) + // THEN the task is no longer scheduled + verify(executor, timeout(10).never()).execute(t, p); + + } + + @Test + public void testPeriodicTaskAutomaticallyScheduledWhenAddedToRunningScheduler() throws InterruptedException { + Task t = mock(Task.class); + PeriodicTrigger p = mock(PeriodicTrigger.class); + when(p.getPeriod()).thenReturn(11L); + when(t.getPeriodicTrigger()).thenReturn(p); + tested.start(); + + // WHEN a task is added to a running scheduler + tested.addTask(t); + // THEN it gets eventually scheduled + verify(executor, timeout(10).atLeastOnce()).execute(t, p); + } + + @Test + public void testPeriodicTaskNotScheduledWhenRemovedRunningScheduler() throws InterruptedException { + Task t = mock(Task.class); + PeriodicTrigger p = mock(PeriodicTrigger.class); + when(p.getPeriod()).thenReturn(11L); + when(t.getPeriodicTrigger()).thenReturn(p); + + tested.addTask(t); + tested.start(); + + // WHEN a task is removed from a running scheduler + tested.removeTask(t); + // THEN it gets eventually un-scheduled + reset(executor); + verify(executor, timeout(10).never()).execute(t, p); + } + + @Test + public void testTriggeredTaskScheduledOnlyWhenTriggered() throws InterruptedException { + Task t = createTriggeredTask(); + Trigger tr = mock(Trigger.class); + + // WHEN a triggered task is added to a stopped scheduler and the trigger is triggered + tested.addTask(t); + testListener.triggered(t, tr); + // THEN the process in not scheduled + verify(executor, never()).execute(t, tr); + + // WHEN the scheduler is started with a registered triggered task + tested.start(); + // THEN it is not scheduled if no trigger is triggered + verify(executor, timeout(100).never()).execute(t, tr); + + // WHEN the corresponding trigger is triggered + testListener.triggered(t, tr); + // THEN the process is scheduled (exactly once) + // (we use a small timeout because the scheduler might have a separate thread for scheduling) + verify(executor, timeout(20).times(1)).execute(t, tr); + + + // WHEN the scheduler is stopped and the trigger is triggered + tested.stop(); + reset(executor); + testListener.triggered(t, tr); + // THEN the process in not scheduled anymore + verify(executor, never()).execute(t, tr); + + // WHEN the task is removed from a running scheduler and the trigger is triggered + tested.start(); + tested.removeTask(t); + testListener.triggered(t, tr); + // THEN the process in not scheduled + verify(executor, never()).execute(t, tr); + } + + @Test + public void testTriggerListenerRegisteredAfterAddWhenRunning() { + Task t = mock(Task.class); + + tested.start(); + + // WHEN a task is added to a running scheduler + tested.addTask(t); + // THEN the scheduler registers a trigger listener for the task + verify(t, times(1)).setTriggerListener(any(TaskTriggerListener.class)); + + // WHEN repeating the action + reset(t); + tested.addTask(t); + // THEN nothing happens anymore + verify(t, never()).setTriggerListener(any(TaskTriggerListener.class)); + } + + @Test + public void testTriggerListenerUnregisteredAfterRemoveWhenRunning() { + Task t = mock(Task.class); + tested.start(); + tested.addTask(t); + + // WHEN a task is removed from a running scheduler + tested.removeTask(t); + + // THEN the scheduler unregisters its trigger listener for the task + verify(t, times(1)).unsetTriggerListener(); + + // WHEN repeating the action + reset(t); + tested.removeTask(t); + // THEN nothing happens anymore + verify(t, never()).unsetTriggerListener(); + } + + @Test + public void testTriggerListenerRegisteredAfterStartWhenAdded() { + Task t = mock(Task.class); + tested.addTask(t); + + // WHEN a scheduler with a single added task is started + tested.start(); + // THEN the scheduler registers a trigger listener for the task + verify(t, times(1)).setTriggerListener(any(TaskTriggerListener.class)); + + // WHEN repeating the action + reset(t); + tested.start(); + // THEN nothing happens anymore + verify(t, never()).setTriggerListener(any(TaskTriggerListener.class)); + } + + // TODO: decide whether this is really needed + @Test + @Ignore + public void testTriggerListenerUnregisteredAfterStopWhenAdded() { + Task t = mock(Task.class); + tested.start(); + tested.addTask(t); + + // WHEN a scheduler with a single added task is stopped + tested.stop(); + + // THEN the scheduler unregisters its trigger listener for the task + verify(t, times(1)).unsetTriggerListener(); + + // WHEN repeating the action + reset(t); + tested.stop(); + // THEN nothing happens anymore + verify(t, never()).unsetTriggerListener(); + } + /** * Creates a purely triggered task which stores the given trigger listener * into {@link #testListener}. diff --git a/jdeeco-core/test/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedSchedulerTest.java b/jdeeco-core/test/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedSchedulerTest.java new file mode 100644 index 000000000..72602ec8b --- /dev/null +++ b/jdeeco-core/test/cz/cuni/mff/d3s/deeco/scheduler/SingleThreadedSchedulerTest.java @@ -0,0 +1,19 @@ +package cz.cuni.mff.d3s.deeco.scheduler; + +import cz.cuni.mff.d3s.deeco.executor.Executor; + +/** + * + * + * @author Jaroslav Keznikl + * + */ +public class SingleThreadedSchedulerTest extends SchedulerTest { + + @Override + protected Scheduler setUpTested(Executor executor) { + Scheduler s = new SingleThreadedScheduler(); + s.setExecutor(executor); + return s; + } +} \ No newline at end of file