@@ -1667,6 +1667,83 @@ public void testCompactionWithSchemaEvolutionNoBucketsMultipleReducers() throws
dataProvider.dropTable(tableName);
}

@Test
public void testMajorCompactionAfterTwoMergeStatements() throws Exception {
String dbName = "default";
String tableName = "comp_and_merge_test";
TestDataProvider dataProvider = new TestDataProvider();
// Create a non-bucketed test table and insert some initial data
executeStatementOnDriver(
"CREATE TABLE " + tableName + "(id int,value string) STORED AS ORC TBLPROPERTIES ('transactional'='true')",
driver);
executeStatementOnDriver("insert into " + tableName
+ " values(1, 'value_1'),(2, 'value_2'),(3, 'value_3'),(4, 'value_4'),(5, 'value_5')", driver);

// Fetch the table metadata and the filesystem so the directory layout can be inspected later
IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
Table table = msClient.getTable(dbName, tableName);
FileSystem fs = FileSystem.get(conf);

runMergeStatement(tableName,
Arrays.asList("1, 'newvalue_1'", "2, 'newvalue_2'", "3, 'newvalue_3'", "6, 'value_6'", "7, 'value_7'"));
runMergeStatement(tableName, Arrays.asList("1, 'newestvalue_1'", "2, 'newestvalue_2'", "5, 'newestvalue_5'",
"7, 'newestvalue_7'", "8, 'value_8'"));

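// Snapshot the table contents before compaction; compaction must not change what queries see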
List<String> expectedData = dataProvider.getAllData(tableName);

// Run a query-based MAJOR compaction
CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true);
// Clean up resources
CompactorTestUtil.runCleaner(conf);
// Exactly one compaction should appear in the response queue, in succeeded state
verifySuccessfulCompaction(1);

// Verify that a single base directory exists after compaction
List<String> actualBasesAfterComp =
    CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null);
Assert.assertEquals("Base directory does not match after compaction",
    Collections.singletonList("base_0000003_v0000014"), actualBasesAfterComp);

// Verify bucket files in the base dir
List<String> expectedBucketFiles = Collections.singletonList("bucket_00000");
Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
    CompactorTestUtil.getBucketFileNames(fs, table, null, actualBasesAfterComp.get(0)));

// Verify contents of bucket files.
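// ROW__ID.bucketid is BucketCodec V1: 536870912 encodes (bucket 0, statement 0) and
// 536870913 encodes (bucket 0, statement 1); the MERGE's UPDATE and INSERT branches
// are written as separate statements of the same transaction.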
List<String> expectedRsBucket0 = Arrays.asList("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t4\tvalue_4",
"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t6\tvalue_6",
"{\"writeid\":2,\"bucketid\":536870913,\"rowid\":2}\t3\tnewvalue_3",
"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t8\tvalue_8",
"{\"writeid\":3,\"bucketid\":536870913,\"rowid\":0}\t5\tnewestvalue_5",
"{\"writeid\":3,\"bucketid\":536870913,\"rowid\":1}\t7\tnewestvalue_7",
"{\"writeid\":3,\"bucketid\":536870913,\"rowid\":2}\t1\tnewestvalue_1",
"{\"writeid\":3,\"bucketid\":536870913,\"rowid\":3}\t2\tnewestvalue_2");
List<String> rsBucket0 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tableName, driver);
Assert.assertEquals(expectedRsBucket0, rsBucket0);
// Verify the table contents match the pre-compaction snapshot
List<String> actualData = dataProvider.getAllData(tableName);
Assert.assertEquals(expectedData, actualData);
// Clean up
dataProvider.dropTable(tableName);
msClient.close();
}

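/**
 * Upserts the given rows into the target table with a MERGE statement, staging them in a
 * temporary merge_source table that is created and dropped within this call.
 */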
private void runMergeStatement(String tableName, List<String> values) throws Exception {
executeStatementOnDriver("DROP TABLE IF EXISTS merge_source", driver);
executeStatementOnDriver("CREATE TABLE merge_source(id int,value string) STORED AS ORC", driver);
StringBuilder sb = new StringBuilder();
for (String value : values) {
  sb.append("(").append(value).append("),");
}
sb.setLength(sb.length() - 1); // drop the trailing comma
executeStatementOnDriver("INSERT INTO merge_source VALUES " + sb, driver);
executeStatementOnDriver("MERGE INTO " + tableName
+ " AS T USING merge_source AS S ON T.ID = S.ID WHEN MATCHED AND (T.value != S.value AND S.value IS NOT NULL) THEN UPDATE SET value = S.value WHEN NOT MATCHED THEN INSERT VALUES (S.ID, S.value)",
driver);
executeStatementOnDriver("DROP TABLE merge_source", driver);
}

/**
* Make sure db is specified in compaction queries.
*/
@@ -77,8 +77,8 @@ public Object evaluate(DeferredObject[] arguments) throws HiveException {
if (previousWriteIdRowId != null) {
// Verify sort order for this new row
if (current.compareTo(previousWriteIdRowId) <= 0) {
-        throw new HiveException("Wrong sort order of Acid rows detected for the rows: " + previousWriteIdRowId + " and "
-            + current);
+        throw new HiveException("Wrong sort order of Acid rows detected for the rows: "
+            + previousWriteIdRowId.toString() + " and " + current.toString());
}
}
previousWriteIdRowId = current;
@@ -104,16 +104,29 @@ final static class WriteIdRowId implements Comparable<WriteIdRowId> {

@Override
public int compareTo(WriteIdRowId other) {
-      if (this.bucketProperty != other.bucketProperty) {
-        return this.bucketProperty < other.bucketProperty ? -1 : 1;
-      }
      if (this.writeId != other.writeId) {
        return this.writeId < other.writeId ? -1 : 1;
      }
+      if (this.bucketProperty != other.bucketProperty) {
+        return this.bucketProperty < other.bucketProperty ? -1 : 1;
+      }
if (this.rowId != other.rowId) {
return this.rowId < other.rowId ? -1 : 1;
}
return 0;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("[writeId=");
sb.append(writeId);
sb.append(", bucketProperty=");
sb.append(bucketProperty);
sb.append(", rowId=");
sb.append(rowId);
sb.append("]");
return sb.toString();
}
}
}
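
For context, a minimal standalone sketch (class name and sample values are hypothetical, not part of this patch) of why writeId must be compared before bucketProperty, assuming a compacted file streams rows in ascending writeId order while MERGE branches vary the statement id encoded in bucketProperty:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class AcidSortOrderSketch {
  // A row key of (writeId, bucketProperty, rowId), mirroring the fields of WriteIdRowId.
  static final class Key {
    final long writeId;
    final int bucketProperty;
    final long rowId;

    Key(long writeId, int bucketProperty, long rowId) {
      this.writeId = writeId;
      this.bucketProperty = bucketProperty;
      this.rowId = rowId;
    }
  }

  public static void main(String[] args) {
    // Rows as a compacted file streams them: ascending writeId, with the MERGE's two
    // branches producing different statement ids inside bucketProperty.
    List<Key> rows = new ArrayList<>();
    rows.add(new Key(1, 536870912, 3)); // writeId 1, statement 0
    rows.add(new Key(2, 536870913, 0)); // writeId 2, statement 1
    rows.add(new Key(3, 536870912, 0)); // writeId 3, statement 0

    // Fixed order: writeId, then bucketProperty, then rowId.
    Comparator<Key> fixed = Comparator.<Key>comparingLong(k -> k.writeId)
        .thenComparingInt(k -> k.bucketProperty)
        .thenComparingLong(k -> k.rowId);
    // Old order: bucketProperty first.
    Comparator<Key> old = Comparator.<Key>comparingInt(k -> k.bucketProperty)
        .thenComparingLong(k -> k.writeId)
        .thenComparingLong(k -> k.rowId);

    // The stream above is ascending under the fixed order, but under the old order the
    // statement-1 row (bucketProperty 536870913) compares greater than the later
    // statement-0 row, so the validator would wrongly report a wrong sort order.
    System.out.println("fixed order ascending: " + (fixed.compare(rows.get(1), rows.get(2)) < 0)); // true
    System.out.println("old order ascending:   " + (old.compare(rows.get(1), rows.get(2)) < 0));   // false
  }
}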