@@ -84,7 +84,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
@@ -1128,7 +1127,7 @@ public void testAbortAfterMarkCleaned() throws Exception {
doAnswer(invocationOnMock -> {
connection2.abortTransaction();
return invocationOnMock.callRealMethod();
-}).when(mockedTxnHandler).markCleaned(any(), eq(false));
+}).when(mockedTxnHandler).markCleaned(any());

MetadataCache metadataCache = new MetadataCache(false);
FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
@@ -58,15 +58,18 @@ public AbortedTxnCleaner(HiveConf conf, TxnStore txnHandler,

/**
The following cleanup is based on the following idea - <br>
-1. Aborted cleanup is independent of compaction. This is because directories which are written by
-aborted txns are not visible by any open txns. It is only visible while determining the AcidState (which
-only sees the aborted deltas and does not read the file).<br><br>
+Aborted cleanup is independent of compaction because directories written by aborted txns are,
+in most cases, not visible to any open txn. The one case in which abort deltas are visible to
+open txns is streaming aborts, where a single delta file can contain writes from both committed
+and aborted txns. Such deltas are also referred to as uncompacted aborts. In that case, we delete
+neither the uncompacted aborts nor their associated metadata.

The following algorithm is used to clean the set of aborted directories - <br>
a. Find the list of entries which are suitable for cleanup (This is done in {@link TxnStore#findReadyToCleanAborts(long, int)}).<br>
b. If the table/partition does not exist, then remove the associated aborted entry in TXN_COMPONENTS table. <br>
-c. Get the AcidState of the table by using the min open txnID, database name, tableName, partition name, highest write ID <br>
-d. Fetch the aborted directories and delete the directories. <br>
+c. Get the AcidState of the table by using the min open txnID, database name, tableName, partition name, and highest write ID.
+The constructed AcidState identifies the obsolete/aborted directories in the table/partition. <br>
+d. Fetch the obsolete/aborted directories from the AcidState and delete them. <br>
e. Fetch the aborted write IDs from the AcidState and use it to delete the associated metadata in the TXN_COMPONENTS table.
**/
@Override
@@ -79,8 +82,8 @@ public List<Runnable> getTasks() throws MetaException {
List<CompactionInfo> readyToCleanAborts = txnHandler.findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);

if (!readyToCleanAborts.isEmpty()) {
-return readyToCleanAborts.stream().map(ci -> ThrowingRunnable.unchecked(() ->
-clean(ci, ci.txnId > 0 ? ci.txnId : Long.MAX_VALUE, metricsEnabled)))
+return readyToCleanAborts.stream().map(info -> ThrowingRunnable.unchecked(() ->
+clean(info, info.minOpenWriteTxnId, metricsEnabled)))
.collect(Collectors.toList());
}
return Collections.emptyList();
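
Taken together, the Javadoc steps above map onto a fairly small control flow. Below is a hedged sketch of the per-entry path, steps (b) through (e); `resolveTable`, `resolvePartition`, and `buildValidWriteIdList` are hypothetical stand-ins for the metastore lookups, while `markCleaned` and `cleanAndVerifyObsoleteDirectories` are the real methods visible in this diff.

```java
// Illustrative sketch only -- not the actual AbortedTxnCleaner source.
// resolveTable/resolvePartition/buildValidWriteIdList are hypothetical helpers;
// markCleaned and cleanAndVerifyObsoleteDirectories are the methods shown in this diff.
private void cleanAbortedEntry(CompactionInfo info) throws Exception {
  Table table = resolveTable(info);
  if (table == null) {                                  // (b) table was dropped:
    txnHandler.markCleaned(info);                       //     remove TXN_COMPONENTS entries only
    return;
  }
  if (info.partName != null && resolvePartition(info) == null) {
    txnHandler.markCleaned(info);                       // (b) partition was dropped
    return;
  }
  // (c) write-id list derived from the min open txn id and the highest write ID
  ValidReaderWriteIdList writeIds = buildValidWriteIdList(info, info.minOpenWriteTxnId);
  // (d) the AcidState built from this list identifies obsolete/aborted directories
  boolean success = cleanAndVerifyObsoleteDirectories(info,
      table.getSd().getLocation(), writeIds, table);
  if (success) {
    txnHandler.markCleaned(info);                       // (e) purge the aborted write IDs' metadata
  } else {
    LOG.warn("Leaving aborted entry {} in TXN_COMPONENTS table.", info);
  }
}
```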
@@ -99,7 +102,7 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
if (isNull(t)) {
// The table was dropped before we got around to cleaning it.
LOG.info("Unable to find table {}, assuming it was dropped.", info.getFullTableName());
-txnHandler.markCleaned(info, true);
+txnHandler.markCleaned(info);
return;
}
if (!isNull(info.partName)) {
@@ -108,7 +111,7 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
// The partition was dropped before we got around to cleaning it.
LOG.info("Unable to find partition {}, assuming it was dropped.",
info.getFullPartitionName());
-txnHandler.markCleaned(info, true);
+txnHandler.markCleaned(info);
return;
}
}
@@ -118,10 +121,14 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
abortCleanUsingAcidDir(info, location, minOpenWriteTxn);

} catch (InterruptedException e) {
LOG.error("Caught an interrupted exception when cleaning, unable to complete cleaning of {} due to {}", info,
e.getMessage());
handleCleanerAttemptFailure(info, e.getMessage());
throw e;
} catch (Exception e) {
LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", info,
e.getMessage());
+handleCleanerAttemptFailure(info, e.getMessage());
throw new MetaException(e.getMessage());
} finally {
if (metricsEnabled) {
@@ -149,7 +156,7 @@ private void abortCleanUsingAcidDir(CompactionInfo info, String location, long m

boolean success = cleanAndVerifyObsoleteDirectories(info, location, validWriteIdList, table);
if (success || CompactorUtil.isDynPartAbort(table, info.partName)) {
-txnHandler.markCleaned(info, false);
+txnHandler.markCleaned(info);
} else {
LOG.warn("Leaving aborted entry {} in TXN_COMPONENTS table.", info);
}
@@ -58,10 +58,6 @@

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
-import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS;
-import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
-import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getIntVar;
-import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
import static java.util.Objects.isNull;

/**
@@ -122,7 +118,7 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
// The table was dropped before we got around to cleaning it.
LOG.info("Unable to find table {}, assuming it was dropped. {}", ci.getFullTableName(),
idWatermark(ci));
-txnHandler.markCleaned(ci, false);
+txnHandler.markCleaned(ci);
return;
}
if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
@@ -137,7 +133,7 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
// The partition was dropped before we got around to cleaning it.
LOG.info("Unable to find partition {}, assuming it was dropped. {}",
ci.getFullPartitionName(), idWatermark(ci));
-txnHandler.markCleaned(ci, false);
+txnHandler.markCleaned(ci);
return;
}
if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
@@ -168,11 +164,10 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
} catch (Exception e) {
LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", ci,
e.getMessage());
-ci.errorMessage = e.getMessage();
if (metricsEnabled) {
Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
}
-handleCleanerAttemptFailure(ci);
+handleCleanerAttemptFailure(ci, e.getMessage());
} finally {
if (metricsEnabled) {
perfLogger.perfLogEnd(CompactionCleaner.class.getName(), cleanerMetric);
@@ -204,7 +199,7 @@ private void cleanUsingLocation(CompactionInfo ci, String path, boolean requires
deleted = fsRemover.clean(getCleaningRequestBasedOnLocation(ci, path));
}
if (!deleted.isEmpty()) {
-txnHandler.markCleaned(ci, false);
+txnHandler.markCleaned(ci);
} else {
txnHandler.clearCleanerStart(ci);
}
@@ -253,7 +248,7 @@ private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenT

boolean success = cleanAndVerifyObsoleteDirectories(ci, location, validWriteIdList, table);
if (success || CompactorUtil.isDynPartAbort(table, ci.partName)) {
-txnHandler.markCleaned(ci, false);
+txnHandler.markCleaned(ci);
} else {
txnHandler.clearCleanerStart(ci);
LOG.warn("No files were removed. Leaving queue entry {} in ready for cleaning state.", ci);
@@ -302,22 +297,6 @@ protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, V
return validWriteIdList;
}

-private void handleCleanerAttemptFailure(CompactionInfo ci) throws MetaException {
-long defaultRetention = getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS);
-int cleanAttempts = 0;
-if (ci.retryRetention > 0) {
-cleanAttempts = (int)(Math.log(ci.retryRetention / defaultRetention) / Math.log(2)) + 1;
-}
-if (cleanAttempts >= getIntVar(conf, HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS)) {
-//Mark it as failed if the max attempt threshold is reached.
-txnHandler.markFailed(ci);
-} else {
-//Calculate retry retention time and update record.
-ci.retryRetention = (long)Math.pow(2, cleanAttempts) * defaultRetention;
-txnHandler.setCleanerRetryRetentionTimeOnError(ci);
-}
-}

private CleanupRequest getCleaningRequestBasedOnLocation(CompactionInfo ci, String location) {
String strIfPurge = ci.getProperty("ifPurge");
boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge"));
@@ -49,8 +49,13 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;

import static org.apache.commons.collections.ListUtils.subtract;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getIntVar;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;

/**
* An abstract class which defines the list of utility methods for performing cleanup activities.
@@ -63,6 +68,7 @@ public abstract class TaskHandler {
protected final boolean metricsEnabled;
protected final MetadataCache metadataCache;
protected final FSRemover fsRemover;
+protected final long defaultRetention;

TaskHandler(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache,
boolean metricsEnabled, FSRemover fsRemover) {
@@ -71,6 +77,7 @@ public abstract class TaskHandler {
this.metadataCache = metadataCache;
this.metricsEnabled = metricsEnabled;
this.fsRemover = fsRemover;
+this.defaultRetention = getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS);
}

public abstract List<Runnable> getTasks() throws MetaException;
@@ -161,4 +168,26 @@ protected boolean cleanAndVerifyObsoleteDirectories(CompactionInfo info, String

return success;
}

+protected void handleCleanerAttemptFailure(CompactionInfo info, String errorMessage) throws MetaException {
+int cleanAttempts = 0;
+info.errorMessage = errorMessage;
+if (info.isAbortedTxnCleanup()) {
+info.retryRetention = info.retryRetention > 0 ? info.retryRetention * 2 : defaultRetention;
+txnHandler.setCleanerRetryRetentionTimeOnError(info);
+} else {
+if (info.retryRetention > 0) {
+cleanAttempts = (int) (Math.log(info.retryRetention / defaultRetention) / Math.log(2)) + 1;
+}
+if (cleanAttempts >= getIntVar(conf, HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS)) {
+//Mark it as failed if the max attempt threshold is reached.
+txnHandler.markFailed(info);
+} else {
+//Calculate retry retention time and update record.
+info.retryRetention = (long) Math.pow(2, cleanAttempts) * defaultRetention;
+txnHandler.setCleanerRetryRetentionTimeOnError(info);
+}
+}
+}
}
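
The consolidated handleCleanerAttemptFailure above implements two policies: aborted-txn entries double their retry retention on every failure and are never marked failed, while compaction entries recover the attempt count from the stored retention and are marked failed once HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS is reached. Here is a standalone sketch of the doubling math; the 300 000 ms default is an assumed value for illustration, not necessarily the shipped default.

```java
// Standalone illustration of the retry-retention math in handleCleanerAttemptFailure.
// Assumes defaultRetention = 300_000 ms; the real value comes from
// HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME.
public class RetryRetentionDemo {
  public static void main(String[] args) {
    long defaultRetention = 300_000L;
    long retryRetention = 0L;
    for (int failure = 1; failure <= 4; failure++) {
      // Recover the attempt count from the stored retention, as the compaction path does:
      int cleanAttempts = retryRetention > 0
          ? (int) (Math.log((double) retryRetention / defaultRetention) / Math.log(2)) + 1
          : 0;
      retryRetention = (long) Math.pow(2, cleanAttempts) * defaultRetention;
      System.out.printf("failure %d: prior attempts = %d, next retention = %d ms%n",
          failure, cleanAttempts, retryRetention);
    }
    // Prints retentions 300000, 600000, 1200000, 2400000 -- doubling per failed attempt,
    // which is exactly what the aborted-txn branch does with retryRetention * 2.
  }
}
```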
@@ -46,12 +46,12 @@ public List<TaskHandler> getHandlers(HiveConf conf, TxnStore txnHandler, Metadat
boolean metricsEnabled, FSRemover fsRemover) {
boolean useAbortHandler = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
List<TaskHandler> taskHandlers = new ArrayList<>();
-taskHandlers.add(new CompactionCleaner(conf, txnHandler, metadataCache,
-metricsEnabled, fsRemover));
if (useAbortHandler) {
taskHandlers.add(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
metricsEnabled, fsRemover));
}
+taskHandlers.add(new CompactionCleaner(conf, txnHandler, metadataCache,
+metricsEnabled, fsRemover));
return taskHandlers;
}
}
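
After this reordering, getHandlers yields the AbortedTxnCleaner (when COMPACTOR_CLEAN_ABORTS_USING_CLEANER is set) before the CompactionCleaner, so any caller that drains the handlers in list order schedules aborted-txn cleanup first. A minimal consumption sketch, assuming a simple synchronous loop rather than the Cleaner service's actual executor wiring:

```java
// Hypothetical caller; shown only to make the ordering change concrete.
void runCleanupCycle() throws MetaException {
  List<TaskHandler> handlers = TaskHandlerFactory.getInstance()
      .getHandlers(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
  for (TaskHandler handler : handlers) {   // AbortedTxnCleaner tasks now come first
    for (Runnable task : handler.getTasks()) {
      task.run();                          // the real Cleaner submits these to an executor
    }
  }
}
```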
@@ -238,7 +238,7 @@ public void testMarkCleaned() throws Exception {
List<CompactionInfo> toClean = txnHandler.findReadyToClean(0, 0);
assertEquals(1, toClean.size());
assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
-txnHandler.markCleaned(ci, false);
+txnHandler.markCleaned(ci);
assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
assertEquals(0, txnHandler.findReadyToClean(0, 0).size());

@@ -529,7 +529,7 @@ private void addSucceededCompaction(String dbName, String tableName, String part
txnHandler.compact(rqst);
ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
-txnHandler.markCleaned(ci, false);
+txnHandler.markCleaned(ci);
}

private void addWaitingForCleaningCompaction(String dbName, String tableName, CompactionType type,
@@ -866,7 +866,7 @@ public void testMarkCleanedCleansTxnsAndTxnComponents()
Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
List<CompactionInfo> toClean = txnHandler.findReadyToClean(0, 0);
assertEquals(1, toClean.size());
-txnHandler.markCleaned(ci, false);
+txnHandler.markCleaned(ci);

// Check that we are cleaning up the empty aborted transactions
GetOpenTxnsResponse txnList = txnHandler.getOpenTxns();
@@ -892,7 +892,7 @@

toClean = txnHandler.findReadyToClean(0, 0);
assertEquals(1, toClean.size());
-txnHandler.markCleaned(ci, false);
+txnHandler.markCleaned(ci);

txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
// The open txn will became the low water mark
@@ -999,7 +999,7 @@ public void testEnqueueTimeThroughLifeCycle() throws Exception {
txnHandler.markCompacted(ci);
checkEnqueueTime(enqueueTime);

-txnHandler.markCleaned(ci, false);
+txnHandler.markCleaned(ci);
checkEnqueueTime(enqueueTime);
}

@@ -85,6 +85,7 @@
import org.apache.hive.common.util.HiveVersionInfo;
import org.apache.thrift.TException;
import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -127,8 +128,9 @@ public abstract class CompactorTest {
private final AtomicBoolean stop = new AtomicBoolean();
private Path tmpdir;
FileSystem fs;

-@Before
+@BeforeEach
public void setup() throws Exception {
setup(new HiveConf());
}
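
This and the following test classes migrate JUnit 4 lifecycle annotations (org.junit.Before/After) to their JUnit 5 equivalents (org.junit.jupiter.api.BeforeEach/AfterEach). For reference, a minimal illustration of the two styles; the test class here is made up, not taken from the PR:

```java
// JUnit 4:                          JUnit 5:
//   import org.junit.Before;         import org.junit.jupiter.api.BeforeEach;
//   import org.junit.After;          import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

class ExampleCompactorTest {          // hypothetical test class
  @BeforeEach
  void setup() { /* runs before each test method, like JUnit 4's @Before */ }

  @AfterEach
  void tearDown() { /* runs after each test method, like JUnit 4's @After */ }
}
```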
@@ -18,11 +18,11 @@
package org.apache.hadoop.hive.ql.txn.compactor;

import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;

public class TestAbortCleanupUsingCompactionCycle extends TestCleaner {
@Override
-@Before
+@BeforeEach
public void setup() throws Exception {
super.setup();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
@@ -18,11 +18,11 @@
package org.apache.hadoop.hive.ql.txn.compactor;

import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;

public class TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId extends TestCleaner {
@Override
-@Before
+@BeforeEach
public void setup() throws Exception {
super.setup();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
@@ -44,9 +44,9 @@
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler;
import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandlerFactory;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
import org.mockito.internal.util.reflection.FieldSetter;

import java.util.ArrayList;
@@ -67,7 +67,6 @@
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
import static org.apache.hadoop.hive.ql.io.AcidUtils.addVisibilitySuffix;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
@@ -108,7 +107,7 @@ public void testRetryAfterFailedCleanup(boolean delayEnabled) throws Exception {
FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
List<TaskHandler> taskHandlers = TaskHandlerFactory.getInstance()
.getHandlers(conf, mockedHandler, metadataCache, false, fsRemover);
-doThrow(new RuntimeException(errorMessage)).when(mockedHandler).markCleaned(nullable(CompactionInfo.class), eq(false));
+doThrow(new RuntimeException(errorMessage)).when(mockedHandler).markCleaned(nullable(CompactionInfo.class));

Table t = newTable("default", "retry_test", false);

@@ -196,7 +195,7 @@ public void testRetentionAfterFailedCleanup() throws Exception {
FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
List<TaskHandler> taskHandlers = TaskHandlerFactory.getInstance()
.getHandlers(conf, mockedHandler, metadataCache, false, fsRemover);
-doThrow(new RuntimeException()).when(mockedHandler).markCleaned(nullable(CompactionInfo.class), eq(false));
+doThrow(new RuntimeException()).when(mockedHandler).markCleaned(nullable(CompactionInfo.class));

//Do a run to fail the clean and set the retention time
Cleaner cleaner = new Cleaner();
@@ -810,7 +809,7 @@ boolean useHive130DeltaDirName() {
return false;
}

-@After
+@AfterEach
public void tearDown() throws Exception {
compactorTestCleanup();
}