diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index dead15e1799c..7083df59828c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -84,7 +84,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
@@ -1128,7 +1127,7 @@ public void testAbortAfterMarkCleaned() throws Exception {
doAnswer(invocationOnMock -> {
connection2.abortTransaction();
return invocationOnMock.callRealMethod();
- }).when(mockedTxnHandler).markCleaned(any(), eq(false));
+ }).when(mockedTxnHandler).markCleaned(any());
MetadataCache metadataCache = new MetadataCache(false);
FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java
index 06dc02942d90..acd4519d0fe3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java
@@ -58,15 +58,18 @@ public AbortedTxnCleaner(HiveConf conf, TxnStore txnHandler,
/**
The following cleanup is based on the following idea -
- 1. Aborted cleanup is independent of compaction. This is because directories which are written by
- aborted txns are not visible by any open txns. It is only visible while determining the AcidState (which
- only sees the aborted deltas and does not read the file).
+     Aborted cleanup is independent of compaction. This is because directories written by
+     aborted txns are not visible to any open txns in most cases. The one case where aborted deltas
+     remain visible to open txns is streaming aborts, where a single delta file can contain writes
+     from both committed and aborted txns. Such deltas are also referred to as uncompacted aborts.
+     In that case, we do not delete the uncompacted aborts or their associated metadata.
The following algorithm is used to clean the set of aborted directories -
a. Find the list of entries which are suitable for cleanup (This is done in {@link TxnStore#findReadyToCleanAborts(long, int)}).
b. If the table/partition does not exist, then remove the associated aborted entry in TXN_COMPONENTS table.
- c. Get the AcidState of the table by using the min open txnID, database name, tableName, partition name, highest write ID
- d. Fetch the aborted directories and delete the directories.
+     c. Get the AcidState of the table by using the min open txn ID, database name, table name, partition name, and highest write ID.
+        The construction of the AcidState identifies the obsolete/aborted directories in the table/partition.
+     d. Fetch the obsolete/aborted directories from the AcidState and delete them.
      e. Fetch the aborted write IDs from the AcidState and use them to delete the associated metadata in the TXN_COMPONENTS table.
**/
@Override
@@ -79,8 +82,8 @@ public List<Runnable> getTasks() throws MetaException {
     List<CompactionInfo> readyToCleanAborts = txnHandler.findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);
if (!readyToCleanAborts.isEmpty()) {
- return readyToCleanAborts.stream().map(ci -> ThrowingRunnable.unchecked(() ->
- clean(ci, ci.txnId > 0 ? ci.txnId : Long.MAX_VALUE, metricsEnabled)))
+ return readyToCleanAborts.stream().map(info -> ThrowingRunnable.unchecked(() ->
+ clean(info, info.minOpenWriteTxnId, metricsEnabled)))
.collect(Collectors.toList());
}
return Collections.emptyList();
@@ -99,7 +102,7 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
if (isNull(t)) {
// The table was dropped before we got around to cleaning it.
LOG.info("Unable to find table {}, assuming it was dropped.", info.getFullTableName());
- txnHandler.markCleaned(info, true);
+ txnHandler.markCleaned(info);
return;
}
if (!isNull(info.partName)) {
@@ -108,7 +111,7 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
// The partition was dropped before we got around to cleaning it.
LOG.info("Unable to find partition {}, assuming it was dropped.",
info.getFullPartitionName());
- txnHandler.markCleaned(info, true);
+ txnHandler.markCleaned(info);
return;
}
}
@@ -118,10 +121,14 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
abortCleanUsingAcidDir(info, location, minOpenWriteTxn);
} catch (InterruptedException e) {
+ LOG.error("Caught an interrupted exception when cleaning, unable to complete cleaning of {} due to {}", info,
+ e.getMessage());
+ handleCleanerAttemptFailure(info, e.getMessage());
throw e;
} catch (Exception e) {
LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", info,
e.getMessage());
+ handleCleanerAttemptFailure(info, e.getMessage());
throw new MetaException(e.getMessage());
} finally {
if (metricsEnabled) {
@@ -149,7 +156,7 @@ private void abortCleanUsingAcidDir(CompactionInfo info, String location, long m
boolean success = cleanAndVerifyObsoleteDirectories(info, location, validWriteIdList, table);
if (success || CompactorUtil.isDynPartAbort(table, info.partName)) {
- txnHandler.markCleaned(info, false);
+ txnHandler.markCleaned(info);
} else {
LOG.warn("Leaving aborted entry {} in TXN_COMPONENTS table.", info);
}
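The Javadoc above lays out the abort-cleanup algorithm in steps a-e. Below is a minimal, self-contained sketch of that control flow for readers skimming the diff; every type and member in it (AbortCandidate, MetaLookup, AbortStore) is a hypothetical stand-in, not the Hive API, and the real implementation is the clean/abortCleanUsingAcidDir code in the hunks above.

import java.util.List;

// Hypothetical stand-ins for CompactionInfo, the metastore lookup and TxnStore/FSRemover.
record AbortCandidate(String db, String table, String partition, long minOpenWriteTxnId) {}

interface MetaLookup {
  boolean tableExists(String db, String table);
  boolean partitionExists(String db, String table, String partition);
}

interface AbortStore {
  List<String> abortedDirs(AbortCandidate c);   // step c: resolved via the AcidState, bounded by minOpenWriteTxnId
  boolean deleteDirs(List<String> dirs);        // step d: filesystem removal (FSRemover in this patch)
  void markCleaned(AbortCandidate c);           // step e: drop the TXN_COMPONENTS rows for the aborted write IDs
}

final class AbortCleanSketch {
  static void clean(AbortCandidate c, MetaLookup meta, AbortStore store) {
    // Step b: table or partition already dropped, so only the metadata needs to go.
    if (!meta.tableExists(c.db(), c.table())
        || (c.partition() != null && !meta.partitionExists(c.db(), c.table(), c.partition()))) {
      store.markCleaned(c);
      return;
    }
    // Steps c and d: resolve the aborted/obsolete directories and delete them.
    List<String> dirs = store.abortedDirs(c);
    boolean deleted = store.deleteDirs(dirs);
    // Step e: clear the aborted entries only after the directories are actually gone.
    if (deleted) {
      store.markCleaned(c);
    }
  }
}

The real clean() additionally records cleaner metrics and, on failure, routes through handleCleanerAttemptFailure so the entry is retried with a growing retention time (see TaskHandler further down in this diff).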
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
index f7c8944ea3ee..a928f1438f49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
@@ -58,10 +58,6 @@
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
-import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS;
-import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
-import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getIntVar;
-import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
import static java.util.Objects.isNull;
/**
@@ -122,7 +118,7 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
// The table was dropped before we got around to cleaning it.
LOG.info("Unable to find table {}, assuming it was dropped. {}", ci.getFullTableName(),
idWatermark(ci));
- txnHandler.markCleaned(ci, false);
+ txnHandler.markCleaned(ci);
return;
}
if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
@@ -137,7 +133,7 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
// The partition was dropped before we got around to cleaning it.
LOG.info("Unable to find partition {}, assuming it was dropped. {}",
ci.getFullPartitionName(), idWatermark(ci));
- txnHandler.markCleaned(ci, false);
+ txnHandler.markCleaned(ci);
return;
}
if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
@@ -168,11 +164,10 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
} catch (Exception e) {
LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", ci,
e.getMessage());
- ci.errorMessage = e.getMessage();
if (metricsEnabled) {
Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
}
- handleCleanerAttemptFailure(ci);
+ handleCleanerAttemptFailure(ci, e.getMessage());
} finally {
if (metricsEnabled) {
perfLogger.perfLogEnd(CompactionCleaner.class.getName(), cleanerMetric);
@@ -204,7 +199,7 @@ private void cleanUsingLocation(CompactionInfo ci, String path, boolean requires
deleted = fsRemover.clean(getCleaningRequestBasedOnLocation(ci, path));
}
if (!deleted.isEmpty()) {
- txnHandler.markCleaned(ci, false);
+ txnHandler.markCleaned(ci);
} else {
txnHandler.clearCleanerStart(ci);
}
@@ -253,7 +248,7 @@ private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenT
boolean success = cleanAndVerifyObsoleteDirectories(ci, location, validWriteIdList, table);
if (success || CompactorUtil.isDynPartAbort(table, ci.partName)) {
- txnHandler.markCleaned(ci, false);
+ txnHandler.markCleaned(ci);
} else {
txnHandler.clearCleanerStart(ci);
LOG.warn("No files were removed. Leaving queue entry {} in ready for cleaning state.", ci);
@@ -302,22 +297,6 @@ protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, V
return validWriteIdList;
}
- private void handleCleanerAttemptFailure(CompactionInfo ci) throws MetaException {
- long defaultRetention = getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS);
- int cleanAttempts = 0;
- if (ci.retryRetention > 0) {
- cleanAttempts = (int)(Math.log(ci.retryRetention / defaultRetention) / Math.log(2)) + 1;
- }
- if (cleanAttempts >= getIntVar(conf, HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS)) {
- //Mark it as failed if the max attempt threshold is reached.
- txnHandler.markFailed(ci);
- } else {
- //Calculate retry retention time and update record.
- ci.retryRetention = (long)Math.pow(2, cleanAttempts) * defaultRetention;
- txnHandler.setCleanerRetryRetentionTimeOnError(ci);
- }
- }
-
private CleanupRequest getCleaningRequestBasedOnLocation(CompactionInfo ci, String location) {
String strIfPurge = ci.getProperty("ifPurge");
boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge"));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
index ef95a100c1a2..ff35ce484237 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
@@ -49,8 +49,13 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static org.apache.commons.collections.ListUtils.subtract;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getIntVar;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
/**
* An abstract class which defines the list of utility methods for performing cleanup activities.
@@ -63,6 +68,7 @@ public abstract class TaskHandler {
protected final boolean metricsEnabled;
protected final MetadataCache metadataCache;
protected final FSRemover fsRemover;
+ protected final long defaultRetention;
TaskHandler(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache,
boolean metricsEnabled, FSRemover fsRemover) {
@@ -71,6 +77,7 @@ public abstract class TaskHandler {
this.metadataCache = metadataCache;
this.metricsEnabled = metricsEnabled;
this.fsRemover = fsRemover;
+ this.defaultRetention = getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS);
}
  public abstract List<Runnable> getTasks() throws MetaException;
@@ -161,4 +168,26 @@ protected boolean cleanAndVerifyObsoleteDirectories(CompactionInfo info, String
return success;
}
+
+ protected void handleCleanerAttemptFailure(CompactionInfo info, String errorMessage) throws MetaException {
+ int cleanAttempts = 0;
+ info.errorMessage = errorMessage;
+ if (info.isAbortedTxnCleanup()) {
+ info.retryRetention = info.retryRetention > 0 ? info.retryRetention * 2 : defaultRetention;
+ txnHandler.setCleanerRetryRetentionTimeOnError(info);
+ } else {
+ if (info.retryRetention > 0) {
+ cleanAttempts = (int) (Math.log(info.retryRetention / defaultRetention) / Math.log(2)) + 1;
+ }
+ if (cleanAttempts >= getIntVar(conf, HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS)) {
+ //Mark it as failed if the max attempt threshold is reached.
+ txnHandler.markFailed(info);
+ } else {
+ //Calculate retry retention time and update record.
+ info.retryRetention = (long) Math.pow(2, cleanAttempts) * defaultRetention;
+ txnHandler.setCleanerRetryRetentionTimeOnError(info);
+ }
+ }
+ }
}
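The handleCleanerAttemptFailure added above now covers two policies: for abort cleanup the stored retry retention simply doubles on every failure (starting from the configured default) and the entry is never marked failed, while for regular compaction cleanup the attempt count is recovered from the stored retention and the entry is marked failed once HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS is reached. A standalone sketch of just that arithmetic, with a 300000 ms default retention and a cap of 5 attempts chosen purely for illustration:

public class RetryBackoffSketch {
  public static void main(String[] args) {
    long defaultRetention = 300_000L; // illustrative stand-in for HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME
    int maxAttempts = 5;              // illustrative stand-in for HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS

    // Abort cleanup branch: the retention doubles on each failure, with no failure cap.
    long abortRetention = 0;
    for (int failure = 1; failure <= 4; failure++) {
      abortRetention = abortRetention > 0 ? abortRetention * 2 : defaultRetention;
      System.out.println("abort cleanup failure " + failure + " -> retention " + abortRetention + " ms");
    }

    // Compaction cleanup branch: the attempt count is derived from the stored retention,
    // and the entry is marked failed once the max attempt threshold is hit.
    long retention = 0;
    for (int failure = 1; failure <= 6; failure++) {
      int cleanAttempts = retention > 0
          ? (int) (Math.log(retention / defaultRetention) / Math.log(2)) + 1
          : 0;
      if (cleanAttempts >= maxAttempts) {
        System.out.println("compaction cleanup failure " + failure + " -> markFailed");
        break;
      }
      retention = (long) Math.pow(2, cleanAttempts) * defaultRetention;
      System.out.println("compaction cleanup failure " + failure + " -> retention " + retention + " ms");
    }
  }
}

Both branches persist the new retention through setCleanerRetryRetentionTimeOnError; the findReadyToCleanAborts SQL later in this diff filters an entry out until its CQ_COMMIT_TIME plus CQ_RETRY_RETENTION has elapsed.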
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandlerFactory.java
index 57a4dc625c83..33c782ce4ab8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandlerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandlerFactory.java
@@ -46,12 +46,12 @@ public List<TaskHandler> getHandlers(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache,
boolean metricsEnabled, FSRemover fsRemover) {
boolean useAbortHandler = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
     List<TaskHandler> taskHandlers = new ArrayList<>();
+ taskHandlers.add(new CompactionCleaner(conf, txnHandler, metadataCache,
+ metricsEnabled, fsRemover));
if (useAbortHandler) {
taskHandlers.add(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
metricsEnabled, fsRemover));
}
- taskHandlers.add(new CompactionCleaner(conf, txnHandler, metadataCache,
- metricsEnabled, fsRemover));
return taskHandlers;
}
}
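With this reordering the factory always registers CompactionCleaner first and appends AbortedTxnCleaner only when COMPACTOR_CLEAN_ABORTS_USING_CLEANER is on; note that a later hunk in this diff flips that flag's default to true. A minimal sketch, assuming only that a HiveConf can be constructed in the caller's environment, of how a deployment or test would restore the old compaction-cycle cleanup:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

public class AbortCleanerFlagSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // With the new default the cleaner handles aborted directories; turning the flag off
    // reverts to cleaning them inside the compaction cycle (no AbortedTxnCleaner handler).
    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
    System.out.println(MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER));
  }
}

The same toggle can be set in hive-site.xml via metastore.compactor.clean.aborts.using.cleaner (or the hive.compactor.clean.aborts.using.cleaner alias), as the ConfVars entry changed later in this diff defines.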
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 03a7326710ee..5b334838ac8c 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -238,7 +238,7 @@ public void testMarkCleaned() throws Exception {
     List<CompactionInfo> toClean = txnHandler.findReadyToClean(0, 0);
assertEquals(1, toClean.size());
assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
- txnHandler.markCleaned(ci, false);
+ txnHandler.markCleaned(ci);
assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
@@ -529,7 +529,7 @@ private void addSucceededCompaction(String dbName, String tableName, String part
txnHandler.compact(rqst);
ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
- txnHandler.markCleaned(ci, false);
+ txnHandler.markCleaned(ci);
}
private void addWaitingForCleaningCompaction(String dbName, String tableName, CompactionType type,
@@ -866,7 +866,7 @@ public void testMarkCleanedCleansTxnsAndTxnComponents()
Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
     List<CompactionInfo> toClean = txnHandler.findReadyToClean(0, 0);
assertEquals(1, toClean.size());
- txnHandler.markCleaned(ci, false);
+ txnHandler.markCleaned(ci);
// Check that we are cleaning up the empty aborted transactions
GetOpenTxnsResponse txnList = txnHandler.getOpenTxns();
@@ -892,7 +892,7 @@ public void testMarkCleanedCleansTxnsAndTxnComponents()
toClean = txnHandler.findReadyToClean(0, 0);
assertEquals(1, toClean.size());
- txnHandler.markCleaned(ci, false);
+ txnHandler.markCleaned(ci);
txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
// The open txn will became the low water mark
@@ -999,7 +999,7 @@ public void testEnqueueTimeThroughLifeCycle() throws Exception {
txnHandler.markCompacted(ci);
checkEnqueueTime(enqueueTime);
- txnHandler.markCleaned(ci, false);
+ txnHandler.markCleaned(ci);
checkEnqueueTime(enqueueTime);
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 113173acb37e..ba90d8549d19 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -85,6 +85,7 @@
import org.apache.hive.common.util.HiveVersionInfo;
import org.apache.thrift.TException;
import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -127,8 +128,9 @@ public abstract class CompactorTest {
private final AtomicBoolean stop = new AtomicBoolean();
private Path tmpdir;
FileSystem fs;
-
+
@Before
+ @BeforeEach
public void setup() throws Exception {
setup(new HiveConf());
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycle.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycle.java
index b57599113e6e..01625d431129 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycle.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycle.java
@@ -18,11 +18,11 @@
package org.apache.hadoop.hive.ql.txn.compactor;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
public class TestAbortCleanupUsingCompactionCycle extends TestCleaner {
@Override
- @Before
+ @BeforeEach
public void setup() throws Exception {
super.setup();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId.java
index bbc72661abf1..77782534ec42 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId.java
@@ -18,11 +18,11 @@
package org.apache.hadoop.hive.ql.txn.compactor;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
public class TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId extends TestCleaner {
@Override
- @Before
+ @BeforeEach
public void setup() throws Exception {
super.setup();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index df4a786a184b..16266c655bfe 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -44,9 +44,9 @@
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler;
import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandlerFactory;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
import org.mockito.internal.util.reflection.FieldSetter;
import java.util.ArrayList;
@@ -67,7 +67,6 @@
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
import static org.apache.hadoop.hive.ql.io.AcidUtils.addVisibilitySuffix;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
@@ -108,7 +107,7 @@ public void testRetryAfterFailedCleanup(boolean delayEnabled) throws Exception {
FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
     List<TaskHandler> taskHandlers = TaskHandlerFactory.getInstance()
.getHandlers(conf, mockedHandler, metadataCache, false, fsRemover);
- doThrow(new RuntimeException(errorMessage)).when(mockedHandler).markCleaned(nullable(CompactionInfo.class), eq(false));
+ doThrow(new RuntimeException(errorMessage)).when(mockedHandler).markCleaned(nullable(CompactionInfo.class));
Table t = newTable("default", "retry_test", false);
@@ -196,7 +195,7 @@ public void testRetentionAfterFailedCleanup() throws Exception {
FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
     List<TaskHandler> taskHandlers = TaskHandlerFactory.getInstance()
.getHandlers(conf, mockedHandler, metadataCache, false, fsRemover);
- doThrow(new RuntimeException()).when(mockedHandler).markCleaned(nullable(CompactionInfo.class), eq(false));
+ doThrow(new RuntimeException()).when(mockedHandler).markCleaned(nullable(CompactionInfo.class));
//Do a run to fail the clean and set the retention time
Cleaner cleaner = new Cleaner();
@@ -810,7 +809,7 @@ boolean useHive130DeltaDirName() {
return false;
}
- @After
+ @AfterEach
public void tearDown() throws Exception {
compactorTestCleanup();
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
index 3b40e2f75944..e0a2a1f5ed32 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
@@ -24,7 +24,14 @@
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest;
@@ -32,13 +39,18 @@
import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.runInitiator;
import static org.mockito.ArgumentMatchers.any;
public class TestAbortedTxnCleaner extends TestHandler {
@@ -320,4 +332,417 @@ public void testAbortedCleaningWithThreeTxnsWithDiffWriteIds() throws Exception
     List<Path> directories = getDirectories(conf, t, null);
Assert.assertEquals(5, directories.size());
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testAbortCleanupNotUpdatingSpecificCompactionTables(boolean isPartitioned) throws Exception {
+ String dbName = "default", tableName = "abort_cleanup_not_populating_compaction_tables_test", partName = "today";
+ Table t = newTable(dbName, tableName, isPartitioned);
+ Partition p = isPartitioned ? newPartition(t, partName) : null;
+
+    // 3 aborted deltas and 1 committed delta
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, false);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+
+ MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+ TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+ false, mockedFSRemover));
+
+ runInitiator(conf);
+ // Initiator must not add anything to compaction_queue
+ String compactionQueuePresence = "SELECT COUNT(*) FROM \"COMPACTION_QUEUE\" " +
+ " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" +
+ (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL");
+ Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, compactionQueuePresence));
+
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+ cleaner.run();
+
+ Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
+ Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+
+ Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, compactionQueuePresence));
+ Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, "SELECT COUNT(*) FROM \"COMPLETED_COMPACTIONS\" " +
+ " WHERE \"CC_DATABASE\" = '" + dbName+ "' AND \"CC_TABLE\" = '" + tableName + "' AND \"CC_PARTITION\"" +
+ (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL")));
+ Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(conf, "SELECT COUNT(*) FROM \"COMPLETED_TXN_COMPONENTS\" " +
+ " WHERE \"CTC_DATABASE\" = '" + dbName+ "' AND \"CTC_TABLE\" = '" + tableName + "' AND \"CTC_PARTITION\"" +
+ (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL")));
+
+    List<Path> directories = getDirectories(conf, t, null);
+ // All aborted directories removed, hence 1 committed delta directory must be present
+ Assert.assertEquals(1, directories.size());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testRetryEntryOnFailures(boolean isPartitioned) throws Exception {
+ String dbName = "default", tableName = "handler_retry_entry", partName = "today";
+ Table t = newTable(dbName, tableName, isPartitioned);
+ Partition p = isPartitioned ? newPartition(t, partName) : null;
+
+ // Add 2 committed deltas and 2 aborted deltas
+ addDeltaFileWithTxnComponents(t, p, 2, false);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, false);
+
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+ TxnStore mockedTxnHandler = Mockito.spy(txnHandler);
+ TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, mockedTxnHandler, metadataCache,
+ false, mockedFSRemover));
+    // Throw a runtime exception when FSRemover#clean is invoked.
+ Mockito.doAnswer(invocationOnMock -> {
+ throw new RuntimeException("Testing retry");
+ }).when(mockedFSRemover).clean(any());
+
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+ cleaner.run();
+
+ Mockito.verify(mockedTxnHandler, Mockito.times(1)).setCleanerRetryRetentionTimeOnError(any(CompactionInfo.class));
+ ShowCompactResponse scr = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, scr.getCompactsSize());
+ ShowCompactResponseElement scre = scr.getCompacts().get(0);
+ Assert.assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName)
+ && (isPartitioned ? scre.getPartitionname().equals("ds=" + partName) : scre.getPartitionname() == null) &&
+ "ready for cleaning".equalsIgnoreCase(scre.getState()) && scre.getType() == CompactionType.ABORT_TXN_CLEANUP &&
+ scre.getErrorMessage().equalsIgnoreCase("Testing retry"));
+ String whereClause = " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" +
+ (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL") + " AND \"CQ_TYPE\" = 'c' AND \"CQ_STATE\" = 'r'";
+ String retryRetentionQuery = "SELECT \"CQ_RETRY_RETENTION\" FROM \"COMPACTION_QUEUE\" " + whereClause;
+ Assert.assertEquals(Long.toString(MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS)),
+ TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false)
+ .replace("\n", "").trim());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testRetryInfoBeingUsed(boolean isPartitioned) throws Exception {
+ String dbName = "default", tableName = "handler_retry_usage", partName = "today";
+ Table t = newTable(dbName, tableName, isPartitioned);
+ Partition p = isPartitioned ? newPartition(t, partName) : null;
+
+ // Add 2 committed deltas and 2 aborted deltas
+ addDeltaFileWithTxnComponents(t, p, 2, false);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, false);
+
+ // Set retry retention time as 10 s (10000 ms).
+ long retryRetentionTime = 10000;
+
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+ MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, retryRetentionTime, TimeUnit.MILLISECONDS);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+ TaskHandler taskHandler = new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+ false, mockedFSRemover);
+    // Throw a runtime exception when FSRemover#clean is invoked.
+ Mockito.doAnswer(invocationOnMock -> {
+ throw new RuntimeException("Testing retry");
+ }).when(mockedFSRemover).clean(any());
+
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(taskHandler));
+ cleaner.run();
+
+ ShowCompactResponse scr = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, scr.getCompactsSize());
+ ShowCompactResponseElement scre = scr.getCompacts().get(0);
+ Assert.assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName)
+ && (isPartitioned ? scre.getPartitionname().equals("ds=" + partName) : scre.getPartitionname() == null) &&
+ "ready for cleaning".equalsIgnoreCase(scre.getState()) && scre.getType() == CompactionType.ABORT_TXN_CLEANUP &&
+ scre.getErrorMessage().equalsIgnoreCase("Testing retry"));
+ String whereClause = " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" +
+ (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL") + " AND \"CQ_TYPE\" = 'c' AND \"CQ_STATE\" = 'r'";
+ String retryRetentionQuery = "SELECT \"CQ_RETRY_RETENTION\" FROM \"COMPACTION_QUEUE\" " + whereClause;
+ Assert.assertEquals(Long.toString(retryRetentionTime), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false)
+ .replace("\n", "").trim());
+
+ // Delay for time specified in retry retention.
+ Thread.sleep(retryRetentionTime);
+
+ Mockito.doAnswer(InvocationOnMock::callRealMethod).when(mockedFSRemover).clean(any());
+
+ cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(taskHandler));
+ cleaner.run();
+
+    // The retry record must not be present since it is deleted after a successful abort cleanup.
+ Assert.assertEquals(0, txnHandler.showCompact(new ShowCompactRequest()).getCompactsSize());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testRetryWithinRetentionTime(boolean isPartitioned) throws Exception {
+ String dbName = "default", tableName = "handler_retry_nodelay", partName = "today";
+ Table t = newTable(dbName, tableName, isPartitioned);
+ Partition p = isPartitioned ? newPartition(t, partName) : null;
+
+ // Add 2 committed deltas and 2 aborted deltas
+ addDeltaFileWithTxnComponents(t, p, 2, false);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, false);
+
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+ TaskHandler taskHandler = new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+ false, mockedFSRemover);
+    // Throw a runtime exception when FSRemover#clean is invoked.
+ Mockito.doAnswer(invocationOnMock -> {
+ throw new RuntimeException("Testing retry");
+ }).when(mockedFSRemover).clean(any());
+
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(taskHandler));
+ cleaner.run();
+
+ ShowCompactResponse scr = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, scr.getCompactsSize());
+ ShowCompactResponseElement scre = scr.getCompacts().get(0);
+ Assert.assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName)
+ && (isPartitioned ? scre.getPartitionname().equals("ds=" + partName) : scre.getPartitionname() == null) &&
+ "ready for cleaning".equalsIgnoreCase(scre.getState()) && scre.getType() == CompactionType.ABORT_TXN_CLEANUP &&
+ scre.getErrorMessage().equalsIgnoreCase("Testing retry"));
+ String whereClause = " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" +
+ (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL") + " AND \"CQ_TYPE\" = 'c' AND \"CQ_STATE\" = 'r'";
+ String retryRetentionQuery = "SELECT \"CQ_RETRY_RETENTION\" FROM \"COMPACTION_QUEUE\" " + whereClause;
+ Assert.assertEquals(Long.toString(MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS)),
+ TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false)
+ .replace("\n", "").trim());
+
+ Mockito.doAnswer(InvocationOnMock::callRealMethod).when(mockedFSRemover).clean(any());
+
+ cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(taskHandler));
+ cleaner.run();
+
+    // The retry entry is not removed: the retry retention time has not elapsed, so it is not picked up for cleanup.
+ scr = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, scr.getCompactsSize());
+ scre = scr.getCompacts().get(0);
+ Assert.assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName)
+ && (isPartitioned ? scre.getPartitionname().equals("ds=" + partName) : scre.getPartitionname() == null) &&
+ "ready for cleaning".equalsIgnoreCase(scre.getState()) && scre.getType() == CompactionType.ABORT_TXN_CLEANUP &&
+ scre.getErrorMessage().equalsIgnoreCase("Testing retry"));
+ Assert.assertEquals(Long.toString(MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS)),
+ TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false)
+ .replace("\n", "").trim());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testRetryUpdateRetentionTimeWhenFailedTwice(boolean isPartitioned) throws Exception {
+ String dbName = "default", tableName = "handler_retry_retention_time_failed_twice", partName = "today";
+ Table t = newTable(dbName, tableName, isPartitioned);
+ Partition p = isPartitioned ? newPartition(t, partName) : null;
+
+ // Add 2 committed deltas and 2 aborted deltas
+ addDeltaFileWithTxnComponents(t, p, 2, false);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, false);
+
+ // Set retry retention time as 10 s (10000 ms).
+ long retryRetentionTime = 10000;
+
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+ MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, retryRetentionTime, TimeUnit.MILLISECONDS);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+ TaskHandler taskHandler = new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+ false, mockedFSRemover);
+    // Throw a runtime exception when FSRemover#clean is invoked.
+ Mockito.doAnswer(invocationOnMock -> {
+ throw new RuntimeException("Testing retry");
+ }).when(mockedFSRemover).clean(any());
+
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(taskHandler));
+ cleaner.run();
+
+ ShowCompactResponse scr = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, scr.getCompactsSize());
+ ShowCompactResponseElement scre = scr.getCompacts().get(0);
+ Assert.assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName)
+ && (isPartitioned ? scre.getPartitionname().equals("ds=" + partName) : scre.getPartitionname() == null) &&
+ "ready for cleaning".equalsIgnoreCase(scre.getState()) && scre.getType() == CompactionType.ABORT_TXN_CLEANUP &&
+ scre.getErrorMessage().equalsIgnoreCase("Testing retry"));
+ String whereClause = " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" +
+ (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL") + " AND \"CQ_TYPE\" = 'c' AND \"CQ_STATE\" = 'r'";
+ String retryRetentionQuery = "SELECT \"CQ_RETRY_RETENTION\" FROM \"COMPACTION_QUEUE\" " + whereClause;
+ Assert.assertEquals(Long.toString(retryRetentionTime), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false)
+ .replace("\n", "").trim());
+
+ // Delay for time specified in retry retention.
+ Thread.sleep(retryRetentionTime);
+
+ cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(taskHandler));
+ cleaner.run();
+
+ scr = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, scr.getCompactsSize());
+ scre = scr.getCompacts().get(0);
+ Assert.assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName)
+ && (isPartitioned ? scre.getPartitionname().equals("ds=" + partName) : scre.getPartitionname() == null) &&
+ "ready for cleaning".equalsIgnoreCase(scre.getState()) && scre.getType() == CompactionType.ABORT_TXN_CLEANUP &&
+ scre.getErrorMessage().equalsIgnoreCase("Testing retry"));
+ // The retry entry must reflect double the retention time now.
+ Assert.assertEquals(Long.toString(2 * retryRetentionTime), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false)
+ .replace("\n", "").trim());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testRetryUpdateErrorMessageWhenFailedTwice(boolean isPartitioned) throws Exception {
+ String dbName = "default", tableName = "handler_retry_error_msg_failed_twice", partName = "today";
+ Table t = newTable(dbName, tableName, isPartitioned);
+ Partition p = isPartitioned ? newPartition(t, partName) : null;
+
+ // Add 2 committed deltas and 2 aborted deltas
+ addDeltaFileWithTxnComponents(t, p, 2, false);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, false);
+
+ // Set retry retention time as 10 s (10000 ms).
+ long retryRetentionTime = 10000;
+
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+ MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, retryRetentionTime, TimeUnit.MILLISECONDS);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+ TaskHandler taskHandler = new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+ false, mockedFSRemover);
+    // Throw a runtime exception when FSRemover#clean is invoked.
+ Mockito.doAnswer(invocationOnMock -> {
+ throw new RuntimeException("Testing first retry");
+ }).when(mockedFSRemover).clean(any());
+
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(taskHandler));
+ cleaner.run();
+
+ ShowCompactResponse scr = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, scr.getCompactsSize());
+ ShowCompactResponseElement scre = scr.getCompacts().get(0);
+ Assert.assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName)
+ && (isPartitioned ? scre.getPartitionname().equals("ds=" + partName) : scre.getPartitionname() == null) &&
+ "ready for cleaning".equalsIgnoreCase(scre.getState()) && scre.getType() == CompactionType.ABORT_TXN_CLEANUP &&
+ scre.getErrorMessage().equalsIgnoreCase("Testing first retry"));
+ String whereClause = " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" +
+ (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL") + " AND \"CQ_TYPE\" = 'c' AND \"CQ_STATE\" = 'r'";
+ String retryRetentionQuery = "SELECT \"CQ_RETRY_RETENTION\" FROM \"COMPACTION_QUEUE\" " + whereClause;
+ Assert.assertEquals(Long.toString(retryRetentionTime), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false)
+ .replace("\n", "").trim());
+
+ // Delay for time specified in retry retention.
+ Thread.sleep(retryRetentionTime);
+
+ Mockito.doAnswer(invocationOnMock -> {
+ throw new RuntimeException("Testing second retry");
+ }).when(mockedFSRemover).clean(any());
+
+ cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(taskHandler));
+ cleaner.run();
+
+ scr = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, scr.getCompactsSize());
+ scre = scr.getCompacts().get(0);
+ Assert.assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName)
+ && (isPartitioned ? scre.getPartitionname().equals("ds=" + partName) : scre.getPartitionname() == null) &&
+ "ready for cleaning".equalsIgnoreCase(scre.getState()) && scre.getType() == CompactionType.ABORT_TXN_CLEANUP &&
+ scre.getErrorMessage().equalsIgnoreCase("Testing second retry"));
+ // The retry entry must reflect double the retention time now.
+ Assert.assertEquals(Long.toString(2 * retryRetentionTime), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false)
+ .replace("\n", "").trim());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testZeroRetryRetentionTimeForAbortCleanup(boolean isPartitioned) throws Exception {
+ String dbName = "default", tableName = "handler_zero_retryretention", partName = "today";
+ Table t = newTable(dbName, tableName, isPartitioned);
+ Partition p = isPartitioned ? newPartition(t, partName) : null;
+
+ // Add 2 committed deltas and 2 aborted deltas
+ addDeltaFileWithTxnComponents(t, p, 2, false);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, false);
+
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+ MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, 0, TimeUnit.MILLISECONDS);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+ TaskHandler taskHandler = new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+ false, mockedFSRemover);
+    // Throw a runtime exception when FSRemover#clean is invoked.
+ Mockito.doAnswer(invocationOnMock -> {
+ throw new RuntimeException("Testing retry");
+ }).when(mockedFSRemover).clean(any());
+
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(taskHandler));
+ cleaner.run();
+
+ ShowCompactResponse scr = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, scr.getCompactsSize());
+ ShowCompactResponseElement scre = scr.getCompacts().get(0);
+ Assert.assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName)
+ && (isPartitioned ? scre.getPartitionname().equals("ds=" + partName) : scre.getPartitionname() == null) &&
+ "ready for cleaning".equalsIgnoreCase(scre.getState()) && scre.getType() == CompactionType.ABORT_TXN_CLEANUP &&
+ scre.getErrorMessage().equalsIgnoreCase("Testing retry"));
+ String whereClause = " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" +
+ (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL") + " AND \"CQ_TYPE\" = 'c' AND \"CQ_STATE\" = 'r'";
+ String retryRetentionQuery = "SELECT \"CQ_RETRY_RETENTION\" FROM \"COMPACTION_QUEUE\" " + whereClause;
+ Assert.assertEquals(Integer.toString(0), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false)
+ .replace("\n", "").trim());
+
+ Mockito.doAnswer(InvocationOnMock::callRealMethod).when(mockedFSRemover).clean(any());
+
+ cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(taskHandler));
+ cleaner.run();
+
+    // The retry entry should be removed: with a retry retention time of 0 it is immediately eligible again, and this run succeeds.
+ Assert.assertEquals(0, txnHandler.showCompact(new ShowCompactRequest()).getCompactsSize());
+ }
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index 38ab1894ebcc..b3d17d959abd 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -239,14 +239,16 @@ std::string to_string(const LockType::type& val) {
int _kCompactionTypeValues[] = {
CompactionType::MINOR,
CompactionType::MAJOR,
- CompactionType::REBALANCE
+ CompactionType::REBALANCE,
+ CompactionType::ABORT_TXN_CLEANUP
};
const char* _kCompactionTypeNames[] = {
"MINOR",
"MAJOR",
- "REBALANCE"
+ "REBALANCE",
+ "ABORT_TXN_CLEANUP"
};
-const std::map<int, const char*> _CompactionType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(3, _kCompactionTypeValues, _kCompactionTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr));
+const std::map<int, const char*> _CompactionType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(4, _kCompactionTypeValues, _kCompactionTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr));
std::ostream& operator<<(std::ostream& out, const CompactionType::type& val) {
  std::map<int, const char*>::const_iterator it = _CompactionType_VALUES_TO_NAMES.find(val);
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
index 3223c3340354..6122d532f689 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -127,7 +127,8 @@ struct CompactionType {
enum type {
MINOR = 1,
MAJOR = 2,
- REBALANCE = 3
+ REBALANCE = 3,
+ ABORT_TXN_CLEANUP = 4
};
};
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java
index bfa7b27dd66c..0dabc2868c73 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java
@@ -11,7 +11,8 @@
public enum CompactionType implements org.apache.thrift.TEnum {
MINOR(1),
MAJOR(2),
- REBALANCE(3);
+ REBALANCE(3),
+ ABORT_TXN_CLEANUP(4);
private final int value;
@@ -39,6 +40,8 @@ public static CompactionType findByValue(int value) {
return MAJOR;
case 3:
return REBALANCE;
+ case 4:
+ return ABORT_TXN_CLEANUP;
default:
return null;
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionType.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionType.php
index 9c62a8e4696a..083d81ee0614 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionType.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionType.php
@@ -24,10 +24,13 @@ final class CompactionType
const REBALANCE = 3;
+ const ABORT_TXN_CLEANUP = 4;
+
static public $__names = array(
1 => 'MINOR',
2 => 'MAJOR',
3 => 'REBALANCE',
+ 4 => 'ABORT_TXN_CLEANUP',
);
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 50a6c5fe8088..6625b0e283e8 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -156,17 +156,20 @@ class CompactionType(object):
MINOR = 1
MAJOR = 2
REBALANCE = 3
+ ABORT_TXN_CLEANUP = 4
_VALUES_TO_NAMES = {
1: "MINOR",
2: "MAJOR",
3: "REBALANCE",
+ 4: "ABORT_TXN_CLEANUP",
}
_NAMES_TO_VALUES = {
"MINOR": 1,
"MAJOR": 2,
"REBALANCE": 3,
+ "ABORT_TXN_CLEANUP": 4,
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 9d864db0d059..33206ae1f213 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -71,8 +71,9 @@ module CompactionType
MINOR = 1
MAJOR = 2
REBALANCE = 3
- VALUE_MAP = {1 => "MINOR", 2 => "MAJOR", 3 => "REBALANCE"}
- VALID_VALUES = Set.new([MINOR, MAJOR, REBALANCE]).freeze
+ ABORT_TXN_CLEANUP = 4
+ VALUE_MAP = {1 => "MINOR", 2 => "MAJOR", 3 => "REBALANCE", 4 => "ABORT_TXN_CLEANUP"}
+ VALID_VALUES = Set.new([MINOR, MAJOR, REBALANCE, ABORT_TXN_CLEANUP]).freeze
end
module GrantRevokeType
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 29c4b5774bea..112125c955dd 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -651,7 +651,7 @@ public enum ConfVars {
COMPACTOR_CLEANER_TABLECACHE_ON("metastore.compactor.cleaner.tablecache.on",
"hive.compactor.cleaner.tablecache.on", true,
"Enable table caching in the cleaner. Currently the cache is cleaned after each cycle."),
- COMPACTOR_CLEAN_ABORTS_USING_CLEANER("metastore.compactor.clean.aborts.using.cleaner", "hive.compactor.clean.aborts.using.cleaner", false,
+ COMPACTOR_CLEAN_ABORTS_USING_CLEANER("metastore.compactor.clean.aborts.using.cleaner", "hive.compactor.clean.aborts.using.cleaner", true,
"Whether to use cleaner for cleaning aborted directories or not.\n" +
"Set to true when cleaner is expected to clean delta/delete-delta directories from aborted transactions.\n" +
"Otherwise the cleanup of such directories will take place within the compaction cycle."),
diff --git a/standalone-metastore/metastore-common/src/main/protobuf/org/apache/hadoop/hive/metastore/hive_metastore.proto b/standalone-metastore/metastore-common/src/main/protobuf/org/apache/hadoop/hive/metastore/hive_metastore.proto
index 3f3787aba9c1..34e6a273dd32 100644
--- a/standalone-metastore/metastore-common/src/main/protobuf/org/apache/hadoop/hive/metastore/hive_metastore.proto
+++ b/standalone-metastore/metastore-common/src/main/protobuf/org/apache/hadoop/hive/metastore/hive_metastore.proto
@@ -2115,6 +2115,8 @@ enum CompactionType {
COMPACTION_TYPE_UNSPECIFIED = 0;
COMPACTION_TYPE_MINOR = 1;
COMPACTION_TYPE_MAJOR = 2;
+ COMPACTION_TYPE_REBALANCE = 3;
+ COMPACTION_TYPE_ABORT_TXN_CLEANUP = 4;
}
enum DataOperationType {
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index 63936d299850..c09811ed97bb 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -202,6 +202,7 @@ enum CompactionType {
MINOR = 1,
MAJOR = 2,
REBALANCE = 3,
+ ABORT_TXN_CLEANUP = 4,
}
enum GrantRevokeType {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
index 206d690b79d9..3c9e5e8ff851 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
@@ -8818,7 +8818,7 @@ public OptionalCompactionInfoStruct find_next_compact2(FindNextCompactRequest rq
@Override
public void mark_cleaned(CompactionInfoStruct cr) throws MetaException {
- getTxnHandler().markCleaned(CompactionInfo.compactionStructToInfo(cr), false);
+ getTxnHandler().markCleaned(CompactionInfo.compactionStructToInfo(cr));
}
@Override
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index dda975f17b43..b238a98f44aa 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -67,6 +67,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
public String poolName;
public int numberOfBuckets = 0;
public String orderByClause;
+ public long minOpenWriteTxnId = 0;
/**
* The highest write id that the compaction job will pay attention to.
@@ -178,6 +179,7 @@ public String toString() {
.append("poolName", poolName)
.append("numberOfBuckets", numberOfBuckets)
.append("orderByClause", orderByClause)
+ .append("minOpenWriteTxnId", minOpenWriteTxnId)
.build();
}
@@ -351,4 +353,8 @@ public void setWriteIds(boolean hasUncompactedAborts, Set<Long> writeIds) {
this.hasUncompactedAborts = hasUncompactedAborts;
this.writeIds = writeIds;
}
+
+ public boolean isAbortedTxnCleanup() {
+ return type == CompactionType.ABORT_TXN_CLEANUP;
+ }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 83ceacbc30c5..4955f62d58b3 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -40,6 +40,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -50,6 +51,7 @@
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
+import static org.apache.hadoop.hive.metastore.txn.TxnUtils.thriftCompactionType2DbType;
/**
* Extends the transaction handler with methods needed only by the compactor threads. These
@@ -92,6 +94,54 @@ class CompactionTxnHandler extends TxnHandler {
"DELETE FROM \"TXNS\" WHERE \"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") " +
"AND (\"TXN_STATE\" = " + TxnStatus.ABORTED + " OR \"TXN_STATE\" = " + TxnStatus.COMMITTED + ") " +
"AND \"TXN_ID\" < ?";
+
+  // Three inner sub-queries, combined via LEFT JOINs, fetch the data required for cleaning up aborted txns.
+ private static final String SELECT_ABORTS_WITH_MIN_OPEN_WRITETXN_QUERY =
+ "SELECT " +
+ " \"res1\".\"TC_DATABASE\" AS \"DB\", \"res1\".\"TC_TABLE\" AS \"TBL\", \"res1\".\"TC_PARTITION\" AS \"PART\", " +
+ " \"res1\".\"MIN_TXN_START_TIME\" AS \"MIN_TXN_START_TIME\", \"res1\".\"ABORTED_TXN_COUNT\" AS \"ABORTED_TXN_COUNT\", " +
+ " \"res2\".\"MIN_OPEN_WRITE_TXNID\" AS \"MIN_OPEN_WRITE_TXNID\", \"res3\".\"RETRY_RETENTION\" AS \"RETRY_RETENTION\", " +
+ " \"res3\".\"ID\" AS \"RETRY_CQ_ID\" " +
+ " FROM " +
+      // First sub-query - per db, table and partition, gets the minimum txn start time and
+      // the number of aborted txns.
+ " ( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", MIN(\"TXN_STARTED\") AS \"MIN_TXN_START_TIME\", " +
+ " COUNT(*) AS \"ABORTED_TXN_COUNT\" FROM \"TXNS\", \"TXN_COMPONENTS\" " +
+ " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = " + TxnStatus.ABORTED +
+ " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" %s ) \"res1\" " +
+ " LEFT JOIN" +
+      // Second sub-query - Gets the minimum open write txn id per db, table and partition.
+ "( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", MIN(\"TC_TXNID\") AS \"MIN_OPEN_WRITE_TXNID\" " +
+ " FROM \"TXNS\", \"TXN_COMPONENTS\" " +
+ " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = " + TxnStatus.OPEN +
+ " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" ) \"res2\"" +
+ " ON \"res1\".\"TC_DATABASE\" = \"res2\".\"TC_DATABASE\"" +
+ " AND \"res1\".\"TC_TABLE\" = \"res2\".\"TC_TABLE\"" +
+ " AND (\"res1\".\"TC_PARTITION\" = \"res2\".\"TC_PARTITION\" " +
+ " OR (\"res1\".\"TC_PARTITION\" IS NULL AND \"res2\".\"TC_PARTITION\" IS NULL)) " +
+ " LEFT JOIN " +
+ // Third sub-query - Gets the retry entries for corresponding db, table, partition.
+ "( SELECT \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", MAX(\"CQ_ID\") AS \"ID\", " +
+ " MAX(\"CQ_RETRY_RETENTION\") AS \"RETRY_RETENTION\", " +
+ " MIN(\"CQ_COMMIT_TIME\") - %s + MAX(\"CQ_RETRY_RETENTION\") AS \"RETRY_RECORD_CHECK\" FROM \"COMPACTION_QUEUE\" " +
+ " WHERE \"CQ_TYPE\" = " + quoteChar(TxnStore.ABORT_TXN_CLEANUP_TYPE) +
+ " GROUP BY \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\") \"res3\" " +
+ " ON \"res1\".\"TC_DATABASE\" = \"res3\".\"CQ_DATABASE\" " +
+ " AND \"res1\".\"TC_TABLE\" = \"res3\".\"CQ_TABLE\" " +
+ " AND (\"res1\".\"TC_PARTITION\" = \"res3\".\"CQ_PARTITION\" " +
+ " OR (\"res1\".\"TC_PARTITION\" IS NULL AND \"res3\".\"CQ_PARTITION\" IS NULL))" +
+ " WHERE \"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR \"res3\".\"RETRY_RECORD_CHECK\" IS NULL";
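+  // The two %s placeholders are filled via String.format in findReadyToCleanAborts: the first with an
+  // optional HAVING filter on the aborted txn count, the second with the DB-specific epoch function
+  // used to evaluate the retry retention window of previous abort-cleanup attempts.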
+
+ private static final String DELETE_ABORT_RETRY_ENTRIES_FROM_COMPACTION_QUEUE =
+ "DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_DATABASE\" = ? " +
+ " AND \"CQ_TABLE\" = ? AND (\"CQ_PARTITION\" = ? OR \"CQ_PARTITION\" IS NULL) AND \"CQ_TYPE\" = " +
+ quoteChar(TxnStore.ABORT_TXN_CLEANUP_TYPE);
+
+ private static final String INSERT_ABORT_RETRY_ENTRY_INTO_COMPACTION_QUEUE =
+ "INSERT INTO \"COMPACTION_QUEUE\" (\"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " +
+ " \"CQ_TYPE\", \"CQ_STATE\", \"CQ_RETRY_RETENTION\", \"CQ_ERROR_MESSAGE\", \"CQ_COMMIT_TIME\") " +
+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?, %s)";
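+  // The %s placeholder for CQ_COMMIT_TIME is filled with the DB-specific epoch function
+  // (see insertAbortRetryRetentionTimeOnError).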
+
public CompactionTxnHandler() {
}
@@ -255,7 +305,7 @@ public CompactionInfo findNextToCompact(FindNextCompactRequest rqst) throws Meta
StringBuilder sb = new StringBuilder();
sb.append("SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " +
"\"CQ_TYPE\", \"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\", \"CQ_ORDER_BY\", " +
- "\"CQ_TBLPROPERTIES\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '" + INITIATED_STATE + "' AND ");
+ "\"CQ_TBLPROPERTIES\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = " + quoteChar(INITIATED_STATE) + " AND ");
boolean hasPoolName = StringUtils.isNotBlank(rqst.getPoolName());
if(hasPoolName) {
sb.append("\"CQ_POOL_NAME\"=?");
@@ -399,7 +449,8 @@ public List findReadyToClean(long minOpenTxnWaterMark, long rete
* By filtering on minOpenTxnWaterMark, we will only cleanup after every transaction is committed, that could see
* the uncompacted deltas. This way the cleaner can clean up everything that was made obsolete by this compaction.
*/
- String whereClause = " WHERE \"CQ_STATE\" = " + quoteChar(READY_FOR_CLEANING) +
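+    // Abort-cleanup retry entries share the COMPACTION_QUEUE table but are picked up by
+    // findReadyToCleanAborts, so they are excluded from the regular cleaning query here.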
+ String whereClause = " WHERE \"CQ_STATE\" = " + quoteChar(READY_FOR_CLEANING) +
+ " AND \"CQ_TYPE\" != " + quoteChar(TxnStore.ABORT_TXN_CLEANUP_TYPE) +
" AND (\"CQ_COMMIT_TIME\" < (" + getEpochFn(dbProduct) + " - \"CQ_RETRY_RETENTION\" - " + retentionTime + ") OR \"CQ_COMMIT_TIME\" IS NULL)";
String queryStr =
@@ -478,17 +529,8 @@ public List findReadyToCleanAborts(long abortedTimeThreshold, in
try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
Statement stmt = dbConn.createStatement()) {
boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0;
- String sCheckAborted = "SELECT \"tc\".\"TC_DATABASE\", \"tc\".\"TC_TABLE\", \"tc\".\"TC_PARTITION\", " +
- " \"tc\".\"MIN_TXN_START_TIME\", \"tc\".\"ABORTED_TXN_COUNT\", \"minOpenWriteTxnId\".\"MIN_OPEN_WRITE_TXNID\" FROM " +
- " ( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", " +
- " MIN(\"TXN_STARTED\") AS \"MIN_TXN_START_TIME\", COUNT(*) AS \"ABORTED_TXN_COUNT\" FROM \"TXNS\", \"TXN_COMPONENTS\" " +
- " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = " + TxnStatus.ABORTED +
- " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " +
- (checkAbortedTimeThreshold ? "" : " HAVING COUNT(*) > " + abortedThreshold) + " ) \"tc\" " +
- " LEFT JOIN ( SELECT MIN(\"TC_TXNID\") AS \"MIN_OPEN_WRITE_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" FROM \"TXNS\", \"TXN_COMPONENTS\" " +
- " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\"=" + TxnStatus.OPEN + " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" ) \"minOpenWriteTxnId\" " +
- " ON \"tc\".\"TC_DATABASE\" = \"minOpenWriteTxnId\".\"TC_DATABASE\" AND \"tc\".\"TC_TABLE\" = \"minOpenWriteTxnId\".\"TC_TABLE\"" +
- " AND (\"tc\".\"TC_PARTITION\" = \"minOpenWriteTxnId\".\"TC_PARTITION\" OR (\"tc\".\"TC_PARTITION\" IS NULL AND \"minOpenWriteTxnId\".\"TC_PARTITION\" IS NULL))";
+ String sCheckAborted = String.format(SELECT_ABORTS_WITH_MIN_OPEN_WRITETXN_QUERY,
+ checkAbortedTimeThreshold ? "" : " HAVING COUNT(*) > " + abortedThreshold, getEpochFn(dbProduct));
LOG.debug("Going to execute query <{}>", sCheckAborted);
try (ResultSet rs = stmt.executeQuery(sCheckAborted)) {
@@ -503,7 +545,12 @@ public List findReadyToCleanAborts(long abortedTimeThreshold, in
info.tableName = rs.getString(2);
info.partName = rs.getString(3);
// In this case, this field contains min open write txn ID.
- info.txnId = rs.getLong(6);
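+            // rs.getLong(6) returns 0 for SQL NULL, i.e. there is no open write txn on this
+            // table/partition; fall back to Long.MAX_VALUE so the cleanup is not restricted
+            // by an open-txn watermark.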
+ info.minOpenWriteTxnId = rs.getLong(6) > 0 ? rs.getLong(6) : Long.MAX_VALUE;
+            // The specific type and state assigned to abort cleanup.
+ info.type = CompactionType.ABORT_TXN_CLEANUP;
+ info.state = READY_FOR_CLEANING;
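+            // Columns 7 and 8 come from the retry sub-query; both read as 0 (SQL NULL) when no
+            // previous abort-cleanup attempt has been recorded in COMPACTION_QUEUE.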
+ info.retryRetention = rs.getLong(7);
+ info.id = rs.getLong(8);
readyToCleanAborts.add(info);
}
}
@@ -511,7 +558,7 @@ public List findReadyToCleanAborts(long abortedTimeThreshold, in
return readyToCleanAborts;
} catch (SQLException e) {
LOG.error("Unable to select next element for cleaning, " + e.getMessage());
- checkRetryable(e, "findReadyToCleanForAborts");
+ checkRetryable(e, "findReadyToCleanAborts");
throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
}
} catch (RetryException e) {
@@ -619,7 +666,7 @@ private void setCleanerStart(Connection dbConn, CompactionInfo info, Long timest
*/
@Override
@RetrySemantics.CannotRetry
- public void markCleaned(CompactionInfo info, boolean isAbortOnly) throws MetaException {
+ public void markCleaned(CompactionInfo info) throws MetaException {
LOG.debug("Running markCleaned with CompactionInfo: {}", info);
try {
Connection dbConn = null;
@@ -627,7 +674,7 @@ public void markCleaned(CompactionInfo info, boolean isAbortOnly) throws MetaExc
ResultSet rs = null;
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
- if (!isAbortOnly) {
+ if (!info.isAbortedTxnCleanup()) {
String s = "INSERT INTO \"COMPLETED_COMPACTIONS\"(\"CC_ID\", \"CC_DATABASE\", "
+ "\"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", \"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", "
+ "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", "
@@ -694,13 +741,13 @@ public void markCleaned(CompactionInfo info, boolean isAbortOnly) throws MetaExc
LOG.error("Unable to delete from compaction queue " + e.getMessage());
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
- checkRetryable(e, "markCleaned(" + info + "," + isAbortOnly + ")");
+ checkRetryable(e, "markCleaned(" + info + ")");
throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
} finally {
close(rs, pStmt, dbConn);
}
} catch (RetryException e) {
- markCleaned(info, isAbortOnly);
+ markCleaned(info);
}
}
@@ -708,6 +755,14 @@ private void removeTxnComponents(Connection dbConn, CompactionInfo info) throws
PreparedStatement pStmt = null;
ResultSet rs = null;
try {
+      /*
+       * Remove any abort-retry metadata for this table/partition from the COMPACTION_QUEUE whenever
+       * either compaction or abort cleanup succeeds. We don't want a situation wherein there is an
+       * abort-retry entry for a table but no corresponding entry in the TXN_COMPONENTS table.
+       * A successful compaction therefore also deletes the retry metadata, so that abort cleanup is
+       * attempted again (an optimistic retry approach).
+       */
+ removeAbortRetryEntries(dbConn, info);
+
/*
* compaction may remove data from aborted txns above tc_writeid bit it only guarantees to
* remove it up to (inclusive) tc_writeid, so it's critical to not remove metadata about
@@ -935,7 +990,7 @@ public void removeDuplicateCompletedTxnComponents() throws MetaException {
* The committed txns are left there for TXN_OPENTXN_TIMEOUT window period intentionally.
* The reason such aborted txns exist can be that now work was done in this txn
* (e.g. Streaming opened TransactionBatch and abandoned it w/o doing any work)
- * or due to {@link #markCleaned(CompactionInfo, boolean)} being called.
+ * or due to {@link #markCleaned(CompactionInfo)} being called.
*/
@Override
@RetrySemantics.SafeToRetry
@@ -1240,6 +1295,77 @@ private static boolean timedOut(CompactionInfo ci, RetentionCounters rc, long pa
&& (rc.hasSucceededMajorCompaction || (rc.hasSucceededMinorCompaction && ci.type == CompactionType.MINOR));
}
+ private void removeAbortRetryEntries(Connection dbConn, CompactionInfo info) throws MetaException, RetryException {
+ LOG.debug("Going to execute update <{}>", DELETE_ABORT_RETRY_ENTRIES_FROM_COMPACTION_QUEUE);
+ try (PreparedStatement pStmt = dbConn.prepareStatement(DELETE_ABORT_RETRY_ENTRIES_FROM_COMPACTION_QUEUE)) {
+ pStmt.setString(1, info.dbname);
+ pStmt.setString(2, info.tableName);
+ if (info.partName != null) {
+ pStmt.setString(3, info.partName);
+ } else {
+        // The 'CQ_PARTITION' column is VARCHAR, so bind a typed SQL NULL.
+ pStmt.setNull(3, Types.VARCHAR);
+ }
+ int rc = pStmt.executeUpdate();
+ LOG.debug("Removed {} records in COMPACTION_QUEUE", rc);
+ } catch (SQLException e) {
+ LOG.error("Unable to delete abort retry entries from COMPACTION_QUEUE due to {}", e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(e, "removeAbortRetryEntries(" + info + ")");
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
+ }
+ }
+
+ private void insertAbortRetryRetentionTimeOnError(Connection dbConn, CompactionInfo info) throws MetaException, SQLException {
+ String query = String.format(INSERT_ABORT_RETRY_ENTRY_INTO_COMPACTION_QUEUE, getEpochFn(dbProduct));
+ TxnStore.MutexAPI.LockHandle handle = null;
+ try (PreparedStatement pStmt = dbConn.prepareStatement(query);
+ Statement stmt = dbConn.createStatement()) {
+ lockInternal();
+      /*
+ * MUTEX_KEY.CompactionScheduler lock ensures that there is only 1 entry in
+ * Initiated/Working state for any resource. This ensures that we don't run concurrent
+ * compactions for any resource.
+ */
+ handle = getMutexAPI().acquireLock(MUTEX_KEY.CompactionScheduler.name());
+ long id = generateCompactionQueueId(stmt);
+ pStmt.setLong(1, id);
+ pStmt.setString(2, info.dbname);
+ pStmt.setString(3, info.tableName);
+ if (info.partName != null) {
+ pStmt.setString(4, info.partName);
+ } else {
+        // The 'CQ_PARTITION' column is VARCHAR, so bind a typed SQL NULL.
+ pStmt.setNull(4, Types.VARCHAR);
+ }
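+      // CQ_TYPE is stored as its single-character DB code ('c' for ABORT_TXN_CLEANUP,
+      // see TxnStore.ABORT_TXN_CLEANUP_TYPE).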
+ pStmt.setString(5, Character.toString(thriftCompactionType2DbType(info.type)));
+ pStmt.setString(6, Character.toString(info.state));
+ pStmt.setLong(7, info.retryRetention);
+ pStmt.setString(8, info.errorMessage);
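+      // CQ_COMMIT_TIME is set to the current epoch (the %s in the INSERT statement); the retry
+      // sub-query of SELECT_ABORTS_WITH_MIN_OPEN_WRITETXN_QUERY compares it against
+      // CQ_RETRY_RETENTION to decide when this entry becomes eligible for another attempt.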
+ int updCnt = pStmt.executeUpdate();
+ if (updCnt == 0) {
+        LOG.error("Unable to insert abort retry entry into COMPACTION_QUEUE: {}. updCnt={}", info, updCnt);
+ dbConn.rollback();
+        throw new MetaException("Unable to insert abort retry entry into COMPACTION_QUEUE: " +
+          " CQ_DATABASE=" + info.dbname + ", CQ_TABLE=" + info.tableName + ", CQ_PARTITION=" + info.partName);
+ }
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.error("Unable to update compaction queue: {}", e.getMessage());
+ rollbackDBConn(dbConn);
+ throw e;
+ } finally {
+ if (handle != null) {
+ handle.releaseLocks();
+ }
+ unlockInternal();
+ }
+ }
+
/**
* For any given compactable entity (partition; table if not partitioned) the history of compactions
* may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the
@@ -1525,35 +1651,38 @@ public void markRefused(CompactionInfo info) throws MetaException {
updateStatus(info);
}
-
@Override
@RetrySemantics.CannotRetry
public void setCleanerRetryRetentionTimeOnError(CompactionInfo info) throws MetaException {
try {
try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction)) {
- try (PreparedStatement stmt = dbConn.prepareStatement("UPDATE \"COMPACTION_QUEUE\" " +
- "SET \"CQ_RETRY_RETENTION\" = ?, \"CQ_ERROR_MESSAGE\"= ? WHERE \"CQ_ID\" = ?")) {
- stmt.setLong(1, info.retryRetention);
- stmt.setString(2, info.errorMessage);
- stmt.setLong(3, info.id);
- int updCnt = stmt.executeUpdate();
- if (updCnt != 1) {
- LOG.error("Unable to update compaction queue record: {}. updCnt={}", info, updCnt);
- dbConn.rollback();
- throw new MetaException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE");
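+          // Abort-cleanup requests are not enqueued in COMPACTION_QUEUE up front, so a first
+          // failure carries no CQ_ID (info.id == 0); insert a retry entry instead of updating one.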
+ if (info.isAbortedTxnCleanup() && info.id == 0) {
+ insertAbortRetryRetentionTimeOnError(dbConn, info);
+ } else {
+ try (PreparedStatement stmt = dbConn.prepareStatement("UPDATE \"COMPACTION_QUEUE\" " +
+ "SET \"CQ_RETRY_RETENTION\" = ?, \"CQ_ERROR_MESSAGE\"= ? WHERE \"CQ_ID\" = ?")) {
+ stmt.setLong(1, info.retryRetention);
+ stmt.setString(2, info.errorMessage);
+ stmt.setLong(3, info.id);
+ int updCnt = stmt.executeUpdate();
+ if (updCnt != 1) {
+ LOG.error("Unable to update compaction queue record: {}. updCnt={}", info, updCnt);
+ dbConn.rollback();
+ throw new MetaException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE");
+ }
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.error("Unable to update compaction queue: " + e.getMessage());
+ rollbackDBConn(dbConn);
+ checkRetryable(e, "insertOrSetCleanerRetryRetentionTimeOnError(" + info + ")");
+ throw new MetaException("Unable to update compaction queue: " +
+ e.getMessage());
}
- LOG.debug("Going to commit");
- dbConn.commit();
- } catch (SQLException e) {
- LOG.error("Unable to update compaction queue: " + e.getMessage());
- rollbackDBConn(dbConn);
- checkRetryable(e, "setCleanerRetryRetentionTimeOnError(" + info + ")");
- throw new MetaException("Unable to update compaction queue: " +
- e.getMessage());
}
} catch (SQLException e) {
LOG.error(DB_FAILED_TO_CONNECT + e.getMessage());
- checkRetryable(e, "setCleanerRetryRetentionTimeOnError(" + info + ")");
+ checkRetryable(e, "insertOrSetCleanerRetryRetentionTimeOnError(" + info + ")");
throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
}
} catch (RetryException e) {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index fa194c11b314..77be31dbced8 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -6084,12 +6084,12 @@ static String quoteChar(char c) {
* Select ... For Update to sequence operations properly. In practice that means when running
* with Derby database. See more notes at class level.
*/
- private void lockInternal() {
+ protected void lockInternal() {
if(dbProduct.isDERBY()) {
derbyLock.lock();
}
}
- private void unlockInternal() {
+ protected void unlockInternal() {
if(dbProduct.isDERBY()) {
derbyLock.unlock();
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index cc8f9d94a26d..7be1b0cc52b1 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -29,7 +29,6 @@
import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
import org.apache.hadoop.hive.metastore.api.AbortCompactResponse;
import org.apache.hadoop.hive.metastore.api.AbortCompactionRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionAbortedException;
import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
@@ -125,6 +124,7 @@ enum MUTEX_KEY {
char MAJOR_TYPE = 'a';
char MINOR_TYPE = 'i';
char REBALANCE_TYPE = 'r';
+ char ABORT_TXN_CLEANUP_TYPE = 'c';
String[] COMPACTION_STATES = new String[] {INITIATED_RESPONSE, WORKING_RESPONSE, CLEANING_RESPONSE, FAILED_RESPONSE,
SUCCEEDED_RESPONSE, DID_NOT_INITIATE_RESPONSE, REFUSED_RESPONSE };
@@ -550,10 +550,9 @@ Set findPotentialCompactions(int abortedThreshold, long abortedT
* it has been compacted.
*
* @param info info on the compaction entry to remove
- * @param isAbortOnly whether to cleanup only abort related cleanup information
*/
@RetrySemantics.CannotRetry
- void markCleaned(CompactionInfo info, boolean isAbortOnly) throws MetaException;
+ void markCleaned(CompactionInfo info) throws MetaException;
/**
* Mark a compaction entry as failed. This will move it to the compaction history queue with a
@@ -575,7 +574,7 @@ Set findPotentialCompactions(int abortedThreshold, long abortedT
/**
* Stores the value of {@link CompactionInfo#retryRetention} and {@link CompactionInfo#errorMessage} fields
- * of the CompactionInfo in the HMS database.
+   * of the CompactionInfo by inserting a new record or updating the existing one in the HMS database.
* @param info The {@link CompactionInfo} object holding the values.
* @throws MetaException
*/
@@ -598,7 +597,7 @@ Set findPotentialCompactions(int abortedThreshold, long abortedT
/**
* Clean up aborted or committed transactions from txns that have no components in txn_components. The reason such
* txns exist can be that no work was done in this txn (e.g. Streaming opened TransactionBatch and
- * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo, boolean)} being called,
+ * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called,
* or the delete from the txns was delayed because of TXN_OPENTXN_TIMEOUT window.
*/
@RetrySemantics.SafeToRetry
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 704956a6f460..1eedef276a36 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -586,6 +586,8 @@ public static CompactionType dbCompactionType2ThriftType(char dbValue) throws Me
return CompactionType.MINOR;
case TxnStore.REBALANCE_TYPE:
return CompactionType.REBALANCE;
+ case TxnStore.ABORT_TXN_CLEANUP_TYPE:
+ return CompactionType.ABORT_TXN_CLEANUP;
default:
throw new MetaException("Unexpected compaction type " + dbValue);
}
@@ -599,6 +601,8 @@ public static Character thriftCompactionType2DbType(CompactionType ct) throws Me
return TxnStore.MINOR_TYPE;
case REBALANCE:
return TxnStore.REBALANCE_TYPE;
+ case ABORT_TXN_CLEANUP:
+ return TxnStore.ABORT_TXN_CLEANUP_TYPE;
default:
throw new MetaException("Unexpected compaction type " + ct);
}