Navigation Menu

Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
(" Shared BroadcasterConfig instance ")

We are now back with 1-1 relation.
  • Loading branch information
jfarcand committed Oct 22, 2010
1 parent 664a0f5 commit eef217f
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 72 deletions.
Expand Up @@ -448,18 +448,13 @@ void onBroadcast(AtmosphereResourceEvent<HttpServletRequest, HttpServletResponse
@Override
public String toString() {
return "AtmosphereResourceImpl{" +
"req=" + req +
", res=" + res +
", action=" + action +
", broadcaster=" + broadcaster +
", config=" + config +
", broadcaster=" + broadcaster.getClass().getName() +
", cometSupport=" + cometSupport +
", serializer=" + serializer +
", isInScope=" + isInScope +
", event=" + event +
", useWriter=" + useWriter +
", listeners=" + listeners +
", isSuspendEvent=" + isSuspendEvent +
'}';
}

Expand Down
51 changes: 14 additions & 37 deletions modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java
Expand Up @@ -166,6 +166,8 @@
* @author Jeanfrancois Arcand
*/
public class AtmosphereServlet extends AbstractAsyncServlet implements CometProcessor, HttpEventServlet {
public final static Logger logger = LoggerUtils.getLogger();

public final static String JERSEY_BROADCASTER = "org.atmosphere.jersey.JerseyBroadcaster";
public final static String REDIS_BROADCASTER = "org.atmosphere.plugin.redis.RedisBroadcaster";
public final static String JMS_BROADCASTER = "org.atmosphere.plugin.jms.JMSBroadcaster";
Expand All @@ -192,7 +194,6 @@ public class AtmosphereServlet extends AbstractAsyncServlet implements CometProc
public final static String SUPPORT_SESSION = "org.atmosphere.cpr.AsynchronousProcessor.supportSession";
public final static String ATMOSPHERE_HANDLER = AtmosphereHandler.class.getName();
public final static String WEBSOCKET_ATMOSPHEREHANDLER = WebSocketAtmosphereHandler.class.getName();
public final static Logger logger = LoggerUtils.getLogger();
public final static String RESUME_AND_KEEPALIVE = AtmosphereServlet.class.getName() + ".resumeAndKeepAlive";
public final static String RESUMED_ON_TIMEOUT = AtmosphereServlet.class.getName() + ".resumedOnTimeout";
public final static String DEFAULT_NAMED_DISPATCHER = "default";
Expand All @@ -203,6 +204,8 @@ public class AtmosphereServlet extends AbstractAsyncServlet implements CometProc
protected final AtmosphereConfig config = new AtmosphereConfig();
protected final AtomicBoolean isCometSupportConfigured = new AtomicBoolean(false);
protected final boolean isFilter;
public static String[] broadcasterFilters = new String[0];

/**
* The list of {@link AtmosphereHandler} and their associated mapping.
*/
Expand All @@ -222,8 +225,7 @@ public class AtmosphereServlet extends AbstractAsyncServlet implements CometProc
protected boolean isBroadcasterSpecified = false;
protected boolean isSessionSupportSpecified = false;
private BroadcasterFactory broadcasterFactory;
private static BroadcasterConfig broadcasterConfig = new BroadcasterConfig();
private String broadcasterCacheClassName;
protected static String broadcasterCacheClassName;
private boolean webSocketEnabled = false;

public final static class AtmosphereHandlerWrapper {
Expand Down Expand Up @@ -563,7 +565,7 @@ protected void configureBroadcaster() throws ClassNotFoundException, Instantiati

if (broadcasterFactory == null) {
broadcasterFactory = new DefaultBroadcasterFactory((Class<Broadcaster>)
Thread.currentThread().getContextClassLoader().loadClass(broadcasterClassName), broadcasterConfig);
Thread.currentThread().getContextClassLoader().loadClass(broadcasterClassName));
config.broadcasterFactory = broadcasterFactory;
}

Expand All @@ -573,19 +575,20 @@ protected void configureBroadcaster() throws ClassNotFoundException, Instantiati
while (i.hasNext()) {
e = i.next();
w = e.getValue();
BroadcasterConfig broadcasterConfig = new BroadcasterConfig(broadcasterFilters);

if (w.broadcaster == null) {
w.broadcaster = broadcasterFactory.get();
} else {
w.broadcaster.setBroadcasterConfig(broadcasterConfig);
if (broadcasterCacheClassName != null) {
broadcasterConfig.setBroadcasterCache((BroadcasterCache)
Thread.currentThread().getContextClassLoader().loadClass(broadcasterCacheClassName).newInstance());
}
}
w.broadcaster.setID(e.getKey());
}

if (broadcasterCacheClassName != null) {
broadcasterConfig.setBroadcasterCache((BroadcasterCache)
Thread.currentThread().getContextClassLoader().loadClass(broadcasterCacheClassName).newInstance());
}

logger.info("Using " + broadcasterClassName);
}

Expand Down Expand Up @@ -651,7 +654,7 @@ protected void doInitParams(ServletConfig sc) {
}
s = sc.getInitParameter(BROADCAST_FILTER_CLASSES);
if (s != null) {
configureBroadcasterFilter(s.split(","));
broadcasterFilters = s.split(",");
}
}

Expand All @@ -675,21 +678,6 @@ protected void loadConfiguration(ServletConfig sc) throws ServletException {
}
}

void configureBroadcasterFilter(String[] list){
for (String broadcastFilter: list) {
try {
broadcasterConfig.addFilter(BroadcastFilter.class.cast(
Thread.currentThread().getContextClassLoader().loadClass(broadcastFilter).newInstance()));
} catch (InstantiationException e) {
logger.log(Level.WARNING,String.format("Error trying to instanciate BroadcastFilter %s",broadcastFilter),e);
} catch (IllegalAccessException e) {
logger.log(Level.WARNING,String.format("Error trying to instanciate BroadcastFilter %s",broadcastFilter),e);
} catch (ClassNotFoundException e) {
logger.log(Level.WARNING,String.format("Error trying to instanciate BroadcastFilter %s",broadcastFilter),e);
}
}
}

/**
* Auto-detect Jersey when no atmosphere.xml file are specified.
*
Expand Down Expand Up @@ -763,7 +751,6 @@ boolean detectGoogleAppEngine(ServletConfig sc) {
broadcasterClassName = GAE_BROADCASTER;
isBroadcasterSpecified = true;
cometSupport = new GoogleAppEngineCometSupport(config);
broadcasterConfig = new GAEBroadcasterConfig();
return true;
} else {
return false;
Expand Down Expand Up @@ -802,7 +789,6 @@ public void destroy() {
}
BroadcasterFactory.factory = null;
BroadcasterFactory.getDefault().destroy();
broadcasterConfig = new BroadcasterConfig();
}

/**
Expand Down Expand Up @@ -879,7 +865,7 @@ protected void loadAtmosphereDotXml(InputStream stream, URLClassLoader c)
}

if (reader.getBroadcastFilterClasses() != null){
configureBroadcasterFilter(reader.getBroadcastFilterClasses());
broadcasterFilters = reader.getBroadcastFilterClasses();
}

} catch (Throwable t) {
Expand Down Expand Up @@ -1285,15 +1271,6 @@ public AtmosphereServlet setBroadcasterFactory(final BroadcasterFactory broadcas
return this;
}

/**
* Return the {@link BroadcasterConfig} used by this instance.
*
* @return {@link BroadcasterConfig}
*/
public static BroadcasterConfig getBroadcasterConfig() {
return broadcasterConfig;
}

/**
* Return the {@link org.atmosphere.cpr.BroadcasterCache} class name.
*
Expand Down
Expand Up @@ -38,6 +38,7 @@
package org.atmosphere.cpr;

import org.atmosphere.cpr.BroadcastFilter.BroadcastAction;
import org.atmosphere.util.LoggerUtils;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -47,6 +48,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Handle {@link Broadcaster} configuration like {@link ExecutorService} and
Expand All @@ -55,6 +58,7 @@
* @author Jeanfrancois Arcand
*/
public class BroadcasterConfig {
public final static Logger logger = LoggerUtils.getLogger();

final static int numOfProcessor = Runtime.getRuntime().availableProcessors();

Expand All @@ -71,8 +75,27 @@ public class BroadcasterConfig {

private BroadcasterCache broadcasterCache;

public BroadcasterConfig() {
public BroadcasterConfig(String[] list) {
configExecutors();
configureBroadcasterFilter(list);
configureBroadcasterCache();
}

private void configureBroadcasterCache() {
try {
if (AtmosphereServlet.broadcasterCacheClassName != null) {
setBroadcasterCache((BroadcasterCache)
Thread.currentThread().getContextClassLoader()
.loadClass(AtmosphereServlet.broadcasterCacheClassName).newInstance());
}
} catch (InstantiationException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}

}

public BroadcasterConfig(ExecutorService executorService, ScheduledExecutorService scheduler) {
Expand All @@ -81,13 +104,13 @@ public BroadcasterConfig(ExecutorService executorService, ScheduledExecutorServi
}

protected void configExecutors() {
executorService = Executors.newCachedThreadPool(new ThreadFactory(){
executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {

private AtomicInteger count = new AtomicInteger();

@Override
public Thread newThread(final Runnable runnable){
return new Thread(runnable,"Atmosphere-BroadcasterConfig-" + count.getAndIncrement());
public Thread newThread(final Runnable runnable) {
return new Thread(runnable, "Atmosphere-BroadcasterConfig-" + count.getAndIncrement());
}
});
defaultExecutorService = executorService;
Expand Down Expand Up @@ -136,7 +159,7 @@ public boolean addFilter(BroadcastFilter e) {
}

public void destroy() {
if (broadcasterCache != null){
if (broadcasterCache != null) {
broadcasterCache.stop();
}

Expand Down Expand Up @@ -177,23 +200,25 @@ public boolean removeFilter(BroadcastFilter e) {
* Remove all {@link BroadcastFilter}
*/
public void removeAllFilters() {
for (BroadcastFilter e: filters) {
for (BroadcastFilter e : filters) {
removeFilter(e);
}
}

/**
* Return true if this object contains {@link BroadcastFilter}
*
* @return true if this object contains {@link BroadcastFilter}
*/
public boolean hasFilters(){
public boolean hasFilters() {
return !filters.isEmpty();
}

/**
* Invoke {@link BroadcastFilter} in the other they were added.
*
* @param object the broadcasted object.
* @return BroadcastAction that tell Atmosphere to invoke the next filter or not.
* @return BroadcastAction that tell Atmosphere to invoke the next filter or not.
*/
protected BroadcastAction filter(Object object) {
BroadcastAction transformed = new BroadcastAction(object);
Expand Down Expand Up @@ -221,7 +246,7 @@ public ExecutorService getDefaultExecutorService() {
* is used if that method is not invoked.
*
* @param scheduler to be used when broadcasting.
* @return this.
* @return this.
*/
public BroadcasterConfig setScheduledExecutorService(ScheduledExecutorService scheduler) {
if (this.scheduler != null) {
Expand Down Expand Up @@ -249,6 +274,7 @@ public ScheduledExecutorService getScheduledExecutorService() {

/**
* Set a {@link BroadcasterCache}
*
* @param broadcasterCache a {@link BroadcasterCache}
* @return this
*/
Expand All @@ -259,6 +285,7 @@ public BroadcasterConfig setBroadcasterCache(BroadcasterCache broadcasterCache)

/**
* Get a {@link BroadcasterCache}
*
* @return this
*/
public BroadcasterCache getBroadcasterCache() {
Expand All @@ -284,4 +311,20 @@ public List<Object> retrieveFromCache(AtmosphereResource r) {
return list;
}
}

void configureBroadcasterFilter(String[] list) {
for (String broadcastFilter : list) {
try {
addFilter(BroadcastFilter.class.cast(
Thread.currentThread().getContextClassLoader().loadClass(broadcastFilter).newInstance()));
} catch (InstantiationException e) {
logger.log(Level.WARNING, String.format("Error trying to instanciate BroadcastFilter %s", broadcastFilter), e);
} catch (IllegalAccessException e) {
logger.log(Level.WARNING, String.format("Error trying to instanciate BroadcastFilter %s", broadcastFilter), e);
} catch (ClassNotFoundException e) {
logger.log(Level.WARNING, String.format("Error trying to instanciate BroadcastFilter %s", broadcastFilter), e);
}
}
}

}
Expand Up @@ -132,7 +132,7 @@ public synchronized static BroadcasterFactory getDefault() {
} catch (ClassNotFoundException e) {
LoggerUtils.getLogger().log(Level.SEVERE,"",e);
}
factory = new DefaultBroadcasterFactory(b == null ? DefaultBroadcaster.class : b , AtmosphereServlet.getBroadcasterConfig());
factory = new DefaultBroadcasterFactory(b == null ? DefaultBroadcaster.class : b);

}
return factory;
Expand Down
Expand Up @@ -73,7 +73,7 @@ public class DefaultBroadcaster implements Broadcaster {

protected final ConcurrentLinkedQueue<AtmosphereResource<?, ?>> resources =
new ConcurrentLinkedQueue<AtmosphereResource<?, ?>>();
protected BroadcasterConfig bc = AtmosphereServlet.getBroadcasterConfig();
protected BroadcasterConfig bc;
protected final BlockingQueue<Entry> messages = new LinkedBlockingQueue<Entry>();
protected final AtomicBoolean started = new AtomicBoolean(false);
protected final AtomicBoolean destroyed = new AtomicBoolean(false);
Expand All @@ -98,6 +98,7 @@ public DefaultBroadcaster(String name) {
this.name = name;
setID(name);
broadcasterCache = new DefaultBroadcasterCache();
bc = new BroadcasterConfig(AtmosphereServlet.broadcasterFilters);
}

/**
Expand Down Expand Up @@ -257,12 +258,6 @@ public void run() {
protected void start() {
if (!started.getAndSet(true)) {

if (bc == null) {
LoggerUtils.getLogger().log(Level.WARNING, "BroadcasterConfig was null. It is recommended to use a BroadcasterFactory " +
"for creating Broadcaster instead of using new");
bc = new BroadcasterConfig();
}

broadcasterCache = bc.getBroadcasterCache();
broadcasterCache.start();

Expand Down

0 comments on commit eef217f

Please sign in to comment.