Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.integ;

import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

Expand All @@ -34,20 +35,23 @@
*/
public class ITTestHoodieSanity extends ITTestBase {

// Base HDFS URL; the per-table path and the streaming paths below are built on top of it.
private static final String HDFS_BASE_URL = "hdfs://namenode";
// Prefix of the value passed as --streaming-source-path; the hive table name is appended to it.
private static final String HDFS_STREAMING_SOURCE = HDFS_BASE_URL + "/streaming/source/";
// Prefix of the value passed as --streaming-checkpointing-path; the hive table name is appended to it.
private static final String HDFS_STREAMING_CKPT = HDFS_BASE_URL + "/streaming/ckpt/";

/**
 * Partitioning layouts exercised by the sanity tests in this class.
 */
enum PartitionType {
  // Table partitioned on a single key.
  SINGLE_KEY_PARTITIONED,
  // Table partitioned on multiple keys.
  MULTI_KEYS_PARTITIONED,
  // Table without any partitioning.
  NON_PARTITIONED
}

@ParameterizedTest
@ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP })
@Test
/**
* A basic integration test that runs HoodieJavaApp to create a sample COW Hoodie data-set with a single partition key
* and performs upserts on it. Hive integration and upsert functionality are checked by running a count query in the
* Hive console.
*/
public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable(String command) throws Exception {
String hiveTableName = "docker_hoodie_single_partition_key_cow_test";
testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.COPY_ON_WRITE.name(),
public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable() throws Exception {
String hiveTableName = "docker_hoodie_single_partition_key_cow_test_" + HoodieActiveTimeline.createNewInstantTime();
testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(),
PartitionType.SINGLE_KEY_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
}
Expand All @@ -59,9 +63,9 @@ public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable(String command) thr
* data-set and performs upserts on it. Hive integration and upsert functionality is checked by running a count query
* in hive console.
*/
public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable(String command) throws Exception {
String hiveTableName = "docker_hoodie_multi_partition_key_cow_test";
testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.COPY_ON_WRITE.name(),
public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable() throws Exception {
String hiveTableName = "docker_hoodie_multi_partition_key_cow_test_" + HoodieActiveTimeline.createNewInstantTime();
testRunHoodieJavaApp(HOODIE_JAVA_APP, hiveTableName, HoodieTableType.COPY_ON_WRITE.name(),
PartitionType.MULTI_KEYS_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
}
Expand All @@ -73,21 +77,20 @@ public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable(String command) thr
* console.
*/
public void testRunHoodieJavaAppOnNonPartitionedCOWTable() throws Exception {
String hiveTableName = "docker_hoodie_non_partition_key_cow_test";
String hiveTableName = "docker_hoodie_non_partition_key_cow_test_" + HoodieActiveTimeline.createNewInstantTime();
testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.NON_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
}

@ParameterizedTest
@ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP })
@Test
/**
* A basic integration test that runs HoodieJavaApp to create a sample MOR Hoodie data-set with a single partition key
* and performs upserts on it. Hive integration and upsert functionality are checked by running a count query in the
* Hive console.
*/
public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable(String command) throws Exception {
String hiveTableName = "docker_hoodie_single_partition_key_mor_test";
testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.MERGE_ON_READ.name(),
public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable() throws Exception {
String hiveTableName = "docker_hoodie_single_partition_key_mor_test_" + HoodieActiveTimeline.createNewInstantTime();
testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(),
PartitionType.SINGLE_KEY_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
}
Expand All @@ -100,7 +103,7 @@ public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable(String command) thr
* in hive console.
*/
public void testRunHoodieJavaAppOnMultiPartitionKeysMORTable(String command) throws Exception {
String hiveTableName = "docker_hoodie_multi_partition_key_mor_test";
String hiveTableName = "docker_hoodie_multi_partition_key_mor_test_" + HoodieActiveTimeline.createNewInstantTime();
testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.MERGE_ON_READ.name(),
PartitionType.MULTI_KEYS_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
Expand All @@ -113,7 +116,7 @@ public void testRunHoodieJavaAppOnMultiPartitionKeysMORTable(String command) thr
* console.
*/
public void testRunHoodieJavaAppOnNonPartitionedMORTable() throws Exception {
String hiveTableName = "docker_hoodie_non_partition_key_mor_test";
String hiveTableName = "docker_hoodie_non_partition_key_mor_test_" + HoodieActiveTimeline.createNewInstantTime();
testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.NON_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
}
Expand All @@ -127,7 +130,7 @@ public void testRunHoodieJavaApp(String command, String hiveTableName, String ta
throws Exception {

String hdfsPath = "/" + hiveTableName;
String hdfsUrl = "hdfs://namenode" + hdfsPath;
String hdfsUrl = HDFS_BASE_URL + hdfsPath;

// Drop Table if it exists
try {
Expand Down Expand Up @@ -155,6 +158,13 @@ public void testRunHoodieJavaApp(String command, String hiveTableName, String ta
cmd = command + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ " --table-type " + tableType + " --hive-table " + hiveTableName + " --non-partitioned";
}

if (command.equals(HOODIE_JAVA_STREAMING_APP)) {
String streamingSourcePath = HDFS_STREAMING_SOURCE + hiveTableName;
String streamingCkptPath = HDFS_STREAMING_CKPT + hiveTableName;
cmd = cmd + " --streaming-source-path " + streamingSourcePath
+ " --streaming-checkpointing-path " + streamingCkptPath;
}
executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);

String snapshotTableName = tableType.equals(HoodieTableType.MERGE_ON_READ.name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static HoodieTimeline allCompletedCommitsCompactions(FileSystem fs, Strin
if (metaClient.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
return metaClient.getActiveTimeline().getTimelineOfActions(
CollectionUtils.createSet(HoodieActiveTimeline.COMMIT_ACTION,
HoodieActiveTimeline.DELTA_COMMIT_ACTION));
HoodieActiveTimeline.DELTA_COMMIT_ACTION)).filterCompletedInstants();
} else {
return metaClient.getCommitTimeline().filterCompletedInstants();
}
Expand Down
4 changes: 4 additions & 0 deletions hudi-spark/src/test/java/HoodieJavaApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* limitations under the License.
*/

import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieDataSourceHelpers;
Expand Down Expand Up @@ -120,6 +121,9 @@ public void run() throws Exception {
dataGen = new HoodieTestDataGenerator();
}

// Explicitly clear up the hoodie table path if it exists.
fs.delete(new Path(tablePath), true);

/**
* Commit with only inserts
*/
Expand Down
4 changes: 3 additions & 1 deletion hudi-spark/src/test/java/HoodieJavaStreamingApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,9 @@ private void waitTillNCommits(FileSystem fs, int numCommits, int timeoutSecs, in
public int addInputAndValidateIngestion(SparkSession spark, FileSystem fs, String srcPath,
int initialCommits, int expRecords,
Dataset<Row> inputDF1, Dataset<Row> inputDF2, boolean instantTimeValidation) throws Exception {
inputDF1.write().mode(SaveMode.Append).json(srcPath);
// Always write exactly one file. This is important so that a single batch is reliably read
// atomically by one iteration of Spark streaming.
inputDF1.coalesce(1).write().mode(SaveMode.Append).json(srcPath);

int numExpCommits = initialCommits + 1;
// wait for spark streaming to process one microbatch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
}

val f2 = Future {
inputDF1.write.mode(SaveMode.Append).json(sourcePath)
inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
// wait for spark streaming to process one microbatch
val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5)
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000"))
Expand All @@ -112,7 +112,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
.load(destPath + "/*/*/*/*")
assert(hoodieROViewDF1.count() == 100)

inputDF2.write.mode(SaveMode.Append).json(sourcePath)
inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
// wait for spark streaming to process one microbatch
waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, destPath)
Expand Down