Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Now the candidates for merge leaders are picked from *actual* coordinators (first in their views) (https://issues.jboss.org/browse/JGRP-1451)
  • Loading branch information...
commit 849995d66ffb8862786762c104ad6a9d1780f888 1 parent be6ac09
@belaban authored
View
4 src/org/jgroups/protocols/pbcast/CoordGmsImpl.java
@@ -221,7 +221,7 @@ public void handleMembershipChange(Collection<Request> requests) {
}
sendLeaveResponses(leaving_mbrs); // no-op if no leaving members
- gms.castViewChange(new_view, join_rsp != null? join_rsp.getDigest() : null, join_rsp, new_mbrs);
+ gms.castViewChange(new_view,join_rsp != null? join_rsp.getDigest() : null,join_rsp,new_mbrs);
}
finally {
if(hasJoiningMembers)
@@ -257,7 +257,7 @@ public void stop() {
private void sendLeaveResponses(Collection<Address> leaving_members) {
- for(Address address:leaving_members){
+ for(Address address: leaving_members){
Message msg=new Message(address, null, null); // send an ack to the leaving member
msg.setFlag(Message.OOB);
GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.LEAVE_RSP);
View
14 src/org/jgroups/protocols/pbcast/GMS.java
@@ -39,7 +39,7 @@
long join_timeout=5000;
@Property(description="Leave timeout")
- long leave_timeout=5000;
+ long leave_timeout=1000;
@Property(description="Timeout (in ms) to complete merge")
long merge_timeout=5000; // time to wait for all MERGE_RSPS
@@ -480,7 +480,7 @@ public void castViewChange(View new_view, Digest digest, JoinRsp jr, Collection<
// Send down a local TMP_VIEW event. This is needed by certain layers (e.g. NAKACK) to compute correct digest
// in case client's next request (e.g. getState()) reaches us *before* our own view change multicast.
// Check NAKACK's TMP_VIEW handling for details
- up_prot.up(new Event(Event.TMP_VIEW, new_view));
+ up_prot.up(new Event(Event.TMP_VIEW,new_view));
down_prot.down(new Event(Event.TMP_VIEW, new_view));
List<Address> ackMembers=new ArrayList<Address>(new_view.getMembers());
@@ -490,7 +490,7 @@ public void castViewChange(View new_view, Digest digest, JoinRsp jr, Collection<
Message view_change_msg=new Message(); // bcast to all members
GmsHeader hdr=new GmsHeader(GmsHeader.VIEW, new_view);
hdr.my_digest=digest;
- view_change_msg.putHeader(this.id, hdr);
+ view_change_msg.putHeader(this.id,hdr);
// If we're the only member the VIEW is broadcast to, let's simply install the view directly, without
// sending the VIEW multicast ! Or else N-1 members drop the multicast anyway...
@@ -569,10 +569,10 @@ public void installView(View new_view, Digest digest) {
ViewId view_id=view.getViewId();
int rc=vid.compareToIDs(view_id);
if(rc <= 0) {
- if(log.isWarnEnabled() && rc < 0 && log_view_warnings) { // only scream if view is smaller, silently discard same views
- log.warn(local_addr + ": received view < current view;" +
- " discarding it (current vid: " + view_id + ", new vid: " + vid + ')');
- }
+ //if(log.isTraceEnabled() && rc < 0 && log_view_warnings) { // only scream if view is smaller, silently discard same views
+ // log.trace(local_addr + ": received view < current view;" +
+ // " discarding it (current vid: " + view_id + ", new vid: " + vid + ')');
+ //}
return;
}
}
View
13 src/org/jgroups/protocols/pbcast/Merger.java
@@ -63,15 +63,22 @@ public void merge(Map<Address, View> views) {
}
// we need the merge *coordinators* not merge participants because not everyone can lead a merge !
- Collection<Address> coords=Util.determineMergeCoords(views);
- Collection<Address> merge_participants=Util.determineMergeParticipants(views);
+ Collection<Address> coords=Util.determineActualMergeCoords(views);
+ if(coords.isEmpty()) {
+ log.error(gms.local_addr + ": unable to determine merge leader from " + views + "; not starting a merge");
+ return;
+ }
+
+
Membership tmp=new Membership(coords); // establish a deterministic order, so that coords can elect leader
tmp.sort();
Address merge_leader=tmp.elementAt(0);
if(merge_leader.equals(gms.local_addr)) {
- if(log.isDebugEnabled())
+ if(log.isDebugEnabled()) {
+ Collection<Address> merge_participants=Util.determineMergeParticipants(views);
log.debug("I (" + gms.local_addr + ") will be the leader. Starting the merge task for " +
merge_participants.size() + " coords");
+ }
merge_task.start(views);
}
else {
View
9 src/org/jgroups/protocols/pbcast/ParticipantGmsImpl.java
@@ -46,7 +46,6 @@ public void joinWithStateTransfer(Address mbr,boolean useFlushIfPresent) {
public void leave(Address mbr) {
Address coord;
int max_tries=3;
- Boolean result;
leave_promise.reset();
@@ -61,9 +60,9 @@ public void leave(Address mbr) {
return;
}
- if(log.isDebugEnabled()) log.debug("sending LEAVE request to " + coord + " (local_addr=" + gms.local_addr + ")");
+ if(log.isDebugEnabled()) log.debug(gms.local_addr + ": sending LEAVE request to " + coord);
sendLeaveMessage(coord, mbr);
- result=leave_promise.getResult(gms.leave_timeout);
+ Boolean result=leave_promise.getResult(gms.leave_timeout);
if(result != null)
break;
}
@@ -87,6 +86,10 @@ public void handleJoinResponse(JoinRsp join_rsp) {
}
public void handleLeaveResponse() {
+
+ System.out.println("[" + gms.local_addr + "]: unblocking leave_promise (hasResult=" + leave_promise.hasResult() + ")");
+
+
leave_promise.setResult(true); // unblocks thread waiting in leave()
}
View
21 src/org/jgroups/util/Util.java
@@ -2315,6 +2315,27 @@ public static boolean containsViewId(Collection<View> views, ViewId vid) {
}
/**
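+ * Similar to {@link #determineMergeCoords(java.util.Map)}, but only actual coordinators are counted: a sender is
+ * an actual coordinator when it is the first member of the view stored under its own address
+ * @param map Maps the address of each sender to that sender's view; may be null
+ * @return The senders that are the first member of their own view; empty if map is null or no sender qualifies
+ */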
+ public static Collection<Address> determineActualMergeCoords(Map<Address,View> map) {
+ Set<Address> retval=new HashSet<Address>();
+ if(map != null) {
+ for(Map.Entry<Address,View> entry: map.entrySet()) {
+ Address sender=entry.getKey();
+ List<Address> members=entry.getValue().getMembers();
+ Address actual_coord=members.isEmpty() ? null : members.get(0);
+ if(sender.equals(actual_coord))
+ retval.add(sender);
+ }
+ }
+ return retval;
+ }
+
+
+ /**
* Returns the rank of a member in a given view
* @param view The view
* @param addr The address of a member
View
36 tests/junit/org/jgroups/tests/OverlappingMergeTest.java
@@ -3,6 +3,7 @@
import org.jgroups.*;
import org.jgroups.protocols.Discovery;
import org.jgroups.protocols.MERGE2;
+import org.jgroups.protocols.pbcast.CoordGmsImpl;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.protocols.pbcast.STABLE;
@@ -29,7 +30,7 @@
@BeforeMethod
protected void start() throws Exception {
- a=createChannel(true, 3);
+ a=createChannel(true, 4);
a.setName("A");
ra=new MyReceiver("A", a);
a.setReceiver(ra);
@@ -43,7 +44,7 @@ protected void start() throws Exception {
c.setName("C");
rc=new MyReceiver("C", c);
c.setReceiver(rc);
- modifyConfigs(a, b, c);
+ modifyConfigs(a,b,c);
a.connect("OverlappingMergeTest");
b.connect("OverlappingMergeTest");
@@ -81,7 +82,7 @@ public void testRegularMessageSending() throws Exception {
* <li/>B and C install {B,C}
* <li/>B and C trash the connection table for A in UNICAST
* <li/>A ignores the view, it still has view {A,B,C} and all connection tables intact in UNICAST
- * <li/>We now inject a MERGE(A,B) event into A. This should ause A and B as coords to create a new MergeView {A,B,C}
+ * <li/>We now inject a MERGE(A,B) event into A. This should use A and B as coords to create a new MergeView {A,B,C}
* <li/>The merge already fails because the unicast between A and B fails due to the reason given below !
* Once this is fixed, the next step below should work, too !
* <li/>A sends a unicast to B and C. This should fail until JGRP-940 has been fixed !
@@ -214,7 +215,6 @@ public void testOverlappingMergeWithABC() throws Exception {
System.out.print(".");
for(JChannel ch: new JChannel[]{a,b,c})
runStableProtocol(ch);
- injectMergeEvent(merge_evt, a,b,c);
Util.sleep(1000);
}
@@ -258,14 +258,14 @@ public void testMergeWithDifferentPartitions() throws Exception {
// Inject view {A,C,B} into A, B and C:
View new_view=Util.createView(a.getAddress(), 4, a.getAddress(), c.getAddress(), b.getAddress());
System.out.println("\n ==== Injecting view " + new_view + " into A, B and C ====");
- injectView(new_view, a, b, c);
+ injectView(new_view,false,a,b,c);
assert Util.isCoordinator(a);
assert !Util.isCoordinator(b);
assert !Util.isCoordinator(c);
View view_d=Util.createView(b.getAddress(), 4, b.getAddress(), a.getAddress(), c.getAddress(), d.getAddress());
- System.out.println("\n ==== Injecting view " + view_d + " into D ====");
- injectView(view_d, d);
+ System.out.println("\n ==== Injecting view " + view_d + " into D ====\n");
+ injectView(view_d, false, d);
assert !Util.isCoordinator(d);
for(JChannel ch: Arrays.asList(a,b,c,d))
@@ -276,8 +276,8 @@ public void testMergeWithDifferentPartitions() throws Exception {
Map<Address,View> views=new HashMap<Address,View>();
views.put(a.getAddress(), a.getView());
views.put(b.getAddress(), b.getView());
- views.put(c.getAddress(), c.getView());
- views.put(d.getAddress(), d.getView());
+ views.put(c.getAddress(),c.getView());
+ views.put(d.getAddress(),d.getView());
Event merge_evt=new Event(Event.MERGE, views);
System.out.println("\n==== Injecting a merge event into A, B, C and D====");
@@ -296,11 +296,11 @@ public void testMergeWithDifferentPartitions() throws Exception {
}
for(JChannel ch: Arrays.asList(a,b,c,d))
- System.out.println(ch.getName() + ": " + ch.getView());
+ System.out.println(ch.getName() + ": " + ch.getView() + " (coord=" + isCoord(ch) + ")");
for(JChannel ch: Arrays.asList(a,b,c,d))
assert ch.getView().size() == 4: ch.getName() + ": view is " + ch.getView();
-
+ System.out.println("\n");
}
@@ -399,17 +399,23 @@ private static JChannel determineMergeLeader(JChannel ... coords) {
return null;
}
- private static void injectView(View view, JChannel ... channels) {
+ private static void injectView(View view, boolean print_receivers, JChannel ... channels) {
for(JChannel ch: channels) {
GMS gms=(GMS)ch.getProtocolStack().findProtocol(GMS.class);
gms.installView(view);
}
+ if(!print_receivers)
+ return;
for(JChannel ch: channels) {
MyReceiver receiver=(MyReceiver)ch.getReceiver();
System.out.println("[" + receiver.name + "] view=" + ch.getView());
}
}
+ private static void injectView(View view, JChannel ... channels) {
+ injectView(view, true, channels);
+ }
+
private static void injectMergeEvent(Event evt, JChannel ... channels) {
for(JChannel ch: channels) {
@@ -490,6 +496,12 @@ private static String print(List<Message> msgs) {
}
+ protected boolean isCoord(JChannel ch) {
+ GMS gms=(GMS)ch.getProtocolStack().findProtocol(GMS.class);
+ return gms.getImpl() instanceof CoordGmsImpl;
+ }
+
+
private static void modifyConfigs(JChannel ... channels) throws Exception {
for(JChannel ch: channels) {
ProtocolStack stack=ch.getProtocolStack();
Please sign in to comment.
Something went wrong with that request. Please try again.