HIVE-27494: Deduplicate the task result that generated by more branch… #4479

Merged · 1 commit · Jul 31, 2023
14 changes: 12 additions & 2 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -578,7 +578,17 @@ private void initializeSpecPath() {
unionPath = null;
} else if (conf.isMmTable() || isUnionDp) {
// MM tables need custom handling for union suffix; DP tables use parent too.
specPath = conf.getParentDir();
// For !isDirectInsert and !conf.isMmTable() cases, the output will be like:
// w1: <table-dir>/<staging-dir>/_tmp.-ext-10000/<partition-dir>/HIVE_UNION_SUBDIR_1/<task_id>
// w2: <table-dir>/<staging-dir>/_tmp.-ext-10000/<partition-dir>/HIVE_UNION_SUBDIR_2/<task_id>
// When the BaseWork w2 in a TezTask closes first, it may rename the entire directory
// <table-dir>/<staging-dir>/_tmp.-ext-10000 to <table-dir>/<staging-dir>/_tmp.-ext-10000.moved,
// destroying w1's pending output. Setting specPath to conf.getDirName() gives w1 a chance
// to handle its own output under the directory HIVE_UNION_SUBDIR_1; that output is then staged at
// <table-dir>/<staging-dir>/-ext-10000/_tmp.HIVE_UNION_SUBDIR_1/<partition-dir>/HIVE_UNION_SUBDIR_1.
// When the job finishes, the output is moved to
// <table-dir>/<staging-dir>/-ext-10000/<partition-dir>/HIVE_UNION_SUBDIR_1 as before.
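// Concrete example, using the task ids from the test below:
// w1 task 000000_0 writes <staging-dir>/-ext-10000/_tmp.HIVE_UNION_SUBDIR_0/partval=Monday/HIVE_UNION_SUBDIR_0/000000_0
// w2 task 000001_0 writes <staging-dir>/-ext-10000/_tmp.HIVE_UNION_SUBDIR_1/partval=Monday/HIVE_UNION_SUBDIR_1/000001_0
// Each branch now renames only its own _tmp.HIVE_UNION_SUBDIR_N directory when it closes,
// so it can no longer clobber the other branch's pending output.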
specPath = conf.isMmTable() ? conf.getParentDir() : conf.getDirName();
unionPath = conf.getDirName().getName();
} else {
// For now, keep the old logic for non-MM non-DP union case. Should probably be unified.
@@ -1585,7 +1595,7 @@ public void jobCloseOp(Configuration hconf, boolean success)
DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
ListBucketingCtx lbCtx = conf.getLbCtx();
if (conf.isLinkedFileSink() && (dpCtx != null || conf.isMmTable())) {
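// Mirror initializeSpecPath(): for non-MM union branches, resolve to this branch's own
// HIVE_UNION_SUBDIR_N directory so job close only touches this branch's files.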
specPath = conf.getParentDir();
specPath = conf.isMmTable() ? conf.getParentDir() : conf.getDirName();
unionSuffix = conf.getDirName().getName();
}
if (conf.isLinkedFileSink() && conf.isDirectInsert()) {
3 changes: 3 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1435,6 +1435,9 @@ public static void mvFileToFinalPath(Path specPath, String unionSuffix, Configur
FileSystem fs = specPath.getFileSystem(hconf);
Path tmpPath = Utilities.toTempPath(specPath);
Path taskTmpPath = Utilities.toTaskTempPath(specPath);
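// In the union case specPath points at the union subdirectory itself
// (e.g. .../-ext-10000/HIVE_UNION_SUBDIR_1). tmpPath and taskTmpPath are already
// derived from it above, so step back to the parent as the final destination.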
if (!StringUtils.isEmpty(unionSuffix)) {
specPath = specPath.getParent();
}
PerfLogger perfLogger = SessionState.getPerfLogger();
boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs);
boolean avoidRename = false;
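The final-path move above relies on the "_tmp."/"_task_tmp." sibling naming that the test
cleanup below also deletes. A minimal sketch of that naming scheme (an illustration of the
behavior assumed of Utilities.toTempPath/toTaskTempPath, not the actual Hive code):

import org.apache.hadoop.fs.Path;

public final class TempPathSketch {
  // .../-ext-10000/HIVE_UNION_SUBDIR_1 -> .../-ext-10000/_tmp.HIVE_UNION_SUBDIR_1
  static Path toTempPath(Path orig) {
    return new Path(orig.getParent(), "_tmp." + orig.getName());
  }

  // .../-ext-10000/HIVE_UNION_SUBDIR_1 -> .../-ext-10000/_task_tmp.HIVE_UNION_SUBDIR_1
  static Path toTaskTempPath(Path orig) {
    return new Path(orig.getParent(), "_task_tmp." + orig.getName());
  }
}

With specPath = .../-ext-10000/HIVE_UNION_SUBDIR_1, tmpPath and taskTmpPath land under the
union subdirectory's parent, so stepping specPath back to its parent makes the final move
target .../-ext-10000 as intended.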
ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -18,6 +18,7 @@

package org.apache.hadoop.hive.ql.exec;

import org.junit.After;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -78,7 +79,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -172,8 +172,9 @@ public void testNonAcidRemoveDuplicate() throws Exception {
setupData(DataFormat.WITH_PARTITION_VALUE);

FileSinkDesc desc = (FileSinkDesc) getFileSink(AcidUtils.Operation.NOT_ACID, true, 0).getConf().clone();
Path linkedDir = desc.getDirName();
desc.setLinkedFileSink(true);
desc.setDirName(new Path(desc.getDirName(), AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "0"));
desc.setDirName(new Path(linkedDir, AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "0"));
JobConf jobConf = new JobConf(jc);
jobConf.set("hive.execution.engine", "tez");
jobConf.set("mapred.task.id", "000000_0");
@@ -188,41 +189,85 @@
op2.setConf(desc);
op2.initialize(jobConf2, new ObjectInspector[]{inspector});

// Another sub-query in union
JobConf jobConf3 = new JobConf(jobConf);
jobConf3.set("mapred.task.id", "000001_0");
FileSinkOperator op3 = (FileSinkOperator)OperatorFactory.get(
new CompilationOpContext(), FileSinkDesc.class);
FileSinkDesc sinkDesc = (FileSinkDesc) desc.clone();
sinkDesc.setDirName(new Path(linkedDir, AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1"));
op3.setConf(sinkDesc);
op3.initialize(jobConf3, new ObjectInspector[]{inspector});

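// Simulate a second attempt (000001_1) of the same task in the second branch; its
// duplicate output is what jobCloseOp must later deduplicate.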
JobConf jobConf4 = new JobConf(jobConf);
jobConf4.set("mapred.task.id", "000001_1");
FileSinkOperator op4 = (FileSinkOperator)OperatorFactory.get(
new CompilationOpContext(), FileSinkDesc.class);
op4.setConf(sinkDesc);
op4.initialize(jobConf4, new ObjectInspector[]{inspector});

for (Object r : rows) {
op1.process(r, 0);
op2.process(r, 0);
op3.process(r, 0);
op4.process(r, 0);
}

op1.close(false);
// Assume op2 also closes successfully; duplicate attempts like this run in different containers
op2.close(false);
Path[] paths = findFilesInBasePath();
List<Path> mondays = Arrays.stream(paths)
.filter(path -> path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_0"))
.collect(Collectors.toList());
Assert.assertEquals("Two result files are expected", 2, mondays.size());
Set<String> fileNames = new HashSet<>();
fileNames.add(mondays.get(0).getName());
fileNames.add(mondays.get(1).getName());
op3.close(false);
op4.close(false);

Path[] paths = findFilesInPath(linkedDir);
// (before the fix this was findFilesInBasePath())
Set<String> fileNames = Arrays.stream(paths)
.filter(path -> path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_0"))
.map(path -> path.getName())
.collect(Collectors.toSet());
Assert.assertEquals("Two result files are expected", 2, fileNames.size());
Assert.assertTrue("000000_1 file is expected", fileNames.contains("000000_1"));
Assert.assertTrue("000000_0 file is expected", fileNames.contains("000000_0"));

fileNames = Arrays.stream(paths)
.filter(path -> path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_1"))
.map(path -> path.getName())
.collect(Collectors.toSet());
Assert.assertEquals("Two result files are expected", 2, fileNames.size());
Assert.assertTrue("000001_0 file is expected", fileNames.contains("000001_0"));
Assert.assertTrue("000001_1 file is expected", fileNames.contains("000001_1"));

// Close op3's branch first to verify that the results under HIVE_UNION_SUBDIR_0 can
// still be deduplicated afterwards
op3.jobCloseOp(jobConf, true);
// This happens in HiveServer2 when the job is finished: the job calls
// jobCloseOp to close its operators. For the FileSinkOperator, a deduplication of the
// output files may happen so that only one output file is left per Yarn task.
op1.jobCloseOp(jobConf, true);
List<Path> resultFiles = new ArrayList<Path>();
recurseOnPath(basePath, basePath.getFileSystem(jc), resultFiles);
mondays = resultFiles.stream()
.filter(path -> path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_0"))
String linkedDirPath = linkedDir.toUri().getPath();
recurseOnPath(linkedDir, linkedDir.getFileSystem(jc), resultFiles);
List<Path> mondays = resultFiles.stream()
.filter(path -> path.getParent().toUri().getPath()
.equals(linkedDirPath + "/partval=Monday/HIVE_UNION_SUBDIR_0"))
.collect(Collectors.toList());
Assert.assertEquals("Only 1 file should be here after cleaning", 1, mondays.size());
Assert.assertEquals("000000_1 file is expected", "000000_1", mondays.get(0).getName());

confirmOutput(DataFormat.WITH_PARTITION_VALUE, resultFiles.toArray(new Path[0]));
// Clean out directory after testing
basePath.getFileSystem(jc).delete(basePath, true);
List<Path> subdir1 = resultFiles.stream()
.filter(path -> path.getParent().getName().equals("HIVE_UNION_SUBDIR_1")).sorted()
.collect(Collectors.toList());
Assert.assertEquals("Two partitions expected", 2, subdir1.size());
Path monday = subdir1.get(0), tuesday = subdir1.get(1);
Assert.assertEquals("Only 1 file left under the partition after deduplication", monday.toUri().getPath(),
linkedDirPath + "/partval=Monday/HIVE_UNION_SUBDIR_1/000001_1");
Assert.assertEquals("Only 1 file left under the partition after deduplication", tuesday.toUri().getPath(),
linkedDirPath + "/partval=Tuesday/HIVE_UNION_SUBDIR_1/000001_1");

// Confirm the output
confirmOutput(DataFormat.WITH_PARTITION_VALUE, resultFiles.stream()
.filter(p -> p.getParent().getName().equals("HIVE_UNION_SUBDIR_0")).sorted()
.collect(Collectors.toList()).toArray(new Path[0]));
confirmOutput(DataFormat.WITH_PARTITION_VALUE, subdir1.toArray(new Path[0]));
}

@Test
@@ -270,6 +315,16 @@ public void setup() throws Exception {
jc.set(HiveConf.ConfVars.HIVESTATSDBCLASS.varname, "custom");
}

@After
public void afterTest() throws Exception {
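// Remove the test output along with the "_tmp." and "_task_tmp." staging
// siblings that the temp-path helpers create next to basePath.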
Path parent = basePath.getParent();
String last = basePath.getName();
FileSystem fs = basePath.getFileSystem(jc);
fs.delete(basePath, true);
fs.delete(new Path(parent, "_tmp." + last), true);
fs.delete(new Path(parent, "_task_tmp." + last), true);
}

private void setBasePath(String testName) {
basePath = new Path(new File(tmpdir, testName).getPath());

@@ -414,6 +469,13 @@ private Path[] findFilesInBasePath() throws IOException {
return paths.toArray(new Path[paths.size()]);
}

private Path[] findFilesInPath(Path path) throws IOException {
FileSystem fs = path.getFileSystem(jc);
List<Path> paths = new ArrayList<Path>();
recurseOnPath(path, fs, paths);
return paths.toArray(new Path[paths.size()]);
}

private void recurseOnPath(Path p, FileSystem fs, List<Path> paths) throws IOException {
if (fs.getFileStatus(p).isDir()) {
FileStatus[] stats = fs.listStatus(p);