This repository has been archived by the owner on Nov 28, 2023. It is now read-only.

.IllegalStateException: unexpected message type: UnpooledUnsafeDirectByteBuf #71

Closed
zcourts opened this issue Oct 9, 2014 · 3 comments

Comments

@zcourts

zcourts commented Oct 9, 2014

See #47 (comment)

The stack trace is: 
2014-10-01 18:00:52.158 [nioEventLoopGroup-2-2] ERROR c.s.e.datasift.stream.ErrorHandler -Error in Datasift stream\
io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: UnpooledUnsafeDirectByteBuf\
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:107) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:193) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:657) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:715) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:650) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:112) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:657) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:715) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:705) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:740) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:895) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:241) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.higgs.ws.client.WebSocketStream.send(WebSocketStream.java:51) ~[ws-client-0.0.8-1.jar:na]\
at com.datasift.client.stream.StreamingData.pushUnsentSubscriptions(StreamingData.java:239) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData.subscribe(StreamingData.java:225) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData$2.operationComplete(StreamingData.java:246) [datasift-java-3.0.0-Beta3.5.jar:na]\
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:682) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.notifyLateListener(DefaultPromise.java:624) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:144) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:93) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at com.datasift.client.stream.StreamingData.pushUnsentSubscriptions(StreamingData.java:239) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData.subscribe(StreamingData.java:225) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData$2.operationComplete(StreamingData.java:246) [datasift-java-3.0.0-Beta3.5.jar:na]\
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:682) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.notifyLateListener(DefaultPromise.java:624) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:144) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:93) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at com.datasift.client.stream.StreamingData.pushUnsentSubscriptions(StreamingData.java:239) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData.subscribe(StreamingData.java:225) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData$2.operationComplete(StreamingData.java:246) [datasift-java-3.0.0-Beta3.5.jar:na]\
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:682) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.Defau\
t io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at com.datasift.client.stream.StreamingData.pushUnsentSubscriptions(StreamingData.java:239) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData.subscribe(StreamingData.java:225) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData$2.operationComplete(StreamingData.java:246) [datasift-java-3.0.0-Beta3.5.jar:na]\
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:682) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.notifyLateListener(DefaultPromise.java:624) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:144) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:93) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at com.datasift.client.stream.StreamingData.pushUnsentSubscriptions(StreamingData.java:239) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData.subscribe(StreamingData.java:225) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData$2.operationComplete(StreamingData.java:246) [datasift-java-3.0.0-Beta3.5.jar:na]\
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:682) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.notifyLateListener(DefaultPromise.java:624) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:144) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:93) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at com.datasift.client.stream.StreamingData.pushUnsentSubscriptions(StreamingData.java:239) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData.subscribe(StreamingData.java:225) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData$2.operationComplete(StreamingData.java:246) [datasift-java-3.0.0-Beta3.5.jar:na]\
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:682) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.notifyLateListener(DefaultPromise.java:624) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:144) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:93) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at com.datasift.client.stream.StreamingData.pushUnsentSubscriptions(StreamingData.java:239) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData.subscribe(StreamingData.java:225) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData$2.operationComplete(StreamingData.java:246) [datasift-java-3.0.0-Beta3.5.jar:na]\
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:682) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise$LateListeners.run(DefaultPromise.java:847) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise$LateListenerNotifier.run(DefaultPromise.java:875) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:370) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at java.lang.Thread.run(Thread.java:722) [na:1.7.0_07]\
Caused by: java.lang.IllegalStateException: unexpected message type: UnpooledUnsafeDirectByteBuf\
@nishant-kumar12

I have been hitting this for the last month. I keep updating the CSDL and then reconnecting, and I hit this issue roughly 1 time in 5.
My code looks like this:
    try {
        // trying to unsubscribe
        datasiftClient.liveStream().unsubscribe(currentStream);
        HttpRequestBuilder.restart();
        // TODO need to check if we need to wait here till http client restarts.
    } catch (Exception e) {
        e.printStackTrace();
    }
    }

    datasiftClient.liveStream().subscribe(new StreamSubscription(stream) {
        public void onDataSiftLogMessage(DataSiftMessage di) {
            //di.isWarning() is also available
            System.out.println((di.isError() ? "Error" : di.isInfo() ? "Info" : "Warning") + ":\n" + di);
        }

        public void onMessage(Interaction i) {
            System.out.println("INTERACTION:\n" + i);

            //deserializing incoming data to DataSiftData
            Gson gson = new GsonBuilder().excludeFieldsWithModifiers(Modifier.TRANSIENT).create();
            DataSiftData data = gson.fromJson(i.toString(), DataSiftData.class);

            if(data.interaction.type.compareTo("facebook_page") == 0 && data.interaction.subtype.compareTo("page_like") == 0)
            {
                //ignoring the page like as we dont have any information of who liked the page.
                Log.debug("ignoring page like");
            }
            else{
                //updating interaction author.id with interaction.author.idstr
                data.interaction.author.idStr = String.valueOf(data.interaction.author.id); 
                PostData postWithAlertObj = new PostData();
                postWithAlertObj.setPostData(data);
                postWithAlertObj.setProfileID(data.interaction.author.idStr);
                postWithAlertObj.setInteractionID(data.interaction.id);

                queue.offer(postWithAlertObj);
            }
        }
    });

@zcourts zcourts reopened this Jan 22, 2015
@zcourts

zcourts commented Jan 22, 2015

Why?

HttpRequestBuilder.restart();

Just doing

datasiftClient.liveStream().unsubscribe(currentStream);

is enough to unsubscribe from a stream. If you still receive interactions for a stream after unsubscribing, that is probably a bug (bear in mind that when you unsubscribe, some interactions may already have been matched and still need to be delivered).
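
If those already-matched interactions are a problem on your side, one option is to check a flag inside onMessage and drop anything that arrives after you have asked to unsubscribe. A rough sketch only: LateDeliveryGuard and its methods are hypothetical names, not part of the DataSift client, and interactions are shown as plain strings to keep it self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper (NOT part of datasift-java): drops interactions that
// are delivered after the caller has asked to unsubscribe.
class LateDeliveryGuard {
    private final AtomicBoolean active = new AtomicBoolean(true);
    private final List<String> accepted = new ArrayList<String>();

    // Call this when you call liveStream().unsubscribe(...).
    void markUnsubscribed() {
        active.set(false);
    }

    // Call this from onMessage(...). Returns true if the interaction was
    // kept, false if it arrived after markUnsubscribed() and was dropped.
    synchronized boolean offer(String interaction) {
        if (!active.get()) {
            return false;
        }
        accepted.add(interaction);
        return true;
    }

    // Snapshot of everything accepted so far.
    synchronized List<String> accepted() {
        return new ArrayList<String>(accepted);
    }
}
```

In the subscription handler, onMessage would call offer(...) and only queue the interaction when it returns true.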

@zcourts

zcourts commented Jan 22, 2015

@nishant-kumar12 I'll try to reproduce this issue again. This time using HttpRequestBuilder.restart(); to see if that helps reproduce the exception.

@zcourts zcourts self-assigned this Jan 22, 2015