Navigation Menu

Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
("Add support for Callable<?> to Broadcaster.schedule")

Add support for Callable to be periodically scheduled.
  • Loading branch information
jfarcand committed Sep 21, 2010
1 parent 4f137ca commit 3c0079d
Showing 1 changed file with 62 additions and 36 deletions.
Expand Up @@ -58,6 +58,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;

/**
* {@link Broadcaster} implementation.
* <p/>
Expand All @@ -70,8 +71,8 @@
*/
public class DefaultBroadcaster implements Broadcaster {

protected final ConcurrentLinkedQueue<AtmosphereResource<?,?>> events =
new ConcurrentLinkedQueue<AtmosphereResource<?,?>>();
protected final ConcurrentLinkedQueue<AtmosphereResource<?, ?>> events =
new ConcurrentLinkedQueue<AtmosphereResource<?, ?>>();
protected BroadcasterConfig bc = AtmosphereServlet.getBroadcasterConfig();
protected final BlockingQueue<Entry> messages = new LinkedBlockingQueue<Entry>();
protected final AtomicBoolean started = new AtomicBoolean(false);
Expand Down Expand Up @@ -104,12 +105,12 @@ public void destroy() {
if (notifierFuture != null) {
notifierFuture.cancel(true);
}

if (bc != null && scope != SCOPE.REQUEST) {
bc.destroy();
}
if (broadcasterCache != null){

if (broadcasterCache != null) {
broadcasterCache.stop();
}
events.clear();
Expand All @@ -123,7 +124,7 @@ public void destroy() {
/**
* {@inheritDoc}
*/
public Collection<AtmosphereResource<?,?>> getAtmosphereResources() {
public Collection<AtmosphereResource<?, ?>> getAtmosphereResources() {
return Collections.unmodifiableCollection(events);
}

Expand All @@ -134,7 +135,7 @@ public void setScope(SCOPE scope) {
this.scope = scope;
try {
if (scope == SCOPE.REQUEST) {
for (AtmosphereResource<?,?> r : events) {
for (AtmosphereResource<?, ?> r : events) {
Broadcaster b = BroadcasterFactory.getDefault().get(this.getClass(),
this.getClass().getSimpleName() + "/" + UUID.randomUUID());

Expand All @@ -146,7 +147,7 @@ public void setScope(SCOPE scope) {
b.addAtmosphereResource(r);
}
}

if (!events.isEmpty()) {
this.destroy();
}
Expand Down Expand Up @@ -188,7 +189,7 @@ public String getID() {
* {@inheritDoc}
*/
public void resumeAll() {
for (AtmosphereResource<?,?> r : events) {
for (AtmosphereResource<?, ?> r : events) {
r.resume();
}
}
Expand Down Expand Up @@ -280,21 +281,21 @@ protected void push(Entry msg) {
}

if (msg.eventsToPush == null) {
for (AtmosphereResource<?,?> r : events) {
for (AtmosphereResource<?, ?> r : events) {
push(r, msg.message);
}
} else if (msg.eventsToPush instanceof AtmosphereResource<?,?>) {
push((AtmosphereResource<?,?>) msg.eventsToPush, msg.message);
} else if (msg.eventsToPush instanceof AtmosphereResource<?, ?>) {
push((AtmosphereResource<?, ?>) msg.eventsToPush, msg.message);
} else if (msg.eventsToPush instanceof Set) {
Set<AtmosphereResource<?,?>> sub = (Set<AtmosphereResource<?,?>>) msg.eventsToPush;
for (AtmosphereResource<?,?> r : sub) {
Set<AtmosphereResource<?, ?>> sub = (Set<AtmosphereResource<?, ?>>) msg.eventsToPush;
for (AtmosphereResource<?, ?> r : sub) {
push(r, msg.message);
}
}
msg.message = prevMessage;
}

protected void push(AtmosphereResource<?,?> r, Object msg) {
protected void push(AtmosphereResource<?, ?> r, Object msg) {
AtmosphereResourceEvent e = null;
synchronized (r) {
if (!r.getAtmosphereResourceEvent().isSuspended())
Expand All @@ -310,11 +311,11 @@ protected void push(AtmosphereResource<?,?> r, Object msg) {
if (r.getAtmosphereResourceEvent() != null && !r.getAtmosphereResourceEvent().isCancelled()
&& HttpServletRequest.class.isAssignableFrom(r.getRequest().getClass())) {
try {
HttpServletRequest.class.cast(r.getRequest()).setAttribute(CometSupport.MAX_INACTIVE, (Long)System.currentTimeMillis());
HttpServletRequest.class.cast(r.getRequest()).setAttribute(CometSupport.MAX_INACTIVE, (Long) System.currentTimeMillis());
} catch (Throwable t) {
// Shield us from any corrupted Request
if (LoggerUtils.getLogger().isLoggable(Level.FINE)) {
LoggerUtils.getLogger().log(Level.FINE,"Preventing corruption of a recycled request",e);
LoggerUtils.getLogger().log(Level.FINE, "Preventing corruption of a recycled request", e);
events.remove(r);
return;
}
Expand All @@ -324,14 +325,14 @@ protected void push(AtmosphereResource<?,?> r, Object msg) {
}
}

protected void checkCachedAndPush(AtmosphereResource<?,?> r, AtmosphereResourceEvent e) {
protected void checkCachedAndPush(AtmosphereResource<?, ?> r, AtmosphereResourceEvent e) {
retrieveTrackedBroadcast(r, e);
if (e.getMessage() instanceof List && !((List) e.getMessage()).isEmpty()) {
broadcast(r, e);
}
}

protected boolean retrieveTrackedBroadcast(final AtmosphereResource<?,?> r, final AtmosphereResourceEvent e) {
protected boolean retrieveTrackedBroadcast(final AtmosphereResource<?, ?> r, final AtmosphereResourceEvent e) {
List<Object> missedMsg = broadcasterCache.retrieveFromCache(r);
if (!missedMsg.isEmpty()) {
e.setMessage(missedMsg);
Expand All @@ -340,21 +341,21 @@ protected boolean retrieveTrackedBroadcast(final AtmosphereResource<?,?> r, fina
return false;
}

protected void trackBroadcastMessage(final AtmosphereResource<?,?> r, Object msg) {
protected void trackBroadcastMessage(final AtmosphereResource<?, ?> r, Object msg) {
broadcasterCache.addToCache(r, msg);
}

protected void broadcast(final AtmosphereResource<?,?> r, final AtmosphereResourceEvent e) {
protected void broadcast(final AtmosphereResource<?, ?> r, final AtmosphereResourceEvent e) {
try {
r.getAtmosphereConfig().getAtmosphereHandler(this).onStateChange(e);
} catch (IOException ex) {
onException(ex, r);
} catch (RuntimeException ex) {
onException(ex, r);
}
}
}

protected void onException(Throwable t, AtmosphereResource<?,?> r) {
protected void onException(Throwable t, AtmosphereResource<?, ?> r) {
if (LoggerUtils.getLogger().isLoggable(Level.FINE)) {
LoggerUtils.getLogger().log(Level.FINE, "", t);
}
Expand Down Expand Up @@ -404,7 +405,7 @@ protected Object filter(Object msg) {
/**
* {@inheritDoc}
*/
public Future<Object> broadcast(Object msg, AtmosphereResource<?,?> r) {
public Future<Object> broadcast(Object msg, AtmosphereResource<?, ?> r) {

if (destroyed.get()) throw new IllegalStateException("This Broadcaster has been destroyed and cannot be used");

Expand All @@ -420,9 +421,9 @@ public Future<Object> broadcast(Object msg, AtmosphereResource<?,?> r) {
/**
* {@inheritDoc}
*/
public Future<Object> broadcast(Object msg, Set<AtmosphereResource<?,?>> subset) {
public Future<Object> broadcast(Object msg, Set<AtmosphereResource<?, ?>> subset) {

if (destroyed.get()) throw new IllegalStateException("This Broadcaster has been destroyed and cannot be used");
if (destroyed.get()) throw new IllegalStateException("This Broadcaster has been destroyed and cannot be used");

start();
msg = filter(msg);
Expand All @@ -436,9 +437,9 @@ public Future<Object> broadcast(Object msg, Set<AtmosphereResource<?,?>> subset)
/**
* {@inheritDoc}
*/
public AtmosphereResource<?,?> addAtmosphereResource(AtmosphereResource<?,?> r) {
public AtmosphereResource<?, ?> addAtmosphereResource(AtmosphereResource<?, ?> r) {

if (destroyed.get()) throw new IllegalStateException("This Broadcaster has been destroyed and cannot be used");
if (destroyed.get()) throw new IllegalStateException("This Broadcaster has been destroyed and cannot be used");

if (scope == SCOPE.REQUEST && requestScoped.getAndSet(true)) {
throw new IllegalStateException("Broadcaster " + this
Expand All @@ -451,7 +452,7 @@ public AtmosphereResource<?,?> addAtmosphereResource(AtmosphereResource<?,?> r)
try {
events.poll().resume();
} catch (Throwable t) {
LoggerUtils.getLogger().log(Level.WARNING,"addAtmosphereResource",t);
LoggerUtils.getLogger().log(Level.WARNING, "addAtmosphereResource", t);
}
} else if (policy == POLICY.REJECT) {
throw new RejectedExecutionException(String.format("Maximum suspended AtmosphereResources %s", maxSuspendResource));
Expand All @@ -475,7 +476,7 @@ public AtmosphereResource<?,?> addAtmosphereResource(AtmosphereResource<?,?> r)
/**
* {@inheritDoc}
*/
public AtmosphereResource<?,?> removeAtmosphereResource(AtmosphereResource r) {
public AtmosphereResource<?, ?> removeAtmosphereResource(AtmosphereResource r) {
if (!events.contains(r)) {
return null;
}
Expand Down Expand Up @@ -523,24 +524,37 @@ public Future<Object> delayBroadcast(final Object o, long delay, TimeUnit t) {
final Object msg = filter(o);
if (msg == null) return null;

Future<Object> f = new BroadcasterFuture<Object>(msg);
final Entry e = new Entry(msg, null, f);
final Future<Object> future = new BroadcasterFuture<Object>(msg);
final Entry e = new Entry(msg, null, future);
Future<Object> f;
if (delay > 0) {
f = bc.getScheduledExecutorService().schedule(new Callable<Object>() {

public Object call() throws Exception {
delayedBroadcast.remove(e);
if (Callable.class.isAssignableFrom(o.getClass())) {
try {
Object r = Callable.class.cast(o).call();
if (r != null) {
Entry entry = new Entry(r, null, null);
push(entry);
}
return msg;
} catch (Exception e1) {
LoggerUtils.getLogger().log(Level.SEVERE, "", e);
}
}
push(e);
return msg;
}
}, delay, t);
e.future = f;
}
delayedBroadcast.offer(e);
return f;
return future;
}

/**
/**
* {@inheritDoc}
*/
public Future<?> scheduleFixedBroadcast(final Object o, long period, TimeUnit t) {
Expand All @@ -552,7 +566,7 @@ public Future<?> scheduleFixedBroadcast(final Object o, long period, TimeUnit t)
*/
public Future<?> scheduleFixedBroadcast(final Object o, long waitFor, long period, TimeUnit t) {

if (destroyed.get()) throw new IllegalStateException("This Broadcaster has been destroyed and cannot be used");
if (destroyed.get()) throw new IllegalStateException("This Broadcaster has been destroyed and cannot be used");

if (period == 0 || t == null) {
return null;
Expand All @@ -564,12 +578,24 @@ public Future<?> scheduleFixedBroadcast(final Object o, long waitFor, long perio
final Entry e = new Entry(o, null, null);
return bc.getScheduledExecutorService().scheduleWithFixedDelay(new Runnable() {
public void run() {
if (Callable.class.isAssignableFrom(o.getClass())) {
try {
Object r = Callable.class.cast(o).call();
if (r != null) {
Entry entry = new Entry(r, null, null);
push(entry);
}
return;
} catch (Exception e1) {
LoggerUtils.getLogger().log(Level.SEVERE, "", e);
}
}
push(e);
}
}, waitFor, period, t);
}

public String toString(){
public String toString() {
return new StringBuilder(this.getClass().getName()).append("@").append(this.hashCode()).append("\n")
.append("\tName: ").append(name).append("\n")
.append("\tScope: ").append(scope).append("\n")
Expand Down

0 comments on commit 3c0079d

Please sign in to comment.