diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 03b0c76b40f9..f47cbffca663 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -578,7 +578,17 @@ private void initializeSpecPath() {
       unionPath = null;
     } else if (conf.isMmTable() || isUnionDp) {
       // MM tables need custom handling for union suffix; DP tables use parent too.
-      specPath = conf.getParentDir();
+      // For !isDirectInsert and !conf.isMmTable() cases, the output looks like:
+      // w1: <prefix>/_tmp.-ext-10000/<dp-dirs>/HIVE_UNION_SUBDIR_1/<files>
+      // w2: <prefix>/_tmp.-ext-10000/<dp-dirs>/HIVE_UNION_SUBDIR_2/<files>
+      // When the BaseWork w2 in a TezTask closes first, it may rename the entire directory
+      // <prefix>/_tmp.-ext-10000 to <prefix>/_tmp.-ext-10000.moved.
+      // Setting specPath to conf.getDirName() gives w1 a chance to handle its own output
+      // under the directory HIVE_UNION_SUBDIR_1; the output directory then becomes
+      // <prefix>/-ext-10000/_tmp.HIVE_UNION_SUBDIR_1/<dp-dirs>/HIVE_UNION_SUBDIR_1.
+      // When the job finishes, the output is moved to
+      // <prefix>/-ext-10000/<dp-dirs>/HIVE_UNION_SUBDIR_1, just as before.
+      specPath = conf.isMmTable() ? conf.getParentDir() : conf.getDirName();
       unionPath = conf.getDirName().getName();
     } else {
       // For now, keep the old logic for non-MM non-DP union case. Should probably be unified.
@@ -1585,7 +1595,7 @@ public void jobCloseOp(Configuration hconf, boolean success)
     DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
     ListBucketingCtx lbCtx = conf.getLbCtx();
     if (conf.isLinkedFileSink() && (dpCtx != null || conf.isMmTable())) {
-      specPath = conf.getParentDir();
+      specPath = conf.isMmTable() ? conf.getParentDir() : conf.getDirName();
       unionSuffix = conf.getDirName().getName();
     }
     if (conf.isLinkedFileSink() && conf.isDirectInsert()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index d9c726da1227..91acd24465cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1435,6 +1435,9 @@ public static void mvFileToFinalPath(Path specPath, String unionSuffix, Configur
     FileSystem fs = specPath.getFileSystem(hconf);
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
+    if (!StringUtils.isEmpty(unionSuffix)) {
+      specPath = specPath.getParent();
+    }
     PerfLogger perfLogger = SessionState.getPerfLogger();
     boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs);
     boolean avoidRename = false;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index a27215ffd10e..9b2fd95442e3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec;
 
+import org.junit.After;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -78,7 +79,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -172,8 +172,9 @@ public void testNonAcidRemoveDuplicate() throws Exception {
     setupData(DataFormat.WITH_PARTITION_VALUE);
     FileSinkDesc desc = (FileSinkDesc) getFileSink(AcidUtils.Operation.NOT_ACID, true, 0).getConf().clone();
+    Path linkedDir = desc.getDirName();
     desc.setLinkedFileSink(true);
-    desc.setDirName(new Path(desc.getDirName(), AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "0"));
+    desc.setDirName(new Path(linkedDir, AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "0"));
     JobConf jobConf = new JobConf(jc);
     jobConf.set("hive.execution.engine", "tez");
     jobConf.set("mapred.task.id", "000000_0");
@@ -188,41 +189,85 @@ public void testNonAcidRemoveDuplicate() throws Exception {
     op2.setConf(desc);
     op2.initialize(jobConf2, new ObjectInspector[]{inspector});
 
+    // Another sub-query in the union
+    JobConf jobConf3 = new JobConf(jobConf);
+    jobConf3.set("mapred.task.id", "000001_0");
+    FileSinkOperator op3 = (FileSinkOperator)OperatorFactory.get(
+        new CompilationOpContext(), FileSinkDesc.class);
+    FileSinkDesc sinkDesc = (FileSinkDesc) desc.clone();
+    sinkDesc.setDirName(new Path(linkedDir, AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1"));
+    op3.setConf(sinkDesc);
+    op3.initialize(jobConf3, new ObjectInspector[]{inspector});
+
+    JobConf jobConf4 = new JobConf(jobConf);
+    jobConf4.set("mapred.task.id", "000001_1");
+    FileSinkOperator op4 = (FileSinkOperator)OperatorFactory.get(
+        new CompilationOpContext(), FileSinkDesc.class);
+    op4.setConf(sinkDesc);
+    op4.initialize(jobConf4, new ObjectInspector[]{inspector});
+
     for (Object r : rows) {
       op1.process(r, 0);
       op2.process(r, 0);
+      op3.process(r, 0);
+      op4.process(r, 0);
     }
     op1.close(false);
     // Assume op2 also ends successfully, this happens in different containers
     op2.close(false);
-    Path[] paths = findFilesInBasePath();
-    List<Path> mondays = Arrays.stream(paths)
-        .filter(path -> path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_0"))
-        .collect(Collectors.toList());
-    Assert.assertEquals("Two result files are expected", 2, mondays.size());
-    Set<String> fileNames = new HashSet<>();
-    fileNames.add(mondays.get(0).getName());
-    fileNames.add(mondays.get(1).getName());
+    op3.close(false);
+    op4.close(false);
+    Path[] paths = findFilesInPath(linkedDir);
+    // (was findFilesInBasePath() before the fix)
+    Set<String> fileNames = Arrays.stream(paths)
+        .filter(path -> path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_0"))
+        .map(path -> path.getName())
+        .collect(Collectors.toSet());
+    Assert.assertEquals("Two result files are expected", 2, fileNames.size());
     Assert.assertTrue("000000_1 file is expected", fileNames.contains("000000_1"));
     Assert.assertTrue("000000_0 file is expected", fileNames.contains("000000_0"));
+    fileNames = Arrays.stream(paths)
+        .filter(path -> path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_1"))
+        .map(path -> path.getName())
+        .collect(Collectors.toSet());
+    Assert.assertEquals("Two result files are expected", 2, fileNames.size());
+    Assert.assertTrue("000001_0 file is expected", fileNames.contains("000001_0"));
+    Assert.assertTrue("000001_1 file is expected", fileNames.contains("000001_1"));
+
+    // Close op3 first to verify op1 can still deduplicate the result under HIVE_UNION_SUBDIR_0
+    op3.jobCloseOp(jobConf, true);
     // This happens in HiveServer2 when the job is finished, the job will call
     // jobCloseOp to end his operators. For the FileSinkOperator, a deduplication on the
     // output files may happen so that only one output file is left for each yarn task.
     op1.jobCloseOp(jobConf, true);
     List<Path> resultFiles = new ArrayList<Path>();
-    recurseOnPath(basePath, basePath.getFileSystem(jc), resultFiles);
-    mondays = resultFiles.stream()
-        .filter(path -> path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_0"))
+    String linkedDirPath = linkedDir.toUri().getPath();
+    recurseOnPath(linkedDir, linkedDir.getFileSystem(jc), resultFiles);
+    List<Path> mondays = resultFiles.stream()
+        .filter(path -> path.getParent().toUri().getPath()
+            .equals(linkedDirPath + "/partval=Monday/HIVE_UNION_SUBDIR_0"))
         .collect(Collectors.toList());
     Assert.assertEquals("Only 1 file should be here after cleaning", 1, mondays.size());
     Assert.assertEquals("000000_1 file is expected", "000000_1", mondays.get(0).getName());
-    confirmOutput(DataFormat.WITH_PARTITION_VALUE, resultFiles.toArray(new Path[0]));
-    // Clean out directory after testing
-    basePath.getFileSystem(jc).delete(basePath, true);
+    List<Path> subdir1 = resultFiles.stream()
+        .filter(path -> path.getParent().getName().equals("HIVE_UNION_SUBDIR_1")).sorted()
+        .collect(Collectors.toList());
+    Assert.assertEquals("Two partitions expected", 2, subdir1.size());
+    Path monday = subdir1.get(0), tuesday = subdir1.get(1);
+    Assert.assertEquals("Only 1 file left under the partition after deduplication", monday.toUri().getPath(),
+        linkedDirPath + "/partval=Monday/HIVE_UNION_SUBDIR_1/000001_1");
+    Assert.assertEquals("Only 1 file left under the partition after deduplication", tuesday.toUri().getPath(),
+        linkedDirPath + "/partval=Tuesday/HIVE_UNION_SUBDIR_1/000001_1");
+
+    // Confirm the output
+    confirmOutput(DataFormat.WITH_PARTITION_VALUE, resultFiles.stream()
+        .filter(p -> p.getParent().getName().equals("HIVE_UNION_SUBDIR_0")).sorted()
+        .collect(Collectors.toList()).toArray(new Path[0]));
+    confirmOutput(DataFormat.WITH_PARTITION_VALUE, subdir1.toArray(new Path[0]));
   }
 
   @Test
@@ -270,6 +315,16 @@ public void setup() throws Exception {
     jc.set(HiveConf.ConfVars.HIVESTATSDBCLASS.varname, "custom");
   }
 
+  @After
+  public void afterTest() throws Exception {
+    Path parent = basePath.getParent();
+    String last = basePath.getName();
+    FileSystem fs = basePath.getFileSystem(jc);
+    fs.delete(basePath, true);
+    fs.delete(new Path(parent, "_tmp." + last), true);
+    fs.delete(new Path(parent, "_task_tmp." + last), true);
+  }
+
   private void setBasePath(String testName) {
     basePath = new Path(new File(tmpdir, testName).getPath());
 
@@ -414,6 +469,13 @@ private Path[] findFilesInBasePath() throws IOException {
     return paths.toArray(new Path[paths.size()]);
   }
 
+  private Path[] findFilesInPath(Path path) throws IOException {
+    FileSystem fs = path.getFileSystem(jc);
+    List<Path> paths = new ArrayList<Path>();
+    recurseOnPath(path, fs, paths);
+    return paths.toArray(new Path[paths.size()]);
+  }
+
   private void recurseOnPath(Path p, FileSystem fs, List<Path> paths) throws IOException {
     if (fs.getFileStatus(p).isDir()) {
       FileStatus[] stats = fs.listStatus(p);
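
A minimal standalone sketch of the rename choreography this patch changes. The local `toTempPath` helper below mirrors what `Utilities.toTempPath()` does (prefix the last path component with `_tmp.`); the concrete warehouse paths are illustrative only, not taken from a real query plan, and the sketch needs nothing beyond `hadoop-common` on the classpath:

```java
import org.apache.hadoop.fs.Path;

public class UnionSpecPathSketch {
  // Mirrors Utilities.toTempPath(): prefix the last path component with "_tmp.".
  static Path toTempPath(Path p) {
    return new Path(p.getParent(), "_tmp." + p.getName());
  }

  public static void main(String[] args) {
    // Illustrative staging layout for a union query with two BaseWorks w1 and w2.
    Path parentDir = new Path("/warehouse/t/.hive-staging/-ext-10000");
    Path w1Dir = new Path(parentDir, "HIVE_UNION_SUBDIR_1");
    Path w2Dir = new Path(parentDir, "HIVE_UNION_SUBDIR_2");

    // Before the fix, both works used specPath = conf.getParentDir(), so they
    // shared one temp dir; whichever work closed first could rename it away
    // from under the other:
    System.out.println(toTempPath(parentDir));
    // -> /warehouse/t/.hive-staging/_tmp.-ext-10000

    // After the fix, a non-MM union work uses its own dirName, so the temp
    // dirs (and the renames done on close) no longer collide:
    System.out.println(toTempPath(w1Dir)); // .../-ext-10000/_tmp.HIVE_UNION_SUBDIR_1
    System.out.println(toTempPath(w2Dir)); // .../-ext-10000/_tmp.HIVE_UNION_SUBDIR_2

    // In mvFileToFinalPath() the temp paths are derived from the subdir
    // specPath first; the new guard then steps up to the parent, so the final
    // destination <parent>/<unionSuffix> is the same as before the patch:
    String unionSuffix = w1Dir.getName(); // "HIVE_UNION_SUBDIR_1"
    Path specPath = w1Dir;
    if (!unionSuffix.isEmpty()) {
      specPath = specPath.getParent(); // .../-ext-10000
    }
    System.out.println(new Path(specPath, unionSuffix));
    // -> /warehouse/t/.hive-staging/-ext-10000/HIVE_UNION_SUBDIR_1
  }
}
```

This is why the test above can call `op3.jobCloseOp()` before `op1.jobCloseOp()` and still find op1's files: each work now renames only its own `_tmp.HIVE_UNION_SUBDIR_n` subtree rather than the shared `_tmp.-ext-10000` directory.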