In [5]:
%session_id_prefix product_cdc_upsert_iceberg_01
%glue_version 3.0
%idle_timeout 60
%%configure 
{
  "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
  "--datalake-formats": "iceberg",
  "--job-bookmark-option": "job-bookmark-enable",
  "--JOB_NAME": "product_cdc_upsert_iceberg"
}

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.4 
Setting session ID prefix to product_cdc_upsert_iceberg_01
Setting Glue version to: 3.0
Current idle_timeout is None minutes.
idle_timeout has been set to 60 minutes.
The following configurations have been updated: {'--conf': 'spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions', '--datalake-formats': 'iceberg', '--job-bookmark-option': 'job-bookmark-enable', '--JOB_NAME': 'product_cdc_upsert_iceberg'}


In [1]:
from awsglue.job import Job

Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Session ID: 1f933960-5bf4-4e55-bf4f-2c8f8998ae08
Applying the following default arguments:
--glue_kernel_version 1.0.4
--enable-glue-datacatalog true
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
--datalake-formats iceberg
--job-bookmark-option job-bookmark-enable
--JOB_NAME product_cdc_upsert_iceberg
Waiting for session 1f933960-5bf4-4e55-bf4f-2c8f8998ae08 to get into ready status...
Session 1f933960-5bf4-4e55-bf4f-2c8f8998ae08 has been created.



In [2]:
catalog_name = "glue_catalog"
bucket_name = "chiholee-datalake001"
database_name = "ecommerce"

table_name = "product"
pk = 'product_id'
last_update_time = 'last_update_time'

source_bucket_prefix = "transaction/cdc/raw"
source_path = f"s3://{bucket_name}/{source_bucket_prefix}"
source_table_name = table_name

iceberg_bucket_prefix = "transaction/iceberg"
warehouse_path = f"s3://{bucket_name}/{iceberg_bucket_prefix}"
iceberg_table_name = f"{table_name}_cdc_glue_iceberg"




In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", f"{warehouse_path}") \
    .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()




In [4]:
import sys
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions


glueContext = GlueContext(spark)

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job = Job(glueContext)
job.init(args['JOB_NAME'], args)





In [5]:
cdcDyf = glueContext.create_dynamic_frame_from_options(
    connection_type='s3',
    connection_options={
        'paths': [f'{source_path}/{database_name}/{source_table_name}/'],
        'groupFiles': 'none',
        'recurse': True
    },
    format='parquet',
    transformation_ctx='cdcDyf')




In [6]:
print(f"## Count of CDC data after last job bookmark:{cdcDyf.count()}")

## Count of CDC data after last job bookmark:0


In [7]:
cdcDf = cdcDyf.toDF()




In [8]:
cdcDf.show()

++
||
++
++


In [9]:
import sys
from pyspark.sql import Window
from pyspark.sql import functions as F 




In [10]:
cdcDf.createOrReplaceTempView("cdcDf")




In [12]:
cdcDf = spark.sql("""
select *
from cdcDf
where (product_id, last_update_time) in
(
    select product_id, max(last_update_time) max_op_time
    from cdcDf
    group by product_id
)
"""
)

AnalysisException: cannot resolve '`product_id`' given input columns: []; line 4 pos 7;
'Project [*]
+- 'Filter named_struct(product_id, 'product_id, last_update_time, 'last_update_time) IN (list#5 [])
   :  +- 'Aggregate ['product_id], ['product_id, 'max('last_update_time) AS max_op_time#4]
   :     +- 'UnresolvedRelation [cdcDf], [], false
   +- SubqueryAlias cdcdf
      +- LogicalRDD false



In [13]:
cdcInsertCount = cdcDf.filter("Op = 'I'").count()
cdcUpdateCount = cdcDf.filter("Op = 'U'").count()
cdcDeleteCount = cdcDf.filter("Op = 'D'").count()
print(f"Inserted count: {cdcInsertCount}")
print(f"Updated count: {cdcUpdateCount}")
print(f"Deleted count: {cdcDeleteCount}")
print(f"Total CDC count: {cdcDf.count()}")

AnalysisException: cannot resolve '`Op`' given input columns: []; line 1 pos 0;
'Filter ('Op = I)
+- LogicalRDD false



In [None]:
dropColumnList = ['Op','dms_update_time']

In [None]:
from datetime import datetime
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.sql.functions import concat, col, lit, to_timestamp

current_datetime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cdcDf = cdcDf.withColumn('order_dt',to_timestamp(col('order_dt')))
cdcDf = (cdcDf
      .withColumn('year', year(col('order_dt')))
      .withColumn('month', month(col('order_dt')))
      .withColumn('day', dayofmonth(col('order_dt')))
     )
cdcDf = cdcDf.withColumn('last_applied_date',to_timestamp(lit(current_datetime)))



In [None]:
spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog_name}.{database_name}")
existing_tables = spark.sql(f"SHOW TABLES IN {catalog_name}.{database_name};")
df_existing_tables = existing_tables.select('tableName').rdd.flatMap(lambda x:x).collect()

In [None]:
upsertDf = cdcDf.filter("Op != 'D'").drop(*dropColumnList)
upsertDf.createOrReplaceTempView(f"{source_table_name}_upsert")

In [None]:
upsertDf.show()

In [6]:
# spark.sql(f"""
# select order_id, count(*)
# from {catalog_name}.{database_name}.{iceberg_table_name}
# group by order_id
# having count(*) > 1
# """).show()

In [7]:
# spark.sql(f"""
# select order_id, count(*)
# from {source_table_name}_upsert
# group by order_id
# having count(*) > 1
# """).show()

In [14]:
deleteDf = cdcDf.filter("Op = 'D'").drop(*dropColumnList)
deleteDf.createOrReplaceTempView(f"{source_table_name}_delete")

AnalysisException: cannot resolve '`Op`' given input columns: []; line 1 pos 0;
'Filter ('Op = D)
+- LogicalRDD false



In [8]:
# deleteDf.show()

In [15]:
print(f"Table {source_table_name}_iceberg is upserting...")
spark.sql(f"""MERGE INTO {catalog_name}.{database_name}.{iceberg_table_name} t
    USING {source_table_name}_upsert s ON s.{pk} = t.{pk}
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
    """)

AnalysisException: Table or view not found: product_upsert; line 1 pos 0;
'MergeIntoTable ('s.product_id = 't.product_id), [updateaction(None)], [insertaction(None)]
:- SubqueryAlias t
:  +- SubqueryAlias glue_catalog.ecommerce.product_cdc_iceberg
:     +- RelationV2[product_id#6, name#7, img_path#8, category#9, price#10, last_update_time#11, year#12, month#13, day#14, last_applied_date#15] glue_catalog.ecommerce.product_cdc_iceberg
+- 'SubqueryAlias s
   +- 'UnresolvedRelation [product_upsert], [], false



In [16]:
spark.sql(f"""
select min(last_update_time), max(last_update_time)
from {catalog_name}.{database_name}.{iceberg_table_name}
""").show()


+---------------------+---------------------+
|min(last_update_time)|max(last_update_time)|
+---------------------+---------------------+
|  2023-04-07 03:06:17|  2023-04-10 14:25:33|
+---------------------+---------------------+
