Skip to content
Browse files

RSVP now cancels pending tasks and unblocks AckCollectors in destroy() (

  • Loading branch information...
1 parent 9d1d017 commit 332c1186e04edccccac330096f75df40131f8c97 @belaban committed Apr 4, 2012
View
12 src/org/jgroups/protocols/RSVP.java
@@ -71,6 +71,17 @@ public void init() throws Exception {
}
}
+
+ public void destroy() {
+ synchronized(ids) {
+ for(Entry entry: ids.values())
+ entry.destroy();
+ ids.clear();
+ }
+ super.destroy();
+ }
+
+
public Object down(Event evt) {
switch(evt.getType()) {
case Event.MSG:
@@ -255,6 +266,7 @@ public void run() {
protected void cancelTask() {
if(resend_task != null)
resend_task.cancel(false);
+ ack_collector.destroy();
}
protected void ack(Address member) {ack_collector.ack(member);}
View
7 src/org/jgroups/util/AckCollector.java
@@ -42,6 +42,13 @@ public synchronized void reset(Collection<Address> members) {
all_acks_received.reset();
}
+ public synchronized void destroy() {
+ suspected_mbrs.clear();
+ missing_acks.clear();
+ expected_acks=0;
+ all_acks_received.setResult(null);
+ }
+
public synchronized int size() {
return missing_acks.size();
}
View
17 tests/junit-functional/org/jgroups/tests/AckCollectorTest.java
@@ -143,6 +143,23 @@ public void testResetWithDuplicateMembers() {
assert ac.size() == 5;
}
+ public void testDestroy() {
+ List<Address> tmp_list=Arrays.asList(one,two,one,three,four,one,five);
+ final AckCollector ac=new AckCollector(tmp_list);
+ System.out.println("ac = " + ac);
+ assert ac.size() == 5;
+ Thread thread=new Thread() {
+ public void run() {
+ Util.sleep(2000);
+ ac.destroy();
+ }
+ };
+ thread.start();
+ boolean result=ac.waitForAllAcks(10000);
+ System.out.println("result = " + result);
+ assert !result;
+ }
+
public static void testNullList() throws TimeoutException {
AckCollector coll=new AckCollector();
coll.waitForAllAcks(1000);
View
26 tests/junit-functional/org/jgroups/tests/PromiseTest.java
@@ -91,6 +91,15 @@ public void run() {
}
+ public static void testReset() {
+ final Promise p=new Promise();
+ Resetter resetter=new Resetter(p, 2000);
+ resetter.start();
+ Object result=p.getResultWithTimeout(5000);
+ System.out.println("result = " + result);
+ assert result == null;
+ }
+
static class ResultSetter extends Thread {
@@ -125,6 +134,23 @@ public void run() {
}
+ static class Resetter extends Thread {
+ protected final Promise<?> target;
+ protected final long wait_time;
+
+ public Resetter(Promise<?> target, long wait_time) {
+ this.target=target;
+ this.wait_time=wait_time;
+ }
+
+ public void run() {
+ Util.sleep(wait_time);
+ // target.reset();
+ target.setResult(null);
+ }
+ }
+
+
}

0 comments on commit 332c118

Please sign in to comment.
Something went wrong with that request. Please try again.