diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 60447192b112..27ff40eafdc0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -1181,18 +1181,28 @@ public void commitJob(JobContext context) throws IOException { //name is that we want to rename; leave it for another day // TODO: if we expect one dir why don't we enforce it? for (FileStatus fileStatus : contents) { + Path tmpPath = fileStatus.getPath(); //newPath is the base/delta dir - Path newPath = new Path(finalLocation, fileStatus.getPath().getName()); - /*rename(A, B) has "interesting" behavior if A and B are directories. If B doesn't exist, - * it does the expected operation and everything that was in A is now in B. If B exists, - * it will make A a child of B... thus make sure the rename() is done before creating the - * meta files which will create base_x/ (i.e. B)...*/ - fs.rename(fileStatus.getPath(), newPath); - AcidUtils.OrcAcidVersion.writeVersionFile(newPath, fs); - createCompactorMarker(conf, newPath, fs); + Path newPath = new Path(finalLocation, tmpPath.getName()); + /* rename(A, B) has "interesting" behavior if A and B are directories. If B doesn't exist, + * it does the expected operation and everything that was in A is now in B. If B exists, + * it will make A a child of B. + * This issue can happen if the previous MR job succeeded but HMS was unable to persist compaction result. + * We will delete the directory B if it exists to avoid the above issue + */ + if (fs.exists(newPath)) { + LOG.info(String.format("Final path %s already exists. Deleting the path to avoid redundant base creation", newPath.toString())); + fs.delete(newPath, true); + } + /* Create the markers in the tmp location and rename everything in the end to prevent race condition between + * marker creation and split read. */ + AcidUtils.OrcAcidVersion.writeVersionFile(tmpPath, fs); + createCompactorMarker(conf, tmpPath, fs); + fs.rename(tmpPath, newPath); } fs.delete(tmpLocation, true); } + private void createCompactorMarker(JobConf conf, Path finalLocation, FileSystem fs) throws IOException { if(conf.getBoolean(IS_MAJOR, false)) {