Skip to content
Permalink
Browse files
GIRAPH-1226
closes #112
  • Loading branch information
dlogothetis committed Nov 6, 2019
1 parent 3ad8429 commit 19f76a33cb081b8b91afc24b78e4f265fc3e94c8
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 10 deletions.
@@ -49,8 +49,10 @@
import org.apache.giraph.graph.DefaultVertex;
import org.apache.giraph.graph.DefaultVertexResolver;
import org.apache.giraph.graph.DefaultVertexValueCombiner;
import org.apache.giraph.graph.JobProgressTrackerClient;
import org.apache.giraph.graph.Language;
import org.apache.giraph.graph.MapperObserver;
import org.apache.giraph.graph.RetryableJobProgressTrackerClient;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.VertexValueCombiner;
@@ -1201,9 +1203,17 @@ public interface GiraphConstants {
new BooleanConfOption("giraph.trackJobProgressOnClient", false,
"Whether to track job progress on client or not");

/** Class to use as the job progress client */
ClassConfOption<JobProgressTrackerClient> JOB_PROGRESS_TRACKER_CLIENT_CLASS =
ClassConfOption.create("giraph.jobProgressTrackerClientClass",
RetryableJobProgressTrackerClient.class,
JobProgressTrackerClient.class,
"Class to use to make calls to the job progress tracker service");

/** Class to use to track job progress on client */
ClassConfOption<JobProgressTrackerService> JOB_PROGRESS_TRACKER_CLASS =
ClassConfOption.create("giraph.jobProgressTrackerClass",
ClassConfOption<JobProgressTrackerService>
JOB_PROGRESS_TRACKER_SERVICE_CLASS =
ClassConfOption.create("giraph.jobProgressTrackerServiceClass",
DefaultJobProgressTrackerService.class,
JobProgressTrackerService.class,
"Class to use to track job progress on client");
@@ -28,7 +28,6 @@
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.sun.management.GarbageCollectionNotificationInfo;
@@ -287,12 +286,15 @@ private void initializeJobProgressTracker() {
if (!conf.trackJobProgressOnClient()) {
jobProgressTracker = new JobProgressTrackerClientNoOp();
} else {
jobProgressTracker =
GiraphConstants.JOB_PROGRESS_TRACKER_CLIENT_CLASS.newInstance(conf);
try {
jobProgressTracker = new RetryableJobProgressTrackerClient(conf);
} catch (InterruptedException | ExecutionException e) {
LOG.warn("createJobProgressClient: Exception occurred while trying to" +
" connect to JobProgressTracker - not reporting progress", e);
jobProgressTracker = new JobProgressTrackerClientNoOp();
jobProgressTracker.init(conf);
// CHECKSTYLE: stop IllegalCatch
} catch (Exception e) {
// CHECKSTYLE: resume IllegalCatch
throw new RuntimeException(
"Failed to initialize JobProgressTrackerClient", e);
}
}
jobProgressTracker.mapperStarted();
@@ -18,6 +18,7 @@

package org.apache.giraph.graph;

import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.job.JobProgressTracker;

import java.io.IOException;
@@ -30,4 +31,11 @@
public interface JobProgressTrackerClient extends JobProgressTracker {
/** Close the connections if any */
void cleanup() throws IOException;

/**
* Initialize the client.
* @param conf Job configuration
* @throws Exception
*/
void init(GiraphConfiguration conf) throws Exception;
}
@@ -18,6 +18,7 @@

package org.apache.giraph.graph;

import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.master.MasterProgress;
import org.apache.giraph.worker.WorkerProgress;

@@ -30,6 +31,10 @@ public class JobProgressTrackerClientNoOp implements JobProgressTrackerClient {
public void cleanup() {
}

@Override
public void init(GiraphConfiguration conf) {
}

@Override
public void mapperStarted() {
}
@@ -63,7 +63,7 @@
private static final Logger LOG =
Logger.getLogger(RetryableJobProgressTrackerClient.class);
/** Configuration */
private final GiraphConfiguration conf;
private GiraphConfiguration conf;
/** Thrift client manager to use to connect to job progress tracker */
private ThriftClientManager clientManager;
/** Job progress tracker */
@@ -73,6 +73,13 @@
/** Cached value for wait time between retries */
private int retryWaitMsec;

/**
* Default constructor. Typically once an instance is created it should be
* initialized by calling {@link #init(GiraphConfiguration)}.
*/
public RetryableJobProgressTrackerClient() {
}

/**
* Constructor
*
@@ -86,6 +93,14 @@ public RetryableJobProgressTrackerClient(GiraphConfiguration conf) throws
resetConnection();
}

@Override
public void init(GiraphConfiguration conf) throws Exception {
this.conf = conf;
numRetries = RETRYABLE_JOB_PROGRESS_CLIENT_NUM_RETRIES.get(conf);
retryWaitMsec = RETRYABLE_JOB_PROGRESS_CLIENT_RETRY_WAIT_MS.get(conf);
resetConnection();
}

/**
* Try to establish new connection to JobProgressTracker
*/
@@ -270,7 +270,7 @@ public static JobProgressTrackerService createJobProgressTrackerService(
}

JobProgressTrackerService jobProgressTrackerService =
GiraphConstants.JOB_PROGRESS_TRACKER_CLASS.newInstance(conf);
GiraphConstants.JOB_PROGRESS_TRACKER_SERVICE_CLASS.newInstance(conf);
jobProgressTrackerService.init(conf, jobObserver);
return jobProgressTrackerService;
}

0 comments on commit 19f76a3

Please sign in to comment.