Permalink
Browse files

changes to MessageDispatcher and RpcDispatcher to throw Exceptions ra…

…ther than Throwable (or nothing)
  • Loading branch information...
Bela Ban
Bela Ban committed Jul 22, 2011
1 parent 271c5a7 commit f44a32d635200aa1008952ec91066f7b13511cbf
Showing with 349 additions and 441 deletions.
  1. +9 −0 doc/API_Changes.txt
  2. +1 −1 src/org/jgroups/JChannel.java
  3. +70 −82 src/org/jgroups/blocks/MessageDispatcher.java
  4. +21 −32 src/org/jgroups/blocks/MethodCall.java
  5. +1 −1 src/org/jgroups/blocks/RequestHandler.java
  6. +68 −40 src/org/jgroups/blocks/RpcDispatcher.java
  7. +14 −2 src/org/jgroups/blocks/UnicastRequest.java
  8. +2 −1 src/org/jgroups/blocks/mux/MuxMessageDispatcher.java
  9. +1 −1 src/org/jgroups/blocks/mux/MuxRpcDispatcher.java
  10. +74 −225 tests/junit-functional/org/jgroups/tests/MethodCallTest.java
  11. +2 −2 tests/junit/org/jgroups/blocks/MuxMessageDispatcherTest.java
  12. +3 −1 tests/junit/org/jgroups/blocks/MuxRpcDispatcherTest.java
  13. +1 −1 tests/junit/org/jgroups/blocks/RpcDispatcherAnycastServerObject.java
  14. +1 −1 tests/junit/org/jgroups/blocks/RpcDispatcherAnycastTest.java
  15. +11 −5 tests/junit/org/jgroups/blocks/RpcDispatcherExceptionTest.java
  16. +2 −2 tests/junit/org/jgroups/blocks/RpcDispatcherInterruptTest.java
  17. +5 −10 tests/junit/org/jgroups/blocks/RpcDispatcherSerializationTest.java
  18. +31 −11 tests/junit/org/jgroups/blocks/RpcDispatcherTest.java
  19. +5 −5 tests/junit/org/jgroups/blocks/RpcDispatcherUnicastMethodExceptionTest.java
  20. +5 −5 tests/junit/org/jgroups/blocks/RpcDispatcherUnitTest.java
  21. +1 −1 tests/junit/org/jgroups/tests/ChannelConcurrencyTest.java
  22. +1 −1 tests/junit/org/jgroups/tests/Deadlock2Test.java
  23. +6 −7 tests/junit/org/jgroups/tests/MessageDispatcherUnitTest.java
  24. +1 −1 tests/other/org/jgroups/tests/MessageDispatcherSpeedTest.java
  25. +7 −2 tests/other/org/jgroups/tests/RpcDispatcherSpeedTest.java
  26. +6 −1 tests/stress/org/jgroups/tests/UnicastStressTest.java
View
@@ -38,6 +38,8 @@ API changes in 3.0.0
- RpcDispatcher: merged Marshaller and Marshaller2 into Marshaller interface
+- MessageDispatcher: handle(Message) now throws Exception rather than Throwable
+
- Removed partial state transfer (nobody ever used it !)
- Removed ExtendedXXXAdapters: there is only Receiver, ReceiverAdapter, MessageListener and MembershipListener
@@ -94,6 +96,13 @@ API changes in 3.0.0
// "foo" threw an exception
}
+- In MessageDispatcher, sendMessage(), sendMessageWithFuture(), castMessage() and castMessageWithFuture() now
+ throw an exception (used to be a Throwable, or nothing)
+
+- In RpcDispatcher, callRemoteMethod(), callRemoteMethodWithFuture(), callRemoteMethods() and callRemoteMethodsWithFuture()
+ methods now throw an exception
+- Ditto for MuxRpcDispatcher, MuxMessageDispatcher.
+
- ChannelNotConnectedException and ChannelClosedException have been replaced by IllegalStateException. This way,
new exceptions can be thrown (and documented in the javadoc) without having to change the signature in the future.
These changes affect mainly JChannel
@@ -1104,7 +1104,7 @@ else if(key instanceof DatagramSocket) {
* @param map
* @param operation Protocol.OperationName[args], e.g. STABLE.foo[arg1 arg2 arg3]
*/
- private void handleOperation(Map<String, String> map, String operation) throws Throwable {
+ private void handleOperation(Map<String, String> map, String operation) throws Exception {
int index=operation.indexOf(".");
if(index == -1)
throw new IllegalArgumentException("operation " + operation + " is missing the protocol name");
@@ -159,15 +159,10 @@ public void stop() {
}
}
-
public final void setMessageListener(MessageListener l) {
msg_listener=l;
}
- /**
- * Gives access to the currently configured MessageListener. Returns null if there is no
- * configured MessageListener.
- */
public MessageListener getMessageListener() {
return msg_listener;
}
@@ -180,10 +175,6 @@ public final void setRequestHandler(RequestHandler rh) {
req_handler=rh;
}
- /**
- * Offers access to the underlying Channel.
- * @return a reference to the underlying Channel.
- */
public Channel getChannel() {
return channel;
}
@@ -246,29 +237,44 @@ else if (canReplace) {
/**
* Sends a message to the members listed in dests. If dests is null, the message is sent to all current group
* members.
- * @param dests A list of group members. The message is sent to all members of the current group if null
+ * @param dests A list of group members to send the message to. The message is sent to all members of the current
+ * group if null
* @param msg The message to be sent
* @param options A set of options that govern the call. See {@link org.jgroups.blocks.RequestOptions} for details
- * @return
+ * @return RspList A list of Rsp elements
+ * @throws Exception If the request cannot be sent
* @since 2.9
*/
- public <T> RspList<T> castMessage(final Collection<Address> dests, Message msg, RequestOptions options) {
+ public <T> RspList<T> castMessage(final Collection<Address> dests,
+ Message msg, RequestOptions options) throws Exception {
GroupRequest<T> req=cast(dests, msg, options, true);
return req != null? req.getResults() : RspList.EMPTY_RSP_LIST;
}
- public <T> NotifyingFuture<RspList<T>> castMessageWithFuture(final Collection<Address> dests, Message msg, RequestOptions options) {
+ /**
+ * Sends a message to the members listed in dests. If dests is null, the message is sent to all current group
+ * members.
+ * @param dests A list of group members to send the message to. The message is sent to all members of the current
+ * group if null
+ * @param msg The message to be sent
+ * @param options A set of options that govern the call. See {@link org.jgroups.blocks.RequestOptions} for details
+ * @return NotifyingFuture<T> A future from which the results (RspList) can be retrieved
+ * @throws Exception If the request cannot be sent
+ */
+ public <T> NotifyingFuture<RspList<T>> castMessageWithFuture(final Collection<Address> dests,
+ Message msg,
+ RequestOptions options) throws Exception {
GroupRequest<T> req=cast(dests, msg, options, false);
return req != null? req : new NullFuture<RspList>(RspList.EMPTY_RSP_LIST);
}
- protected <T> GroupRequest<T> cast(final Collection<Address> dests, Message msg, RequestOptions options, boolean block_for_results) {
+ protected <T> GroupRequest<T> cast(final Collection<Address> dests, Message msg,
+ RequestOptions options,
+ boolean block_for_results) throws Exception {
List<Address> real_dests;
// we need to clone because we don't want to modify the original
- // (we remove ourselves if LOCAL is false, see below) !
- // real_dests=dests != null ? (Vector) dests.clone() : (members != null ? new Vector(members) : null);
if(dests != null) {
real_dests=new ArrayList<Address>(dests);
real_dests.retainAll(this.members);
@@ -282,7 +288,6 @@ else if (canReplace) {
// if local delivery is off, then we should not wait for the message from the local member.
// therefore remove it from the membership
Channel tmp=channel;
-
if(tmp != null && tmp.getDiscardOwnMessages()) {
if(local_addr == null)
local_addr=tmp.getAddress();
@@ -314,14 +319,8 @@ else if (canReplace) {
msg.setScope(options.getScope());
}
req.setBlockForResults(block_for_results);
-
- try {
- req.execute();
- return req;
- }
- catch(Exception ex) {
- throw new RuntimeException("failed executing request " + req, ex);
- }
+ req.execute();
+ return req;
}
@@ -330,8 +329,16 @@ public void done(long req_id) {
}
-
- public <T> T sendMessage(Message msg, RequestOptions opts) throws Throwable {
+ /**
+ * Sends a unicast message and - depending on the options - returns a result
+ * @param msg the message to be sent. The destination needs to be non-null
+ * @param opts the options to be used
+ * @return T the result
+ * @throws Exception If there was problem sending the request, processing it at the receiver, or processing
+ * it at the sender.
+ * @throws TimeoutException If the call didn't succeed within the timeout defined in options (if set)
+ */
+ public <T> T sendMessage(Message msg, RequestOptions opts) throws Exception {
Address dest=msg.getDest();
if(dest == null) {
if(log.isErrorEnabled())
@@ -346,12 +353,7 @@ public void done(long req_id) {
}
UnicastRequest<T> req=new UnicastRequest<T>(msg, corr, dest, opts);
- try {
- req.execute();
- }
- catch(Exception t) {
- throw new RuntimeException("failed executing request " + req, t);
- }
+ req.execute();
if(opts != null && opts.getMode() == ResponseMode.GET_NONE)
return null;
@@ -360,16 +362,30 @@ public void done(long req_id) {
if(rsp.wasSuspected())
throw new SuspectedException(dest);
- if(rsp.getException() != null)
- throw rsp.getException();
+ Throwable exception=rsp.getException();
+ if(exception != null) {
+ if(exception instanceof Error) throw (Error)exception;
+ else if(exception instanceof RuntimeException) throw (RuntimeException)exception;
+ else if(exception instanceof Exception) throw (Exception)exception;
+ else throw new RuntimeException(exception);
+ }
if(!rsp.wasReceived())
throw new TimeoutException("timeout sending message to " + dest);
return rsp.getValue();
}
- public <T> NotifyingFuture<T> sendMessageWithFuture(Message msg, RequestOptions options) throws TimeoutException, SuspectedException {
+ /**
+ * Sends a unicast message to the target defined by msg.getDest() and returns a future
+ * @param msg The unicast message to be sent. msg.getDest() must not be null
+ * @param options
+ * @return NotifyingFuture<T> A future from which the result can be fetched
+ * @throws Exception If there was problem sending the request, processing it at the receiver, or processing
+ * it at the sender. {@link java.util.concurrent.Future#get()} will throw this exception
+ * @throws TimeoutException If the call didn't succeed within the timeout defined in options (if set)
+ */
+ public <T> NotifyingFuture<T> sendMessageWithFuture(Message msg, RequestOptions options) throws Exception {
Address dest=msg.getDest();
if(dest == null) {
if(log.isErrorEnabled())
@@ -385,27 +401,19 @@ public void done(long req_id) {
UnicastRequest<T> req=new UnicastRequest<T>(msg, corr, dest, options);
req.setBlockForResults(false);
- try {
- req.execute();
- if(options != null && options.getMode() == ResponseMode.GET_NONE)
- return new NullFuture<T>(null);
- return req;
- }
- catch(Exception t) {
- throw new RuntimeException("failed executing request " + req, t);
- }
+ req.execute();
+ if(options != null && options.getMode() == ResponseMode.GET_NONE)
+ return new NullFuture<T>(null);
+ return req;
}
/* ------------------------ RequestHandler Interface ---------------------- */
- public Object handle(Message msg) throws Throwable {
- if(req_handler != null) {
+ public Object handle(Message msg) throws Exception {
+ if(req_handler != null)
return req_handler.handle(msg);
- }
- else {
- return null;
- }
+ return null;
}
/* -------------------- End of RequestHandler Interface ------------------- */
@@ -466,7 +474,7 @@ public String getName() {
- private Object handleUpEvent(Event evt) throws Throwable {
+ protected Object handleUpEvent(Event evt) throws Exception {
switch(evt.getType()) {
case Event.MSG:
if(msg_listener != null) {
@@ -477,34 +485,25 @@ private Object handleUpEvent(Event evt) throws Throwable {
case Event.GET_APPLSTATE: // reply with GET_APPLSTATE_OK
byte[] tmp_state=null;
if(msg_listener != null) {
- try {
- ByteArrayOutputStream output=new ByteArrayOutputStream(1024);
- msg_listener.getState(output);
- tmp_state=output.toByteArray();
- }
- catch(Throwable t) {
- throw new Exception("failed getting state from message listener (" + msg_listener + ')', t);
- }
+ ByteArrayOutputStream output=new ByteArrayOutputStream(1024);
+ msg_listener.getState(output);
+ tmp_state=output.toByteArray();
}
return new StateTransferInfo(null, 0L, tmp_state);
case Event.GET_STATE_OK:
if(msg_listener != null) {
- try {
- StateTransferResult result=(StateTransferResult)evt.getArg();
- ByteArrayInputStream input=new ByteArrayInputStream(result.getBuffer());
- msg_listener.setState(input);
- }
- catch(Throwable t) {
- throw new RuntimeException("failed calling setState() in state requester", t);
- }
+ StateTransferResult result=(StateTransferResult)evt.getArg();
+ ByteArrayInputStream input=new ByteArrayInputStream(result.getBuffer());
+ msg_listener.setState(input);
}
break;
case Event.STATE_TRANSFER_OUTPUTSTREAM:
OutputStream os=(OutputStream)evt.getArg();
- if(msg_listener != null && os != null)
+ if(msg_listener != null && os != null) {
msg_listener.getState(os);
+ }
break;
case Event.STATE_TRANSFER_INPUTSTREAM:
@@ -517,9 +516,8 @@ private Object handleUpEvent(Event evt) throws Throwable {
View v=(View) evt.getArg();
List<Address> new_mbrs=v.getMembers();
setMembers(new_mbrs);
- if(membership_listener != null) {
+ if(membership_listener != null)
membership_listener.viewAccepted(v);
- }
break;
case Event.SET_LOCAL_ADDRESS:
@@ -565,24 +563,14 @@ public Object up(Event evt) {
}
}
}
- else {
- if(log.isErrorEnabled()) { //Something is seriously wrong, correlator should not be null since latch is not locked!
- log.error("correlator is null, event will be ignored (evt=" + evt + ")");
- }
- }
return null;
}
public Object down(Event evt) {
- if(channel != null) {
+ if(channel != null)
return channel.down(evt);
- }
- else
- if(this.log.isWarnEnabled()) {
- this.log.warn("channel is null, discarding event " + evt);
- }
return null;
}
Oops, something went wrong.

0 comments on commit f44a32d

Please sign in to comment.