
Blocking IO in content.writeString() [0.5.2-SNAPSHOT] #490

Closed
jamesgorman2 opened this issue Apr 7, 2016 · 5 comments

@jamesgorman2
Collaborator

Hi there,

we have code that returns the results of a JDBC call (blocking IO) in a response.writeString(). Trying to offload this to Schedulers.io() results in an intermittent failure:

2016-04-07T16:44:21.583 [RxCachedThreadScheduler-1] DEBUG trunk.rx.StatementExecutor - Executing NamedParameterPreparedStatement{_SQL here_}
2016-04-07T16:44:22.581 [rxnetty-nio-eventloop-1-2] WARN  trunk.auth.OAuth2Resource - Unexpected error
io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: PooledUnsafeDirectByteBuf
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:107)
    at io.netty.channel.ChannelHandlerInvokerUtil.invokeWriteNow(ChannelHandlerInvokerUtil.java:157)
    at io.netty.channel.DefaultChannelHandlerInvoker.invokeWrite(DefaultChannelHandlerInvoker.java:337)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:265)
    at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:106)
    at io.netty.channel.ChannelHandlerInvokerUtil.invokeWriteNow(ChannelHandlerInvokerUtil.java:157)
    at io.netty.channel.DefaultChannelHandlerInvoker.invokeWrite(DefaultChannelHandlerInvoker.java:337)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:265)
    at io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridge.write(AbstractHttpConnectionBridge.java:119)
    at io.netty.channel.ChannelHandlerInvokerUtil.invokeWriteNow(ChannelHandlerInvokerUtil.java:157)
    at io.netty.channel.DefaultChannelHandlerInvoker.invokeWrite(DefaultChannelHandlerInvoker.java:337)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:265)
    at io.reactivex.netty.channel.BackpressureManagingHandler$WriteInspector.write(BackpressureManagingHandler.java:314)
    at io.netty.channel.ChannelHandlerInvokerUtil.invokeWriteNow(ChannelHandlerInvokerUtil.java:157)
    at io.netty.channel.DefaultChannelHandlerInvoker$WriteTask.run(DefaultChannelHandlerInvoker.java:440)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:339)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:356)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: unexpected message type: PooledUnsafeDirectByteBuf
    at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:97)
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:89)
    ... 19 common frames omitted

code (when unwound):

public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
  return response.writeString(
    observableOfJdbcResultsAsAString
      .subscribeOn(Schedulers.io()) // shift blocking IO off nio-eventloop
      .onErrorResumeNext(e -> Observable.empty()) // since we've sent a header, just truncate the response
  );
}

NB - I've included the onErrorResumeNext in this position since it's part of how our composition fits together.

When we hit the service in AWS with ~7 concurrent requests we'll get 1-2 failures with the response truncated part way through the result sets (i.e. it does send some of the data returned from the database).
It will also repro, more intermittently, from curl on my dev host.

observableOfJdbcResultsAsAString is a composite Observable, and I can produce the same error consistently by adding .subscribeOn(Schedulers.io()) to an inner Observable.

A couple of questions:

  • am I right in assuming I risk blocking the nio-eventloop if I have too many long-running queries?
  • am I using subscribeOn correctly here? Any ideas on where to look to understand the intermittent failure?
    • I am currently assuming (Rx)Netty doesn't like me thread-switching during the writeString, but I don't think I'm doing that here.
  • is there anything I can do to help diagnose this problem? (Unfortunately the data is proprietary and we don't have a data set we can make public yet.)

cheers,
James

@NiteshKant
Member

@jamesgorman2 apologies for the delayed response.
This is a bug; I can repro it locally. It looks like the pipeline isn't set up correctly to transform the String into HttpContent in this case before it reaches the HTTP encoder.

am I right in assuming I risk blocking nio-eventloop if I have too many long running queries?

Yes. In this case, RxNetty subscribes to the passed Observable in order to write it, so if that Observable blocks inside onSubscribe it will block the eventloop. You are correct in moving the subscription off to the IO scheduler.
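
As a hedged illustration of keeping the blocking call off the eventloop (the DataSource, SQL, and row mapping below are hypothetical, and this assumes the RxJava 1.x line used by RxNetty 0.5.x):

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import javax.sql.DataSource;
import rx.Observable;
import rx.schedulers.Schedulers;

public final class JdbcResults {

  // Defers the blocking JDBC work until subscription and shifts that
  // subscription onto the io() scheduler, so onSubscribe never runs on an
  // rxnetty-nio-eventloop thread.
  public static Observable<String> resultsAsString(DataSource dataSource, String sql) {
    return Observable
      .fromCallable(() -> {
        StringBuilder body = new StringBuilder();
        try (Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement();
             ResultSet rows = statement.executeQuery(sql)) {
          while (rows.next()) {
            body.append(rows.getString(1)).append('\n'); // blocking IO happens here
          }
        }
        return body.toString();
      })
      .subscribeOn(Schedulers.io());
  }
}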

@jamesgorman2
Collaborator Author

No worries @NiteshKant, saw you were on holiday recently and expected a bit of a delay. Good to hear you have a repro for this. (and thanks for all the work on RxNetty)

@NiteshKant
Member

OK, an update on the issue. The problem here is the ordering of the items written. For HTTP, RxNetty adds a LastHttpContent at the end of the content stream to mark the end of chunked content. In this particular case, the LastHttpContent gets written before the content stream completes, which causes the illegal state in the HTTP encoder.

The reason for the wrong ordering of writes is as follows:

  • Netty enqueues any write that happens from outside the eventloop onto the eventloop task queue.
  • Any write that happens on the eventloop is written directly, without being enqueued to the task queue.

In this case, request(n) calls to the content Observable happen on the eventloop, so if the producer (the source Observable) is faster than the consumer (the writes on the channel), a request(n) may immediately produce an item that is written to the channel directly, while an item produced on the subscribeOn scheduler gets enqueued to the task queue. This race scrambles the order of the writes. In the worst case, if the LastHttpContent happens to be written before the content stream has completed, the HTTP response is malformed. A simplified model of this interleaving is sketched below.
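
A minimal, hypothetical model of that interleaving (plain Java, not Netty or RxNetty code): writes issued on the "eventloop" thread are applied immediately, while writes issued from any other thread are enqueued as eventloop tasks, so a direct write can overtake a write that is still sitting in the queue.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WriteOrderingRace {

  static final ExecutorService eventloop = Executors.newSingleThreadExecutor();
  static volatile Thread eventloopThread;
  static final Queue<String> channel = new ConcurrentLinkedQueue<>();

  // Mirrors the two write paths described above.
  static void write(String msg) {
    if (Thread.currentThread() == eventloopThread) {
      channel.add(msg);                          // direct write from the eventloop
    } else {
      eventloop.execute(() -> channel.add(msg)); // enqueued as an eventloop task
    }
  }

  public static void main(String[] args) throws Exception {
    eventloopThread = eventloop.submit(Thread::currentThread).get();
    CountDownLatch chunkEnqueued = new CountDownLatch(1);

    // A task already occupying the eventloop (think: a request(n) that the
    // producer satisfies synchronously, or the terminal LastHttpContent).
    eventloop.execute(() -> {
      try { chunkEnqueued.await(); } catch (InterruptedException ignored) { }
      write("LastHttpContent");                  // written directly
    });

    // Meanwhile the io-scheduler thread emits a content chunk; its write is
    // enqueued behind the task that is currently running on the eventloop.
    Thread io = new Thread(() -> {
      write("content chunk");
      chunkEnqueued.countDown();
    });
    io.start();
    io.join();

    eventloop.shutdown();
    eventloop.awaitTermination(1, TimeUnit.SECONDS);
    System.out.println(channel);                 // prints [LastHttpContent, content chunk]
  }
}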

I have the fix for the issue but I am struggling to write a unit test that can deterministically reproduce this situation.

@jamesgorman2
Collaborator Author

Good work on spotting that. Sounds like you're in maths-land with respect to proving correctness over non-deterministic parallel systems. Is the problem setting up a harness in the Netty layer, rather than testing that the RxNetty layer is emitting in the correct sequence?

@NiteshKant
Member

The problem is deterministically reproducing the race condition I describe 😄

NiteshKant added a commit to NiteshKant/RxNetty that referenced this issue May 16, 2016
Ordering issue happens when a write happens from a thread other than the eventloop.

The reported issue was for HTTP: RxNetty adds a LastHttpContent at the end of the content stream to mark the end of chunked content. In this particular case, the LastHttpContent gets written before the content stream completes, which causes the illegal state in the HTTP encoder.

The reason for the wrong ordering of writes is as follows:

  • Netty enqueues any write that happens from outside the eventloop onto the eventloop task queue.
  • Any write that happens on the eventloop is written directly, without being enqueued to the task queue.

In this case, request(n) calls to the content Observable happen on the eventloop, so if the producer (the source Observable) is faster than the consumer (the writes on the channel), a request(n) may immediately produce an item that is written to the channel directly, while an item produced on the subscribeOn scheduler gets enqueued to the task queue. This race scrambles the order of the writes. In the worst case, if the LastHttpContent happens to be written before the content stream has completed, the HTTP response is malformed.

The fix is to not let writes that happen on the eventloop get ahead of writes from outside the eventloop. Thus, if any write happens from an `Observable` outside the eventloop, writes on the eventloop will also be enqueued in the task queue. This preserves the ordering.
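
A conceptual sketch of that approach (a hypothetical helper, not the actual RxNetty change): once any item has been written from a non-eventloop thread, even items arriving on the eventloop are routed through the task queue so they cannot overtake the enqueued writes.

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;

final class OrderPreservingWriter {

  private final Channel channel;
  private volatile boolean sawOffEventloopWrite;

  OrderPreservingWriter(Channel channel) {
    this.channel = channel;
  }

  void writeItem(Object msg) {
    EventLoop eventLoop = channel.eventLoop();
    if (!eventLoop.inEventLoop()) {
      sawOffEventloopWrite = true;
      channel.write(msg);                          // Netty enqueues this write as a task
    } else if (sawOffEventloopWrite) {
      eventLoop.execute(() -> channel.write(msg)); // force the queued path to keep ordering
    } else {
      channel.write(msg);                          // no off-eventloop writes yet: direct write is safe
    }
  }
}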