Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

- Implementation of call termination in MessageDispatcher when underl…

…ying channel is closed (https://issues.jboss.org/browse/JGRP-1508)

 - Unit test (by Dan Berindei)
  • Loading branch information...
commit 618dc198d937b06ceb2696a1f7367d25bf54b9eb 1 parent 1b7eb34
@belaban authored
View
24 src/org/jgroups/blocks/GroupRequest.java
@@ -229,6 +229,30 @@ public void viewChange(View new_view) {
checkCompletion(this);
}
+ /** Marks all responses with an exception (unless a response was already marked as done) */
+ public void transportClosed() {
+ boolean changed=false;
+
+ lock.lock();
+ try {
+ for(Map.Entry<Address, Rsp<T>> entry: requests.entrySet()) {
+ Rsp<T> rsp=entry.getValue();
+ if(rsp != null && !(rsp.wasReceived() || rsp.wasSuspected())) {
+ rsp.setException(new IllegalStateException("transport was closed"));
+ num_received++;
+ changed=true;
+ }
+ }
+ if(changed) {
+ completed.signalAll();
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ if(changed)
+ checkCompletion(this);
+ }
/* -------------------- End of Interface RspCollector ----------------------------------- */
View
6 src/org/jgroups/blocks/MessageDispatcher.java
@@ -65,6 +65,7 @@ public MessageDispatcher(Channel channel, MessageListener l, MembershipListener
prot_adapter=new ProtocolAdapter();
if(channel != null) {
local_addr=channel.getAddress();
+ channel.addChannelListener(this);
}
setMessageListener(l);
setMembershipListener(l2);
@@ -613,8 +614,11 @@ public Object up(Event evt) {
public Object down(Event evt) {
- if(channel != null)
+ if(channel != null) {
+ if(evt.getType() == Event.MSG && !channel.isConnected())
+ throw new IllegalStateException("channel is not connected");
return channel.down(evt);
+ }
return null;
}
View
2  src/org/jgroups/blocks/Request.java
@@ -150,7 +150,7 @@ public boolean isDone() {
public String toString() {
StringBuilder ret=new StringBuilder(128);
ret.append(super.toString());
- ret.append("req_id=").append(req_id).append(", mode=" + options.getMode());
+ ret.append(", req_id=").append(req_id).append(", mode=" + options.getMode());
return ret.toString();
}
View
3  src/org/jgroups/blocks/RequestCorrelator.java
@@ -249,6 +249,9 @@ public final void start() {
public void stop() {
started=false;
+ for(RspCollector coll: requests.values())
+ coll.transportClosed();
+ requests.clear();
}
View
3  src/org/jgroups/blocks/RpcDispatcher.java
@@ -20,7 +20,7 @@
* Is the equivalent of RpcProtocol on the application rather than protocol level.
* @author Bela Ban
*/
-public class RpcDispatcher extends MessageDispatcher implements ChannelListener {
+public class RpcDispatcher extends MessageDispatcher {
protected Object server_obj=null;
/** Marshaller to marshall requests at the caller and unmarshal requests at the receiver(s) */
protected Marshaller req_marshaller=null;
@@ -37,7 +37,6 @@ public RpcDispatcher() {
public RpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object server_obj) {
super(channel, l, l2);
- channel.addChannelListener(this);
this.server_obj=server_obj;
}
View
2  src/org/jgroups/blocks/RspCollector.java
@@ -2,7 +2,6 @@
package org.jgroups.blocks;
import org.jgroups.Address;
-import org.jgroups.Message;
import org.jgroups.View;
@@ -10,4 +9,5 @@
void receiveResponse(Object response_value, Address sender, boolean is_exception);
void suspect(Address mbr);
void viewChange(View new_view);
+ void transportClosed();
}
View
17 src/org/jgroups/blocks/UnicastRequest.java
@@ -151,6 +151,23 @@ public void viewChange(View new_view) {
checkCompletion(this);
}
+ public void transportClosed() {
+ lock.lock();
+ try {
+ if(done)
+ return;
+ if(result != null && !result.wasReceived())
+ result.setException(new IllegalStateException("transport was closed"));
+ done=true;
+ if(corr != null)
+ corr.done(req_id);
+ completed.signalAll();
+ }
+ finally {
+ lock.unlock();
+ }
+ checkCompletion(this);
+ }
/* -------------------- End of Interface RspCollector ----------------------------------- */
View
1  tests/junit-functional/org/jgroups/tests/MessageDispatcherRSVPTest.java
@@ -29,6 +29,7 @@
/**
* Tests the {@link org.jgroups.protocols.RSVP} protocol
* @author Dan Berindei
+ * @author Bela Ban
*/
@Test(groups=Global.FUNCTIONAL,sequential=true)
public class MessageDispatcherRSVPTest {
Please sign in to comment.
Something went wrong with that request. Please try again.