Permalink
Browse files

moved lock up to eliminate race condition (https://issues.jboss.org/b…

  • Loading branch information...
1 parent d796370 commit 7b654a70b3dc1af8e22f7675f6d7e9154de9fb52 Bela Ban committed May 27, 2011
@@ -120,18 +120,19 @@ public void receiveResponse(Object response_value, Address sender, boolean is_ex
RspFilter rsp_filter=options.getRspFilter();
boolean responseReceived=false;
- if(!rsp.wasReceived()) {
- if((responseReceived=(rsp_filter == null) || rsp_filter.isAcceptable(response_value, sender))) {
- if(is_exception && response_value instanceof Throwable)
- rsp.setException((Throwable)response_value);
- else
- rsp.setValue((T)response_value);
- }
- rsp.setReceived(responseReceived);
- }
lock.lock();
try {
+ if(!rsp.wasReceived()) {
+ if((responseReceived=(rsp_filter == null) || rsp_filter.isAcceptable(response_value, sender))) {
+ if(is_exception && response_value instanceof Throwable)
+ rsp.setException((Throwable)response_value);
+ else
+ rsp.setValue((T)response_value);
+ }
+ rsp.setReceived(responseReceived);
+ }
+
if(responseReceived)
num_received++;
done=rsp_filter == null? responsesComplete() : !rsp_filter.needMoreResponses();
@@ -24,16 +24,14 @@
@BeforeClass
void init() throws UnknownHostException {
- a1=Util.createRandomAddress();
- a2=Util.createRandomAddress();
- a3=Util.createRandomAddress();
+ a1=Util.createRandomAddress("A1");
+ a2=Util.createRandomAddress("A2");
+ a3=Util.createRandomAddress("A3");
}
@BeforeMethod
protected void setUp() throws Exception {
dests=new Vector<Address>(Arrays.asList(a1, a2));
- dests.add(a1);
- dests.add(a2);
}
@AfterMethod
@@ -73,7 +71,7 @@ public void testGetFirstWithResponseFilter() throws Exception {
new Message(null, a3, new Long(3))};
MyTransport transport=new MyDelayedTransport(true, responses, 500);
dests.add(a3);
- GroupRequest req=new GroupRequest(new Message(), transport, dests, new RequestOptions(ResponseMode.GET_FIRST, 0));
+ GroupRequest<Long> req=new GroupRequest<Long>(new Message(), transport, dests, new RequestOptions(ResponseMode.GET_FIRST, 0));
req.setResponseFilter(new RspFilter() {
int num_rsps=0;
@@ -94,7 +92,7 @@ public boolean needMoreResponses() {
System.out.println("group request is " + req);
assert rc;
assert req.isDone();
- RspList results=req.getResults();
+ RspList<Long> results=req.getResults();
Assert.assertEquals(3, results.size());
Assert.assertEquals(1, results.numReceived());
}
@@ -107,7 +105,7 @@ public void testGetAllWithResponseFilter() throws Exception {
new Message(null, a3, new Long(3))};
MyTransport transport=new MyDelayedTransport(true, responses, 500);
dests.add(a3);
- GroupRequest req=new GroupRequest(new Message(), transport, dests, new RequestOptions(ResponseMode.GET_ALL, 0));
+ GroupRequest<Long> req=new GroupRequest<Long>(new Message(), transport, dests, new RequestOptions(ResponseMode.GET_ALL, 0));
req.setResponseFilter(new RspFilter() {
int num_rsps=0;
@@ -129,15 +127,59 @@ public boolean needMoreResponses() {
System.out.println("group request is " + req);
assert rc;
assert req.isDone();
- RspList results=req.getResults();
+ RspList<Long> results=req.getResults();
Assert.assertEquals(3, results.size());
Assert.assertEquals(2, results.numReceived());
}
+ public void testAllNullResponsesWithFilter() {
+ dests.add(a3);
+ GroupRequest<Boolean> req=new GroupRequest<Boolean>(new Message(), (Transport)null, dests, new RequestOptions(ResponseMode.GET_ALL, 10000));
+ assert !req.isDone();
+
+ req.setResponseFilter(new NonNullFilter());
+
+ for(Address sender: dests)
+ req.receiveResponse(null, sender, false);
+
+ assert !req.isDone();
+ }
+
+
+ public void testAllNullResponsesWithFilterGetFirst() {
+ dests.add(a3);
+ GroupRequest<Boolean> req=new GroupRequest<Boolean>(new Message(), (Transport)null, dests, new RequestOptions(ResponseMode.GET_FIRST, 10000));
+ assert !req.isDone();
+
+ req.setResponseFilter(new NonNullFilter());
+
+ req.receiveResponse(null, dests.get(0), false);
+ assert !req.isDone();
+
+ req.receiveResponse(true, dests.get(1), false);
+ assert req.isDone();
+ }
+
+
+ protected static class NonNullFilter implements RspFilter {
+ private volatile boolean validResponse;
+
+ public boolean isAcceptable(Object response, Address sender) {
+ if(response != null)
+ validResponse=true;
+ return response != null;
+ }
+
+ public boolean needMoreResponses() {
+ return !validResponse;
+ }
+ }
+
+
/**
- * test group timeout. demonstrates that the timeout mechanism times out too
- * quickly as multiple responses are received by the GroupRequest.
+ * test group timeout. demonstrates that the timeout mechanism times out too
+ * quickly as multiple responses are received by the GroupRequest.
* Demonstrates by group request receiving multiple messages in a timeframe
* less than the total timeout. the request will fail, as after each
* received message, the request alters the total timeout.
@@ -167,13 +209,13 @@ private void _testMessageTimeout(boolean async) throws Exception {
MyDelayedTransport tp = new MyDelayedTransport(async, responses, delay);
// instantiate request with dummy correlator
- GroupRequest req=new GroupRequest(new Message(), tp, dests, new RequestOptions(ResponseMode.GET_ALL, timeout));
+ GroupRequest<Long> req=new GroupRequest<Long>(new Message(), tp, dests, new RequestOptions(ResponseMode.GET_ALL, timeout));
tp.setGroupRequest(req);
boolean rc = req.execute();
System.out.println("group request is " + req);
assert rc;
assert req.isDone();
- RspList results = req.getResults();
+ RspList<Long> results = req.getResults();
Assert.assertEquals(dests.size(), results.size());
}
@@ -182,13 +224,13 @@ private void _testMessageTimeout(boolean async) throws Exception {
private void _testMessageReception(boolean async) throws Exception {
Object[] responses=new Message[]{new Message(null, a1, new Long(1)),new Message(null, a2, new Long(2))};
MyTransport transport=new MyTransport(async, responses);
- GroupRequest req=new GroupRequest(new Message(), transport, dests, new RequestOptions(ResponseMode.GET_ALL, 0));
+ GroupRequest<Object> req=new GroupRequest<Object>(new Message(), transport, dests, new RequestOptions(ResponseMode.GET_ALL, 0));
transport.setGroupRequest(req);
boolean rc=req.execute();
System.out.println("group request is " + req);
assert rc;
assert req.isDone();
- RspList results=req.getResults();
+ RspList<Object> results=req.getResults();
Assert.assertEquals(2, results.size());
}
@@ -203,13 +245,13 @@ private void _testMessageReceptionWithViewChange(boolean async) throws Exception
new View(Util.createRandomAddress(), 322649, new_dests),
new Message(null, a2, new Long(2))};
MyTransport transport=new MyTransport(async, responses);
- GroupRequest req=new GroupRequest(new Message(), transport, dests, new RequestOptions(ResponseMode.GET_ALL, 0));
+ GroupRequest<Long> req=new GroupRequest<Long>(new Message(), transport, dests, new RequestOptions(ResponseMode.GET_ALL, 0));
transport.setGroupRequest(req);
boolean rc=req.execute();
System.out.println("group request is " + req);
assert rc;
assert req.isDone();
- RspList results=req.getResults();
+ RspList<Long> results=req.getResults();
Assert.assertEquals(2, results.size());
}
@@ -220,15 +262,15 @@ private void _testMessageReceptionWithViewChangeMemberLeft(boolean async) throws
Object[] responses=new Object[]{new Message(null, a2, new Long(1)),
new View(Util.createRandomAddress(), 322649, new_dests)};
MyTransport transport=new MyTransport(async, responses);
- GroupRequest req=new GroupRequest(new Message(), transport, dests, new RequestOptions(ResponseMode.GET_ALL, 0));
+ GroupRequest<Object> req=new GroupRequest<Object>(new Message(), transport, dests, new RequestOptions(ResponseMode.GET_ALL, 0));
transport.setGroupRequest(req);
System.out.println("group request before execution: " + req);
boolean rc=req.execute();
System.out.println("group request after execution: " + req);
assert rc;
assert req.isDone();
- RspList results=req.getResults();
+ RspList<Object> results=req.getResults();
Assert.assertEquals(2, results.size());
}

0 comments on commit 7b654a7

Please sign in to comment.