Skip to content
Browse files

Added MergeKiller to GMS: whenever a merge-id isn't reset for more than N seconds, the merge will be force-cancelled (https://issues.jboss.org/browse/JGRP-1377)
  • Loading branch information...
1 parent b2031f8 commit c887d4d9714a526532c598d51012954352330501 @belaban committed Oct 19, 2011
View
47 src/org/jgroups/protocols/pbcast/GMS.java
@@ -40,9 +40,15 @@
@Property(description="Leave timeout")
long leave_timeout=5000;
- @Property(description="Timeout to complete merge")
+ @Property(description="Timeout (in ms) to complete merge")
long merge_timeout=5000; // time to wait for all MERGE_RSPS
+ @Property(description="Max time (in ms) a merge is allowed to run before it will be force-killed")
+ protected long max_merge_time=2 * 60 * 1000; // 2 minutes by default
+
+ @Property(description="Interval (in ms) the MergeKiller task runs at, must be less than max_merge_time. 0 disables it.")
+ protected long merge_killer_task_timeout=60 * 1000;
+
@Property(description="Print local address of this member after connect. Default is true")
private boolean print_local_addr=true;
@@ -108,6 +114,8 @@
// Handles merge related tasks
final Merger merger=new Merger(this, log);
+
+ protected Future<?> merge_killer;
protected Address local_addr=null;
protected final Membership members=new Membership(); // real membership
@@ -159,12 +167,16 @@ public GMS() {
public int getNumMembers() {return members.size();}
public long getJoinTimeout() {return join_timeout;}
public void setJoinTimeout(long t) {join_timeout=t;}
+ public long getMergeTimeout() {return merge_timeout;}
+ public void setMergeTimeout(long timeout) {merge_timeout=timeout;}
- public long getMergeTimeout() {
- return merge_timeout;
+ @ManagedAttribute(description="Whether the merge killer task is running")
+ public boolean getMergeKillerRunning() {
+ return !(merge_killer.isCancelled() || merge_killer.isDone());
}
- public void setMergeTimeout(long timeout) {merge_timeout=timeout;}
+ @ManagedAttribute(description="Stringified version of merge_id")
+ public String getMergeIdAsString() {return merger.getMergeIdAsString();}
@ManagedOperation
public String printPreviousMembers() {
@@ -318,10 +330,26 @@ public void init() throws Exception {
}
public void start() throws Exception {
- if(impl != null) impl.start();
+ if(impl != null) impl.start();
+ if(merge_killer_task_timeout > 0) {
+ merge_killer=timer.scheduleWithFixedDelay(new Runnable() {
+ public void run() {
+ long timestamp=merger.getMergeIdTimestamp();
+ if(timestamp > 0) {
+ long diff=System.currentTimeMillis() - timestamp;
+ if(diff >= max_merge_time) {
+ if(merger.forceCancelMerge())
+ log.warn("force-cancelled merge task after " + diff + " ms");
+ }
+ }
+ }
+ }, merge_killer_task_timeout, merge_killer_task_timeout, TimeUnit.MILLISECONDS);
+ }
}
public void stop() {
+ if(merge_killer != null)
+ merge_killer.cancel(true);
view_handler.stop(true);
if(impl != null) impl.stop();
if(prev_members != null)
@@ -387,6 +415,15 @@ public void fixDigests() {
((CoordGmsImpl)impl).fixDigests();
}
+ @ManagedOperation(description="Forces cancellation of current merge task")
+ public boolean cancelMerge() {
+ boolean result=merger.forceCancelMerge();
+ if(log.isDebugEnabled()) {
+ log.debug(result? "Merge was cancelled" : "There was no merge to be cancelled");
+ }
+ return result;
+ }
+
/**
* Computes the next view. Returns a copy that has <code>old_mbrs</code> and
* <code>suspected_mbrs</code> removed and <code>new_mbrs</code> added.
View
106 src/org/jgroups/protocols/pbcast/Merger.java
@@ -34,6 +34,11 @@
@GuardedBy("merge_lock")
private MergeId merge_id=null;
+ /** Timestamp when the last merge was started, i.e. when merge_id was set; reset to 0 when merge_id is cleared.
+ Read by the MergeKiller task in GMS (see https://issues.jboss.org/browse/JGRP-1377) */
+ @GuardedBy("merge_lock")
+ protected long merge_id_timestamp=0;
+
@GuardedBy("merge_canceller_lock")
private Future<?> merge_canceller_future=null;
@@ -45,6 +50,12 @@ public Merger(GMS gms, Log log) {
this.log=log;
}
+ public String getMergeIdAsString() {return merge_id != null? merge_id.toString() : null;}
+
+ public long getMergeIdTimestamp() {
+ return merge_id_timestamp;
+ }
+
/**
* Invoked upon receiving a MERGE event from the MERGE layer. Starts the merge protocol.
@@ -166,17 +177,20 @@ public void handleMergeView(final MergeData data,final MergeId merge_id) {
List<Address> newViewMembers=new ArrayList<Address>(data.view.getMembers());
newViewMembers.removeAll(gms.members.getMembers());
-
- gms.castViewChangeWithDest(data.view, data.digest, null, newViewMembers);
- // if we have flush in stack send ack back to merge coordinator
- if(gms.flushProtocolInStack) {
- Message ack=new Message(data.getSender(), null, null);
- ack.setFlag(Message.OOB);
- GMS.GmsHeader ack_hdr=new GMS.GmsHeader(GMS.GmsHeader.INSTALL_MERGE_VIEW_OK);
- ack.putHeader(gms.getId(), ack_hdr);
- gms.getDownProtocol().down(new Event(Event.MSG, ack));
+ try {
+ gms.castViewChangeWithDest(data.view, data.digest, null, newViewMembers);
+ // if we have flush in stack send ack back to merge coordinator
+ if(gms.flushProtocolInStack) {
+ Message ack=new Message(data.getSender(), null, null);
+ ack.setFlag(Message.OOB);
+ GMS.GmsHeader ack_hdr=new GMS.GmsHeader(GMS.GmsHeader.INSTALL_MERGE_VIEW_OK);
+ ack.putHeader(gms.getId(), ack_hdr);
+ gms.getDownProtocol().down(new Event(Event.MSG, ack));
+ }
+ }
+ finally {
+ cancelMerge(merge_id);
}
- cancelMerge(merge_id);
}
public void handleMergeCancelled(MergeId merge_id) {
@@ -391,21 +405,35 @@ void stop() {
}
- void cancelMerge(MergeId id) {
+ boolean cancelMerge(MergeId id) {
if(setMergeId(id, null)) {
merge_task.stop();
merge_rsps.reset();
gms.getViewHandler().resume(id);
+ return true;
}
+ return false;
}
- private boolean setMergeId(MergeId expected, MergeId new_value) {
+ boolean forceCancelMerge() {
+ merge_lock.lock();
+ try {
+ return this.merge_id != null && cancelMerge(this.merge_id);
+ }
+ finally {
+ merge_lock.unlock();
+ }
+ }
+
+
+ boolean setMergeId(MergeId expected, MergeId new_value) {
merge_lock.lock();
try {
boolean match=Util.match(this.merge_id, expected);
if(match) {
this.merge_id=new_value;
+ this.merge_id_timestamp=new_value == null? 0 : System.currentTimeMillis();
stopMergeCanceller();
if(this.merge_id != null)
startMergeCanceller();
@@ -453,7 +481,7 @@ private void startMergeCanceller() {
try {
if(merge_canceller_future == null || merge_canceller_future.isDone()) {
MergeCanceller task=new MergeCanceller(this.merge_id);
- merge_canceller_future=gms.timer.schedule(task, (long)(gms.merge_timeout * 1.5), TimeUnit.MILLISECONDS);
+ merge_canceller_future=gms.timer.schedule(task, (long)(gms.merge_timeout * 2), TimeUnit.MILLISECONDS);
}
}
finally {
@@ -496,35 +524,36 @@ private void stopMergeCanceller() {
* @param views Guaranteed to be non-null and to have >= 2 members, or else this thread would not be started
*/
public synchronized void start(Map<Address, View> views) {
- if(thread == null || thread.isAlive()) {
- this.coords.clear();
+ if(thread != null && thread.isAlive()) // the merge thread is already running
+ return;
- // now remove all members which don't have us in their view, so RPCs won't block (e.g. FLUSH)
- // https://jira.jboss.org/browse/JGRP-1061
- sanitizeViews(views);
-
- // Add all different coordinators of the views into the hashmap and sets their members:
- Collection<Address> coordinators=Util.determineMergeCoords(views);
- for(Address coord: coordinators) {
- View view=views.get(coord);
- if(view != null)
- this.coords.put(coord, new ArrayList<Address>(view.getMembers()));
- }
+ this.coords.clear();
- // For the merge participants which are not coordinator, we simply add them, and the associated
- // membership list consists only of themselves
- Collection<Address> merge_participants=Util.determineMergeParticipants(views);
- merge_participants.removeAll(coordinators);
- for(Address merge_participant: merge_participants) {
- Collection<Address> tmp=new ArrayList<Address>();
- tmp.add(merge_participant);
- coords.putIfAbsent(merge_participant, tmp);
- }
+ // now remove all members which don't have us in their view, so RPCs won't block (e.g. FLUSH)
+ // https://jira.jboss.org/browse/JGRP-1061
+ sanitizeViews(views);
+
+ // Add all different coordinators of the views into the hashmap and sets their members:
+ Collection<Address> coordinators=Util.determineMergeCoords(views);
+ for(Address coord: coordinators) {
+ View view=views.get(coord);
+ if(view != null)
+ this.coords.put(coord, new ArrayList<Address>(view.getMembers()));
+ }
- thread=gms.getThreadFactory().newThread(this, "MergeTask");
- thread.setDaemon(true);
- thread.start();
+ // For the merge participants which are not coordinator, we simply add them, and the associated
+ // membership list consists only of themselves
+ Collection<Address> merge_participants=Util.determineMergeParticipants(views);
+ merge_participants.removeAll(coordinators);
+ for(Address merge_participant: merge_participants) {
+ Collection<Address> tmp=new ArrayList<Address>();
+ tmp.add(merge_participant);
+ coords.putIfAbsent(merge_participant, tmp);
}
+
+ thread=gms.getThreadFactory().newThread(this, "MergeTask");
+ thread.setDaemon(true);
+ thread.start();
}
@@ -579,6 +608,7 @@ public void run() {
if(log.isWarnEnabled())
log.warn(gms.local_addr + ": " + ex.getLocalizedMessage() + ", merge is cancelled");
sendMergeCancelledMessage(coordsCopy, new_merge_id);
+ cancelMerge(new_merge_id); // the message above cancels the merge, too, but this is a 2nd line of defense
}
finally {
gms.getViewHandler().resume(new_merge_id);
View
3 src/org/jgroups/util/MergeId.java
@@ -22,9 +22,10 @@ private MergeId(Address initiator, int id) {
}
public synchronized static MergeId create(Address addr) {
- int id=LAST_ID++;
if(addr == null)
throw new IllegalArgumentException("initiator has to be non null");
+
+ int id=LAST_ID++;
return new MergeId(addr, id);
}
View
22 tests/junit/org/jgroups/protocols/GMS_MergeTest.java
@@ -105,7 +105,7 @@ static void _testMergeRequestTimeout(String props, String cluster_name) throws E
System.out.println("starting merge");
gms.up(new Event(Event.MSG, merge_request));
- long timeout=gms.getMergeTimeout() * 2;
+ long timeout=gms.getMergeTimeout() * 10;
System.out.println("sleeping for " + timeout + " ms, then fetching merge_id: should be null (cancelled by the MergeCanceller)");
long target_time=System.currentTimeMillis() + timeout;
while(System.currentTimeMillis() < target_time) {
@@ -575,14 +575,14 @@ private static void checkUniqueness(String[] ... partitions) throws Exception {
}
private static View createView(String[] partition, JChannel[] channels) throws Exception {
- Vector<Address> members=new Vector<Address>(partition.length);
+ List<Address> members=new ArrayList<Address>(partition.length);
for(String tmp: partition) {
Address addr=findAddress(tmp, channels);
if(addr == null)
throw new Exception(tmp + " not associated with a channel");
members.add(addr);
}
- return new View(members.firstElement(), 10, members);
+ return new View(members.get(0), 10, members);
}
private static void checkViews(JChannel[] channels, String channel_name, String ... members) {
@@ -682,31 +682,31 @@ static void checkMessages(int expected, MyReceiver ... receivers) {
private static class MyChannel extends JChannel {
protected int id=0;
- public MyChannel() throws Exception {
+ private MyChannel() throws Exception {
super();
}
- public MyChannel(File properties) throws Exception {
+ private MyChannel(File properties) throws Exception {
super(properties);
}
- public MyChannel(Element properties) throws Exception {
+ private MyChannel(Element properties) throws Exception {
super(properties);
}
- public MyChannel(URL properties) throws Exception {
+ private MyChannel(URL properties) throws Exception {
super(properties);
}
- public MyChannel(String properties) throws Exception {
+ private MyChannel(String properties) throws Exception {
super(properties);
}
- public MyChannel(ProtocolStackConfigurator configurator) throws Exception {
+ private MyChannel(ProtocolStackConfigurator configurator) throws Exception {
super(configurator);
}
- public MyChannel(JChannel ch) throws Exception {
+ private MyChannel(JChannel ch) throws Exception {
super(ch);
}
@@ -737,7 +737,7 @@ protected void setAddress() {
private final String name;
private AtomicInteger num_msgs=new AtomicInteger(0);
- public MyReceiver(String name) {
+ private MyReceiver(String name) {
this.name=name;
}

0 comments on commit c887d4d

Please sign in to comment.
Something went wrong with that request. Please try again.