Merged
@@ -3369,7 +3369,7 @@ public void testMinorCompactionAfterMajorWithMerge() throws Exception {
testCompactionWithMerge(CompactionType.MINOR, false, false, null,
Collections.singletonList("bucket_00000"),
Arrays.asList("delta_0000004_0000004_0000", "delta_0000005_0000005_0000", "delta_0000006_0000006_0000"),
Collections.singletonList("delta_0000001_0000006_v0000013"), false, true, false);
Collections.singletonList("delta_0000004_0000006_v0000013"), false, true, false);
}

@Test
@@ -3709,4 +3709,97 @@ public void testMajorCompactionUpdateMissingColumnStatsOfPartition() throws Exception {

Assert.assertEquals(3, StatsSetupConst.getColumnsHavingStats(partition.getParameters()).size());
}

@Test
public void testMinorWithAbortedAndOpenTnx() throws Exception {
String dbName = "default";
String tableName = "testAbortedAndOpenTnxTbl";
// Create test table
TestDataProvider testDataProvider = new TestDataProvider();
testDataProvider.createFullAcidTable(tableName, false, false);
IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
Table table = metaStoreClient.getTable(dbName, tableName);
FileSystem fs = FileSystem.get(conf);

// Abort the first insert transaction
driver.getConf().setBoolVar(HiveConf.ConfVars.HIVE_TEST_MODE_ROLLBACK_TXN, true);
testDataProvider.insertOnlyTestData(tableName, 1);
driver.getConf().setBoolVar(HiveConf.ConfVars.HIVE_TEST_MODE_ROLLBACK_TXN, false);
// Do three successful inserts to create 3 deltas
testDataProvider.insertOnlyTestData(tableName, 3);

// Start an insert and leave the transaction open while the compaction runs
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
StreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable(tableName)
.withAgentInfo("UT_" + Thread.currentThread().getName()).withHiveConf(conf).withRecordWriter(writer)
.withTransactionBatchSize(1).connect();
connection.beginTransaction();
connection.write("4,4".getBytes());
// Run query-based MINOR compaction
CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
// Finish the open transaction
connection.commitTransaction();
connection.close();
List<String> expectedData = testDataProvider.getAllData(tableName, false);
// Run the cleaner. It is expected to delete all deltas except the one created by the compaction and the one belonging to the open transaction.
CompactorTestUtil.runCleaner(conf);

verifySuccessfulCompaction(1);
List<String> resultData = testDataProvider.getAllData(tableName);
Assert.assertEquals(expectedData, resultData);
List<String> deltas = CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
Assert.assertEquals(2, deltas.size());
Assert.assertEquals("Delta directory names are not matching after compaction",
Arrays.asList("delta_0000002_0000004_v0000007", "delta_0000005_0000005"), deltas);
for (String delta: deltas) {
// Check if none of the delta directories are empty
List<String> files = CompactorTestUtil.getBucketFileNames(fs, table, null, delta);
Assert.assertFalse(files.isEmpty());
}
}

@Test
public void testMinorWithOpenTnx() throws Exception {
String dbName = "default";
String tableName = "testAbortedAndOpenTnxTbl";
// Create test table
TestDataProvider testDataProvider = new TestDataProvider();
testDataProvider.createFullAcidTable(tableName, false, false);
IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
Table table = metaStoreClient.getTable(dbName, tableName);
FileSystem fs = FileSystem.get(conf);

// Do three successful inserts to create 3 deltas
testDataProvider.insertOnlyTestData(tableName, 3);

// Start an insert and leave the transaction open while the compaction runs
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
StreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable(tableName)
.withAgentInfo("UT_" + Thread.currentThread().getName()).withHiveConf(conf).withRecordWriter(writer)
.withTransactionBatchSize(1).connect();
connection.beginTransaction();
connection.write("4,4".getBytes());
// Run query-based MINOR compaction
CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
// Finish the open transaction
connection.commitTransaction();
connection.close();
List<String> expectedData = testDataProvider.getAllData(tableName, false);
// Run the cleaner. It is expected to delete all deltas except the one created by the compaction and the one belonging to the open transaction.
CompactorTestUtil.runCleaner(conf);

verifySuccessfulCompaction(1);
List<String> resultData = testDataProvider.getAllData(tableName);
Assert.assertEquals(expectedData, resultData);
List<String> deltas = CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
Assert.assertEquals(2, deltas.size());
Assert.assertEquals("Delta directory names are not matching after compaction",
Arrays.asList("delta_0000001_0000003_v0000006", "delta_0000004_0000004"), deltas);
for (String delta: deltas) {
// Check if none of the delta directories are empty
List<String> files = CompactorTestUtil.getBucketFileNames(fs, table, null, delta);
Assert.assertFalse(files.isEmpty());
}
}

}
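For reference when reading the directory names asserted above: an ACID delta directory is named delta_&lt;minWriteId&gt;_&lt;maxWriteId&gt;, with each write ID zero-padded to seven digits; deltas produced by the compactor additionally carry a _v&lt;visibilityTxnId&gt; suffix, while deltas written by ordinary transactions (such as the open streaming transaction in the tests) do not. A minimal standalone sketch of that convention (the helper class is illustrative, not part of the patch):

// Illustrative helper, not part of the patch: reproduces the delta directory
// naming convention asserted by the tests above.
public final class DeltaNameSketch {

  static String formatDelta(long minWriteId, long maxWriteId, long visibilityTxnId) {
    // Base name: write IDs zero-padded to 7 digits, e.g. delta_0000002_0000004.
    String name = String.format("delta_%07d_%07d", minWriteId, maxWriteId);
    // Compactor-created deltas append the compactor transaction id as a
    // visibility suffix; deltas from regular writes have none.
    return visibilityTxnId > 0 ? name + String.format("_v%07d", visibilityTxnId) : name;
  }

  public static void main(String[] args) {
    // Matches testMinorWithAbortedAndOpenTnx: write ids 2..4 were compacted by
    // txn 7, while the still-open streaming transaction wrote delta 5 on its own.
    System.out.println(formatDelta(2, 4, 7)); // delta_0000002_0000004_v0000007
    System.out.println(formatDelta(5, 5, 0)); // delta_0000005_0000005
  }
}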
@@ -30,6 +30,8 @@
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hive.common.util.HiveStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.ArrayList;
@@ -39,6 +41,9 @@
import java.util.stream.Collectors;

abstract class CompactionQueryBuilder {

private static final Logger LOG = LoggerFactory.getLogger(CompactionQueryBuilder.class);

// required fields, set in constructor
protected Operation operation;
protected String resultTableName;
@@ -317,15 +322,20 @@ protected void addTblProperties(StringBuilder query, Map<String, String> tblProperties) {

private void buildAddClauseForAlter(StringBuilder query) {
if (validWriteIdList == null || dir == null) {
LOG.warn("There is no delta to be added as partition to the temp external table used by the minor compaction. " +
"This may result an empty compaction directory.");
query.setLength(0);
return; // avoid NPEs; return an empty query instead of throwing an exception
}
long minWriteID = validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
long highWatermark = validWriteIdList.getHighWatermark();
List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories().stream().filter(
delta -> delta.isDeleteDelta() == isDeleteDelta && delta.getMaxWriteId() <= highWatermark && delta.getMinWriteId() >= minWriteID)
delta -> delta.isDeleteDelta() == isDeleteDelta && delta.getMaxWriteId() <= highWatermark)
.collect(Collectors.toList());
if (deltas.isEmpty()) {
String warnMsg = String.format("No %s delta is found below the highWaterMark %s to be added as partition " +
"to the temp external table, used by the minor compaction. This may result an empty compaction directory.",
isDeleteDelta ? "delete" : "", highWatermark);
LOG.warn(warnMsg);
query.setLength(0); // no alter query needed; clear StringBuilder
return;
}
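The behavioral change above is in the delta filter: the old predicate also required delta.getMinWriteId() >= minWriteID, so when an open (or aborted) transaction held a low write ID, deltas beneath it were silently dropped from the ALTER ... ADD PARTITION clause. The new predicate keeps every delta at or below the high watermark. A toy illustration of the two predicates, using a stand-in class rather than AcidUtils.ParsedDelta (the write-id values are made up for the example):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DeltaFilterSketch {

  // Stand-in for AcidUtils.ParsedDelta, carrying only the two write ids the filter reads.
  static final class Delta {
    final long minWriteId;
    final long maxWriteId;
    Delta(long minWriteId, long maxWriteId) {
      this.minWriteId = minWriteId;
      this.maxWriteId = maxWriteId;
    }
  }

  public static void main(String[] args) {
    List<Delta> deltas = Arrays.asList(new Delta(1, 1), new Delta(2, 2), new Delta(3, 3));
    long minOpenWriteId = 2; // a low-write-id transaction is still open (or was aborted)
    long highWatermark = 3;

    // Old predicate: the lower bound silently drops delta_0000001_0000001.
    List<Delta> before = deltas.stream()
        .filter(d -> d.maxWriteId <= highWatermark && d.minWriteId >= minOpenWriteId)
        .collect(Collectors.toList());

    // New predicate: every delta at or below the high watermark is kept.
    List<Delta> after = deltas.stream()
        .filter(d -> d.maxWriteId <= highWatermark)
        .collect(Collectors.toList());

    System.out.println(before.size() + " -> " + after.size()); // prints "2 -> 3"
  }
}

The same relaxation is why the expected ALTER queries in the CompactionQueryBuilder tests at the end of this diff gain a test_delta_2 partition.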
@@ -49,7 +49,7 @@ public boolean run(CompactorContext context) throws IOException {

String tmpTableName = getTempTableName(table);
Path tmpTablePath = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds,
conf, true, false, false, null);
conf, true, false, null);

List<String> createQueries = getCreateQueries(tmpTableName, table, tmpTablePath.toString());
List<String> compactionQueries = getCompactionQueries(table, context.getPartition(), tmpTableName);
@@ -56,8 +56,8 @@ public boolean run(CompactorContext context) throws IOException, HiveException,

if (isMergeCompaction(hiveConf, dir, storageDescriptor)) {
// Only inserts happened; it is much more performant to merge the files than to run a query
Path outputDirPath = getOutputDirPath(hiveConf, writeIds,
compactionInfo.isMajorCompaction(), storageDescriptor);
Path outputDirPath = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds,
hiveConf, compactionInfo.isMajorCompaction(), false, dir);
try {
return mergeFiles(hiveConf, compactionInfo.isMajorCompaction(),
dir, outputDirPath, AcidUtils.isInsertOnlyTable(table.getParameters()));
@@ -161,27 +161,6 @@ private Map<Integer, List<Reader>> getBucketFiles(HiveConf conf, Path dirPath, b
return bucketIdToBucketFilePath;
}

/**
* Generate output path for compaction. This can be used to generate delta or base directories.
* @param conf hive configuration, must be non-null
* @param writeIds list of valid write IDs
* @param isBaseDir if base directory path should be generated
* @param sd the resolved storage descriptor
* @return output path, always non-null
*/
private Path getOutputDirPath(HiveConf conf, ValidWriteIdList writeIds, boolean isBaseDir,
StorageDescriptor sd) {
long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
long highWatermark = writeIds.getHighWatermark();
long compactorTxnId = Compactor.getCompactorTxnId(conf);
AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
.writingBase(isBaseDir).writingDeleteDelta(false)
.isCompressed(false)
.minimumWriteId(minOpenWriteId).maximumWriteId(highWatermark)
.statementId(-1).visibilityTxnId(compactorTxnId);
return AcidUtils.baseOrDeltaSubdirPath(new Path(sd.getLocation()), options);
}

/**
* Merge files from base/delta directories. If the directories contain multiple buckets, the result will also
* contain the same number of buckets.
@@ -59,9 +59,9 @@ public boolean run(CompactorContext context) throws IOException {
table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_" + System.currentTimeMillis();

Path resultDeltaDir = QueryCompactor.Util.getCompactionResultDir(storageDescriptor,
writeIds, conf, false, false, false, dir);
writeIds, conf, false, false, dir);
Path resultDeleteDeltaDir = QueryCompactor.Util.getCompactionResultDir(storageDescriptor,
writeIds, conf, false, true, false, dir);
writeIds, conf, false, true, dir);

List<String> createQueries = getCreateQueries(table, tmpTableName, dir, writeIds,
resultDeltaDir, resultDeleteDeltaDir);
@@ -56,7 +56,7 @@ public boolean run(CompactorContext context) throws IOException {
// "insert overwrite directory" command if there were no bucketing or list bucketing.
String tmpTableName = getTempTableName(table);
Path resultBaseDir = QueryCompactor.Util.getCompactionResultDir(
storageDescriptor, writeIds, driverConf, true, true, false, null);
storageDescriptor, writeIds, driverConf, true, true, null);

List<String> createTableQueries = getCreateQueries(tmpTableName, table, storageDescriptor,
resultBaseDir.toString());
@@ -57,7 +57,7 @@ public boolean run(CompactorContext context) throws IOException {
String tmpTableName = getTempTableName(table);
String resultTmpTableName = tmpTableName + "_result";
Path resultDeltaDir = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds, driverConf,
false, false, false, dir);
false, false, dir);

List<String> createTableQueries = getCreateQueries(tmpTableName, table, storageDescriptor, dir,
writeIds, resultDeltaDir);
@@ -79,8 +79,9 @@ protected HiveConf setUpDriverSession(HiveConf hiveConf) {
/**
* Clean up the empty dir of the temp result table ('tmpTableName' + "_result").
*/
@Override protected void commitCompaction(String tmpTableName, HiveConf conf) throws IOException, HiveException {
Util.cleanupEmptyTableDir(conf, tmpTableName);
@Override
protected void commitCompaction(String tmpTableName, HiveConf conf) throws IOException, HiveException {
Util.cleanupEmptyTableDir(conf, tmpTableName + "_result");
}

/**
@@ -27,7 +27,6 @@
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.metastore.utils.StringableMap;
import org.apache.hadoop.hive.ql.DriverUtils;
import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
@@ -43,8 +42,6 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;

import static org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.overrideConfProps;

@@ -188,23 +185,19 @@ public static class Util {
* @param conf HiveConf
* @param writingBase if true, we are creating a base directory, otherwise a delta
* @param createDeleteDelta if true, the delta dir we are creating is a delete delta
* @param bucket0 whether to specify 0 as the bucketid
* @param directory AcidUtils.Directory - only required for minor compaction result (delta) dirs
*
* @return Path of new base/delta/delete delta directory
*/
public static Path getCompactionResultDir(StorageDescriptor sd, ValidWriteIdList writeIds, HiveConf conf,
boolean writingBase, boolean createDeleteDelta, boolean bucket0, AcidDirectory directory) {
boolean writingBase, boolean createDeleteDelta, AcidDirectory directory) {
long minWriteID = writingBase ? 1 : getMinWriteID(directory);
long highWatermark = writeIds.getHighWatermark();
long compactorTxnId = Compactor.getCompactorTxnId(conf);
AcidOutputFormat.Options options =
new AcidOutputFormat.Options(conf).isCompressed(false).minimumWriteId(minWriteID)
.maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId)
.writingBase(writingBase).writingDeleteDelta(createDeleteDelta);
if (bucket0) {
options = options.bucket(0);
}
Path location = new Path(sd.getLocation());
return AcidUtils.baseOrDeltaSubdirPath(location, options);
}
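Every call site touched by this patch passed false for the removed bucket0 flag, which is why it could be dropped without a behavior change; all compactors now resolve their output directories through the same six-argument helper. A sketch of a minor-compaction caller after the change, mirroring the call sites updated above (the wrapper method and its names are illustrative, and the Hive compactor classes are assumed to be on the classpath and in the same package as QueryCompactor):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.io.AcidDirectory;

class ResultDirSketch {
  // For a minor compaction the compactor needs both a delta and a
  // delete-delta result directory under the table/partition location.
  static Path[] minorCompactionResultDirs(StorageDescriptor sd, ValidWriteIdList writeIds,
      HiveConf conf, AcidDirectory dir) {
    Path resultDeltaDir = QueryCompactor.Util.getCompactionResultDir(
        sd, writeIds, conf, /* writingBase */ false, /* createDeleteDelta */ false, dir);
    Path resultDeleteDeltaDir = QueryCompactor.Util.getCompactionResultDir(
        sd, writeIds, conf, /* writingBase */ false, /* createDeleteDelta */ true, dir);
    return new Path[] { resultDeltaDir, resultDeleteDeltaDir };
  }
}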
@@ -52,7 +52,7 @@ public boolean run(CompactorContext context)

String tmpTableName = getTempTableName(table);
Path tmpTablePath = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds,
conf, true, false, false, null);
conf, true, false, null);
int numBuckets = context.getCompactionInfo().numberOfBuckets;
if (numBuckets <= 0) {
//TODO: This is quite expensive, a better way should be found to get the number of buckets for an implicitly bucketed table
@@ -105,7 +105,7 @@ private AcidDirectory getAcidStateForWorker(CompactionInfo ci, StorageDescriptor
public void cleanupResultDirs(CompactionInfo ci) {
// result directory for compactor to write new files
Path resultDir = QueryCompactor.Util.getCompactionResultDir(sd, tblValidWriteIds, conf,
ci.type == CompactionType.MAJOR, false, false, dir);
ci.type == CompactionType.MAJOR, false, dir);
LOG.info("Deleting result directories created by the compactor:\n");
try {
FileSystem fs = resultDir.getFileSystem(conf);
@@ -114,7 +114,7 @@ public void cleanupResultDirs(CompactionInfo ci) {

if (ci.type == CompactionType.MINOR) {
Path deleteDeltaDir = QueryCompactor.Util.getCompactionResultDir(sd, tblValidWriteIds, conf,
false, true, false, dir);
false, true, dir);

LOG.info(deleteDeltaDir.toString());
fs.delete(deleteDeltaDir, true);
@@ -172,7 +172,7 @@ public void testAlter() {
queryBuilder.setIsDeleteDelta(true);
String query = queryBuilder.build();
String expectedQuery =
"ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' ";
"ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_2') location '/compaction/test/table/test_delta_2' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' ";
Assert.assertEquals(expectedQuery, query);
}

@@ -150,7 +150,7 @@ public void testAlter() {
queryBuilder.setIsDeleteDelta(true);
String query = queryBuilder.build();
String expectedQuery =
"ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' ";
"ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_2') location '/compaction/test/table/test_delta_2' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' ";
Assert.assertEquals(expectedQuery, query);
}

@@ -372,7 +372,7 @@ public void testAlterMajorCompaction() {
queryBuilder.setIsDeleteDelta(true);
String query = queryBuilder.build();
String expectedQuery =
"ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' ";
"ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_2') location '/compaction/test/table/test_delta_2' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' ";
Assert.assertEquals(expectedQuery, query);
}

Expand All @@ -386,7 +386,7 @@ public void testAlterMinorCompaction() {
queryBuilder.setIsDeleteDelta(true);
String query = queryBuilder.build();
String expectedQuery =
"ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' ";
"ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_2') location '/compaction/test/table/test_delta_2' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' ";
Assert.assertEquals(expectedQuery, query);
}

@@ -126,7 +126,7 @@ public void testAlter() {
queryBuilder.setIsDeleteDelta(true);
String query = queryBuilder.build();
String expectedQuery =
"ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' ";
"ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_2') location '/compaction/test/table/test_delta_2' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' ";
Assert.assertEquals(expectedQuery, query);
}
