## Demonstration: Apache Hudi with PySpark, Kafka, Hive, and S3

__Purpose:__ Read JSON-format messages from a Kafka topic and write them to Amazon S3 as Parquet using Apache Hudi, demonstrating upserts and deletes  
__Author:__  Gary A. Stafford  
__Date:__ 2021-10-03  
__References:__  
- https://hudi.apache.org/docs/quick-start-guide/
- https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-work-with-dataset.html
- https://hudi.apache.org/docs/configurations#SPARK_DATASOURCE

#### Run the following commands from the EMR master node

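SSH into the EMR master node as the `hadoop` user, then copy the Hudi Spark bundle and Spark Avro JARs into HDFS so the notebook's `%%configure` cell can reference them via `spark.jars`: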

```shell
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
hdfs dfs -copyFromLocal /usr/lib/spark/jars/spark-avro.jar /apps/hudi/lib/spark-avro.jar
```

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Create (or reuse) a Hive-enabled Spark session for the demo.
# NOTE(review): in a sparkmagic/Livy kernel `spark` is usually pre-created and
# `%%configure -f` restarts the session -- confirm this cell is still needed.
spark = (
    SparkSession.builder
    .appName("pagila-sales-hudi")
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
%%configure -f
{
    "conf": {
        "spark.jars":
            "hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar",
        "spark.serializer":
            "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.hive.convertMetastoreParquet":
            "false"
    }
}

In [None]:
# Inspect the effective Spark configuration (presumably to confirm the Hudi jars
# and Kryo serializer from %%configure took effect). Uses `spark.sparkContext`
# because `sc` is only injected by some kernels and is never defined here.
spark.sparkContext.getConf().getAll()

In [None]:
from datetime import datetime
import os
import time

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, FloatType, TimestampType

In [None]:
def get_parameters():
    """Fetch the demo's settings from the AWS SSM Parameter Store.

    Points ``AWS_DEFAULT_REGION`` at the EC2 instance's region (read from
    instance metadata) so the boto3 SSM client targets that region, then
    looks up three parameters under the ``/kafka_spark_demo/`` prefix.

    Returns:
        dict: ``kafka_servers``, ``kafka_demo_bucket`` and
        ``schema_registry_url`` mapped to their parameter values.
    """
    os.environ["AWS_DEFAULT_REGION"] = ec2_metadata.region
    client = boto3.client("ssm")

    # result key -> SSM parameter name (looked up in this order)
    parameter_names = {
        "kafka_servers": "/kafka_spark_demo/kafka_servers",
        "kafka_demo_bucket": "/kafka_spark_demo/kafka_demo_bucket",
        "schema_registry_url": "/kafka_spark_demo/schema_registry_url_int",
    }

    def _lookup(name):
        return client.get_parameter(Name=name)["Parameter"]["Value"]

    return {key: _lookup(name) for key, name in parameter_names.items()}

In [None]:
def read_from_kafka(topic, schema):
    """Batch-read every message in a Kafka topic and parse its JSON value.

    Reads offsets ``earliest`` through ``latest`` from the brokers in
    ``params["kafka_servers"]`` using MSK IAM authentication (SASL_SSL with
    the ``AWS_MSK_IAM`` mechanism), casts the binary ``value`` to a string,
    parses it with ``from_json`` against *schema*, and flattens the parsed
    struct alongside the Kafka ``timestamp`` column.

    Args:
        topic: Name of the Kafka topic to subscribe to.
        schema: ``StructType`` describing the JSON message payload.

    Returns:
        DataFrame with one column per *schema* field plus ``timestamp``.
    """
    options_read_kafka = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        # use the caller's topic (previously the global `kafka_topic` was
        # used here and this parameter was silently ignored)
        "subscribe":
            topic,
        "startingOffsets":
            "earliest",
        "endingOffsets":
            "latest",
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df = spark.read \
        .format("kafka") \
        .options(**options_read_kafka) \
        .load() \
        .selectExpr("CAST(value AS STRING)", "timestamp") \
        .select(F.from_json("value", schema=schema).alias("data"), "timestamp") \
        .select("data.*", "timestamp")

    return df

In [None]:
def write_to_kafka(df, topic=None, key_column="payment_id"):
    """Serialize each row to JSON and batch-write the rows to a Kafka topic.

    Each row becomes one message. The key is *key_column* cast to a string,
    and the value is the whole row encoded with ``to_json(struct(*))``.
    Connects to ``params["kafka_servers"]`` using MSK IAM authentication
    (SASL_SSL with the ``AWS_MSK_IAM`` mechanism).

    Args:
        df: DataFrame to publish.
        topic: Destination topic. Defaults to the module-level
            ``kafka_topic``, matching the original hard-wired behavior.
        key_column: Column used as the Kafka message key.
    """
    if topic is None:
        topic = kafka_topic

    options_write = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "topic":
            topic,
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }

    # backticks keep the key expression valid for any column name
    df \
        .selectExpr(f"CAST(`{key_column}` AS STRING) AS key",
                    "to_json(struct(*)) AS value") \
        .write \
        .format("kafka") \
        .options(**options_write) \
        .save()

In [None]:
params = get_parameters()

kafka_topic = "pagila.sales.spark.streaming"

# JSON payload layout of each sales message: (column, type, nullable)
schema = (
    StructType()
    .add("payment_id", IntegerType(), False)
    .add("customer_id", IntegerType(), False)
    .add("amount", FloatType(), False)
    .add("payment_date", TimestampType(), False)
    .add("city", StringType(), True)
    .add("district", StringType(), True)
    .add("country", StringType(), False)
)

In [None]:
# batch read every message currently in the kafka topic into a dataframe

df_sales = read_from_kafka(topic=kafka_topic, schema=schema)

In [None]:
%%display -n 5
df_sales

In [None]:
# set hudi write config

table_name = "hudi.hudi_pagila_sales"
# No trailing slash: later cells append "/*/*" to this path, which previously
# produced ".../hudi//*/*".
base_path = f"s3://{params['kafka_demo_bucket']}/hudi"

options_write_hudi = {
    "hoodie.table.name": table_name,
    # records are keyed by payment_id and partitioned into country folders
    "hoodie.datasource.write.recordkey.field": "payment_id",
    "hoodie.datasource.write.table.name": table_name,
    "hoodie.datasource.write.partitionpath.field": "country",
    "hoodie.datasource.write.operation": "upsert",
    # the Kafka message `timestamp` column (added by read_from_kafka) orders
    # duplicate keys
    "hoodie.datasource.write.precombine.field": "timestamp",
    "hoodie.upsert.shuffle.parallelism": 2,
    "hoodie.insert.shuffle.parallelism": 2,
    # register/refresh the table as hudi.sales in the Hive metastore
    "hoodie.datasource.hive_sync.enable": True,
    "hoodie.datasource.hive_sync.assume_date_partitioning": False,
    "hoodie.datasource.hive_sync.database": "hudi",
    "hoodie.datasource.hive_sync.auto_create_database": True,
    "hoodie.datasource.hive_sync.table": "sales",
    "hoodie.datasource.hive_sync.partition_fields": "country",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor"
}

In [None]:
# replace the hudi table on s3 with every record read from kafka
# (parquet files, partitioned by country)
(
    df_sales.write
    .format("org.apache.hudi")
    .options(**options_write_hudi)
    .mode("overwrite")
    .save(base_path)
)

In [None]:
%%sh

# preview hudi files in s3

# Region defaults to us-east-1 but follows AWS_DEFAULT_REGION when it is set.
REGION="${AWS_DEFAULT_REGION:-us-east-1}"

# The SSM parameter holds the bucket name, not a path.
BUCKET=$(aws ssm get-parameter \
            --name "/kafka_spark_demo/kafka_demo_bucket" \
            --query "Parameter.Value" \
            --region "$REGION" \
            --output text)

aws s3api list-objects-v2 \
    --bucket "$BUCKET" --prefix "hudi/" \
    --query "Contents[].Key" --max-items 25

In [None]:
# demonstrate hive integration (assumes hive installed on emr)

spark.sql("SHOW databases").show(n=25, truncate=False)
spark.sql("USE `hudi`")
for hive_stmt in ("SHOW tables", "DESCRIBE sales"):
    spark.sql(hive_stmt).show(n=25, truncate=False)
# re-scan the table location for partitions before reading rows
spark.sql("MSCK REPAIR TABLE sales")
spark.sql("SELECT * FROM sales").show(n=5, truncate=False)

In [None]:
# read data back from s3 as a snapshot; "/*/*" matches <country>/<file>
# for the single-level country partitioning
df_sales_snapshot = (
    spark.read
    .format("org.apache.hudi")
    .load(f"{base_path}/*/*")
)

df_sales_snapshot.createOrReplaceTempView("hudi_sales_snapshot")

In [None]:
%%sql

-- first 10 payments (by payment_date) in the Japan partition of the snapshot
SELECT payment_id, payment_date, amount, city, district, country
FROM hudi_sales_snapshot
WHERE country = 'Japan'
ORDER BY payment_date
LIMIT 10

In [None]:
# pick one existing payment and change its amount and date; the kafka
# `timestamp` column is left as-is, and the kafka message itself is unchanged

test_payment_id = df_sales.first()["payment_id"]
print(test_payment_id)

df_update = (
    df_sales
    .filter(F.col("payment_id") == test_payment_id)
    .withColumn("payment_date", F.current_timestamp())
    .withColumn("amount", F.lit(9.99).cast(FloatType()))
)

In [None]:
# display the modified row without truncating column values

df_update.show(n=20, truncate=False)

In [None]:
# upsert the modified record into the hudi table on s3 (append mode)
(
    df_update.write
    .format("org.apache.hudi")
    .options(**options_write_hudi)
    .mode("append")
    .save(base_path)
)

In [None]:
# reload the snapshot from s3 so the upsert is visible
df_updated_sales_snapshot = (
    spark.read
    .option("mergeSchema", "true")
    .format("org.apache.hudi")
    .load(f"{base_path}/*/*")
)

df_updated_sales_snapshot.createOrReplaceTempView("df_updated_sales_snapshot")

In [None]:
# show the test payment as stored after the upsert
(
    spark.sql(f"""SELECT payment_id, customer_id, payment_date, amount, city, district, country 
                FROM df_updated_sales_snapshot 
                WHERE payment_id={test_payment_id}""")
    .show(truncate=False)
)

In [None]:
# delete the same record from s3 using hudi
# EmptyHoodieRecordPayload makes this write a hard delete for the record's key.
# It is merged in last so it cannot be overridden by a payload class that
# options_write_hudi might define.
options_delete_hudi = {
    **options_write_hudi,
    "hoodie.datasource.write.payload.class":
        "org.apache.hudi.common.model.EmptyHoodieRecordPayload",
}

(
    df_update.write
    .format("org.apache.hudi")
    .options(**options_delete_hudi)
    .mode("append")
    .save(base_path)
)

In [None]:
# reload the snapshot from s3 so the delete is visible
df_updated_sales_snapshot = (
    spark.read
    .format("org.apache.hudi")
    .load(f"{base_path}/*/*")
)

df_updated_sales_snapshot.createOrReplaceTempView("df_updated_sales_snapshot")

In [None]:
# expect a count of 0 now that the record has been deleted

deleted_count_df = spark.sql(f"""SELECT COUNT(payment_id) AS count
                FROM df_updated_sales_snapshot
                WHERE payment_id={test_payment_id}""")
deleted_count_df.show()

In [None]:
# create a dataframe with three new records (ids 99997-99999)

# time.sleep(120)

vals = [
    (99997, 997, 10.97, datetime.now(), "Mumbai", "Maharashtra", "India"),
    (99998, 998, 2.98, datetime.now(), "Paris", "Île-de-France", "France"),
    (99999, 999, 3.99, datetime.now(), "Sunnyvale", "California", "United States"),
]

df_new_row = spark.createDataFrame(vals, schema)

df_new_row.show(truncate=False)

In [None]:
# publish the three new records to the kafka topic

write_to_kafka(df=df_new_row)

In [None]:
# batch read every message back from kafka, including the new ones

df_sales_new = read_from_kafka(kafka_topic, schema)
df_sales_new.createOrReplaceTempView("df_sales_new_view")

# ids >= 99990 cover the three test records (99997-99999)
(
    spark.sql("SELECT * FROM df_sales_new_view WHERE payment_id>=99990")
    .show(truncate=False)
)

In [None]:
# upsert every message read back from kafka into the hudi table on s3
# NOTE(review): df_sales_new holds the whole topic, which likely still contains
# test_payment_id deleted above, so this write would restore it -- confirm intended.
(
    df_sales_new.write
    .format("org.apache.hudi")
    .options(**options_write_hudi)
    .mode("append")
    .save(base_path)
)

In [None]:
# incremental query with hudi to get a stream of records that have changed since a given commit timestamp

# Hudi instant times are "yyyyMMddHHmmss" strings (newer versions append
# milliseconds) and are compared as strings. The previous value
# "2021-10-07 19:00:00:000" compared lower than every instant from 2021 onward,
# so it did not actually filter by the intended time.
begin_instant_time = datetime(2021, 10, 7, 19, 0, 0).strftime("%Y%m%d%H%M%S")

options_read_hudi = {
    "hoodie.datasource.query.type": "incremental",
    "hoodie.datasource.read.begin.instanttime": begin_instant_time,
}

df_sales_new_hudi = (
    spark.read
    .format("org.apache.hudi")
    .options(**options_read_hudi)
    .load(base_path)
)

df_sales_new_hudi.createOrReplaceTempView("df_sales_new_hudi_view")

spark.sql("SELECT * FROM df_sales_new_hudi_view WHERE payment_id>=99997").show()

In [None]:
# query hudi table in hive metastore for latest records vs. spark dataframe
# payment_date is divided by 1,000,000 before from_unixtime, i.e. the Hive
# column is treated as epoch microseconds
hive_query = """SELECT payment_id, customer_id,
                from_unixtime(payment_date/1000000) AS payment_date, 
                amount, city, district, country
              FROM hudi.sales 
              WHERE payment_id>=99997
              ORDER BY payment_id"""
spark.sql(hive_query).show(truncate=False)