Skip to content

Commit

Permalink
- Fixed counting both received *and* suspected flags (https://issues.…
Browse files Browse the repository at this point in the history
…jboss.org/browse/JGRP-1505)

- Added test case GroupRequest.testResponsesComplete()
  • Loading branch information
belaban committed Aug 28, 2012
1 parent 9b0d0f8 commit 316175e
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 30 deletions.
20 changes: 13 additions & 7 deletions src/org/jgroups/blocks/GroupRequest.java
Expand Up @@ -57,7 +57,10 @@ public class GroupRequest<T> extends Request {
private final Map<Address,Rsp<T>> requests;

@GuardedBy("lock")
int num_valid, num_received, num_suspected;
int num_valid; // the number of valid responses (values or exceptions that passed the response filter)

@GuardedBy("lock")
int num_received; // number of responses (values, exceptions or suspicions)



Expand Down Expand Up @@ -119,7 +122,8 @@ public void receiveResponse(Object response_value, Address sender, boolean is_ex
lock.lock();
try {
if(!rsp.wasReceived()) {
num_received++;
if(!rsp.wasSuspected())
num_received++;
if((responseReceived=(rsp_filter == null) || rsp_filter.isAcceptable(response_value, sender))) {
if(is_exception && response_value instanceof Throwable)
rsp.setException((Throwable)response_value);
Expand Down Expand Up @@ -160,7 +164,8 @@ public void suspect(Address suspected_member) {
changed=true;
lock.lock();
try {
num_suspected++;
if(!rsp.wasReceived())
num_received++;
completed.signalAll();
}
finally {
Expand Down Expand Up @@ -208,7 +213,8 @@ public void viewChange(View new_view) {
if(!mbrs.contains(mbr)) {
Rsp<T> rsp=entry.getValue();
if(rsp.setSuspected()) {
num_suspected++;
if(!rsp.wasReceived())
num_received++;
changed=true;
}
}
Expand Down Expand Up @@ -314,12 +320,12 @@ protected boolean responsesComplete() {

switch(options.getMode()) {
case GET_FIRST:
return num_valid >= 1 || num_suspected >= num_total || num_received >= num_total;
return num_valid >= 1 || num_received >= num_total;
case GET_ALL:
return num_valid + num_suspected >= num_total || num_received >= num_total;
return num_valid >= num_total || num_received >= num_total;
case GET_MAJORITY:
int majority=determineMajority(num_total);
return num_valid + num_suspected >= majority || num_received >= num_total;
return num_valid >= majority || num_received >= num_total;
case GET_NONE:
return true;
default:
Expand Down
107 changes: 84 additions & 23 deletions tests/junit-functional/org/jgroups/blocks/GroupRequestTest.java
Expand Up @@ -24,19 +24,19 @@
*/
@Test(groups=Global.FUNCTIONAL,sequential=true)
public class GroupRequestTest {
Address a1, a2, a3;
Address a, b, c;
List<Address> dests=null;

@BeforeClass
void init() throws UnknownHostException {
a1=Util.createRandomAddress("A1");
a2=Util.createRandomAddress("A2");
a3=Util.createRandomAddress("A3");
a=Util.createRandomAddress("A");
b=Util.createRandomAddress("B");
c=Util.createRandomAddress("C");
}

@BeforeMethod
protected void setUp() throws Exception {
dests=new ArrayList<Address>(Arrays.asList(a1, a2));
dests=new ArrayList<Address>(Arrays.asList(a,b));
}

@AfterMethod
Expand Down Expand Up @@ -69,11 +69,11 @@ public void testMessageReceptionWithViewChangeMemberLeft() throws Exception {


public void testGetFirstWithResponseFilter() throws Exception {
Object[] responses={new Message(null, a1, new Long(1)),
new Message(null, a2, new Long(2)),
new Message(null, a3, new Long(3))};
Object[] responses={new Message(null,a, new Long(1)),
new Message(null,b, new Long(2)),
new Message(null,c, new Long(3))};
MyCorrelator corr=new MyCorrelator(true, responses, 500);
dests.add(a3);
dests.add(c);
GroupRequest<Long> req=new GroupRequest<Long>(new Message(), corr, dests, new RequestOptions(ResponseMode.GET_FIRST, 0));
req.setResponseFilter(new RspFilter() {
int num_rsps=0;
Expand Down Expand Up @@ -102,11 +102,11 @@ public boolean needMoreResponses() {


public void testGetAllWithResponseFilter() throws Exception {
Object[] responses={new Message(null, a1, new Long(1)),
new Message(null, a2, new Long(2)),
new Message(null, a3, new Long(3))};
Object[] responses={new Message(null,a, new Long(1)),
new Message(null,b, new Long(2)),
new Message(null,c, new Long(3))};
MyCorrelator corr=new MyCorrelator(true, responses, 500);
dests.add(a3);
dests.add(c);
GroupRequest<Long> req=new GroupRequest<Long>(new Message(), corr, dests, new RequestOptions(ResponseMode.GET_ALL, 0));
req.setResponseFilter(new RspFilter() {
int num_rsps=0;
Expand Down Expand Up @@ -140,7 +140,7 @@ public boolean needMoreResponses() {
* we'd block until we run into the timeout. See https://issues.jboss.org/browse/JGRP-1330 for details.
*/
public void testAllNullResponsesWithFilter() {
dests.add(a3);
dests.add(c);
GroupRequest<Boolean> req=new GroupRequest<Boolean>(new Message(), null, dests,
new RequestOptions(ResponseMode.GET_ALL, 10000));
assert !req.isDone();
Expand All @@ -155,7 +155,7 @@ public void testAllNullResponsesWithFilter() {


public void testAllNullResponsesWithFilterGetFirst() {
dests.add(a3);
dests.add(c);
GroupRequest<Boolean> req=new GroupRequest<Boolean>(new Message(), null, dests,
new RequestOptions(ResponseMode.GET_FIRST, 10000));
assert !req.isDone();
Expand All @@ -169,6 +169,67 @@ public void testAllNullResponsesWithFilterGetFirst() {
assert req.isDone();
}

/**
* Verifies that a received *and* suspected flag on a Rsp counts only once, to prevent premature termination of
* a blocking RPC. https://issues.jboss.org/browse/JGRP-1505
*/
public void testResponsesComplete() {
GroupRequest<Integer> req=new GroupRequest<Integer>(null, null, Arrays.asList(a,b,c), RequestOptions.SYNC());
checkComplete(req, false);

req.receiveResponse(1, a, false);
req.receiveResponse(2, b, true);
checkComplete(req, false);

req.receiveResponse(3, b, false);
checkComplete(req, false);

req.receiveResponse(4, c, false);
checkComplete(req, true);


req=new GroupRequest<Integer>(null, null, Arrays.asList(a,b,c), RequestOptions.SYNC());
req.receiveResponse(1, a, false);
checkComplete(req, false);

req.receiveResponse(2, b, false);
checkComplete(req, false);

req.suspect(b);
checkComplete(req, false);

req.receiveResponse(3, c, false);
checkComplete(req, true);
}

/**
* Verifies that a received *and* suspected flag on a Rsp counts only once, to prevent premature termination of
* a blocking RPC. https://issues.jboss.org/browse/JGRP-1505
*/
public void testResponsesComplete2() {
GroupRequest<Integer> req=new GroupRequest<Integer>(null, null, Arrays.asList(a,b,c), RequestOptions.SYNC());
req.suspect(a);
checkComplete(req, false);

req.receiveResponse(1, a, false);
checkComplete(req, false);

req.receiveResponse(2, b, false);
checkComplete(req, false);

req.suspect(b);
checkComplete(req, false);

req.receiveResponse(3, c, false);
req.suspect(c);
checkComplete(req, true);
}

protected static void checkComplete(Request req, boolean expect) {
System.out.println("req = " + req);
assert req.getResponsesComplete() == expect;
}


protected static class NonNullFilter implements RspFilter {
private volatile boolean validResponse;
Expand Down Expand Up @@ -230,7 +291,7 @@ private void _testMessageTimeout(boolean async) throws Exception {


private void _testMessageReception(boolean async) throws Exception {
Object[] responses={new Message(null, a1, new Long(1)),new Message(null, a2, new Long(2))};
Object[] responses={new Message(null,a, new Long(1)),new Message(null,b, new Long(2))};
MyCorrelator corr=new MyCorrelator(async, responses, 0);
GroupRequest<Object> req=new GroupRequest<Object>(new Message(), corr, dests, new RequestOptions(ResponseMode.GET_ALL, 0));
corr.setGroupRequest(req);
Expand All @@ -246,12 +307,12 @@ private void _testMessageReception(boolean async) throws Exception {

private void _testMessageReceptionWithViewChange(boolean async) throws Exception {
List<Address> new_dests=new ArrayList<Address>();
new_dests.add(a1);
new_dests.add(a2);
new_dests.add(a1);
Object[] responses={new Message(null, a1, new Long(1)),
new_dests.add(a);
new_dests.add(b);
new_dests.add(a);
Object[] responses={new Message(null,a, new Long(1)),
new View(Util.createRandomAddress(), 322649, new_dests),
new Message(null, a2, new Long(2))};
new Message(null,b, new Long(2))};
MyCorrelator corr=new MyCorrelator(async, responses, 0);
GroupRequest<Long> req=new GroupRequest<Long>(new Message(), corr, dests, new RequestOptions(ResponseMode.GET_ALL, 0));
corr.setGroupRequest(req);
Expand All @@ -266,8 +327,8 @@ private void _testMessageReceptionWithViewChange(boolean async) throws Exception

private void _testMessageReceptionWithViewChangeMemberLeft(boolean async) throws Exception {
List<Address> new_dests=new ArrayList<Address>();
new_dests.add(a2);
Object[] responses={new Message(null, a2, new Long(1)),
new_dests.add(b);
Object[] responses={new Message(null,b, new Long(1)),
new View(Util.createRandomAddress(), 322649, new_dests)};
MyCorrelator corr=new MyCorrelator(async, responses, 0);
GroupRequest<Object> req=new GroupRequest<Object>(new Message(), corr, dests, new RequestOptions(ResponseMode.GET_ALL, 0));
Expand Down

0 comments on commit 316175e

Please sign in to comment.