# Building Data Lakes on AWS with Kafka Connect, Debezium, Apicurio Registry, and Apache Hudi

Author: Gary Stafford

Date: 2023-02-26

Tested on Amazon EMR 6.9 with hudi-spark 0.12.1-amzn-0 and spark-client 3.3.0-amzn-1.

In [None]:
%%configure -f

{
  "conf": {
    "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
    "spark.jars": "hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
    "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
  }
}

In [None]:
# Spark session for the rest of the notebook. getOrCreate() returns the
# active session if one exists, otherwise it builds one. Hive support lets
# Spark SQL resolve tables through the metastore configured above (the AWS
# Glue Data Catalog client factory).
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.enableHiveSupport()
spark = session_builder.getOrCreate()

In [None]:
# S3 root of the Hudi "silver" table holding the CDC data for tickit.ecomm.sale.
base_path = "/".join([
    "s3://open-data-lake-demo-us-east-1",
    "cdc_hudi_data_lake",
    "silver",
    "tickit.ecomm.sale",
])

In [None]:
%%sql

-- Query #1 against the AWS Glue Data Catalog table tickit_cdc_hudi.sale:
-- the salesid 200 record together with its Hudi commit time and the
-- CDC operation column (__op).
SELECT _hoodie_commit_time,
       __op,
       salesid,
       qtysold,
       pricepaid,
       commission
FROM `tickit_cdc_hudi`.`sale`
WHERE salesid = 200

In [None]:
%%sql

-- Query #2 against the AWS Glue Data Catalog table tickit_cdc_hudi.sale:
-- the salesid 200 record as written by one specific Hudi commit.
-- _hoodie_commit_time stores the raw Hudi instant time (yyyyMMddHHmmssSSS),
-- not a formatted timestamp, so the literal must use that compact form.
-- 20230227031713915 is the instant 2023-02-27 03:17:13.915, the same one
-- used by the Python time travel query.
SELECT _hoodie_commit_time,
       __op,
       salesid,
       qtysold,
       pricepaid,
       commission
FROM `tickit_cdc_hudi`.`sale`
WHERE salesid = 200
  AND _hoodie_commit_time = "20230227031713915"

In [None]:
# Projection shared by the DataFrame queries: Hudi commit time, CDC operation
# column, and the sale fields being tracked.
columns = [
    "_hoodie_commit_time",
    "__op",
    "salesid",
    "qtysold",
    "pricepaid",
    "commission",
]

In [None]:
# Default (snapshot) read of the Hudi table at base_path, then show the
# salesid 200 record restricted to the shared column projection.
df = spark.read.format("hudi").load(base_path)

df.where(df["salesid"] == 200).select(*columns).show()

In [None]:
# Default (snapshot) read of the Hudi table at base_path, this time hiding
# rows whose CDC operation column (__op) is "d" (the Debezium delete code).
df = (spark.read
      .format("hudi")
      .load(base_path))

# Use item access df["__op"] rather than attribute access df.__op: a
# dunder-prefixed attribute name is name-mangled if this code ever moves
# into a class body, and it reads like a private Python attribute.
# NOTE: `!= "d"` evaluates to null for rows whose __op is null, so such rows
# are dropped as well.
(df.filter((df["salesid"] == 200) & (df["__op"] != "d"))
    .select(columns)
    .show())

In [None]:
# Hudi time travel query #1: read the table as of a given instant
# ("as.of.instant") and show the salesid 200 record at that point.
instant_time = "2023-02-27 03:17:13.915"

read_options = dict([("as.of.instant", instant_time)])

df = spark.read.format("hudi").options(**read_options).load(base_path)

df.where(df["salesid"] == 200).select(*columns).show()

In [None]:
# Hudi time travel query #1 (compact instant form): the same point in time
# as the previous cell, but "as.of.instant" is given in Hudi's native
# instant format yyyyMMddHHmmssSSS instead of "yyyy-MM-dd HH:mm:ss.SSS".
# Hudi accepts both spellings. 20230227031713915 is
# 2023-02-27 03:17:13.915, so the output should match the previous cell.
instant_time = "20230227031713915"  # == 2023-02-27 03:17:13.915

read_options = {
    "as.of.instant": instant_time,
}

df = (spark.read
      .format("hudi")
      .options(**read_options)
      .load(base_path))

(df.filter(df.salesid == 200)
    .select(columns)
    .show())

In [None]:
# Hudi time travel query #2: same as query #1 but at an earlier instant
# (2023-02-27 03:15:00.000), to compare the salesid 200 record across time.
instant_time = "2023-02-27 03:15:00.000"

read_options = dict([("as.of.instant", instant_time)])

df = spark.read.format("hudi").options(**read_options).load(base_path)

df.where(df["salesid"] == 200).select(*columns).show()

In [None]:
# Hudi point in time query #2: an incremental read limited to commits
# between begin_time and end_time. Per the Hudi read options, it returns
# commits with instant > begin and <= end.
# NOTE(review): these are minute-precision (12-digit) instant strings. Hudi
# appears to compare instant times as strings, so each bound sorts before
# every 17-digit instant sharing its prefix. Confirm this is the intended
# window edge.
begin_time = "202302270313"  # 2023-02-27 03:13
end_time = "202302270315"  # 2023-02-27 03:15

read_options = dict([
    ("hoodie.datasource.query.type", "incremental"),
    ("hoodie.datasource.read.begin.instanttime", begin_time),
    ("hoodie.datasource.read.end.instanttime", end_time),
])

df = spark.read.format("hudi").options(**read_options).load(base_path)

df.where(df["salesid"] == 200).select(*columns).show()

In [None]:
# Hudi point in time query #1: incremental read from the start of the
# timeline ("000" sorts before any instant) up to end_time, i.e. every
# commit before 2023-02-28.
begin_time = "000"
end_time = "20230228"  # 2023-02-28 00:00

read_options = dict([
    ("hoodie.datasource.query.type", "incremental"),
    ("hoodie.datasource.read.begin.instanttime", begin_time),
    ("hoodie.datasource.read.end.instanttime", end_time),
])

df = spark.read.format("hudi").options(**read_options).load(base_path)

df.where(df["salesid"] == 200).select(*columns).show()