Skip to content

Commit

Permalink
Revert "HIVE-27637: Compare highest write ID of compaction records wh…
Browse files Browse the repository at this point in the history
…en tryin…"

This reverts commit f343969.
  • Loading branch information
SourabhBadhya committed Feb 2, 2024
1 parent 0a133b3 commit 81eafd4
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1056,50 +1056,6 @@ public void testFindReadyToCleanAborts_limitFetchSize() throws Exception {
assertEquals(1, potentials.size());
}

@Test
public void testFindReadyToCleanAborts() throws Exception {
long txnId = openTxn();

List<LockComponent> components = new ArrayList<>();
components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb", "mytable", "mypartition=myvalue", DataOperationType.UPDATE));
components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb", "yourtable", "mypartition=myvalue", DataOperationType.UPDATE));

allocateTableWriteIds("mydb", "mytable", txnId);
allocateTableWriteIds("mydb", "yourtable", txnId);

LockRequest req = new LockRequest(components, "me", "localhost");
req.setTxnid(txnId);
LockResponse res = txnHandler.lock(req);
assertSame(res.getState(), LockState.ACQUIRED);

txnHandler.abortTxn(new AbortTxnRequest((txnId)));

txnId = openTxn();
components = new ArrayList<>();
components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb", "mytable", "mypartition=myvalue", DataOperationType.UPDATE));
allocateTableWriteIds("mydb", "mytable", txnId);

req = new LockRequest(components, "me", "localhost");
req.setTxnid(txnId);
res = txnHandler.lock(req);
assertSame(res.getState(), LockState.ACQUIRED);

CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MINOR);
rqst.setPartitionname("mypartition=myvalue");
txnHandler.compact(rqst);

CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
ci.highestWriteId = 41;
txnHandler.updateCompactorState(ci, 0);

List<CompactionInfo> potentials = txnHandler.findReadyToCleanAborts(1, 0);
assertEquals(1, potentials.size());
CompactionInfo potentialToCleanAbort = potentials.get(0);
assertEquals("mydb", potentialToCleanAbort.dbname);
assertEquals("yourtable", potentialToCleanAbort.tableName);
}

private static FindNextCompactRequest aFindNextCompactRequest(String workerId, String workerVersion) {
FindNextCompactRequest request = new FindNextCompactRequest();
request.setWorkerId(workerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -284,21 +282,8 @@ public void testCleaningOfAbortedDirectoriesBelowBase() throws Exception {
cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
cleaner.run();

Mockito.verifyNoInteractions(mockedFSRemover);
Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
String compactionQueuePresence = "SELECT COUNT(*) FROM \"COMPACTION_QUEUE\" " +
" WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\" IS NULL";
Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(conf, compactionQueuePresence));

directories = getDirectories(conf, t, null);
// Both base and delta files are present since the cleaner skips them as there is a newer write.
Assert.assertEquals(5, directories.size());
Assert.assertEquals(1, directories.stream().filter(dir -> dir.getName().startsWith(AcidUtils.BASE_PREFIX)).count());

// Run compaction and clean up
startInitiator();
startWorker();
startCleaner();

directories = getDirectories(conf, t, null);
// The table is already compacted, so we must see 1 base delta
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class ReadyToCleanAbortHandler implements QueryHandler<List<CompactionInf
// First sub-query - Gets the aborted txns with min txn start time, number of aborted txns
// for corresponding db, table, partition.
" ( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", MIN(\"TXN_STARTED\") AS \"MIN_TXN_START_TIME\", " +
" MAX(\"TC_WRITEID\") as \"MAX_ABORTED_WRITE_ID\", COUNT(*) AS \"ABORTED_TXN_COUNT\" FROM \"TXNS\", \"TXN_COMPONENTS\" " +
" COUNT(*) AS \"ABORTED_TXN_COUNT\" FROM \"TXNS\", \"TXN_COMPONENTS\" " +
" WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = :abortedState" +
" GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" %s ) \"res1\" " +
" LEFT JOIN" +
Expand All @@ -76,15 +76,7 @@ public class ReadyToCleanAbortHandler implements QueryHandler<List<CompactionInf
" AND \"res1\".\"TC_TABLE\" = \"res3\".\"CQ_TABLE\" " +
" AND (\"res1\".\"TC_PARTITION\" = \"res3\".\"CQ_PARTITION\" " +
" OR (\"res1\".\"TC_PARTITION\" IS NULL AND \"res3\".\"CQ_PARTITION\" IS NULL))" +
" WHERE (\"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR \"res3\".\"RETRY_RECORD_CHECK\" IS NULL)" +
" AND NOT EXISTS (SELECT 1 " +
" FROM \"COMPACTION_QUEUE\" AS \"cq\" " +
" WHERE \"cq\".\"CQ_DATABASE\" = \"res1\".\"TC_DATABASE\" AND \"cq\".\"CQ_TABLE\" = \"res1\".\"TC_TABLE\"" +
" AND (\"cq\".\"CQ_PARTITION\" = \"res1\".\"TC_PARTITION\"" +
" OR (\"cq\".\"CQ_PARTITION\" IS NULL AND \"res1\".\"TC_PARTITION\" IS NULL))" +
" AND \"cq\".\"CQ_HIGHEST_WRITE_ID\" > \"res1\".\"MAX_ABORTED_WRITE_ID\"" +
" AND \"cq\".\"CQ_STATE\" " +
" IN ('i', 'w', 'r'))";
" WHERE \"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR \"res3\".\"RETRY_RECORD_CHECK\" IS NULL";

private final long abortedTimeThreshold;
private final int abortedThreshold;
Expand Down

0 comments on commit 81eafd4

Please sign in to comment.