MAPREDUCE-7470: multi-thread mapreduce committer #6469
base: trunk
Conversation
💔 -1 overall
This message was automatically generated.
@lastbus Thanks for the contribution! We need to fix the checkstyle issue.
```java
@@ -404,6 +423,37 @@ protected void commitJobInternal(JobContext context) throws IOException {
      for (FileStatus stat: getAllCommittedTaskPaths(context)) {
        mergePaths(fs, stat, finalOutput, context);
      }
    } else if (algorithmVersion == 3) {
      ExecutorService pool = Executors.newFixedThreadPool(commitJobThreadIONum);
```
Should we have a common executorService per object? That would save the overhead of creating new threads each time this method is called.
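A minimal sketch of what this suggestion could look like: one lazily created pool held by the committer instance and reused across `commitJobInternal()` calls. The class and field names here are hypothetical, not the patch's actual code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: share one pool per committer instance instead of
// creating a new pool on every commitJobInternal() invocation.
public class SharedPoolExample {
    private ExecutorService pool; // lazily created, then reused
    private final int threads;

    public SharedPoolExample(int threads) {
        this.threads = threads;
    }

    synchronized ExecutorService getPool() {
        if (pool == null) {
            pool = Executors.newFixedThreadPool(threads);
        }
        return pool;
    }

    // would be called from the committer's cleanup path
    synchronized void shutdown() {
        if (pool != null) {
            pool.shutdown();
        }
    }
}
```

The trade-off is that the pool's lifetime must then be tied to the committer's cleanup path rather than to a single method call.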
```java
@@ -100,12 +105,22 @@ public class FileOutputCommitter extends PathOutputCommitter {
  public static final boolean
      FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT = false;

  public static final String FILEOUTPUTCOMMITTER_COMMIT_JOB_THREAD_IO_NUM =
      "mapreduce.fileoutputcommitter.jobcommit.thread.io.num";

  public static final int FILEOUTPUTCOMMITTER_COMMIT_JOB_THREAD_IO_NUM_DEFAULT = 50;
```
Should we make this default a factor of the JVM's availableProcessors? That would keep excessive context switching in check.
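One way this could be sketched: cap the configured thread count by a multiple of the core count. The oversubscription factor of 4 below is an arbitrary illustration (commit is IO-bound, so some oversubscription is reasonable), not a value from the patch.

```java
// Hypothetical sketch: bound the IO thread count by the available cores
// so a large configured value cannot cause excessive context switching.
public class ThreadCountExample {
    static int ioThreads(int configured, int cores) {
        // assumed oversubscription factor for IO-bound work; tune as needed
        int suggested = cores * 4;
        return Math.max(1, Math.min(configured, suggested));
    }
}
```

For example, with the default of 50 on an 8-core JVM this would use 32 threads rather than 50.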
```java
if (!fs.exists(to)) {
  commitJobLock.lock();
  try {
    if (!fs.exists(to)) { // double check
```
I was thinking whether we should drop this double check. Between line 567 and 568, some other parallel operation could create `to`, so we cannot really ensure that `to` will not be there while renaming. At the instant of renaming, three things can happen:

- the `to` path is not there
- the `to` path is there and is a directory
- the `to` path is there and is a file

Now, rename will return false if the `to` path is there and is a file (ref: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html#boolean_rename.28Path_src.2C_Path_d.29:~:text=HDFS%20%3A%20The%20rename%20fails%2C%20no%20exception%20is%20raised.%20Instead%20the%20method%20call%20simply%20returns%20false), or it will return true. When it returns true, two things can happen:

- `to` did not exist before the rename -> after the rename, the `to` path will be made
- `to` was an existing dir before the rename -> after the rename, `to/from` will get created (ref: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html#boolean_rename.28Path_src.2C_Path_d.29:~:text=If%20the%20destination%20exists%20and%20is%20a%20directory%2C%20the%20final%20destination%20of%20the%20rename%20becomes%20the%20destination%20%2B%20the%20filename%20of%20the%20source%20path.)

Proposal is to:

- let the rename happen
- if the rename succeeds, don't do anything more, because either `to` initially was not there or `to` was a directory, in which case the rename did something similar to what is done on lines 578 to 582
Like I said on the JIRA, I don't want this. It has the same scale issues encountered on abfs as #6399 and #6378, and the same correctness problems on GCS as v2, i.e. "incorrect task commit semantics", unless v1 commit can be made to rely not on atomic directory rename but on atomic file rename, which does work there.
Even if the store meets the v1 correctness prerequisites, I would like to see a comparison of the same job run through the manifest committer, ideally with profiling to highlight where it could be improved.
💔 -1 overall
This message was automatically generated.
Description of PR
In cloud environments such as AWS, Aliyun, etc., network latency is non-trivial when we commit thousands of files.
In our situation, the ping latency is about 0.03 ms in the IDC, but after moving to the cloud it is about 3 ms, roughly 100x slower. We found that committing tens of thousands of files can cost tens of minutes; the more files there are, the longer it takes.
So we propose a new committer algorithm, a variant of committer algorithm version 1, called version 3. To decrease the commit time, algorithm 3 uses a thread pool to commit the job's final output.
Our test results in cloud production show that the new algorithm 3 decreases the commit time by several tens of times.
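The core idea, submitting each committed task path to a fixed-size pool and waiting for all merges, can be sketched with plain `java.util.concurrent` primitives. The names `commitAll` and `mergeOne` are hypothetical stand-ins for the patch's loop over `getAllCommittedTaskPaths()` and its `mergePaths()` call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of a parallel job commit: overlap the per-file
// round trips that dominate commit time on high-latency cloud stores.
class ParallelCommitSketch {
    static final AtomicInteger merged = new AtomicInteger();

    static void commitAll(List<String> taskPaths, int ioThreads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(ioThreads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (String p : taskPaths) {
                futures.add(pool.submit(() -> mergeOne(p)));
            }
            for (Future<?> f : futures) {
                f.get(); // propagate any merge failure to the job commit
            }
        } finally {
            pool.shutdown();
        }
    }

    static void mergeOne(String path) {
        // stand-in for mergePaths(fs, stat, finalOutput, context)
        merged.incrementAndGet();
    }
}
```

With per-call latency around 3 ms, N sequential renames cost roughly N x 3 ms, while a pool of T threads reduces the wall-clock cost toward N/T x 3 ms, which matches the speedups reported above.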
How was this patch tested?
For code changes: