Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'JGRP-1141' (typed RpcDispatcher methods)

  • Loading branch information...
commit 63307a835da0a62bf271d2991393e9a7fffbd105 2 parents 492a6dc + a310c3f
@belaban authored
Showing with 360 additions and 613 deletions.
  1. +46 −0 doc/API_Changes.txt
  2. +25 −21 src/org/jgroups/blocks/GroupRequest.java
  3. +14 −10 src/org/jgroups/blocks/MessageDispatcher.java
  4. +0 −378 src/org/jgroups/blocks/MultiRequest.java
  5. +1 −1  src/org/jgroups/blocks/ReplCache.java
  6. +1 −1  src/org/jgroups/blocks/Request.java
  7. +25 −11 src/org/jgroups/blocks/RequestCorrelator.java
  8. +1 −1  src/org/jgroups/blocks/RequestHandler.java
  9. +27 −41 src/org/jgroups/blocks/RpcDispatcher.java
  10. +1 −1  src/org/jgroups/blocks/RspCollector.java
  11. +18 −17 src/org/jgroups/blocks/UnicastRequest.java
  12. +1 −1  src/org/jgroups/blocks/mux/MuxMessageDispatcher.java
  13. +1 −1  src/org/jgroups/blocks/mux/MuxRpcDispatcher.java
  14. +10 −12 src/org/jgroups/demos/QuoteClient.java
  15. +1 −1  src/org/jgroups/demos/RelayDemoRpc.java
  16. +43 −13 src/org/jgroups/util/Rsp.java
  17. +38 −40 src/org/jgroups/util/RspList.java
  18. +2 −2 tests/junit-functional/org/jgroups/blocks/GroupRequestTest.java
  19. +1 −1  tests/junit-functional/org/jgroups/tests/RspListTest.java
  20. +2 −2 tests/junit/org/jgroups/blocks/MuxMessageDispatcherTest.java
  21. +2 −2 tests/junit/org/jgroups/blocks/MuxRpcDispatcherTest.java
  22. +4 −4 tests/junit/org/jgroups/blocks/RpcDispatcherSerializationTest.java
  23. +80 −40 tests/junit/org/jgroups/blocks/RpcDispatcherTest.java
  24. +1 −1  tests/junit/org/jgroups/tests/ChannelTestBase.java
  25. +13 −9 tests/junit/org/jgroups/tests/Deadlock2Test.java
  26. +2 −2 tests/other/org/jgroups/tests/UnicastTestRpcDist.java
View
46 doc/API_Changes.txt
@@ -49,6 +49,52 @@ API changes in 3.0.0
- Removed Address.isMulticastAddress(): a null address is a multicast address
+- Rsp, RspList, RpcDispatcher.callRemoteMethodsXXX() and MessageDispatcher.cast/sendXXX() methods now use
+ generics, so the code below:
+
+ RspList rsps=disp.callRemoteMethods(null, call, new RequestOptions(ResponseMode.GET_ALL, 5000));
+ for(Map.Entry<Address,Rsp> rsp: rsps.entrySet())
+ System.out.println(rsp.getKey() + ": " + rsp.getValue().getValue());
+
+ has to be changed to:
+
+ RspList<Date> rsps=disp.callRemoteMethods(null, call, new RequestOptions(ResponseMode.GET_ALL, 5000));
+ for(Map.Entry<Address,Rsp<Date>> rsp: rsps.entrySet())
+ System.out.println(rsp.getKey() + ": " + rsp.getValue().getValue());
+
+- RpcDispatcher.callRemoteMethod() now throws an exception if the target method threw an exception
+
+- RpcDispatcher.callRemoteMethods() return a RspList. If a member P threw an exception, then the Rsp for
+ P will have it in the field 'exception'. The 'result' field is *not* used for exceptions any longer !
+
+ An example to check for exceptions is:
+
+
+ Multicast:
+
+ RspList<Long> rsps=dispatcher.callRemoteMethods(null, "foo", null, null, new RequestOptions(...));
+ for(Rsp<Long> rsp: rsps.values) {
+ if(rsp.getException() != null) {
+ // we have an exception
+ }
+ else {
+ Long val=rsp.getValue();
+ // do something with the value
+ }
+ }
+
+ Unicast:
+
+ try {
+ Rsp<Long> rsp=disp.callRemoteMethod(target, "foo", null, null, new RequestOptions(...));
+ Long val=rsp.getValue();
+ // do something with the return value
+ }
+ catch(Throwable t) {
+ // "foo" threw an exception
+ }
+
+
View
46 src/org/jgroups/blocks/GroupRequest.java
@@ -48,11 +48,11 @@
*
* @author Bela Ban
*/
-public class GroupRequest extends Request {
+public class GroupRequest<T> extends Request {
/** Correlates requests and responses */
@GuardedBy("lock")
- private final Map<Address,Rsp> requests;
+ private final Map<Address,Rsp<T>> requests;
@GuardedBy("lock")
int num_received, num_not_received, num_suspected;
@@ -72,13 +72,13 @@
public GroupRequest(Message msg, RequestCorrelator corr, Collection<Address> targets, RequestOptions options) {
super(msg, corr, null, options);
int size=targets.size();
- requests=new HashMap<Address,Rsp>(size);
+ requests=new HashMap<Address,Rsp<T>>(size);
setTargets(targets);
}
public GroupRequest(Message m, RequestCorrelator corr, Address target, RequestOptions options) {
super(m, corr, null, options);
- requests=new HashMap<Address,Rsp>(1);
+ requests=new HashMap<Address,Rsp<T>>(1);
setTarget(target);
}
@@ -86,7 +86,7 @@ public GroupRequest(Message m, RequestCorrelator corr, Address target, RequestOp
public GroupRequest(Message m, Transport transport, Collection<Address> mbrs, RequestOptions options) {
super(m, null, transport, options);
int size=mbrs.size();
- requests=new HashMap<Address,Rsp>(size);
+ requests=new HashMap<Address,Rsp<T>>(size);
setTargets(mbrs);
}
@@ -111,18 +111,22 @@ public void sendRequest() throws Exception {
* Adds a response to the response table. When all responses have been received,
* <code>execute()</code> returns.
*/
- public void receiveResponse(Object response_value, Address sender) {
+ public void receiveResponse(Object response_value, Address sender, boolean is_exception) {
if(done)
return;
- Rsp rsp=requests.get(sender);
+ Rsp<T> rsp=requests.get(sender);
if(rsp == null)
return;
RspFilter rsp_filter=options.getRspFilter();
boolean responseReceived=false;
if(!rsp.wasReceived()) {
- if((responseReceived=(rsp_filter == null) || rsp_filter.isAcceptable(response_value, sender)))
- rsp.setValue(response_value);
+ if((responseReceived=(rsp_filter == null) || rsp_filter.isAcceptable(response_value, sender))) {
+ if(is_exception && response_value instanceof Throwable)
+ rsp.setException((Throwable)response_value);
+ else
+ rsp.setValue((T)response_value);
+ }
rsp.setReceived(responseReceived);
}
@@ -155,7 +159,7 @@ public void suspect(Address suspected_member) {
return;
boolean changed=false;
- Rsp rsp=requests.get(suspected_member);
+ Rsp<T> rsp=requests.get(suspected_member);
if(rsp != null) {
if(rsp.setSuspected(true)) {
rsp.setValue(null);
@@ -204,10 +208,10 @@ public void viewChange(View new_view) {
lock.lock();
try {
- for(Map.Entry<Address,Rsp> entry: requests.entrySet()) {
+ for(Map.Entry<Address,Rsp<T>> entry: requests.entrySet()) {
Address mbr=entry.getKey();
if(!mbrs.contains(mbr)) {
- Rsp rsp=entry.getValue();
+ Rsp<T> rsp=entry.getValue();
rsp.setValue(null);
if(rsp.setSuspected(true)) {
num_suspected++;
@@ -231,14 +235,14 @@ public void viewChange(View new_view) {
/** Returns the results as a RspList */
- public RspList getResults() {
- Collection<Rsp> rsps=requests.values();
- return new RspList(rsps);
+ public RspList<T> getResults() {
+ Collection<Rsp<T>> rsps=requests.values();
+ return new RspList<T>(rsps);
}
- public RspList get() throws InterruptedException, ExecutionException {
+ public RspList<T> get() throws InterruptedException, ExecutionException {
lock.lock();
try {
waitForResults(0);
@@ -249,7 +253,7 @@ public RspList get() throws InterruptedException, ExecutionException {
return getResults();
}
- public RspList get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ public RspList<T> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
boolean ok;
lock.lock();
try {
@@ -269,9 +273,9 @@ public String toString() {
if(!requests.isEmpty()) {
ret.append(", entries:\n");
- for(Map.Entry<Address,Rsp> entry: requests.entrySet()) {
+ for(Map.Entry<Address,Rsp<T>> entry: requests.entrySet()) {
Address mbr=entry.getKey();
- Rsp rsp=entry.getValue();
+ Rsp<T> rsp=entry.getValue();
ret.append(mbr).append(": ").append(rsp).append("\n");
}
}
@@ -282,13 +286,13 @@ public String toString() {
/* --------------------------------- Private Methods -------------------------------------*/
private void setTarget(Address mbr) {
- requests.put(mbr, new Rsp(mbr));
+ requests.put(mbr, new Rsp<T>(mbr));
num_not_received=1;
}
private void setTargets(Collection<Address> mbrs) {
for(Address mbr: mbrs)
- requests.put(mbr, new Rsp(mbr));
+ requests.put(mbr, new Rsp<T>(mbr));
num_not_received=requests.size();
}
View
24 src/org/jgroups/blocks/MessageDispatcher.java
@@ -236,18 +236,18 @@ else if (canReplace) {
* @return
* @since 2.9
*/
- public RspList castMessage(final Collection<Address> dests, Message msg, RequestOptions options) {
- GroupRequest req=cast(dests, msg, options, true);
+ public <T> RspList<T> castMessage(final Collection<Address> dests, Message msg, RequestOptions options) {
+ GroupRequest<T> req=cast(dests, msg, options, true);
return req != null? req.getResults() : RspList.EMPTY_RSP_LIST;
}
- public NotifyingFuture<RspList> castMessageWithFuture(final Collection<Address> dests, Message msg, RequestOptions options) {
- GroupRequest req=cast(dests, msg, options, false);
+ public <T> NotifyingFuture<RspList<T>> castMessageWithFuture(final Collection<Address> dests, Message msg, RequestOptions options) {
+ GroupRequest<T> req=cast(dests, msg, options, false);
return req != null? req : new NullFuture<RspList>(RspList.EMPTY_RSP_LIST);
}
- protected GroupRequest cast(final Collection<Address> dests, Message msg, RequestOptions options, boolean block_for_results) {
+ protected <T> GroupRequest<T> cast(final Collection<Address> dests, Message msg, RequestOptions options, boolean block_for_results) {
List<Address> real_dests;
// we need to clone because we don't want to modify the original
@@ -289,7 +289,7 @@ protected GroupRequest cast(final Collection<Address> dests, Message msg, Reques
return null;
}
- GroupRequest req=new GroupRequest(msg, corr, real_dests, options);
+ GroupRequest<T> req=new GroupRequest<T>(msg, corr, real_dests, options);
if(options != null) {
req.setResponseFilter(options.getRspFilter());
req.setAnycasting(options.getAnycasting());
@@ -315,7 +315,7 @@ public void done(long req_id) {
- public Object sendMessage(Message msg, RequestOptions opts) throws TimeoutException, SuspectedException {
+ public <T> T sendMessage(Message msg, RequestOptions opts) throws Throwable {
Address dest=msg.getDest();
if(dest == null) {
if(log.isErrorEnabled())
@@ -329,7 +329,7 @@ public Object sendMessage(Message msg, RequestOptions opts) throws TimeoutExcept
msg.setScope(opts.getScope());
}
- UnicastRequest req=new UnicastRequest(msg, corr, dest, opts);
+ UnicastRequest<T> req=new UnicastRequest<T>(msg, corr, dest, opts);
try {
req.execute();
}
@@ -340,9 +340,13 @@ public Object sendMessage(Message msg, RequestOptions opts) throws TimeoutExcept
if(opts != null && opts.getMode() == ResponseMode.GET_NONE)
return null;
- Rsp rsp=req.getResult();
+ Rsp<T> rsp=req.getResult();
if(rsp.wasSuspected())
throw new SuspectedException(dest);
+
+ if(rsp.getException() != null)
+ throw rsp.getException();
+
if(!rsp.wasReceived())
throw new TimeoutException("timeout sending message to " + dest);
return rsp.getValue();
@@ -379,7 +383,7 @@ public Object sendMessage(Message msg, RequestOptions opts) throws TimeoutExcept
/* ------------------------ RequestHandler Interface ---------------------- */
- public Object handle(Message msg) {
+ public Object handle(Message msg) throws Throwable {
if(req_handler != null) {
return req_handler.handle(msg);
}
View
378 src/org/jgroups/blocks/MultiRequest.java
@@ -1,378 +0,0 @@
-package org.jgroups.blocks;
-
-
-import org.jgroups.Address;
-import org.jgroups.Message;
-import org.jgroups.Transport;
-import org.jgroups.View;
-import org.jgroups.annotations.GuardedBy;
-import org.jgroups.util.Rsp;
-import org.jgroups.util.RspList;
-
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-
-/**
- * Sends a request to multiple destinations. Alternative implementation when we have few targets: between UnicastRequest
- * with 1 target and GroupRequest with many destination members. Performance is about the same as for GroupRequest, but
- * this class should use less memory as it doesn't create hashmaps. Don't use with many targets as we have to do
- * a linear search through an array of targets to match a response to a request.<p/>
- * MultiRequest is currently not used
- *
- * @author Bela Ban
- * @since 2.9
- */
-public class MultiRequest extends Request {
- @GuardedBy("lock")
- private final Rsp[] responses;
-
- protected final int expected_mbrs;
-
- @GuardedBy("lock")
- int num_received, num_not_received, num_suspected;
-
-
-
- /**
- * @param m
- * The message to be sent
- * @param corr
- * The request correlator to be used. A request correlator
- * sends requests tagged with a unique ID and notifies the
- * sender when matching responses are received. The reason
- * <code>GroupRequest</code> uses it instead of a
- * <code>Transport</code> is that multiple
- * requests/responses might be sent/received concurrently.
- * @param mbrs
- * The initial membership. This value reflects the membership
- * to which the request is sent (and from which potential
- * responses are expected). Is reset by reset().
- * @param options The options to be passed to the request
- */
- public MultiRequest(Message m, RequestCorrelator corr, Collection<Address> mbrs, RequestOptions options, int expected_mbrs) {
- super(m, corr, null, options);
- this.expected_mbrs=expected_mbrs;
- responses=new Rsp[mbrs.size()];
- setTargets(mbrs);
- }
-
- public MultiRequest(Message m, RequestCorrelator corr, Address target, RequestOptions options, int expected_mbrs) {
- super(m, corr, null, options);
- this.expected_mbrs=expected_mbrs;
- responses=new Rsp[1];
- setTarget(target);
- }
-
-
-
- /**
- * @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely
- * (e.g. if a suspicion service is available; timeouts are not needed).
- */
- public MultiRequest(Message m, Transport transport, Collection<Address> mbrs, RequestOptions options, int expected_mbrs) {
- super(m, null, transport, options);
- this.expected_mbrs=expected_mbrs;
- responses=new Rsp[1];
- setTargets(mbrs);
- }
-
- void setTarget(Address mbr) {
- responses[0]=new Rsp(mbr);
- num_not_received++;
- }
-
- void setTargets(Collection<Address> mbrs) {
- int index=0;
- for(Address mbr: mbrs) {
- responses[index++]=new Rsp(mbr);
- num_not_received++;
- }
- }
-
- public boolean getAnycasting() {
- return options.getAnycasting();
- }
-
- public void setAnycasting(boolean anycasting) {
- options.setAnycasting(anycasting);
- }
-
-
-
- public void sendRequest() throws Exception {
- List<Address> targets=null;
- targets=new ArrayList<Address>(responses.length);
- for(Rsp rsp: responses)
- targets.add(rsp.getSender());
-
- sendRequest(targets, req_id, options);
- }
-
- Rsp findResponse(Address target) {
- for(Rsp rsp: responses) {
- if(rsp != null && target.equals(rsp.getSender()))
- return rsp;
- }
- return null;
- }
-
- /* ---------------------- Interface RspCollector -------------------------- */
- /**
- * <b>Callback</b> (called by RequestCorrelator or Transport).
- * Adds a response to the response table. When all responses have been received,
- * <code>execute()</code> returns.
- */
- public void receiveResponse(Object response_value, Address sender) {
- if(done)
- return;
- Rsp rsp=findResponse(sender);
- if(rsp == null)
- return;
-
- RspFilter rsp_filter=options.getRspFilter();
- boolean responseReceived=false;
- if(!rsp.wasReceived()) {
- if((responseReceived=(rsp_filter == null) || rsp_filter.isAcceptable(response_value, sender)))
- rsp.setValue(response_value);
- rsp.setReceived(responseReceived);
- }
-
- lock.lock();
- try {
- if(responseReceived)
- num_received++;
- done=rsp_filter == null? responsesComplete() : !rsp_filter.needMoreResponses();
- if(responseReceived || done)
- completed.signalAll(); // wakes up execute()
- if(done && corr != null)
- corr.done(req_id);
- }
- finally {
- lock.unlock();
- }
- if(responseReceived || done)
- checkCompletion(this);
- }
-
-
- /**
- * <b>Callback</b> (called by RequestCorrelator or Transport).
- * Report to <code>GroupRequest</code> that a member is reported as faulty (suspected).
- * This method would probably be called when getting a suspect message from a failure detector
- * (where available). It is used to exclude faulty members from the response list.
- */
- public void suspect(Address suspected_member) {
- if(suspected_member == null)
- return;
-
- boolean changed=false;
- Rsp rsp=findResponse(suspected_member);
- if(rsp != null) {
- if(rsp.setSuspected(true)) {
- rsp.setValue(null);
- changed=true;
- lock.lock();
- try {
- num_suspected++;
- completed.signalAll();
- }
- finally {
- lock.unlock();
- }
- }
- }
-
- if(changed)
- checkCompletion(this);
- }
-
-
- /**
- * Any member of 'membership' that is not in the new view is flagged as
- * SUSPECTED. Any member in the new view that is <em>not</em> in the
- * membership (ie, the set of responses expected for the current RPC) will
- * <em>not</em> be added to it. If we did this we might run into the
- * following problem:
- * <ul>
- * <li>Membership is {A,B}
- * <li>A sends a synchronous group RPC (which sleeps for 60 secs in the
- * invocation handler)
- * <li>C joins while A waits for responses from A and B
- * <li>If this would generate a new view {A,B,C} and if this expanded the
- * response set to {A,B,C}, A would wait forever on C's response because C
- * never received the request in the first place, therefore won't send a
- * response.
- * </ul>
- */
- public void viewChange(View new_view) {
- List<Address> mbrs=new_view != null? new_view.getMembers() : null;
- if(mbrs == null)
- return;
-
- boolean changed=false;
- if(responses == null || responses.length == 0)
- return;
-
- lock.lock();
- try {
- for(Rsp rsp: responses) {
- Address mbr=rsp.getSender();
- if(!mbrs.contains(mbr)) {
- rsp.setValue(null);
- if(rsp.setSuspected(true)) {
- num_suspected++;
- changed=true;
- }
- }
- }
- if(changed)
- completed.signalAll();
- }
- finally {
- lock.unlock();
- }
- if(changed)
- checkCompletion(this);
- }
-
-
- /* -------------------- End of Interface RspCollector ----------------------------------- */
-
-
-
- /** Returns the results as a RspList */
- public RspList getResults() {
- RspList list=new RspList();
- for(Rsp rsp: responses)
- list.put(rsp.getSender(), rsp);
- return list;
- }
-
-
-
- public RspList get() throws InterruptedException, ExecutionException {
- lock.lock();
- try {
- waitForResults(0);
- }
- finally {
- lock.unlock();
- }
- return getResults();
- }
-
- public RspList get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- boolean ok;
- lock.lock();
- try {
- ok=waitForResults(unit.toMillis(timeout));
- }
- finally {
- lock.unlock();
- }
- if(!ok)
- throw new TimeoutException();
- return getResults();
- }
-
- public String toString() {
- StringBuilder ret=new StringBuilder(128);
- ret.append(super.toString());
-
- lock.lock();
- try {
- if(!(responses.length == 0)) {
- ret.append(", entries:\n");
- for(Rsp rsp: responses) {
- Address mbr=rsp.getSender();
- ret.append(mbr).append(": ").append(rsp).append("\n");
- }
- }
- }
- finally {
- lock.unlock();
- }
- return ret.toString();
- }
-
-
-
-
- /* --------------------------------- Private Methods -------------------------------------*/
-
- private static int determineMajority(int i) {
- return i < 2? i : (i / 2) + 1;
- }
-
-
-
-
- private void sendRequest(List<Address> targetMembers, long requestId, RequestOptions options) throws Exception {
- try {
- if(log.isTraceEnabled()) log.trace(new StringBuilder("sending request (id=").append(req_id).append(')'));
- if(corr != null) {
- corr.sendRequest(requestId, targetMembers, request_msg, options.getMode() == ResponseMode.GET_NONE? null : this, options);
- }
- else {
- if(options.getAnycasting()) {
- for(Address mbr: targetMembers) {
- Message copy=request_msg.copy(true);
- copy.setDest(mbr);
- transport.send(copy);
- }
- }
- else {
- transport.send(request_msg);
- }
- }
- }
- catch(Exception ex) {
- if(corr != null)
- corr.done(requestId);
- throw ex;
- }
- }
-
-
- @GuardedBy("lock")
- protected boolean responsesComplete() {
- if(done)
- return true;
-
- final int num_total=responses.length;
-
- switch(options.getMode()) {
- case GET_FIRST:
- if(num_received > 0)
- return true;
- if(num_suspected >= num_total)
- // e.g. 2 members, and both suspected
- return true;
- break;
- case GET_ALL:
- return num_received + num_suspected >= num_total;
- case GET_MAJORITY:
- int majority=determineMajority(num_total);
- if(num_received + num_suspected >= majority)
- return true;
- break;
- case GET_ABS_MAJORITY:
- majority=determineMajority(num_total);
- if(num_received >= majority)
- return true;
- break;
- case GET_NONE:
- return true;
- default :
- if(log.isErrorEnabled()) log.error("rsp_mode " + options.getMode() + " unknown !");
- break;
- }
- return false;
- }
-
-
-
-}
View
2  src/org/jgroups/blocks/ReplCache.java
@@ -423,7 +423,7 @@ public V get(K key) {
// 3. Execute a cluster wide GET
try {
- RspList rsps=disp.callRemoteMethods(null,
+ RspList<Object> rsps=disp.callRemoteMethods(null,
new MethodCall(GET, key),
new RequestOptions(ResponseMode.GET_ALL, call_timeout));
for(Rsp rsp: rsps.values()) {
View
2  src/org/jgroups/blocks/Request.java
@@ -99,7 +99,7 @@ public boolean execute() throws Exception {
protected abstract void sendRequest() throws Exception;
- public abstract void receiveResponse(Object response_value, Address sender);
+ public abstract void receiveResponse(Object response_value, Address sender, boolean is_exception);
public abstract void viewChange(View new_view);
View
36 src/org/jgroups/blocks/RequestCorrelator.java
@@ -381,8 +381,10 @@ public boolean receiveMessage(Message msg) {
break;
case Header.RSP:
+ case Header.EXC_RSP:
RspCollector coll=requests.get(hdr.id);
if(coll != null) {
+ boolean is_exception=hdr.type == Header.EXC_RSP;
Address sender=msg.getSrc();
Object retval;
byte[] buf=msg.getBuffer();
@@ -394,8 +396,9 @@ public boolean receiveMessage(Message msg) {
catch(Exception e) {
log.error("failed unmarshalling buffer into return value", e);
retval=e;
+ is_exception=true;
}
- coll.receiveResponse(retval, sender);
+ coll.receiveResponse(retval, sender, is_exception);
}
break;
@@ -452,6 +455,7 @@ protected void handleRequest(Message req, Header hdr) {
Object rsp_buf; // either byte[] or Buffer
Header rsp_hdr;
Message rsp;
+ boolean threw_exception=false;
// i. Get the request correlator header from the msg and pass it to
// the registered handler
@@ -469,7 +473,8 @@ protected void handleRequest(Message req, Header hdr) {
retval=request_handler.handle(req);
}
catch(Throwable t) {
- if(log.isErrorEnabled()) log.error("error invoking method", t);
+ // if(log.isErrorEnabled()) log.error("error invoking method", t);
+ threw_exception=true;
retval=t;
}
@@ -488,6 +493,7 @@ protected void handleRequest(Message req, Header hdr) {
catch(Throwable t) {
try { // this call should succeed (all exceptions are serializable)
rsp_buf=marshaller != null? marshaller.objectToBuffer(t) : Util.objectToByteBuffer(t);
+ threw_exception=true;
}
catch(Throwable tt) {
if(log.isErrorEnabled()) log.error("failed sending rsp: return value (" + retval + ") is not serializable");
@@ -510,7 +516,8 @@ protected void handleRequest(Message req, Header hdr) {
rsp.setBuffer((Buffer)rsp_buf);
else if (rsp_buf instanceof byte[])
rsp.setBuffer((byte[])rsp_buf);
- rsp_hdr=new Header(Header.RSP, hdr.id, false, this.id);
+
+ rsp_hdr=new Header(threw_exception? Header.EXC_RSP : Header.RSP, hdr.id, false, this.id);
rsp.putHeader(this.id, rsp_hdr);
if(log.isTraceEnabled())
log.trace(new StringBuilder("sending rsp for ").append(rsp_hdr.id).append(" to ").append(rsp.getDest()));
@@ -543,8 +550,9 @@ protected void prepareResponse(Message rsp) {
* The header for <tt>RequestCorrelator</tt> messages
*/
public static class Header extends org.jgroups.Header {
- public static final byte REQ = 0;
- public static final byte RSP = 1;
+ public static final byte REQ = 0;
+ public static final byte RSP = 1;
+ public static final byte EXC_RSP = 2; // exception
/** Type of header: request or reply */
public byte type;
@@ -579,12 +587,18 @@ public Header(byte type, long id, boolean rsp_expected, short corr_id) {
this.corrId = corr_id;
}
- /**
- */
public String toString() {
StringBuilder ret=new StringBuilder();
ret.append("id=" + corrId + ", type=");
- ret.append(type == REQ ? "REQ" : type == RSP ? "RSP" : "<unknown>");
+ switch(type) {
+ case REQ: ret.append("REQ");
+ break;
+ case RSP: ret.append("RSP");
+ break;
+ case EXC_RSP: ret.append("EXC_RSP");
+ break;
+ default: ret.append("<unknown>");
+ }
ret.append(", id=" + id);
ret.append(", rsp_expected=" + rsp_expected);
return ret.toString();
@@ -607,9 +621,9 @@ public void readFrom(DataInput in) throws IOException, IllegalAccessException, I
public int size() {
return Global.BYTE_SIZE // type
- + Global.LONG_SIZE // id
- + Global.BYTE_SIZE // rsp_expected
- + Global.SHORT_SIZE; // corrId
+ + Global.LONG_SIZE // id
+ + Global.BYTE_SIZE // rsp_expected
+ + Global.SHORT_SIZE; // corrId
}
}
View
2  src/org/jgroups/blocks/RequestHandler.java
@@ -6,5 +6,5 @@
public interface RequestHandler {
- Object handle(Message msg);
+ Object handle(Message msg) throws Throwable;
}
View
68 src/org/jgroups/blocks/RpcDispatcher.java
@@ -103,8 +103,8 @@ public void setMethodLookup(MethodLookup method_lookup) {
- public RspList callRemoteMethods(Collection<Address> dests, String method_name, Object[] args,
- Class[] types, RequestOptions options) {
+ public <T> RspList<T> callRemoteMethods(Collection<Address> dests, String method_name, Object[] args,
+ Class[] types, RequestOptions options) {
MethodCall method_call=new MethodCall(method_name, args, types);
return callRemoteMethods(dests, method_call, options);
}
@@ -119,7 +119,7 @@ public RspList callRemoteMethods(Collection<Address> dests, String method_name,
* @return RspList A list of return values and flags (suspected, not received) per member
* @since 2.9
*/
- public RspList callRemoteMethods(Collection<Address> dests, MethodCall method_call, RequestOptions options) {
+ public <T> RspList<T> callRemoteMethods(Collection<Address> dests, MethodCall method_call, RequestOptions options) {
if(dests != null && dests.isEmpty()) { // don't send if dest list is empty
if(log.isTraceEnabled())
log.trace(new StringBuilder("destination list of ").append(method_call.getName()).
@@ -153,19 +153,19 @@ public RspList callRemoteMethods(Collection<Address> dests, MethodCall method_ca
if(options.getScope() > 0)
msg.setScope(options.getScope());
- RspList retval=super.castMessage(dests, msg, options);
+ RspList<T> retval=super.castMessage(dests, msg, options);
if(log.isTraceEnabled()) log.trace("responses: " + retval);
return retval;
}
- public NotifyingFuture<RspList> callRemoteMethodsWithFuture(Collection<Address> dests, MethodCall method_call, RequestOptions options) {
+ public <T> NotifyingFuture<RspList<T>> callRemoteMethodsWithFuture(Collection<Address> dests, MethodCall method_call, RequestOptions options) {
if(dests != null && dests.isEmpty()) { // don't send if dest list is empty
if(log.isTraceEnabled())
log.trace(new StringBuilder("destination list of ").append(method_call.getName()).
append("() is empty: no need to send message"));
- return new NullFuture<RspList>(RspList.EMPTY_RSP_LIST);
+ return new NullFuture<RspList<T>>(RspList.EMPTY_RSP_LIST);
}
if(log.isTraceEnabled())
@@ -193,20 +193,20 @@ public RspList callRemoteMethods(Collection<Address> dests, MethodCall method_ca
if(options.getScope() > 0)
msg.setScope(options.getScope());
- NotifyingFuture<RspList> retval=super.castMessageWithFuture(dests, msg, options);
+ NotifyingFuture<RspList<T>> retval=super.castMessageWithFuture(dests, msg, options);
if(log.isTraceEnabled()) log.trace("responses: " + retval);
return retval;
}
- public Object callRemoteMethod(Address dest, String method_name, Object[] args,
+ public <T> T callRemoteMethod(Address dest, String method_name, Object[] args,
Class[] types, RequestOptions options) throws Throwable {
MethodCall method_call=new MethodCall(method_name, args, types);
- return callRemoteMethod(dest, method_call, options);
+ return (T)callRemoteMethod(dest, method_call, options);
}
- public Object callRemoteMethod(Address dest, MethodCall call, RequestOptions options) throws Throwable {
+ public <T> T callRemoteMethod(Address dest, MethodCall call, RequestOptions options) throws Throwable {
if(log.isTraceEnabled())
log.trace("dest=" + dest + ", method_call=" + call + ", options=" + options);
@@ -220,10 +220,8 @@ public Object callRemoteMethod(Address dest, MethodCall call, RequestOptions opt
if(options.getScope() > 0)
msg.setScope(options.getScope());
- Object retval=super.sendMessage(msg, options);
+ T retval=(T)super.sendMessage(msg, options);
if(log.isTraceEnabled()) log.trace("retval: " + retval);
- if(retval instanceof Throwable)
- throw (Throwable)retval;
return retval;
}
@@ -255,7 +253,7 @@ protected void correlatorStarted() {
* Message contains MethodCall. Execute it against *this* object and return result.
* Use MethodCall.invoke() to do this. Return result.
*/
- public Object handle(Message req) {
+ public Object handle(Message req) throws Throwable {
Object body;
MethodCall method_call;
@@ -269,43 +267,31 @@ public Object handle(Message req) {
return null;
}
- try {
- body=req_marshaller != null?
- req_marshaller.objectFromBuffer(req.getBuffer(), req.getOffset(), req.getLength())
- : req.getObject();
- }
- catch(Throwable e) {
- if(log.isErrorEnabled()) log.error("exception marshalling object", e);
- return e;
- }
+ body=req_marshaller != null?
+ req_marshaller.objectFromBuffer(req.getBuffer(), req.getOffset(), req.getLength()) : req.getObject();
if(!(body instanceof MethodCall)) {
if(log.isErrorEnabled()) log.error("message does not contain a MethodCall object");
// create an exception to represent this and return it
- return new IllegalArgumentException("message does not contain a MethodCall object") ;
+ throw new IllegalArgumentException("message does not contain a MethodCall object") ;
}
method_call=(MethodCall)body;
- try {
- if(log.isTraceEnabled())
- log.trace("[sender=" + req.getSrc() + "], method_call: " + method_call);
-
- if(method_call.getMode() == MethodCall.ID) {
- if(method_lookup == null)
- throw new Exception("MethodCall uses ID=" + method_call.getId() + ", but method_lookup has not been set");
- Method m=method_lookup.findMethod(method_call.getId());
- if(m == null)
- throw new Exception("no method found for " + method_call.getId());
- method_call.setMethod(m);
- }
-
- return method_call.invoke(server_obj);
- }
- catch(Throwable x) {
- return x;
+ if(log.isTraceEnabled())
+ log.trace("[sender=" + req.getSrc() + "], method_call: " + method_call);
+
+ if(method_call.getMode() == MethodCall.ID) {
+ if(method_lookup == null)
+ throw new Exception("MethodCall uses ID=" + method_call.getId() + ", but method_lookup has not been set");
+ Method m=method_lookup.findMethod(method_call.getId());
+ if(m == null)
+ throw new Exception("no method found for " + method_call.getId());
+ method_call.setMethod(m);
}
+
+ return method_call.invoke(server_obj);
}
/**
View
2  src/org/jgroups/blocks/RspCollector.java
@@ -7,7 +7,7 @@
public interface RspCollector {
- void receiveResponse(Object response_value, Address sender);
+ void receiveResponse(Object response_value, Address sender, boolean is_exception);
void suspect(Address mbr);
void viewChange(View new_view);
}
View
35 src/org/jgroups/blocks/UnicastRequest.java
@@ -25,25 +25,16 @@
- /**
- @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely
- (e.g. if a suspicion service is available; timeouts are not needed).
- */
public UnicastRequest(Message m, RequestCorrelator corr, Address target, RequestOptions options) {
super(m, corr, null, options);
this.target=target;
- result=new Rsp(target);
+ result=new Rsp<T>(target);
}
-
- /**
- * @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely
- * (e.g. if a suspicion service is available; timeouts are not needed).
- */
public UnicastRequest(Message m, Transport transport, Address target, RequestOptions options) {
super(m, null, transport, options);
this.target=target;
- result=new Rsp(target);
+ result=new Rsp<T>(target);
}
@@ -71,7 +62,7 @@ protected void sendRequest() throws Exception {
* Adds a response to the response table. When all responses have been received,
* <code>execute()</code> returns.
*/
- public void receiveResponse(Object response_value, Address sender) {
+ public void receiveResponse(Object response_value, Address sender, boolean is_exception) {
RspFilter rsp_filter=options.getRspFilter();
lock.lock();
@@ -80,11 +71,21 @@ public void receiveResponse(Object response_value, Address sender) {
return;
if(!result.wasReceived()) {
boolean responseReceived=(rsp_filter == null) || rsp_filter.isAcceptable(response_value, sender);
- result.setValue((T)response_value);
+ if(is_exception && response_value instanceof Throwable)
+ result.setException((Throwable)response_value);
+ else
+ result.setValue((T)response_value);
result.setReceived(responseReceived);
- if(log.isTraceEnabled())
- log.trace(new StringBuilder("received response for request ").append(req_id)
- .append(", sender=").append(sender).append(", val=").append(response_value));
+ if(log.isTraceEnabled()) {
+ StringBuilder sb=new StringBuilder("received response for request ");
+ sb.append(req_id).append(", sender=").append(sender);
+ if(is_exception && response_value instanceof Throwable)
+ sb.append(", exception=");
+ else
+ sb.append(", val=");
+ sb.append(response_value);
+ log.trace(sb.toString());
+ }
}
done=rsp_filter == null? responsesComplete() : !rsp_filter.needMoreResponses();
if(done && corr != null)
@@ -155,7 +156,7 @@ public void viewChange(View new_view) {
/* -------------------- End of Interface RspCollector ----------------------------------- */
- public Rsp getResult() {
+ public Rsp<T> getResult() {
return result;
}
View
2  src/org/jgroups/blocks/mux/MuxMessageDispatcher.java
@@ -81,7 +81,7 @@ public void stop() {
}
@Override
- protected GroupRequest cast(Collection<Address> dests, Message msg, RequestOptions options, boolean blockForResults) {
+ protected <T> GroupRequest<T> cast(Collection<Address> dests, Message msg, RequestOptions options, boolean blockForResults) {
RspFilter filter=options.getRspFilter();
RequestOptions newOptions = new RequestOptions(options.getMode(), options.getTimeout(), options.getAnycasting(),
(filter != null) ? new NoMuxHandlerRspFilter(filter) : new NoMuxHandlerRspFilter(),
View
2  src/org/jgroups/blocks/mux/MuxRpcDispatcher.java
@@ -97,7 +97,7 @@ public void stop() {
}
@Override
- protected GroupRequest cast(Collection<Address> dests, Message msg, RequestOptions options, boolean blockForResults) {
+ protected <T> GroupRequest<T> cast(Collection<Address> dests, Message msg, RequestOptions options, boolean blockForResults) {
RspFilter filter = options.getRspFilter();
return super.cast(dests, msg, options.setRspFilter((filter != null) ? new NoMuxHandlerRspFilter(filter) : new NoMuxHandlerRspFilter()), blockForResults);
}
View
22 src/org/jgroups/demos/QuoteClient.java
@@ -3,7 +3,6 @@
import org.jgroups.*;
-import org.jgroups.blocks.Request;
import org.jgroups.blocks.RequestOptions;
import org.jgroups.blocks.ResponseMode;
import org.jgroups.blocks.RpcDispatcher;
@@ -134,7 +133,6 @@ public void windowOpened(WindowEvent e) {
public void actionPerformed(ActionEvent e) {
String command=e.getActionCommand();
- RspList rsp_list;
try {
if(command.equals("Get")) {
@@ -144,16 +142,16 @@ public void actionPerformed(ActionEvent e) {
return;
}
showMsg("Looking up value for " + stock_name + ':');
- rsp_list=disp.callRemoteMethods(null, "getQuote", new Object[]{stock_name},
- new Class[]{String.class},
- new RequestOptions(ResponseMode.GET_ALL, 10000));
+ RspList<Object> quotes=disp.callRemoteMethods(null, "getQuote", new Object[]{stock_name},
+ new Class[]{String.class},
+ new RequestOptions(ResponseMode.GET_ALL, 10000));
Float val=null;
- for(Rsp rsp: rsp_list.values()) {
- Object obj=rsp.getValue();
- if(obj == null || obj instanceof Throwable)
+ for(Rsp<Object> rsp: quotes.values()) {
+ Object quote=rsp.getValue();
+ if(quote == null || quote instanceof Throwable)
continue;
- val=(Float)obj;
+ val=(Float)quote;
break;
}
@@ -186,9 +184,9 @@ public void actionPerformed(ActionEvent e) {
if(command.equals("All")) {
listbox.removeAll();
showMsg("Getting all stocks:");
- rsp_list=disp.callRemoteMethods(null, "getAllStocks",
- null, null,
- new RequestOptions(ResponseMode.GET_ALL, 5000));
+ RspList<Object> rsp_list=disp.callRemoteMethods(null, "getAllStocks",
+ null, null,
+ new RequestOptions(ResponseMode.GET_ALL, 5000));
System.out.println("rsp_list is " + rsp_list);
View
2  src/org/jgroups/demos/RelayDemoRpc.java
@@ -68,7 +68,7 @@ public void start(String props, String name) throws Exception {
continue;
}
- RspList rsps=disp.callRemoteMethods(null, call, new RequestOptions(ResponseMode.GET_ALL, 5000).setAnycasting(true));
+ RspList<Object> rsps=disp.callRemoteMethods(null, call, new RequestOptions(ResponseMode.GET_ALL, 5000).setAnycasting(true));
for(Rsp rsp: rsps.values())
System.out.println("<< " + rsp.getValue() + " from " + rsp.getSender());
}
View
56 src/org/jgroups/util/Rsp.java
@@ -5,20 +5,23 @@
/**
- * class that represents a response from a communication
+ * Class that represents a response from a communication
*/
public class Rsp<T> {
- /* flag that represents whether the response was received */
- boolean received;
+ /** Flag that represents whether the response was received */
+ protected boolean received;
- /* flag that represents whether the response was suspected */
- boolean suspected;
+ /** Flag that represents whether the sender of the response was suspected */
+ protected boolean suspected;
- /* The sender of this response */
- Address sender;
+ /** The sender of this response */
+ protected Address sender;
- /* the value from the response */
- T retval;
+ /** The value from the response */
+ protected T retval;
+
+ /** If there was an exception, this field will contain it */
+ protected Throwable exception;
public Rsp(Address sender) {
@@ -32,8 +35,12 @@ public Rsp(Address sender, boolean suspected) {
public Rsp(Address sender, T retval) {
this.sender=sender;
- this.retval=retval;
- received=true;
+ setValue(retval);
+ }
+
+ public Rsp(Address sender, Throwable t) {
+ this.sender=sender;
+ setException(t);
}
public boolean equals(Object obj) {
@@ -55,6 +62,22 @@ public T getValue() {
public void setValue(T val) {
this.retval=val;
+ received=true;
+ exception=null;
+ }
+
+ public boolean hasException() {
+ return exception != null;
+ }
+
+ public Throwable getException() {
+ return exception;
+ }
+
+ public void setException(Throwable t) {
+ this.exception=t;
+ received=true;
+ retval=null;
}
public Address getSender() {
@@ -84,8 +107,15 @@ public boolean setSuspected(boolean suspected) {
}
public String toString() {
- return new StringBuilder("sender=").append(sender).append(", retval=").append(retval).append(", received=").
- append(received).append(", suspected=").append(suspected).toString();
+ StringBuilder sb=new StringBuilder();
+ sb.append("sender=").append(sender);
+ if(retval != null)
+ sb.append(", retval=").append(retval);
+ if(exception != null)
+ sb.append(", exception=").append(exception);
+ sb.append(", received=").
+ append(received).append(", suspected=").append(suspected);
+ return sb.toString();
}
}
View
78 src/org/jgroups/util/RspList.java
@@ -11,11 +11,9 @@
* Contains responses from all members. Marks faulty members.
* A RspList is a response list used in peer-to-peer protocols. This class is unsynchronized
*/
-public class RspList implements Map<Address,Rsp> {
+public class RspList<T extends Object> implements Map<Address,Rsp<T>> {
public static final RspList EMPTY_RSP_LIST=new RspList();
-
- /** Map<Address, Rsp> */
- final Map<Address,Rsp> rsps=new HashMap<Address,Rsp>();
+ final Map<Address,Rsp<T>> rsps=new HashMap<Address,Rsp<T>>();
public RspList() {
@@ -25,9 +23,9 @@ public RspList() {
/** Adds a list of responses
* @param responses Collection<Rsp>
*/
- public RspList(Collection<Rsp> responses) {
+ public RspList(Collection<Rsp<T>> responses) {
if(responses != null) {
- for(Rsp rsp: responses) {
+ for(Rsp<T> rsp: responses) {
rsps.put(rsp.getSender(), rsp);
}
}
@@ -51,7 +49,7 @@ public boolean containsValue(Object value) {
* @param key Address (key)
* @return Rsp
*/
- public Rsp get(Object key) {
+ public Rsp<T> get(Object key) {
return rsps.get(key);
}
@@ -60,20 +58,20 @@ public Rsp get(Object key) {
* @param key
* @return Object value
*/
- public Object getValue(Object key) {
- Rsp rsp=get(key);
+ public T getValue(Object key) {
+ Rsp<T> rsp=get(key);
return rsp != null? rsp.getValue() : null;
}
- public Rsp put(Address key, Rsp value) {
+ public Rsp<T> put(Address key, Rsp<T> value) {
return rsps.put(key, value);
}
- public Rsp remove(Object key) {
+ public Rsp<T> remove(Object key) {
return rsps.remove(key);
}
- public void putAll(Map<? extends Address, ? extends Rsp> m) {
+ public void putAll(Map<? extends Address, ? extends Rsp<T>> m) {
rsps.putAll(m);
}
@@ -85,18 +83,18 @@ public void clear() {
return rsps.keySet();
}
- public Collection<Rsp> values() {
+ public Collection<Rsp<T>> values() {
return rsps.values();
}
- public Set<Map.Entry<Address,Rsp>> entrySet() {
+ public Set<Map.Entry<Address,Rsp<T>>> entrySet() {
return rsps.entrySet();
}
- public void addRsp(Address sender, Object retval) {
- Rsp rsp=get(sender);
+ public void addRsp(Address sender, T retval) {
+ Rsp<T> rsp=get(sender);
if(rsp != null) {
rsp.sender=sender;
rsp.retval=retval;
@@ -104,19 +102,19 @@ public void addRsp(Address sender, Object retval) {
rsp.suspected=false;
return;
}
- rsps.put(sender, new Rsp(sender, retval));
+ rsps.put(sender, new Rsp<T>(sender, retval));
}
public void addNotReceived(Address sender) {
- Rsp rsp=get(sender);
+ Rsp<T> rsp=get(sender);
if(rsp == null)
- rsps.put(sender, new Rsp(sender));
+ rsps.put(sender, new Rsp<T>(sender));
}
public void addSuspect(Address sender) {
- Rsp rsp=get(sender);
+ Rsp<T> rsp=get(sender);
if(rsp != null) {
rsp.sender=sender;
rsp.retval=null;
@@ -124,19 +122,19 @@ public void addSuspect(Address sender) {
rsp.suspected=true;
return;
}
- rsps.put(sender, new Rsp(sender, true));
+ rsps.put(sender, new Rsp<T>(sender, true));
}
public boolean isReceived(Address sender) {
- Rsp rsp=get(sender);
+ Rsp<T> rsp=get(sender);
return rsp != null && rsp.received;
}
public int numSuspectedMembers() {
int num=0;
- Collection<Rsp> values=values();
- for(Rsp rsp: values) {
+ Collection<Rsp<T>> values=values();
+ for(Rsp<T> rsp: values) {
if(rsp.wasSuspected())
num++;
}
@@ -145,8 +143,8 @@ public int numSuspectedMembers() {
public int numReceived() {
int num=0;
- Collection<Rsp> values=values();
- for(Rsp rsp: values) {
+ Collection<Rsp<T>> values=values();
+ for(Rsp<T> rsp: values) {
if(rsp.wasReceived())
num++;
}
@@ -154,9 +152,9 @@ public int numReceived() {
}
/** Returns the first value in the response set. This is random, but we try to return a non-null value first */
- public Object getFirst() {
- Collection<Rsp> values=values();
- for(Rsp rsp: values) {
+ public T getFirst() {
+ Collection<Rsp<T>> values=values();
+ for(Rsp<T> rsp: values) {
if(rsp.getValue() != null)
return rsp.getValue();
}
@@ -167,30 +165,30 @@ public Object getFirst() {
/**
* Returns the results from non-suspected members that are not null.
*/
- public Vector<Object> getResults() {
- Vector<Object> ret=new Vector<Object>();
- Object val;
+ public List<T> getResults() {
+ List<T> ret=new ArrayList<T>(size());
- for(Rsp rsp: values()) {
+ T val;
+ for(Rsp<T> rsp: values()) {
if(rsp.wasReceived() && (val=rsp.getValue()) != null)
- ret.addElement(val);
+ ret.add(val);
}
return ret;
}
- public Vector<Address> getSuspectedMembers() {
- Vector<Address> retval=new Vector<Address>();
- for(Rsp rsp: values()) {
+ public List<Address> getSuspectedMembers() {
+ List<Address> retval=new ArrayList<Address>();
+ for(Rsp<T> rsp: values()) {
if(rsp.wasSuspected())
- retval.addElement(rsp.getSender());
+ retval.add(rsp.getSender());
}
return retval;
}
public boolean isSuspected(Address sender) {
- Rsp rsp=get(sender);
+ Rsp<T> rsp=get(sender);
return rsp != null && rsp.suspected;
}
@@ -203,7 +201,7 @@ public int size() {
public String toString() {
StringBuilder ret=new StringBuilder();
- for(Rsp rsp: values()) {
+ for(Rsp<T> rsp: values()) {
ret.append("[" + rsp + "]\n");
}
return ret.toString();
View
4 tests/junit-functional/org/jgroups/blocks/GroupRequestTest.java
@@ -286,7 +286,7 @@ void sendResponses() {
catch(Exception e) {
e.printStackTrace();
}
- request.receiveResponse(retval, sender);
+ request.receiveResponse(retval, sender, false);
}
else if(obj instanceof View)
request.viewChange((View)obj);
@@ -341,7 +341,7 @@ void sendResponses() {
} catch (Exception e) {
e.printStackTrace();
}
- request.receiveResponse(retval, sender);
+ request.receiveResponse(retval, sender, false);
}
else if (obj instanceof View)
request.viewChange((View) obj);
View
2  tests/junit-functional/org/jgroups/tests/RspListTest.java
@@ -164,7 +164,7 @@ public void testGetFirst() {
public void testGetResults() {
- Vector v=rl.getResults();
+ List v=rl.getResults();
assert v != null;
Assert.assertEquals(2, v.size());
}
View
4 tests/junit/org/jgroups/blocks/MuxMessageDispatcherTest.java
@@ -69,7 +69,7 @@ public void testCastMessage() throws Exception {
Message message = new Message();
// Validate normal dispatchers
- Map<Address, Rsp> responses = dispatchers[0].castMessage(null, message, RequestOptions.SYNC());
+ Map<Address, Rsp<Object>> responses = dispatchers[0].castMessage(null, message, RequestOptions.SYNC());
Assert.assertEquals(responses.size(), 2);
@@ -191,7 +191,7 @@ public void testSendMessage() throws Throwable {
// Assert.assertEquals(response, "muxDispatcher[1][0]");
}
- private static void verifyResponse(Map<Address, Rsp> responses, Channel channel, Object expected) {
+ private static void verifyResponse(Map<Address, Rsp<Object>> responses, Channel channel, Object expected) {
Rsp<?> response = responses.get(channel.getAddress());
String address = channel.getAddress().toString();
Assert.assertNotNull(response, address);
View
4 tests/junit/org/jgroups/blocks/MuxRpcDispatcherTest.java
@@ -67,7 +67,7 @@ public void testMulticastRPCs() throws Exception {
MethodCall method = new MethodCall("getName", new Object[0], new Class[0]);
// Validate normal dispatchers
- Map<Address, Rsp> responses = dispatchers[0].callRemoteMethods(null, method, RequestOptions.SYNC());
+ Map<Address, Rsp<String>> responses = dispatchers[0].callRemoteMethods(null, method, RequestOptions.SYNC());
Assert.assertEquals(responses.size(), 2);
@@ -190,7 +190,7 @@ public void testUnicastRPCs() throws Throwable {
// Assert.assertEquals(response, "muxDispatcher[1][0]");
}
- private static void verifyResponse(Map<Address,Rsp> responses, Channel channel, Object expected) {
+ private static void verifyResponse(Map<Address,Rsp<String>> responses, Channel channel, Object expected) {
Rsp<?> response = responses.get(channel.getAddress());
String address = channel.getAddress().toString();
Assert.assertNotNull(response, address);
View
8 tests/junit/org/jgroups/blocks/RpcDispatcherSerializationTest.java
@@ -64,13 +64,13 @@ public void testNonSerializableArgument() throws Throwable {
public void testTargetMethodNotFound() {
List<Address> members=channel.getView().getMembers();
System.out.println("members are: " + members);
- RspList rsps=disp.callRemoteMethods(members, "foo", null, new Class[]{String.class, String.class},
+ RspList<Object> rsps=disp.callRemoteMethods(members, "foo", null, new Class[]{String.class, String.class},
new RequestOptions(ResponseMode.GET_ALL, 8000));
System.out.println("responses:\n" + rsps + ", channel.view: " + channel.getView() + ", channel2.view: " + channel2.getView());
assert members.size() == rsps.size() : "expected " + members.size() + " responses, but got " + rsps + " (" + rsps.size() + ")";
for(Rsp rsp: rsps.values()) {
- assert rsp.getValue() instanceof NoSuchMethodException : "response value is " + rsp.getValue();
+ assert rsp.getException() instanceof NoSuchMethodException : "exception is " + rsp.getException();
}
}
@@ -108,8 +108,8 @@ public void testMarshaller() {
assertEquals(2, rsps.size());
for(Iterator<Rsp> it=rsps.values().iterator(); it.hasNext();) {
Rsp rsp=it.next();
- assertNotNull(rsp.getValue());
- assertTrue(rsp.getValue() instanceof Throwable);
+ assertNull(rsp.getValue());
+ assertNotNull(rsp.getException());
assertTrue(rsp.wasReceived());
assertFalse(rsp.wasSuspected());
}
View
120 tests/junit/org/jgroups/blocks/RpcDispatcherTest.java
@@ -49,8 +49,6 @@
// specify return values sizes which should work correctly with 64Mb heap
final static int[] SIZES={10000, 20000, 40000, 80000, 100000, 200000, 400000, 800000,
1000000, 2000000, 5000000};
- // specify return value sizes which may generate timeouts or OOMEs with 64Mb heap
- final static int[] HUGESIZES={10000000, 20000000};
@BeforeMethod
protected void setUp() throws Exception {
@@ -87,7 +85,7 @@ public void testEmptyConstructor() throws Exception {
RpcDispatcher d1=new RpcDispatcher(), d2=new RpcDispatcher();
JChannel channel1=null, channel2=null;
- final String GROUP=getUniqueClusterName("RpcDispatcherTest");
+ final String GROUP="RpcDispatcherTest";
try {
channel1=createChannel(true, 2);
channel2=createChannel(channel1);
@@ -109,10 +107,10 @@ public void testEmptyConstructor() throws Exception {
System.out.println("view channel 1= " + view);
assert view.size() == 2;
- RspList rsps=d1.callRemoteMethods(null, "foo", null, null, new RequestOptions(ResponseMode.GET_ALL, 5000));
+ RspList<Integer> rsps=d1.callRemoteMethods(null, "foo", null, null, new RequestOptions(ResponseMode.GET_ALL, 5000));
System.out.println("rsps:\n" + rsps);
assert rsps.size() == 2;
- for(Rsp rsp: rsps.values()) {
+ for(Rsp<Integer> rsp: rsps.values()) {
assert rsp.wasReceived();
assert !rsp.wasSuspected();
assert rsp.getValue() != null;
@@ -144,6 +142,45 @@ public long foobar() {
}
+ public void testException() {
+ RspList<Object> rsps=disp1.callRemoteMethods(null, "throwException", null, null, new RequestOptions(ResponseMode.GET_ALL, 5000));
+ for(Rsp<Object> rsp: rsps.values()) {
+ System.out.println(rsp);
+ }
+ for(Rsp<Object> rsp: rsps.values()) {
+ assert rsp.getException() != null && rsp.getValue() == null;
+ }
+ }
+
+
+ public void testExceptionAsReturnValue() {
+ RspList<Object> rsps=disp1.callRemoteMethods(null, "returnException", null, null, new RequestOptions(ResponseMode.GET_ALL, 5000));
+ for(Rsp<Object> rsp: rsps.values()) {
+ System.out.println(rsp);
+ }
+ for(Rsp<Object> rsp: rsps.values()) {
+ assert rsp.getException() == null && rsp.getValue() != null && rsp.getValue() instanceof Throwable;
+ }
+ }
+
+
+ public void testUnicastException() {
+ try {
+ disp1.callRemoteMethod(c2.getAddress(), "throwException", null, null, new RequestOptions(ResponseMode.GET_ALL, 5000));
+ }
+ catch(Throwable throwable) {
+ System.out.println("received exception (as expected)");
+ }
+ }
+
+
+ public void testUnicastExceptionAsReturnValue() throws Throwable {
+ Object rsp=disp1.callRemoteMethod(c2.getAddress(), "returnException", null, null, new RequestOptions(ResponseMode.GET_ALL, 5000));
+ System.out.println("rsp = " + rsp);
+ assert rsp != null && rsp instanceof Throwable;
+ }
+
+
/**
* Test the response filter mechanism which can be used to filter responses received with
* a call to RpcDispatcher.
@@ -183,8 +220,8 @@ public boolean needMoreResponses() {
public void testFuture() throws Exception {
MethodCall sleep=new MethodCall("sleep", new Object[]{1000L}, new Class[]{long.class});
- Future<RspList> future;
- future=disp1.callRemoteMethodsWithFuture(null, sleep, new RequestOptions(ResponseMode.GET_ALL, 5000L, false, null));
+ //Future<RspList> future;
+ Future<RspList<Object>> future=disp1.callRemoteMethodsWithFuture(null, sleep, new RequestOptions(ResponseMode.GET_ALL, 5000L, false, null));
assert !future.isDone();
assert !future.isCancelled();
try {
@@ -207,16 +244,15 @@ public void testFuture() throws Exception {
public void testNotifyingFuture() throws Exception {
MethodCall sleep=new MethodCall("sleep", new Object[]{1000L}, new Class[]{long.class});
- NotifyingFuture<RspList> future;
- MyFutureListener<RspList> listener=new MyFutureListener<RspList>();
- future=disp1.callRemoteMethodsWithFuture(null, sleep, new RequestOptions(ResponseMode.GET_ALL, 5000L, false, null));
+ MyFutureListener<RspList<Long>> listener=new MyFutureListener<RspList<Long>>();
+ NotifyingFuture<RspList<Long>> future=disp1.callRemoteMethodsWithFuture(null, sleep, new RequestOptions(ResponseMode.GET_ALL, 5000L, false, null));
future.setListener(listener);
assert !future.isDone();
assert !future.isCancelled();
assert !listener.isDone();
Util.sleep(2000);
assert listener.isDone();
- RspList result=future.get(1L, TimeUnit.MILLISECONDS);
+ RspList<Long> result=future.get(1L, TimeUnit.MILLISECONDS);
System.out.println("result:\n" + result);
assert result != null;
assert result.size() == 3;
@@ -225,8 +261,8 @@ public void testNotifyingFuture() throws Exception {
public void testNotifyingFutureWithDelayedListener() throws Exception {
MethodCall sleep=new MethodCall("sleep", new Object[]{1000L}, new Class[]{long.class});
- NotifyingFuture<RspList> future;
- MyFutureListener<RspList> listener=new MyFutureListener<RspList>();
+ NotifyingFuture<RspList<Long>> future;
+ MyFutureListener<RspList<Long>> listener=new MyFutureListener<RspList<Long>>();
future=disp1.callRemoteMethodsWithFuture(null, sleep, new RequestOptions(ResponseMode.GET_ALL, 5000L, false, null));
assert !future.isDone();
assert !future.isCancelled();
@@ -244,19 +280,19 @@ public void testNotifyingFutureWithDelayedListener() throws Exception {
public void testMultipleFutures() throws Exception {
MethodCall sleep=new MethodCall("sleep", new Object[]{100L}, new Class[]{long.class});
- List<Future<RspList>> futures=new ArrayList<Future<RspList>>();
+ List<Future<RspList<Long>>> futures=new ArrayList<Future<RspList<Long>>>();
long target=System.currentTimeMillis() + 30000L;
- Future<RspList> future;
+ Future<RspList<Long>> future;
RequestOptions options=new RequestOptions(ResponseMode.GET_ALL, 30000L, false, null);
for(int i=0; i < 10; i++) {
future=disp1.callRemoteMethodsWithFuture(null, sleep, options);
futures.add(future);
}
- List<Future<RspList>> rsps=new ArrayList<Future<RspList>>();
+ List<Future<RspList<Long>>> rsps=new ArrayList<Future<RspList<Long>>>();
while(!futures.isEmpty() && System.currentTimeMillis() < target) {
- for(Iterator<Future<RspList>> it=futures.iterator(); it.hasNext();) {
+ for(Iterator<Future<RspList<Long>>> it=futures.iterator(); it.hasNext();) {
future=it.next();
if(future.isDone()) {
it.remove();
@@ -267,25 +303,27 @@ public void testMultipleFutures() throws Exception {
Util.sleep(200);
}
System.out.println("\n" + rsps.size() + " responses:\n");
- for(Future<RspList> tmp: rsps) {
+ for(Future<RspList<Long>> tmp: rsps) {
System.out.println(tmp);
}
}
public void testMultipleNotifyingFutures() throws Exception {
MethodCall sleep=new MethodCall("sleep", new Object[]{100L}, new Class[]{long.class});
- List<MyFutureListener> listeners=new ArrayList<MyFutureListener>();
+ List<MyFutureListener<RspList<Long>>> listeners=new ArrayList<MyFutureListener<RspList<Long>>>();
RequestOptions options=new RequestOptions(ResponseMode.GET_ALL, 30000L, false, null);
for(int i=0; i < 10; i++) {
- MyFutureListener<RspList> listener=new MyFutureListener<RspList>();
+ MyFutureListener<RspList<Long>> listener=new MyFutureListener<RspList<Long>>();
listeners.add(listener);
- disp1.callRemoteMethodsWithFuture(null, sleep, options).setListener(listener);
+ // NotifyingFuture<RspList<Long>> futures=disp1.callRemoteMethodsWithFuture(null, sleep, options);
+ NotifyingFuture<RspList<Long>> futures=disp1.callRemoteMethodsWithFuture(null, sleep, options);
+ futures.setListener(listener);
}
Util.sleep(1000);
for(int i=0; i < 10; i++) {
boolean all_done=true;
- for(MyFutureListener listener: listeners) {
+ for(MyFutureListener<RspList<Long>> listener: listeners) {
boolean done=listener.isDone();
System.out.print(done? "+ " : "- ");
if(!listener.isDone())
@@ -308,8 +346,7 @@ public void testMultipleNotifyingFutures() throws Exception {
public void testFutureCancel() throws Exception {
MethodCall sleep=new MethodCall("sleep", new Object[]{1000L}, new Class[]{long.class});
- Future<RspList> future;
- future=disp1.callRemoteMethodsWithFuture(null, sleep, new RequestOptions(ResponseMode.GET_ALL, 5000L));
+ NotifyingFuture<RspList<Long>> future=disp1.callRemoteMethodsWithFuture(null, sleep, new RequestOptions(ResponseMode.GET_ALL, 5000L));
assert !future.isDone();
assert !future.isCancelled();
future.cancel(true);
@@ -385,11 +422,11 @@ public void testMethodInvocationToNonExistingMembers() {
// make an RPC call using C's now outdated view of membership
System.out.println("calling method foo() in " + members + " (view=" + c2.getView() + ")");
- RspList rsps=disp1.callRemoteMethods(members, "foo", null, null, new RequestOptions(ResponseMode.GET_ALL, timeout));
+ RspList<Object> rsps=disp1.callRemoteMethods(members, "foo", null, null, new RequestOptions(ResponseMode.GET_ALL, timeout));
// all responses
System.out.println("responses:\n" + rsps);
- for(Map.Entry<Address,Rsp> entry: rsps.entrySet()) {
+ for(Map.Entry<Address,Rsp<Object>> entry: rsps.entrySet()) {
Rsp rsp=entry.getValue();
assertTrue("response from " + entry.getKey() + " was not received", rsp.wasReceived());
assertFalse(rsp.wasSuspected());
@@ -446,13 +483,13 @@ void _testLargeValue(int size) {
final long timeout = 20 * 1000 ;
System.out.println("\ntesting with " + size + " bytes");
- RspList rsps=disp1.callRemoteMethods(null, "largeReturnValue", new Object[]{size}, new Class[]{int.class},
+ RspList<Object> rsps=disp1.callRemoteMethods(null, "largeReturnValue", new Object[]{size}, new Class[]{int.class},
new RequestOptions(ResponseMode.GET_ALL, timeout));
System.out.println("rsps:");
assert rsps.size() == 3 : "there should be three responses to the RPC call but only " + rsps.size() +
" were received: " + rsps;
- for(Map.Entry<Address,Rsp> entry: rsps.entrySet()) {
+ for(Map.Entry<Address,Rsp<Object>> entry: rsps.entrySet()) {
// its possible that an exception was raised in processing
Object obj = entry.getValue().getValue() ;
@@ -482,8 +519,8 @@ void _testHugeValue(int size) {
final long timeout = 20 * 1000 ;
System.out.println("\ntesting with " + size + " bytes");
- RspList rsps=disp1.callRemoteMethods(null, "largeReturnValue", new Object[]{size}, new Class[]{int.class},
- new RequestOptions(ResponseMode.GET_ALL, timeout));
+ RspList<Object> rsps=disp1.callRemoteMethods(null, "largeReturnValue", new Object[]{size}, new Class[]{int.class},
+ new RequestOptions(ResponseMode.GET_ALL, timeout));
System.out.println("rsps:");
assert rsps != null;
assert rsps.size() == 3 : "there should be three responses to the RPC call but only " + rsps.size() +
@@ -491,7 +528,7 @@ void _testHugeValue(int size) {
// in checking the return values, we need to take account of timeouts (i.e. when
// a null value is returned) and exceptions
- for(Map.Entry<Address,Rsp> entry: rsps.entrySet()) {
+ for(Map.Entry<Address,Rsp<Object>> entry: rsps.entrySet()) {
Object obj = entry.getValue().getValue() ;
@@ -535,16 +572,9 @@ void _testLargeValueUnicastCall(Address dst, int size) throws Throwable {
System.out.println("\ntesting unicast call with " + size + " bytes");
assertNotNull(dst);
-
- Object retval=disp1.callRemoteMethod(dst, "largeReturnValue", new Object[]{size}, new Class[]{int.class},
- new RequestOptions(ResponseMode.GET_ALL, timeout));
- // it's possible that an exception was raised
- if (retval instanceof java.lang.Throwable) {
- throw (Throwable)retval;
- }
-
- byte[] val=(byte[])retval;
+ byte[] val=disp1.callRemoteMethod(dst, "largeReturnValue", new Object[]{size}, new Class[]{int.class},
+ new RequestOptions(ResponseMode.GET_ALL, timeout));
// check value is not null, otherwise fail the test
assertNotNull("return value should be non-null", val);
@@ -574,10 +604,20 @@ public static long sleep(long timeout) {
// System.out.println("sleep()");
long start=System.currentTimeMillis();
Util.sleep(timeout);
+ //throw new NullPointerException("boom");
return System.currentTimeMillis() - start;
}
+ public static void throwException() throws Exception {
+ throw new Exception("booom");
+ }
+
+ public static Exception returnException() {
+ return new Exception("booom");
+ }
+
+
public static byte[] largeReturnValue(int size) {
return new byte[size];
}
View
2  tests/junit/org/jgroups/tests/ChannelTestBase.java
@@ -150,7 +150,7 @@ protected final static void assertNull(String message, Object val) {
}
protected final static void assertNull(Object val) {
- Util.assertNotNull(null, val);
+ Util.assertNull(null, val);
}
/**
View
22 tests/junit/org/jgroups/tests/Deadlock2Test.java
@@ -6,14 +6,17 @@
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.Message;
-import org.jgroups.blocks.*;
+import org.jgroups.blocks.MethodCall;
+import org.jgroups.blocks.RequestOptions;
+import org.jgroups.blocks.ResponseMode;
+import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
-import java.util.Enumeration;
+import java.util.List;
import java.util.Vector;
/**
@@ -163,16 +166,17 @@ public String outerMethod() {
// RspList rspList = disp.callRemoteMethods(null, call, GroupResponseMode.GET_ALL, 5000);
RequestOptions opts=new RequestOptions(ResponseMode.GET_ALL, 0, false, null, (byte)0);
opts.setFlags(Message.OOB);
- RspList rspList = disp.callRemoteMethods(null, call, opts);
- Vector results = rspList.getResults();
+ RspList<String> rspList = disp.callRemoteMethods(null, call, opts);
+ List<String> results = rspList.getResults();
log("results of calling innerMethod():\n" + rspList);
StringBuilder sb=new StringBuilder("outerMethod[");
- for(Enumeration e = results.elements(); e.hasMoreElements(); ) {
- String s = (String)e.nextElement();
- sb.append(s);
- if (e.hasMoreElements()) {
+ boolean first=true;
+ for(String s: results) {
+ if(first)
+ first=false;
+ else
sb.append(";");
- }
+ sb.append(s);
}
sb.append("]");
return sb.toString();
View
4 tests/other/org/jgroups/tests/UnicastTestRpcDist.java
@@ -330,13 +330,13 @@ void startBenchmark() throws Throwable {
options.setFlags(Message.OOB);
options.setFlags(Message.DONT_BUNDLE);
options.setFlags(Message.NO_FC);
- RspList responses=disp.callRemoteMethods(null, new MethodCall(START), options);
+ RspList<Object> responses=disp.callRemoteMethods(null, new MethodCall(START), options);
long total_reqs=0;
long total_time=0;
System.out.println("\n======================= Results: ===========================");
- for(Map.Entry<Address,Rsp> entry: responses.entrySet()) {
+ for(Map.Entry<Address,Rsp<Object>> entry: responses.entrySet()) {
Address mbr=entry.getKey();
Rsp rsp=entry.getValue();
Results result=(Results)rsp.getValue();
Please sign in to comment.
Something went wrong with that request. Please try again.