Skip to content
This repository has been archived by the owner on Oct 30, 2023. It is now read-only.

Commit

Permalink
JIRA-1134
Browse files Browse the repository at this point in the history
closes #24
  • Loading branch information
Maja Kabiljo committed Mar 17, 2017
1 parent 3dbd158 commit a1d546f
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 21 deletions.
Expand Up @@ -18,6 +18,7 @@

package org.apache.giraph.graph;

import org.apache.giraph.master.MasterProgress;
import org.apache.giraph.worker.WorkerProgress;

/**
Expand Down Expand Up @@ -48,4 +49,8 @@ public void logFailure(String reason) {
@Override
public void updateProgress(WorkerProgress workerProgress) {
}

@Override
public void updateMasterProgress(MasterProgress masterProgress) {
}
}
Expand Up @@ -21,6 +21,7 @@
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.job.ClientThriftServer;
import org.apache.giraph.job.JobProgressTracker;
import org.apache.giraph.master.MasterProgress;
import org.apache.giraph.worker.WorkerProgress;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -152,6 +153,16 @@ public void run() {
});
}

@Override
public void updateMasterProgress(final MasterProgress masterProgress) {
executeWithRetry(new Runnable() {
@Override
public void run() {
jobProgressTracker.updateMasterProgress(masterProgress);
}
});
}

/**
* Execute Runnable, if disconnected try to connect again and retry
*
Expand Down
Expand Up @@ -20,6 +20,7 @@

import com.google.common.collect.Iterables;
import org.apache.giraph.conf.FloatConfOption;
import org.apache.giraph.master.MasterProgress;
import org.apache.giraph.worker.WorkerProgress;
import org.apache.giraph.worker.WorkerProgressStats;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -71,15 +72,19 @@ public class CombinedWorkerProgress extends WorkerProgressStats {
private int minGraphPercentageInMemory = 100;
/** Id of the worker with min percentage of graph in memory */
private int workerWithMinGraphPercentageInMemory = -1;
/** Master progress */
private MasterProgress masterProgress;

/**
* Constructor
*
* @param workerProgresses Worker progresses to combine
* @param masterProgress Master progress
* @param conf Configuration
*/
public CombinedWorkerProgress(Iterable<WorkerProgress> workerProgresses,
Configuration conf) {
MasterProgress masterProgress, Configuration conf) {
this.masterProgress = masterProgress;
normalFreeMemoryFraction = NORMAL_FREE_MEMORY_FRACTION.get(conf);
for (WorkerProgress workerProgress : workerProgresses) {
if (workerProgress.getCurrentSuperstep() > currentSuperstep) {
Expand Down Expand Up @@ -151,11 +156,26 @@ public String toString() {
sb.append("Data from ").append(workersInSuperstep).append(" workers - ");
if (isInputSuperstep()) {
sb.append("Loading data: ");
sb.append(verticesLoaded).append(" vertices loaded, ");
sb.append(vertexInputSplitsLoaded).append(
" vertex input splits loaded; ");
sb.append(edgesLoaded).append(" edges loaded, ");
sb.append(edgeInputSplitsLoaded).append(" edge input splits loaded");
if (!masterProgress.vertexInputSplitsSet() ||
masterProgress.getVertexInputSplitCount() > 0) {
sb.append(verticesLoaded).append(" vertices loaded, ");
sb.append(vertexInputSplitsLoaded).append(
" vertex input splits loaded");
if (masterProgress.getVertexInputSplitCount() > 0) {
sb.append(" (out of ").append(
masterProgress.getVertexInputSplitCount()).append(")");
}
sb.append("; ");
}
if (!masterProgress.edgeInputSplitsSet() ||
masterProgress.getEdgeInputSplitsCount() > 0) {
sb.append(edgesLoaded).append(" edges loaded, ");
sb.append(edgeInputSplitsLoaded).append(" edge input splits loaded");
if (masterProgress.getEdgeInputSplitsCount() > 0) {
sb.append(" (out of ").append(
masterProgress.getEdgeInputSplitsCount()).append(")");
}
}
} else if (isComputeSuperstep()) {
sb.append("Compute superstep ").append(currentSuperstep).append(": ");
sb.append(verticesComputed).append(" out of ").append(
Expand Down
Expand Up @@ -20,6 +20,7 @@

import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.master.MasterProgress;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.giraph.worker.WorkerProgress;
import org.apache.hadoop.mapreduce.Job;
Expand All @@ -28,6 +29,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/**
* Default implementation of JobProgressTrackerService
Expand Down Expand Up @@ -55,6 +57,9 @@ public class DefaultJobProgressTrackerService
/** Map of worker progresses */
private final Map<Integer, WorkerProgress> workerProgresses =
new ConcurrentHashMap<>();
/** Master progress */
private final AtomicReference<MasterProgress> masterProgress =
new AtomicReference<>(new MasterProgress());
/** Job */
private Job job;

Expand All @@ -81,7 +86,8 @@ public void run() {
!workerProgresses.isEmpty()) {
// Combine and log
CombinedWorkerProgress combinedWorkerProgress =
new CombinedWorkerProgress(workerProgresses.values(), conf);
new CombinedWorkerProgress(workerProgresses.values(),
masterProgress.get(), conf);
if (LOG.isInfoEnabled()) {
LOG.info(combinedWorkerProgress.toString());
}
Expand Down Expand Up @@ -172,6 +178,11 @@ public void updateProgress(WorkerProgress workerProgress) {
workerProgresses.put(workerProgress.getTaskId(), workerProgress);
}

@Override
public void updateMasterProgress(MasterProgress masterProgress) {
this.masterProgress.set(masterProgress);
}

@Override
public void stop(boolean succeeded) {
finished = true;
Expand Down
Expand Up @@ -20,6 +20,8 @@

import com.facebook.swift.service.ThriftMethod;
import com.facebook.swift.service.ThriftService;

import org.apache.giraph.master.MasterProgress;
import org.apache.giraph.worker.WorkerProgress;

/**
Expand Down Expand Up @@ -64,5 +66,13 @@ public interface JobProgressTracker {
*/
@ThriftMethod
void updateProgress(WorkerProgress workerProgress);

/**
* Master should call this method to update its progress
*
* @param masterProgress Progress of the master
*/
@ThriftMethod
void updateMasterProgress(MasterProgress masterProgress);
}

Expand Up @@ -659,24 +659,28 @@ public int createMappingInputSplits() {

@Override
public int createVertexInputSplits() {
// Short-circuit if there is no vertex input format
if (!getConfiguration().hasVertexInputFormat()) {
return 0;
}
VertexInputFormat<I, V, E> vertexInputFormat =
getConfiguration().createWrappedVertexInputFormat();
return createInputSplits(vertexInputFormat, InputType.VERTEX);
int splits = 0;
if (getConfiguration().hasVertexInputFormat()) {
VertexInputFormat<I, V, E> vertexInputFormat =
getConfiguration().createWrappedVertexInputFormat();
splits = createInputSplits(vertexInputFormat, InputType.VERTEX);
}
MasterProgress.get().setVertexInputSplitCount(splits);
getJobProgressTracker().updateMasterProgress(MasterProgress.get());
return splits;
}

@Override
public int createEdgeInputSplits() {
// Short-circuit if there is no edge input format
if (!getConfiguration().hasEdgeInputFormat()) {
return 0;
}
EdgeInputFormat<I, E> edgeInputFormat =
getConfiguration().createWrappedEdgeInputFormat();
return createInputSplits(edgeInputFormat, InputType.EDGE);
int splits = 0;
if (getConfiguration().hasEdgeInputFormat()) {
EdgeInputFormat<I, E> edgeInputFormat =
getConfiguration().createWrappedEdgeInputFormat();
splits = createInputSplits(edgeInputFormat, InputType.EDGE);
}
MasterProgress.get().setEdgeInputSplitsCount(splits);
getJobProgressTracker().updateMasterProgress(MasterProgress.get());
return splits;
}

@Override
Expand Down
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.giraph.master;

import com.facebook.swift.codec.ThriftField;
import com.facebook.swift.codec.ThriftStruct;

/**
* Stores information about master progress
*/
@ThriftStruct
public final class MasterProgress {
/** Singleton instance for everyone to use */
private static final MasterProgress INSTANCE = new MasterProgress();

/** How many vertex input splits were created */
private int vertexInputSplitCount = -1;
/** How many edge input splits were created */
private int edgeInputSplitCount = -1;

/**
* Public constructor for thrift to create us.
* Please use MasterProgress.get() to get the static instance.
*/
public MasterProgress() {
}

/**
* Get singleton instance of MasterProgress.
*
* @return MasterProgress singleton instance
*/
public static MasterProgress get() {
return INSTANCE;
}

@ThriftField(1)
public int getVertexInputSplitCount() {
return vertexInputSplitCount;
}

@ThriftField
public void setVertexInputSplitCount(int vertexInputSplitCount) {
this.vertexInputSplitCount = vertexInputSplitCount;
}

@ThriftField(2)
public int getEdgeInputSplitsCount() {
return edgeInputSplitCount;
}

@ThriftField
public void setEdgeInputSplitsCount(int edgeInputSplitCount) {
this.edgeInputSplitCount = edgeInputSplitCount;
}

/**
* Whether or not number of vertex input splits was set yet
*
* @return True iff it was set
*/
public boolean vertexInputSplitsSet() {
return vertexInputSplitCount != -1;
}

/**
* Whether or not number of edge input splits was set yet
*
* @return True iff it was set
*/
public boolean edgeInputSplitsSet() {
return edgeInputSplitCount != -1;
}
}

0 comments on commit a1d546f

Please sign in to comment.