Skip to content
This repository has been archived by the owner on Oct 30, 2023. It is now read-only.

Commit

Permalink
GIRAPH-1065: Allow extending JobProgressTrackerService
Browse files Browse the repository at this point in the history
Summary: We might want to perform additional actions on events from JobProgressTrackerService. Allow overriding it and specifying another class to use.

Test Plan: Ran a job with custom JobProgressTrackerService and verify actions on it are called

Differential Revision: https://reviews.facebook.net/D58383
  • Loading branch information
Maja Kabiljo committed May 19, 2016
1 parent 608d506 commit 9b6d6f9
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 182 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -31,3 +31,6 @@ failed-profile.txt


/for-each-profile-results.txt /for-each-profile-results.txt
/giraph-hive/derby.log /giraph-hive/derby.log

# File with all giraph conf options
/src/site/xdoc/options.xml
Expand Up @@ -60,9 +60,11 @@
import org.apache.giraph.io.filters.VertexInputFilter; import org.apache.giraph.io.filters.VertexInputFilter;
import org.apache.giraph.job.DefaultGiraphJobRetryChecker; import org.apache.giraph.job.DefaultGiraphJobRetryChecker;
import org.apache.giraph.job.DefaultJobObserver; import org.apache.giraph.job.DefaultJobObserver;
import org.apache.giraph.job.DefaultJobProgressTrackerService;
import org.apache.giraph.job.GiraphJobObserver; import org.apache.giraph.job.GiraphJobObserver;
import org.apache.giraph.job.GiraphJobRetryChecker; import org.apache.giraph.job.GiraphJobRetryChecker;
import org.apache.giraph.job.HaltApplicationUtils; import org.apache.giraph.job.HaltApplicationUtils;
import org.apache.giraph.job.JobProgressTrackerService;
import org.apache.giraph.mapping.MappingStore; import org.apache.giraph.mapping.MappingStore;
import org.apache.giraph.mapping.MappingStoreOps; import org.apache.giraph.mapping.MappingStoreOps;
import org.apache.giraph.mapping.translate.TranslateEdge; import org.apache.giraph.mapping.translate.TranslateEdge;
Expand Down Expand Up @@ -1170,6 +1172,13 @@ public interface GiraphConstants {
new BooleanConfOption("giraph.trackJobProgressOnClient", false, new BooleanConfOption("giraph.trackJobProgressOnClient", false,
"Whether to track job progress on client or not"); "Whether to track job progress on client or not");


/** Class to use to track job progress on client */
ClassConfOption<JobProgressTrackerService> JOB_PROGRESS_TRACKER_CLASS =
ClassConfOption.create("giraph.jobProgressTrackerClass",
DefaultJobProgressTrackerService.class,
JobProgressTrackerService.class,
"Class to use to track job progress on client");

/** Number of retries for creating the HDFS files */ /** Number of retries for creating the HDFS files */
IntConfOption HDFS_FILE_CREATION_RETRIES = IntConfOption HDFS_FILE_CREATION_RETRIES =
new IntConfOption("giraph.hdfs.file.creation.retries", 10, new IntConfOption("giraph.hdfs.file.creation.retries", 10,
Expand Down
@@ -0,0 +1,217 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.giraph.job;

import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.worker.WorkerProgress;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Default implementation of JobProgressTrackerService
*/
public class DefaultJobProgressTrackerService
implements JobProgressTrackerService {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(JobProgressTrackerService.class);
/** How often to print job's progress */
private static final int UPDATE_MILLISECONDS = 10 * 1000;

/** Configuration */
private GiraphConfiguration conf;
/** Giraph job callback */
private GiraphJobObserver jobObserver;
/** Thread which periodically writes job's progress */
private Thread writerThread;
/** Whether application is finished */
private volatile boolean finished = false;
/** Number of mappers which the job got */
private int mappersStarted;
/** Last time number of mappers started was logged */
private long lastTimeMappersStartedLogged;
/** Map of worker progresses */
private final Map<Integer, WorkerProgress> workerProgresses =
new ConcurrentHashMap<>();
/** Job */
private Job job;

@Override
public void init(GiraphConfiguration conf, GiraphJobObserver jobObserver) {
this.conf = conf;
this.jobObserver = jobObserver;

if (LOG.isInfoEnabled()) {
LOG.info("Waiting for job to start... (this may take a minute)");
}
startWriterThread();
}

/**
* Start the thread which writes progress periodically
*/
private void startWriterThread() {
writerThread = new Thread(new Runnable() {
@Override
public void run() {
while (!finished) {
if (mappersStarted == conf.getMaxWorkers() + 1 &&
!workerProgresses.isEmpty()) {
// Combine and log
CombinedWorkerProgress combinedWorkerProgress =
new CombinedWorkerProgress(workerProgresses.values(), conf);
if (LOG.isInfoEnabled()) {
LOG.info(combinedWorkerProgress.toString());
}
// Check if application is done
if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
break;
}
}
try {
Thread.sleep(UPDATE_MILLISECONDS);
} catch (InterruptedException e) {
if (LOG.isInfoEnabled()) {
LOG.info("Progress thread interrupted");
}
break;
}
}
}
});
writerThread.setDaemon(true);
writerThread.start();
}

@Override
public void setJob(Job job) {
this.job = job;
}

/**
* Called when job got all mappers, used to check MAX_ALLOWED_JOB_TIME_MS
* and potentially start a thread which will kill the job after this time
*/
private void jobGotAllMappers() {
jobObserver.jobGotAllMappers(job);
final long maxAllowedJobTimeMs =
GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf);
if (maxAllowedJobTimeMs > 0) {
// Start a thread which will kill the job if running for too long
Thread killThread = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(maxAllowedJobTimeMs);
try {
LOG.warn("Killing job because it took longer than " +
maxAllowedJobTimeMs + " milliseconds");
job.killJob();
} catch (IOException e) {
LOG.warn("Failed to kill job", e);
}
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Thread checking for jobs max allowed time " +
"interrupted");
}
}
}
});
killThread.setDaemon(true);
killThread.start();
}
}

@Override
public synchronized void mapperStarted() {
mappersStarted++;
if (LOG.isInfoEnabled()) {
if (mappersStarted == conf.getMaxWorkers() + 1) {
LOG.info("Got all " + mappersStarted + " mappers");
jobGotAllMappers();
} else {
if (System.currentTimeMillis() - lastTimeMappersStartedLogged >
UPDATE_MILLISECONDS) {
lastTimeMappersStartedLogged = System.currentTimeMillis();
LOG.info("Got " + mappersStarted + " but needs " +
(conf.getMaxWorkers() + 1) + " mappers");
}
}
}
}

@Override
public void logInfo(String logLine) {
if (LOG.isInfoEnabled()) {
LOG.info(logLine);
}
}

@Override
public void logError(String logLine) {
LOG.error(logLine);
}

@Override
public void logFailure(String reason) {
LOG.fatal(reason);
finished = true;
writerThread.interrupt();
}

@Override
public void updateProgress(WorkerProgress workerProgress) {
workerProgresses.put(workerProgress.getTaskId(), workerProgress);
}

@Override
public void stop(boolean succeeded) {
finished = true;
writerThread.interrupt();
if (LOG.isInfoEnabled()) {
LOG.info("Job " + (succeeded ? "finished successfully" : "failed") +
", cleaning up...");
}
}

/**
* Create job progress server on job client if enabled in configuration.
*
* @param conf Configuration
* @param jobObserver Giraph job callbacks
* @return JobProgressTrackerService
*/
public static JobProgressTrackerService createJobProgressTrackerService(
GiraphConfiguration conf, GiraphJobObserver jobObserver) {
if (!conf.trackJobProgressOnClient()) {
return null;
}

JobProgressTrackerService jobProgressTrackerService =
GiraphConstants.JOB_PROGRESS_TRACKER_CLASS.newInstance(conf);
jobProgressTrackerService.init(conf, jobObserver);
return jobProgressTrackerService;
}
}
Expand Up @@ -241,7 +241,7 @@ public final boolean run(boolean verbose)
GiraphJobObserver jobObserver = conf.getJobObserver(); GiraphJobObserver jobObserver = conf.getJobObserver();


JobProgressTrackerService jobProgressTrackerService = JobProgressTrackerService jobProgressTrackerService =
JobProgressTrackerService.createJobProgressTrackerService( DefaultJobProgressTrackerService.createJobProgressTrackerService(
conf, jobObserver); conf, jobObserver);
ClientThriftServer clientThriftServer = null; ClientThriftServer clientThriftServer = null;
if (jobProgressTrackerService != null) { if (jobProgressTrackerService != null) {
Expand Down

0 comments on commit 9b6d6f9

Please sign in to comment.