### Importing the modules

In [0]:
from pyspark.sql.functions import *
from delta.tables import DeltaTable

### Reading sales data from the silver layer

In [0]:
# Load the silver-layer transactions and keep only the rows whose
# Transaction_Type equals "SALE"; the column is upper-cased first, so the
# comparison is case-insensitive.
silver_transactions = spark.read.table("retail_analytics.silver.transactions")
is_sale = upper(col("Transaction_Type")) == "SALE"
sales_df = silver_transactions.where(is_sale)

### Creating date_sk from the Date column

In [0]:
# Derive an integer date key in yyyyMMdd form (e.g. 20241127) from the Date column.
# Date is cast to a date before formatting, so only the calendar day is used.
sale_day = col("Date").cast("date")
date_key = date_format(sale_day, "yyyyMMdd").cast("int")
sales_df = sales_df.withColumn("date_sk", date_key)

### Joining the dimension tables

In [0]:
# Attach each gold dimension to the sales rows (aliased "s").
# Each entry is: (dimension table, alias, sales-side key, dimension-side key).
dimension_lookups = [
    ("retail_analytics.gold.dim_customers", "c", "s.Customer_ID", "c.customer_id"),
    ("retail_analytics.gold.dim_products", "p", "s.Product_ID", "p.product_id"),
    ("retail_analytics.gold.dim_stores", "st", "s.Store_ID", "st.store_id"),
    ("retail_analytics.gold.dim_employees", "e", "s.Employee_ID", "e.employee_id"),
]

fact_joined_df = sales_df.alias("s")
for dim_table, dim_alias, sales_key, dim_key in dimension_lookups:
    # Left join: a sale with no matching dimension row is kept, and that
    # dimension's columns are NULL for it.
    fact_joined_df = fact_joined_df.join(
        spark.table(dim_table).alias(dim_alias),
        col(sales_key) == col(dim_key),
        "left",
    )


### Selecting the needed columns

In [0]:
# Shape the joined rows into the fact_sales column layout: the surrogate keys
# first, then the sale attributes under their snake_case names, then a load
# timestamp.
surrogate_key_cols = [
    col(key_name)
    for key_name in (
        "s.date_sk",
        "c.customer_sk",
        "p.product_sk",
        "st.store_sk",
        "e.employee_sk",
    )
]

renamed_sale_cols = [
    col(source_name).alias(target_name)
    for source_name, target_name in (
        ("s.Invoice_ID", "invoice_id"),
        ("s.Line", "line"),
        ("s.Size", "size"),
        ("s.Quantity", "quantity_sold"),
        ("s.Unit_Price", "unit_price"),
        ("s.Discount", "discount"),
        ("s.Line_Total", "sales_amount"),
        ("s.Currency", "currency"),
        ("s.Date", "sales_date"),
    )
]

# Timestamp recorded in the _created_at column of every row in this load.
load_timestamp_col = current_timestamp().alias("_created_at")

fact_sales_df = fact_joined_df.select(
    *surrogate_key_cols,
    *renamed_sale_cols,
    load_timestamp_col,
)

### Creating the fact_sales table with a surrogate key (sales_sk)

In [0]:
# DDL for the gold fact table. IF NOT EXISTS makes the statement a no-op when
# the table is already present. sales_sk is an identity column, so its values
# are generated by the table rather than supplied by the load.
fact_sales_ddl = """
CREATE TABLE IF NOT EXISTS retail_analytics.gold.fact_sales (
    sales_sk BIGINT GENERATED ALWAYS AS IDENTITY,
    date_sk INT,
    customer_sk BIGINT,
    product_sk BIGINT,
    store_sk BIGINT,
    employee_sk BIGINT,
    invoice_id STRING,
    line INT,
    size STRING,
    quantity_sold INT,
    unit_price DOUBLE,
    discount DOUBLE,
    sales_amount DOUBLE,
    currency STRING,
    sales_date TIMESTAMP,
    _created_at TIMESTAMP
)
USING DELTA
"""

# Kept as the cell's final expression so the notebook still shows the call's result.
spark.sql(fact_sales_ddl)

DataFrame[]

### Merge process (insert-only: matched rows are not updated, unlike SCD-1)

In [0]:
# Insert-only load of fact_sales_df into the gold fact table.
# A source row is inserted only when the target has no row with the same
# (invoice_id, line, sales_date); rows that already exist are left untouched,
# so re-running the cell does not add them again.
fact_sales_tbl = DeltaTable.forName(spark, "retail_analytics.gold.fact_sales")

# The keys are compared with the null-safe operator <=>. With plain "=", a NULL
# in any key column makes the whole condition NULL, so that source row never
# matches its already-loaded copy and would be inserted again on every run.
merge_condition = """
    tgt.invoice_id <=> src.invoice_id AND
    tgt.line <=> src.line AND
    tgt.sales_date <=> src.sales_date
    """

# Columns copied one-to-one from the source. sales_sk is deliberately absent:
# the CREATE TABLE statement in this notebook declares it
# GENERATED ALWAYS AS IDENTITY.
insert_columns = [
    "date_sk",
    "customer_sk",
    "product_sk",
    "store_sk",
    "employee_sk",
    "invoice_id",
    "line",
    "size",
    "quantity_sold",
    "unit_price",
    "discount",
    "sales_amount",
    "currency",
    "sales_date",
    "_created_at",
]
insert_values = {name: col(f"src.{name}") for name in insert_columns}

# NOTE(review): the source is not de-duplicated here; this assumes
# fact_sales_df holds at most one row per (invoice_id, line, sales_date),
# i.e. the dimension joins do not fan out -- confirm.
(
    fact_sales_tbl.alias("tgt")
    .merge(fact_sales_df.alias("src"), merge_condition)
    .whenNotMatchedInsert(values=insert_values)
    .execute()
)

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Preview five rows of the fact table.
fact_sales_preview = spark.read.table("retail_analytics.gold.fact_sales").limit(5)
fact_sales_preview.display()

sales_sk,date_sk,customer_sk,product_sk,store_sk,employee_sk,invoice_id,line,size,quantity_sold,unit_price,discount,sales_amount,currency,sales_date,_created_at
1,20241127,1184265,9123,33,125,INV-US-005-04342313,1,M,1,66.0,0.6,26.4,USD,2024-11-27T20:28:00.000Z,2026-01-20T12:29:08.791Z
3,20241201,238369,1761,33,61,INV-US-005-04343156,7,38,3,44.5,0.0,133.5,USD,2024-12-01T17:34:00.000Z,2026-01-20T12:29:08.791Z
5,20241202,1009569,3149,33,61,INV-US-005-04343334,1,M,1,33.0,0.0,33.0,USD,2024-12-02T16:33:00.000Z,2026-01-20T12:29:08.791Z
7,20241203,324363,3040,33,137,INV-US-005-04343551,1,S,2,33.0,0.0,66.0,USD,2024-12-03T17:53:00.000Z,2026-01-20T12:29:08.791Z
9,20241206,342334,3281,33,302,INV-US-005-04344018,1,M,1,43.5,0.0,43.5,USD,2024-12-06T17:43:00.000Z,2026-01-20T12:29:08.791Z


In [0]:
# Total number of rows in the fact table; kept as the cell's last expression
# so the count is displayed.
fact_sales_loaded = spark.read.table("retail_analytics.gold.fact_sales")
fact_sales_loaded.count()

6077200