Skip to content
This repository has been archived by the owner on Feb 26, 2023. It is now read-only.

Commit

Permalink
Better task manager implementation
Browse files Browse the repository at this point in the history
The previous implementation used a set for serial running and a map of
lists of tasks for managing the serial queues.

In practice, there will be very few parallel tasks, using maps and
creating/destroying lists is complex and unefficient.

A better approach is to use only a list of tasks (only the ones we need
to keep, having a non-null serial) and run through it sequentially for
retrieving tasks.

Moreover, it is more general, and paves the way for adding a task
cancellation feature.
  • Loading branch information
rom1v committed Apr 22, 2013
1 parent 2045e41 commit 5254f56
Showing 1 changed file with 126 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@
package org.androidannotations.api;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -30,92 +26,86 @@ public class BackgroundExecutor {

private static Executor executor = Executors.newScheduledThreadPool(2 * Runtime.getRuntime().availableProcessors());

/*
* serialRunning is used as a lock in synchronized blocks for both
* serialRunning and serialQueues access
*/

/* Set of queueIds having a currently running task */
private static final Set<String> serialRunning = new HashSet<String>();

/* Tasks queues for each serial */
private static final Map<String, List<Task>> serialQueues = new HashMap<String, List<Task>>();
private static final List<Task> tasks = new ArrayList<Task>();

/**
* Execute a task after (at least) the given delay <strong>and</strong>
* after all tasks added with the same non-null <code>serial</code> (if any)
* have completed execution.
* Execute a runnable after the given delay.
*
* @param runnable
* the task to execute
* @param delay
* the time from now to delay execution, in milliseconds
* @param serial
* the serial queue to use (<code>null</code> or <code>""</code>
* for no serial execution)
* @throws IllegalArgumentException
* if <code>delay</code> is strictly positive and the current
* executor does not support scheduling (if
* {@link #setExecutor(Executor)} has been called with such an
* executor)
*/
public static void execute(Runnable runnable, int delay, String serial) {
/* "" means null (a default annotation String value cannot be null) */
if (serial == null || serial.isEmpty()) {
if (delay > 0) {
/* no serial, but a delay: schedule the task */
if (!(executor instanceof ScheduledExecutorService)) {
throw new IllegalArgumentException("The executor set does not support scheduling");
}
((ScheduledExecutorService) executor).schedule(runnable, delay, TimeUnit.MILLISECONDS);
} else {
/* no serial, no delay: execute now */
executor.execute(runnable);
private static void directExecute(Runnable runnable, int delay) {
if (delay > 0) {
/* no serial, but a delay: schedule the task */
if (!(executor instanceof ScheduledExecutorService)) {
throw new IllegalArgumentException("The executor set does not support scheduling");
}
((ScheduledExecutorService) executor).schedule(runnable, delay, TimeUnit.MILLISECONDS);
} else {
/* serial is defined, the delay is managed by Task */
Task task = new Task(runnable, delay, serial);

synchronized (serialRunning) {
if (serialRunning.contains(serial)) {
/* a task for this serial is already running, queue this one */
List<Task> queue = serialQueues.get(serial);
if (queue == null) {
/* the queue does not exist yet */
queue = new ArrayList<Task>();
serialQueues.put(serial, queue);
}
/* queue the task for later execution */
queue.add(task);
} else {
/* mark this serial as having a running task */
serialRunning.add(serial);
/* execute the task (a wrapper for runnable) now */
execute(task, delay); /* do not pass serial here */
}
}
/* no serial, no delay: execute now */
executor.execute(runnable);
}
}

/**
* Execute a task.
* Execute a task after (at least) its delay <strong>and</strong> after all
* tasks added with the same non-null <code>serial</code> (if any) have
* completed execution.
*
* Equivalent to {@link #execute(Runnable, int, String) execute(runnable, 0,
* null)}.
* @param task
* the task to execute
* @throws IllegalArgumentException
* if <code>task.delay</code> is strictly positive and the
* current executor does not support scheduling (if
* {@link #setExecutor(Executor)} has been called with such an
* executor)
*/
public static synchronized void execute(Task task) {
if (task.serial == null || !hasSerialRunning(task.serial)) {
task.executionAsked = true;
directExecute(task, task.delay);
}
if (task.serial != null) {
/* keep task */
tasks.add(task);
}
}

/**
* Execute a task.
*
* @param runnable
* the task to execute
* @param delay
* the time from now to delay execution, in milliseconds
* @param serial
* the serial queue (<code>null</code> or <code>""</code> for no
* serial execution)
* @throws IllegalArgumentException
* if <code>delay</code> is strictly positive and the current
* executor does not support scheduling (if
* {@link #setExecutor(Executor)} has been called with such an
* executor)
*/
public static void execute(Runnable runnable) {
execute(runnable, 0, null);
public static void execute(final Runnable runnable, int delay, String serial) {
execute(new Task(delay, serial) {
@Override
public void execute() {
runnable.run();
}
});
}

/**
* Execute a task after the given delay.
*
* Equivalent to {@link #execute(Runnable, int, String) execute(runnable,
* delay, null)}.
*
* @param runnable
* the task to execute
* @param delay
Expand All @@ -127,7 +117,17 @@ public static void execute(Runnable runnable) {
* executor)
*/
public static void execute(Runnable runnable, int delay) {
execute(runnable, delay, null);
directExecute(runnable, delay);
}

/**
* Execute a task.
*
* @param runnable
* the task to execute
*/
public static void execute(Runnable runnable) {
directExecute(runnable, 0);
}

/**
Expand Down Expand Up @@ -160,50 +160,88 @@ public static void setExecutor(Executor executor) {
BackgroundExecutor.executor = executor;
}

private static class Task implements Runnable {
/**
* Indicates whether a task with the specified <code>serial</code> has been
* submitted to the executor.
*
* @param serial
* the serial queue
* @return <code>true</code> if such a task has been submitted,
* <code>false</code> otherwise
*/
private static boolean hasSerialRunning(String serial) {
for (Task task : tasks) {
if (task.executionAsked && serial.equals(task.serial)) {
return true;
}
}
return false;
}

/**
* Retrieve and remove the first task having the specified
* <code>serial</code> (if any).
*
* @param serial
* the serial queue
* @return task if found, <code>null</code> otherwise
*/
private static Task take(String serial) {
int len = tasks.size();
for (int i = 0; i < len; i++) {
if (serial.equals(tasks.get(i).serial)) {
return tasks.remove(i);
}
}
return null;
}

public static abstract class Task implements Runnable {

Runnable runnable;
long targetTime; /* in milliseconds since epoch */
String serial;
private int delay;
private long targetTime; /* in milliseconds since epoch */
private String serial;
private boolean executionAsked;

Task(Runnable runnable, int delay, String serial) {
this.runnable = runnable;
public Task(int delay, String serial) {
if (delay > 0) {
this.delay = delay;
targetTime = System.currentTimeMillis() + delay;
}
this.serial = serial;
if (!"".equals(serial)) {
this.serial = serial;
}
}

@Override
public void run() {
try {
runnable.run();
execute();
} finally {
/* handle next tasks */
postExecute();
}
}

public abstract void execute();

private void postExecute() {
synchronized (serialRunning) {
List<Task> queue = serialQueues.get(serial);
if (queue == null) {
/* no task is queue for this serial, mark it as not running */
serialRunning.remove(serial);
} else {
/* queue is not empty, retrieve the oldest queued task */
Task nextTask = queue.remove(0);

if (queue.isEmpty()) {
/* no more tasks in the queue */
serialQueues.remove(serial);
if (serial == null) {
/* nothing to do */
return;
}
synchronized (BackgroundExecutor.class) {
/* execution complete */
tasks.remove(this);

Task next = take(serial);
if (next != null) {
if (next.delay != 0) {
/* compute remaining delay */
next.delay = Math.max(0, (int) (targetTime - System.currentTimeMillis()));
}

/* compute the remaining delay */
int delay = Math.max(0, (int) (nextTask.targetTime - System.currentTimeMillis()));

/* execute the next task */
execute(nextTask, delay); /* do not pass serial here */
/* a task having the same serial was queued, execute it */
BackgroundExecutor.execute(next);
}
}
}
Expand Down

0 comments on commit 5254f56

Please sign in to comment.