Skip to content

Commit

Permalink
Bug fixes
Browse files Browse the repository at this point in the history
- Issue 509: `HttpServerToConnectionBridge` was calling `ctx.fireUserEvent..` method instead of `super.userEv..` which will make the second subscriber unregistered. Now calling `super.userEven..`. Also added a test for this.

- `AbstractHttpConnectionBridge` was setting the status as complete before invoking the subscribers. This will cause issues for concatenated subscribers as the second subscriber to the content will see that the content is already completed.
  • Loading branch information
NiteshKant committed May 31, 2016
1 parent 103b21f commit 8c70db9
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 9 deletions.
2 changes: 1 addition & 1 deletion rxnetty-examples/src/test/resources/log4j.properties
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
#
#
log4j.rootLogger=DEBUG, stdout
log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
Expand Down
Expand Up @@ -268,7 +268,6 @@ private void processNextItemInEventloop(Object nextItem, ConnectionInputSubscrib
onContentReceived();
ByteBuf content = ((ByteBufHolder) nextItem).content();
if (nextItem instanceof LastHttpContent) {
state.contentComplete();
/*
* Since, LastHttpContent is always received, even if the pipeline does not emit ByteBuf, if
* ByteBuf with the LastHttpContent is empty, only trailing headers are emitted. Otherwise,
Expand All @@ -280,7 +279,7 @@ private void processNextItemInEventloop(Object nextItem, ConnectionInputSubscrib
/*Since, the content buffer, was not sent, release it*/
ReferenceCountUtil.release(content);
}

state.contentComplete();
connectionInputSubscriber.contentComplete();
onContentReceiveComplete(state.headerReceivedTimeNanos);
} else {
Expand Down Expand Up @@ -548,7 +547,7 @@ public void call() {
if (channel.eventLoop().inEventLoop()) {
run();
} else {
channel.eventLoop().execute(ConnectionInputSubscriber.this);
channel.eventLoop().execute(this);
}
}

Expand Down
Expand Up @@ -30,6 +30,8 @@
import io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridge;
import io.reactivex.netty.protocol.http.internal.HttpContentSubscriberEvent;
import io.reactivex.netty.protocol.http.server.events.HttpServerEventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;

Expand All @@ -40,6 +42,8 @@

public class HttpServerToConnectionBridge<C> extends AbstractHttpConnectionBridge<C> {

private static final Logger logger = LoggerFactory.getLogger(HttpServerToConnectionBridge.class);

private volatile boolean activeContentSubscriberExists;

private final Object contentSubGuard = new Object();
Expand Down Expand Up @@ -99,7 +103,7 @@ public void call() {

activeContentSubscriberExists = null != nextSub;
if (null != nextSub) {
ctx.fireUserEventTriggered(nextSub);
fireContentSubscriberEvent(ctx, nextSub);
}
}
}));
Expand Down Expand Up @@ -161,4 +165,16 @@ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush(); /*This is a no-op if there is nothing to flush but supports HttpServerResponse.flushOnlyOnReadComplete()*/
}
}

private void fireContentSubscriberEvent(ChannelHandlerContext ctx, HttpContentSubscriberEvent<?> event) {
try {
super.userEventTriggered(ctx, event);
} catch (Exception e) {
try {
exceptionCaught(ctx, e);
} catch (Exception e1) {
logger.error("Exception while handling error in handler.", e1);
}
}
}
}
Expand Up @@ -254,7 +254,8 @@ public void testClose() throws Exception {
assertThat("No error to header subscriber on close.", handlerRule.headerSub.getOnErrorEvents(), hasSize(1));
assertThat("No error to content subscriber on close.", contentSub.getOnErrorEvents(), hasSize(1));

assertThat("Close before complete did not get invoked.", handlerRule.handler.closedBeforeReceive, is(true));
assertThat("Close before complete did not get invoked.",
((AbstractHttpConnectionBridgeMock)handlerRule.handler).closedBeforeReceive, is(true));
}

@Test(timeout = 60000)
Expand Down Expand Up @@ -348,7 +349,7 @@ public static class HandlerRule extends ExternalResource {

private Connection<String, String> connMock;
private EmbeddedChannel channel;
private AbstractHttpConnectionBridgeMock handler;
private AbstractHttpConnectionBridge<String> handler;
private EventCatcher eventCatcher;
private ConnectionInputSubscriber connInSub;
private Producer connInputProducerMock;
Expand All @@ -359,7 +360,7 @@ public Statement apply(final Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
handler = new AbstractHttpConnectionBridgeMock(HttpRequest.class);
handler = newAbstractHttpConnectionBridgeMock();
eventCatcher = new EventCatcher();
channel = new EmbeddedChannel(handler, eventCatcher);
@SuppressWarnings("unchecked")
Expand All @@ -372,11 +373,19 @@ public void evaluate() throws Throwable {
};
}

protected AbstractHttpConnectionBridge<String> newAbstractHttpConnectionBridgeMock() {
return new AbstractHttpConnectionBridgeMock(HttpRequest.class);
}

public EmbeddedChannel getChannel() {
return channel;
}

public void simulateHeaderReceive() {
connInSub.getState().headerReceived();
}

protected void setupAndAssertConnectionInputSub() {
public void setupAndAssertConnectionInputSub() {
headerSub = new ProducerAwareSubscriber<>();

@SuppressWarnings({"rawtypes", "unchecked"})
Expand Down
@@ -0,0 +1,45 @@
package io.reactivex.netty.protocol.http.server;

import io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridge;
import io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridgeTest.AbstractHttpConnectionBridgeMock;
import io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridgeTest.HandlerRule;
import io.reactivex.netty.protocol.http.internal.HttpContentSubscriberEvent;
import io.reactivex.netty.protocol.http.server.events.HttpServerEventPublisher;
import io.reactivex.netty.protocol.tcp.server.events.TcpServerEventPublisher;
import org.junit.Rule;
import org.junit.Test;
import rx.observers.TestSubscriber;

import java.nio.channels.ClosedChannelException;

public class HttpServerToConnectionBridgeTest {

@Rule
public final HandlerRule handlerRule = new HandlerRule() {
@Override
protected AbstractHttpConnectionBridge<String> newAbstractHttpConnectionBridgeMock() {
return new HttpServerToConnectionBridge<>(new HttpServerEventPublisher(new TcpServerEventPublisher()));
}
};

@Test(timeout = 60000)
public void testPendingContentSubscriber() throws Exception {
handlerRule.setupAndAssertConnectionInputSub();
handlerRule.simulateHeaderReceive(); /*Simulate header receive, required for content sub.*/
TestSubscriber<String> subscriber = new TestSubscriber<>();
handlerRule.getChannel().pipeline().fireUserEventTriggered(new HttpContentSubscriberEvent<>(subscriber));
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
handlerRule.getChannel().pipeline().fireUserEventTriggered(new HttpContentSubscriberEvent<>(subscriber1));

subscriber.assertNoErrors();
subscriber1.assertNoErrors();
subscriber.unsubscribe();

subscriber.assertUnsubscribed();

handlerRule.getChannel().close().await();

subscriber.assertNoErrors();
subscriber1.assertError(ClosedChannelException.class);
}
}

0 comments on commit 8c70db9

Please sign in to comment.