Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Scottmitch committed Mar 23, 2020
1 parent 3cf0a1c commit 1d9adb8
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 15 deletions.
Expand Up @@ -21,7 +21,6 @@

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;

import java.util.ArrayDeque;
Expand Down Expand Up @@ -71,16 +70,8 @@ protected void handleSubscribe(Subscriber<? super T> nextSubscriber) {

void channelRead(T data) {
assertInEventloop();

if (data instanceof ReferenceCounted) {
/*
* We do not expect ref-counted objects here as ST does not support them and do not take care to clean them
* in error conditions. Hence we fail-fast when we see such objects.
*/
ReferenceCountUtil.release(data);
exceptionCaught(new IllegalArgumentException("Reference counted leaked netty's pipeline. Object: " +
data.getClass().getSimpleName()));
channel.close();
channelReadReferenceCounted((ReferenceCounted) data);
return;
}

Expand All @@ -94,6 +85,20 @@ void channelRead(T data) {
}
}

private void channelReadReferenceCounted(ReferenceCounted data) {
try {
data.release();
} finally {
// We do not expect ref-counted objects here as ST does not support them and do not take care to clean them
// in error conditions. Hence we fail-fast when we see such objects.
pending = null;
fatalError = new IllegalArgumentException("Reference counted leaked netty's pipeline. Object: " +
data.getClass().getSimpleName());
exceptionCaught(fatalError);
channel.close();
}
}

void onReadComplete() {
assertInEventloop();
requested = false;
Expand All @@ -116,10 +121,9 @@ void exceptionCaught(Throwable throwable) {

void channelInboundClosed() {
assertInEventloop();
Throwable error = StacklessClosedChannelException.newInstance(
fatalError = StacklessClosedChannelException.newInstance(
NettyChannelPublisher.class, "channelInboundClosed");
fatalError = error;
exceptionCaught(error);
exceptionCaught(fatalError);
}

// All private methods MUST be invoked from the eventloop.
Expand Down
Expand Up @@ -32,7 +32,6 @@
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
Expand Down Expand Up @@ -74,7 +73,7 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;

public class NettyChannelPublisherTest {
@Rule
// @Rule
public final Timeout timeout = new ServiceTalkTestTimeout();

private final TestPublisherSubscriber<Integer> subscriber = new TestPublisherSubscriber<>();
Expand Down Expand Up @@ -113,8 +112,18 @@ public void tearDown() throws Exception {
}
}

@Test
public void testCancelThenReadThenResubscribeDeliversErrorAndNotQueuedData() throws InterruptedException {
testCancelThenResubscribeDeliversErrorAndNotQueuedData(true);
}

@Test
public void testCancelThenResubscribeDeliversErrorAndNotQueuedData() throws InterruptedException {
testCancelThenResubscribeDeliversErrorAndNotQueuedData(false);
}

private void testCancelThenResubscribeDeliversErrorAndNotQueuedData(boolean doChannelRead)
throws InterruptedException {
TestCollectingPublisherSubscriber<Integer> subscriber1 = new TestCollectingPublisherSubscriber<>();
TestCollectingPublisherSubscriber<Integer> subscriber2 = new TestCollectingPublisherSubscriber<>();
toSource(publisher).subscribe(subscriber1);
Expand All @@ -128,6 +137,15 @@ public void testCancelThenResubscribeDeliversErrorAndNotQueuedData() throws Inte

subscription1.cancel(); // cancel of active subscription should clear the queue and fail future Subscribers.

if (doChannelRead) {
try {
assertFalse(channel.writeInbound(3));
} catch (Exception e) {
assertThat(e, instanceOf(ClosedChannelException.class));
return;
}
}

toSource(publisher).subscribe(subscriber2);
subscriber2.awaitSubscription().request(Long.MAX_VALUE);
assertThat(subscriber2.pollAllOnNext(), is(empty()));
Expand Down

0 comments on commit 1d9adb8

Please sign in to comment.