Skip to content

Commit

Permalink
Moved request_msg out of Request into execute(Message req)
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Feb 2, 2016
1 parent 65c1eca commit 924f97f
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 61 deletions.
24 changes: 8 additions & 16 deletions src/org/jgroups/blocks/GroupRequest.java
Expand Up @@ -45,40 +45,32 @@ public class GroupRequest<T> extends Request {


/** Correlates requests and responses */ /** Correlates requests and responses */
@GuardedBy("lock") @GuardedBy("lock")
private final Map<Address,Rsp<T>> requests; protected final Map<Address,Rsp<T>> requests;


@GuardedBy("lock") @GuardedBy("lock")
int num_valid; // the number of valid responses (values or exceptions that passed the response filter) protected int num_valid; // the number of valid responses (values or exceptions that passed the response filter)


@GuardedBy("lock") @GuardedBy("lock")
int num_received; // number of responses (values, exceptions or suspicions) protected int num_received; // number of responses (values, exceptions or suspicions)








/** /**
* @param msg The message to be sent
* @param corr The request correlator to be used. A request correlator sends requests tagged with a unique ID and * @param corr The request correlator to be used. A request correlator sends requests tagged with a unique ID and
* notifies the sender when matching responses are received. The reason {@code GroupRequest} uses * notifies the sender when matching responses are received. The reason {@code GroupRequest} uses
* it instead of a {@code Transport} is that multiple requests/responses might be sent/received concurrently * it instead of a {@code Transport} is that multiple requests/responses might be sent/received concurrently
* @param targets The targets, which are supposed to receive the message. Any receiver not in this set will * @param targets The targets, which are supposed to receive the message. Any receiver not in this set will
* discard the message. Targets are always a subset of the current membership * discard the message. Targets are always a subset of the current membership
* @param options The request options to be used for this call * @param options The request options to be used for this call
*/ */
public GroupRequest(Message msg, RequestCorrelator corr, Collection<Address> targets, RequestOptions options) { public GroupRequest(RequestCorrelator corr, Collection<Address> targets, RequestOptions options) {
super(msg, corr, options); super(corr, options);
int size=targets.size(); int size=targets.size();
requests=new HashMap<>(size); requests=new HashMap<>(size);
setTargets(targets); setTargets(targets);
} }


public GroupRequest(Message msg, RequestCorrelator corr, Address target, RequestOptions options) {
super(msg, corr, options);
requests=new HashMap<>(1);
setTarget(target);
}




public boolean getAnycasting() { public boolean getAnycasting() {
return options.getAnycasting(); return options.getAnycasting();
Expand All @@ -89,8 +81,8 @@ public void setAnycasting(boolean anycasting) {
} }




public void sendRequest() throws Exception { public void sendRequest(final Message req) throws Exception {
sendRequest(requests.keySet()); sendRequest(req, requests.keySet());
} }


/* ---------------------- Interface RspCollector -------------------------- */ /* ---------------------- Interface RspCollector -------------------------- */
Expand Down Expand Up @@ -340,7 +332,7 @@ private static int determineMajority(int i) {
} }




private void sendRequest(final Collection<Address> targetMembers) throws Exception { private void sendRequest(final Message request_msg, final Collection<Address> targetMembers) throws Exception {
try { try {
corr.sendRequest(targetMembers, request_msg, options.getMode() == ResponseMode.GET_NONE? null : this, options); corr.sendRequest(targetMembers, request_msg, options.getMode() == ResponseMode.GET_NONE? null : this, options);
} }
Expand Down
12 changes: 6 additions & 6 deletions src/org/jgroups/blocks/MessageDispatcher.java
Expand Up @@ -327,14 +327,14 @@ protected <T> GroupRequest<T> cast(final Collection<Address> dests, Message msg,
rpc_stats.add(RpcStats.Type.MULTICAST, null, sync, 0); rpc_stats.add(RpcStats.Type.MULTICAST, null, sync, 0);
} }


GroupRequest<T> req=new GroupRequest<>(msg, corr, real_dests, options); GroupRequest<T> req=new GroupRequest<>(corr, real_dests, options);
if(listener != null) if(listener != null)
req.setListener(listener); req.setListener(listener);
req.setResponseFilter(options.getRspFilter()); req.setResponseFilter(options.getRspFilter());
req.setAnycasting(options.getAnycasting()); req.setAnycasting(options.getAnycasting());
req.setBlockForResults(block_for_results); req.setBlockForResults(block_for_results);
long start=non_blocking || !rpc_stats.extendedStats()? 0 : System.nanoTime(); long start=non_blocking || !rpc_stats.extendedStats()? 0 : System.nanoTime();
req.execute(); req.execute(msg);
long time=non_blocking || !rpc_stats.extendedStats()? 0 : System.nanoTime() - start; long time=non_blocking || !rpc_stats.extendedStats()? 0 : System.nanoTime() - start;
if(!non_blocking) { if(!non_blocking) {
if(anycast) if(anycast)
Expand Down Expand Up @@ -379,9 +379,9 @@ public <T> T sendMessage(Message msg, RequestOptions opts) throws Exception {
boolean async_rpc=opts.getMode() == ResponseMode.GET_NONE; boolean async_rpc=opts.getMode() == ResponseMode.GET_NONE;
if(async_rpc) if(async_rpc)
rpc_stats.add(RpcStats.Type.UNICAST, dest, false, 0); rpc_stats.add(RpcStats.Type.UNICAST, dest, false, 0);
UnicastRequest<T> req=new UnicastRequest<>(msg, corr, dest, opts); UnicastRequest<T> req=new UnicastRequest<>(corr, dest, opts);
long start=async_rpc || !rpc_stats.extendedStats()? 0 : System.nanoTime(); long start=async_rpc || !rpc_stats.extendedStats()? 0 : System.nanoTime();
req.execute(); req.execute(msg);
if(async_rpc) if(async_rpc)
return null; return null;
long time=!rpc_stats.extendedStats()? 0 : System.nanoTime() - start; long time=!rpc_stats.extendedStats()? 0 : System.nanoTime() - start;
Expand Down Expand Up @@ -429,11 +429,11 @@ public <T> NotifyingFuture<T> sendMessageWithFuture(Message msg, RequestOptions


msg.setFlag(options.getFlags()).setTransientFlag(options.getTransientFlags()); msg.setFlag(options.getFlags()).setTransientFlag(options.getTransientFlags());
rpc_stats.add(RpcStats.Type.UNICAST, dest, options.getMode() != ResponseMode.GET_NONE, 0); rpc_stats.add(RpcStats.Type.UNICAST, dest, options.getMode() != ResponseMode.GET_NONE, 0);
UnicastRequest<T> req=new UnicastRequest<>(msg, corr, dest, options); UnicastRequest<T> req=new UnicastRequest<>(corr, dest, options);
if(listener != null) if(listener != null)
req.setListener(listener); req.setListener(listener);
req.setBlockForResults(false); req.setBlockForResults(false);
req.execute(); req.execute(msg);
if(options.getMode() == ResponseMode.GET_NONE) if(options.getMode() == ResponseMode.GET_NONE)
return new NullFuture<>(null); return new NullFuture<>(null);
return req; return req;
Expand Down
18 changes: 9 additions & 9 deletions src/org/jgroups/blocks/Request.java
Expand Up @@ -30,7 +30,6 @@ public abstract class Request implements NotifyingFuture {


/** Is set as soon as the request has received all required responses */ /** Is set as soon as the request has received all required responses */
protected final CondVar cond=new CondVar(lock); protected final CondVar cond=new CondVar(lock);
protected final Message request_msg;
protected final RequestCorrelator corr; // either use RequestCorrelator or ... protected final RequestCorrelator corr; // either use RequestCorrelator or ...
protected final RequestOptions options; protected final RequestOptions options;
protected volatile boolean done; protected volatile boolean done;
Expand All @@ -39,39 +38,40 @@ public abstract class Request implements NotifyingFuture {






public Request(Message request, RequestCorrelator corr, RequestOptions options) { public Request(RequestCorrelator corr, RequestOptions options) {
this.request_msg=request;
this.corr=corr; this.corr=corr;
this.options=options; this.options=options;
} }




public void setResponseFilter(RspFilter filter) { public Request setResponseFilter(RspFilter filter) {
options.setRspFilter(filter); options.setRspFilter(filter);
return this;
} }


public boolean getBlockForResults() { public boolean getBlockForResults() {
return block_for_results; return block_for_results;
} }


public void setBlockForResults(boolean block_for_results) { public Request setBlockForResults(boolean block_for_results) {
this.block_for_results=block_for_results; this.block_for_results=block_for_results;
return this;
} }


public NotifyingFuture setListener(FutureListener listener) { public Request setListener(FutureListener listener) {
this.listener=listener; this.listener=listener;
if(done) if(done)
listener.futureDone(this); listener.futureDone(this);
return this; return this;
} }


public boolean execute() throws Exception { public boolean execute(final Message req) throws Exception {
if(corr == null) { if(corr == null) {
log.error(Util.getMessage("CorrIsNullCannotSendRequest")); log.error(Util.getMessage("CorrIsNullCannotSendRequest"));
return false; return false;
} }


sendRequest(); sendRequest(req);
if(!block_for_results || options.getMode() == ResponseMode.GET_NONE) if(!block_for_results || options.getMode() == ResponseMode.GET_NONE)
return true; return true;


Expand All @@ -85,7 +85,7 @@ public boolean execute() throws Exception {
} }
} }


protected abstract void sendRequest() throws Exception; protected abstract void sendRequest(final Message request_msg) throws Exception;


public abstract void receiveResponse(Object response_value, Address sender, boolean is_exception); public abstract void receiveResponse(Object response_value, Address sender, boolean is_exception);


Expand Down
12 changes: 3 additions & 9 deletions src/org/jgroups/blocks/UnicastRequest.java
Expand Up @@ -25,20 +25,14 @@ public class UnicastRequest<T> extends Request {






public UnicastRequest(Message msg, RequestCorrelator corr, Address target, RequestOptions options) { public UnicastRequest(RequestCorrelator corr, Address target, RequestOptions options) {
super(msg, corr, options); super(corr, options);
this.target=target;
result=new Rsp<>(target);
}

public UnicastRequest(Message msg, Address target, RequestOptions options) {
super(msg, null, options);
this.target=target; this.target=target;
result=new Rsp<>(target); result=new Rsp<>(target);
} }




protected void sendRequest() throws Exception { protected void sendRequest(final Message request_msg) throws Exception {
try { try {
corr.sendUnicastRequest(target, request_msg, options.getMode() == ResponseMode.GET_NONE? null : this); corr.sendUnicastRequest(target, request_msg, options.getMode() == ResponseMode.GET_NONE? null : this);
} }
Expand Down
39 changes: 18 additions & 21 deletions tests/junit-functional/org/jgroups/blocks/GroupRequestTest.java
Expand Up @@ -74,7 +74,7 @@ public void testGetFirstWithResponseFilter() throws Exception {
new Message(null,c, (long)3)}; new Message(null,c, (long)3)};
MyCorrelator corr=new MyCorrelator(true, responses, 500); MyCorrelator corr=new MyCorrelator(true, responses, 500);
dests.add(c); dests.add(c);
GroupRequest<Long> req=new GroupRequest<>(new Message(), corr, dests, new RequestOptions(ResponseMode.GET_FIRST, 0)); GroupRequest<Long> req=new GroupRequest<>(corr, dests, new RequestOptions(ResponseMode.GET_FIRST, 0));
req.setResponseFilter(new RspFilter() { req.setResponseFilter(new RspFilter() {
int num_rsps=0; int num_rsps=0;


Expand All @@ -91,7 +91,7 @@ public boolean needMoreResponses() {
} }
}); });
corr.setGroupRequest(req); corr.setGroupRequest(req);
boolean rc=req.execute(); boolean rc=req.execute(new Message());
System.out.println("group request is " + req); System.out.println("group request is " + req);
assert rc; assert rc;
assert req.isDone(); assert req.isDone();
Expand All @@ -107,7 +107,7 @@ public void testGetAllWithResponseFilter() throws Exception {
new Message(null,c, (long)3)}; new Message(null,c, (long)3)};
MyCorrelator corr=new MyCorrelator(true, responses, 500); MyCorrelator corr=new MyCorrelator(true, responses, 500);
dests.add(c); dests.add(c);
GroupRequest<Long> req=new GroupRequest<>(new Message(), corr, dests, new RequestOptions(ResponseMode.GET_ALL, 0)); GroupRequest<Long> req=new GroupRequest<>(corr, dests, new RequestOptions(ResponseMode.GET_ALL, 0));
req.setResponseFilter(new RspFilter() { req.setResponseFilter(new RspFilter() {
int num_rsps=0; int num_rsps=0;


Expand All @@ -125,7 +125,7 @@ public boolean needMoreResponses() {
} }
}); });
corr.setGroupRequest(req); corr.setGroupRequest(req);
boolean rc=req.execute(); boolean rc=req.execute(new Message());
System.out.println("group request is " + req); System.out.println("group request is " + req);
assert rc; assert rc;
assert req.isDone(); assert req.isDone();
Expand All @@ -141,8 +141,7 @@ public boolean needMoreResponses() {
*/ */
public void testAllNullResponsesWithFilter() { public void testAllNullResponsesWithFilter() {
dests.add(c); dests.add(c);
GroupRequest<Boolean> req=new GroupRequest<>(new Message(), null, dests, GroupRequest<Boolean> req=new GroupRequest<>(null, dests, RequestOptions.SYNC());
new RequestOptions(ResponseMode.GET_ALL, 10000));
assert !req.isDone(); assert !req.isDone();


req.setResponseFilter(new NonNullFilter()); req.setResponseFilter(new NonNullFilter());
Expand All @@ -156,12 +155,10 @@ public void testAllNullResponsesWithFilter() {


public void testAllNullResponsesWithFilterGetFirst() { public void testAllNullResponsesWithFilterGetFirst() {
dests.add(c); dests.add(c);
GroupRequest<Boolean> req=new GroupRequest<>(new Message(), null, dests, GroupRequest<Boolean> req=new GroupRequest<>(null, dests, new RequestOptions(ResponseMode.GET_FIRST, 10000));
new RequestOptions(ResponseMode.GET_FIRST, 10000));
assert !req.isDone(); assert !req.isDone();


req.setResponseFilter(new NonNullFilter()); req.setResponseFilter(new NonNullFilter());

req.receiveResponse(null, dests.get(0), false); req.receiveResponse(null, dests.get(0), false);
assert !req.isDone(); assert !req.isDone();


Expand All @@ -174,7 +171,7 @@ public void testAllNullResponsesWithFilterGetFirst() {
* a blocking RPC. https://issues.jboss.org/browse/JGRP-1505 * a blocking RPC. https://issues.jboss.org/browse/JGRP-1505
*/ */
public void testResponsesComplete() { public void testResponsesComplete() {
GroupRequest<Integer> req=new GroupRequest<>(null, null, Arrays.asList(a,b,c), RequestOptions.SYNC()); GroupRequest<Integer> req=new GroupRequest<>(null, Arrays.asList(a,b,c), RequestOptions.SYNC());
checkComplete(req, false); checkComplete(req, false);


req.receiveResponse(1, a, false); req.receiveResponse(1, a, false);
Expand All @@ -188,7 +185,7 @@ public void testResponsesComplete() {
checkComplete(req, true); checkComplete(req, true);




req=new GroupRequest<>(null, null, Arrays.asList(a,b,c), RequestOptions.SYNC()); req=new GroupRequest<>(null, Arrays.asList(a,b,c), RequestOptions.SYNC());
req.receiveResponse(1, a, false); req.receiveResponse(1, a, false);
checkComplete(req, false); checkComplete(req, false);


Expand All @@ -207,7 +204,7 @@ public void testResponsesComplete() {
* a blocking RPC. https://issues.jboss.org/browse/JGRP-1505 * a blocking RPC. https://issues.jboss.org/browse/JGRP-1505
*/ */
public void testResponsesComplete2() { public void testResponsesComplete2() {
GroupRequest<Integer> req=new GroupRequest<>(null, null, Arrays.asList(a,b,c), RequestOptions.SYNC()); GroupRequest<Integer> req=new GroupRequest<>(null, Arrays.asList(a,b,c), RequestOptions.SYNC());
req.suspect(a); req.suspect(a);
checkComplete(req, false); checkComplete(req, false);


Expand All @@ -230,7 +227,7 @@ public void testResponsesComplete3() {
Address two=new SiteUUID((UUID)Util.createRandomAddress("sfo1"), "sfo1", "SFO"); Address two=new SiteUUID((UUID)Util.createRandomAddress("sfo1"), "sfo1", "SFO");
Address three=new SiteUUID((UUID)Util.createRandomAddress("nyc1"), "nyc1", "NYC"); Address three=new SiteUUID((UUID)Util.createRandomAddress("nyc1"), "nyc1", "NYC");


GroupRequest<Integer> req=new GroupRequest<>(null, null, Arrays.asList(one, two, three), RequestOptions.SYNC()); GroupRequest<Integer> req=new GroupRequest<>(null, Arrays.asList(one, two, three), RequestOptions.SYNC());
req.suspect(one); req.suspect(one);
req.receiveResponse(1, one, false); req.receiveResponse(1, one, false);
req.siteUnreachable("LON"); req.siteUnreachable("LON");
Expand Down Expand Up @@ -302,9 +299,9 @@ private void _testMessageTimeout(boolean async) throws Exception {
MyCorrelator corr = new MyCorrelator(async, responses, delay); MyCorrelator corr = new MyCorrelator(async, responses, delay);


// instantiate request with dummy correlator // instantiate request with dummy correlator
GroupRequest<Long> req=new GroupRequest<>(new Message(), corr, dests, new RequestOptions(ResponseMode.GET_ALL, timeout)); GroupRequest<Long> req=new GroupRequest<>(corr, dests, new RequestOptions(ResponseMode.GET_ALL, timeout));
corr.setGroupRequest(req); corr.setGroupRequest(req);
boolean rc = req.execute(); boolean rc = req.execute(new Message());
System.out.println("group request is " + req); System.out.println("group request is " + req);
assert rc; assert rc;
assert req.isDone(); assert req.isDone();
Expand All @@ -317,9 +314,9 @@ private void _testMessageTimeout(boolean async) throws Exception {
private void _testMessageReception(boolean async) throws Exception { private void _testMessageReception(boolean async) throws Exception {
Object[] responses={new Message(null,a, (long)1),new Message(null,b, (long)2)}; Object[] responses={new Message(null,a, (long)1),new Message(null,b, (long)2)};
MyCorrelator corr=new MyCorrelator(async, responses, 0); MyCorrelator corr=new MyCorrelator(async, responses, 0);
GroupRequest<Object> req=new GroupRequest<>(new Message(), corr, dests, new RequestOptions(ResponseMode.GET_ALL, 0)); GroupRequest<Object> req=new GroupRequest<>(corr, dests, new RequestOptions(ResponseMode.GET_ALL, 0));
corr.setGroupRequest(req); corr.setGroupRequest(req);
boolean rc=req.execute(); boolean rc=req.execute(new Message());
System.out.println("group request is " + req); System.out.println("group request is " + req);
assert rc; assert rc;
assert req.isDone(); assert req.isDone();
Expand All @@ -338,9 +335,9 @@ private void _testMessageReceptionWithViewChange(boolean async) throws Exception
new View(Util.createRandomAddress(), 322649, new_dests), new View(Util.createRandomAddress(), 322649, new_dests),
new Message(null,b, (long)2)}; new Message(null,b, (long)2)};
MyCorrelator corr=new MyCorrelator(async, responses, 0); MyCorrelator corr=new MyCorrelator(async, responses, 0);
GroupRequest<Long> req=new GroupRequest<>(new Message(), corr, dests, new RequestOptions(ResponseMode.GET_ALL, 0)); GroupRequest<Long> req=new GroupRequest<>(corr, dests, new RequestOptions(ResponseMode.GET_ALL, 0));
corr.setGroupRequest(req); corr.setGroupRequest(req);
boolean rc=req.execute(); boolean rc=req.execute(new Message());
System.out.println("group request is " + req); System.out.println("group request is " + req);
assert rc; assert rc;
assert req.isDone(); assert req.isDone();
Expand All @@ -355,11 +352,11 @@ private void _testMessageReceptionWithViewChangeMemberLeft(boolean async) throws
Object[] responses={new Message(null,b, (long)1), Object[] responses={new Message(null,b, (long)1),
new View(Util.createRandomAddress(), 322649, new_dests)}; new View(Util.createRandomAddress(), 322649, new_dests)};
MyCorrelator corr=new MyCorrelator(async, responses, 0); MyCorrelator corr=new MyCorrelator(async, responses, 0);
GroupRequest<Object> req=new GroupRequest<>(new Message(), corr, dests, new RequestOptions(ResponseMode.GET_ALL, 0)); GroupRequest<Object> req=new GroupRequest<>(corr, dests, new RequestOptions(ResponseMode.GET_ALL, 0));


corr.setGroupRequest(req); corr.setGroupRequest(req);
System.out.println("group request before execution: " + req); System.out.println("group request before execution: " + req);
boolean rc=req.execute(); boolean rc=req.execute(new Message());
System.out.println("group request after execution: " + req); System.out.println("group request after execution: " + req);
assert rc; assert rc;
assert req.isDone(); assert req.isDone();
Expand Down

0 comments on commit 924f97f

Please sign in to comment.