From a1d546f7a39df6ffb7d3ed9d3bcc47126ab2e579 Mon Sep 17 00:00:00 2001 From: Maja Kabiljo Date: Fri, 17 Mar 2017 10:40:53 -0700 Subject: [PATCH] JIRA-1134 closes #24 --- .../graph/JobProgressTrackerClientNoOp.java | 5 ++ .../RetryableJobProgressTrackerClient.java | 11 +++ .../giraph/job/CombinedWorkerProgress.java | 32 +++++-- .../job/DefaultJobProgressTrackerService.java | 13 ++- .../apache/giraph/job/JobProgressTracker.java | 10 +++ .../giraph/master/BspServiceMaster.java | 32 ++++--- .../apache/giraph/master/MasterProgress.java | 90 +++++++++++++++++++ 7 files changed, 172 insertions(+), 21 deletions(-) create mode 100644 giraph-core/src/main/java/org/apache/giraph/master/MasterProgress.java diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java index 369941fbb..e699bfb9a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java @@ -18,6 +18,7 @@ package org.apache.giraph.graph; +import org.apache.giraph.master.MasterProgress; import org.apache.giraph.worker.WorkerProgress; /** @@ -48,4 +49,8 @@ public void logFailure(String reason) { @Override public void updateProgress(WorkerProgress workerProgress) { } + + @Override + public void updateMasterProgress(MasterProgress masterProgress) { + } } diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java index 21204bdc1..a7ac0554b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java @@ -21,6 +21,7 @@ import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.job.ClientThriftServer; import org.apache.giraph.job.JobProgressTracker; +import org.apache.giraph.master.MasterProgress; import org.apache.giraph.worker.WorkerProgress; import org.apache.log4j.Logger; @@ -152,6 +153,16 @@ public void run() { }); } + @Override + public void updateMasterProgress(final MasterProgress masterProgress) { + executeWithRetry(new Runnable() { + @Override + public void run() { + jobProgressTracker.updateMasterProgress(masterProgress); + } + }); + } + /** * Execute Runnable, if disconnected try to connect again and retry * diff --git a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java index e26516305..8cc16ec3f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java @@ -20,6 +20,7 @@ import com.google.common.collect.Iterables; import org.apache.giraph.conf.FloatConfOption; +import org.apache.giraph.master.MasterProgress; import org.apache.giraph.worker.WorkerProgress; import org.apache.giraph.worker.WorkerProgressStats; import org.apache.hadoop.conf.Configuration; @@ -71,15 +72,19 @@ public class CombinedWorkerProgress extends WorkerProgressStats { private int minGraphPercentageInMemory = 100; /** Id of the worker with min percentage of graph in memory */ private int workerWithMinGraphPercentageInMemory = -1; + /** Master progress */ + private MasterProgress masterProgress; /** * Constructor * * @param workerProgresses Worker progresses to combine + * @param masterProgress Master progress * @param conf Configuration */ public CombinedWorkerProgress(Iterable workerProgresses, - Configuration conf) { + MasterProgress masterProgress, Configuration conf) { + this.masterProgress = masterProgress; normalFreeMemoryFraction = NORMAL_FREE_MEMORY_FRACTION.get(conf); for (WorkerProgress workerProgress : workerProgresses) { if (workerProgress.getCurrentSuperstep() > currentSuperstep) { @@ -151,11 +156,26 @@ public String toString() { sb.append("Data from ").append(workersInSuperstep).append(" workers - "); if (isInputSuperstep()) { sb.append("Loading data: "); - sb.append(verticesLoaded).append(" vertices loaded, "); - sb.append(vertexInputSplitsLoaded).append( - " vertex input splits loaded; "); - sb.append(edgesLoaded).append(" edges loaded, "); - sb.append(edgeInputSplitsLoaded).append(" edge input splits loaded"); + if (!masterProgress.vertexInputSplitsSet() || + masterProgress.getVertexInputSplitCount() > 0) { + sb.append(verticesLoaded).append(" vertices loaded, "); + sb.append(vertexInputSplitsLoaded).append( + " vertex input splits loaded"); + if (masterProgress.getVertexInputSplitCount() > 0) { + sb.append(" (out of ").append( + masterProgress.getVertexInputSplitCount()).append(")"); + } + sb.append("; "); + } + if (!masterProgress.edgeInputSplitsSet() || + masterProgress.getEdgeInputSplitsCount() > 0) { + sb.append(edgesLoaded).append(" edges loaded, "); + sb.append(edgeInputSplitsLoaded).append(" edge input splits loaded"); + if (masterProgress.getEdgeInputSplitsCount() > 0) { + sb.append(" (out of ").append( + masterProgress.getEdgeInputSplitsCount()).append(")"); + } + } } else if (isComputeSuperstep()) { sb.append("Compute superstep ").append(currentSuperstep).append(": "); sb.append(verticesComputed).append(" out of ").append( diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java index 9e836dc75..ccd0fbacc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java @@ -20,6 +20,7 @@ import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.master.MasterProgress; import org.apache.giraph.utils.ThreadUtils; import org.apache.giraph.worker.WorkerProgress; import org.apache.hadoop.mapreduce.Job; @@ -28,6 +29,7 @@ import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; /** * Default implementation of JobProgressTrackerService @@ -55,6 +57,9 @@ public class DefaultJobProgressTrackerService /** Map of worker progresses */ private final Map workerProgresses = new ConcurrentHashMap<>(); + /** Master progress */ + private final AtomicReference masterProgress = + new AtomicReference<>(new MasterProgress()); /** Job */ private Job job; @@ -81,7 +86,8 @@ public void run() { !workerProgresses.isEmpty()) { // Combine and log CombinedWorkerProgress combinedWorkerProgress = - new CombinedWorkerProgress(workerProgresses.values(), conf); + new CombinedWorkerProgress(workerProgresses.values(), + masterProgress.get(), conf); if (LOG.isInfoEnabled()) { LOG.info(combinedWorkerProgress.toString()); } @@ -172,6 +178,11 @@ public void updateProgress(WorkerProgress workerProgress) { workerProgresses.put(workerProgress.getTaskId(), workerProgress); } + @Override + public void updateMasterProgress(MasterProgress masterProgress) { + this.masterProgress.set(masterProgress); + } + @Override public void stop(boolean succeeded) { finished = true; diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java index 3041d087c..92e35b8ef 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java @@ -20,6 +20,8 @@ import com.facebook.swift.service.ThriftMethod; import com.facebook.swift.service.ThriftService; + +import org.apache.giraph.master.MasterProgress; import org.apache.giraph.worker.WorkerProgress; /** @@ -64,5 +66,13 @@ public interface JobProgressTracker { */ @ThriftMethod void updateProgress(WorkerProgress workerProgress); + + /** + * Master should call this method to update its progress + * + * @param masterProgress Progress of the master + */ + @ThriftMethod + void updateMasterProgress(MasterProgress masterProgress); } diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index da1374aea..6c930cde0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -659,24 +659,28 @@ public int createMappingInputSplits() { @Override public int createVertexInputSplits() { - // Short-circuit if there is no vertex input format - if (!getConfiguration().hasVertexInputFormat()) { - return 0; - } - VertexInputFormat vertexInputFormat = - getConfiguration().createWrappedVertexInputFormat(); - return createInputSplits(vertexInputFormat, InputType.VERTEX); + int splits = 0; + if (getConfiguration().hasVertexInputFormat()) { + VertexInputFormat vertexInputFormat = + getConfiguration().createWrappedVertexInputFormat(); + splits = createInputSplits(vertexInputFormat, InputType.VERTEX); + } + MasterProgress.get().setVertexInputSplitCount(splits); + getJobProgressTracker().updateMasterProgress(MasterProgress.get()); + return splits; } @Override public int createEdgeInputSplits() { - // Short-circuit if there is no edge input format - if (!getConfiguration().hasEdgeInputFormat()) { - return 0; - } - EdgeInputFormat edgeInputFormat = - getConfiguration().createWrappedEdgeInputFormat(); - return createInputSplits(edgeInputFormat, InputType.EDGE); + int splits = 0; + if (getConfiguration().hasEdgeInputFormat()) { + EdgeInputFormat edgeInputFormat = + getConfiguration().createWrappedEdgeInputFormat(); + splits = createInputSplits(edgeInputFormat, InputType.EDGE); + } + MasterProgress.get().setEdgeInputSplitsCount(splits); + getJobProgressTracker().updateMasterProgress(MasterProgress.get()); + return splits; } @Override diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterProgress.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterProgress.java new file mode 100644 index 000000000..89dc4d3a3 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterProgress.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.master; + +import com.facebook.swift.codec.ThriftField; +import com.facebook.swift.codec.ThriftStruct; + +/** + * Stores information about master progress + */ +@ThriftStruct +public final class MasterProgress { + /** Singleton instance for everyone to use */ + private static final MasterProgress INSTANCE = new MasterProgress(); + + /** How many vertex input splits were created */ + private int vertexInputSplitCount = -1; + /** How many edge input splits were created */ + private int edgeInputSplitCount = -1; + + /** + * Public constructor for thrift to create us. + * Please use MasterProgress.get() to get the static instance. + */ + public MasterProgress() { + } + + /** + * Get singleton instance of MasterProgress. + * + * @return MasterProgress singleton instance + */ + public static MasterProgress get() { + return INSTANCE; + } + + @ThriftField(1) + public int getVertexInputSplitCount() { + return vertexInputSplitCount; + } + + @ThriftField + public void setVertexInputSplitCount(int vertexInputSplitCount) { + this.vertexInputSplitCount = vertexInputSplitCount; + } + + @ThriftField(2) + public int getEdgeInputSplitsCount() { + return edgeInputSplitCount; + } + + @ThriftField + public void setEdgeInputSplitsCount(int edgeInputSplitCount) { + this.edgeInputSplitCount = edgeInputSplitCount; + } + + /** + * Whether or not number of vertex input splits was set yet + * + * @return True iff it was set + */ + public boolean vertexInputSplitsSet() { + return vertexInputSplitCount != -1; + } + + /** + * Whether or not number of edge input splits was set yet + * + * @return True iff it was set + */ + public boolean edgeInputSplitsSet() { + return edgeInputSplitCount != -1; + } +}