## Implementing SCD-2 with Apache Iceberg

In [1]:
# Define the AWS env variables if you are using AWS Auth.
# The values below ("region", "key", "secret") are placeholders: substitute your own
# before running, and never save or commit the notebook with real credentials in it.
%env AWS_REGION= region
%env AWS_ACCESS_KEY_ID= key
%env AWS_SECRET_ACCESS_KEY= secret

env: AWS_REGION=region
env: AWS_ACCESS_KEY_ID=key
env: AWS_SECRET_ACCESS_KEY=secret


# Define configurations for Spark, Iceberg & Catalog (Glue)

In [2]:
import pyspark
from pyspark.sql import SparkSession
import os


# Maven coordinates fetched when the session starts: the Iceberg Spark runtime,
# the AWS SDK v2 bundle plus its URL-connection HTTP client, and hadoop-aws.
iceberg_packages = ','.join([
    'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.0',
    'software.amazon.awssdk:bundle:2.17.178',
    'software.amazon.awssdk:url-connection-client:2.17.178',
    'org.apache.hadoop:hadoop-aws:3.3.1',
])

conf = pyspark.SparkConf().setAppName('app_name')

# Packages that Spark must download and put on the classpath
conf.set('spark.jars.packages', iceberg_packages)

# Iceberg's SQL extensions (registered as a Spark session extension)
conf.set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')

# Register a catalog named "glue" that is served by Iceberg's SparkCatalog ...
conf.set('spark.sql.catalog.glue', 'org.apache.iceberg.spark.SparkCatalog')

# ... and back it with the AWS Glue catalog implementation
conf.set('spark.sql.catalog.glue.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog')

# Location the engine writes table data to
conf.set('spark.sql.catalog.glue.warehouse', 's3://my-bucket/warehouse/')

# File IO implementation the catalog uses for object storage
conf.set('spark.sql.catalog.glue.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Point the Hadoop s3a scheme at the S3A filesystem class
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

print("Spark Running")

:: loading settings :: url = jar:file:/home/docker/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/docker/.ivy2/cache
The jars for the packages stored in: /home/docker/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.3_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c543c82c-5cdb-445a-a9af-9ad92e17b8d5;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.3_2.12;1.2.0 in central
	found software.amazon.awssdk#bundle;2.17.178 in central
	found software.amazon.eventstream#eventstream;1.0.1 in central
	found software.amazon.awssdk#url-connection-client;2.17.178 in central
	found software.amazon.awssdk#utils;2.17.178 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found software.amazon.awssdk#annotations;2.17.178 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found software.amazon.awssdk#

23/09/15 17:03:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/09/15 17:03:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/09/15 17:03:56 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/09/15 17:03:56 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/09/15 17:03:56 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
23/09/15 17:03:56 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
Spark Running


In [3]:
from pyspark.sql.types import (
    StringType,
    StructField,
    StructType,
    IntegerType,
    DoubleType,
    DateType,
    BooleanType,
    TimestampType
)

# Define customer schema

In [4]:
# Column layout of the customer dimension: the business key, the descriptive
# attributes, and the versioning columns (validity date range, load timestamp,
# current-row flag). Only customer_id is declared non-nullable.
dim_customer_schema = (
    StructType()
    .add('customer_id', StringType(), False)
    .add('first_name', StringType(), True)
    .add('last_name', StringType(), True)
    .add('city', StringType(), True)
    .add('country', StringType(), True)
    .add('eff_start_date', DateType(), True)
    .add('eff_end_date', DateType(), True)
    .add('timestamp', TimestampType(), True)
    .add('is_current', BooleanType(), True)
)

# Create customer records

In [5]:
from pyspark.sql.functions import udf
import time

# Surrogate-key generator: returns the current wall-clock time in microseconds as a string.
# The value differs on every call, so the UDF is marked non-deterministic. Spark treats
# UDFs as deterministic by default and may then eliminate or repeat invocations during
# optimization, which would silently change the generated keys.
# NOTE: keys are only as unique as the clock; two rows evaluated within the same
# microsecond would receive the same key.
random_udf = udf(lambda: str(int(time.time() * 1000000)), StringType()).asNondeterministic()

In [6]:
from datetime import datetime

# Initial customer versions. Both rows are open-ended (eff_end_date 2999-12-31)
# and flagged as current.
initial_customer_rows = [
    ('1', 'Morgan', 'Brown', 'Toronto', 'Canada',
     datetime.strptime('2020-09-27', '%Y-%m-%d'),
     datetime.strptime('2999-12-31', '%Y-%m-%d'),
     datetime.strptime('2020-12-08 09:15:32', '%Y-%m-%d %H:%M:%S'),
     True),
    ('2', 'Angie', 'Keller', 'Chicago', 'US',
     datetime.strptime('2020-10-14', '%Y-%m-%d'),
     datetime.strptime('2999-12-31', '%Y-%m-%d'),
     datetime.strptime('2020-12-08 09:15:32', '%Y-%m-%d %H:%M:%S'),
     True),
]
customer_dim_df = spark.createDataFrame(initial_customer_rows, dim_customer_schema)

# Attach a generated surrogate key to every row
customer_ice_df = customer_dim_df.withColumn("customer_dim_key", random_udf())

# Cache the frame so the generated keys are computed once and reused by later actions
customer_ice_df.cache()

# Print up to 5 rows without truncating column values
customer_ice_df.show(5, False)

<jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
<jemalloc>: (This is the expected behaviour if you are running under QEMU)
                                                                                

+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
|customer_id|first_name|last_name|city   |country|eff_start_date|eff_end_date|timestamp          |is_current|customer_dim_key|
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
|1          |Morgan    |Brown    |Toronto|Canada |2020-09-27    |2999-12-31  |2020-12-08 09:15:32|true      |1694797722535090|
|2          |Angie     |Keller   |Chicago|US     |2020-10-14    |2999-12-31  |2020-12-08 09:15:32|true      |1694797722510645|
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+



# Store customer records as an Iceberg table

In [7]:
from pyspark.sql.functions import col
# Create the Iceberg table glue.dip.customers from the DataFrame, partitioned by country.
# NOTE(review): create() is expected to fail when the table already exists, so re-running
# the notebook requires dropping the table first — confirm against your catalog.
customer_ice_df.writeTo("glue.dip.customers").partitionedBy(col("country")).create()

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
                                                                                

In [8]:
# Read the table back through the "glue" catalog and display it as a pandas DataFrame.
spark.sql("SELECT * FROM glue.dip.customers").toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,customer_id,first_name,last_name,city,country,eff_start_date,eff_end_date,timestamp,is_current,customer_dim_key
0,1,Morgan,Brown,Toronto,Canada,2020-09-27,2999-12-31,2020-12-08 09:15:32,True,1694797722535090
1,2,Angie,Keller,Chicago,US,2020-10-14,2999-12-31,2020-12-08 09:15:32,True,1694797722510645


# Sales data

In [9]:
from pyspark.sql.functions import to_timestamp

# Layout of a raw sales record; every column is nullable.
fact_sales_schema = StructType([
    StructField('item_id', StringType(), True),
    StructField('quantity', IntegerType(), True),
    StructField('price', DoubleType(), True),
    StructField('timestamp', TimestampType(), True),
    StructField('customer_id', StringType(), True),
])

# Three sales: two for customer '1' and one for customer '2'.
initial_sales_rows = [
    ('111', 40, 90.5, datetime.strptime('2020-11-17 09:15:32', '%Y-%m-%d %H:%M:%S'), '1'),
    ('112', 250, 80.65, datetime.strptime('2020-10-28 09:15:32', '%Y-%m-%d %H:%M:%S'), '1'),
    ('113', 10, 600.5, datetime.strptime('2020-12-08 09:15:32', '%Y-%m-%d %H:%M:%S'), '2'),
]
sales_fact_df = spark.createDataFrame(initial_sales_rows, fact_sales_schema)

sales_fact_df.show()

                                                                                

+-------+--------+-----+-------------------+-----------+
|item_id|quantity|price|          timestamp|customer_id|
+-------+--------+-----+-------------------+-----------+
|    111|      40| 90.5|2020-11-17 09:15:32|          1|
|    112|     250|80.65|2020-10-28 09:15:32|          1|
|    113|      10|600.5|2020-12-08 09:15:32|          2|
+-------+--------+-----+-------------------+-----------+



# Customer dimension key lookup

In [10]:
from pyspark.sql.functions import when

# A sale belongs to the customer version with the same customer_id whose validity
# window [eff_start_date, eff_end_date) contains the sale's timestamp.
same_customer = sales_fact_df.customer_id == customer_ice_df.customer_id
on_or_after_start = sales_fact_df.timestamp >= customer_ice_df.eff_start_date
before_end = sales_fact_df.timestamp < customer_ice_df.eff_end_date
join_cond = [same_customer, on_or_after_start, before_end]

# Sales without a matching customer version keep a placeholder key of '-1'
# (the left outer join leaves customer_dim_key null for them).
resolved_dim_key = (
    when(customer_ice_df.customer_dim_key.isNull(), '-1')
    .otherwise(customer_ice_df.customer_dim_key)
    .alias("customer_dim_key")
)

sales_with_customer_df = sales_fact_df.join(customer_ice_df, join_cond, 'leftouter')
customers_dim_key_df = sales_with_customer_df.select(sales_fact_df['*'], resolved_dim_key)

In [11]:
# Display the sales rows together with the customer_dim_key resolved by the join.
customers_dim_key_df.show()

                                                                                

+-------+--------+-----+-------------------+-----------+----------------+
|item_id|quantity|price|          timestamp|customer_id|customer_dim_key|
+-------+--------+-----+-------------------+-----------+----------------+
|    111|      40| 90.5|2020-11-17 09:15:32|          1|1694797722535090|
|    112|     250|80.65|2020-10-28 09:15:32|          1|1694797722535090|
|    113|      10|600.5|2020-12-08 09:15:32|          2|1694797722510645|
+-------+--------+-----+-------------------+-----------+----------------+



# Save sales data as another Iceberg table

In [12]:
# Persist the key-resolved sales rows as a new Iceberg table, glue.dip.sales2.
# NOTE(review): create() is expected to fail when the table already exists — confirm
# before re-running the notebook.
customers_dim_key_df.writeTo("glue.dip.sales2").create()

                                                                                

# Read sales table

In [13]:
# Query the sales table through the catalog to confirm what was written.
spark.sql("SELECT * from glue.dip.sales2").show()

[Stage 11:>                                                         (0 + 1) / 1]

+-------+--------+-----+-------------------+-----------+----------------+
|item_id|quantity|price|          timestamp|customer_id|customer_dim_key|
+-------+--------+-----+-------------------+-----------+----------------+
|    111|      40| 90.5|2020-11-17 09:15:32|          1|1694797722535090|
|    112|     250|80.65|2020-10-28 09:15:32|          1|1694797722535090|
|    113|      10|600.5|2020-12-08 09:15:32|          2|1694797722510645|
+-------+--------+-----+-------------------+-----------+----------------+



                                                                                

# Get number of sales per country

In [14]:
# Total quantity and number of sales per customer country. Each sale is joined to
# its customer row through the surrogate key customer_dim_key.
sales_per_country_sql = (
    'SELECT ct.country, '
    'SUM(st.quantity) as sales_quantity,'
    'COUNT(*) as count_sales '
    'FROM glue.dip.sales2 st '
    'INNER JOIN glue.dip.customers ct on st.customer_dim_key = ct.customer_dim_key '
    'group by ct.country'
)
spark.sql(sales_per_country_sql).show()

                                                                                

+-------+--------------+-----------+
|country|sales_quantity|count_sales|
+-------+--------------+-----------+
| Canada|           290|          2|
|     US|            10|          1|
+-------+--------------+-----------+



# Customer Angie changed the country from US to FR, new customer Sebastian created

In [15]:
# Incoming customer records: a brand-new customer ('3') and a new version of
# customer '2' with a different city and country. Both become effective today
# (today's date at midnight) and are open-ended and current.
incoming_customer_rows = [
    ('3', 'Sebastian', 'White', 'Rome', 'IT',
     datetime.strptime(datetime.today().strftime('%Y-%m-%d'), '%Y-%m-%d'),
     datetime.strptime('2999-12-31', '%Y-%m-%d'),
     datetime.strptime('2020-12-09 09:15:32', '%Y-%m-%d %H:%M:%S'),
     True),
    ('2', 'Angie', 'Keller', 'Paris', 'FR',
     datetime.strptime(datetime.today().strftime('%Y-%m-%d'), '%Y-%m-%d'),
     datetime.strptime('2999-12-31', '%Y-%m-%d'),
     datetime.strptime('2020-12-09 10:15:32', '%Y-%m-%d %H:%M:%S'),
     True),
]
new_customer_dim_df = spark.createDataFrame(incoming_customer_rows, dim_customer_schema)

# Give every incoming version its own generated surrogate key
new_customer_dim_df = new_customer_dim_df.withColumn("customer_dim_key", random_udf())

# Cache so the generated keys are computed once and reused by later actions
new_customer_dim_df.cache()

new_customer_dim_df.show()

                                                                                

+-----------+----------+---------+-----+-------+--------------+------------+-------------------+----------+----------------+
|customer_id|first_name|last_name| city|country|eff_start_date|eff_end_date|          timestamp|is_current|customer_dim_key|
+-----------+----------+---------+-----+-------+--------------+------------+-------------------+----------+----------------+
|          3| Sebastian|    White| Rome|     IT|    2023-09-15|  2999-12-31|2020-12-09 09:15:32|      true|1694798225942495|
|          2|     Angie|   Keller|Paris|     FR|    2023-09-15|  2999-12-31|2020-12-09 10:15:32|      true|1694798226033799|
+-----------+----------+---------+-----+-------+--------------+------------+-------------------+----------+----------------+



# Customers UPSERT

In [16]:
from pyspark.sql.functions import lit

# An existing row is superseded when an incoming record has the same customer_id
# and the existing row is still flagged as current.
join_cond = [
    customer_ice_df.customer_id == new_customer_dim_df.customer_id,
    customer_ice_df.is_current == True,
]

# Columns of the superseded row: everything comes from the existing version except
# eff_end_date, which is taken from the incoming version's eff_start_date.
closed_out_columns = [
    customer_ice_df.customer_id,
    customer_ice_df.first_name,
    customer_ice_df.last_name,
    customer_ice_df.city,
    customer_ice_df.country,
    customer_ice_df.eff_start_date,
    new_customer_dim_df.eff_start_date.alias("eff_end_date"),
    customer_ice_df.customer_dim_key,
    customer_ice_df.timestamp,
]

## Find customer records to update
superseded_matches_df = customer_ice_df.join(new_customer_dim_df, join_cond)
customers_to_update_df = (
    superseded_matches_df
    .select(*closed_out_columns)
    .withColumn('is_current', lit(False))  # the old version is no longer current
)

## Union with new customer records
# unionByName aligns columns by name, so the differing column order is not a problem
merged_customers_df = new_customer_dim_df.unionByName(customers_to_update_df)

In [17]:
# Inspect the staged rows: the incoming versions plus the closed-out previous versions.
merged_customers_df.toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,customer_id,first_name,last_name,city,country,eff_start_date,eff_end_date,timestamp,is_current,customer_dim_key
0,3,Sebastian,White,Rome,IT,2023-09-15,2999-12-31,2020-12-09 09:15:32,True,1694798225942495
1,2,Angie,Keller,Paris,FR,2023-09-15,2999-12-31,2020-12-09 10:15:32,True,1694798226033799
2,2,Angie,Keller,Chicago,US,2020-10-14,2023-09-15,2020-12-08 09:15:32,False,1694797722510645


In [18]:
# Convert the merged_customers_df to SQL view
# (lets the SQL below refer to the DataFrame as `merged_customers_view`)
merged_customers_df.createOrReplaceTempView("merged_customers_view")

# Construct the MERGE INTO statement.
# Rows are matched on the surrogate key customer_dim_key:
#   - a matched row (a version already in the table) has its attribute columns overwritten
#     from the source; this is how the previous version receives its new eff_end_date
#     and is_current = false;
#   - an unmatched row (a key not yet in the table) is inserted as-is.
merge_sql = """
MERGE INTO glue.dip.customers AS target
USING merged_customers_view AS source
ON target.customer_dim_key = source.customer_dim_key
WHEN MATCHED THEN
    UPDATE SET 
        target.first_name = source.first_name,
        target.last_name = source.last_name,
        target.city = source.city,
        target.country = source.country,
        target.eff_start_date = source.eff_start_date,
        target.eff_end_date = source.eff_end_date,
        target.timestamp = source.timestamp,
        target.is_current = source.is_current
WHEN NOT MATCHED THEN
    INSERT *
"""

# Execute the SQL statement
spark.sql(merge_sql)


                                                                                

DataFrame[]

# Read customers Iceberg table:

In [19]:
# Re-read the table to verify the MERGE: the closed-out version and the new rows should appear.
spark.sql("SELECT * FROM glue.dip.customers").toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,customer_id,first_name,last_name,city,country,eff_start_date,eff_end_date,timestamp,is_current,customer_dim_key
0,1,Morgan,Brown,Toronto,Canada,2020-09-27,2999-12-31,2020-12-08 09:15:32,True,1694797722535090
1,2,Angie,Keller,Chicago,US,2020-10-14,2023-09-15,2020-12-08 09:15:32,False,1694797722510645
2,2,Angie,Keller,Paris,FR,2023-09-15,2999-12-31,2020-12-09 10:15:32,True,1694798226033799
3,3,Sebastian,White,Rome,IT,2023-09-15,2999-12-31,2020-12-09 09:15:32,True,1694798225942495


# Add new sales for Angie (customer_id 2)

In [21]:
# Two new sales for customer '2', both timestamped today (fixed times of day).
new_sales_rows = [
    ('103', 300, 15.8,
     datetime.strptime(datetime.today().strftime('%Y-%m-%d')+' 12:15:42', '%Y-%m-%d %H:%M:%S'),
     '2'),
    ('104', 10, 800.5,
     datetime.strptime(datetime.today().strftime('%Y-%m-%d')+' 06:35:32', '%Y-%m-%d %H:%M:%S'),
     '2'),
]
sales_fact_df = spark.createDataFrame(new_sales_rows, fact_sales_schema)

sales_fact_df.show()

                                                                                

+-------+--------+-----+-------------------+-----------+
|item_id|quantity|price|          timestamp|customer_id|
+-------+--------+-----+-------------------+-----------+
|    103|     300| 15.8|2023-09-15 12:15:42|          2|
|    104|      10|800.5|2023-09-15 06:35:32|          2|
+-------+--------+-----+-------------------+-----------+



In [22]:
# customer_ice_df still refers to the DataFrame built in memory before the MERGE,
# so it shows the original rows rather than the table's current contents.
customer_ice_df.show()

+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
|customer_id|first_name|last_name|   city|country|eff_start_date|eff_end_date|          timestamp|is_current|customer_dim_key|
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
|          1|    Morgan|    Brown|Toronto| Canada|    2020-09-27|  2999-12-31|2020-12-08 09:15:32|      true|1694797722535090|
|          2|     Angie|   Keller|Chicago|     US|    2020-10-14|  2999-12-31|2020-12-08 09:15:32|      true|1694797722510645|
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+



# Reload the dataframe with latest data:

In [23]:
# Rebind customer_ice_df to a query over the Iceberg table so later cells see the merged rows.
customer_ice_df = spark.sql("SELECT * FROM glue.dip.customers")

In [24]:
# Display the reloaded customer rows read from the table.
customer_ice_df.show()

                                                                                

+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
|customer_id|first_name|last_name|   city|country|eff_start_date|eff_end_date|          timestamp|is_current|customer_dim_key|
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
|          2|     Angie|   Keller|Chicago|     US|    2020-10-14|  2023-09-15|2020-12-08 09:15:32|     false|1694797722510645|
|          2|     Angie|   Keller|  Paris|     FR|    2023-09-15|  2999-12-31|2020-12-09 10:15:32|      true|1694798226033799|
|          3| Sebastian|    White|   Rome|     IT|    2023-09-15|  2999-12-31|2020-12-09 09:15:32|      true|1694798225942495|
|          1|    Morgan|    Brown|Toronto| Canada|    2020-09-27|  2999-12-31|2020-12-08 09:15:32|      true|1694797722535090|
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+--

# Customer dimension key lookup

In [25]:
from pyspark.sql.functions import when

# Match each sale to the customer version whose validity window contains the sale.
# The window is half-open, [eff_start_date, eff_end_date): a closed-out version's
# eff_end_date equals its successor's eff_start_date, so an inclusive upper bound would
# match a sale falling exactly on that boundary to both versions and duplicate the row.
join_cond = [sales_fact_df.customer_id == customer_ice_df.customer_id,
             sales_fact_df.timestamp >= customer_ice_df.eff_start_date,
             sales_fact_df.timestamp < customer_ice_df.eff_end_date]


# Left outer join keeps sales that have no matching customer version; those rows
# get the placeholder key '-1' instead of a null customer_dim_key.
customers_dim_key_df = (sales_fact_df
                          .join(customer_ice_df, join_cond, 'leftouter')
                          .select(sales_fact_df['*'],
                            when(customer_ice_df.customer_dim_key.isNull(), '-1')
                                  .otherwise(customer_ice_df.customer_dim_key)
                                  .alias("customer_dim_key") )
                         )

customers_dim_key_df.show()

                                                                                

+-------+--------+-----+-------------------+-----------+----------------+
|item_id|quantity|price|          timestamp|customer_id|customer_dim_key|
+-------+--------+-----+-------------------+-----------+----------------+
|    103|     300| 15.8|2023-09-15 12:15:42|          2|1694798226033799|
|    104|      10|800.5|2023-09-15 06:35:32|          2|1694798226033799|
+-------+--------+-----+-------------------+-----------+----------------+



# Append the new sales data

In [26]:
# Append the key-resolved new sales to the existing glue.dip.sales2 table.
# NOTE: running this cell again appends the same rows a second time.
customers_dim_key_df.writeTo("glue.dip.sales2").append()

                                                                                

# Read the new sales table

In [27]:
# Read the sales table back to confirm it holds both the original and the appended rows.
spark.sql("select * from glue.dip.sales2").toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,item_id,quantity,price,timestamp,customer_id,customer_dim_key
0,103,300,15.8,2023-09-15 12:15:42,2,1694798226033799
1,104,10,800.5,2023-09-15 06:35:32,2,1694798226033799
2,111,40,90.5,2020-11-17 09:15:32,1,1694797722535090
3,112,250,80.65,2020-10-28 09:15:32,1,1694797722535090
4,113,10,600.5,2020-12-08 09:15:32,2,1694797722510645


# Get number of sales per country

In [28]:
# Same per-country aggregation as before the upsert. Joining on the surrogate key
# attributes each sale to the customer version it was recorded against.
sales_per_country_sql = (
    'SELECT ct.country, SUM(st.quantity) as sales_quantity, COUNT(*) as count_sales '
    'FROM glue.dip.sales2 st '
    'INNER JOIN glue.dip.customers ct on st.customer_dim_key = ct.customer_dim_key group by ct.country'
)
spark.sql(sales_per_country_sql).show()

[Stage 44:>                                                         (0 + 1) / 1]

+-------+--------------+-----------+
|country|sales_quantity|count_sales|
+-------+--------------+-----------+
|     US|            10|          1|
|     FR|           310|          2|
| Canada|           290|          2|
+-------+--------------+-----------+



                                                                                