Skip to content

Commit

Permalink
Introduce SmartFuture will callbacks, and use it for Controller.waitF…
Browse files Browse the repository at this point in the history
…or(Future f)
  • Loading branch information
guillaumebort committed Jan 23, 2011
1 parent 7d2697d commit 00e2aa8
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 53 deletions.
84 changes: 45 additions & 39 deletions framework/src/play/Invoker.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import play.classloading.enhancers.LocalvariablesNamesEnhancer.LocalVariablesNamesTracer;
import play.exceptions.PlayException;
import play.exceptions.UnexpectedException;
import play.utils.Action;
import play.utils.SmartFuture;

/**
* Run some code in a Play! context
Expand Down Expand Up @@ -66,10 +68,8 @@ public static void invokeInThread(DirectInvocation invocation) {
retry = false;
} else {
try {
if (invocation.retry.tasks != null) {
for (Future<?> f : invocation.retry.tasks) {
f.get();
}
if (invocation.retry.task != null) {
invocation.retry.task.get();
} else {
Thread.sleep(invocation.retry.timeout);
}
Expand All @@ -91,8 +91,8 @@ public static class InvocationContext {

public static InvocationContext current() {
return current.get();
}
}

public InvocationContext(List<Annotation> annotations) {
this.annotations = annotations;
}
Expand All @@ -102,7 +102,7 @@ public InvocationContext(Annotation[] annotations) {
}

public InvocationContext(Annotation[]... annotations) {
for(Annotation[] some : annotations) {
for (Annotation[] some : annotations) {
this.annotations.addAll(Arrays.asList(some));
}
}
Expand All @@ -113,17 +113,17 @@ public List<Annotation> getAnnotations() {

@SuppressWarnings("unchecked")
public <T extends Annotation> T getAnnotation(Class<T> clazz) {
for(Annotation annotation : annotations) {
if(annotation.annotationType().isAssignableFrom(clazz)) {
return (T)annotation;
for (Annotation annotation : annotations) {
if (annotation.annotationType().isAssignableFrom(clazz)) {
return (T) annotation;
}
}
return null;
}

public <T extends Annotation> boolean isAnnotationPresent(Class<T> clazz) {
for(Annotation annotation : annotations) {
if(annotation.annotationType().isAssignableFrom(clazz)) {
for (Annotation annotation : annotations) {
if (annotation.annotationType().isAssignableFrom(clazz)) {
return true;
}
}
Expand All @@ -133,12 +133,11 @@ public <T extends Annotation> boolean isAnnotationPresent(Class<T> clazz) {
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
for(Annotation annotation : annotations) {
for (Annotation annotation : annotations) {
builder.append(annotation.toString()).append(",");
}
return builder.toString();
}

}

/**
Expand Down Expand Up @@ -228,8 +227,8 @@ public void onException(Throwable e) {
* @param suspendRequest
*/
public void suspend(Suspend suspendRequest) {
if (suspendRequest.tasks != null) {
WaitForTasksCompletion.waitFor(suspendRequest.tasks, this);
if (suspendRequest.task != null) {
WaitForTasksCompletion.waitFor(suspendRequest.task, this);
} else {
Invoker.invoke(this, suspendRequest.timeout);
}
Expand Down Expand Up @@ -305,17 +304,18 @@ public static class Suspend extends PlayException {
* Suspend for a timeout (in milliseconds).
*/
long timeout;

/**
* Wait for task execution.
*/
List<Future<?>> tasks;
Future<?> task;

public Suspend(long timeout) {
this.timeout = timeout;
}

public Suspend(Future<?>... tasks) {
this.tasks = Arrays.asList(tasks);
public Suspend(Future<?> task) {
this.task = task;
}

@Override
Expand All @@ -325,8 +325,8 @@ public String getErrorTitle() {

@Override
public String getErrorDescription() {
if (tasks != null) {
return "Wait for " + tasks;
if (task != null) {
return "Wait for " + task;
}
return "Retry in " + timeout + " ms.";
}
Expand All @@ -337,44 +337,50 @@ public String getErrorDescription() {
*/
static class WaitForTasksCompletion extends Thread {

Map<List<Future<?>>, Invocation> queue;
static WaitForTasksCompletion instance;
Map<Future<?>, Invocation> queue;

public WaitForTasksCompletion() {
queue = new ConcurrentHashMap<List<Future<?>>, Invocation>();
queue = new ConcurrentHashMap<Future<?>, Invocation>();
setName("WaitForTasksCompletion");
setDaemon(true);
start();
}

public static void waitFor(List<Future<?>> tasks, Invocation invocation) {
if (instance == null) {
instance = new WaitForTasksCompletion();
public static void waitFor(Future<?> task, final Invocation invocation) {
if (task instanceof SmartFuture) {
SmartFuture smartFuture = (SmartFuture)task;
smartFuture.onCompletion(new Action() {
public void invoke(Object result) {
executor.submit(invocation);
}
});
} else {
synchronized (WaitForTasksCompletion.class) {
if (instance == null) {
instance = new WaitForTasksCompletion();
Logger.warn("Start WaitForTasksCompletion");
instance.start();
}
instance.queue.put(task, invocation);
}
}
instance.queue.put(tasks, invocation);
}

@Override
public void run() {
while (true) {
try {
if (!queue.isEmpty()) {
for (List<Future<?>> tasks : new HashSet<List<Future<?>>>(queue.keySet())) {
boolean allDone = true;
for (Future<?> f : tasks) {
if (!f.isDone()) {
allDone = false;
}
}
if (allDone) {
executor.submit(queue.get(tasks));
queue.remove(tasks);
for (Future<?> task : new HashSet<Future<?>>(queue.keySet())) {
if (task.isDone()) {
executor.submit(queue.get(task));
queue.remove(task);
}
}
}
Thread.sleep(50);
} catch (InterruptedException ex) {
Logger.warn(ex, "");
Logger.warn(ex, "While waiting for task completions");
}
}
}
Expand Down
20 changes: 18 additions & 2 deletions framework/src/play/jobs/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import play.exceptions.JavaExecutionException;
import play.exceptions.PlayException;
import play.libs.Time;
import play.utils.SmartFuture;

/**
* A job is an asynchronously executed unit of work
Expand Down Expand Up @@ -54,8 +55,21 @@ public void execute() throws Exception {
* Start this job now (well ASAP)
* @return the job completion
*/
public Future<V> now() {
return JobsPlugin.executor.submit((Callable<V>) this);
public SmartFuture<V> now() {
final SmartFuture<V> smartFuture = new SmartFuture<V>();

Future<V> realFuture = JobsPlugin.executor.submit(new Callable<V>() {

public V call() throws Exception {
V result = Job.this.call();
smartFuture.invoke(result);
return result;
}

});

smartFuture.wrap(realFuture);
return smartFuture;
}

/**
Expand Down Expand Up @@ -154,4 +168,6 @@ public void _finally() {
public String toString() {
return this.getClass().getName();
}


}
28 changes: 18 additions & 10 deletions framework/src/play/libs/ws/WSAsync.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.ning.http.client.ProxyServer;
import com.ning.http.client.Response;
import com.ning.http.client.StringPart;
import play.utils.SmartFuture;

/**
* Simple HTTP client to make webservices requests.
Expand Down Expand Up @@ -121,7 +122,7 @@ public HttpResponse get() {

/** Execute a GET request asynchronously. */
@Override
public Future<HttpResponse> getAsync() {
public SmartFuture<HttpResponse> getAsync() {
this.type = "GET";
sign();
return execute(httpClient.prepareGet(url));
Expand All @@ -141,7 +142,7 @@ public HttpResponse post() {

/** Execute a POST request asynchronously.*/
@Override
public Future<HttpResponse> postAsync() {
public SmartFuture<HttpResponse> postAsync() {
this.type = "POST";
sign();
return execute(httpClient.preparePost(url));
Expand All @@ -160,7 +161,7 @@ public HttpResponse put() {

/** Execute a PUT request asynchronously.*/
@Override
public Future<HttpResponse> putAsync() {
public SmartFuture<HttpResponse> putAsync() {
this.type = "PUT";
return execute(httpClient.preparePut(url));
}
Expand All @@ -178,7 +179,7 @@ public HttpResponse delete() {

/** Execute a DELETE request asynchronously.*/
@Override
public Future<HttpResponse> deleteAsync() {
public SmartFuture<HttpResponse> deleteAsync() {
this.type = "DELETE";
return execute(httpClient.prepareDelete(url));
}
Expand All @@ -196,7 +197,7 @@ public HttpResponse options() {

/** Execute a OPTIONS request asynchronously.*/
@Override
public Future<HttpResponse> optionsAsync() {
public SmartFuture<HttpResponse> optionsAsync() {
this.type = "OPTIONS";
return execute(httpClient.prepareOptions(url));
}
Expand All @@ -214,7 +215,7 @@ public HttpResponse head() {

/** Execute a HEAD request asynchronously.*/
@Override
public Future<HttpResponse> headAsync() {
public SmartFuture<HttpResponse> headAsync() {
this.type = "HEAD";
return execute(httpClient.prepareHead(url));
}
Expand All @@ -228,7 +229,7 @@ public HttpResponse trace() {

/** Execute a TRACE request asynchronously.*/
@Override
public Future<HttpResponse> traceAsync() {
public SmartFuture<HttpResponse> traceAsync() {
this.type = "TRACE";
throw new NotImplementedException();
}
Expand Down Expand Up @@ -260,18 +261,25 @@ private BoundRequestBuilder prepare(BoundRequestBuilder builder) {
return builder;
}

private Future<HttpResponse> execute(BoundRequestBuilder builder) {
private SmartFuture<HttpResponse> execute(BoundRequestBuilder builder) {
try {
return prepare(builder).execute(new AsyncCompletionHandler<HttpResponse>() {
final SmartFuture<HttpResponse> smartFuture = new SmartFuture<HttpResponse>();

Future<HttpResponse> realFuture = prepare(builder).execute(new AsyncCompletionHandler<HttpResponse>() {
@Override
public HttpResponse onCompleted(Response response) throws Exception {
return new HttpAsyncResponse(response);
HttpResponse httpResponse = new HttpAsyncResponse(response);
smartFuture.invoke(httpResponse);
return httpResponse;
}
@Override
public void onThrowable(Throwable t) {
throw new RuntimeException(t);
}
});

smartFuture.wrap(realFuture);
return smartFuture;
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
4 changes: 2 additions & 2 deletions framework/src/play/mvc/Controller.java
Original file line number Diff line number Diff line change
Expand Up @@ -856,9 +856,9 @@ protected static void suspend(int millis) {
*
* @param tasks
*/
protected static void waitFor(Future<?>... tasks) {
protected static void waitFor(Future<?> task) {
Request.current().isNew = false;
throw new Suspend(tasks);
throw new Suspend(task);
}

/**
Expand Down
6 changes: 6 additions & 0 deletions framework/src/play/utils/Action.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package play.utils;

public interface Action<T> {

void invoke(T result);
}
Loading

0 comments on commit 00e2aa8

Please sign in to comment.