Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1181,18 +1181,28 @@ public void commitJob(JobContext context) throws IOException {
//name is that we want to rename; leave it for another day
// TODO: if we expect one dir why don't we enforce it?
for (FileStatus fileStatus : contents) {
Path tmpPath = fileStatus.getPath();
//newPath is the base/delta dir
Path newPath = new Path(finalLocation, fileStatus.getPath().getName());
/*rename(A, B) has "interesting" behavior if A and B are directories. If B doesn't exist,
* it does the expected operation and everything that was in A is now in B. If B exists,
* it will make A a child of B... thus make sure the rename() is done before creating the
* meta files which will create base_x/ (i.e. B)...*/
fs.rename(fileStatus.getPath(), newPath);
AcidUtils.OrcAcidVersion.writeVersionFile(newPath, fs);
createCompactorMarker(conf, newPath, fs);
Path newPath = new Path(finalLocation, tmpPath.getName());
/* rename(A, B) has "interesting" behavior if A and B are directories. If B doesn't exist,
* it does the expected operation and everything that was in A is now in B. If B exists,
* it will make A a child of B.
* This issue can happen if the previous MR job succeeded but HMS was unable to persist compaction result.
* We will delete the directory B if it exists to avoid the above issue
*/
if (fs.exists(newPath)) {
LOG.info(String.format("Final path %s already exists. Deleting the path to avoid redundant base creation", newPath.toString()));
fs.delete(newPath, true);
}
/* Create the markers in the tmp location and rename everything in the end to prevent race condition between
* marker creation and split read. */
AcidUtils.OrcAcidVersion.writeVersionFile(tmpPath, fs);
createCompactorMarker(conf, tmpPath, fs);
fs.rename(tmpPath, newPath);
}
fs.delete(tmpLocation, true);
}

private void createCompactorMarker(JobConf conf, Path finalLocation, FileSystem fs)
throws IOException {
if(conf.getBoolean(IS_MAJOR, false)) {
Expand Down