Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-18793: S3A StagingCommitter does not clean up staging-uploads directory #5818

Merged
merged 1 commit into from
Jul 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,19 @@ private static int getAppAttemptId(Configuration conf) {
MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
}

/**
 * Resolve the parent of the temporary multipart upload commit directory.
 * The child directory is the one returned by
 * {@link #getMultipartUploadCommitsDirectory(Configuration, String)};
 * this method simply steps one level up from it.
 * @param conf configuration defining default FS.
 * @param uuid uuid of job
 * @return the parent path of the multipart upload commit directory
 * @throws IOException on an IO failure.
 */
public static Path getStagingUploadsParentDirectory(Configuration conf,
    String uuid) throws IOException {
  final Path commitsDirectory = getMultipartUploadCommitsDirectory(conf, uuid);
  return commitsDirectory.getParent();
}

/**
* Build a qualified temporary path for the multipart upload commit
* information in the cluster filesystem.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,8 @@ public void cleanupStagingDirs() {

/**
* Staging committer cleanup includes calling wrapped committer's
* cleanup method, and removing all destination paths in the final
* filesystem.
* cleanup method, and removing staging uploads path and all
* destination paths in the final filesystem.
* @param commitContext commit context
* @param suppressExceptions should exceptions be suppressed?
* @throws IOException IO failures if exceptions are not suppressed.
Expand All @@ -515,6 +515,9 @@ protected void cleanup(CommitContext commitContext,
maybeIgnore(suppressExceptions, "Cleanup wrapped committer",
() -> wrappedCommitter.cleanupJob(
commitContext.getJobContext()));
maybeIgnore(suppressExceptions, "Delete staging uploads path",
() -> deleteStagingUploadsParentDirectory(
commitContext.getJobContext()));
maybeIgnore(suppressExceptions, "Delete destination paths",
() -> deleteDestinationPaths(
commitContext.getJobContext()));
Expand Down Expand Up @@ -543,11 +546,26 @@ protected void abortJobInternal(CommitContext commitContext,
}
}

/**
 * Recursively delete the parent directory of the multipart upload
 * staging directory for this job's UUID.
 * The delete is wrapped in {@code ignoreIOExceptions}, with the path
 * passed along as the text describing the operation.
 * @param context job context
 * @throws IOException IO failure
 */
protected void deleteStagingUploadsParentDirectory(JobContext context)
    throws IOException {
  final Path uploadsParent = Paths.getStagingUploadsParentDirectory(
      context.getConfiguration(),
      getUUID());
  final String uploadsParentText = uploadsParent.toString();
  // NOTE(review): the path is derived from the job context's configuration
  // while the filesystem is looked up with getConf() -- confirm that using
  // two configuration sources here is intended.
  ignoreIOExceptions(LOG, "Deleting staging uploads path", uploadsParentText,
      () -> deleteWithWarning(
          uploadsParent.getFileSystem(getConf()),
          uploadsParent,
          true));
}

/**
* Delete the working paths of a job.
* <ol>
* <li>The job attempt path</li>
* <li>{@code $dest/__temporary}</li>
* <li>the local working directory for staged files</li>
* </ol>
Expand All @@ -556,14 +574,6 @@ protected void abortJobInternal(CommitContext commitContext,
* @throws IOException IO failure
*/
protected void deleteDestinationPaths(JobContext context) throws IOException {
Path attemptPath = getJobAttemptPath(context);
ignoreIOExceptions(LOG,
"Deleting Job attempt Path", attemptPath.toString(),
() -> deleteWithWarning(
getJobAttemptFileSystem(context),
attemptPath,
true));
Comment on lines -559 to -565
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed these as attemptPath here is a sub dir under ${UUID}/staging-uploads, which has already been deleted in deleteStagingUploadsParentDirectory().


// delete the __temporary directory. This will cause problems
// if there is >1 task targeting the same dest dir
deleteWithWarning(getDestFS(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,30 @@ public JobData(Job job,
this.committer = committer;
conf = job.getConfiguration();
}

/**
* Get the job.
* @return the job.
*/
public Job getJob() {
return job;
}

public JobContext getJContext() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use JobContext and TaskContext in getters, even if field names aren't as informative

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually i'm not worrying about this; will merge as is.

return jContext;
}

/**
* Get the task attempt context.
* @return the task attempt context.
*/
public TaskAttemptContext getTContext() {
return tContext;
}

/**
* Get the committer.
* @return the committer passed in to the constructor.
*/
public AbstractS3ACommitter getCommitter() {
return committer;
}

/**
* Get the configuration.
* @return the configuration, which the constructor reads from the job
* via {@code job.getConfiguration()}.
*/
public Configuration getConf() {
return conf;
}

/**
* Get the written text path.
* NOTE(review): presumably unset until text output has been written;
* confirm against the code which assigns it.
* @return the written text path.
*/
public Path getWrittenTextPath() {
return writtenTextPath;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.IOException;
import java.util.UUID;

import org.junit.Test;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand Down Expand Up @@ -141,6 +143,74 @@ protected void validateTaskAttemptWorkingDirectory(final AbstractS3ACommitter co
assertEquals("file", wd.toUri().getScheme());
}

/**
 * Start a job, check that the parent of the staging uploads directory
 * exists, write and commit task output, commit the job, then check that
 * the directory has gone.
 * @throws Exception on any failure
 */
@Test
public void testStagingUploadsDirectoryCleanedUp() throws Exception {
  describe("Assert that the staging uploads directory is cleaned up after successful commit");
  final JobData data = startJob(false);
  final JobContext jobContext = data.getJContext();
  final TaskAttemptContext taskContext = data.getTContext();
  final StagingCommitter stagingCommitter = (StagingCommitter) data.getCommitter();

  final Path uploadsParent = Paths.getStagingUploadsParentDirectory(
      jobContext.getConfiguration(), stagingCommitter.getUUID());

  // precondition: the directory is there before any work is committed
  ContractTestUtils.assertPathExists(
      uploadsParent.getFileSystem(jobContext.getConfiguration()),
      "staging uploads path must exist after setupJob",
      uploadsParent);

  // generate some task output
  writeTextOutput(taskContext);

  // commit the task, then the job
  stagingCommitter.commitTask(taskContext);
  commitJob(stagingCommitter, jobContext);

  // postcondition: job commit removed the directory
  ContractTestUtils.assertPathDoesNotExist(
      uploadsParent.getFileSystem(jobContext.getConfiguration()),
      "staging uploads path must not exist after commitJob",
      uploadsParent);
}

/**
 * Start a job using a {@code FailingCommitterFactory}, check that the
 * parent of the staging uploads directory exists, commit the task, go
 * through the failing and non-failing job commit calls, then check that
 * the directory has gone.
 * @throws Exception on any failure
 */
@Test
public void testStagingUploadsDirectoryCleanedUpWithFailure() throws Exception {
  describe("Assert that the staging uploads directory is cleaned up after failed commit");
  final JobData data = startJob(new FailingCommitterFactory(), false);
  final JobContext jobContext = data.getJContext();
  final TaskAttemptContext taskContext = data.getTContext();
  final StagingCommitter stagingCommitter = (StagingCommitter) data.getCommitter();

  final Path uploadsParent = Paths.getStagingUploadsParentDirectory(
      jobContext.getConfiguration(), stagingCommitter.getUUID());

  // precondition: the directory is there before any work is committed
  ContractTestUtils.assertPathExists(
      uploadsParent.getFileSystem(jobContext.getConfiguration()),
      "staging uploads path must exist after setupJob",
      uploadsParent);

  // commit the task
  stagingCommitter.commitTask(taskContext);

  // job commit: simulated failure first, then a plain commit,
  // then a commit which is expected to fail
  expectSimulatedFailureOnJobCommit(jobContext, stagingCommitter);
  commitJob(stagingCommitter, jobContext);
  expectJobCommitToFail(jobContext, stagingCommitter);

  // postcondition: the directory was still removed
  ContractTestUtils.assertPathDoesNotExist(
      uploadsParent.getFileSystem(jobContext.getConfiguration()),
      "staging uploads path must not exist after commitJob",
      uploadsParent);
}

/**
* The class provides an overridden implementation of commitJobInternal which
* causes the commit to fail the first time and then succeed.
Expand Down