Permalink
Browse files

Suppressing merge if we found less than 2 coordinators (https://issue…

…s.jboss.org/browse/JGRP-1387) -- todo: remove stdout statements
  • Loading branch information...
1 parent 811c7bd commit 02567dce73ee2aa7493ac9fe1c37629396e536f8 Bela Ban committed Nov 23, 2011
Showing with 30 additions and 16 deletions.
  1. +30 −16 src/org/jgroups/protocols/pbcast/Merger.java
@@ -4,6 +4,7 @@
import org.jgroups.annotations.GuardedBy;
import org.jgroups.logging.Log;
import org.jgroups.util.*;
+import org.jgroups.util.UUID;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
@@ -168,7 +169,7 @@ public void handleMergeView(final MergeData data, final MergeId merge_id) {
}
// only send to our *current* members, if we have A and B being merged (we are B), then we would *not*
- // receive a VIEW_ACK from A because A doesn't see us in the pre-merge view yet and discards the view
+ // want to block on a VIEW_ACK from A because A doesn't see us in the pre-merge view yet and discards the view
List<Address> newViewMembers=new ArrayList<Address>(data.view.getMembers());
newViewMembers.removeAll(gms.members.getMembers());
@@ -286,6 +287,10 @@ private void sendMergeView(Collection<Address> coords, MergeData combined_merge_
}
long start=System.currentTimeMillis();
+
+ //int cnt=0;
+
+
for(Address coord: coords) {
Message msg=new Message(coord, null, null);
GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.INSTALL_MERGE_VIEW);
@@ -294,16 +299,19 @@ private void sendMergeView(Collection<Address> coords, MergeData combined_merge_
hdr.merge_id=merge_id;
msg.putHeader(gms.getId(),hdr);
gms.getDownProtocol().down(new Event(Event.MSG,msg));
+ // cnt++;
}
+ //System.out.println("--->> " + gms.local_addr + " sent INSTALL_MERGE_VIEW to " + cnt + " targets in " + (System.currentTimeMillis() - start) + " ms");
+
//[JGRP-700] - FLUSH: flushing should span merge
// if flush is in stack wait for acks from separated island coordinators
if(gms.flushProtocolInStack) {
try {
gms.merge_ack_collector.waitForAllAcks(gms.view_ack_collection_timeout);
- long stop=System.currentTimeMillis();
if(log.isTraceEnabled())
- log.trace(gms.local_addr + ": received all ACKs (" + size + ") for merge view " + view + " in " + (stop - start) + "ms");
+ log.trace(gms.local_addr + ": received all ACKs (" + size + ") for merge view " + view +
+ " in " + (System.currentTimeMillis() - start) + "ms");
}
catch(TimeoutException e) {
log.warn(gms.local_addr + ": failed to collect all ACKs (" + size + ") for merge view " + view
@@ -558,6 +566,11 @@ public synchronized void start(Map<Address, View> views) {
coords.putIfAbsent(merge_participant, tmp);
}
+ if(coords.keySet().size() <= 1) {
+ log.trace(gms.local_addr + ": less than 2 coordinators; merge is not done");
+ return;
+ }
+
thread=gms.getThreadFactory().newThread(this, "MergeTask");
thread.setDaemon(true);
thread.start();
@@ -595,7 +608,8 @@ public void run() {
}
finally {
/* 5. if flush is in stack stop the flush for entire cluster [JGRP-700] - FLUSH: flushing should span merge */
- gms.stopFlush();
+ if(gms.flushProtocolInStack)
+ gms.stopFlush();
thread=null;
}
long diff=System.currentTimeMillis() - start;
@@ -612,22 +626,22 @@ protected void _run(MergeId new_merge_id, final Collection<Address> coordsCopy)
}
if(log.isTraceEnabled())
log.trace(gms.local_addr + ": merge_id is " + merge_id);
-/*
+
List<Address> tmp=new ArrayList<Address>(coords.keySet());
Collections.sort(tmp, new Comparator<Address>() {
public int compare(Address o1, Address o2) {
String s1=UUID.get(o1), s2=UUID.get(o2);
return s1 != null && s2 != null? s1.compareTo(s2) : o1.compareTo(o2);
}
- });*/
+ });
if(log.isDebugEnabled())
log.debug(gms.local_addr + ": merge task " + merge_id + " started with " + coords.keySet().size() +
" coords: " + Util.printListWithDelimiter(coords.keySet(), ", "));
- //System.out.println("\n--->> " + gms.local_addr + ": merge task " + merge_id + " started with " +
- // coords.keySet().size() + " coords: " +
- // (coords.keySet().size() < 20? (": " + tmp) : ""));
+ System.out.println("\n--->> " + gms.local_addr + ": merge task " + merge_id + " started with " +
+ coords.keySet().size() + " coords " +
+ (coords.keySet().size() < 20? (": " + tmp) : ""));
/* 2. Fetch the current Views/Digests from all subgroup coordinators */
success=getMergeDataFromSubgroupCoordinators(coords, new_merge_id, gms.merge_timeout);
@@ -637,8 +651,8 @@ public int compare(Address o1, Address o2) {
if(log.isDebugEnabled())
log.debug("merge leader " + gms.local_addr + " did not get responses from all partition coordinators " +
coords.keySet().size() + "; missing responses from " + missing.size() + ", removing them from the merge");
- //System.out.println("-->> merge leader " + gms.local_addr + " did not get responses from all " + coords.keySet().size()
- // + " partition coordinators " + "; missing " + missing.size() + " responses, removing them from the merge");
+ System.out.println("-->> merge leader " + gms.local_addr + " did not get responses from all " + coords.keySet().size()
+ + " partition coordinators " + "; missing " + missing.size() + " responses, removing them from the merge");
merge_rsps.remove(missing);
}
@@ -665,10 +679,10 @@ public int compare(Address o1, Address o2) {
/* 4. Send the new View/Digest to all coordinators (including myself). On reception, they will
install the digest and view in all of their subgroup members */
- //System.out.println("--->> " + gms.local_addr + ": installing merge view in " + coords.keySet().size() + " coords: " +
- // combined_merge_data.view.getViewId()
- // + " (" + combined_merge_data.view.size() + " members" + (combined_merge_data.view.size() < 20?
- // ": " + combined_merge_data.view.getMembers() + ")" : ")"));
+ System.out.println("--->> " + gms.local_addr + ": installing merge view in " + coords.keySet().size() + " coords: " +
+ combined_merge_data.view.getViewId()
+ + " (" + combined_merge_data.view.size() + " members" + (combined_merge_data.view.size() < 20?
+ ": " + combined_merge_data.view.getMembers() + ")" : ")"));
if(log.isDebugEnabled())
log.debug(gms.local_addr + ": installing merge view in " + coords.keySet().size() + " coords: " +
combined_merge_data.view.getViewId()
@@ -734,7 +748,7 @@ private void removeRejectedMergeRequests(Collection<Address> coords) {
}
if(num_removed > 0) {
- // System.out.println(gms.local_addr + " -->> removed " + num_removed + " rejected merge responses");
+ System.out.println(gms.local_addr + " -->> removed " + num_removed + " rejected merge responses");
if(log.isTraceEnabled())
log.trace(gms.local_addr + ": removed " + num_removed + " rejected merge responses");
}

0 comments on commit 02567dc

Please sign in to comment.