From 16b556479f23210538f537c2783abb91c224d0ed Mon Sep 17 00:00:00 2001 From: Marta Kuczora Date: Tue, 12 May 2026 10:45:34 +0200 Subject: [PATCH] HIVE-29572: ACID Compaction: Cleaner should check the state of the compaction txn before start cleaning --- .../hadoop/hive/ql/HiveQueryLifeTimeHook.java | 2 +- .../compactor/handler/CompactionCleaner.java | 14 +++ .../hive/ql/txn/compactor/TestCleaner.java | 90 ++++++++++++++++++- .../hadoop/hive/metastore/txn/TxnHandler.java | 9 +- .../hadoop/hive/metastore/txn/TxnStore.java | 5 ++ .../txn/jdbc/queries/ReadyToCleanHandler.java | 7 +- 6 files changed, 119 insertions(+), 8 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/HiveQueryLifeTimeHook.java b/ql/src/java/org/apache/hadoop/hive/ql/HiveQueryLifeTimeHook.java index 82a3f190000e..6b875a66b64d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/HiveQueryLifeTimeHook.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/HiveQueryLifeTimeHook.java @@ -97,7 +97,7 @@ private void checkAndRollbackCTAS(QueryLifeTimeHookContext ctx) { if (table != null) { LOG.info("Performing cleanup as part of rollback: {}", table.getFullTableName().toString()); try { - CompactionRequest request = new CompactionRequest(table.getDbName(), table.getTableName(), CompactionType.MAJOR); + CompactionRequest request = new CompactionRequest(table.getDbName(), table.getTableName(), CompactionType.DEFERRED_CLEANUP); request.setRunas(TxnUtils.findUserToRunAs(tblPath.toString(), table.getTTable(), conf)); request.putToProperties(META_TABLE_LOCATION, tblPath.toString()); request.putToProperties(IF_PURGE, Boolean.toString(true)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java index 5d9a4b14fbe0..c73a6ca6963c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.LockRequest; import org.apache.hadoop.hive.metastore.api.LockResponse; @@ -40,6 +41,7 @@ import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest; import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest.CleanupRequestBuilder; @@ -99,6 +101,18 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t LOG.info("Starting cleaning for {}, based on min open {}", ci, (ci.minOpenWriteId > 0) ? "writeId: " + ci.minOpenWriteId : "txnId: " + minOpenTxn); + if (ci.nextTxnId == 0 && ci.txnId > 0 && + (ci.type == CompactionType.MAJOR || ci.type == CompactionType.MINOR || ci.type == CompactionType.REBALANCE)) { + TxnStatus status = txnHandler.getTransactionStatus(ci.txnId); + if (TxnStatus.ABORTED == status) { + LOG.warn("The compaction {} is in invalid state. The compaction is marked as 'ready for cleaning', " + + "but its txn is in aborted state. Marking this compaction as failed.", ci); + ci.errorMessage = "Invalid state: the compaction txn (" + ci.txnId + ") is already aborted."; + txnHandler.markFailed(ci); + return; + } + } + PerfLogger perfLogger = PerfLogger.getPerfLogger(false); String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" + (!isNull(ci.type) ?
ci.type.toString().toLowerCase() : null); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java index df73adf26ed3..c8b08f9b9b4d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -17,10 +17,12 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionResponse; @@ -36,6 +38,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.testutil.TxnStoreHelper; import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler; @@ -63,6 +66,10 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED; import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME; import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar; +import static org.apache.hadoop.hive.metastore.txn.TxnStore.CLEANING_RESPONSE; +import static org.apache.hadoop.hive.metastore.txn.TxnStore.INITIATED_STATE; +import static org.apache.hadoop.hive.metastore.txn.TxnStore.WORKING_RESPONSE; +import static 
org.apache.hadoop.hive.metastore.txn.TxnStore.WORKING_STATE; import static org.apache.hadoop.hive.ql.io.AcidUtils.addVisibilitySuffix; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -1347,7 +1354,86 @@ public void testCompactionHwmIsHonoredWithMinOpenWriteIdSetAndAbortedIOW() throw } } - private String createDeltasAndRunMajorCompaction(Table table, long minTxnId, int numberOfDeltas) throws Exception { + @Test + public void testCleanerRunsWithOpenCompactionTxn() throws Exception { + MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS, 0L); + + String dbName = "default"; + String tblName = "campcnb"; + Table t = newTable(dbName, tblName, false); + addDeltaFile(t, null, 1L, 1L, 1); + addDeltaFile(t, null, 2L, 2L, 1); + addDeltaFile(t, null, 3L, 3L, 1); + addDeltaFile(t, null, 4L, 4L, 1); + burnThroughTransactions(dbName, tblName, 4, null, null); + + // trigger compaction + CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MAJOR); + long txnId = compactInTxn(rqst, CommitAction.MARK_COMPACTED); + addBaseFile(t, null, 4L, 6, txnId); + String baseDir1 = "base_4_v0000005"; + + // should not clean anything since the compaction txn is still open + startCleaner(); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + assertEquals(1, rsp.getCompactsSize()); + assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState()); + + String baseDir2 = createDeltasAndRunMajorCompaction(t, 5, 2); + + // should still not clean anything since the first compaction txn is still open + startCleaner(); + + rsp = txnHandler.showCompact(new ShowCompactRequest()); + assertEquals(2, rsp.getCompactsSize()); + assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(1).getState()); + + String baseDir3 =
createDeltasAndRunMajorCompaction(t, 7, 2); + + // Abort the compaction txn + txnHandler.abortTxns(new AbortTxnsRequest(Collections.singletonList(txnId))); + Thread.sleep(10000L); + + Set<String> expectedDirs = new HashSet<>(); + expectedDirs.add(baseDir1); + expectedDirs.add(baseDir2); + expectedDirs.add(baseDir3); + for (int i = 1; i < 9; i++) { + expectedDirs.add(makeDeltaDirName(i, i)); + } + verifyDirectories(t, expectedDirs); + // Should mark the first compaction as failed since its txn is aborted + startCleaner(); + verifyDirectories(t, expectedDirs); + + // Should find the second compaction + startCleaner(); + expectedDirs.remove(baseDir1); + for (int i = 1; i < 7; i++) { + expectedDirs.remove(makeDeltaDirName(i, i)); + } + verifyDirectories(t, expectedDirs); + // Should find the third compaction and delete the directories accordingly + startCleaner(); + expectedDirs.remove(baseDir2); + for (int i = 7; i < 9; i++) { + expectedDirs.remove(makeDeltaDirName(i, i)); + } + verifyDirectories(t, expectedDirs); + + rsp = txnHandler.showCompact(new ShowCompactRequest()); + assertEquals(3, rsp.getCompactsSize()); + assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState()); + assertEquals(TxnStore.FAILED_RESPONSE, rsp.getCompacts().get(2).getState()); + assertEquals("Invalid state: the compaction txn (" + txnId + ") is already aborted.", + rsp.getCompacts().get(2).getErrorMessage()); + } + + private String createDeltasAndRunMajorCompaction(Table table, long minTxnId, int numberOfDeltas) + throws Exception { String dbName = table.getDbName(); String tableName = table.getTableName(); for (int i = 0; i < numberOfDeltas; i++) { @@ -1367,4 +1453,4 @@ private void verifyDirectories(Table table, Set expectedDirs) throws Exc Assert.assertEquals(expectedDirs, actualDirs); } -} \ No newline at end of file +} diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index a847e01aca4d..b7c81f8237cf 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -1100,8 +1100,13 @@ public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throw private static void shouldNeverHappen(long txnid) { throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid)); - } - + } + + @Override + public TxnStatus getTransactionStatus(long txnId) throws MetaException { + return jdbcResource.execute(new FindTxnStateHandler(txnId)); + } + private void deleteInvalidOpenTransactions(List txnIds) throws MetaException { try { sqlRetryHandler.executeWithRetry(new SqlRetryCallProperties().withCallerId("deleteInvalidOpenTransactions"), diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 22e6c279fc84..bcb443a99790 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.entities.CompactionMetricsData; import org.apache.hadoop.hive.metastore.txn.entities.MetricsInfo; +import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus; import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource; import
org.apache.hadoop.hive.metastore.txn.retry.SqlRetry; import org.apache.hadoop.hive.metastore.txn.retry.SqlRetryException; @@ -336,6 +337,10 @@ GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst) @RetrySemantics.SafeToRetry void addWriteIdsToMinHistory(long txnId, Map minOpenWriteIds) throws MetaException; + @SqlRetry(lockInternally = true, retryOnDuplicateKey = true) + @Transactional(POOL_TX) + TxnStatus getTransactionStatus(long txnId) throws MetaException; + /** * Allocate a write ID for the given table and associate it with a transaction * @param rqst info on transaction and table to allocate write id diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java index 1e1ea51420c9..31ed4e55adbb 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java @@ -62,7 +62,7 @@ public String getParameterizedQueryString(DatabaseProduct databaseProduct) throw String queryStr = " \"CQ_ID\", \"cq1\".\"CQ_DATABASE\", \"cq1\".\"CQ_TABLE\", \"cq1\".\"CQ_PARTITION\"," + " \"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_TBLPROPERTIES\", \"CQ_RETRY_RETENTION\", " + - " \"CQ_NEXT_TXN_ID\""; + " \"CQ_TXN_ID\", \"CQ_NEXT_TXN_ID\""; if (TxnHandler.ConfVars.useMinHistoryWriteId()) { queryStr += ", \"MIN_OPEN_WRITE_ID\""; } @@ -118,9 +118,10 @@ public List extractData(ResultSet rs) throws SQLException, DataA info.highestWriteId = rs.getLong(7); info.properties = rs.getString(8); info.retryRetention = rs.getInt(9); - info.nextTxnId = rs.getLong(10); + info.txnId = rs.getLong(10); + info.nextTxnId = rs.getLong(11); if
(TxnHandler.ConfVars.useMinHistoryWriteId()) { - long value = rs.getLong(11); + long value = rs.getLong(12); info.minOpenWriteId = !rs.wasNull() ? value : Long.MAX_VALUE; } infos.add(info);