From eef217ffbefe7a10a8df1d37680348dabe69c5af Mon Sep 17 00:00:00 2001 From: jfarcand Date: Fri, 22 Oct 2010 17:57:23 -0400 Subject: [PATCH] Fix for https://atmosphere.dev.java.net/issues/show_bug.cgi?id=58 (" Shared BroadcasterConfig instance ") We are now back with 1-1 relation. --- .../cpr/AtmosphereResourceImpl.java | 7 +-- .../org/atmosphere/cpr/AtmosphereServlet.java | 51 +++++----------- .../org/atmosphere/cpr/BroadcasterConfig.java | 61 ++++++++++++++++--- .../atmosphere/cpr/BroadcasterFactory.java | 2 +- .../atmosphere/cpr/DefaultBroadcaster.java | 9 +-- .../cpr/DefaultBroadcasterFactory.java | 15 ++--- .../util/gae/GAEBroadcasterConfig.java | 4 +- .../util/gae/GAEDefaultBroadcaster.java | 3 + .../plugin/bayeux/AtmosphereBayeuxClient.java | 2 +- 9 files changed, 82 insertions(+), 72 deletions(-) diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java index 88e3c97ad1b..9d7658b5d86 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java @@ -448,18 +448,13 @@ void onBroadcast(AtmosphereResourceEvent) - Thread.currentThread().getContextClassLoader().loadClass(broadcasterClassName), broadcasterConfig); + Thread.currentThread().getContextClassLoader().loadClass(broadcasterClassName)); config.broadcasterFactory = broadcasterFactory; } @@ -573,19 +575,20 @@ protected void configureBroadcaster() throws ClassNotFoundException, Instantiati while (i.hasNext()) { e = i.next(); w = e.getValue(); + BroadcasterConfig broadcasterConfig = new BroadcasterConfig(broadcasterFilters); + if (w.broadcaster == null) { w.broadcaster = broadcasterFactory.get(); } else { w.broadcaster.setBroadcasterConfig(broadcasterConfig); + if (broadcasterCacheClassName != null) { + broadcasterConfig.setBroadcasterCache((BroadcasterCache) + Thread.currentThread().getContextClassLoader().loadClass(broadcasterCacheClassName).newInstance()); + } } w.broadcaster.setID(e.getKey()); } - if (broadcasterCacheClassName != null) { - broadcasterConfig.setBroadcasterCache((BroadcasterCache) - Thread.currentThread().getContextClassLoader().loadClass(broadcasterCacheClassName).newInstance()); - } - logger.info("Using " + broadcasterClassName); } @@ -651,7 +654,7 @@ protected void doInitParams(ServletConfig sc) { } s = sc.getInitParameter(BROADCAST_FILTER_CLASSES); if (s != null) { - configureBroadcasterFilter(s.split(",")); + broadcasterFilters = s.split(","); } } @@ -675,21 +678,6 @@ protected void loadConfiguration(ServletConfig sc) throws ServletException { } } - void configureBroadcasterFilter(String[] list){ - for (String broadcastFilter: list) { - try { - broadcasterConfig.addFilter(BroadcastFilter.class.cast( - Thread.currentThread().getContextClassLoader().loadClass(broadcastFilter).newInstance())); - } catch (InstantiationException e) { - logger.log(Level.WARNING,String.format("Error trying to instanciate BroadcastFilter %s",broadcastFilter),e); - } catch (IllegalAccessException e) { - logger.log(Level.WARNING,String.format("Error trying to instanciate BroadcastFilter %s",broadcastFilter),e); - } catch (ClassNotFoundException e) { - logger.log(Level.WARNING,String.format("Error trying to instanciate BroadcastFilter %s",broadcastFilter),e); - } - } - } - /** * Auto-detect Jersey when no atmosphere.xml file are specified. * @@ -763,7 +751,6 @@ boolean detectGoogleAppEngine(ServletConfig sc) { broadcasterClassName = GAE_BROADCASTER; isBroadcasterSpecified = true; cometSupport = new GoogleAppEngineCometSupport(config); - broadcasterConfig = new GAEBroadcasterConfig(); return true; } else { return false; @@ -802,7 +789,6 @@ public void destroy() { } BroadcasterFactory.factory = null; BroadcasterFactory.getDefault().destroy(); - broadcasterConfig = new BroadcasterConfig(); } /** @@ -879,7 +865,7 @@ protected void loadAtmosphereDotXml(InputStream stream, URLClassLoader c) } if (reader.getBroadcastFilterClasses() != null){ - configureBroadcasterFilter(reader.getBroadcastFilterClasses()); + broadcasterFilters = reader.getBroadcastFilterClasses(); } } catch (Throwable t) { @@ -1285,15 +1271,6 @@ public AtmosphereServlet setBroadcasterFactory(final BroadcasterFactory broadcas return this; } - /** - * Return the {@link BroadcasterConfig} used by this instance. - * - * @return {@link BroadcasterConfig} - */ - public static BroadcasterConfig getBroadcasterConfig() { - return broadcasterConfig; - } - /** * Return the {@link org.atmosphere.cpr.BroadcasterCache} class name. * diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterConfig.java b/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterConfig.java index 4f6deeffa83..51a7bb32afc 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterConfig.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterConfig.java @@ -38,6 +38,7 @@ package org.atmosphere.cpr; import org.atmosphere.cpr.BroadcastFilter.BroadcastAction; +import org.atmosphere.util.LoggerUtils; import java.util.ArrayList; import java.util.List; @@ -47,6 +48,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Handle {@link Broadcaster} configuration like {@link ExecutorService} and @@ -55,6 +58,7 @@ * @author Jeanfrancois Arcand */ public class BroadcasterConfig { + public final static Logger logger = LoggerUtils.getLogger(); final static int numOfProcessor = Runtime.getRuntime().availableProcessors(); @@ -71,8 +75,27 @@ public class BroadcasterConfig { private BroadcasterCache broadcasterCache; - public BroadcasterConfig() { + public BroadcasterConfig(String[] list) { configExecutors(); + configureBroadcasterFilter(list); + configureBroadcasterCache(); + } + + private void configureBroadcasterCache() { + try { + if (AtmosphereServlet.broadcasterCacheClassName != null) { + setBroadcasterCache((BroadcasterCache) + Thread.currentThread().getContextClassLoader() + .loadClass(AtmosphereServlet.broadcasterCacheClassName).newInstance()); + } + } catch (InstantiationException e) { + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } public BroadcasterConfig(ExecutorService executorService, ScheduledExecutorService scheduler) { @@ -81,13 +104,13 @@ public BroadcasterConfig(ExecutorService executorService, ScheduledExecutorServi } protected void configExecutors() { - executorService = Executors.newCachedThreadPool(new ThreadFactory(){ + executorService = Executors.newSingleThreadExecutor(new ThreadFactory() { private AtomicInteger count = new AtomicInteger(); @Override - public Thread newThread(final Runnable runnable){ - return new Thread(runnable,"Atmosphere-BroadcasterConfig-" + count.getAndIncrement()); + public Thread newThread(final Runnable runnable) { + return new Thread(runnable, "Atmosphere-BroadcasterConfig-" + count.getAndIncrement()); } }); defaultExecutorService = executorService; @@ -136,7 +159,7 @@ public boolean addFilter(BroadcastFilter e) { } public void destroy() { - if (broadcasterCache != null){ + if (broadcasterCache != null) { broadcasterCache.stop(); } @@ -177,23 +200,25 @@ public boolean removeFilter(BroadcastFilter e) { * Remove all {@link BroadcastFilter} */ public void removeAllFilters() { - for (BroadcastFilter e: filters) { + for (BroadcastFilter e : filters) { removeFilter(e); } } /** * Return true if this object contains {@link BroadcastFilter} + * * @return true if this object contains {@link BroadcastFilter} */ - public boolean hasFilters(){ + public boolean hasFilters() { return !filters.isEmpty(); } /** * Invoke {@link BroadcastFilter} in the other they were added. + * * @param object the broadcasted object. - * @return BroadcastAction that tell Atmosphere to invoke the next filter or not. + * @return BroadcastAction that tell Atmosphere to invoke the next filter or not. */ protected BroadcastAction filter(Object object) { BroadcastAction transformed = new BroadcastAction(object); @@ -221,7 +246,7 @@ public ExecutorService getDefaultExecutorService() { * is used if that method is not invoked. * * @param scheduler to be used when broadcasting. - * @return this. + * @return this. */ public BroadcasterConfig setScheduledExecutorService(ScheduledExecutorService scheduler) { if (this.scheduler != null) { @@ -249,6 +274,7 @@ public ScheduledExecutorService getScheduledExecutorService() { /** * Set a {@link BroadcasterCache} + * * @param broadcasterCache a {@link BroadcasterCache} * @return this */ @@ -259,6 +285,7 @@ public BroadcasterConfig setBroadcasterCache(BroadcasterCache broadcasterCache) /** * Get a {@link BroadcasterCache} + * * @return this */ public BroadcasterCache getBroadcasterCache() { @@ -284,4 +311,20 @@ public List retrieveFromCache(AtmosphereResource r) { return list; } } + + void configureBroadcasterFilter(String[] list) { + for (String broadcastFilter : list) { + try { + addFilter(BroadcastFilter.class.cast( + Thread.currentThread().getContextClassLoader().loadClass(broadcastFilter).newInstance())); + } catch (InstantiationException e) { + logger.log(Level.WARNING, String.format("Error trying to instanciate BroadcastFilter %s", broadcastFilter), e); + } catch (IllegalAccessException e) { + logger.log(Level.WARNING, String.format("Error trying to instanciate BroadcastFilter %s", broadcastFilter), e); + } catch (ClassNotFoundException e) { + logger.log(Level.WARNING, String.format("Error trying to instanciate BroadcastFilter %s", broadcastFilter), e); + } + } + } + } diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterFactory.java b/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterFactory.java index 0a02df13f9b..6f65d29fe69 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterFactory.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterFactory.java @@ -132,7 +132,7 @@ public synchronized static BroadcasterFactory getDefault() { } catch (ClassNotFoundException e) { LoggerUtils.getLogger().log(Level.SEVERE,"",e); } - factory = new DefaultBroadcasterFactory(b == null ? DefaultBroadcaster.class : b , AtmosphereServlet.getBroadcasterConfig()); + factory = new DefaultBroadcasterFactory(b == null ? DefaultBroadcaster.class : b); } return factory; diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java index f0350908e66..b6d5808c0c3 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java @@ -73,7 +73,7 @@ public class DefaultBroadcaster implements Broadcaster { protected final ConcurrentLinkedQueue> resources = new ConcurrentLinkedQueue>(); - protected BroadcasterConfig bc = AtmosphereServlet.getBroadcasterConfig(); + protected BroadcasterConfig bc; protected final BlockingQueue messages = new LinkedBlockingQueue(); protected final AtomicBoolean started = new AtomicBoolean(false); protected final AtomicBoolean destroyed = new AtomicBoolean(false); @@ -98,6 +98,7 @@ public DefaultBroadcaster(String name) { this.name = name; setID(name); broadcasterCache = new DefaultBroadcasterCache(); + bc = new BroadcasterConfig(AtmosphereServlet.broadcasterFilters); } /** @@ -257,12 +258,6 @@ public void run() { protected void start() { if (!started.getAndSet(true)) { - if (bc == null) { - LoggerUtils.getLogger().log(Level.WARNING, "BroadcasterConfig was null. It is recommended to use a BroadcasterFactory " + - "for creating Broadcaster instead of using new"); - bc = new BroadcasterConfig(); - } - broadcasterCache = bc.getBroadcasterCache(); broadcasterCache.start(); diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcasterFactory.java b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcasterFactory.java index 382a5b0de5c..9a8bf2a7126 100755 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcasterFactory.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcasterFactory.java @@ -65,12 +65,9 @@ public class DefaultBroadcasterFactory extends BroadcasterFactory { private final Class clazz; - private final BroadcasterConfig config; - - protected DefaultBroadcasterFactory(Class clazz, BroadcasterConfig config) { + protected DefaultBroadcasterFactory(Class clazz) { this.clazz = clazz; - this.config = config; - + if (factory == null) { this.factory = this; } @@ -81,7 +78,7 @@ protected DefaultBroadcasterFactory(Class clazz, Broadcas */ public Broadcaster get() throws IllegalAccessException, InstantiationException { Broadcaster b = clazz.newInstance(); - b.setBroadcasterConfig(config); + b.setBroadcasterConfig(new BroadcasterConfig(AtmosphereServlet.broadcasterFilters)); b.setID(clazz.getSimpleName() + "-" + new Random().nextInt()); store.put(b.getID(), b); return b; @@ -96,7 +93,7 @@ public final Broadcaster get(Class c, Object id) throws I if (c == null) throw new NullPointerException("Class is null"); Broadcaster b = c.newInstance(); - b.setBroadcasterConfig(config); + b.setBroadcasterConfig(new BroadcasterConfig(AtmosphereServlet.broadcasterFilters)); b.setID(id.toString()); store.put(id, b); @@ -192,7 +189,7 @@ public void destroy() { */ public static BroadcasterFactory build(Class clazz, BroadcasterConfig config) throws InstantiationException, IllegalAccessException { - return new DefaultBroadcasterFactory(clazz, config); + return new DefaultBroadcasterFactory(clazz); } /** @@ -207,7 +204,7 @@ public static BroadcasterFactory build(Class clazz, Broad public static BroadcasterFactory buildAndReplaceDefaultfactory(Class clazz, BroadcasterConfig config) throws InstantiationException, IllegalAccessException { - factory = new DefaultBroadcasterFactory(clazz, config); + factory = new DefaultBroadcasterFactory(clazz); return factory; } diff --git a/modules/cpr/src/main/java/org/atmosphere/util/gae/GAEBroadcasterConfig.java b/modules/cpr/src/main/java/org/atmosphere/util/gae/GAEBroadcasterConfig.java index 25cd63114a8..5c590d3e343 100755 --- a/modules/cpr/src/main/java/org/atmosphere/util/gae/GAEBroadcasterConfig.java +++ b/modules/cpr/src/main/java/org/atmosphere/util/gae/GAEBroadcasterConfig.java @@ -37,7 +37,6 @@ package org.atmosphere.util.gae; - import org.atmosphere.cpr.BroadcastFilter; import org.atmosphere.cpr.BroadcastFilterLifecycle; import org.atmosphere.cpr.BroadcasterConfig; @@ -54,7 +53,8 @@ public class GAEBroadcasterConfig extends BroadcasterConfig { final static String NOT_SUPPORTED = "ExecutorService not supported with Google App Engine"; - public GAEBroadcasterConfig() { + public GAEBroadcasterConfig(String[] list) { + super(list); } @Override diff --git a/modules/cpr/src/main/java/org/atmosphere/util/gae/GAEDefaultBroadcaster.java b/modules/cpr/src/main/java/org/atmosphere/util/gae/GAEDefaultBroadcaster.java index e3b125769c1..6ac485cace8 100755 --- a/modules/cpr/src/main/java/org/atmosphere/util/gae/GAEDefaultBroadcaster.java +++ b/modules/cpr/src/main/java/org/atmosphere/util/gae/GAEDefaultBroadcaster.java @@ -39,6 +39,7 @@ import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.AtmosphereServlet; import org.atmosphere.cpr.Broadcaster; import org.atmosphere.cpr.BroadcasterFuture; import org.atmosphere.cpr.DefaultBroadcaster; @@ -59,10 +60,12 @@ public class GAEDefaultBroadcaster extends DefaultBroadcaster { public GAEDefaultBroadcaster() { super(); + bc = new GAEBroadcasterConfig(AtmosphereServlet.broadcasterFilters); } public GAEDefaultBroadcaster(String name) { super(name); + bc = new GAEBroadcasterConfig(AtmosphereServlet.broadcasterFilters); } /** diff --git a/plugins/bayeux/src/main/java/org/atmosphere/plugin/bayeux/AtmosphereBayeuxClient.java b/plugins/bayeux/src/main/java/org/atmosphere/plugin/bayeux/AtmosphereBayeuxClient.java index 7f71ead22fe..e0ba832d59e 100644 --- a/plugins/bayeux/src/main/java/org/atmosphere/plugin/bayeux/AtmosphereBayeuxClient.java +++ b/plugins/bayeux/src/main/java/org/atmosphere/plugin/bayeux/AtmosphereBayeuxClient.java @@ -150,7 +150,7 @@ public void resume() { synchronized (this) { if (_continuation != null) { try { - AtmosphereServlet.getBroadcasterConfig().getExecutorService().submit(new Runnable() { + _continuation.getBroadcaster().getBroadcasterConfig().getExecutorService().submit(new Runnable() { @Override public void run() { _continuation.resume();