From 0a959f4c179dd385f28d2a3a28ae0287caf5b35a Mon Sep 17 00:00:00 2001 From: Maja Kabiljo Date: Mon, 26 Nov 2018 10:15:15 -0800 Subject: [PATCH 1/2] Fix DefaultJobProgressTracker when splitMasterWorker=false Summary: DefaultJobProgressTracker assumes we are using numWorkers+1 mappers; fix that. Test Plan: Ran a job with splitMasterWorker=false, verified job progress gets printed correctly --- .../org/apache/giraph/conf/GiraphConfiguration.java | 10 ++++++++++ .../giraph/job/DefaultJobProgressTrackerService.java | 6 +++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 17c48a5f1..832e8b692 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -670,6 +670,16 @@ public final float getMinPercentResponded() { return MIN_PERCENT_RESPONDED.get(this); } + /** + * How many mappers the job is asking for, taking into account whether the + * master is running on the same mapper as a worker or not + * + * @return How many mappers the job is asking for + */ + public final int getMaxMappers() { + return getMaxWorkers() + (SPLIT_MASTER_WORKER.get(this) ? 1 : 0); + } + /** * Utilize an existing ZooKeeper service. If this is not set, ZooKeeper * will be dynamically started by Giraph for this job. 
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java index 243ab81c8..a1e7f1275 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java @@ -93,7 +93,7 @@ public void run() { MAX_ALLOWED_TIME_WITHOUT_PROGRESS_MS.get(conf); CombinedWorkerProgress lastProgress = null; while (!finished) { - if (mappersStarted == conf.getMaxWorkers() + 1 && + if (mappersStarted == conf.getMaxMappers() && !workerProgresses.isEmpty()) { // Combine and log CombinedWorkerProgress combinedWorkerProgress = @@ -202,7 +202,7 @@ public void run() { public synchronized void mapperStarted() { mappersStarted++; if (LOG.isInfoEnabled()) { - if (mappersStarted == conf.getMaxWorkers() + 1) { + if (mappersStarted == conf.getMaxMappers()) { LOG.info("Got all " + mappersStarted + " mappers"); jobGotAllMappers(); } else { @@ -210,7 +210,7 @@ public synchronized void mapperStarted() { UPDATE_MILLISECONDS) { lastTimeMappersStartedLogged = System.currentTimeMillis(); LOG.info("Got " + mappersStarted + " but needs " + - (conf.getMaxWorkers() + 1) + " mappers"); + conf.getMaxMappers() + " mappers"); } } } From bef0b8f78b4af165721782956f00f9ff7e4bdcde Mon Sep 17 00:00:00 2001 From: Maja Kabiljo Date: Tue, 27 Nov 2018 10:49:55 -0800 Subject: [PATCH 2/2] Fix a concurrency bug when splitMasterWorker=false --- .../main/java/org/apache/giraph/master/MasterThread.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java index 8e4e0b8f1..0f6a3a039 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java +++ 
b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java @@ -34,6 +34,7 @@ import java.util.Map.Entry; import java.util.TreeMap; +import static org.apache.giraph.conf.GiraphConstants.SPLIT_MASTER_WORKER; import static org.apache.giraph.conf.GiraphConstants.USE_SUPERSTEP_COUNTERS; /** @@ -58,6 +59,8 @@ public class MasterThread bspServiceMaster, this.context = context; GiraphTimers.init(context); superstepCounterOn = USE_SUPERSTEP_COUNTERS.get(context.getConfiguration()); + splitMasterWorker = SPLIT_MASTER_WORKER.get(context.getConfiguration()); } /** @@ -118,7 +122,10 @@ public void run() { while (!superstepState.isExecutionComplete()) { long startSuperstepMillis = System.currentTimeMillis(); long cachedSuperstep = bspServiceMaster.getSuperstep(); - GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep); + // If master and worker are running together, worker will call reset + if (splitMasterWorker) { + GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep); + } Class computationClass = bspServiceMaster.getMasterCompute().getComputation(); superstepState = bspServiceMaster.coordinateSuperstep();