Skip to content
Permalink
Browse files
JIRA-1212
closes #94
  • Loading branch information
Maja Kabiljo committed Nov 27, 2018
1 parent d412219 commit f5dacbbebb0fd48f136abab2658711719df575eb
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 4 deletions.
@@ -670,6 +670,16 @@ public final float getMinPercentResponded() {
return MIN_PERCENT_RESPONDED.get(this);
}

/**
* How many mappers is job asking for, taking into account whether master
* is running on the same mapper as worker or not
*
* @return How many mappers is job asking for
*/
public final int getMaxMappers() {
return getMaxWorkers() + (SPLIT_MASTER_WORKER.get(this) ? 1 : 0);
}

/**
* Utilize an existing ZooKeeper service. If this is not set, ZooKeeper
* will be dynamically started by Giraph for this job.
@@ -93,7 +93,7 @@ public void run() {
MAX_ALLOWED_TIME_WITHOUT_PROGRESS_MS.get(conf);
CombinedWorkerProgress lastProgress = null;
while (!finished) {
if (mappersStarted == conf.getMaxWorkers() + 1 &&
if (mappersStarted == conf.getMaxMappers() &&
!workerProgresses.isEmpty()) {
// Combine and log
CombinedWorkerProgress combinedWorkerProgress =
@@ -202,15 +202,15 @@ public void run() {
public synchronized void mapperStarted() {
mappersStarted++;
if (LOG.isInfoEnabled()) {
if (mappersStarted == conf.getMaxWorkers() + 1) {
if (mappersStarted == conf.getMaxMappers()) {
LOG.info("Got all " + mappersStarted + " mappers");
jobGotAllMappers();
} else {
if (System.currentTimeMillis() - lastTimeMappersStartedLogged >
UPDATE_MILLISECONDS) {
lastTimeMappersStartedLogged = System.currentTimeMillis();
LOG.info("Got " + mappersStarted + " but needs " +
(conf.getMaxWorkers() + 1) + " mappers");
conf.getMaxMappers() + " mappers");
}
}
}
@@ -34,6 +34,7 @@
import java.util.Map.Entry;
import java.util.TreeMap;

import static org.apache.giraph.conf.GiraphConstants.SPLIT_MASTER_WORKER;
import static org.apache.giraph.conf.GiraphConstants.USE_SUPERSTEP_COUNTERS;

/**
@@ -58,6 +59,8 @@
private final Context context;
/** Use superstep counters? */
private final boolean superstepCounterOn;
/** Are master and worker split or not? */
private final boolean splitMasterWorker;
/** Setup seconds */
private double setupSecs = 0d;
/** Superstep timer (in seconds) map */
@@ -78,6 +81,7 @@ public MasterThread(CentralizedServiceMaster<I, V, E> bspServiceMaster,
this.context = context;
GiraphTimers.init(context);
superstepCounterOn = USE_SUPERSTEP_COUNTERS.get(context.getConfiguration());
splitMasterWorker = SPLIT_MASTER_WORKER.get(context.getConfiguration());
}

/**
@@ -118,7 +122,10 @@ public void run() {
while (!superstepState.isExecutionComplete()) {
long startSuperstepMillis = System.currentTimeMillis();
long cachedSuperstep = bspServiceMaster.getSuperstep();
GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep);
// If master and worker are running together, worker will call reset
if (splitMasterWorker) {
GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep);
}
Class<? extends Computation> computationClass =
bspServiceMaster.getMasterCompute().getComputation();
superstepState = bspServiceMaster.coordinateSuperstep();

0 comments on commit f5dacbb

Please sign in to comment.