In [1]:
%%configure -f
{
    "conf": {
        "spark.jars": "hdfs:///applications/hudi/lib/hudi-spark-bundle.jar,hdfs:///applications/hudi/lib/spark-avro.jar",
        "spark.sql.hive.convertMetastoreParquet": "false",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
    }
}

In [2]:
# Build the Product Inventory DataFrame; every field is supplied as a string
inventoryColumns = ["product_id", "category", "product_name", "quantity_available", "last_update_time"]
inventoryRows = [
    ("100", "Furniture", "Product 1", "25", "2021-12-01T09:51:39.340396Z"),
    ("101", "Cosmetic", "Product 2", "20", "2021-12-01T10:14:58.597216Z"),
    ("102", "Furniture", "Product 3", "30", "2021-12-01T11:51:40.417052Z"),
    ("103", "Electronics", "Product 4", "10", "2021-12-01T11:51:40.519832Z"),
    ("104", "Electronics", "Product 5", "50", "2021-12-01T11:58:00.512679Z"),
]
inputDF = spark.createDataFrame(inventoryRows, inventoryColumns)

# Write options shared by every Hudi write in this notebook:
# record key, partition path, precombine field and Hive sync settings
hudiOptions = {
    'hoodie.table.name': 'product_inventory',
    'hoodie.datasource.write.recordkey.field': 'product_id',
    'hoodie.datasource.write.partitionpath.field': 'category',
    'hoodie.datasource.write.precombine.field': 'last_update_time',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.table': 'product_inventory',
    'hoodie.datasource.hive_sync.partition_fields': 'category',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
}

# Write the Product Inventory DataFrame to S3 as a Hudi dataset
# ('insert' operation, Spark 'overwrite' save mode)
inventoryWriter = (
    inputDF.write.format('org.apache.hudi')
    .option('hoodie.datasource.write.operation', 'insert')
    .options(**hudiOptions)
    .mode('overwrite')
)
inventoryWriter.save('s3://hudi-data-repository/product-inventory/')

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
10,application_1638650644484_0009,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Load the Hudi dataset back from S3 and display the business columns to check the write
inventoryGlob = 's3://hudi-data-repository/product-inventory' + '/*/*'
HudiProductDF = (
    spark.read
    .format('org.apache.hudi')
    .load(inventoryGlob)
)
businessColumns = ["product_id", "category", "product_name", "quantity_available", "last_update_time"]
HudiProductDF.select(*businessColumns).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----------+------------+------------------+--------------------+
|product_id|   category|product_name|quantity_available|    last_update_time|
+----------+-----------+------------+------------------+--------------------+
|       101|   Cosmetic|   Product 2|                20|2021-12-01T10:14:...|
|       103|Electronics|   Product 4|                10|2021-12-01T11:51:...|
|       104|Electronics|   Product 5|                50|2021-12-01T11:58:...|
|       102|  Furniture|   Product 3|                30|2021-12-01T11:51:...|
|       100|  Furniture|   Product 1|                25|2021-12-01T09:51:...|
+----------+-----------+------------+------------------+--------------------+

In [4]:
# Update the quantity of product_id 102: take its row from inputDF,
# set quantity_available to '29' and upsert the result into the Hudi table
from pyspark.sql.functions import col,lit

product102 = inputDF.where(inputDF['product_id'] == 102)
newDF = product102.withColumn('quantity_available', lit('29'))

upsertWriter = (
    newDF.write
    .format('org.apache.hudi')
    .option('hoodie.datasource.write.operation', 'upsert')
    .options(**hudiOptions)
    .mode('append')
)
upsertWriter.save('s3://hudi-data-repository/product-inventory/')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Delete the product record with ID 101: upsert its row with the
# EmptyHoodieRecordPayload payload class, which is how this notebook issues the delete
deleteDF = inputDF.where(inputDF['product_id'] == 101)

deleteWriter = (
    deleteDF.write
    .format('org.apache.hudi')
    .option('hoodie.datasource.write.operation', 'upsert')
    .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload')
    .options(**hudiOptions)
    .mode('append')
)
deleteWriter.save('s3://hudi-data-repository/product-inventory/')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Re-read the dataset from S3 to validate the update (product 102) and the delete (product 101)
inventoryGlob = 's3://hudi-data-repository/product-inventory' + '/*/*'
HudiProductNewDF = (
    spark.read
    .format('org.apache.hudi')
    .load(inventoryGlob)
)

businessColumns = ["product_id", "category", "product_name", "quantity_available", "last_update_time"]
validationDF = HudiProductNewDF.select(*businessColumns).orderBy("product_id")
validationDF.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----------+------------+------------------+--------------------+
|product_id|   category|product_name|quantity_available|    last_update_time|
+----------+-----------+------------+------------------+--------------------+
|       100|  Furniture|   Product 1|                25|2021-12-01T09:51:...|
|       102|  Furniture|   Product 3|                29|2021-12-01T11:51:...|
|       103|Electronics|   Product 4|                10|2021-12-01T11:51:...|
|       104|Electronics|   Product 5|                50|2021-12-01T11:58:...|
+----------+-----------+------------+------------------+--------------------+

In [7]:
# Show every column of the DataFrame, including the _hoodie_* metadata fields
# that Hudi adds alongside the business columns (show() defaults spelled out)
HudiProductNewDF.show(n=20, truncate=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------------+------------------+----------------------+--------------------+----------+-----------+------------+------------------+--------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|product_id|   category|product_name|quantity_available|    last_update_time|
+-------------------+--------------------+------------------+----------------------+--------------------+----------+-----------+------------+------------------+--------------------+
|     20211205222848|  20211205222848_1_2|               103|           Electronics|a7143310-7210-462...|       103|Electronics|   Product 4|                10|2021-12-01T11:51:...|
|     20211205222848|  20211205222848_1_3|               104|           Electronics|a7143310-7210-462...|       104|Electronics|   Product 5|                50|2021-12-01T11:58:...|
|     20211205225705|  20211205225705_0_1|               102|             Furniture|4c7057

In [8]:
# Incremental query: fetch only the records changed after a given commit instant.
#
# The begin instant is derived from the table itself rather than hard-coded.
# Commit timestamps are generated at write time, so a literal copied from an
# earlier run goes stale as soon as the notebook is re-run, and the query would
# then no longer return just the changed record.
tablePath = 's3://hudi-data-repository/product-inventory'

# Earliest commit time still present in the snapshot view. In this notebook that
# is the initial insert, so everything written after it counts as "changed".
# Commit times are fixed-width timestamp strings, so a string min() orders them correctly.
firstCommitTime = (
    spark.read.format('org.apache.hudi')
    .load(tablePath + '/*/*')
    .agg({'_hoodie_commit_time': 'min'})
    .first()[0]
)

incrementalQueryOptions = {
    'hoodie.datasource.query.type': 'incremental',
    # Records from the begin instant itself are not returned, only later commits
    'hoodie.datasource.read.begin.instanttime': firstCommitTime,
}
incQueryDF = (
    spark.read.format('org.apache.hudi')
    .options(**incrementalQueryOptions)
    .load(tablePath)
)
incQueryDF.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------------+------------------+----------------------+--------------------+----------+---------+------------+------------------+--------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|product_id| category|product_name|quantity_available|    last_update_time|
+-------------------+--------------------+------------------+----------------------+--------------------+----------+---------+------------+------------------+--------------------+
|     20211205225705|  20211205225705_0_1|               102|             Furniture|4c705739-be10-491...|       102|Furniture|   Product 3|                29|2021-12-01T11:51:...|
+-------------------+--------------------+------------------+----------------------+--------------------+----------+---------+------------+------------------+--------------------+