Skip to content

Commit

Permalink
[HUDI-4758] Add validations to java spark examples (apache#6615)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonvex authored and voonhous committed Oct 7, 2022
1 parent 46afe9b commit cdb8c0b
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ public class HoodieExampleDataGenerator<T extends HoodieRecordPayload<T>> {
public static final String[] DEFAULT_PARTITION_PATHS =
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
+ "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\":\"fare\",\"type\": \"double\"}]}";
+ "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\":\"fare\",\"type\": \"double\"}]}";
public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);

private static final Random RAND = new Random(46474747);
Expand Down Expand Up @@ -130,12 +130,36 @@ public Stream<HoodieRecord<T>> generateInsertsStream(String commitTime, Integer
});
}

/**
* Generates new inserts, across a single partition path. It also updates the list of existing keys.
*/
public List<HoodieRecord<T>> generateInsertsOnPartition(String commitTime, Integer n, String partitionPath) {
return generateInsertsStreamOnPartition(commitTime, n, partitionPath).collect(Collectors.toList());
}

/**
* Generates new inserts, across a single partition path. It also updates the list of existing keys.
*/
public Stream<HoodieRecord<T>> generateInsertsStreamOnPartition(String commitTime, Integer n, String partitionPath) {
int currSize = getNumExistingKeys();

return IntStream.range(0, n).boxed().map(i -> {
HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
KeyPartition kp = new KeyPartition();
kp.key = key;
kp.partitionPath = partitionPath;
existingKeys.put(currSize + i, kp);
numExistingKeys++;
return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime));
});
}

/**
* Generates new updates, randomly distributed across the keys above. There can be duplicates within the returned
* list
*
* @param commitTime Commit Timestamp
* @param n Number of updates (including dups)
* @param n Number of updates (including dups)
* @return list of hoodie record updates
*/
public List<HoodieRecord<T>> generateUpdates(String commitTime, Integer n) {
Expand All @@ -148,15 +172,32 @@ public List<HoodieRecord<T>> generateUpdates(String commitTime, Integer n) {
return updates;
}

/**
* Generates new updates, one for each of the keys above
* list
*
* @param commitTime Commit Timestamp
* @return list of hoodie record updates
*/
public List<HoodieRecord<T>> generateUniqueUpdates(String commitTime) {
List<HoodieRecord<T>> updates = new ArrayList<>();
for (int i = 0; i < numExistingKeys; i++) {
KeyPartition kp = existingKeys.get(i);
HoodieRecord<T> record = generateUpdateRecord(kp.key, commitTime);
updates.add(record);
}
return updates;
}

public HoodieRecord<T> generateUpdateRecord(HoodieKey key, String commitTime) {
return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime));
}

private Option<String> convertToString(HoodieRecord<T> record) {
try {
String str = HoodieAvroUtils
.bytesToAvro(((HoodieAvroPayload)record.getData()).getRecordBytes(), avroSchema)
.toString();
.bytesToAvro(((HoodieAvroPayload) record.getData()).getRecordBytes(), avroSchema)
.toString();
str = "{" + str.substring(str.indexOf("\"ts\":"));
return Option.of(str.replaceAll("}", ", \"partitionpath\": \"" + record.getPartitionPath() + "\"}"));
} catch (IOException e) {
Expand All @@ -166,7 +207,7 @@ private Option<String> convertToString(HoodieRecord<T> record) {

public List<String> convertToStringList(List<HoodieRecord<T>> records) {
return records.stream().map(this::convertToString).filter(Option::isPresent).map(Option::get)
.collect(Collectors.toList());
.collect(Collectors.toList());
}

public int getNumExistingKeys() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
import org.apache.hudi.examples.common.HoodieExampleSparkUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
Expand Down Expand Up @@ -65,30 +66,51 @@ public static void main(String[] args) {
public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) {
final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();

insertData(spark, jsc, tablePath, tableName, dataGen);
String snapshotQuery = "SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table";

Dataset<Row> insertDf = insertData(spark, jsc, tablePath, tableName, dataGen);
queryData(spark, jsc, tablePath, tableName, dataGen);
assert insertDf.except(spark.sql(snapshotQuery)).count() == 0;

updateData(spark, jsc, tablePath, tableName, dataGen);
Dataset<Row> snapshotBeforeUpdate = spark.sql(snapshotQuery);
Dataset<Row> updateDf = updateData(spark, jsc, tablePath, tableName, dataGen);
queryData(spark, jsc, tablePath, tableName, dataGen);
Dataset<Row> snapshotAfterUpdate = spark.sql(snapshotQuery);
assert snapshotAfterUpdate.intersect(updateDf).count() == updateDf.count();
assert snapshotAfterUpdate.except(updateDf).except(snapshotBeforeUpdate).count() == 0;

incrementalQuery(spark, tablePath, tableName);
pointInTimeQuery(spark, tablePath, tableName);

delete(spark, tablePath, tableName);
Dataset<Row> snapshotBeforeDelete = snapshotAfterUpdate;
Dataset<Row> deleteDf = delete(spark, tablePath, tableName);
queryData(spark, jsc, tablePath, tableName, dataGen);
Dataset<Row> snapshotAfterDelete = spark.sql(snapshotQuery);
assert snapshotAfterDelete.intersect(deleteDf).count() == 0;
assert snapshotBeforeDelete.except(deleteDf).except(snapshotAfterDelete).count() == 0;

insertOverwriteData(spark, jsc, tablePath, tableName, dataGen);
Dataset<Row> snapshotBeforeOverwrite = snapshotAfterDelete;
Dataset<Row> overwriteDf = insertOverwriteData(spark, jsc, tablePath, tableName, dataGen);
queryData(spark, jsc, tablePath, tableName, dataGen);
Dataset<Row> withoutThirdPartitionDf = snapshotBeforeOverwrite.filter("partitionpath != '" + HoodieExampleDataGenerator.DEFAULT_THIRD_PARTITION_PATH + "'");
Dataset<Row> expectedDf = withoutThirdPartitionDf.union(overwriteDf);
Dataset<Row> snapshotAfterOverwrite = spark.sql(snapshotQuery);
assert snapshotAfterOverwrite.except(expectedDf).count() == 0;


Dataset<Row> snapshotBeforeDeleteByPartition = snapshotAfterOverwrite;
deleteByPartition(spark, tablePath, tableName);
queryData(spark, jsc, tablePath, tableName, dataGen);
Dataset<Row> snapshotAfterDeleteByPartition = spark.sql(snapshotQuery);
assert snapshotAfterDeleteByPartition.intersect(snapshotBeforeDeleteByPartition.filter("partitionpath == '" + HoodieExampleDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "'")).count() == 0;
assert snapshotAfterDeleteByPartition.count() == snapshotBeforeDeleteByPartition.filter("partitionpath != '" + HoodieExampleDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "'").count();
}

/**
* Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi dataset as below.
*/
public static void insertData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
public static Dataset<Row> insertData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
String commitTime = Long.toString(System.currentTimeMillis());
List<String> inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 1));
Expand All @@ -101,15 +123,16 @@ public static void insertData(SparkSession spark, JavaSparkContext jsc, String t
.option(TBL_NAME.key(), tableName)
.mode(Overwrite)
.save(tablePath);
return df;
}

/**
* Generate new records, load them into a {@link Dataset} and insert-overwrite it into the Hudi dataset
*/
public static void insertOverwriteData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
public static Dataset<Row> insertOverwriteData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
String commitTime = Long.toString(System.currentTimeMillis());
List<String> inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
List<String> inserts = dataGen.convertToStringList(dataGen.generateInsertsOnPartition(commitTime, 20, HoodieExampleDataGenerator.DEFAULT_THIRD_PARTITION_PATH));
Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 1));

df.write().format("org.apache.hudi")
Expand All @@ -121,9 +144,9 @@ public static void insertOverwriteData(SparkSession spark, JavaSparkContext jsc,
.option(TBL_NAME.key(), tableName)
.mode(Append)
.save(tablePath);
return df;
}


/**
* Load the data files into a DataFrame.
*/
Expand Down Expand Up @@ -157,11 +180,11 @@ public static void queryData(SparkSession spark, JavaSparkContext jsc, String ta
* This is similar to inserting new data. Generate updates to existing trips using the data generator,
* load into a DataFrame and write DataFrame into the hudi dataset.
*/
public static void updateData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
public static Dataset<Row> updateData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {

String commitTime = Long.toString(System.currentTimeMillis());
List<String> updates = dataGen.convertToStringList(dataGen.generateUpdates(commitTime, 10));
List<String> updates = dataGen.convertToStringList(dataGen.generateUniqueUpdates(commitTime));
Dataset<Row> df = spark.read().json(jsc.parallelize(updates, 1));
df.write().format("org.apache.hudi")
.options(QuickstartUtils.getQuickstartWriteConfigs())
Expand All @@ -171,16 +194,18 @@ public static void updateData(SparkSession spark, JavaSparkContext jsc, String t
.option(TBL_NAME.key(), tableName)
.mode(Append)
.save(tablePath);
return df;
}

/**
* Deleta data based in data information.
*/
public static void delete(SparkSession spark, String tablePath, String tableName) {
public static Dataset<Row> delete(SparkSession spark, String tablePath, String tableName) {

Dataset<Row> roViewDF = spark.read().format("org.apache.hudi").load(tablePath + "/*/*/*/*");
roViewDF.createOrReplaceTempView("hudi_ro_table");
Dataset<Row> df = spark.sql("select uuid, partitionpath, ts from hudi_ro_table limit 2");
Dataset<Row> toBeDeletedDf = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table limit 2");
Dataset<Row> df = toBeDeletedDf.select("uuid", "partitionpath", "ts");

df.write().format("org.apache.hudi")
.options(QuickstartUtils.getQuickstartWriteConfigs())
Expand All @@ -191,10 +216,11 @@ public static void delete(SparkSession spark, String tablePath, String tableName
.option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value())
.mode(Append)
.save(tablePath);
return toBeDeletedDf;
}

/**
* Delete the data of a single or multiple partitions.
* Delete the data of the first partition.
*/
public static void deleteByPartition(SparkSession spark, String tablePath, String tableName) {
Dataset<Row> df = spark.emptyDataFrame();
Expand All @@ -204,9 +230,8 @@ public static void deleteByPartition(SparkSession spark, String tablePath, Strin
.option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid")
.option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath")
.option(TBL_NAME.key(), tableName)
.option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value())
.option("hoodie.datasource.write.partitions.to.delete",
String.join(", ", HoodieExampleDataGenerator.DEFAULT_PARTITION_PATHS))
.option("hoodie.datasource.write.operation", WriteOperationType.DELETE_PARTITION.value())
.option("hoodie.datasource.write.partitions.to.delete", HoodieExampleDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
.mode(Append)
.save(tablePath);
}
Expand All @@ -223,7 +248,7 @@ public static void incrementalQuery(SparkSession spark, String tablePath, String
.map((Function<Row, String>) row -> row.getString(0))
.take(50);

String beginTime = commits.get(commits.size() - 2); // commit time we are interested in
String beginTime = commits.get(commits.size() - 1); // commit time we are interested in

// incrementally query data
Dataset<Row> incViewDF = spark
Expand All @@ -250,7 +275,7 @@ public static void pointInTimeQuery(SparkSession spark, String tablePath, String
.map((Function<Row, String>) row -> row.getString(0))
.take(50);
String beginTime = "000"; // Represents all commits > this time.
String endTime = commits.get(commits.size() - 2); // commit time we are interested in
String endTime = commits.get(commits.size() - 1); // commit time we are interested in

//incrementally query data
Dataset<Row> incViewDF = spark.read().format("org.apache.hudi")
Expand Down

0 comments on commit cdb8c0b

Please sign in to comment.