# Demo - Debezium + AWS Glue + Apache Iceberg

####  1/ Configuring AWS Glue session


In [None]:
%session_id_prefix cdc-debezium-kinesis-iceberg
%glue_version 3.0
%idle_timeout 30
%number_of_workers 6
%streaming
%%configure 
{
  "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
  "--datalake-formats": "iceberg"
}

####  2/ Defining parameters. Replace \<bucket_name> with your bucket name and \<stream_name> with your Amazon Kinesis stream name.

In [None]:
# Values you must replace before running the notebook.
bucket_name = "<bucket_name>"  # S3 bucket used to build the warehouse path below
stream_name = "<stream_name>"  # Kinesis stream name, e.g. "debezium-demo.DemoDBZ.MYTABLE"

# Catalog, database and table names used by the SQL statements in later cells.
catalog_name, database_name, table_name = "glue_catalog", "demo_cdc_debezium", "mytable"

# Warehouse root, built as s3://<bucket_name>/<bucket_prefix>.
bucket_prefix = "cdc"
warehouse_path = f"s3://{bucket_name}/{bucket_prefix}"

####  3/ Importing libraries, starting Spark session and Glue context.

In [None]:
import sys
import json
import boto3

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.conf import SparkConf

# Build (or reuse) the Spark session and register an Iceberg catalog under the
# name held in `catalog_name`. The catalog is configured with the Glue catalog
# implementation, S3 file IO, and `warehouse_path` as its warehouse location.
spark = (
    SparkSession.builder
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog")
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", f"{warehouse_path}")
    .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config(f"spark.sql.catalog.{catalog_name}.lock.table", "iceberg_metastore")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate()
)

# GlueContext is documented to wrap a SparkContext, so pass the session's
# underlying context instead of the SparkSession object itself.
glueContext = GlueContext(spark.sparkContext)
job = Job(glueContext)

####  4/ Dropping table on AWS Glue catalog if exists.

In [None]:
# Start from a clean slate: drop the target table if an earlier run created it.
qualified_table = f"{catalog_name}.{database_name}.{table_name}"
query = f"""
DROP TABLE IF EXISTS {qualified_table}
"""
spark.sql(query)

####  5/ Creating database on AWS Glue catalog if not exists.

In [None]:
# Create the database through the same catalog that the table statements in
# this notebook use, so the namespace exists where the table will be created.
query = f"""
CREATE DATABASE IF NOT EXISTS {catalog_name}.{database_name}
"""
spark.sql(query)


####  6/ Creating data frame (DF) from Kinesis Stream.

In [None]:
# Resolve the stream's ARN from its name.
kinesis_client = boto3.client('kinesis')
stream_description = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"]
stream_arn = stream_description["StreamARN"]

# Connection options for the Kinesis source.
# "startingPosition" alternatives noted by the author: "latest", or a timestamp
# such as "2023-09-25T19:35:00-03:00" (Glue 4.0+).
conn_ = dict(
    typeOfData="kinesis",
    streamARN=stream_arn,
    classification="json",
    startingPosition="earliest",
    inferSchema="true",
)

# Streaming data frame over the Kinesis stream.
kinesis_data = glueContext.create_data_frame.from_options(
    connection_type="kinesis",
    connection_options=conn_,
    transformation_ctx="kinesis_data",
)

####  7/ Sampling input stream for interactive development

In [None]:
# Options handed to the sampler.
options = dict(pollingTimeInMs="10000", windowSize="5 seconds")

# Take a sample of the stream for interactive development.
# To run streaming applications instead of just sampling, change getSampleStreaming to forEachBatch method
dyf = glueContext.getSampleStreamingDynamicFrame(kinesis_data, options, None)

# Report how many records the sample captured.
sample_count = dyf.count()
print(sample_count)

When the sampled Dynamic Frame is empty (0), the polling time may not have been long enough to process the ingested records. Increase the pollingTimeInMs value and try again.

####  8/ Reading DF schema

In [None]:
# Keep only the "payload" column of the sample, then wrap it back into a DynamicFrame.
payload_df = dyf.toDF().select("payload")
df2 = DynamicFrame.fromDF(payload_df, glueContext, 'df2')

# Print the schema of the payload.
df2.printSchema()

####  9/ (Optional) Stream data filtered by date

##### Because the stream is read with "startingPosition" set to "earliest", every time you run the getSampleStreamingDynamicFrame method it reads all valid data records. It can therefore be useful to apply filters to the data frame, especially if you want to remove data related to other tests. If that is your case, uncomment the lines below.

In [None]:
#from pyspark.sql import functions as F
#df3 = df2.toDF().withColumn("datetime_utc", F.to_utc_timestamp(F.from_unixtime(F.col("payload.ts_ms")/1000,'yyyy-MM-dd HH:mm:ss'),'UTC')).withColumn("epoch",F.col("payload.ts_ms"))
#df3.sort("epoch").show(n=100,truncate=30)

####  10/ Attention here! Creating temporary view from streaming data.
##### Only if you performed the previous (optional) step, uncomment the second line below and replace XXXXXXXX with your desired interval.

In [None]:
df4 = df2.toDF()  # default: use every sampled record
#df4 = df3.filter(df3.epoch >= XXXXXXXX)
df4.createOrReplaceTempView("DEBEZIUM0")  # expose the data to Spark SQL as view DEBEZIUM0

####  11/ Simplifying the reading of streaming data

In [None]:
# Pull the fields of interest out of the nested payload (timestamp, operation,
# before/after images and the columns of the after image), ordered by timestamp.
# Records without an operation code are skipped.
sql1 = """
SELECT
    payload.ts_ms as ts_ms
    ,payload.op as op
    ,payload.after as after
    ,payload.before as before
    ,payload.after.DATA as data
    ,payload.after.DATA_ID as id
    ,payload.after.*
FROM DEBEZIUM0
WHERE payload.op is not null
ORDER BY ts_ms
"""

# Run the query and print up to 100 rows, truncating cell values at 200 characters.
dfX = spark.sql(sql1)
dfX.show(
    n=100,
    truncate=200,
)

####  12/ Creating iceberg table from temporary view

In [None]:
# Create the (empty) Iceberg table. The `where 1=0` predicate selects no rows,
# so only the column layout of payload.after is taken from the view.
qualified_table = f"{catalog_name}.{database_name}.{table_name}"
query = f"""
CREATE TABLE IF NOT EXISTS {qualified_table}
USING iceberg
TBLPROPERTIES ('table_type'='ICEBERG', 'format-version'='2')
LOCATION '{warehouse_path}'
AS SELECT payload.after.* FROM DEBEZIUM0 where 1=0
"""
spark.sql(query)

# List the tables of the database.
spark.catalog.listTables(database_name)

####  13/ Reading iceberg metadata

In [None]:
# Query the table's `history` metadata and print it without truncation.
history_df = spark.sql(f"select * from {catalog_name}.{database_name}.{table_name}.history")
history_df.show(truncate=False)

In [None]:
# Query the table's `snapshots` metadata and print it vertically, without truncation.
snapshots_df = spark.sql(f"select * from {catalog_name}.{database_name}.{table_name}.snapshots")
snapshots_df.show(truncate=False, vertical=True)

In [None]:
# Query the table's `files` metadata and print it vertically, without truncation.
files_df = spark.sql(f"select * from {catalog_name}.{database_name}.{table_name}.files")
files_df.show(truncate=False, vertical=True)

####  14/ UPSERT (UPDATE OR INSERT) operation to add or update data into iceberg table
##### Only the most recent version of each record is written to the iceberg table, which can greatly reduce the time spent processing data and therefore the cost.

In [None]:
# For every DATA_ID, find the timestamp of its latest non-delete event (the CTE),
# then join back to the view to fetch the DATA / DATA_ID values at that timestamp.
sql1 = """
with cte as
(
SELECT
    max(payload.ts_ms) as ts_ms
    ,payload.after.DATA_ID
FROM DEBEZIUM0
WHERE payload.op is not null and payload.op != "d"
GROUP BY payload.after.DATA_ID
)
select distinct 
    d.payload.after.DATA
    ,d.payload.after.DATA_ID
from cte as c 
inner join DEBEZIUM0 as d
    on c.ts_ms = d.payload.ts_ms
    and c.DATA_ID = payload.after.DATA_ID
"""

# Stage the result as view UPSERT0 for the MERGE step.
df_upsert = spark.sql(sql1)
df_upsert.createOrReplaceTempView("UPSERT0")

####  15/ Reading UPSERT data

In [None]:
# Preview up to 100 of the staged upsert rows.
staged_upserts = spark.sql("SELECT * FROM UPSERT0")
staged_upserts.show(n=100)

####  16/ Performing the UPSERT operation on iceberg table

In [None]:
# Merge the staged rows into the Iceberg table, matching on DATA_ID:
# matched rows are updated, unmatched rows are inserted.
merge_upsert_sql = f"""MERGE INTO {catalog_name}.{database_name}.{table_name} t
            USING UPSERT0 u ON u.DATA_ID = t.DATA_ID
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *
            """
spark.sql(merge_upsert_sql)

# Read the table back through the catalog and print up to 100 rows.
table_after_upsert = glueContext.create_data_frame.from_catalog(
    database=f'{database_name}',
    table_name=f'{table_name}',
)
table_after_upsert.show(n=100)

####  17/ DELETE operation to remove data from iceberg table

In [None]:
# For every DATA_ID (taken from the after image, falling back to the before
# image), find the timestamp of its latest event; keep the IDs whose event at
# that timestamp is a delete ("d").
sql1 = """
with cte as
(
SELECT
    max(payload.ts_ms) as ts_ms
    ,COALESCE(payload.after.DATA_ID,payload.before.DATA_ID) as DATA_ID
FROM DEBEZIUM0
WHERE payload.op is not null
GROUP BY COALESCE(payload.after.DATA_ID,payload.before.DATA_ID)
)
select distinct c.DATA_ID
from cte as c 
inner join DEBEZIUM0 as d
    on c.ts_ms = d.payload.ts_ms
    and c.DATA_ID = COALESCE(payload.after.DATA_ID,payload.before.DATA_ID)
where payload.op = "d"
"""

# The column is selected by name so this cell does not depend on the `F`
# alias, whose import only exists (commented out) in the optional step 9.
# A 'string' dtype for payload.before is treated as "no deletes in the sample"
# (presumably the column was always null and could not be inferred as a struct - TODO confirm).
before_dtype = df2.toDF().select("payload.before").dtypes[0][1]
if before_dtype != 'string':
    df_delete = spark.sql(sql1)
    df_delete.createOrReplaceTempView("DELETE0")
else:
    print("Nothing to DELETE.")

####  18/ Reading DELETE data

In [None]:
# Preview the staged deletes, using the same guard as the staging cell.
# The column is selected by name so this cell does not depend on the `F`
# alias, whose import only exists (commented out) in the optional step 9.
before_dtype = df2.toDF().select("payload.before").dtypes[0][1]
if before_dtype != 'string':
    spark.sql("SELECT * FROM DELETE0").show(n=100)
else:
    print("Nothing to DELETE.")

####  19/ Performing the DELETE operation on iceberg table

In [None]:
# Remove from the Iceberg table every row whose DATA_ID is staged in DELETE0.
# The column is selected by name so this cell does not depend on the `F`
# alias, whose import only exists (commented out) in the optional step 9.
before_dtype = df2.toDF().select("payload.before").dtypes[0][1]
if before_dtype != 'string':
    spark.sql(f"""MERGE INTO {catalog_name}.{database_name}.{table_name} t
                USING DELETE0 d ON d.DATA_ID = t.DATA_ID
                WHEN MATCHED THEN DELETE
                """)

# Read the table back through the catalog and print up to 100 rows (runs whether or not anything was deleted).
glueContext.create_data_frame.from_catalog(database=f'{database_name}', table_name=f'{table_name}').show(n=100)

####  20/ Reading iceberg metadata

In [None]:
# Query the table's `snapshots` metadata again and print it vertically, without truncation.
snapshots_df = spark.sql(f"select * from {catalog_name}.{database_name}.{table_name}.snapshots")
snapshots_df.show(truncate=False, vertical=True)

In [None]:
# Query the table's `history` metadata again and print it without truncation.
history_df = spark.sql(f"select * from {catalog_name}.{database_name}.{table_name}.history")
history_df.show(truncate=False)

####  21/ Stopping Glue session
##### To avoid additional costs, when you finish testing, execute command below to close the Glue session

In [None]:
%stop_session