Skip to content
This repository has been archived by the owner on Oct 30, 2023. It is now read-only.

GIRAPH-1132 Giraph jobs don't end if zookeeper dies before job starts #21

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -281,7 +281,8 @@ public BspService(
conf.getZookeeperOpsRetryWaitMsecs(),
this,
context);
connectedEvent.waitForever();
connectedEvent.waitForeverOrFail(
GiraphConstants.WAIT_FOREVER_ZOOKEEPER_TIMEOUT_MSEC.get(conf));
this.fs = FileSystem.get(getConfiguration());
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Expand Up @@ -88,6 +88,7 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.OutputFormat;

import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand Down Expand Up @@ -1238,5 +1239,27 @@ public interface GiraphConstants {
BooleanConfOption PREFER_IP_ADDRESSES =
new BooleanConfOption("giraph.preferIP", false,
"Prefer IP addresses instead of host names");

/**
* Timeout for "waitForever", when we need to wait for zookeeper.
* Since we should never really have to wait forever.
* We should only wait some reasonable but large amount of time.
*/
LongConfOption WAIT_FOREVER_ZOOKEEPER_TIMEOUT_MSEC =

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: since we are not waiting forever anymore, I'd drop word forever from everywhere (forever and timeout have opposite meaning :-))

new LongConfOption("giraph.waitForeverZookeeperTimeout",
MINUTES.toMillis(15),
"How long should we stay in waitForever loops in various " +
"places that require network connection");

/**
* Timeout for "waitForever", when we need to wait for other workers
* to complete their job.
* Since we should never really have to wait forever.
* We should only wait some reasonable but large amount of time.
*/
LongConfOption WAIT_FOREVER_FOR_OTHER_WORKERS =
new LongConfOption("giraph.waitForeverForOtherWorkers",
HOURS.toMillis(48),
"How long should workers wait to finish superstep");
}
// CHECKSTYLE: resume InterfaceIsTypeCheck
Expand Up @@ -863,7 +863,9 @@ public boolean becomeMaster() {
return isMaster;
}
LOG.info("becomeMaster: Waiting to become the master...");
getMasterElectionChildrenChangedEvent().waitForever();
getMasterElectionChildrenChangedEvent().waitForeverOrFail(
GiraphConstants.WAIT_FOREVER_ZOOKEEPER_TIMEOUT_MSEC.get(
getConfiguration()));
getMasterElectionChildrenChangedEvent().reset();
} catch (KeeperException e) {
throw new IllegalStateException(
Expand Down Expand Up @@ -1832,7 +1834,9 @@ private void cleanUpZooKeeper() {
return;
}

getCleanedUpChildrenChangedEvent().waitForever();
getCleanedUpChildrenChangedEvent().waitForeverOrFail(
GiraphConstants.WAIT_FOREVER_ZOOKEEPER_TIMEOUT_MSEC.get(
getConfiguration()));
getCleanedUpChildrenChangedEvent().reset();
}

Expand Down
Expand Up @@ -448,7 +448,9 @@ private void markCurrentWorkerDoneReadingThenWaitForOthers() {
if (inputSplitsDoneStat != null) {
break;
}
getInputSplitsAllDoneEvent().waitForever();
getInputSplitsAllDoneEvent().waitForeverOrFail(
GiraphConstants.WAIT_FOREVER_FOR_OTHER_WORKERS.get(
getConfiguration()));
getInputSplitsAllDoneEvent().reset();
}
}
Expand Down Expand Up @@ -647,7 +649,9 @@ private void registerHealth(long superstep) {
"from previous failure): " + myHealthPath +
". Waiting for change in attempts " +
"to re-join the application");
getApplicationAttemptChangedEvent().waitForever();
getApplicationAttemptChangedEvent().waitForeverOrFail(
GiraphConstants.WAIT_FOREVER_ZOOKEEPER_TIMEOUT_MSEC.get(
getConfiguration()));
if (LOG.isInfoEnabled()) {
LOG.info("registerHealth: Got application " +
"attempt changed event, killing self");
Expand Down Expand Up @@ -868,7 +872,9 @@ private void waitForRequestsToFinish() {
private void waitForOtherWorkers(String superstepFinishedNode) {
try {
while (getZkExt().exists(superstepFinishedNode, true) == null) {
getSuperstepFinishedEvent().waitForever();
getSuperstepFinishedEvent().waitForeverOrFail(
GiraphConstants.WAIT_FOREVER_FOR_OTHER_WORKERS.get(
getConfiguration()));
getSuperstepFinishedEvent().reset();
}
} catch (KeeperException e) {
Expand Down Expand Up @@ -1683,7 +1689,9 @@ public final void exchangeVertexPartitions(
LOG.info("exchangeVertexPartitions: Waiting for workers " +
workerIdSet);
}
getPartitionExchangeChildrenChangedEvent().waitForever();
getPartitionExchangeChildrenChangedEvent().waitForeverOrFail(
GiraphConstants.WAIT_FOREVER_FOR_OTHER_WORKERS.get(
getConfiguration()));
getPartitionExchangeChildrenChangedEvent().reset();
}
} catch (KeeperException | InterruptedException e) {
Expand Down
5 changes: 3 additions & 2 deletions giraph-core/src/main/java/org/apache/giraph/zk/BspEvent.java
Expand Up @@ -43,7 +43,8 @@ public interface BspEvent {
boolean waitMsecs(int msecs);

/**
* Wait indefinitely until the event occurs.
* Waits until timeout or fails with runtime exception.
* @param timeout Throws exception if waiting takes longer than timeout.
*/
void waitForever();
void waitForeverOrFail(long timeout);
}
Expand Up @@ -134,8 +134,12 @@ public boolean waitMsecs(int msecs) {
}

@Override
public void waitForever() {
public void waitForeverOrFail(long timeout) {
long t0 = System.currentTimeMillis();
while (!waitMsecs(msecPeriod)) {
if (System.currentTimeMillis() > t0 + timeout) {
throw new RuntimeException("Timeout waiting");
}
progressable.progress();
}
}
Expand Down
Expand Up @@ -27,8 +27,6 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.giraph.time.Time;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
import org.apache.hadoop.util.Progressable;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -111,14 +109,14 @@ public void testEvent() {
}

/**
* Thread signaled test for {@link PredicateLock#waitForever()}
* Thread signaled test for {@link PredicateLock#waitForeverOrFail(long)}
*/
@Test
public void testWaitForever() {
BspEvent event = new PredicateLock(getStubProgressable());
Thread signalThread = new SignalThread(event);
signalThread.start();
event.waitForever();
event.waitForeverOrFail(5 * 60_000);
try {
signalThread.join();
} catch (InterruptedException e) {
Expand Down