From bd267231f0ea2383105f7a1b421d746dfe92ba4a Mon Sep 17 00:00:00 2001 From: Marta Kuczora Date: Mon, 20 Oct 2025 15:41:10 +0200 Subject: [PATCH 1/3] HIVE-29272: Query-based MINOR compaction should not consider minOpenWriteId --- .../txn/compactor/TestCrudCompactorOnTez.java | 93 +++++++++++++++++++ .../txn/compactor/CompactionQueryBuilder.java | 14 ++- .../txn/compactor/MmMinorQueryCompactor.java | 2 +- ...pactionQueryBuilderForMajorCompaction.java | 2 +- ...pactionQueryBuilderForMinorCompaction.java | 2 +- ...CompactionQueryBuilderForMmCompaction.java | 4 +- ...ionQueryBuilderForRebalanceCompaction.java | 2 +- 7 files changed, 111 insertions(+), 8 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java index 53167ad93f5f..425ee60eb159 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java @@ -3709,4 +3709,97 @@ public void testMajorCompactionUpdateMissingColumnStatsOfPartition() throws Exce Assert.assertEquals(3, StatsSetupConst.getColumnsHavingStats(partition.getParameters()).size()); } + + @Test + public void testMinorWithAbortedAndOpenTnx() throws Exception { + String dbName = "default"; + String tableName = "testAbortedAndOpenTnxTbl"; + // Create test table + TestDataProvider testDataProvider = new TestDataProvider(); + testDataProvider.createFullAcidTable(tableName, false, false); + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + + // Abort the first insert transaction + driver.getConf().setBoolVar(HiveConf.ConfVars.HIVE_TEST_MODE_ROLLBACK_TXN, true); + testDataProvider.insertOnlyTestData(tableName, 1); + 
driver.getConf().setBoolVar(HiveConf.ConfVars.HIVE_TEST_MODE_ROLLBACK_TXN, false); + // Do three successful inserts to create 3 deltas + testDataProvider.insertOnlyTestData(tableName, 3); + + // Start an insert and leave it open when the compaction is running + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build(); + StreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable(tableName) + .withAgentInfo("UT_" + Thread.currentThread().getName()).withHiveConf(conf).withRecordWriter(writer) + .withTransactionBatchSize(1).connect(); + connection.beginTransaction(); + connection.write("4,4".getBytes()); + // Run query-based MINOR compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + // Finish the open transaction + connection.commitTransaction(); + connection.close(); + List expectedData = testDataProvider.getAllData(tableName, false); + // Run cleaner. It is expected to delete all deltas except the one created by the compaction and the one belonging to the open transaction. 
+ CompactorTestUtil.runCleaner(conf); + + verifySuccessfulCompaction(1); + List resultData = testDataProvider.getAllData(tableName); + Assert.assertEquals(expectedData, resultData); + List deltas = CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); + Assert.assertEquals(2, deltas.size()); + Assert.assertEquals("Delta directory names are not matching after compaction", + Arrays.asList("delta_0000002_0000004_v0000007", "delta_0000005_0000005"), deltas); + for (String delta: deltas) { + // Check if none of the delta directories are empty + List files = CompactorTestUtil.getBucketFileNames(fs, table, null, delta); + Assert.assertFalse(files.isEmpty()); + } + } + + @Test + public void testMinorWithOpenTnx() throws Exception { + String dbName = "default"; + String tableName = "testAbortedAndOpenTnxTbl"; + // Create test table + TestDataProvider testDataProvider = new TestDataProvider(); + testDataProvider.createFullAcidTable(tableName, false, false); + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + + // Do three successful inserts to create 3 deltas + testDataProvider.insertOnlyTestData(tableName, 3); + + // Start an insert and leave it open when the compaction is running + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build(); + StreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable(tableName) + .withAgentInfo("UT_" + Thread.currentThread().getName()).withHiveConf(conf).withRecordWriter(writer) + .withTransactionBatchSize(1).connect(); + connection.beginTransaction(); + connection.write("4,4".getBytes()); + // Run query-based MINOR compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + // Finish the open transaction + connection.commitTransaction(); + connection.close(); + 
List expectedData = testDataProvider.getAllData(tableName, false); + // Run cleaner. It is expected to delete all deltas except the one created by the compaction and the one belonging to the open transaction. + CompactorTestUtil.runCleaner(conf); + + verifySuccessfulCompaction(1); + List resultData = testDataProvider.getAllData(tableName); + Assert.assertEquals(expectedData, resultData); + List deltas = CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); + Assert.assertEquals(2, deltas.size()); + Assert.assertEquals("Delta directory names are not matching after compaction", + Arrays.asList("delta_0000001_0000003_v0000006", "delta_0000004_0000004"), deltas); + for (String delta: deltas) { + // Check if none of the delta directories are empty + List files = CompactorTestUtil.getBucketFileNames(fs, table, null, delta); + Assert.assertFalse(files.isEmpty()); + } + } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java index 98196d20bc38..e5c39aa1039c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java @@ -30,6 +30,8 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hive.common.util.HiveStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.ArrayList; @@ -39,6 +41,9 @@ import java.util.stream.Collectors; abstract class CompactionQueryBuilder { + + private static final Logger LOG = LoggerFactory.getLogger(CompactionQueryBuilder.class.getName()); + // required fields, set in constructor protected Operation operation; protected String resultTableName; @@ -317,15 +322,20 @@ protected void addTblProperties(StringBuilder query, Map tblProp private void 
buildAddClauseForAlter(StringBuilder query) { if (validWriteIdList == null || dir == null) { + LOG.info("There is no delta to be added as partition to the temp external table used by the minor compaction. " + + "This may result an empty compaction directory."); query.setLength(0); return; // avoid NPEs, don't throw an exception but return an empty query } - long minWriteID = validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId(); long highWatermark = validWriteIdList.getHighWatermark(); List deltas = dir.getCurrentDirectories().stream().filter( - delta -> delta.isDeleteDelta() == isDeleteDelta && delta.getMaxWriteId() <= highWatermark && delta.getMinWriteId() >= minWriteID) + delta -> delta.isDeleteDelta() == isDeleteDelta && delta.getMaxWriteId() <= highWatermark) .collect(Collectors.toList()); if (deltas.isEmpty()) { + String warnMsg = String.format("No %s delta is found below the highWaterMark %s to be added as partition " + + "to the temp external table, used by the minor compaction. This may result an empty compaction directory.", + isDeleteDelta ? "delete" : "", highWatermark); + LOG.warn(warnMsg); query.setLength(0); // no alter query needed; clear StringBuilder return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java index 68c3cb82db98..bdfa5b929795 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java @@ -80,7 +80,7 @@ protected HiveConf setUpDriverSession(HiveConf hiveConf) { * Clean up the empty table dir of 'tmpTableName'. 
*/ @Override protected void commitCompaction(String tmpTableName, HiveConf conf) throws IOException, HiveException { - Util.cleanupEmptyTableDir(conf, tmpTableName); + Util.cleanupEmptyTableDir(conf, tmpTableName + "_result"); } /** diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMajorCompaction.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMajorCompaction.java index 99f8b5303a52..e239f64bffb9 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMajorCompaction.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMajorCompaction.java @@ -172,7 +172,7 @@ public void testAlter() { queryBuilder.setIsDeleteDelta(true); String query = queryBuilder.build(); String expectedQuery = - "ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' "; + "ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_2') location '/compaction/test/table/test_delta_2' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' "; Assert.assertEquals(expectedQuery, query); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMinorCompaction.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMinorCompaction.java index eef965b1a92b..884944157c31 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMinorCompaction.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMinorCompaction.java @@ -150,7 +150,7 @@ public void testAlter() { queryBuilder.setIsDeleteDelta(true); String query = 
queryBuilder.build(); String expectedQuery = - "ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' "; + "ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_2') location '/compaction/test/table/test_delta_2' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' "; Assert.assertEquals(expectedQuery, query); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMmCompaction.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMmCompaction.java index 2295f9cf715a..d8f379cd2e8b 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMmCompaction.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMmCompaction.java @@ -372,7 +372,7 @@ public void testAlterMajorCompaction() { queryBuilder.setIsDeleteDelta(true); String query = queryBuilder.build(); String expectedQuery = - "ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' "; + "ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_2') location '/compaction/test/table/test_delta_2' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' "; Assert.assertEquals(expectedQuery, query); } @@ -386,7 +386,7 @@ public void testAlterMinorCompaction() { queryBuilder.setIsDeleteDelta(true); String query = queryBuilder.build(); String expectedQuery = - "ALTER table comp_test_result_table add partition 
(file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' "; + "ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_2') location '/compaction/test/table/test_delta_2' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' "; Assert.assertEquals(expectedQuery, query); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForRebalanceCompaction.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForRebalanceCompaction.java index df96fb00314c..3fb4edbd0496 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForRebalanceCompaction.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForRebalanceCompaction.java @@ -126,7 +126,7 @@ public void testAlter() { queryBuilder.setIsDeleteDelta(true); String query = queryBuilder.build(); String expectedQuery = - "ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' "; + "ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_2') location '/compaction/test/table/test_delta_2' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' "; Assert.assertEquals(expectedQuery, query); } From 4ee75446e02ac9fcb136bd1f6f188f13497d3157 Mon Sep 17 00:00:00 2001 From: Marta Kuczora Date: Thu, 30 Oct 2025 10:43:33 +0100 Subject: [PATCH 2/3] HIVE-29272: Query-based MINOR compaction should not consider minOpenWriteId Fix the output directory generation in MergeCompactor --- 
.../txn/compactor/CompactionQueryBuilder.java | 2 +- .../hive/ql/txn/compactor/MergeCompactor.java | 25 ++----------------- .../txn/compactor/MmMinorQueryCompactor.java | 3 ++- 3 files changed, 5 insertions(+), 25 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java index e5c39aa1039c..e991f22eace4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java @@ -322,7 +322,7 @@ protected void addTblProperties(StringBuilder query, Map tblProp private void buildAddClauseForAlter(StringBuilder query) { if (validWriteIdList == null || dir == null) { - LOG.info("There is no delta to be added as partition to the temp external table used by the minor compaction. " + + LOG.warn("There is no delta to be added as partition to the temp external table used by the minor compaction. 
" + "This may result an empty compaction directory."); query.setLength(0); return; // avoid NPEs, don't throw an exception but return an empty query diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MergeCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MergeCompactor.java index 7e5a3608e462..9e35c821328b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MergeCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MergeCompactor.java @@ -56,8 +56,8 @@ public boolean run(CompactorContext context) throws IOException, HiveException, if (isMergeCompaction(hiveConf, dir, storageDescriptor)) { // Only inserts happened, it is much more performant to merge the files than running a query - Path outputDirPath = getOutputDirPath(hiveConf, writeIds, - compactionInfo.isMajorCompaction(), storageDescriptor); + Path outputDirPath = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds, + hiveConf, compactionInfo.isMajorCompaction(), false, false, dir); try { return mergeFiles(hiveConf, compactionInfo.isMajorCompaction(), dir, outputDirPath, AcidUtils.isInsertOnlyTable(table.getParameters())); @@ -161,27 +161,6 @@ private Map> getBucketFiles(HiveConf conf, Path dirPath, b return bucketIdToBucketFilePath; } - /** - * Generate output path for compaction. This can be used to generate delta or base directories. - * @param conf hive configuration, must be non-null - * @param writeIds list of valid write IDs - * @param isBaseDir if base directory path should be generated - * @param sd the resolved storadge descriptor - * @return output path, always non-null - */ - private Path getOutputDirPath(HiveConf conf, ValidWriteIdList writeIds, boolean isBaseDir, - StorageDescriptor sd) { - long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 
1 : writeIds.getMinOpenWriteId(); - long highWatermark = writeIds.getHighWatermark(); - long compactorTxnId = Compactor.getCompactorTxnId(conf); - AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) - .writingBase(isBaseDir).writingDeleteDelta(false) - .isCompressed(false) - .minimumWriteId(minOpenWriteId).maximumWriteId(highWatermark) - .statementId(-1).visibilityTxnId(compactorTxnId); - return AcidUtils.baseOrDeltaSubdirPath(new Path(sd.getLocation()), options); - } - /** * Merge files from base/delta directories. If the directories contains multiple buckets, the result will also * contain the same amount. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java index bdfa5b929795..ef3e6868bf0d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java @@ -79,7 +79,8 @@ protected HiveConf setUpDriverSession(HiveConf hiveConf) { /** * Clean up the empty table dir of 'tmpTableName'. 
*/ - @Override protected void commitCompaction(String tmpTableName, HiveConf conf) throws IOException, HiveException { + @Override + protected void commitCompaction(String tmpTableName, HiveConf conf) throws IOException, HiveException { Util.cleanupEmptyTableDir(conf, tmpTableName + "_result"); } From f43344ca1e16e953c18bf50fb1bd76f3609d1df7 Mon Sep 17 00:00:00 2001 From: Marta Kuczora Date: Fri, 31 Oct 2025 13:54:06 +0100 Subject: [PATCH 3/3] HIVE-29272: Query-based MINOR compaction should not consider minOpenWriteId Addressing review comments and fixing one testcase --- .../hive/ql/txn/compactor/TestCrudCompactorOnTez.java | 2 +- .../hive/ql/txn/compactor/MajorQueryCompactor.java | 2 +- .../hadoop/hive/ql/txn/compactor/MergeCompactor.java | 2 +- .../hive/ql/txn/compactor/MinorQueryCompactor.java | 4 ++-- .../hive/ql/txn/compactor/MmMajorQueryCompactor.java | 2 +- .../hive/ql/txn/compactor/MmMinorQueryCompactor.java | 2 +- .../hadoop/hive/ql/txn/compactor/QueryCompactor.java | 9 +-------- .../hive/ql/txn/compactor/RebalanceQueryCompactor.java | 2 +- .../ql/txn/compactor/service/AcidCompactionService.java | 4 ++-- 9 files changed, 11 insertions(+), 18 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java index 425ee60eb159..98121f7df019 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java @@ -3369,7 +3369,7 @@ public void testMinorCompactionAfterMajorWithMerge() throws Exception { testCompactionWithMerge(CompactionType.MINOR, false, false, null, Collections.singletonList("bucket_00000"), Arrays.asList("delta_0000004_0000004_0000", "delta_0000005_0000005_0000", "delta_0000006_0000006_0000"), - 
Collections.singletonList("delta_0000001_0000006_v0000013"), false, true, false); + Collections.singletonList("delta_0000004_0000006_v0000013"), false, true, false); } @Test diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java index 269a6a018b54..7f64ef0eaef2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java @@ -49,7 +49,7 @@ public boolean run(CompactorContext context) throws IOException { String tmpTableName = getTempTableName(table); Path tmpTablePath = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds, - conf, true, false, false, null); + conf, true, false, null); List createQueries = getCreateQueries(tmpTableName, table, tmpTablePath.toString()); List compactionQueries = getCompactionQueries(table, context.getPartition(), tmpTableName); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MergeCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MergeCompactor.java index 9e35c821328b..a1e60eeb6daa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MergeCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MergeCompactor.java @@ -57,7 +57,7 @@ public boolean run(CompactorContext context) throws IOException, HiveException, if (isMergeCompaction(hiveConf, dir, storageDescriptor)) { // Only inserts happened, it is much more performant to merge the files than running a query Path outputDirPath = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds, - hiveConf, compactionInfo.isMajorCompaction(), false, false, dir); + hiveConf, compactionInfo.isMajorCompaction(), false, dir); try { return mergeFiles(hiveConf, compactionInfo.isMajorCompaction(), dir, outputDirPath, AcidUtils.isInsertOnlyTable(table.getParameters())); diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java index b3c71a34276f..141572d58ba0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java @@ -59,9 +59,9 @@ public boolean run(CompactorContext context) throws IOException { table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_" + System.currentTimeMillis(); Path resultDeltaDir = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, - writeIds, conf, false, false, false, dir); + writeIds, conf, false, false, dir); Path resultDeleteDeltaDir = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, - writeIds, conf, false, true, false, dir); + writeIds, conf, false, true, dir); List createQueries = getCreateQueries(table, tmpTableName, dir, writeIds, resultDeltaDir, resultDeleteDeltaDir); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java index 3440c4c01fb8..c51f4ac8f68a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java @@ -56,7 +56,7 @@ public boolean run(CompactorContext context) throws IOException { // "insert overwrite directory" command if there were no bucketing or list bucketing. 
String tmpTableName = getTempTableName(table); Path resultBaseDir = QueryCompactor.Util.getCompactionResultDir( - storageDescriptor, writeIds, driverConf, true, true, false, null); + storageDescriptor, writeIds, driverConf, true, true, null); List createTableQueries = getCreateQueries(tmpTableName, table, storageDescriptor, resultBaseDir.toString()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java index ef3e6868bf0d..82a18030ce18 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java @@ -57,7 +57,7 @@ public boolean run(CompactorContext context) throws IOException { String tmpTableName = getTempTableName(table); String resultTmpTableName = tmpTableName + "_result"; Path resultDeltaDir = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds, driverConf, - false, false, false, dir); + false, false, dir); List createTableQueries = getCreateQueries(tmpTableName, table, storageDescriptor, dir, writeIds, resultDeltaDir); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java index afb01e7fd4c0..79b86314a8eb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; -import org.apache.hadoop.hive.metastore.utils.StringableMap; import org.apache.hadoop.hive.ql.DriverUtils; import org.apache.hadoop.hive.ql.io.AcidDirectory; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; @@ -43,8 +42,6 @@ import 
java.io.IOException; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.stream.Stream; import static org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.overrideConfProps; @@ -188,13 +185,12 @@ public static class Util { * @param conf HiveConf * @param writingBase if true, we are creating a base directory, otherwise a delta * @param createDeleteDelta if true, the delta dir we are creating is a delete delta - * @param bucket0 whether to specify 0 as the bucketid * @param directory AcidUtils.Directory - only required for minor compaction result (delta) dirs * * @return Path of new base/delta/delete delta directory */ public static Path getCompactionResultDir(StorageDescriptor sd, ValidWriteIdList writeIds, HiveConf conf, - boolean writingBase, boolean createDeleteDelta, boolean bucket0, AcidDirectory directory) { + boolean writingBase, boolean createDeleteDelta, AcidDirectory directory) { long minWriteID = writingBase ? 1 : getMinWriteID(directory); long highWatermark = writeIds.getHighWatermark(); long compactorTxnId = Compactor.getCompactorTxnId(conf); @@ -202,9 +198,6 @@ public static Path getCompactionResultDir(StorageDescriptor sd, ValidWriteIdList new AcidOutputFormat.Options(conf).isCompressed(false).minimumWriteId(minWriteID) .maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId) .writingBase(writingBase).writingDeleteDelta(createDeleteDelta); - if (bucket0) { - options = options.bucket(0); - } Path location = new Path(sd.getLocation()); return AcidUtils.baseOrDeltaSubdirPath(location, options); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java index be69699027aa..3919236d1732 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java @@ -52,7 +52,7 
@@ public boolean run(CompactorContext context) String tmpTableName = getTempTableName(table); Path tmpTablePath = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds, - conf, true, false, false, null); + conf, true, false, null); int numBuckets = context.getCompactionInfo().numberOfBuckets; if (numBuckets <= 0) { //TODO: This is quite expensive, a better way should be found to get the number of buckets for an implicitly bucketed table diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java index 9f2271075c7a..cf3f0ccee024 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java @@ -105,7 +105,7 @@ private AcidDirectory getAcidStateForWorker(CompactionInfo ci, StorageDescriptor public void cleanupResultDirs(CompactionInfo ci) { // result directory for compactor to write new files Path resultDir = QueryCompactor.Util.getCompactionResultDir(sd, tblValidWriteIds, conf, - ci.type == CompactionType.MAJOR, false, false, dir); + ci.type == CompactionType.MAJOR, false, dir); LOG.info("Deleting result directories created by the compactor:\n"); try { FileSystem fs = resultDir.getFileSystem(conf); @@ -114,7 +114,7 @@ public void cleanupResultDirs(CompactionInfo ci) { if (ci.type == CompactionType.MINOR) { Path deleteDeltaDir = QueryCompactor.Util.getCompactionResultDir(sd, tblValidWriteIds, conf, - false, true, false, dir); + false, true, dir); LOG.info(deleteDeltaDir.toString()); fs.delete(deleteDeltaDir, true);