Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
("Broadcaster#setScope now working as expected")

Maker sure a new instance is always created when changing the scope at runtime.
  • Loading branch information
jfarcand committed Jul 22, 2010
1 parent 2e7871a commit 3396c8a
Show file tree
Hide file tree
Showing 3 changed files with 274 additions and 10 deletions.
Expand Up @@ -46,6 +46,7 @@
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.OutputStream;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
Expand Down Expand Up @@ -98,10 +99,11 @@ public class AtmosphereResourceImpl implements
/**
* Create an {@link AtmosphereResource}.
*
* @param config
* @param broadcaster The {@link Broadcaster}.
* @param req The {@link HttpServletRequest}
* @param res The {@link HttpServletResponse}
* @param config The {@link AtmosphereConfig}
* @param broadcaster The {@link Broadcaster}.
* @param req The {@link HttpServletRequest}
* @param res The {@link HttpServletResponse}
* @param cometSupport The {@link CometSupport}
*/
public AtmosphereResourceImpl(AtmosphereConfig config, Broadcaster broadcaster,
HttpServletRequest req, HttpServletResponse res,
Expand Down Expand Up @@ -184,6 +186,18 @@ public void suspend(long timeout, boolean flushComment) {
req.setAttribute(PRE_SUSPEND, "true");
action.type = AtmosphereServlet.Action.TYPE.SUSPEND;
action.timeout = timeout;

// TODO: We can possibly optimize that call by avoiding creating a Broadcaster if we are sure the Broadcaster
// is unique.
if (broadcaster.getScope() == Broadcaster.SCOPE.REQUEST) {
String id = broadcaster.getID();
Class<? extends Broadcaster> clazz = broadcaster.getClass();
broadcaster = BroadcasterFactory.getDefault().lookup(clazz, id, false);
if (broadcaster == null) {
broadcaster = BroadcasterFactory.getDefault().lookup(clazz, id + "/" + UUID.randomUUID(), true);
}
}

broadcaster.addAtmosphereResource(this);
req.removeAttribute(PRE_SUSPEND);
notifyListeners();
Expand Down
Expand Up @@ -48,6 +48,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand All @@ -72,13 +73,13 @@ public class DefaultBroadcaster implements Broadcaster {
protected final ConcurrentLinkedQueue<AtmosphereResource<?,?>> events =
new ConcurrentLinkedQueue<AtmosphereResource<?,?>>();
protected BroadcasterConfig bc = AtmosphereServlet.getBroadcasterConfig();
protected final BlockingQueue<Entry> messages =
new LinkedBlockingQueue<Entry>();
protected final BlockingQueue<Entry> messages = new LinkedBlockingQueue<Entry>();
protected final AtomicBoolean started = new AtomicBoolean(false);
protected final AtomicBoolean destroyed = new AtomicBoolean(false);

protected SCOPE scope = SCOPE.APPLICATION;
protected String name = DefaultBroadcaster.class.getSimpleName();
protected final ConcurrentLinkedQueue<Entry> delayedBroadcast =
new ConcurrentLinkedQueue<Entry>();
protected final ConcurrentLinkedQueue<Entry> delayedBroadcast = new ConcurrentLinkedQueue<Entry>();
private Future<?> notifierFuture;
protected BroadcasterCache broadcasterCache;

Expand All @@ -104,7 +105,7 @@ public void destroy() {
notifierFuture.cancel(true);
}

if (bc != null) {
if (bc != null && scope != SCOPE.REQUEST) {
bc.destroy();
}

Expand All @@ -116,6 +117,7 @@ public void destroy() {
delayedBroadcast.clear();
broadcasterCache = null;
started.set(false);
destroyed.set(true);
}

/**
Expand All @@ -132,7 +134,22 @@ public void setScope(SCOPE scope) {
this.scope = scope;
try {
if (scope == SCOPE.REQUEST) {
broadcasterCache = bc.getBroadcasterCache().getClass().newInstance();
for (AtmosphereResource<?,?> r : events) {
Broadcaster b = BroadcasterFactory.getDefault().get(this.getClass(),
this.getClass().getSimpleName() + "/" + UUID.randomUUID());

if (DefaultBroadcaster.class.isAssignableFrom(this.getClass())) {
DefaultBroadcaster.class.cast(b).broadcasterCache = bc.getBroadcasterCache().getClass().newInstance();
}
r.setBroadcaster(b);
if (r.getAtmosphereResourceEvent().isSuspended()) {
b.addAtmosphereResource(r);
}
}

if (!events.isEmpty()) {
this.destroy();
}
}
} catch (Exception e) {
LoggerUtils.getLogger().log(Level.SEVERE, "", e);
Expand Down Expand Up @@ -353,6 +370,9 @@ public void setSuspendPolicy(long maxSuspendResource, POLICY policy) {
* {@inheritDoc}
*/
public Future<Object> broadcast(Object msg) {

if (destroyed.get()) throw new IllegalStateException("This Broadcaster has been destroyed and cannot be used");

start();
msg = filter(msg);
if (msg == null) return null;
Expand Down Expand Up @@ -380,6 +400,9 @@ protected Object filter(Object msg) {
* {@inheritDoc}
*/
public Future<Object> broadcast(Object msg, AtmosphereResource<?,?> r) {

if (destroyed.get()) throw new IllegalStateException("This Broadcaster has been destroyed and cannot be used");

start();
msg = filter(msg);
if (msg == null) return null;
Expand All @@ -393,6 +416,9 @@ public Future<Object> broadcast(Object msg, AtmosphereResource<?,?> r) {
* {@inheritDoc}
*/
public Future<Object> broadcast(Object msg, Set<AtmosphereResource<?,?>> subset) {

if (destroyed.get()) throw new IllegalStateException("This Broadcaster has been destroyed and cannot be used");

start();
msg = filter(msg);
if (msg == null) return null;
Expand All @@ -407,6 +433,8 @@ public Future<Object> broadcast(Object msg, Set<AtmosphereResource<?,?>> subset)
*/
public AtmosphereResource<?,?> addAtmosphereResource(AtmosphereResource<?,?> r) {

if (destroyed.get()) throw new IllegalStateException("This Broadcaster has been destroyed and cannot be used");

if (scope == SCOPE.REQUEST && requestScoped.getAndSet(true)) {
throw new IllegalStateException("Broadcaster " + this
+ " cannot be used as its scope is set to REQUEST");
Expand Down Expand Up @@ -484,6 +512,9 @@ public Future<Object> delayBroadcast(Object o) {
* {@inheritDoc}
*/
public Future<Object> delayBroadcast(final Object o, long delay, TimeUnit t) {

if (destroyed.get()) throw new IllegalStateException("This Broadcaster has been destroyed and cannot be used");

final Object msg = filter(o);
if (msg == null) return null;

Expand Down Expand Up @@ -515,6 +546,9 @@ public Future<?> scheduleFixedBroadcast(final Object o, long period, TimeUnit t)
* {@inheritDoc}
*/
public Future<?> scheduleFixedBroadcast(final Object o, long waitFor, long period, TimeUnit t) {

if (destroyed.get()) throw new IllegalStateException("This Broadcaster has been destroyed and cannot be used");

if (period == 0 || t == null) {
return null;
}
Expand Down
@@ -0,0 +1,216 @@
package org.atmosphere.tests;

import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.Response;
import org.apache.log4j.BasicConfigurator;
import org.atmosphere.container.JettyCometSupport;
import org.atmosphere.cpr.AtmosphereResourceEvent;
import org.atmosphere.cpr.AtmosphereResourceEventListener;
import org.atmosphere.cpr.AtmosphereServlet;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.cpr.BroadcasterFactory;
import org.atmosphere.cpr.DefaultBroadcaster;
import org.atmosphere.cpr.Meteor;
import org.atmosphere.cpr.MeteorServlet;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class BroadcasterScopeTest {

protected MeteorServlet atmoServlet;
protected final static String ROOT = "/*";
protected String urlTarget;
protected Server server;
protected Context root;
private static CountDownLatch servletLatch;
private static final AtomicReference<String> broadcasterId = new AtomicReference<String>();

public static class TestHelper {

public static int getEnvVariable(final String varName, int defaultValue) {
if (null == varName) {
return defaultValue;
}
String varValue = System.getenv(varName);
if (null != varValue) {
try {
return Integer.parseInt(varValue);
} catch (NumberFormatException e) {
// will return default value bellow
}
}
return defaultValue;
}
}

protected int findFreePort() throws IOException {
ServerSocket socket = null;

try {
socket = new ServerSocket(0);

return socket.getLocalPort();
}
finally {
if (socket != null) {
socket.close();
}
}
}

public static class Meteor1 extends HttpServlet {
@Override
public void doGet(HttpServletRequest req, HttpServletResponse res) throws IOException {
final Meteor m = Meteor.build(req);
m.getBroadcaster().setScope(Broadcaster.SCOPE.REQUEST);
req.getSession().setAttribute("meteor", m);

m.suspend(5000, false);
broadcasterId.set(m.getBroadcaster().getID());

res.getOutputStream().write("resume".getBytes());
m.addListener(new AtmosphereResourceEventListener(){

@Override
public void onSuspend(final AtmosphereResourceEvent<HttpServletRequest, HttpServletResponse> event){
}

@Override
public void onResume(AtmosphereResourceEvent<HttpServletRequest, HttpServletResponse> event) {
}

@Override
public void onDisconnect(AtmosphereResourceEvent<HttpServletRequest, HttpServletResponse> event) {
}

@Override
public void onBroadcast(AtmosphereResourceEvent<HttpServletRequest, HttpServletResponse> event) {
event.getResource().getRequest().setAttribute(AtmosphereServlet.RESUME_ON_BROADCAST, "true");
}
});

if (servletLatch != null) {
servletLatch.countDown();
}
}
}

@BeforeMethod(alwaysRun = true)
public void startServer() throws Exception {

int port = BaseTest.TestHelper.getEnvVariable("ATMOSPHERE_HTTP_PORT", findFreePort());
urlTarget = "http://127.0.0.1:" + port + "/invoke";

server = new Server(port);
root = new Context(server, "/", Context.SESSIONS);
atmoServlet = new MeteorServlet();
atmoServlet.addInitParameter("org.atmosphere.servlet", Meteor1.class.getName());
configureCometSupport();
root.addServlet(new ServletHolder(atmoServlet), ROOT);
server.start();
}

public void configureCometSupport() {
atmoServlet.setCometSupport(new JettyCometSupport(atmoServlet.getAtmosphereConfig()));
}

@AfterMethod(alwaysRun = true)
public void unsetAtmosphereHandler() throws Exception {
atmoServlet.destroy();
BasicConfigurator.resetConfiguration();
server.stop();
server = null;
}

@Test(timeOut = 60000)
public void testBroadcasterScope() {
System.out.println("Running testBroadcasterScope");
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);

servletLatch = new CountDownLatch(1);
AsyncHttpClient c = new AsyncHttpClient();
try {
long currentTime = System.currentTimeMillis();
final AtomicReference<Response> r = new AtomicReference();
c.prepareGet(urlTarget).execute(new AsyncCompletionHandler<Response>() {

@Override
public Response onCompleted(Response response) throws Exception {
r.set(response);
latch.countDown();
return response;
}
});

servletLatch.await();

String id = broadcasterId.get();

Broadcaster b = BroadcasterFactory.getDefault().lookup(DefaultBroadcaster.class, id);
assertNotNull(b);
b.broadcast("resume").get();

try {
latch.await();
} catch (InterruptedException e) {
fail(e.getMessage());
return;
}

long time = System.currentTimeMillis() - currentTime;
if (time < 5000) {
assertTrue(true);
} else {
assertFalse(false);
}
assertNotNull(r.get());
assertEquals(r.get().getStatusCode(), 200);
String resume = r.get().getResponseBody();
assertEquals(resume, "resumeresume");

c.prepareGet(urlTarget).execute(new AsyncCompletionHandler<Response>() {

@Override
public Response onCompleted(Response response) throws Exception {
r.set(response);
latch2.countDown();
return response;
}
});

try {
latch2.await();
} catch (InterruptedException e) {
fail(e.getMessage());
return;
}

assertFalse(id.equals(broadcasterId.get()));
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
c.close();
}

}

0 comments on commit 3396c8a

Please sign in to comment.