New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for long polling appears broken #78

Closed
SmithKevin opened this Issue Sep 20, 2016 · 1 comment

Comments

Projects
None yet
1 participant
@SmithKevin

SmithKevin commented Sep 20, 2016

Hello. My server is using a the SQS long polling mechanism by using the following

ReceiveMessageRequest rmr = new ReceiveMessageRequest()
                            .withMaxNumberOfMessages(10)
                            .withWaitTimeSeconds(20)
                            .withQueueUrl(scanQueueUrl);
                    ReceiveMessageResult rx = sqs.receiveMessage(rmr);

However when doing this elesticmq throws a timeout exception instead of just returning no messages. The error is as follows

java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:209)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:139)
    at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:155)
    at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:284)
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
    at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
    at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:165)
    at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:167)
    at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
    at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:82)
    at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:271)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
    at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
    at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:880)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:723)
    at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:475)
    at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:437)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:386)
    at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:1749)
    at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:1719)
    at com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1369)

[elasticmq-akka.actor.default-dispatcher-14] ERROR org.elasticmq.rest.sqs.TheSQSRestServerBuilder$$anon$1 - Exception when running routes
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://elasticmq/user/$a/$a#-794922759]] after [21000 ms]
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
    at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.run(Scheduler.scala:476)
    at akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(Scheduler.scala:282)
    at akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(Scheduler.scala:281)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at akka.actor.LightArrayRevolverScheduler.close(Scheduler.scala:280)
    at akka.actor.ActorSystemImpl.stopScheduler(ActorSystem.scala:689)
    at akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply$mcV$sp(ActorSystem.scala:618)
    at akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply(ActorSystem.scala:618)
    at akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply(ActorSystem.scala:618)
    at akka.actor.ActorSystemImpl$$anon$3.run(ActorSystem.scala:642)
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.runNext$1(ActorSystem.scala:809)
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply$mcV$sp(ActorSystem.scala:812)
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:805)
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:805)
    at akka.util.ReentrantGuard.withGuard(LockUtil.scala:15)
    at akka.actor.ActorSystemImpl$TerminationCallbacks.run(ActorSystem.scala:805)
    at akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:639)
    at akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:639)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Notice that the time seconds in the exception is 20 + 1 extra second.
if i remove the .withWaitTimeSeconds(20) then everything works as expected. Is long polling not supported or is it a bug?
Thanks

@SmithKevin

This comment has been minimized.

Show comment
Hide comment
@SmithKevin

SmithKevin Sep 20, 2016

After looking into this further. This issue is 100% my fault. What was happening is that I was stopping elasticmq while a long polling request was in progress. So since the queue was stopped from underneath it, the polling request threw the exception.

I'm not sure what the proper behavior should be here. Possibly the server waiting for all requests to finish before shutting down. Either way it is easily remedied on my side so its not that important.

Sorry for the bug

SmithKevin commented Sep 20, 2016

After looking into this further. This issue is 100% my fault. What was happening is that I was stopping elasticmq while a long polling request was in progress. So since the queue was stopped from underneath it, the polling request threw the exception.

I'm not sure what the proper behavior should be here. Possibly the server waiting for all requests to finish before shutting down. Either way it is easily remedied on my side so its not that important.

Sorry for the bug

@SmithKevin SmithKevin closed this Sep 20, 2016

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment