# Apache Hudi Query Tests

Author: Gary Stafford

Date: 2022-12-08

Version matrix:
- EMR 6.8 - hudi-spark: 0.11.1-amzn-0, spark-client: 3.3.0-amzn-0
- EMR 6.9 - hudi-spark: 0.12.1-amzn-0, spark-client: 3.3.0-amzn-1

In [4]:
%%configure -f

{
    "conf": {
        "spark.jars": "hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
        "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
13,application_1670889798990_0015,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
13,application_1670889798990_0015,pyspark,idle,Link,Link,,✔


In [5]:
from pyspark.sql import SparkSession

# Obtain the session through the builder (getOrCreate) with Hive support
# switched on.
spark = (
    SparkSession.builder
    .enableHiveSupport()
    .getOrCreate()
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Location handed to .save() by the Hudi write cells in this notebook.
base_path = "s3://open-data-lake-demo-us-east-1/emr_hudi_glue"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Options tied to this particular dataset: table and database names plus the
# record-key, partition and precombine columns.
data_specific_options = {
    # table identity
    "hoodie.table.name": "emr_hudi_glue",
    "hoodie.datasource.hive_sync.database": "emr_demo",
    "hoodie.datasource.hive_sync.table": "emr_hudi_glue",
    # columns with a special role
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.precombine.field": "last_update_time",
    "hoodie.datasource.write.partitionpath.field": "creation_date",
    "hoodie.datasource.hive_sync.partition_fields": "creation_date",
}

# Dataset-independent writer settings, grouped by configuration-key prefix.
# The two dictionaries share no keys, so they can be combined in any order.
hudi_write_options = {
    # hoodie.datasource.write.*
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.write.drop.partition.columns": "true",
    # hoodie.datasource.hive_sync.*
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.support_timestamp": "true",
    "hoodie.datasource.hive_sync.assume_date_partitioning": "false",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    # index
    "hoodie.index.type": "GLOBAL_BLOOM",
    "hoodie.bloom.index.filter.type": "DYNAMIC_V0",
    "hoodie.bloom.index.update.partition.path": "false",
    # compaction
    "hoodie.compact.inline.max.delta.commits": 1,
    "hoodie.compact.inline.trigger.strategy": "NUM_COMMITS",
    "hoodie.datasource.compaction.async.enable": "false",
    # sizing and parallelism
    "hoodie.parquet.small.file.limit": 128 * 1024 * 1024,  # 134,217,728; presumably bytes - confirm
    "hoodie.upsert.shuffle.parallelism": 2,
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# First record: a single row with id "100".
input_df = spark.createDataFrame(
    data=[("100", "Carlos", "2022-11-28", "2022-11-29T13:50:39.340399Z")],
    schema=["id", "user", "creation_date", "last_update_time"],
)

# Write the row to the Hudi dataset at base_path with the "insert" operation
# in append mode. Both option dictionaries are applied to the writer; they
# share no keys, so applying them one after the other equals merging them.
(
    input_df.write
    .format("org.apache.hudi")
    .options(**data_specific_options)
    .options(**hudi_write_options)
    .option("hoodie.datasource.write.operation", "insert")
    .mode("append")
    .save(base_path)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Second record: a single row with id "101".
input_df = spark.createDataFrame(
    data=[("101", "Sri", "2022-11-29", "2022-12-01T12:14:58.597219Z")],
    schema=["id", "user", "creation_date", "last_update_time"],
)

# Write the row to the Hudi dataset at base_path with the "insert" operation
# in append mode. Both option dictionaries are applied to the writer; they
# share no keys, so applying them one after the other equals merging them.
(
    input_df.write
    .format("org.apache.hudi")
    .options(**data_specific_options)
    .options(**hudi_write_options)
    .option("hoodie.datasource.write.operation", "insert")
    .mode("append")
    .save(base_path)
)

In [None]:
%%sql
-- All rows of the table, sorted on the _hoodie_commit_time column.
SELECT *
FROM `emr_demo`.`emr_hudi_glue`
ORDER BY _hoodie_commit_time;

In [None]:
# Upsert both records (ids "100" and "101") in one write.
# NOTE(review): every value below matches what the two insert cells wrote,
# so this upsert re-submits unchanged rows - confirm that is intended.
input_df = spark.createDataFrame(
    data=[
        ("100", "Carlos", "2022-11-28", "2022-11-29T13:50:39.340399Z"),
        ("101", "Sri", "2022-11-29", "2022-12-01T12:14:58.597219Z"),
    ],
    schema=["id", "user", "creation_date", "last_update_time"],
)

# Write the rows to the Hudi dataset at base_path with the "upsert" operation
# in append mode. Both option dictionaries are applied to the writer; they
# share no keys, so applying them one after the other equals merging them.
(
    input_df.write
    .format("org.apache.hudi")
    .options(**data_specific_options)
    .options(**hudi_write_options)
    .option("hoodie.datasource.write.operation", "upsert")
    .mode("append")
    .save(base_path)
)

In [None]:
%%sql
-- All rows of the table, sorted on the _hoodie_commit_time column.
SELECT *
FROM `emr_demo`.`emr_hudi_glue`
ORDER BY _hoodie_commit_time;

In [None]:
# Incremental query with a begin instant only.
# Observed by the author: does not return the correct records - the latest
# snapshot comes back instead.
# NOTE(review): Hudi's documented incremental-read examples end with
# .load(<table base path>) rather than .table(...); the read options set here
# may not be applied when the table is resolved by name - confirm.

begin_time = 20221216000000000

incremental_read_options = {
    "hoodie.datasource.query.type": "incremental",
    "hoodie.datasource.read.begin.instanttime": begin_time,
}

incremental_df = spark.read.format("org.apache.hudi").options(
    **incremental_read_options
).table("emr_demo.emr_hudi_glue")

incremental_df.orderBy("_hoodie_commit_time").show()

In [None]:
# Incremental query bounded by both a begin and an end instant.
# Observed by the author: does not return the correct records - the latest
# snapshot comes back instead.
# NOTE(review): Hudi's documented incremental-read examples end with
# .load(<table base path>) rather than .table(...); the read options set here
# may not be applied when the table is resolved by name - confirm.

begin_time = 20221216020114173
end_time = 20221216020114174

incremental_read_options = {
    "hoodie.datasource.query.type": "incremental",
    "hoodie.datasource.read.begin.instanttime": begin_time,
    "hoodie.datasource.read.end.instanttime": end_time,
}

incremental_df = spark.read.format("org.apache.hudi").options(
    **incremental_read_options
).table("emr_demo.emr_hudi_glue")

incremental_df.orderBy("_hoodie_commit_time").show()

In [None]:
# Read that sets the "as.of.instant" option.
# Observed by the author: does not return the correct records - the latest
# snapshot comes back instead.
# NOTE(review): the result is stored in `incremental_df` although no
# incremental query type is set in this cell - the name looks carried over
# from the cells above; confirm before relying on it.
# NOTE(review): Hudi's documented "as.of.instant" examples end with
# .load(<table base path>) rather than .table(...) - confirm whether the
# option is applied when the table is resolved by name.

incremental_df = spark.read.format("org.apache.hudi").option(
    "as.of.instant", 20221216000000000
).table("emr_demo.emr_hudi_glue")

incremental_df.orderBy("_hoodie_commit_time").show()

In [None]:
%%sql
-- Observed by the author - this statement throws:
--   java.lang.UnsupportedOperationException: Table emr_demo.emr_hudi_glue does not support time travel.
SELECT *
FROM `emr_demo`.`emr_hudi_glue` TIMESTAMP AS OF 20221216020114174;