[Filter not called when messages are cached and retrieved] Cast Exception when DefaultBroadcaster tries to send a cached message #986

Closed
antonio-marrero opened this issue Mar 27, 2013 · 17 comments

@antonio-marrero

version:1.1.0.beta3
transport: long-polling
server: tomcat 6.0.36
browser: Firefox 19.0.2

  1. Enable EventCacheBroadcasterCache.
  2. In JavaScript, subscribe a socket to a broadcaster using long-polling.
  3. Client 1: in the browser, open the File menu and check "Work Offline" (see log-extract-1 below).
  4. Server: broadcast a message for client 1.
  5. Client 1: in the browser, open the File menu and uncheck "Work Offline" so the browser reconnects.
  6. Check the exception in the server log (see log-extract-2 below).

log-extract-1 (disconnect client with no error):

18:00:28,544 DEBUG AsynchronousProcessor:535 - Cancelling the connection for AtmosphereResource null
18:00:34,419 DEBUG EventCacheBroadcasterCache:296 - Retrieved for AtmosphereResource 8acd74f4-5a06-4dbe-9b94-db3c864ad255 cached messages []
18:00:34,422 DEBUG EventCacheBroadcasterCache:297 - Available cached message {=org.atmosphere.cache.EventCacheBroadcasterCache$ClientQueue@aee392, b5771e00-2f1a-4c68-98c8-64f6a57b0a38=org.atmosphere.cache.EventCacheBroadcasterCache$ClientQueue@1e2d159, c1ca0c41-5b09-4287-aa4d-30cb120b8e1b=org.atmosphere.cache.EventCacheBroadcasterCache$ClientQueue@198339a}
18:00:40,069 DEBUG EventCacheBroadcasterCache:176 - Adding for AtmosphereResource null cached messages sport.model.Match@76feadfa
18:00:40,070 DEBUG EventCacheBroadcasterCache:177 - Active clients {=21145927224383, b5771e00-2f1a-4c68-98c8-64f6a57b0a38=19955065291904, 8acd74f4-5a06-4dbe-9b94-db3c864ad255=21319577271917, c1ca0c41-5b09-4287-aa4d-30cb120b8e1b=19724112335592, dbfce16a-42c9-4567-8eb5-6051f9daba70=21309966680585}
18:00:40,072 DEBUG EventCacheBroadcasterCache:257 - Adding message  for client sport.model.Match@76feadfa
18:00:40,073 DEBUG EventCacheBroadcasterCache:257 - Adding message b5771e00-2f1a-4c68-98c8-64f6a57b0a38 for client sport.model.Match@76feadfa
18:00:40,074 DEBUG EventCacheBroadcasterCache:257 - Adding message 8acd74f4-5a06-4dbe-9b94-db3c864ad255 for client sport.model.Match@76feadfa
18:00:40,075 DEBUG EventCacheBroadcasterCache:257 - Adding message c1ca0c41-5b09-4287-aa4d-30cb120b8e1b for client sport.model.Match@76feadfa
18:00:40,076 DEBUG EventCacheBroadcasterCache:257 - Adding message dbfce16a-42c9-4567-8eb5-6051f9daba70 for client sport.model.Match@76feadfa
18:00:40,078 DEBUG EventCacheBroadcasterCache:309 - Removing for AtmosphereResource 8acd74f4-5a06-4dbe-9b94-db3c864ad255 cached message sport.model.Match@76feadfa
Hibernate: update iSportsDB.MATCH set gender_code=?, phase_code=?, group_id=?, status_code=?, winner_id=?, court_code=?, match_code=?, date_time=?, duration=?, spectators=?, timestamp=? where match_id=?
18:00:40,329 DEBUG EventCacheBroadcasterCache:296 - Retrieved for AtmosphereResource 8acd74f4-5a06-4dbe-9b94-db3c864ad255 cached messages []
18:00:40,339 DEBUG EventCacheBroadcasterCache:297 - Available cached message {=org.atmosphere.cache.EventCacheBroadcasterCache$ClientQueue@aee392, b5771e00-2f1a-4c68-98c8-64f6a57b0a38=org.atmosphere.cache.EventCacheBroadcasterCache$ClientQueue@1e2d159, c1ca0c41-5b09-4287-aa4d-30cb120b8e1b=org.atmosphere.cache.EventCacheBroadcasterCache$ClientQueue@198339a, dbfce16a-42c9-4567-8eb5-6051f9daba70=org.atmosphere.cache.EventCacheBroadcasterCache$ClientQueue@b3826f}

log-extract-2 (reconnect client after a message has been broadcast):

18:00:50,744 DEBUG AsynchronousProcessor:116 - Close detector disconnecting AtmosphereResourceImpl{
 uuid=8acd74f4-5a06-4dbe-9b94-db3c864ad255,
 transport=LONG_POLLING,
 action=Action{timeout=-1, type=SUSPEND},
 isResumed=false,
 isCancelled=false,
 isSuspended=true,
 broadcaster=2,
 isInScope=true,
 listeners=[org.atmosphere.interceptor.DefaultHeadersInterceptor$1@79f0ec]}. Current size 1
18:00:50,745 DEBUG AsynchronousProcessor:535 - Cancelling the connection for AtmosphereResource null
18:01:00,230 DEBUG EventCacheBroadcasterCache:296 - Retrieved for AtmosphereResource 8acd74f4-5a06-4dbe-9b94-db3c864ad255 cached messages []
18:01:00,231 DEBUG EventCacheBroadcasterCache:297 - Available cached message {=org.atmosphere.cache.EventCacheBroadcasterCache$ClientQueue@aee392, b5771e00-2f1a-4c68-98c8-64f6a57b0a38=org.atmosphere.cache.EventCacheBroadcasterCache$ClientQueue@1e2d159, c1ca0c41-5b09-4287-aa4d-30cb120b8e1b=org.atmosphere.cache.EventCacheBroadcasterCache$ClientQueue@198339a, dbfce16a-42c9-4567-8eb5-6051f9daba70=org.atmosphere.cache.EventCacheBroadcasterCache$ClientQueue@b3826f}
18:01:04,823 DEBUG EventCacheBroadcasterCache:296 - Retrieved for AtmosphereResource dbfce16a-42c9-4567-8eb5-6051f9daba70 cached messages [sport.model.Match@76feadfa]
18:01:04,825 DEBUG EventCacheBroadcasterCache:297 - Available cached message {=org.atmosphere.cache.EventCacheBroadcasterCache$ClientQueue@aee392, b5771e00-2f1a-4c68-98c8-64f6a57b0a38=org.atmosphere.cache.EventCacheBroadcasterCache$ClientQueue@1e2d159, c1ca0c41-5b09-4287-aa4d-30cb120b8e1b=org.atmosphere.cache.EventCacheBroadcasterCache$ClientQueue@198339a}
18:01:04,827 DEBUG DefaultBroadcaster:942 - Sending cached message [sport.model.Match@76feadfa] to dbfce16a-42c9-4567-8eb5-6051f9daba70
18:01:04,829 DEBUG DefaultBroadcaster:1018 - onException()
java.lang.ClassCastException: sport.model.Match cannot be cast to java.lang.String
    at org.atmosphere.handler.AbstractReflectorAtmosphereHandler.onStateChange(AbstractReflectorAtmosphereHandler.java:118)
    at org.atmosphere.cpr.DefaultBroadcaster.invokeOnStateChange(DefaultBroadcaster.java:1001)
    at org.atmosphere.cpr.DefaultBroadcaster.checkCachedAndPush(DefaultBroadcaster.java:954)
    at org.atmosphere.cpr.DefaultBroadcaster.addAtmosphereResource(DefaultBroadcaster.java:1275)
    at org.atmosphere.cpr.AtmosphereResourceImpl.suspend(AtmosphereResourceImpl.java:405)
    at org.atmosphere.cpr.AtmosphereResourceImpl.suspend(AtmosphereResourceImpl.java:321)
    at sport.push.config.AtmosphereUtils.suspend(AtmosphereUtils.java:25)
    at sport.push.LiveSubscribeCtrl.subscribe(LiveSubscribeCtrl.java:34)
    at sun.reflect.GeneratedMethodAccessor95.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.springframework.web.method.support.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:213)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:126)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:96)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:617)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:578)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:80)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:923)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:852)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:882)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:778)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:617)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:717)
    at org.atmosphere.util.AtmosphereFilterChain.doFilter(AtmosphereFilterChain.java:171)
    at org.atmosphere.util.AtmosphereFilterChain.invokeFilterChain(AtmosphereFilterChain.java:132)
    at org.atmosphere.handler.ReflectorServletProcessor$FilterChainServletWrapper.service(ReflectorServletProcessor.java:337)
    at org.atmosphere.handler.ReflectorServletProcessor.onRequest(ReflectorServletProcessor.java:196)
    at org.atmosphere.cpr.AsynchronousProcessor.action(AsynchronousProcessor.java:266)
    at org.atmosphere.cpr.AsynchronousProcessor.suspended(AsynchronousProcessor.java:165)
    at org.atmosphere.container.TomcatCometSupport.service(TomcatCometSupport.java:112)
    at org.atmosphere.cpr.AtmosphereFramework.doCometSupport(AtmosphereFramework.java:1403)
    at org.atmosphere.cpr.AtmosphereServlet.event(AtmosphereServlet.java:327)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilterEvent(ApplicationFilterChain.java:470)
    at org.apache.catalina.core.ApplicationFilterChain.doFilterEvent(ApplicationFilterChain.java:363)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:231)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:293)
    at org.apache.coyote.http11.Http11AprProcessor.process(Http11AprProcessor.java:879)
    at org.apache.coyote.http11.Http11AprProtocol$Http11ConnectionHandler.process(Http11AprProtocol.java:600)
    at org.apache.tomcat.util.net.AprEndpoint$Worker.run(AprEndpoint.java:1703)
    at java.lang.Thread.run(Thread.java:662)
18:01:04,847 DEBUG DefaultBroadcaster:1280 - Unable to add AtmosphereResource dbfce16a-42c9-4567-8eb5-6051f9daba70
18:01:05,085 DEBUG EventCacheBroadcasterCache:296 - Retrieved for AtmosphereResource dbfce16a-42c9-4567-8eb5-6051f9daba70 cached messages []
18:01:05,087 DEBUG EventCacheBroadcasterCache:297 - Available cached message {=org.atmosphere.cache.EventCacheBroadcasterCache$ClientQueue@aee392, b5771e00-2f1a-4c68-98c8-64f6a57b0a38=org.atmosphere.cache.EventCacheBroadcasterCache$ClientQueue@1e2d159, c1ca0c41-5b09-4287-aa4d-30cb120b8e1b=org.atmosphere.cache.EventCacheBroadcasterCache$ClientQueue@198339a}

JavaScript configuration:

    url: "/iSports/atmosphere/subscribe/",
    transport : "long-polling",
    enableXDR: true,
    reconnectInterval: -1,
    timeout: 10000,
    withCredentials: true,
    trackMessageLength: true,
    enableProtocol: true,
    messageDelimiter: "|"

web.xml:

    <servlet>
        <description>AtmosphereServlet</description>
        <servlet-name>AtmosphereServlet</servlet-name>
        <servlet-class>org.atmosphere.cpr.MeteorServlet</servlet-class>

        <init-param>
            <param-name>org.atmosphere.servlet</param-name>
            <param-value>org.springframework.web.servlet.DispatcherServlet</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.broadcaster.shareableThreadPool</param-name>
            <param-value>true</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.recoverFromDestroyedBroadcaster</param-name>
            <param-value>true</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.CometSupport.maxInactiveActivity</param-name>
            <param-value>10000</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.AtmosphereResource.uniqueUUID</param-name>
            <param-value>true</param-value>
        </init-param>

         <init-param>
            <param-name>org.atmosphere.cpr.AtmosphereInterceptor</param-name>
            <param-value>org.atmosphere.client.TrackMessageSizeInterceptor</param-value>
        </init-param>

        <init-param>
          <param-name>org.atmosphere.cpr.broadcasterCacheClass</param-name>
          <param-value>org.atmosphere.cache.EventCacheBroadcasterCache</param-value>
        </init-param>

         <init-param>
          <param-name>org.atmosphere.cpr.broadcaster.maxProcessingThreads</param-name>
          <param-value>10</param-value>
        </init-param>
        <init-param>
          <param-name>org.atmosphere.cpr.broadcaster.maxAsyncWriteThreads</param-name>
          <param-value>10</param-value>
        </init-param>

        <init-param>
           <param-name>org.atmosphere.cpr.BroadcasterCache.strategy</param-name>
           <param-value>beforeFilter</param-value>
        </init-param>

        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>/WEB-INF/config/atmosphere-servlet.xml</param-value>
        </init-param>

        <load-on-startup>0</load-on-startup>
        <async-supported>true</async-supported>
    </servlet>
@jfarcand
Member

Please use 1.0.0.RC1 with UUIDBroadcasterCache. This issue has already been fixed.

@antonio-marrero
Author

Thanks for the reply.

I couldn't find 1.0.0.RC1, so I downloaded 1.1.0-SNAPSHOT from the repository, but the exception still occurs.

@jfarcand
Member

Do you have a BroadcastFilter defined, by any chance? If yes, make sure you set the BroadcasterCache strategy to beforeFilter, as described here.

@antonio-marrero
Author

I'm using the TrackMessageSizeInterceptor instead of the filter, and BroadcasterCache.strategy is set to beforeFilter.

@antonio-marrero
Author

Sorry to bother you again, but AbstractReflectorAtmosphereHandler.onStateChange checks the type of the message, and maybe a check of the generic type (List<String>) is missing:

    if (message instanceof List) {
        for (String s : (List<String>) message) {
            if (isUsingStream) {
                r.getOutputStream().write(s.getBytes(r.getCharacterEncoding()));
                r.getOutputStream().flush();
            } else {
                r.getWriter().write(s);
                r.getWriter().flush();
            }
        }
    } else {
        if (isUsingStream) {
            r.getOutputStream().write(message.toString().getBytes(r.getCharacterEncoding()));
            r.getOutputStream().flush();
        } else {
            r.getWriter().write(message.toString());
            r.getWriter().flush();
        }
    }
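
For readers following along, here is a minimal, self-contained sketch of why the `instanceof List` check cannot catch this case: Java generics are erased at runtime, so the cast to `List<String>` always succeeds and the `ClassCastException` only surfaces when an element is assigned to the `String` loop variable. The `ErasureDemo` class, its `writeAll` method, and the nested `Match` class are hypothetical stand-ins written for this illustration; they are not Atmosphere code.

```java
import java.util.ArrayList;
import java.util.List;

final class ErasureDemo {

    // Hypothetical stand-in for the application's sport.model.Match.
    static final class Match {}

    // Mirrors the shape of the loop quoted above: the cast to List<String> is unchecked.
    @SuppressWarnings("unchecked")
    static String writeAll(Object message) {
        StringBuilder out = new StringBuilder();
        if (message instanceof List) {
            // The cast itself succeeds for any List because element types are erased.
            // The exception is thrown when a non-String element is bound to "s".
            for (String s : (List<String>) message) {
                out.append(s);
            }
        } else {
            out.append(message.toString());
        }
        return out.toString();
    }

    public static void main(String[] args) {
        List<Object> cached = new ArrayList<>();
        cached.add(new Match()); // an unfiltered object, as in the cached message
        try {
            writeAll(cached);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

Under this reading, the handler code is behaving as written; the open question is why a non-String object reaches it at all.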

@jfarcand
Member

I don't think that's the issue. The issue is that your object, sport.model.Match, gets propagated inside Atmosphere instead of a String representation. Where does the Match -> String conversion occur without the BroadcasterCache?

@antonio-marrero
Author

What you say makes sense. I'm using a filter (jsonOutputFilter):

public void createChannel(String channelId) {

    if(broadcasterFactory==null) broadcasterFactory = BroadcasterFactory.getDefault(); 
    Broadcaster broadcaster = broadcasterFactory.lookup(channelId);
    if(broadcaster == null){
        broadcaster = broadcasterFactory.get(channelId);
        broadcaster.getBroadcasterConfig().addFilter(jsonOutputFilter);

        logger.info("Broadcaster created with id:"+channelId);
    }else{
        logger.info("Broadcaster with id:"+channelId + " already existed. ");
    }

}

@antonio-marrero
Author

And if it helps, this is the filter:

import org.atmosphere.cpr.BroadcastFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import sport.model.Action;
import sport.web.json.CustomJsonConverter;

public class JSONOutputFilter implements BroadcastFilter {

    private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());

    @Autowired
    CustomJsonConverter jsonConverter;

    @Override
    public BroadcastAction filter(Object originalMessage, Object message) {
        // Strings are passed through unchanged; only other objects are converted to JSON.
        if (message instanceof String) {
            return new BroadcastAction(message);
        }
        logger.info("Broadcasting message class: " + message.getClass().getSimpleName());
        // The JSON view name is derived from the message class name.
        String jsonView = "broadcast" + message.getClass().getSimpleName();
        return new BroadcastAction(jsonConverter.writeString(message, jsonView));
    }
}

@antonio-marrero
Author

No luck. I've replaced the TrackMessageSizeInterceptor with the TrackMessageSizeFilter and registered my JSONOutputFilter in web.xml, but the result is the same.

This is my new configuration:

    <init-param>
        <param-name>org.atmosphere.cpr.broadcastFilterClasses</param-name>
        <param-value>sport.push.config.JSONOutputFilter,org.atmosphere.client.TrackMessageSizeFilter</param-value>
    </init-param>

Unfortunately, the cast exception still occurs with this configuration.

@jfarcand
Member

Hard to see what's wrong... for sure the wrong message is queued to be written. Does it work without the BroadcasterCache installed?

@antonio-marrero
Author

Yes, without the cache I can broadcast messages, and when I set the client offline I get a WARN (see below) but no cast exception when sending a new message.

Warning when disconnecting the browser:
AsynchronousProcessor:119 - closedDetector
java.lang.NullPointerException
at org.apache.catalina.connector.CometEventImpl.close(CometEventImpl.java:99)

@antonio-marrero
Author

It's late at night here in Europe, so I have to stop. Tomorrow I'll add traces to the DefaultBroadcaster to see what's going on and let you know.
Thanks for the help.

@antonio-marrero
Author

I've done one more test that might help. When I want to delete something from the customers' screens, I broadcast a String message that doesn't need to be converted to JSON. In this case, when I reconnect the client after it has been offline, I get no cast exception.

I think this has nothing to do with the issue, but it looks like the cached message is pushed twice, and the second push gets the result of the first as input. It may be the expected behaviour, as the cached message will also be cached when sent.

10:34:05,177 DEBUG DefaultBroadcaster:1012 - Sending cached message [{"Action":{"id":35 , "delete":true}}] to dd425be7-c442-4277-9854-ad487a0d4fa1
10:34:05,179 INFO AbstractReflectorAtmosphereHandler:67 - *************** AbstractReflector:[36|{"Action":{"id":35 , "delete":true}}]
10:34:05,184 DEBUG DefaultBroadcaster:1012 - Sending cached message [36|{"Action":{"id":35 , "delete":true}}] to dd425be7-c442-4277-9854-ad487a0d4fa1
10:34:05,186 INFO AbstractReflectorAtmosphereHandler:67 - *************** AbstractReflector:[39|36|{"Action":{"id":35 , "delete":true}}]
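
The numbers in this trace are consistent with the size prefix being applied to an already prefixed message. A small sketch, assuming the prefix format is simply `<length>|<message>` as the trace suggests; `LengthPrefixDemo` and `prefix` are hypothetical names for this illustration and are not the TrackMessageSizeFilter implementation:

```java
final class LengthPrefixDemo {

    // Prepends "<length>|" to the message, in the style visible in the trace.
    static String prefix(String message) {
        return message.length() + "|" + message;
    }

    public static void main(String[] args) {
        String json = "{\"Action\":{\"id\":35 , \"delete\":true}}";
        String once = prefix(json);   // first push
        String twice = prefix(once);  // second push, fed with the result of the first
        System.out.println(once);
        System.out.println(twice);
    }
}
```

The JSON payload is 36 characters long, and the once-prefixed string `36|...` is 39 characters long, which matches the `39|36|` seen in the second log line.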

@antonio-marrero
Author

The problem is in DefaultBroadcaster.checkCachedAndPush(): when the cache strategy is STRATEGY.BEFORE_FILTER, only the PerRequestFilters are applied, and my JSONOutputFilter was a standard filter. I don't know whether this is the intended behaviour or whether the standard filters should be applied as well.

After changing it to a PerRequestFilter the cached messages are pushed to the client without problems.
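
The behaviour described in this comment can be modelled outside Atmosphere with a few lines of plain Java. This is only a sketch under stated assumptions: `CacheReplaySketch`, `replay`, `JSON_FILTER`, and the nested `Match` class are hypothetical stand-ins, and filters are reduced to simple functions from message to message. It does not reproduce the actual DefaultBroadcaster code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class CacheReplaySketch {

    // Hypothetical stand-in for sport.model.Match.
    static final class Match {
        final int id;
        Match(int id) { this.id = id; }
    }

    // Stand-in for a JSON output filter: Strings pass through, Match objects are serialized.
    static final UnaryOperator<Object> JSON_FILTER =
            m -> (m instanceof String) ? m : "{\"Match\":{\"id\":" + ((Match) m).id + "}}";

    // Runs a message through a chain of filters in order.
    static Object apply(List<UnaryOperator<Object>> filters, Object message) {
        Object current = message;
        for (UnaryOperator<Object> f : filters) {
            current = f.apply(current);
        }
        return current;
    }

    // Replays a message that was cached before any filter ran (the "beforeFilter" idea).
    // When applyStandardOnReplay is false, only the per-request chain sees the message.
    static Object replay(Object cachedRaw,
                         List<UnaryOperator<Object>> standard,
                         List<UnaryOperator<Object>> perRequest,
                         boolean applyStandardOnReplay) {
        Object m = applyStandardOnReplay ? apply(standard, cachedRaw) : cachedRaw;
        return apply(perRequest, m);
    }

    public static void main(String[] args) {
        List<UnaryOperator<Object>> none = new ArrayList<>();
        List<UnaryOperator<Object>> json = new ArrayList<>();
        json.add(JSON_FILTER);
        Object raw = new Match(7);
        // Standard filter skipped on replay: the raw Match reaches the handler.
        System.out.println(replay(raw, json, none, false).getClass().getSimpleName());
        // Same filter registered per request: the handler receives a String.
        System.out.println(replay(raw, none, json, false));
    }
}
```

In this model, registering the conversion in the per-request chain corresponds to the workaround above, while running the standard chain on replay as well corresponds to the alternative raised in the question.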

One question remains. Cached messages are broadcast twice, and the traces in my previous comment show that the second time the message carries the size prefix twice. Is this the correct behaviour? Do I have something dodgy in my configuration?

Thanks again

@jfarcand
Member

The filter not being invoked sounds like a bug to me. I will do some testing on my side. For the double message, I will need a test case. Thanks!!

@jfarcand jfarcand reopened this Mar 28, 2013
jfarcand added a commit that referenced this issue Mar 28, 2013
@jfarcand
Member

OK, I fixed the first issue, i.e. filters not being invoked. Can you file an issue for the second problem? This one is harder to reproduce. Thanks for your patience!

@antonio-marrero
Author

No problem, I'll try to find a pattern

jfarcand added a commit that referenced this issue Apr 2, 2013