MAPREDUCE-7470: multi-thread mapreduce committer #6469

Open
wants to merge 1 commit into
base: trunk
@@ -20,6 +20,11 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -100,12 +105,22 @@ public class FileOutputCommitter extends PathOutputCommitter {
public static final boolean
FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT = false;

public static final String FILEOUTPUTCOMMITTER_COMMIT_JOB_THREAD_IO_NUM = "mapreduce.fileoutputcommitter.jobcommit.thread.io.num";

public static final int FILEOUTPUTCOMMITTER_COMMIT_JOB_THREAD_IO_NUM_DEFAULT = 50;

Review comment: Should this default be a factor of the processors available to the JVM (availableProcessors) rather than a fixed value? That would keep excessive context switching in check.
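
One way to act on the comment above, as a minimal sketch: keep the configured value when it is set, and otherwise derive the pool size from `Runtime.getRuntime().availableProcessors()`. The class name `CommitThreadSizing`, the `factor` and `cap` parameters, and the convention that a non-positive value means "unset" are all assumptions made for illustration; none of them appear in the PR.

```java
// Hypothetical helper, not part of the PR: picks a commit thread count.
final class CommitThreadSizing {
  private CommitThreadSizing() {
  }

  /**
   * @param configured value read from the job configuration; a value <= 0
   *                   is treated as "not set" (assumed convention)
   * @param factor     threads per available processor (assumed tuning knob)
   * @param cap        upper bound applied to the derived value
   * @return the configured value if set, otherwise cpus * factor clamped
   *         to the range [1, cap]
   */
  static int resolveThreadCount(int configured, int factor, int cap) {
    if (configured > 0) {
      // An explicit setting always wins.
      return configured;
    }
    int cpus = Runtime.getRuntime().availableProcessors();
    // Use long arithmetic so a large factor cannot overflow.
    int derived = (int) Math.min((long) cpus * factor, (long) cap);
    return Math.max(1, derived);
  }
}
```

For example, `CommitThreadSizing.resolveThreadCount(0, 4, 50)` yields four threads per processor, but never more than 50. A factor above 1 may be reasonable here because the merge tasks mostly wait on filesystem calls, though the right value would need measuring.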


private Lock commitJobLock = new ReentrantLock();

private Path outputPath = null;
private Path workPath = null;
private final int algorithmVersion;
private final boolean skipCleanup;
private final boolean ignoreCleanupFailures;

private final int commitJobThreadIONum;

private volatile boolean commitJobEncounterException = false;

/**
* Create a file output committer
* @param outputPath the job's output path, or null if you want the output
@@ -140,8 +155,8 @@ public FileOutputCommitter(Path outputPath,
conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT);
LOG.info("File Output Committer Algorithm version is " + algorithmVersion);
- if (algorithmVersion != 1 && algorithmVersion != 2) {
- throw new IOException("Only 1 or 2 algorithm version is supported");
+ if (algorithmVersion != 1 && algorithmVersion != 2 && algorithmVersion != 3) {
+ throw new IOException("Only 1, 2 or 3 algorithm version is supported");
}

// if skip cleanup
@@ -154,6 +169,10 @@ public FileOutputCommitter(Path outputPath,
FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED,
FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT);

commitJobThreadIONum = conf.getInt(
FILEOUTPUTCOMMITTER_COMMIT_JOB_THREAD_IO_NUM,
FILEOUTPUTCOMMITTER_COMMIT_JOB_THREAD_IO_NUM_DEFAULT);

LOG.info("FileOutputCommitter skip cleanup _temporary folders under " +
"output directory:" + skipCleanup + ", ignore cleanup failures: " +
ignoreCleanupFailures);
@@ -404,6 +423,37 @@ protected void commitJobInternal(JobContext context) throws IOException {
for (FileStatus stat: getAllCommittedTaskPaths(context)) {
mergePaths(fs, stat, finalOutput, context);
}
} else if (algorithmVersion == 3) {
ExecutorService pool = Executors.newFixedThreadPool(commitJobThreadIONum);

Review comment: Should we have a common ExecutorService per object? That would save the overhead of creating new threads each time this method is called.
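
A minimal sketch of the idea in the comment above, assuming the pool is owned by one object and created lazily on first use. `SharedCommitPool` is a hypothetical name, not something in the PR, and the sketch leaves open who would call `close()` in the committer's lifecycle.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical holder, not part of the PR: one lazily created pool per owner.
final class SharedCommitPool implements AutoCloseable {
  private final int threads;
  private ExecutorService pool; // guarded by "this"

  SharedCommitPool(int threads) {
    this.threads = threads;
  }

  /** Returns the same pool on every call, creating it on first use. */
  synchronized ExecutorService get() {
    if (pool == null || pool.isShutdown()) {
      pool = Executors.newFixedThreadPool(threads);
    }
    return pool;
  }

  /** Stops the pool; intended to be called once the owner is done. */
  @Override
  public synchronized void close() {
    if (pool != null) {
      pool.shutdownNow();
      pool = null;
    }
  }
}
```

The trade-off is that a shared pool keeps idle threads alive between calls, so something has to own its shutdown. With a per-call pool, as in the diff, the `pool.shutdown()` call already handles that.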

final List<Future<Void>> futures = new LinkedList<>();
for (FileStatus stat: getAllCommittedTaskPaths(context)) {
if (!commitJobEncounterException) {
futures.add(pool.submit(() -> {
try {
mergePaths(fs, stat, finalOutput, context);
return null;
} catch (Exception e) {
commitJobEncounterException = true;
throw e;
}
}));
} else {
LOG.error("commitJobEncounterException, break.");
break;
}
}
pool.shutdown();
try {
for (Future future : futures) {
future.get();
}
} catch (InterruptedException | ExecutionException e) {
LOG.error("Cancelling " + futures.size() + " commit job tasks");
for (Future future : futures) {
future.cancel(true);
}
throw new IOException("Exception when commit job", e);
}
}

if (skipCleanup) {
@@ -509,6 +559,28 @@ private void renameOrMerge(FileSystem fs, FileStatus from, Path to,
if (!fs.rename(from.getPath(), to)) {
throw new IOException("Failed to rename " + from + " to " + to);
}
} else if (algorithmVersion == 3) {
boolean rename = false;
if (!fs.exists(to)) {
commitJobLock.lock();
try {
if (!fs.exists(to)) { // double check

Review comment: I was wondering whether we should drop this double check. Between lines 567 and 568, some other parallel operation could still create `to`, so we cannot guarantee that `to` is absent at the moment of the rename. At that instant, one of three things can be true:

  1. the `to` path does not exist
  2. the `to` path exists and is a directory
  3. the `to` path exists and is a file

`rename` returns false if the `to` path exists and is a file (ref: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html#boolean_rename.28Path_src.2C_Path_d.29:~:text=HDFS%20%3A%20The%20rename%20fails%2C%20no%20exception%20is%20raised.%20Instead%20the%20method%20call%20simply%20returns%20false); otherwise it returns true. When it returns true, one of two things has happened:

  1. `to` did not exist before the rename -> after the rename, the `to` path has been created.
  2. `to` was an existing directory before the rename -> after the rename, `to/from` has been created (ref: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html#boolean_rename.28Path_src.2C_Path_d.29:~:text=If%20the%20destination%20exists%20and%20is%20a%20directory%2C%20the%20final%20destination%20of%20the%20rename%20becomes%20the%20destination%20%2B%20the%20filename%20of%20the%20source%20path.).

The proposal is to:

  1. let the rename happen
  2. if the rename succeeds, do nothing more, because either `to` was initially absent or `to` was a directory, in which case the rename has already done something similar to what lines 578 to 582 do
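
The case analysis above can be written down as a small model, which may make it easier to check which outcome each destination state leads to. This is only a sketch of the behaviour the comment describes for HDFS, using plain strings for paths. `RenameOutcomeModel`, `DestState` and `finalPath` are hypothetical names, the code does not call Hadoop, and other filesystems may behave differently.

```java
import java.util.Optional;

// Hypothetical model of the three destination states listed in the comment.
final class RenameOutcomeModel {
  enum DestState { ABSENT, DIRECTORY, FILE }

  private RenameOutcomeModel() {
  }

  /**
   * Returns the path the source ends up at when rename reports true, or
   * empty when rename reports false, following the cases in the comment.
   */
  static Optional<String> finalPath(String from, String to, DestState state) {
    switch (state) {
      case ABSENT:
        // Destination missing: the source takes over the destination path.
        return Optional.of(to);
      case DIRECTORY:
        // Destination is a directory: the source is nested beneath it.
        String name = from.substring(from.lastIndexOf('/') + 1);
        return Optional.of(to + "/" + name);
      default:
        // Destination is an existing file: rename returns false.
        return Optional.empty();
    }
  }
}
```

In the directory case the source lands at `to` plus the source's own file name, which is the `to/from` result the comment refers to. Whether that matches the merge done by the existing loop is the question the proposal needs to settle.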

if (!fs.rename(from.getPath(), to)) {
throw new IOException("Failed to rename " + from + " to " + to);
}
rename = true;
}
} finally {
commitJobLock.unlock();
}
}

if (!rename) {
for (FileStatus subFrom : fs.listStatus(from.getPath())) {
Path subTo = new Path(to, subFrom.getPath().getName());
mergePaths(fs, subFrom, subTo, context);
}
}
} else {
fs.mkdirs(to);
for (FileStatus subFrom : fs.listStatus(from.getPath())) {
@@ -749,6 +821,9 @@ public String toString() {
sb.append(", algorithmVersion=").append(algorithmVersion);
sb.append(", skipCleanup=").append(skipCleanup);
sb.append(", ignoreCleanupFailures=").append(ignoreCleanupFailures);
if (algorithmVersion == 3) {
sb.append(", commitJobThreadIONum=").append(commitJobThreadIONum);
}
sb.append('}');
return sb.toString();
}