diff --git a/rxnetty-examples/src/test/resources/log4j.properties b/rxnetty-examples/src/test/resources/log4j.properties index 70bc4bad..176c09e2 100644 --- a/rxnetty-examples/src/test/resources/log4j.properties +++ b/rxnetty-examples/src/test/resources/log4j.properties @@ -14,7 +14,7 @@ # limitations under the License. # # -log4j.rootLogger=DEBUG, stdout +log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout diff --git a/rxnetty-http/src/main/java/io/reactivex/netty/protocol/http/internal/AbstractHttpConnectionBridge.java b/rxnetty-http/src/main/java/io/reactivex/netty/protocol/http/internal/AbstractHttpConnectionBridge.java index 69d783eb..cd1d345a 100644 --- a/rxnetty-http/src/main/java/io/reactivex/netty/protocol/http/internal/AbstractHttpConnectionBridge.java +++ b/rxnetty-http/src/main/java/io/reactivex/netty/protocol/http/internal/AbstractHttpConnectionBridge.java @@ -268,7 +268,6 @@ private void processNextItemInEventloop(Object nextItem, ConnectionInputSubscrib onContentReceived(); ByteBuf content = ((ByteBufHolder) nextItem).content(); if (nextItem instanceof LastHttpContent) { - state.contentComplete(); /* * Since, LastHttpContent is always received, even if the pipeline does not emit ByteBuf, if * ByteBuf with the LastHttpContent is empty, only trailing headers are emitted. Otherwise, @@ -280,7 +279,7 @@ private void processNextItemInEventloop(Object nextItem, ConnectionInputSubscrib /*Since, the content buffer, was not sent, release it*/ ReferenceCountUtil.release(content); } - + state.contentComplete(); connectionInputSubscriber.contentComplete(); onContentReceiveComplete(state.headerReceivedTimeNanos); } else { @@ -548,7 +547,7 @@ public void call() { if (channel.eventLoop().inEventLoop()) { run(); } else { - channel.eventLoop().execute(ConnectionInputSubscriber.this); + channel.eventLoop().execute(this); } } diff --git a/rxnetty-http/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerToConnectionBridge.java b/rxnetty-http/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerToConnectionBridge.java index 8603ca89..aaaa4ed8 100644 --- a/rxnetty-http/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerToConnectionBridge.java +++ b/rxnetty-http/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerToConnectionBridge.java @@ -30,6 +30,8 @@ import io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridge; import io.reactivex.netty.protocol.http.internal.HttpContentSubscriberEvent; import io.reactivex.netty.protocol.http.server.events.HttpServerEventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rx.functions.Action0; import rx.subscriptions.Subscriptions; @@ -40,6 +42,8 @@ public class HttpServerToConnectionBridge extends AbstractHttpConnectionBridge { + private static final Logger logger = LoggerFactory.getLogger(HttpServerToConnectionBridge.class); + private volatile boolean activeContentSubscriberExists; private final Object contentSubGuard = new Object(); @@ -99,7 +103,7 @@ public void call() { activeContentSubscriberExists = null != nextSub; if (null != nextSub) { - ctx.fireUserEventTriggered(nextSub); + fireContentSubscriberEvent(ctx, nextSub); } } })); @@ -161,4 +165,16 @@ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); /*This is a no-op if there is nothing to flush but supports HttpServerResponse.flushOnlyOnReadComplete()*/ } } + + private void fireContentSubscriberEvent(ChannelHandlerContext ctx, HttpContentSubscriberEvent event) { + try { + super.userEventTriggered(ctx, event); + } catch (Exception e) { + try { + exceptionCaught(ctx, e); + } catch (Exception e1) { + logger.error("Exception while handling error in handler.", e1); + } + } + } } diff --git a/rxnetty-http/src/test/java/io/reactivex/netty/protocol/http/internal/AbstractHttpConnectionBridgeTest.java b/rxnetty-http/src/test/java/io/reactivex/netty/protocol/http/internal/AbstractHttpConnectionBridgeTest.java index 4a73b8ab..68daef6d 100644 --- a/rxnetty-http/src/test/java/io/reactivex/netty/protocol/http/internal/AbstractHttpConnectionBridgeTest.java +++ b/rxnetty-http/src/test/java/io/reactivex/netty/protocol/http/internal/AbstractHttpConnectionBridgeTest.java @@ -254,7 +254,8 @@ public void testClose() throws Exception { assertThat("No error to header subscriber on close.", handlerRule.headerSub.getOnErrorEvents(), hasSize(1)); assertThat("No error to content subscriber on close.", contentSub.getOnErrorEvents(), hasSize(1)); - assertThat("Close before complete did not get invoked.", handlerRule.handler.closedBeforeReceive, is(true)); + assertThat("Close before complete did not get invoked.", + ((AbstractHttpConnectionBridgeMock)handlerRule.handler).closedBeforeReceive, is(true)); } @Test(timeout = 60000) @@ -348,7 +349,7 @@ public static class HandlerRule extends ExternalResource { private Connection connMock; private EmbeddedChannel channel; - private AbstractHttpConnectionBridgeMock handler; + private AbstractHttpConnectionBridge handler; private EventCatcher eventCatcher; private ConnectionInputSubscriber connInSub; private Producer connInputProducerMock; @@ -359,7 +360,7 @@ public Statement apply(final Statement base, Description description) { return new Statement() { @Override public void evaluate() throws Throwable { - handler = new AbstractHttpConnectionBridgeMock(HttpRequest.class); + handler = newAbstractHttpConnectionBridgeMock(); eventCatcher = new EventCatcher(); channel = new EmbeddedChannel(handler, eventCatcher); @SuppressWarnings("unchecked") @@ -372,11 +373,19 @@ public void evaluate() throws Throwable { }; } + protected AbstractHttpConnectionBridge newAbstractHttpConnectionBridgeMock() { + return new AbstractHttpConnectionBridgeMock(HttpRequest.class); + } + + public EmbeddedChannel getChannel() { + return channel; + } + public void simulateHeaderReceive() { connInSub.getState().headerReceived(); } - protected void setupAndAssertConnectionInputSub() { + public void setupAndAssertConnectionInputSub() { headerSub = new ProducerAwareSubscriber<>(); @SuppressWarnings({"rawtypes", "unchecked"}) diff --git a/rxnetty-http/src/test/java/io/reactivex/netty/protocol/http/server/HttpServerToConnectionBridgeTest.java b/rxnetty-http/src/test/java/io/reactivex/netty/protocol/http/server/HttpServerToConnectionBridgeTest.java new file mode 100644 index 00000000..5e6dc001 --- /dev/null +++ b/rxnetty-http/src/test/java/io/reactivex/netty/protocol/http/server/HttpServerToConnectionBridgeTest.java @@ -0,0 +1,45 @@ +package io.reactivex.netty.protocol.http.server; + +import io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridge; +import io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridgeTest.AbstractHttpConnectionBridgeMock; +import io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridgeTest.HandlerRule; +import io.reactivex.netty.protocol.http.internal.HttpContentSubscriberEvent; +import io.reactivex.netty.protocol.http.server.events.HttpServerEventPublisher; +import io.reactivex.netty.protocol.tcp.server.events.TcpServerEventPublisher; +import org.junit.Rule; +import org.junit.Test; +import rx.observers.TestSubscriber; + +import java.nio.channels.ClosedChannelException; + +public class HttpServerToConnectionBridgeTest { + + @Rule + public final HandlerRule handlerRule = new HandlerRule() { + @Override + protected AbstractHttpConnectionBridge newAbstractHttpConnectionBridgeMock() { + return new HttpServerToConnectionBridge<>(new HttpServerEventPublisher(new TcpServerEventPublisher())); + } + }; + + @Test(timeout = 60000) + public void testPendingContentSubscriber() throws Exception { + handlerRule.setupAndAssertConnectionInputSub(); + handlerRule.simulateHeaderReceive(); /*Simulate header receive, required for content sub.*/ + TestSubscriber subscriber = new TestSubscriber<>(); + handlerRule.getChannel().pipeline().fireUserEventTriggered(new HttpContentSubscriberEvent<>(subscriber)); + TestSubscriber subscriber1 = new TestSubscriber<>(); + handlerRule.getChannel().pipeline().fireUserEventTriggered(new HttpContentSubscriberEvent<>(subscriber1)); + + subscriber.assertNoErrors(); + subscriber1.assertNoErrors(); + subscriber.unsubscribe(); + + subscriber.assertUnsubscribed(); + + handlerRule.getChannel().close().await(); + + subscriber.assertNoErrors(); + subscriber1.assertError(ClosedChannelException.class); + } +} \ No newline at end of file