Loading the silver orders table into a DataFrame

In [0]:
# Read the silver-layer orders table; it is the source for the customer
# dimension built in the cells below.
df_silver  = spark.table('data_engineering_project.silver_orders')
df_silver.display()

In [0]:
# Show the column names and types of the silver DataFrame.
df_silver.printSchema()

Creating Gold dim customer schema and table

In [0]:
%sql
-- Gold customer dimension. Rows are versioned: this notebook writes new
-- versions with is_current = 1 and end_date = 9999-12-31, and later marks
-- superseded versions with is_current = 0 and end_date = the day before the run.
CREATE TABLE IF NOT EXISTS data_engineering_project.dim_customer(
  -- Surrogate key assigned by this notebook (previous max key + row number).
  customer_key BIGINT,
  -- Customer identifier and descriptive attributes taken from silver_orders.
  customer_id STRING,
  customer_name STRING, 
  segment STRING, 
  country STRING,
  city STRING,
  state STRING,
  postal_code INTEGER,
  region STRING,
  -- Validity window and current-version flag of the row.
  start_date DATE,
  end_date DATE,
  is_current INT
)

USING DELTA;

Reading current data from gold table dim_customer

In [0]:
# Existing contents of the gold dimension; used below to find the highest
# surrogate key and the rows currently flagged is_current = 1.
df_target = spark.table('data_engineering_project.dim_customer')
df_target.display()

Getting MAX customer key from gold table dim_customer

In [0]:
# Import under an alias so Python's builtin max() is not shadowed for the
# rest of the notebook session.
from pyspark.sql.functions import max as spark_max

# Highest surrogate key already present in dim_customer. New keys are
# generated later as max_key + row_number.
max_key = df_target.agg(spark_max('customer_key')).collect()[0][0]

# The aggregate comes back as None when there is no key to aggregate
# (e.g. the table is still empty); start numbering from 0 in that case.
if max_key is None:
    max_key = 0

Selecting the customer attributes from the silver layer and keeping only distinct rows

In [0]:
from pyspark.sql.functions import col

# Customer attributes carried from the silver orders into the dimension.
customer_columns = [
    "customer_id",
    "customer_name",
    "segment",
    "country",
    "city",
    "state",
    "postal_code",
    "region",
]

# distinct() de-duplicates on the full attribute tuple, not on customer_id.
# NOTE(review): a customer_id that appears with two different attribute
# combinations therefore yields two source rows - confirm this is intended.
df_source = df_silver.select(*customer_columns).distinct()

df_source.display()
 

Identifying the current (active) records already present in the gold layer and prefixing their columns with `tgt_` so they can be told apart after the join

In [0]:
# Columns of the active dimension rows that later cells need; each one is
# exposed with a "tgt_" prefix so it cannot collide with the identically
# named source column after the join.
target_columns = [
    "customer_key",
    "customer_id",
    "customer_name",
    "segment",
    "country",
    "city",
    "state",
    "postal_code",
    "region",
    "start_date",
]

# Keep only the rows flagged as the current version.
active_rows = df_target.filter(col("is_current") == 1)
df_current = active_rows.select(
    [col(name).alias(f"tgt_{name}") for name in target_columns]
)
df_current.display()

Identifying the source records that must be inserted into, or that supersede a current record in, the gold dim_customer table

In [0]:
from pyspark.sql.functions import col

# Attributes whose change triggers a new version of the customer row.
tracked_attributes = [
    'customer_name',
    'segment',
    'country',
    'city',
    'state',
    'postal_code',
    'region',
]

# A source row is a "change" when it has no current target row (brand-new
# customer) or when at least one tracked attribute differs from the target.
change_condition = col('tgt_customer_id').isNull()
for attribute in tracked_attributes:
    # Null-safe inequality: a plain != evaluates to NULL when either side is
    # NULL, so a value going NULL <-> non-NULL would be silently filtered out.
    # eqNullSafe treats NULL as a comparable value (NULL vs NULL is equal).
    change_condition = change_condition | ~col(attribute).eqNullSafe(col(f'tgt_{attribute}'))

# Left join keeps every source row; unmatched rows carry NULL tgt_* columns.
df_changes = (
    df_source
    .join(df_current, on=col('customer_id') == col('tgt_customer_id'), how='left')
    .where(change_condition)
)

In [0]:
# Inspect the new/changed source rows together with their matched tgt_* columns.
df_changes.display()

Building the expired versions of the records that changed (the table itself is updated in a later cell)

In [0]:
from pyspark.sql.functions import col,date_sub,lit,current_date

# Target-side columns copied back under their original (un-prefixed) names.
expired_columns = [
    'customer_key',
    'customer_id',
    'customer_name',
    'segment',
    'country',
    'city',
    'state',
    'postal_code',
    'region',
    'start_date',
]

# Rows of df_changes that matched a current target row are the versions being
# superseded: close them the day before the run and clear the current flag.
df_expire = (
    df_changes
    .filter(col("tgt_customer_id").isNotNull())
    .select(
        *[col(f'tgt_{name}').alias(name) for name in expired_columns],
        date_sub(current_date(), 1).alias('end_date'),
        lit(0).alias("is_current"),
    )
    # Several source rows can join to the same current target row, which would
    # produce the same customer_key more than once. Every column here comes
    # from the target row or is a constant, so distinct() collapses those to a
    # single row per key; a Delta MERGE with an update clause fails when
    # multiple source rows match one target row.
    .distinct()
)

In [0]:
# Inspect the rows that will be marked as expired in dim_customer.
display(df_expire)

Generating surrogate keys for new and changed records

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number,lit,current_date

# Source-side attributes that make up a new dimension row.
new_row_attributes = [
    "customer_id",
    "customer_name",
    "segment",
    "country",
    "city",
    "state",
    "postal_code",
    "region",
]

# Un-partitioned window: numbers all candidate rows 1..N in customer_id order.
window_spec = Window.orderBy(col('customer_id'))

# Drop the tgt_* columns and de-duplicate the candidate rows.
new_candidates = df_changes.select(*new_row_attributes).distinct()

# Continue the surrogate key sequence after the highest key already in the table.
keyed_candidates = new_candidates.withColumn(
    "customer_key",
    row_number().over(window_spec) + lit(max_key),
)

# New versions start today, are open-ended (9999-12-31) and flagged current.
df_new = keyed_candidates.select(
    "customer_key",
    *new_row_attributes,
    current_date().alias("start_date"),
    lit("9999-12-31").cast("date").alias("end_date"),
    lit(1).alias("is_current"),
)



In [0]:
# Inspect the rows that will be inserted as current versions.
df_new.display()

Expiring old records in delta table

In [0]:
from delta.tables import DeltaTable

# Handle on the gold dimension table; reused by the insert merge further down.
delta_tbl = DeltaTable.forName(spark, "data_engineering_project.dim_customer")

# Match each expiring row to its still-current version by surrogate key.
expire_condition = "tgt.customer_key == src.customer_key AND tgt.is_current == 1"

# Only the validity end date and the current flag are overwritten.
expire_assignments = {
    "end_date" : "src.end_date",
    "is_current" : "src.is_current"
}

(
    delta_tbl.alias("tgt")
    .merge(df_expire.alias("src"), expire_condition)
    .whenMatchedUpdate(set=expire_assignments)
    .execute()
)


Inserting new and changed customer records as current versions (insert-only merge)

In [0]:
# Every dim_customer column is populated from the same-named column of df_new.
dim_columns = [
    "customer_key",
    "customer_id",
    "customer_name",
    "segment",
    "country",
    "city",
    "state",
    "postal_code",
    "region",
    "start_date",
    "end_date",
    "is_current",
]
insert_values = {name: f"src.{name}" for name in dim_columns}

# A source row is inserted only when the table holds no current row for its
# customer_id; there is no matched clause, so existing rows are left untouched.
insert_condition = "tgt.customer_id = src.customer_id AND tgt.is_current = 1"

(
    delta_tbl.alias("tgt")
    .merge(df_new.alias("src"), insert_condition)
    .whenNotMatchedInsert(values=insert_values)
    .execute()
)


In [0]:
%sql
-- Show the dimension table after the expire and insert merges.
SELECT *
FROM data_engineering_project.dim_customer