Skip to content

Commit

Permalink
moving the onNext into the synchronized block to avoid onComplete to …
Browse files Browse the repository at this point in the history
…be called before onNext
  • Loading branch information
fnxrassmate committed Jan 14, 2019
1 parent cdf30e4 commit 5d37d86
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,9 @@ public void onNext(PooledConnection<R, W> conn) {
onNextArrived = true;
_terminated = terminated;
_error = error;
delegate.onNext(conn);
}

delegate.onNext(conn);

if (_terminated) {
if (null != error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,22 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedChannel;
import io.reactivex.netty.client.pool.PooledConnection;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observers.AssertableSubscriber;

import java.util.ArrayList;
import java.util.List;

import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
import static rx.Observable.fromCallable;
import static rx.Observable.just;

/**
* This tests the code paths which are not invoked for {@link EmbeddedChannel} as it does not schedule any task
Expand All @@ -49,4 +61,77 @@ public void testReuse() throws Exception {

assertThat("Connection is not reused.", connection2, is(connection));
}

@Test
/**
*
* Load test to prove concurrency issues mainly seen on heavy load.
*
*/
public void testLoad() {
clientRule.startServer(1000);

MockTcpClientEventListener listener = new MockTcpClientEventListener();
clientRule.getClient().subscribe(listener);


int number_of_iterations = 300;
int numberOfRequests = 10;

for(int j = 0; j < number_of_iterations; j++) {

List<Observable<String>> results = new ArrayList<>();

//Just giving the client some time to recover
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}

for (int i = 0; i < numberOfRequests; i++) {
results.add(
fromCallable(new Func0<PooledConnection<ByteBuf, ByteBuf>>() {
@Override
public PooledConnection<ByteBuf, ByteBuf> call() {
return clientRule.connectWithCheck();
}
})
.flatMap(new Func1<PooledConnection<ByteBuf, ByteBuf>, Observable<String>>() {
@Override
public Observable<String> call(PooledConnection<ByteBuf, ByteBuf> connection) {
return connection.writeStringAndFlushOnEach(just("Hello"))
.toCompletable()
.<ByteBuf>toObservable()
.concatWith(connection.getInput())
.take(1)
.single()
.map(new Func1<ByteBuf, String>() {
@Override
public String call(ByteBuf byteBuf) {
try {

byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
String result = new String(bytes);
return result;
} finally {
byteBuf.release();
}
}
}).doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Assert.fail("Did not expect exception: " + throwable.getMessage());
throwable.printStackTrace();
}
});
}
}));
}
AssertableSubscriber<String> test = Observable.merge(results).test();
test.awaitTerminalEvent();
test.assertNoErrors();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@
import io.reactivex.netty.client.pool.SingleHostPoolingProviderFactory;
import io.reactivex.netty.protocol.tcp.server.ConnectionHandler;
import io.reactivex.netty.protocol.tcp.server.TcpServer;
import org.junit.Assert;
import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import rx.Observable;
import rx.Observer;
import rx.functions.Func0;
import rx.observers.TestSubscriber;

import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
Expand Down Expand Up @@ -83,6 +87,48 @@ public PooledConnection<ByteBuf, ByteBuf> connect() {
return (PooledConnection<ByteBuf, ByteBuf>) cSub.getOnNextEvents().get(0);
}

public PooledConnection<ByteBuf, ByteBuf> connectWithCheck() {

final AtomicBoolean gotOnNext = new AtomicBoolean(false);

Observable<Connection<ByteBuf, ByteBuf>> got_no_connection = client.createConnectionRequest()
.doOnEach(new Observer<Connection<ByteBuf, ByteBuf>>() {
@Override
public void onCompleted() {
if(!gotOnNext.get()) {
//A PooledConnection could sometimes send onCompleted before the onNext event occurred.
Assert.fail("Should not get onCompletedBefore onNext");
}
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(Connection<ByteBuf, ByteBuf> byteBufByteBufConnection) {
gotOnNext.set(true);
}
})
.switchIfEmpty(Observable.defer(new Func0<Observable<PooledConnection<ByteBuf, ByteBuf>>>() {
@Override
public Observable<PooledConnection<ByteBuf, ByteBuf>> call() {
return Observable.empty();
}
}));

TestSubscriber<Connection<ByteBuf, ByteBuf>> cSub = new TestSubscriber<>();
got_no_connection.subscribe(cSub);

cSub.awaitTerminalEvent();

cSub.assertNoErrors();

assertThat("No connection received.", cSub.getOnNextEvents(), hasSize(1));

return (PooledConnection<ByteBuf, ByteBuf>) cSub.getOnNextEvents().get(0);
}

private void createClient(final int maxConnections) {
InetSocketAddress serverAddr = new InetSocketAddress("127.0.0.1", server.getServerPort());
client = TcpClient.newClient(SingleHostPoolingProviderFactory.<ByteBuf, ByteBuf>createBounded(maxConnections),
Expand Down

0 comments on commit 5d37d86

Please sign in to comment.