From 0c320b930bb472d2e28751ecb0a1c3712f2f6963 Mon Sep 17 00:00:00 2001 From: Marta Kuczora Date: Wed, 27 May 2026 15:49:37 +0200 Subject: [PATCH 1/3] HIVE-29571: ACID Compaction: Marking the compaction as compacted should happen after its txn got committed --- .../service/AcidCompactionService.java | 42 ++- .../hive/ql/txn/compactor/TestWorker.java | 354 +++++++++++------- 2 files changed, 250 insertions(+), 146 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java index d1c7e3972d03..c9be4f823314 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.txn.compactor.CompactorFactory; import org.apache.hadoop.hive.ql.txn.compactor.CompactorPipeline; import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil; +import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable; import org.apache.hadoop.hive.ql.txn.compactor.QueryCompactor; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.common.util.Ref; @@ -211,32 +212,27 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception { // Don't start compaction or cleaning if not necessary if (isDynPartAbort(table, ci)) { - msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)); - compactionTxn.wasSuccessful(); + compactionTxn.onCommit(() -> msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci))); return false; } dir = getAcidStateForWorker(ci, sd, tblValidWriteIds); if (!isEnoughToCompact(ci, dir, sd)) { if (needsCleaning(dir, sd)) { - msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)); + compactionTxn.onCommit(() -> msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci))); } else { // do nothing ci.errorMessage = "None of the compaction thresholds met, compaction request is refused!"; LOG.debug(ci.errorMessage + " Compaction info: {}", ci); - msc.markRefused(CompactionInfo.compactionInfoToStruct(ci)); + compactionTxn.onCommit(() -> msc.markRefused(CompactionInfo.compactionInfoToStruct(ci))); + } - compactionTxn.wasSuccessful(); return false; } if (!ci.isMajorCompaction() && !CompactorUtil.isMinorCompactionSupported(conf, table.getParameters(), dir)) { ci.errorMessage = "Query based Minor compaction is not possible for full acid tables having raw format " + "(non-acid) data in them."; LOG.error(ci.errorMessage + " Compaction info: {}", ci); - try { - msc.markRefused(CompactionInfo.compactionInfoToStruct(ci)); - } catch (Throwable tr) { - LOG.error("Caught an exception while trying to mark compaction {} as failed: {}", ci, tr); - } + compactionTxn.onAbort(() -> msc.markRefused(CompactionInfo.compactionInfoToStruct(ci))); return false; } CompactorUtil.checkInterrupt(CLASS_NAME); @@ -261,8 +257,7 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception { LOG.info("Completed " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in " + compactionTxn + ", marking as compacted."); - msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)); - compactionTxn.wasSuccessful(); + compactionTxn.onCommit(() -> msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci))); AcidMetricService.updateMetricsFromWorker(ci.dbname, ci.tableName, ci.partName, ci.type, dir.getCurrentDirectories().size(), dir.getDeleteDeltas().size(), conf, msc); @@ -346,7 +341,9 @@ class CompactionTxn implements AutoCloseable { private long lockId = 0; private TxnStatus status = TxnStatus.UNKNOWN; - private boolean successfulCompaction = false; + + private ThrowingRunnable onCommitAction; + private ThrowingRunnable onAbortAction; /** * Try to open a new txn. @@ -380,8 +377,11 @@ private LockRequest createLockRequest(CompactionInfo ci) { /** * Mark compaction as successful. This means the txn will be committed; otherwise it will be aborted. */ - void wasSuccessful() { - this.successfulCompaction = true; + void onCommit(ThrowingRunnable action) { + this.onCommitAction = action; + } + void onAbort(ThrowingRunnable action) { + this.onAbortAction = action; } /** @@ -396,10 +396,18 @@ public void close() throws Exception { //the transaction is about to close, we can stop heartbeating regardless of it's state CompactionHeartbeatService.getInstance(conf).stopHeartbeat(txnId); } finally { - if (successfulCompaction) { - commit(); + if (onCommitAction != null) { + try { + commit(); + } catch (Exception e) { + abort(); + if (onAbortAction != null) onAbortAction.run(); + throw e; + } + onCommitAction.run(); } else { abort(); + if (onAbortAction != null) onAbortAction.run(); } } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java index bf01034711e3..bc3ce731517c 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.TransactionalValidationListener; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; @@ -39,9 +40,14 @@ import org.apache.hadoop.hive.metastore.api.TxnInfo; import org.apache.hadoop.hive.metastore.api.TxnState; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService; import org.apache.hadoop.hive.metastore.utils.StringableMap; +import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -70,8 +76,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.hive.common.AcidConstants.VISIBILITY_PATTERN; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -100,9 +110,9 @@ public void stringableMap() throws Exception { // Empty map case StringableMap m = new StringableMap(new HashMap()); String s = m.toString(); - Assert.assertEquals("0:", s); + assertEquals("0:", s); m = new StringableMap(s); - Assert.assertEquals(0, m.size()); + assertEquals(0, m.size()); Map base = new HashMap(); base.put("mary", "poppins"); @@ -111,19 +121,19 @@ public void stringableMap() throws Exception { m = new StringableMap(base); s = m.toString(); m = new StringableMap(s); - Assert.assertEquals(3, m.size()); + assertEquals(3, m.size()); Map saw = new HashMap(3); saw.put("mary", false); saw.put("bert", false); saw.put(null, false); for (Map.Entry e : m.entrySet()) { saw.put(e.getKey(), true); - if ("mary".equals(e.getKey())) Assert.assertEquals("poppins", e.getValue()); + if ("mary".equals(e.getKey())) assertEquals("poppins", e.getValue()); else if ("bert".equals(e.getKey())) Assert.assertNull(e.getValue()); - else if (null == e.getKey()) Assert.assertEquals("banks", e.getValue()); + else if (null == e.getKey()) assertEquals("banks", e.getValue()); else Assert.fail("Unexpected value " + e.getKey()); } - Assert.assertEquals(3, saw.size()); + assertEquals(3, saw.size()); Assert.assertTrue(saw.get("mary")); Assert.assertTrue(saw.get("bert")); Assert.assertTrue(saw.get(null)); @@ -134,9 +144,9 @@ public void stringableList() throws Exception { // Empty list case MRCompactor.StringableList ls = new MRCompactor.StringableList(); String s = ls.toString(); - Assert.assertEquals("0:", s); + assertEquals("0:", s); ls = new MRCompactor.StringableList(s); - Assert.assertEquals(0, ls.size()); + assertEquals(0, ls.size()); ls = new MRCompactor.StringableList(); ls.add(new Path("/tmp")); @@ -145,7 +155,7 @@ public void stringableList() throws Exception { Assert.assertTrue("Expected 2:4:/tmp4:/usr or 2:4:/usr4:/tmp, got " + s, "2:4:/tmp4:/usr".equals(s) || "2:4:/usr4:/tmp".equals(s)); ls = new MRCompactor.StringableList(s); - Assert.assertEquals(2, ls.size()); + assertEquals(2, ls.size()); boolean sawTmp = false, sawUsr = false; for (Path p : ls) { if ("/tmp".equals(p.toString())) sawTmp = true; @@ -181,10 +191,10 @@ public void inputSplit() throws Exception { MRCompactor.CompactorInputSplit split = new MRCompactor.CompactorInputSplit(conf, 3, files, new Path(basename), deltas, new HashMap()); - Assert.assertEquals(520L, split.getLength()); + assertEquals(520L, split.getLength()); String[] locations = split.getLocations(); - Assert.assertEquals(1, locations.length); - Assert.assertEquals("localhost", locations[0]); + assertEquals(1, locations.length); + assertEquals("localhost", locations[0]); ByteArrayOutputStream buf = new ByteArrayOutputStream(); DataOutput out = new DataOutputStream(buf); @@ -194,12 +204,12 @@ public void inputSplit() throws Exception { DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); split.readFields(in); - Assert.assertEquals(3, split.getBucket()); - Assert.assertEquals(basename, split.getBaseDir().toString()); + assertEquals(3, split.getBucket()); + assertEquals(basename, split.getBaseDir().toString()); deltas = split.getDeltaDirs(); - Assert.assertEquals(2, deltas.length); - Assert.assertEquals(delta1, deltas[0].toString()); - Assert.assertEquals(delta2, deltas[1].toString()); + assertEquals(2, deltas.length); + assertEquals(delta1, deltas[0].toString()); + assertEquals(delta2, deltas[1].toString()); } @Test @@ -234,12 +244,12 @@ public void inputSplitNullBase() throws Exception { DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); split.readFields(in); - Assert.assertEquals(3, split.getBucket()); + assertEquals(3, split.getBucket()); Assert.assertNull(split.getBaseDir()); deltas = split.getDeltaDirs(); - Assert.assertEquals(2, deltas.length); - Assert.assertEquals(delta1, deltas[0].toString()); - Assert.assertEquals(delta2, deltas[1].toString()); + assertEquals(2, deltas.length); + assertEquals(delta1, deltas[0].toString()); + assertEquals(delta2, deltas[1].toString()); } @Test @@ -264,7 +274,7 @@ public void sortedTable() throws Exception { // There should still be four directories in the location. FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(4, stat.length); + assertEquals(4, stat.length); } @Test @@ -291,7 +301,7 @@ public void sortedPartition() throws Exception { // There should still be four directories in the location. FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation())); - Assert.assertEquals(4, stat.length); + assertEquals(4, stat.length); } @Test @@ -312,13 +322,13 @@ public void minorTableWithBase() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(5, stat.length); + assertEquals(5, stat.length); // Find the new delta file and make sure it has the right contents boolean sawNewDelta = false; @@ -326,20 +336,20 @@ public void minorTableWithBase() throws Exception { if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24) + "_v0000026")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); + assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(104L, buckets[0].getLen()); - Assert.assertEquals(104L, buckets[1].getLen()); + assertEquals(104L, buckets[0].getLen()); + assertEquals(104L, buckets[1].getLen()); } if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24) + "_v0000026")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); + assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(104L, buckets[0].getLen()); - Assert.assertEquals(104L, buckets[1].getLen()); + assertEquals(104L, buckets[0].getLen()); + assertEquals(104L, buckets[1].getLen()); } else { LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); @@ -372,20 +382,20 @@ public void minorWithOpenInMiddle() throws Exception { // since compaction was not run, state should not be "ready for cleaning" but "refused" ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(0).getState()); // There should still be 4 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(toString(stat), 4, stat.length); + assertEquals(toString(stat), 4, stat.length); // Find the new delta file and make sure it has the right contents Arrays.sort(stat); - Assert.assertEquals("base_20", stat[0].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(21, 22), stat[1].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(23, 25), stat[2].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(26, 27), stat[3].getPath().getName()); + assertEquals("base_20", stat[0].getPath().getName()); + assertEquals(makeDeltaDirName(21, 22), stat[1].getPath().getName()); + assertEquals(makeDeltaDirName(23, 25), stat[2].getPath().getName()); + assertEquals(makeDeltaDirName(26, 27), stat[3].getPath().getName()); } @Test @@ -407,22 +417,22 @@ public void minorWithAborted() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still now be 6 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(6, stat.length); + assertEquals(6, stat.length); // Find the new delta file and make sure it has the right contents Arrays.sort(stat); - Assert.assertEquals("base_20", stat[0].getPath().getName()); - Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 27) + "_v0000028", stat[1].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName()); - Assert.assertEquals(makeDeltaDirNameCompacted(21, 27) + "_v0000028", stat[3].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName()); + assertEquals("base_20", stat[0].getPath().getName()); + assertEquals(makeDeleteDeltaDirNameCompacted(21, 27) + "_v0000028", stat[1].getPath().getName()); + assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName()); + assertEquals(makeDeltaDirNameCompacted(21, 27) + "_v0000028", stat[3].getPath().getName()); + assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName()); + assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName()); } @Test @@ -444,13 +454,13 @@ public void minorPartitionWithBase() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still be four directories in the location. FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation())); - Assert.assertEquals(5, stat.length); + assertEquals(5, stat.length); // Find the new delta file and make sure it has the right contents boolean sawNewDelta = false; @@ -458,20 +468,20 @@ public void minorPartitionWithBase() throws Exception { if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24) + "_v0000026")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); + assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(104L, buckets[0].getLen()); - Assert.assertEquals(104L, buckets[1].getLen()); + assertEquals(104L, buckets[0].getLen()); + assertEquals(104L, buckets[1].getLen()); } if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24))) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); + assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(104L, buckets[0].getLen()); - Assert.assertEquals(104L, buckets[1].getLen()); + assertEquals(104L, buckets[0].getLen()); + assertEquals(104L, buckets[1].getLen()); } else { LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); } @@ -496,13 +506,13 @@ public void minorTableNoBase() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(4, stat.length); + assertEquals(4, stat.length); // Find the new delta file and make sure it has the right contents boolean sawNewDelta = false; @@ -510,20 +520,20 @@ public void minorTableNoBase() throws Exception { if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(1, 4) + "_v0000006")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); + assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(104L, buckets[0].getLen()); - Assert.assertEquals(104L, buckets[1].getLen()); + assertEquals(104L, buckets[0].getLen()); + assertEquals(104L, buckets[1].getLen()); } if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(1, 4) + "_v0000006")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); + assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(104L, buckets[0].getLen()); - Assert.assertEquals(104L, buckets[1].getLen()); + assertEquals(104L, buckets[0].getLen()); + assertEquals(104L, buckets[1].getLen()); } else { LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); } @@ -549,13 +559,13 @@ public void majorTableWithBase() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(4, stat.length); + assertEquals(4, stat.length); // Find the new delta file and make sure it has the right contents boolean sawNewBase = false; @@ -563,11 +573,11 @@ public void majorTableWithBase() throws Exception { if (stat[i].getPath().getName().equals("base_0000024_v0000026")) { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); + assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(624L, buckets[0].getLen()); - Assert.assertEquals(624L, buckets[1].getLen()); + assertEquals(624L, buckets[0].getLen()); + assertEquals(624L, buckets[1].getLen()); } else { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } @@ -625,14 +635,14 @@ private void compactNoBaseLotsOfDeltas(CompactionType type) throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation())); /* delete_delta_21_23 and delete_delta_25_33 which are created as a result of compacting*/ int numFilesExpected = 11 + (type == CompactionType.MINOR ? 1 : 0); - Assert.assertEquals(numFilesExpected, stat.length); + assertEquals(numFilesExpected, stat.length); // Find the new delta file and make sure it has the right contents List matchesNotFound = new ArrayList<>(numFilesExpected); @@ -687,13 +697,13 @@ public void majorPartitionWithBase() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still be four directories in the location. FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation())); - Assert.assertEquals(4, stat.length); + assertEquals(4, stat.length); // Find the new delta file and make sure it has the right contents boolean sawNewBase = false; @@ -701,11 +711,11 @@ public void majorPartitionWithBase() throws Exception { if (stat[i].getPath().getName().equals("base_0000024_v0000026")) { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); + assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(624L, buckets[0].getLen()); - Assert.assertEquals(624L, buckets[1].getLen()); + assertEquals(624L, buckets[0].getLen()); + assertEquals(624L, buckets[1].getLen()); } else { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } @@ -730,13 +740,13 @@ public void majorTableNoBase() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should now be 3 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(3, stat.length); + assertEquals(3, stat.length); // Find the new delta file and make sure it has the right contents boolean sawNewBase = false; @@ -744,11 +754,11 @@ public void majorTableNoBase() throws Exception { if (stat[i].getPath().getName().equals("base_0000004_v0000005")) { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); + assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(104L, buckets[0].getLen()); - Assert.assertEquals(104L, buckets[1].getLen()); + assertEquals(104L, buckets[0].getLen()); + assertEquals(104L, buckets[1].getLen()); } else { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } @@ -785,8 +795,8 @@ public void majorTableLegacy() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); @@ -799,11 +809,11 @@ public void majorTableLegacy() throws Exception { if (stat[i].getPath().getName().equals("base_0000024_v0000026")) { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); + assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(624L, buckets[0].getLen()); - Assert.assertEquals(624L, buckets[1].getLen()); + assertEquals(624L, buckets[0].getLen()); + assertEquals(624L, buckets[1].getLen()); } else { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } @@ -829,8 +839,8 @@ public void minorTableLegacy() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); @@ -842,7 +852,7 @@ public void minorTableLegacy() throws Exception { if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24) + "_v0000026")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); + assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); } else { @@ -873,13 +883,13 @@ public void majorPartitionWithBaseMissingBuckets() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still be four directories in the location. FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation())); - Assert.assertEquals(4, stat.length); + assertEquals(4, stat.length); // Find the new delta file and make sure it has the right contents boolean sawNewBase = false; @@ -887,7 +897,7 @@ public void majorPartitionWithBaseMissingBuckets() throws Exception { if (stat[i].getPath().getName().equals("base_0000026_v0000028")) { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); + assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); // Bucket 0 should be small and bucket 1 should be large, make sure that's the case @@ -926,21 +936,21 @@ public void majorWithOpenInMiddle() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(5, stat.length); + assertEquals(5, stat.length); // Find the new delta file and make sure it has the right contents Arrays.sort(stat); - Assert.assertEquals("base_0000022_v0000028", stat[0].getPath().getName()); - Assert.assertEquals("base_20", stat[1].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName()); + assertEquals("base_0000022_v0000028", stat[0].getPath().getName()); + assertEquals("base_20", stat[1].getPath().getName()); + assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName()); + assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName()); + assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName()); } @Test @@ -962,21 +972,21 @@ public void majorWithAborted() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(5, stat.length); + assertEquals(5, stat.length); // Find the new delta file and make sure it has the right contents Arrays.sort(stat); - Assert.assertEquals("base_0000027_v0000028", stat[0].getPath().getName()); - Assert.assertEquals("base_20", stat[1].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName()); + assertEquals("base_0000027_v0000028", stat[0].getPath().getName()); + assertEquals("base_20", stat[1].getPath().getName()); + assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName()); + assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName()); + assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName()); } @Override boolean useHive130DeltaDirName() { @@ -1008,10 +1018,10 @@ public void testWorkerAndInitiatorVersion() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); - Assert.assertEquals(initiatorVersion, compacts.get(0).getInitiatorVersion()); - Assert.assertEquals(workerVersion, compacts.get(0).getWorkerVersion()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(initiatorVersion, compacts.get(0).getInitiatorVersion()); + assertEquals(workerVersion, compacts.get(0).getWorkerVersion()); } @@ -1072,7 +1082,7 @@ public void droppedTable() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(0, compacts.size()); + assertEquals(0, compacts.size()); } @Test @@ -1097,7 +1107,7 @@ public void droppedPartition() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(0, compacts.size()); + assertEquals(0, compacts.size()); } @Test @@ -1148,8 +1158,8 @@ public void insertOnlyDisabled() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("failed", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("failed", compacts.get(0).getState()); } @@ -1162,21 +1172,21 @@ private void verifyTxn1IsAborted(int compactionNum, Table t, CompactionType type // Compaction should not have run on a single delta file FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(1, stat.length); - Assert.assertEquals(makeDeltaDirName(0, 2), stat[0].getPath().getName()); + assertEquals(1, stat.length); + assertEquals(makeDeltaDirName(0, 2), stat[0].getPath().getName()); // State should not be "ready for cleaning" because we skip cleaning List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); - Assert.assertEquals(compactionNum + 1, compacts.size()); - Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(compactionNum).getState()); + assertEquals(compactionNum + 1, compacts.size()); + assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(compactionNum).getState()); // assert transaction with txnId=1 is still aborted after cleaner is run startCleaner(); List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); - Assert.assertEquals(1, openTxns.get(0).getId()); - Assert.assertEquals(TxnState.ABORTED, openTxns.get(0).getState()); + assertEquals(1, openTxns.get(0).getId()); + assertEquals(TxnState.ABORTED, openTxns.get(0).getState()); } // With high timeout, but fast run we should finish without a problem @@ -1197,6 +1207,92 @@ public void testTimeoutWithoutInterrupt() throws Exception { runTimeoutTest(1, true, true); } + @Test + public void testExceptionWhenTxnCommitAndMarkFailed() throws Exception { + prepareTableForCompactionFailureTests("default", "campcnb"); + runWorkerWithException(MethodToFail.COMMIT_TXN, MethodToFail.MARK_FAILED); + + List compacts = + txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); + assertEquals(TxnStore.WORKING_RESPONSE, compacts.get(0).getState()); + + List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); + assertEquals(1, openTxns.size()); + assertEquals(compacts.get(0).getTxnId(), openTxns.get(0).getId()); + assertEquals(TxnState.ABORTED, openTxns.get(0).getState()); + } + + @Test + public void testExceptionWhenTxnCommit() throws Exception { + prepareTableForCompactionFailureTests("default", "campcnb"); + runWorkerWithException(MethodToFail.COMMIT_TXN); + + List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); + ShowCompactResponseElement compaction = compacts.get(0); + assertEquals(TxnStore.FAILED_RESPONSE, compaction.getState()); + assertEquals("Simulated failure in commitTxn", compaction.getErrorMessage()); + List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); + assertEquals(1, openTxns.size()); + TxnInfo txn = openTxns.get(0); + assertEquals(compaction.getTxnId(), txn.getId()); + assertEquals(TxnState.ABORTED, txn.getState()); + } + + @Test + public void testExceptionWhenMarkCompacted() throws Exception { + prepareTableForCompactionFailureTests("default", "campcnb"); + runWorkerWithException(MethodToFail.MARK_COMPACTED); + + List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); + ShowCompactResponseElement compaction = compacts.get(0); + assertEquals(TxnStore.FAILED_RESPONSE, compaction.getState()); + assertEquals("Simulated failure in markCompacted", compaction.getErrorMessage()); + assertNotNull(compaction.getNextTxnId()); + + List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); + assertEquals(0, openTxns.size()); + } + + private void runWorkerWithException(MethodToFail... methodToFail) throws Exception { + IMetaStoreClient spyMsc = Mockito.spy(ms); + for (MethodToFail method: methodToFail) { + switch (method) { + case MARK_FAILED -> doThrow(new TTransportException("Simulated failure in markFailed")).when(spyMsc).markFailed(any()); + case COMMIT_TXN -> doThrow(new TException("Simulated failure in commitTxn")).when(spyMsc).commitTxn(anyLong()); + case MARK_COMPACTED -> doThrow(new TTransportException("Simulated failure in markCompacted")).when(spyMsc).markCompacted(any()); + } + } + + TestTxnDbUtil.setConfValues(conf); + Worker worker = Mockito.spy(new Worker()); + worker.setConf(conf); + AtomicBoolean stop = new AtomicBoolean(); + stop.set(true); + worker.init(stop); + worker.msc = spyMsc; + worker.setName("testworker"); + CompactorThread ct = worker; + ct.run(); + } + + private void prepareTableForCompactionFailureTests(String dbName, String tableName) throws Exception { + Table t = newTable(dbName, tableName, false); + addBaseFile(t, null, 1L, 3, 2); + addDeltaFile(t, null, 2L, 2L, 1); + addDeltaFile(t, null, 3L, 3L, 1); + addDeltaFile(t, null, 4L, 4L, 1); + burnThroughTransactions(dbName, tableName, 4, null, null); + // trigger compaction + CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR); + txnHandler.compact(rqst); + } + + enum MethodToFail { + MARK_COMPACTED, + MARK_FAILED, + COMMIT_TXN; + } + private void runTimeoutTest(long timeout, boolean runForever, boolean swallowInterrupt) throws Exception { ExecutorService executor = Executors.newSingleThreadExecutor(); HiveConf timeoutConf = new HiveConf(conf); From a7774913abce9588567c9549d7f2a48ff9fbf73a Mon Sep 17 00:00:00 2001 From: Marta Kuczora Date: Thu, 28 May 2026 12:26:01 +0200 Subject: [PATCH 2/3] HIVE-29571: ACID Compaction: Marking the compaction as compacted should happen after its txn got committed - made some fixes and added more tests --- .../service/AcidCompactionService.java | 50 ++--- .../hive/ql/txn/compactor/TestWorker.java | 193 +++++++++++++----- 2 files changed, 164 insertions(+), 79 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java index c9be4f823314..ccb8260fa6fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java @@ -212,18 +212,18 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception { // Don't start compaction or cleaning if not necessary if (isDynPartAbort(table, ci)) { - compactionTxn.onCommit(() -> msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci))); + compactionTxn.markForCommit(() -> msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci))); return false; } dir = getAcidStateForWorker(ci, sd, tblValidWriteIds); if (!isEnoughToCompact(ci, dir, sd)) { if (needsCleaning(dir, sd)) { - compactionTxn.onCommit(() -> msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci))); + compactionTxn.markForCommit(() -> msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci))); } else { // do nothing ci.errorMessage = "None of the compaction thresholds met, compaction request is refused!"; LOG.debug(ci.errorMessage + " Compaction info: {}", ci); - compactionTxn.onCommit(() -> msc.markRefused(CompactionInfo.compactionInfoToStruct(ci))); + compactionTxn.markForCommit(() -> msc.markRefused(CompactionInfo.compactionInfoToStruct(ci))); } return false; @@ -232,7 +232,7 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception { ci.errorMessage = "Query based Minor compaction is not possible for full acid tables having raw format " + "(non-acid) data in them."; LOG.error(ci.errorMessage + " Compaction info: {}", ci); - compactionTxn.onAbort(() -> msc.markRefused(CompactionInfo.compactionInfoToStruct(ci))); + compactionTxn.markForAbort(() -> msc.markRefused(CompactionInfo.compactionInfoToStruct(ci))); return false; } CompactorUtil.checkInterrupt(CLASS_NAME); @@ -257,7 +257,7 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception { LOG.info("Completed " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in " + compactionTxn + ", marking as compacted."); - compactionTxn.onCommit(() -> msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci))); + compactionTxn.markForCommit(() -> msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci))); AcidMetricService.updateMetricsFromWorker(ci.dbname, ci.tableName, ci.partName, ci.type, dir.getCurrentDirectories().size(), dir.getDeleteDeltas().size(), conf, msc); @@ -342,8 +342,10 @@ class CompactionTxn implements AutoCloseable { private TxnStatus status = TxnStatus.UNKNOWN; - private ThrowingRunnable onCommitAction; - private ThrowingRunnable onAbortAction; + private ThrowingRunnable onCommitSuccess; + private ThrowingRunnable onAbortSuccess; + + private boolean rollbackOnly = true; /** * Try to open a new txn. @@ -374,14 +376,14 @@ private LockRequest createLockRequest(CompactionInfo ci) { return CompactorUtil.createLockRequest(conf, ci, txnId, lockAndOpType.getKey(), lockAndOpType.getValue()); } - /** - * Mark compaction as successful. This means the txn will be committed; otherwise it will be aborted. - */ - void onCommit(ThrowingRunnable action) { - this.onCommitAction = action; + void markForCommit(ThrowingRunnable action) { + this.rollbackOnly = false; + this.onCommitSuccess = action; } - void onAbort(ThrowingRunnable action) { - this.onAbortAction = action; + + void markForAbort(ThrowingRunnable action) { + this.rollbackOnly = true; + this.onAbortSuccess = action; } /** @@ -396,18 +398,16 @@ public void close() throws Exception { //the transaction is about to close, we can stop heartbeating regardless of it's state CompactionHeartbeatService.getInstance(conf).stopHeartbeat(txnId); } finally { - if (onCommitAction != null) { - try { - commit(); - } catch (Exception e) { - abort(); - if (onAbortAction != null) onAbortAction.run(); - throw e; - } - onCommitAction.run(); - } else { + if (rollbackOnly) { abort(); - if (onAbortAction != null) onAbortAction.run(); + if (onAbortSuccess != null) { + onAbortSuccess.run(); + } + return; + } + commit(); + if (onCommitSuccess != null) { + onCommitSuccess.run(); } } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java index bc3ce731517c..7ed0688e0fa8 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -25,8 +25,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.TransactionalValidationListener; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.FindNextCompactRequest; @@ -40,9 +40,7 @@ import org.apache.hadoop.hive.metastore.api.TxnInfo; import org.apache.hadoop.hive.metastore.api.TxnState; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.TxnStore; -import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService; import org.apache.hadoop.hive.metastore.utils.StringableMap; import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -78,6 +76,7 @@ import static org.apache.hadoop.hive.common.AcidConstants.VISIBILITY_PATTERN; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doReturn; @@ -134,9 +133,9 @@ public void stringableMap() throws Exception { else Assert.fail("Unexpected value " + e.getKey()); } assertEquals(3, saw.size()); - Assert.assertTrue(saw.get("mary")); - Assert.assertTrue(saw.get("bert")); - Assert.assertTrue(saw.get(null)); + assertTrue(saw.get("mary")); + assertTrue(saw.get("bert")); + assertTrue(saw.get(null)); } @Test @@ -152,7 +151,7 @@ public void stringableList() throws Exception { ls.add(new Path("/tmp")); ls.add(new Path("/usr")); s = ls.toString(); - Assert.assertTrue("Expected 2:4:/tmp4:/usr or 2:4:/usr4:/tmp, got " + s, + assertTrue("Expected 2:4:/tmp4:/usr or 2:4:/usr4:/tmp, got " + s, "2:4:/tmp4:/usr".equals(s) || "2:4:/usr4:/tmp".equals(s)); ls = new MRCompactor.StringableList(s); assertEquals(2, ls.size()); @@ -162,8 +161,8 @@ public void stringableList() throws Exception { else if ("/usr".equals(p.toString())) sawUsr = true; else Assert.fail("Unexpected path " + p.toString()); } - Assert.assertTrue(sawTmp); - Assert.assertTrue(sawUsr); + assertTrue(sawTmp); + assertTrue(sawUsr); } @Test @@ -337,8 +336,8 @@ public void minorTableWithBase() throws Exception { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); assertEquals(104L, buckets[0].getLen()); assertEquals(104L, buckets[1].getLen()); } @@ -346,8 +345,8 @@ public void minorTableWithBase() throws Exception { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); assertEquals(104L, buckets[0].getLen()); assertEquals(104L, buckets[1].getLen()); } @@ -355,7 +354,7 @@ public void minorTableWithBase() throws Exception { LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(toString(stat), sawNewDelta); + assertTrue(toString(stat), sawNewDelta); } /** @@ -469,8 +468,8 @@ public void minorPartitionWithBase() throws Exception { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); assertEquals(104L, buckets[0].getLen()); assertEquals(104L, buckets[1].getLen()); } @@ -478,15 +477,15 @@ public void minorPartitionWithBase() throws Exception { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); assertEquals(104L, buckets[0].getLen()); assertEquals(104L, buckets[1].getLen()); } else { LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(toString(stat), sawNewDelta); + assertTrue(toString(stat), sawNewDelta); } @Test @@ -521,8 +520,8 @@ public void minorTableNoBase() throws Exception { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); assertEquals(104L, buckets[0].getLen()); assertEquals(104L, buckets[1].getLen()); } @@ -530,15 +529,15 @@ public void minorTableNoBase() throws Exception { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); assertEquals(104L, buckets[0].getLen()); assertEquals(104L, buckets[1].getLen()); } else { LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(toString(stat), sawNewDelta); + assertTrue(toString(stat), sawNewDelta); } @Test @@ -574,15 +573,15 @@ public void majorTableWithBase() throws Exception { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); assertEquals(624L, buckets[0].getLen()); assertEquals(624L, buckets[1].getLen()); } else { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(toString(stat), sawNewBase); + assertTrue(toString(stat), sawNewBase); } @Test @@ -675,7 +674,7 @@ private void compactNoBaseLotsOfDeltas(CompactionType type) throws Exception { if(matchesNotFound.size() == 0) { return; } - Assert.assertTrue("Files remaining: " + matchesNotFound + "; " + toString(stat), false); + assertTrue("Files remaining: " + matchesNotFound + "; " + toString(stat), false); } @Test public void majorPartitionWithBase() throws Exception { @@ -712,15 +711,15 @@ public void majorPartitionWithBase() throws Exception { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); assertEquals(624L, buckets[0].getLen()); assertEquals(624L, buckets[1].getLen()); } else { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(toString(stat), sawNewBase); + assertTrue(toString(stat), sawNewBase); } @Test @@ -755,15 +754,15 @@ public void majorTableNoBase() throws Exception { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); assertEquals(104L, buckets[0].getLen()); assertEquals(104L, buckets[1].getLen()); } else { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(toString(stat), sawNewBase); + assertTrue(toString(stat), sawNewBase); } private static String toString(FileStatus[] stat) { @@ -810,15 +809,15 @@ public void majorTableLegacy() throws Exception { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); assertEquals(624L, buckets[0].getLen()); assertEquals(624L, buckets[1].getLen()); } else { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(toString(stat), sawNewBase); + assertTrue(toString(stat), sawNewBase); } @Test @@ -853,13 +852,13 @@ public void minorTableLegacy() throws Exception { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); } else { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(toString(stat), sawNewDelta); + assertTrue(toString(stat), sawNewDelta); } @Test @@ -898,10 +897,10 @@ public void majorPartitionWithBaseMissingBuckets() throws Exception { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); // Bucket 0 should be small and bucket 1 should be large, make sure that's the case - Assert.assertTrue( + assertTrue( ("bucket_00000".equals(buckets[0].getPath().getName()) && 104L == buckets[0].getLen() && "bucket_00001".equals(buckets[1].getPath().getName()) && 676L == buckets[1] .getLen()) @@ -914,7 +913,7 @@ public void majorPartitionWithBaseMissingBuckets() throws Exception { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(toString(stat), sawNewBase); + assertTrue(toString(stat), sawNewBase); } @Test @@ -1209,22 +1208,23 @@ public void testTimeoutWithoutInterrupt() throws Exception { @Test public void testExceptionWhenTxnCommitAndMarkFailed() throws Exception { - prepareTableForCompactionFailureTests("default", "campcnb"); + prepareTableAndCompaction("default", "tableForCommitAndMarkFailedError"); runWorkerWithException(MethodToFail.COMMIT_TXN, MethodToFail.MARK_FAILED); List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); assertEquals(TxnStore.WORKING_RESPONSE, compacts.get(0).getState()); - List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); assertEquals(1, openTxns.size()); - assertEquals(compacts.get(0).getTxnId(), openTxns.get(0).getId()); - assertEquals(TxnState.ABORTED, openTxns.get(0).getState()); + TxnInfo txn = openTxns.get(0); + assertEquals(compacts.get(0).getTxnId(), txn.getId()); + assertEquals(TxnState.OPEN, txn.getState()); + txnHandler.abortTxn(new AbortTxnRequest(txn.getId())); } @Test public void testExceptionWhenTxnCommit() throws Exception { - prepareTableForCompactionFailureTests("default", "campcnb"); + prepareTableAndCompaction("default", "tableForCommitError"); runWorkerWithException(MethodToFail.COMMIT_TXN); List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); @@ -1235,24 +1235,109 @@ public void testExceptionWhenTxnCommit() throws Exception { assertEquals(1, openTxns.size()); TxnInfo txn = openTxns.get(0); assertEquals(compaction.getTxnId(), txn.getId()); - assertEquals(TxnState.ABORTED, txn.getState()); + assertEquals(TxnState.OPEN, txn.getState()); + txnHandler.abortTxn(new AbortTxnRequest(txn.getId())); } @Test public void testExceptionWhenMarkCompacted() throws Exception { - prepareTableForCompactionFailureTests("default", "campcnb"); + prepareTableAndCompaction("default", "tableForMarkCompactedError"); runWorkerWithException(MethodToFail.MARK_COMPACTED); List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); ShowCompactResponseElement compaction = compacts.get(0); assertEquals(TxnStore.FAILED_RESPONSE, compaction.getState()); assertEquals("Simulated failure in markCompacted", compaction.getErrorMessage()); - assertNotNull(compaction.getNextTxnId()); + List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); + assertEquals(0, openTxns.size()); + } + + @Test + public void testExceptionDuringCompact() throws Exception { + prepareTableAndCompaction("default", "tableForCompactError"); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST, true); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TEST_MODE_FAIL_COMPACTION, true); + startWorker(); + + List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); + ShowCompactResponseElement compaction = compacts.get(0); + assertEquals(TxnStore.FAILED_RESPONSE, compaction.getState()); + assertEquals("HIVE_TEST_MODE_FAIL_COMPACTION=true", compaction.getErrorMessage()); + List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); + assertEquals(1, openTxns.size()); + TxnInfo txn = openTxns.get(0); + assertEquals(compaction.getTxnId(), txn.getId()); + assertEquals(TxnState.ABORTED, txn.getState()); + } + + @Test + public void testWorkerIfIsDynPartAbort() throws Exception { + String dbName = "default"; + String tableName = "tableWithPartition"; + Table t = newTable(dbName, tableName, true); + addBaseFile(t, null, 1L, 3, 1); + addDeltaFile(t, null, 2L, 2L, 1); + addDeltaFile(t, null, 3L, 3L, 1); + addDeltaFile(t, null, 4L, 4L, 1); + burnThroughTransactions(dbName, tableName, 4, null, null); + // trigger compaction + CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR); + rqst.setPartitionname(null); + txnHandler.compact(rqst); + startWorker(); + List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); + ShowCompactResponseElement compaction = compacts.get(0); + assertEquals(TxnStore.CLEANING_RESPONSE, compaction.getState()); + assertTrue(compaction.getNextTxnId() > 0L); List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); assertEquals(0, openTxns.size()); } + @Test + public void testWorkerNotEnoughToCompact() throws Exception { + String dbName = "default"; + String tableName = "tableWithNoDelta"; + Table t = newTable(dbName, tableName, false); + addBaseFile(t, null, 1L, 3, 1); + burnThroughTransactions(dbName, tableName, 1, null, null); + // trigger compaction + CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR); + txnHandler.compact(rqst); + startWorker(); + + List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); + ShowCompactResponseElement compaction = compacts.get(0); + assertEquals(TxnStore.REFUSED_RESPONSE, compaction.getState()); + assertTrue(compaction.getErrorMessage().contains("None of the compaction thresholds met, compaction request is refused!")); + List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); + assertEquals(0, openTxns.size()); + } + + @Test + public void testWorkerNotEnoughToCompactNeedsCleaning() throws Exception { + String dbName = "default"; + String tableName = "tableNeedsCleaning"; + Table t = newTable(dbName, tableName, false); + addDeltaFile(t, null, 1L, 1L, 1); + addDeltaFile(t, null, 2L, 2L, 1); + addBaseFile(t, null, 4L, 3, 6); + burnThroughTransactions(dbName, tableName, 6, null, new HashSet(Arrays.asList(1L, 2L))); + // trigger compaction + CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR); + txnHandler.compact(rqst); + startWorker(); + + List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); + ShowCompactResponseElement compaction = compacts.get(0); + assertEquals(TxnStore.CLEANING_RESPONSE, compaction.getState()); + assertTrue(compaction.getNextTxnId() > 0L); + List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); + assertEquals(2, openTxns.size()); + assertEquals(1L, openTxns.get(0).getId()); + assertEquals(2L, openTxns.get(1).getId()); + } + private void runWorkerWithException(MethodToFail... methodToFail) throws Exception { IMetaStoreClient spyMsc = Mockito.spy(ms); for (MethodToFail method: methodToFail) { @@ -1275,9 +1360,9 @@ private void runWorkerWithException(MethodToFail... methodToFail) throws Excepti ct.run(); } - private void prepareTableForCompactionFailureTests(String dbName, String tableName) throws Exception { + private void prepareTableAndCompaction(String dbName, String tableName) throws Exception { Table t = newTable(dbName, tableName, false); - addBaseFile(t, null, 1L, 3, 2); + addBaseFile(t, null, 1L, 3, 1); addDeltaFile(t, null, 2L, 2L, 1); addDeltaFile(t, null, 3L, 3L, 1); addDeltaFile(t, null, 4L, 4L, 1); From 03ae9f4084bd0ff71d2052e09b96a325ab369cb8 Mon Sep 17 00:00:00 2001 From: Marta Kuczora Date: Thu, 28 May 2026 20:57:47 +0200 Subject: [PATCH 3/3] Addressing review comment and trying to fix tests --- .../service/AcidCompactionService.java | 1 - .../hive/ql/txn/compactor/TestWorker.java | 46 +++++++++++-------- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java index ccb8260fa6fe..cf6e4c02e4cb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java @@ -382,7 +382,6 @@ void markForCommit(ThrowingRunnable action) { } void markForAbort(ThrowingRunnable action) { - this.rollbackOnly = true; this.onAbortSuccess = action; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java index 7ed0688e0fa8..f97237353f7c 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -1208,7 +1208,7 @@ public void testTimeoutWithoutInterrupt() throws Exception { @Test public void testExceptionWhenTxnCommitAndMarkFailed() throws Exception { - prepareTableAndCompaction("default", "tableForCommitAndMarkFailedError"); + prepareTableAndCompaction("default", "tbforcomperror"); runWorkerWithException(MethodToFail.COMMIT_TXN, MethodToFail.MARK_FAILED); List compacts = @@ -1224,7 +1224,7 @@ public void testExceptionWhenTxnCommitAndMarkFailed() throws Exception { @Test public void testExceptionWhenTxnCommit() throws Exception { - prepareTableAndCompaction("default", "tableForCommitError"); + prepareTableAndCompaction("default", "tbforcomperror"); runWorkerWithException(MethodToFail.COMMIT_TXN); List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); @@ -1241,7 +1241,7 @@ public void testExceptionWhenTxnCommit() throws Exception { @Test public void testExceptionWhenMarkCompacted() throws Exception { - prepareTableAndCompaction("default", "tableForMarkCompactedError"); + prepareTableAndCompaction("default", "tbforcomperror"); runWorkerWithException(MethodToFail.MARK_COMPACTED); List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); @@ -1254,7 +1254,7 @@ public void testExceptionWhenMarkCompacted() throws Exception { @Test public void testExceptionDuringCompact() throws Exception { - prepareTableAndCompaction("default", "tableForCompactError"); + prepareTableAndCompaction("default", "tbforcomperror"); HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST, true); HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TEST_MODE_FAIL_COMPACTION, true); startWorker(); @@ -1273,7 +1273,7 @@ public void testExceptionDuringCompact() throws Exception { @Test public void testWorkerIfIsDynPartAbort() throws Exception { String dbName = "default"; - String tableName = "tableWithPartition"; + String tableName = "tbforcomperror"; Table t = newTable(dbName, tableName, true); addBaseFile(t, null, 1L, 3, 1); addDeltaFile(t, null, 2L, 2L, 1); @@ -1297,7 +1297,7 @@ public void testWorkerIfIsDynPartAbort() throws Exception { @Test public void testWorkerNotEnoughToCompact() throws Exception { String dbName = "default"; - String tableName = "tableWithNoDelta"; + String tableName = "tbforcomperror"; Table t = newTable(dbName, tableName, false); addBaseFile(t, null, 1L, 3, 1); burnThroughTransactions(dbName, tableName, 1, null, null); @@ -1317,12 +1317,14 @@ public void testWorkerNotEnoughToCompact() throws Exception { @Test public void testWorkerNotEnoughToCompactNeedsCleaning() throws Exception { String dbName = "default"; - String tableName = "tableNeedsCleaning"; + String tableName = "tbforcomperror"; Table t = newTable(dbName, tableName, false); - addDeltaFile(t, null, 1L, 1L, 1); - addDeltaFile(t, null, 2L, 2L, 1); - addBaseFile(t, null, 4L, 3, 6); - burnThroughTransactions(dbName, tableName, 6, null, new HashSet(Arrays.asList(1L, 2L))); + addDeltaFile(t, null, 20L, 20L, 10); + addDeltaFile(t, null, 21L, 21L, 10); + addDeltaFile(t, null, 22L, 22L, 10); + addDeltaFile(t, null, 23L, 23L, 10); + addDeltaFile(t, null, 24L, 24L, 10); + burnThroughTransactions(dbName, tableName, 25, null, new HashSet(Arrays.asList(20L, 21L, 22L, 23L, 24L))); // trigger compaction CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR); txnHandler.compact(rqst); @@ -1333,9 +1335,13 @@ public void testWorkerNotEnoughToCompactNeedsCleaning() throws Exception { assertEquals(TxnStore.CLEANING_RESPONSE, compaction.getState()); assertTrue(compaction.getNextTxnId() > 0L); List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); - assertEquals(2, openTxns.size()); - assertEquals(1L, openTxns.get(0).getId()); - assertEquals(2L, openTxns.get(1).getId()); + assertEquals(5, openTxns.size()); + assertEquals(20L, openTxns.get(0).getId()); + assertEquals(21L, openTxns.get(1).getId()); + assertEquals(22L, openTxns.get(2).getId()); + assertEquals(23L, openTxns.get(3).getId()); + assertEquals(24L, openTxns.get(4).getId()); + } private void runWorkerWithException(MethodToFail... methodToFail) throws Exception { @@ -1362,11 +1368,13 @@ private void runWorkerWithException(MethodToFail... methodToFail) throws Excepti private void prepareTableAndCompaction(String dbName, String tableName) throws Exception { Table t = newTable(dbName, tableName, false); - addBaseFile(t, null, 1L, 3, 1); - addDeltaFile(t, null, 2L, 2L, 1); - addDeltaFile(t, null, 3L, 3L, 1); - addDeltaFile(t, null, 4L, 4L, 1); - burnThroughTransactions(dbName, tableName, 4, null, null); + addBaseFile(t, null, 20L, 20); + addDeltaFile(t, null, 21L, 23L, 2); + addDeltaFile(t, null, 23L, 23L, 3); + addDeltaFile(t, null, 24L, 24L, 2); + addDeltaFile(t, null, 25L, 25L, 3); + addDeltaFile(t, null, 26L, 26L, 3); + burnThroughTransactions(dbName, tableName, 27, null, null); // trigger compaction CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR); txnHandler.compact(rqst);