Skip to content
Permalink
Browse files
GIRAPH-1162
closes #51
  • Loading branch information
Yuksel Akinci authored and Maja Kabiljo committed Oct 2, 2017
1 parent 3bbac90 commit 83d06d95dd70db6e3340afead3bd32fa58d9a396
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
@@ -1181,6 +1181,15 @@ public interface GiraphConstants {
JobProgressTrackerService.class,
"Class to use to track job progress on client");

/**
* Minimum number of vertices to compute before adding to worker progress.
*/
LongConfOption VERTICES_TO_UPDATE_PROGRESS =
new LongConfOption("giraph.VerticesToUpdateProgress", 100000,
"Minimum number of vertices to compute before " +
"updating worker progress");


/** Number of retries for creating the HDFS files */
IntConfOption HDFS_FILE_CREATION_RETRIES =
new IntConfOption("giraph.hdfs.file.creation.retries", 10,
@@ -26,6 +26,7 @@
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.function.primitive.PrimitiveRefs.LongRef;
import org.apache.giraph.io.SimpleVertexWriter;
@@ -78,7 +79,7 @@
/** Class time object */
private static final Time TIME = SystemTime.get();
/** How often to update WorkerProgress */
private static final long VERTICES_TO_UPDATE_PROGRESS = 100000;
private final long verticesToUpdateProgress;
/** Context */
private final Mapper<?, ?, ?, ?>.Context context;
/** Graph state */
@@ -140,6 +141,8 @@ public ComputeCallable(Mapper<?, ?, ?, ?>.Context context,
metrics.getUniformHistogram("wait-per-thread-ms");
histogramProcessingTimePerThread =
metrics.getUniformHistogram("processing-per-thread-ms");
verticesToUpdateProgress =
GiraphConstants.VERTICES_TO_UPDATE_PROGRESS.get(configuration);
}

@Override
@@ -278,11 +281,12 @@ private PartitionStats computePartition(
PartitionStats partitionStats =
new PartitionStats(partition.getId(), 0, 0, 0, 0, 0);
final LongRef verticesComputedProgress = new LongRef(0);

Progressable verticesProgressable = new Progressable() {
@Override
public void progress() {
verticesComputedProgress.value++;
if (verticesComputedProgress.value == VERTICES_TO_UPDATE_PROGRESS) {
if (verticesComputedProgress.value == verticesToUpdateProgress) {
WorkerProgress.get().addVerticesComputed(
verticesComputedProgress.value);
verticesComputedProgress.value = 0;

0 comments on commit 83d06d9

Please sign in to comment.