Skip to content

Commit

Permalink
Merge pull request #381 from Sanne/MessageDispatcherTest
Browse files Browse the repository at this point in the history
Test for MessageDispatcher's synchronous cast to actually block for r…
  • Loading branch information
belaban committed Mar 16, 2018
2 parents e62cbbe + 1e668d4 commit 839f640
Showing 1 changed file with 93 additions and 0 deletions.
93 changes: 93 additions & 0 deletions tests/junit/org/jgroups/tests/MessageDispatcherUnitTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
* Tests return values from MessageDispatcher.castMessage()
*
Expand Down Expand Up @@ -127,6 +132,41 @@ public void test20000ByteMessageToAll() throws Exception {
sendMessageToBothChannels(20000);
}

/**
* In this scenario we block the second member so to verify
* the sender actually waits: we want to see a timeout
* being triggered.
* It's hard to otherwise make sure casting isn't being
* done asynchronously.
*/
public void testBlockingSecondMember() throws Exception {
RequestOptions requestOptions = RequestOptions.SYNC()
.exclusionList(a.getAddress())//redundant - simplifies debugging
.setMode(ResponseMode.GET_ALL)//redundant - implied by SYNC()
.setTransientFlags(Message.TransientFlag.DONT_LOOPBACK)//redundant - self is excluded
.setTimeout(1000)//Speed up the test execution
;
b = createChannel(a, "B");
BlockableRequestHandler blockableHandler = new BlockableRequestHandler();
d2 = new MessageDispatcher(b);
d2.setRequestHandler(blockableHandler);
boolean exception = false;
b.connect("MessageDispatcherUnitTest");
Assert.assertEquals(2,b.getView().size());
System.out.println("view: " + b.getView());
blockableHandler.installThreadTrap();
try {
RspList<Object> rsps = d1.castMessage(null, buf, requestOptions);
Assert.fail("This is a synchronous message for which no reply was delivered yet: should have timed out!");
} catch (Exception e) {
//expected
exception = true;
blockableHandler.releaseBlockedThreads();
}
Assert.assertTrue(exception);
Assert.assertTrue(blockableHandler.receivedAnything());
}

private void sendMessage(int size) throws Exception {
long start, stop;
MyHandler handler=new MyHandler(new byte[size]);
Expand Down Expand Up @@ -186,4 +226,57 @@ public Object handle(Message msg) throws Exception {
return retval;
}
}

private static final class BlockableRequestHandler implements RequestHandler {

private final AtomicReference<CountDownLatch> threadTrap = new AtomicReference<>();
private final AtomicBoolean receivedAnything = new AtomicBoolean(false);

@Override
public Object handle(Message msg) throws Exception {
receivedAnything.set(true);
countDownAndJoin();
return "ok";
}

public boolean receivedAnything() {
return receivedAnything.get();
}

public void installThreadTrap() {
boolean ok = threadTrap.compareAndSet(null, new CountDownLatch(2));
if (!ok) {
throw new IllegalStateException("Resetting a latch without having released the previous one! Illegal as some threads might be stuck.");
}
}

public void releaseBlockedThreads() {
final CountDownLatch latch = threadTrap.getAndSet(null);
if (latch!=null) {
while (latch.getCount() > 0) {
latch.countDown();
}
}
}

public void countDownAndJoin() {
final CountDownLatch latch = threadTrap.get();
if (latch==null) return;
System.out.println( "Blocking on incoming message [PREJOIN] Timestamp: " + System.nanoTime() );
try {
latch.countDown();
//Wait "forever" until we are awoken;
//cap the definition of "forever" to 2 minutes to abort the test if something goes wrong
//but this should not be necessary if the test is written correctly:
//the main test thread will release the latch sooner so a large timeout should not
//have a negative impact on the testsuite duration.
latch.await( 2, TimeUnit.MINUTES );
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Early termination. Test killed?");
}
System.out.println( "Blocking on incoming message [POSTJOIN] Timestamp: " + System.nanoTime() );
}
}
}

0 comments on commit 839f640

Please sign in to comment.