Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

JGRP-1007 - Flush: change signature of JChannel#startFlush

  • Loading branch information...
commit c1c1251f5c52164e4b2a470272a755cf29775e8e 1 parent 6eade6a
@vblagoje vblagoje authored
View
51 src/org/jgroups/Channel.java
@@ -327,37 +327,34 @@ public Receiver getReceiver() {
abstract public boolean flushSupported();
/**
- * Performs a partial flush in a cluster for flush participants.<p/>
- * All pending messages are flushed out only for the flush participants. The remaining members in a cluster are not
- * included in the flush. The flush participants should be a proper subset of a current view.<p/>
- * @param automatic_resume Call {@link #stopFlush()} after the flush
- * @return true if FLUSH completed within the timeout
- * @see #startFlush(boolean)
- */
- abstract public boolean startFlush(List<Address> flushParticipants,boolean automatic_resume);
-
- /**
- * Will perform a flush of the system, ie. all pending messages are flushed out of the system and all members
- * ack their reception. After this call returns, no member will be sending any messages until
- * {@link #stopFlush()} is called.<p/>
- * In case of flush collisions, a random sleep time backoff algorithm is employed and the flush is reattempted for
- * numberOfAttempts. Therefore this method is guaranteed to return after timeout x numberOfAttempts miliseconds.
- * @param automatic_resume Call {@link #stopFlush()} after the flush
- * @return true if FLUSH completed within the timeout
+ * Performs a partial flush in a cluster for flush participants.
+ * <p/>
+ * All pending messages are flushed out only for the flush participants. The remaining members
+ * in a cluster are not included in the flush. The flush participants should be a proper subset
+ * of a current view.
+ * <p/>
+ *
+ * @param automatic_resume
+ * Call {@link #stopFlush()} after the flush
+ * @see #startFlush(boolean)
*/
- abstract public boolean startFlush(boolean automatic_resume);
+ abstract public void startFlush(List<Address> flushParticipants, boolean automatic_resume)
+ throws Exception;
/**
- * Will perform a flush of the system, ie. all pending messages are flushed out of the
- * system and all members ack their reception. After this call returns, no member will
- * be sending any messages until {@link #stopFlush()} is called.
- * @param timeout
- * @param automatic_resume Call {@link #stopFlush()} after the flush
- * @return true if FLUSH completed within the timeout
- * @see #startFlush(boolean)
+ * Will perform a flush of the system, ie. all pending messages are flushed out of the system
+ * and all members ack their reception. After this call returns, no member will be sending any
+ * messages until {@link #stopFlush()} is called.
+ * <p/>
+ * In case of flush collisions, a random sleep time backoff algorithm is employed and the flush
+ * is reattempted for numberOfAttempts. Therefore this method is guaranteed to return after
+ * timeout x numberOfAttempts miliseconds.
+ *
+ * @param automatic_resume
+ * Call {@link #stopFlush()} after the flush
*/
- abstract public boolean startFlush(long timeout, boolean automatic_resume);
-
+ abstract public void startFlush(boolean automatic_resume) throws Exception;
+
abstract public void stopFlush();
abstract public void stopFlush(List<Address> flushParticipants);
View
75 src/org/jgroups/JChannel.java
@@ -1014,55 +1014,56 @@ public boolean flushSupported() {
}
/** {@inheritDoc} */
- public boolean startFlush(boolean automatic_resume) {
- if(!flushSupported()) {
- throw new IllegalStateException("Flush is not supported, add pbcast.FLUSH protocol to your configuration");
- }
- boolean successfulFlush=(Boolean)down(new Event(Event.SUSPEND));
-
- if(automatic_resume)
- stopFlush();
-
- return successfulFlush;
+ public void startFlush(boolean automatic_resume) throws Exception {
+ if (!flushSupported()) {
+ throw new IllegalStateException(
+ "Flush is not supported, add pbcast.FLUSH protocol to your configuration");
+ }
+ try {
+ down(new Event(Event.SUSPEND));
+ } catch (Exception e) {
+ throw new ChannelException("Flush failed", e.getCause());
+ } finally {
+ if (automatic_resume)
+ stopFlush();
+ }
}
/** {@inheritDoc} */
- public boolean startFlush(List<Address> flushParticipants,boolean automatic_resume) {
- boolean successfulFlush;
- if(!flushSupported()){
- throw new IllegalStateException("Flush is not supported, add pbcast.FLUSH protocol to your configuration");
+ public void startFlush(List<Address> flushParticipants, boolean automatic_resume)
+ throws Exception {
+ if (!flushSupported()) {
+ throw new IllegalStateException(
+ "Flush is not supported, add pbcast.FLUSH protocol to your configuration");
}
View v = getView();
- if(v != null && v.getMembers().containsAll(flushParticipants)){
- successfulFlush=(Boolean)down(new Event(Event.SUSPEND, flushParticipants));
- }else{
+ boolean validParticipants = v != null && v.getMembers().containsAll(flushParticipants);
+ if (!validParticipants)
throw new IllegalArgumentException("Current view " + v
- + " does not contain all flush participants "
- + flushParticipants);
- }
-
- if(automatic_resume)
- stopFlush(flushParticipants);
-
- return successfulFlush;
- }
-
- /** {@inheritDoc} */
- public boolean startFlush(long timeout, boolean automatic_resume) {
- return startFlush(automatic_resume);
+ + " does not contain all flush participants " + flushParticipants);
+ try {
+ down(new Event(Event.SUSPEND, flushParticipants));
+ } catch (Exception e) {
+ throw new ChannelException("Flush failed", e.getCause());
+ } finally {
+ if (automatic_resume)
+ stopFlush(flushParticipants);
+ }
}
public void stopFlush() {
- if(!flushSupported()) {
- throw new IllegalStateException("Flush is not supported, add pbcast.FLUSH protocol to your configuration");
- }
- down(new Event(Event.RESUME));
+ if (!flushSupported()) {
+ throw new IllegalStateException(
+ "Flush is not supported, add pbcast.FLUSH protocol to your configuration");
+ }
+ down(new Event(Event.RESUME));
}
public void stopFlush(List<Address> flushParticipants) {
- if(!flushSupported()) {
- throw new IllegalStateException("Flush is not supported, add pbcast.FLUSH protocol to your configuration");
- }
+ if (!flushSupported()) {
+ throw new IllegalStateException(
+ "Flush is not supported, add pbcast.FLUSH protocol to your configuration");
+ }
down(new Event(Event.RESUME, flushParticipants));
}
View
74 src/org/jgroups/protocols/pbcast/FLUSH.java
@@ -41,6 +41,8 @@
@MBean(description = "Flushes the cluster")
public class FLUSH extends Protocol {
+ private static final FlushStartResult SUCCESS_START_FLUSH = new FlushStartResult(Boolean.TRUE,null);
+
/*
* ------------------------------------------ Properties------------------------------------------
*/
@@ -120,7 +122,7 @@
@GuardedBy("sharedLock")
private boolean flushCompleted = false;
- private final Promise<Boolean> flush_promise = new Promise<Boolean>();
+ private final Promise<FlushStartResult> flush_promise = new Promise<FlushStartResult>();
private final Promise<Boolean> flush_unblock_promise = new Promise<Boolean>();
@@ -194,18 +196,17 @@ public int getNumberOfFlushes() {
}
@ManagedOperation(description = "Request cluster flush")
- public boolean startFlush() {
- return startFlush(new Event(Event.SUSPEND));
+ public void startFlush() {
+ startFlush(new Event(Event.SUSPEND));
}
@SuppressWarnings("unchecked")
- private boolean startFlush(Event evt) {
+ private void startFlush(Event evt) {
List<Address> flushParticipants = (List<Address>) evt.getArg();
- return startFlush(flushParticipants);
+ startFlush(flushParticipants);
}
- private boolean startFlush(List<Address> flushParticipants) {
- boolean successfulFlush = false;
+ private void startFlush(List<Address> flushParticipants) throws RuntimeException {
if (!flushInProgress.get()) {
flush_promise.reset();
synchronized(sharedLock) {
@@ -214,19 +215,21 @@ private boolean startFlush(List<Address> flushParticipants) {
}
onSuspend(flushParticipants);
try {
- Boolean r = flush_promise.getResultWithTimeout(start_flush_timeout);
- successfulFlush = r.booleanValue();
+ FlushStartResult r = flush_promise.getResultWithTimeout(start_flush_timeout);
+ if(r.failed())
+ throw new RuntimeException(r.getFailureCause());
} catch (TimeoutException e) {
- if (log.isDebugEnabled())
- log.debug(localAddress
- + ": timed out waiting for flush responses after "
- + start_flush_timeout
- + " ms. Rejecting flush to participants "
- + flushParticipants);
rejectFlush(flushParticipants, currentViewId());
+ throw new RuntimeException(localAddress
+ + " timed out waiting for flush responses after "
+ + start_flush_timeout
+ + " ms. Rejected flush to participants "
+ + flushParticipants,e);
}
}
- return successfulFlush;
+ else {
+ throw new RuntimeException("Flush attempt is in progress");
+ }
}
@ManagedOperation(description = "Request end of flush in a cluster")
@@ -265,7 +268,8 @@ public Object down(Event evt) {
return handleConnect(evt, false);
case Event.SUSPEND:
- return startFlush(evt);
+ startFlush(evt);
+ break;
// only for testing, see FLUSH#testFlushWithCrashedFlushCoordinator
@@ -320,7 +324,7 @@ private void blockMessageDuringFlush() {
if (shouldSuspendByItself) {
isBlockingFlushDown = false;
log.warn(localAddress + ": unblocking after " + timeout + "ms");
- flush_promise.setResult(Boolean.TRUE);
+ flush_promise.setResult(new FlushStartResult(Boolean.TRUE,null));
notBlockedDown.signalAll();
}
} catch (InterruptedException e) {
@@ -408,7 +412,7 @@ public void run() {
new Thread(r).start();
}
// however, flush should fail/retry as soon as one FAIL is received
- flush_promise.setResult(Boolean.FALSE);
+ flush_promise.setResult(new FlushStartResult(Boolean.FALSE, new Exception("Flush failed for " + msg.getSrc())));
break;
case FlushHeader.FLUSH_COMPLETED:
@@ -461,7 +465,8 @@ public void run() {
break;
case Event.SUSPEND:
- return startFlush(evt);
+ startFlush(evt);
+ break;
case Event.RESUME:
onResume(evt);
@@ -492,7 +497,7 @@ private void onFlushReconcileOK(Message msg) {
synchronized (sharedLock) {
reconcileOks.add(msg.getSrc());
if (reconcileOks.size() >= flushMembers.size()) {
- flush_promise.setResult(Boolean.TRUE);
+ flush_promise.setResult(SUCCESS_START_FLUSH);
if (log.isDebugEnabled())
log.debug(localAddress + ": all FLUSH_RECONCILE_OK received");
}
@@ -660,7 +665,7 @@ private void onSuspend(final List<Address> members) {
participantsInFlush));
}
if (participantsInFlush.isEmpty()) {
- flush_promise.setResult(Boolean.TRUE);
+ flush_promise.setResult(SUCCESS_START_FLUSH);
} else {
down_prot.down(new Event(Event.MSG, msg));
if (log.isDebugEnabled())
@@ -792,7 +797,7 @@ private void onFlushCompleted(Address address, final FlushHeader header) {
if (needsReconciliationPhase) {
down_prot.down(new Event(Event.MSG, msg));
} else if (flushCompleted) {
- flush_promise.setResult(Boolean.TRUE);
+ flush_promise.setResult(SUCCESS_START_FLUSH);
if (log.isDebugEnabled())
log.debug(localAddress + ": all FLUSH_COMPLETED received");
} else if (collision) {
@@ -885,6 +890,29 @@ private void onSuspect(Address address) {
log.debug(localAddress + ": sent FLUSH_COMPLETED message to " + flushCoordinator);
}
}
+
+ private static class FlushStartResult {
+ private final Boolean result;
+ private final Exception failureCause;
+
+
+ public FlushStartResult(Boolean result, Exception failureCause) {
+ this.result = result;
+ this.failureCause = failureCause;
+ }
+
+ public Boolean getResult() {
+ return result;
+ }
+
+ public boolean failed(){
+ return result == Boolean.FALSE;
+ }
+
+ public Exception getFailureCause() {
+ return failureCause;
+ }
+ }
public static class FlushHeader extends Header {
public static final byte START_FLUSH = 0;
View
15 src/org/jgroups/protocols/pbcast/GMS.java
@@ -693,17 +693,20 @@ protected boolean _startFlush(final View new_view, int maxAttempts, long randomF
}
try {
- boolean successfulFlush=true;
+ boolean successfulFlush=false;
boolean validView=new_view != null && new_view.size() > 0;
if(validView && flushProtocolInStack) {
int attemptCount = 0;
- while(attemptCount < maxAttempts){
- successfulFlush=(Boolean)up_prot.up(new Event(Event.SUSPEND, new ArrayList<Address>(new_view.getMembers())));
- if(successfulFlush)
+ while (attemptCount < maxAttempts) {
+ try {
+ up_prot.up(new Event(Event.SUSPEND, new ArrayList<Address>(new_view.getMembers())));
+ successfulFlush = true;
break;
- Util.sleepRandom(randomFloor,randomCeiling);
- attemptCount++;
+ } catch (Exception e) {
+ Util.sleepRandom(randomFloor, randomCeiling);
+ attemptCount++;
+ }
}
if(successfulFlush) {
View
39 src/org/jgroups/util/Util.java
@@ -2859,17 +2859,19 @@ public static String shortName(String hostname) {
return hostname;
}
- public static boolean startFlush(Channel c, List<Address> flushParticipants, int numberOfAttempts, long randomSleepTimeoutFloor,long randomSleepTimeoutCeiling) {
- boolean successfulFlush = false;
- int attemptCount = 0;
- while(attemptCount < numberOfAttempts){
- successfulFlush = c.startFlush(flushParticipants, false);
- if(successfulFlush)
- break;
- Util.sleepRandom(randomSleepTimeoutFloor,randomSleepTimeoutCeiling);
- attemptCount++;
- }
- return successfulFlush;
+ public static boolean startFlush(Channel c, List<Address> flushParticipants,
+ int numberOfAttempts, long randomSleepTimeoutFloor, long randomSleepTimeoutCeiling) {
+ int attemptCount = 0;
+ while (attemptCount < numberOfAttempts) {
+ try {
+ c.startFlush(flushParticipants, false);
+ return true;
+ } catch (Exception e) {
+ Util.sleepRandom(randomSleepTimeoutFloor, randomSleepTimeoutCeiling);
+ attemptCount++;
+ }
+ }
+ return false;
}
public static boolean startFlush(Channel c, List<Address> flushParticipants) {
@@ -2877,16 +2879,17 @@ public static boolean startFlush(Channel c, List<Address> flushParticipants) {
}
public static boolean startFlush(Channel c, int numberOfAttempts, long randomSleepTimeoutFloor,long randomSleepTimeoutCeiling) {
- boolean successfulFlush = false;
int attemptCount = 0;
while(attemptCount < numberOfAttempts){
- successfulFlush = c.startFlush(false);
- if(successfulFlush)
- break;
- Util.sleepRandom(randomSleepTimeoutFloor,randomSleepTimeoutCeiling);
- attemptCount++;
+ try{
+ c.startFlush(false);
+ return true;
+ } catch(Exception e) {
+ Util.sleepRandom(randomSleepTimeoutFloor,randomSleepTimeoutCeiling);
+ attemptCount++;
+ }
}
- return successfulFlush;
+ return false;
}
public static boolean startFlush(Channel c) {
View
4 tests/junit/org/jgroups/tests/FlushTest.java
@@ -130,11 +130,9 @@ public void testSequentialFlushInvocation() throws Exception {
for (int i = 0; i < 100; i++) {
System.out.print("flush #" + i + ": ");
long start = System.currentTimeMillis();
- boolean status = channel.startFlush(false);
+ channel.startFlush(false);
channel.stopFlush();
long diff = System.currentTimeMillis() - start;
- System.out.println(status ? " OK (in " + diff + " ms)" : " FAIL");
- assert status;
}
} finally {
Util.close(channel, channel2, channel3);
Please sign in to comment.
Something went wrong with that request. Please try again.