
Commit

HADOOP-5850. Fixes a problem to do with not being able to run jobs with 0 maps/reduces. Contributed by Vinod K V.

git-svn-id: https://svn.eu.apache.org/repos/asf/hadoop/core/branches/branch-0.20@777575 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das committed May 22, 2009
1 parent 3078ca3 commit b882c18
Showing 7 changed files with 230 additions and 197 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
@@ -77,6 +77,9 @@ Release 0.20.1 - Unreleased
HADOOP-5210. Solves a problem in the progress report of the reduce task.
(Ravi Gummadi via ddas)

HADOOP-5850. Fixes a problem to do with not being able to run jobs with
0 maps/reduces. (Vinod K V via ddas)

Release 0.20.0 - 2009-04-15

INCOMPATIBLE CHANGES
78 changes: 32 additions & 46 deletions src/mapred/org/apache/hadoop/mapred/JobInProgress.java
@@ -392,7 +392,7 @@ public synchronized void initTasks() throws IOException {
jobInitKillStatus.initStarted = true;
}

LOG.debug("initializing " + this.jobId);
LOG.info("Initializing " + jobId);

// log job info
JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(),
@@ -437,38 +437,15 @@ public synchronized void initTasks() throws IOException {
splits[i],
jobtracker, conf, this, i);
}
LOG.info("Input size for job "+ jobId + " = " + inputLength);
LOG.info("Input size for job " + jobId + " = " + inputLength
+ ". Number of splits = " + splits.length);
if (numMapTasks > 0) {
LOG.info("Split info for job:" + jobId + " with " +
splits.length + " splits:");
nonRunningMapCache = createCache(splits, maxLevel);
}

// set the launch time
this.launchTime = System.currentTimeMillis();

// if no split is returned, job is considered completed and successful
if (numMapTasks == 0) {
// Finished time need to be setted here to prevent this job to be retired
// from the job tracker jobs at the next retire iteration.
this.finishTime = this.launchTime;
status.setSetupProgress(1.0f);
status.setMapProgress(1.0f);
status.setReduceProgress(1.0f);
status.setCleanupProgress(1.0f);
status.setRunState(JobStatus.SUCCEEDED);
tasksInited.set(true);
JobHistory.JobInfo.logInited(profile.getJobID(),
this.launchTime, 0, 0);
JobHistory.JobInfo.logFinished(profile.getJobID(),
this.finishTime, 0, 0, 0, 0,
getCounters());
// Special case because the Job is not queued
JobEndNotifier.registerNotification(this.getJobConf(), this.getStatus());

return;
}

//
// Create reduce tasks
//
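
The hunk above deletes initTasks()'s special case that marked a job with zero maps SUCCEEDED on the spot, skipping the setup and cleanup tasks entirely. After this patch an empty job takes the normal path: setup runs, there are simply no map/reduce tasks to schedule in between, and cleanup drives the job to SUCCEEDED. A standalone sketch of that lifecycle, with invented names (none of this is the committed code):

enum JobState { PREP, RUNNING, SUCCEEDED }

class EmptyJobLifecycle {
    JobState state = JobState.PREP;
    boolean setupFinished, cleanupLaunched;

    void finishSetup() { setupFinished = true; state = JobState.RUNNING; }

    // Mirrors the new canLaunchJobCleanupTask() guard: cleanup may not start
    // before setup has finished, which matters when there are zero maps.
    boolean canLaunchCleanup() { return setupFinished && !cleanupLaunched; }

    void finishCleanup() { cleanupLaunched = true; state = JobState.SUCCEEDED; }

    public static void main(String[] args) {
        EmptyJobLifecycle job = new EmptyJobLifecycle();
        job.finishSetup();                       // setup TIP completes
        if (job.canLaunchCleanup()) {
            job.finishCleanup();                 // cleanup TIP completes
        }
        System.out.println(job.state);           // SUCCEEDED, no map or reduce ever ran
    }
}

The canLaunchJobCleanupTask() and isSetupFinished() hunks further down supply the real versions of the guard sketched here.
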
@@ -490,9 +467,11 @@ public synchronized void initTasks() throws IOException {

// create two cleanup tips, one map and one reduce.
cleanup = new TaskInProgress[2];
// cleanup map tip. This map is doesn't use split.
// Just assign splits[0]
cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0],

// cleanup map tip. This map doesn't use any splits. Just assign an empty
// split.
JobClient.RawSplit emptySplit = new JobClient.RawSplit();
cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
jobtracker, conf, this, numMapTasks);
cleanup[0].setJobCleanupTask();

@@ -503,9 +482,10 @@ public synchronized void initTasks() throws IOException {

// create two setup tips, one map and one reduce.
setup = new TaskInProgress[2];
// setup map tip. This map is doesn't use split.
// Just assign splits[0]
setup[0] = new TaskInProgress(jobId, jobFile, splits[0],

// setup map tip. This map doesn't use any split. Just assign an empty
// split.
setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
jobtracker, conf, this, numMapTasks + 1 );
setup[0].setJobSetupTask();
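
Both of the hunks above make the same fix: the setup and cleanup map tips used to borrow splits[0], which does not exist once a job can legitimately have zero splits, so they are now seeded with a fresh, empty JobClient.RawSplit. A minimal standalone illustration of the failure mode being avoided (hypothetical demo code, not from the patch):

public class EmptySplitDemo {
    public static void main(String[] args) {
        String[] splits = new String[0];      // a job with no input splits
        try {
            String borrowed = splits[0];      // old approach: reuse splits[0]
            System.out.println(borrowed);
        } catch (ArrayIndexOutOfBoundsException e) {
            // exactly what a setup/cleanup tip would hit on an empty job
            System.out.println("cannot borrow a split that does not exist");
        }
    }
}
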

@@ -898,20 +878,11 @@ else if (state == TaskStatus.State.FAILED ||
if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
double progressDelta = tip.getProgress() - oldProgress;
if (tip.isMapTask()) {
if (maps.length == 0) {
this.status.setMapProgress(1.0f);
} else {
this.status.setMapProgress((float) (this.status.mapProgress() +
progressDelta / maps.length));
}
} else {
if (reduces.length == 0) {
this.status.setReduceProgress(1.0f);
} else {
this.status.setReduceProgress
((float) (this.status.reduceProgress() +
(progressDelta / reduces.length)));
}
this.status.setReduceProgress((float) (this.status.reduceProgress() +
(progressDelta / reduces.length)));
}
}
}
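
The zero-length guards deleted above became unreachable with this patch: progress is accumulated here only when a real map or reduce tip reports status, adding progressDelta divided by the task count, and a job with zero tasks has no tips to report (its progress is instead pinned in jobComplete(), in a later hunk). A runnable sketch of the accumulation arithmetic (illustrative only):

public class ProgressDemo {
    public static void main(String[] args) {
        int numMaps = 4;
        float mapProgress = 0.0f;
        // every map reports twice, moving from 0.0 to 0.5 to 1.0
        for (int task = 0; task < numMaps; task++) {
            for (int report = 0; report < 2; report++) {
                float progressDelta = 0.5f;   // newProgress - oldProgress
                mapProgress += progressDelta / numMaps;
            }
        }
        System.out.println(mapProgress);      // ~1.0 once every map finishes
    }
}
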
@@ -1138,8 +1109,10 @@ private synchronized boolean canLaunchJobCleanupTask() {
status.getRunState() != JobStatus.PREP) {
return false;
}
// check if cleanup task has been launched already.
if (launchedCleanup) {
// check if cleanup task has been launched already or if setup hasn't
// finished yet. The latter check matters when the number of maps is
// zero.
if (launchedCleanup || !isSetupFinished()) {
return false;
}
// check if job has failed or killed
@@ -1173,7 +1146,6 @@ public Task obtainJobSetupTask(TaskTrackerStatus tts,
if (!canLaunchSetupTask()) {
return null;
}

String taskTracker = tts.getTrackerName();
// Update the last-known clusterSize
this.clusterSize = clusterSize;
@@ -2121,6 +2093,12 @@ private void jobComplete() {
if (this.status.getRunState() == JobStatus.RUNNING ) {
this.status.setRunState(JobStatus.SUCCEEDED);
this.status.setCleanupProgress(1.0f);
if (maps.length == 0) {
this.status.setMapProgress(1.0f);
}
if (reduces.length == 0) {
this.status.setReduceProgress(1.0f);
}
this.finishTime = System.currentTimeMillis();
LOG.info("Job " + this.status.getJobID() +
" has completed successfully.");
@@ -2438,6 +2416,14 @@ void killSetupTip(boolean isMap) {
}
}

boolean isSetupFinished() {
if (setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete()
|| setup[1].isFailed()) {
return true;
}
return false;
}
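
The new isSetupFinished() counts a setup attempt that either completed or failed as finished, and canLaunchJobCleanupTask() (earlier hunk) uses it to keep cleanup from racing ahead of setup when there are zero maps. The if/return shape could be collapsed into one boolean expression with identical behavior, e.g.:

boolean isSetupFinished() {
    return setup[0].isComplete() || setup[0].isFailed()
        || setup[1].isComplete() || setup[1].isFailed();
}
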

/**
* Fail a task with a given reason, but without a status object.
*
26 changes: 18 additions & 8 deletions src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
@@ -30,6 +30,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobClient.RawSplit;
import org.apache.hadoop.mapred.SortedRanges.Range;
import org.apache.hadoop.net.Node;
@@ -729,7 +730,10 @@ public void completed(TaskAttemptID taskid) {
* Get the split locations
*/
public String[] getSplitLocations() {
return rawSplit.getLocations();
if (isMapTask() && !jobSetup && !jobCleanup) {
return rawSplit.getLocations();
}
return new String[0];
}

/**
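
The guard above changes the question from "is there a raw split object?" to "is this a real map task with input?": setup and cleanup tips now report no locations, so the scheduler places them with no data-locality preference. The predicate restated as a hypothetical standalone helper (not the committed code):

public class SplitLocationsDemo {
    // Only a real map task (not setup, not cleanup) exposes its split's
    // hosts to the scheduler.
    static String[] splitLocations(boolean isMapTask, boolean jobSetup,
                                   boolean jobCleanup, String[] rawLocations) {
        if (isMapTask && !jobSetup && !jobCleanup) {
            return rawLocations;   // schedule near the data
        }
        return new String[0];      // setup/cleanup can run anywhere
    }

    public static void main(String[] args) {
        String[] hosts = { "rack1-node1", "rack2-node3" };
        System.out.println(splitLocations(true, false, false, hosts).length);  // 2
        System.out.println(splitLocations(true, true, false, hosts).length);   // 0
    }
}
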
@@ -913,12 +917,18 @@ public Task addRunningTask(TaskAttemptID taskid,
boolean taskCleanup) {
// create the task
Task t = null;
if (isMapTask()) {
LOG.debug("attempt "+ numTaskFailures +
" sending skippedRecords "+failedRanges.getIndicesCount());
t = new MapTask(jobFile, taskid, partition,
rawSplit.getClassName(), rawSplit.getBytes());
} else {
if (isMapTask() && !jobSetup && !jobCleanup) {
LOG.debug("attempt " + numTaskFailures + " sending skippedRecords "
+ failedRanges.getIndicesCount());

t =
new MapTask(jobFile, taskid, partition, rawSplit.getClassName(),
rawSplit.getBytes());

} else if (jobSetup || jobCleanup) {
t = new MapTask(jobFile, taskid, partition, null, new BytesWritable());
} else {
t = new ReduceTask(jobFile, taskid, partition, numMaps);
}
if (jobCleanup) {
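
addRunningTask() now dispatches three ways: a real map task is built from its serialized split, a setup/cleanup map gets a null split class and an empty BytesWritable, and everything else is a reduce. The three-way split condensed into a runnable sketch (simplified names; task construction stubbed as strings):

public class TaskDispatchDemo {
    static String taskFor(boolean isMap, boolean setup, boolean cleanup) {
        if (isMap && !setup && !cleanup) {
            return "MapTask(rawSplit.getClassName(), rawSplit.getBytes())";
        } else if (setup || cleanup) {
            return "MapTask(null, new BytesWritable())"; // no real input
        } else {
            return "ReduceTask(numMaps)";
        }
    }

    public static void main(String[] args) {
        System.out.println(taskFor(true, false, false));  // ordinary map
        System.out.println(taskFor(true, true, false));   // setup map
        System.out.println(taskFor(false, false, false)); // reduce
    }
}
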
@@ -1027,7 +1037,7 @@ public int getSuccessEventNumber() {
* Gets the Node list of input split locations sorted in rack order.
*/
public String getSplitNodes() {
if ( rawSplit == null) {
if (!isMapTask() || jobSetup || jobCleanup) {
return "";
}
String[] splits = rawSplit.getLocations();
175 changes: 175 additions & 0 deletions src/test/org/apache/hadoop/mapred/TestEmptyJob.java
@@ -0,0 +1,175 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.mapred;

import java.io.File;
import java.io.IOException;
import java.net.URI;

import junit.framework.TestCase;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

/**
* A JUnit test for empty Map-Reduce jobs (jobs with no input data).
*/
public class TestEmptyJob extends TestCase {
private static final Log LOG =
LogFactory.getLog(TestEmptyJob.class.getName());

private static String TEST_ROOT_DIR =
new File(System.getProperty("test.build.data", "/tmp")).toURI()
.toString().replace(' ', '+');

MiniMRCluster mr = null;

/**
* Simple method running a MapReduce job with no input data. Used to test that
* such a job is successful.
*
* @param fileSys
* @param numMaps
* @param numReduces
* @return true if the MR job is successful, otherwise false
* @throws IOException
*/
private boolean launchEmptyJob(URI fileSys, int numMaps, int numReduces)
throws IOException {
// create an empty input dir
final Path inDir = new Path(TEST_ROOT_DIR, "testing/empty/input");
final Path outDir = new Path(TEST_ROOT_DIR, "testing/empty/output");
JobConf conf = mr.createJobConf();
FileSystem fs = FileSystem.get(fileSys, conf);
fs.delete(outDir, true);
if (!fs.mkdirs(inDir)) {
LOG.warn("Can't create " + inDir);
return false;
}

// use WordCount example
FileSystem.setDefaultUri(conf, fileSys);
conf.setJobName("empty");
// use an InputFormat which returns no split
conf.setInputFormat(EmptyInputFormat.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(IdentityMapper.class);
conf.setReducerClass(IdentityReducer.class);
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
conf.setNumMapTasks(numMaps);
conf.setNumReduceTasks(numReduces);

// run job and wait for completion
JobClient jc = new JobClient(conf);
RunningJob runningJob = jc.submitJob(conf);
while (true) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
if (runningJob.isComplete()) {
break;
}
}

assertTrue(runningJob.isComplete());
assertTrue(runningJob.isSuccessful());
JobID jobID = runningJob.getID();

TaskReport[] jobSetupTasks = jc.getSetupTaskReports(jobID);
assertTrue("Number of job-setup tips is not 2!", jobSetupTasks.length == 2);
assertTrue("Setup progress is " + runningJob.setupProgress()
+ " and not 1.0", runningJob.setupProgress() == 1.0);
assertTrue("Setup task is not finished!", mr.getJobTrackerRunner()
.getJobTracker().getJob(jobID).isSetupFinished());

assertTrue("Number of maps is not zero!", jc.getMapTaskReports(runningJob
.getID()).length == 0);
assertTrue(
"Map progress is " + runningJob.mapProgress() + " and not 1.0!",
runningJob.mapProgress() == 1.0);

assertTrue("Reduce progress is " + runningJob.reduceProgress()
+ " and not 1.0!", runningJob.reduceProgress() == 1.0);
assertTrue("Number of reduces is not " + numReduces, jc
.getReduceTaskReports(runningJob.getID()).length == numReduces);

TaskReport[] jobCleanupTasks = jc.getCleanupTaskReports(jobID);
assertTrue("Number of job-cleanup tips is not 2!",
jobCleanupTasks.length == 2);
assertTrue("Cleanup progress is " + runningJob.cleanupProgress()
+ " and not 1.0", runningJob.cleanupProgress() == 1.0);

assertTrue("Job output directory doesn't exit!", fs.exists(outDir));
FileStatus[] list = fs.listStatus(outDir, new OutputLogFilter());
assertTrue("Number of part-files is " + list.length + " and not "
+ numReduces, list.length == numReduces);

// cleanup
fs.delete(outDir, true);

// return job result
LOG.info("job is complete: " + runningJob.isSuccessful());
return (runningJob.isSuccessful());
}

/**
* Test that a job with no input data (and thus with no input split and no map
* task to execute) is successful.
*
* @throws IOException
*/
public void testEmptyJob()
throws IOException {
FileSystem fileSys = null;
try {
final int taskTrackers = 1;
JobConf conf = new JobConf();
fileSys = FileSystem.get(conf);

conf.set("mapred.job.tracker.handler.count", "1");
conf.set("mapred.job.tracker", "127.0.0.1:0");
conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
conf.set("mapred.task.tracker.http.address", "127.0.0.1:0");

mr =
new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1,
null, null, conf);

assertTrue(launchEmptyJob(fileSys.getUri(), 3, 1));
assertTrue(launchEmptyJob(fileSys.getUri(), 0, 0));
} finally {
if (fileSys != null) {
fileSys.close();
}
if (mr != null) {
mr.shutdown();
}
}
}
}
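
The test drives everything through EmptyInputFormat, which is among the seven changed files but is not rendered on this page. In the old mapred API, an InputFormat that produces no work only needs to return a zero-length split array; a hypothetical sketch in that spirit (this is not the committed EmptyInputFormat source):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class NoSplitsInputFormat implements InputFormat<Text, IntWritable> {
    public InputSplit[] getSplits(JobConf job, int numSplits) {
        return new InputSplit[0];       // no splits, hence zero map tasks
    }

    public RecordReader<Text, IntWritable> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter)
            throws IOException {
        // never reached when getSplits returns nothing
        throw new IOException("NoSplitsInputFormat produces no records");
    }
}
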
