Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Oct 30, 2007
1 parent a09d51f commit c536f3b
Showing 1 changed file with 23 additions and 18 deletions.
41 changes: 23 additions & 18 deletions src/org/jgroups/protocols/pbcast/CoordGmsImpl.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// $Id: CoordGmsImpl.java,v 1.69.2.1 2007/10/17 16:32:18 belaban Exp $
// $Id: CoordGmsImpl.java,v 1.69.2.2 2007/10/30 12:35:00 belaban Exp $

package org.jgroups.protocols.pbcast;

Expand Down Expand Up @@ -593,7 +593,7 @@ private ViewId generateMergeId() {
* views is to merge only 1 of the duplicate members. Resolution strategy for digests is to take the higher
* seqnos for duplicate digests.<p>
* After merging all members into a Membership and subsequent sorting, the first member of the sorted membership
* will be the new coordinator.
* will be the new coordinator. This method has a lock on merge_rsps.
* @param merge_rsps A list of MergeData items. Elements with merge_rejected=true were removed before. Is guaranteed
* not to be null and to contain at least 1 member.
*/
Expand Down Expand Up @@ -654,7 +654,7 @@ private MergeData consolidateMergeData(Vector<MergeData> merge_rsps) {

/**
* Merge all digests into one. For each sender, the new value is min(low_seqno), max(high_seqno),
* max(high_seqno_seen)
* max(high_seqno_seen). This method has a lock on merge_rsps
*/
private Digest consolidateDigests(Vector<MergeData> merge_rsps, int num_mbrs) {
MergeData data;
Expand Down Expand Up @@ -748,7 +748,7 @@ private void sendMergeCancelledMessage(Vector coords, ViewId merge_id) {
}
}

/** Removed rejected merge requests from merge_rsps and coords */
/** Removed rejected merge requests from merge_rsps and coords. This method has a lock on merge_rsps */
private void removeRejectedMergeRequests(Vector coords) {
MergeData data;
for(Iterator it=merge_rsps.iterator(); it.hasNext();) {
Expand Down Expand Up @@ -802,8 +802,6 @@ public boolean isRunning() {
* Runs the merge protocol as a leader
*/
public void run() {
MergeData combined_merge_data;

if(merging == true) {
if(log.isWarnEnabled()) log.warn("merge is already in progress, terminating");
return;
Expand All @@ -820,23 +818,30 @@ public void run() {

/* 3. Remove rejected MergeData elements from merge_rsp and coords (so we'll send the new view only
to members who accepted the merge request) */
removeRejectedMergeRequests(coords);

if(merge_rsps.size() <= 1) {
if(log.isWarnEnabled())
log.warn("merge responses from subgroup coordinators <= 1 (" + merge_rsps + "). Cancelling merge");
sendMergeCancelledMessage(coords, merge_id);
return;
MergeData combined_merge_data=null;
boolean abort_merge=false;
synchronized(merge_rsps) {
removeRejectedMergeRequests(coords);
if(merge_rsps.size() <= 1) {
if(log.isWarnEnabled())
log.warn("merge responses from subgroup coordinators <= 1 (" + merge_rsps + "). Cancelling merge");
abort_merge=true;
}
else {
/* 4. Combine all views and digests into 1 View/1 Digest */
combined_merge_data=consolidateMergeData(merge_rsps);
if(combined_merge_data == null) {
if(log.isErrorEnabled()) log.error("combined_merge_data == null");
abort_merge=true;
}
}
}

/* 4. Combine all views and digests into 1 View/1 Digest */
combined_merge_data=consolidateMergeData(merge_rsps);
if(combined_merge_data == null) {
if(log.isErrorEnabled()) log.error("combined_merge_data == null");
if(abort_merge) {
sendMergeCancelledMessage(coords, merge_id);
return;
}

/* 5. Don't allow JOINs or LEAVEs until we are done with the merge. Suspend() will clear the
view handler queue, so no requests beyond this current MERGE request will be processed */
gms.getViewHandler().suspend(merge_id);
Expand Down

0 comments on commit c536f3b

Please sign in to comment.