### Importing the Needed Modules

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

import sys
sys.path.append('/Workspace/Users/mohammedthoufiq9360@gmail.com/Retail-And-Ecommerce-Analytics-Platform')

from src.paths import SILVER_TRANSACTIONS_PATH, DIM_CUSTOMERS_PATH, DIM_PRODUCTS_PATH, DIM_EMPLOYEES_PATH, DIM_STORES_PATH, DIM_DATES_PATH, DIM_DISCOUNTS_PATH, FACT_SALES_PATH
from src.schema_definitions import FACT_SALES_SCHEMA
from delta.tables import DeltaTable

### Querying and Filtering only the sales data from the Silver Transactions Table

In [0]:
# Load the silver transactions table, then keep only the rows whose
# transaction_type is exactly "Sale".
silver_transactions_df = spark.read.table(SILVER_TRANSACTIONS_PATH)
is_sale = col("transaction_type") == "Sale"
sales_df = silver_transactions_df.where(is_sale)

### Fact_sales Schema Reference

In [0]:
# Display the imported fact_sales schema mapping (column name -> type name)
# as a reference for the columns built in the cells below.
FACT_SALES_SCHEMA

{'sales_sk': 'long',
 'date_sk': 'integer',
 'customer_sk': 'long',
 'product_sk': 'long',
 'store_sk': 'long',
 'employee_sk': 'long',
 'discount_sk': 'long',
 'invoice_id': 'string',
 'line': 'integer',
 'size': 'string',
 'unit_price': 'double',
 'quantity': 'integer',
 'line_total': 'double',
 'invoice_total': 'double',
 'currency': 'string',
 'payment_method': 'string',
 'sale_date': 'timestamp',
 '_created_at': 'timestamp'}

### Creating date_sk from the date Column

In [0]:
# Build the integer date key (yyyyMMdd, e.g. 20241116) from the sale timestamp:
# truncate to a calendar date, format it as digits, then cast to int.
sale_day = col("date").cast("date")
date_key = date_format(sale_day, "yyyyMMdd").cast("int")

sales_df = sales_df.withColumn("date_sk", date_key)

# Quick visual check of the new column.
sales_df.limit(5).display()

invoice_id,line,customer_id,product_id,size,color,unit_price,quantity,date,discount,line_total,store_id,employee_id,currency,currency_symbol,sku,transaction_type,payment_method,invoice_total,ingestion_ts,_source_file,date_sk
INV-US-005-04340828,2,294688,13275,XXL,UnKnown,34.0,1,2024-11-16T18:48:00.000Z,0.0,34.0,5,57,USD,$,MASH13275-XXL-,Sale,Credit Card,97.5,2026-01-14T05:54:30.121Z,dbfs:/Volumes/retail_analytics/raw/kaggle/global_fashion/transactions.csv,20241116
INV-US-005-04341072,1,282769,11963,M,TURQUOISE,69.5,1,2024-11-18T09:01:00.000Z,0.0,69.5,5,56,USD,$,FEDR11963-M-TURQUOISE,Sale,Credit Card,69.5,2026-01-14T05:54:30.121Z,dbfs:/Volumes/retail_analytics/raw/kaggle/global_fashion/transactions.csv,20241118
INV-US-005-04341521,1,298325,13002,M,BEIGE,56.5,1,2024-11-22T10:30:00.000Z,0.0,56.5,5,58,USD,$,FESW13002-M-BEIGE,Sale,Credit Card,56.5,2026-01-14T05:54:30.121Z,dbfs:/Volumes/retail_analytics/raw/kaggle/global_fashion/transactions.csv,20241122
INV-US-005-04342277,1,293812,13482,L,LILAC,64.0,1,2024-11-27T18:11:00.000Z,0.6,25.6,5,55,USD,$,MAT-13482-L-LILAC,Sale,Credit Card,25.6,2026-01-14T05:54:30.121Z,dbfs:/Volumes/retail_analytics/raw/kaggle/global_fashion/transactions.csv,20241127
INV-US-005-04343357,3,291867,13410,UnKnown,UnKnown,11.0,1,2024-12-02T17:06:00.000Z,0.0,11.0,5,54,USD,$,MAAC13410--,Sale,Credit Card,237.0,2026-01-14T05:54:30.121Z,dbfs:/Volumes/retail_analytics/raw/kaggle/global_fashion/transactions.csv,20241202


### Joining Dimensions for Fact Sales Creation

In [0]:
# Enrich every sale line with the matching row of each dimension table.
# All joins are LEFT joins, so a sale line is kept even when a dimension row
# is missing; the columns coming from that dimension are then NULL.
# NOTE(review): if any dimension holds more than one row per natural key
# (e.g. versioned/SCD2 rows), these joins multiply sale lines -- confirm
# against the dimension tables.

# Calendar day of the sale, compared against the campaign window below.
sale_calendar_day = col("sa.date").cast("date")

# A sale line is attributed to a discount campaign only when:
#   * the sale day falls inside the campaign window (both bounds inclusive),
#   * the product's category and sub-category match the campaign, and
#   * a discount was actually applied to the line (discount > 0).
# Requiring `> 0` rather than `>= 0` keeps undiscounted lines (discount = 0.0)
# that merely happen to fall inside a campaign window from receiving a
# discount_sk.
# NOTE(review): if campaigns can overlap for the same category/sub_category,
# this range join returns several rows per sale line -- confirm against
# the discounts dimension.
discount_campaign_match = (
    (sale_calendar_day >= col("d.discount_start_date")) &
    (sale_calendar_day <= col("d.discount_end_date")) &
    (col("p.category") == col("d.category")) &
    (col("p.sub_category") == col("d.sub_category")) &
    (col("sa.discount") > 0)
)

fact_joined_df = (
    sales_df.alias("sa")
    .join(
        spark.table(DIM_CUSTOMERS_PATH).alias("c"),
        col("sa.customer_id") == col("c.customer_id"),
        "left"
    )
    .join(
        spark.table(DIM_PRODUCTS_PATH).alias("p"),
        col("sa.product_id") == col("p.product_id"),
        "left"
    )
    .join(
        spark.table(DIM_STORES_PATH).alias("st"),
        col("sa.store_id") == col("st.store_id"),
        "left"
    )
    .join(
        spark.table(DIM_EMPLOYEES_PATH).alias("e"),
        col("sa.employee_id") == col("e.employee_id"),
        "left"
    )
    .join(
        spark.table(DIM_DISCOUNTS_PATH).alias("d"),
        discount_campaign_match,
        "left"
    )
)


### Selecting the Needed Columns for fact_sales

In [0]:
# Project the joined frame down to the fact_sales columns, in table order.

# Surrogate keys: the date key comes from the sale itself, the others from
# the joined dimension aliases.
surrogate_key_cols = [
    col(qualified_name)
    for qualified_name in (
        "sa.date_sk",
        "c.customer_sk",
        "p.product_sk",
        "st.store_sk",
        "e.employee_sk",
        "d.discount_sk",
    )
]

# Attributes and measures copied unchanged from the sale line.
sale_line_cols = [
    col(f"sa.{column_name}")
    for column_name in (
        "invoice_id",
        "line",
        "size",
        "unit_price",
        "quantity",
        "line_total",
        "invoice_total",
        "currency",
        "payment_method",
    )
]

# The sale timestamp is exposed as sale_date; _created_at records load time.
timestamp_cols = [
    col("sa.date").alias("sale_date"),
    current_timestamp().alias("_created_at"),
]

fact_sales_df = fact_joined_df.select(
    *surrogate_key_cols,
    *sale_line_cols,
    *timestamp_cols,
)


### Creating Fact_sales Table with surrogate key

In [0]:
# Create the fact_sales Delta table on first run; CREATE TABLE IF NOT EXISTS
# leaves an already existing table untouched.
#
# The column list is generated from FACT_SALES_SCHEMA (column name -> type
# name) instead of being typed out a second time, so the table definition
# cannot drift from the schema the project already declares.
# sales_sk is the surrogate key and is populated by Delta as an identity column.
assert "sales_sk" in FACT_SALES_SCHEMA, "FACT_SALES_SCHEMA must define the sales_sk surrogate key"

fact_sales_columns_ddl = ",\n    ".join(
    f"{column_name} {column_type.upper()}"
    + (" GENERATED ALWAYS AS IDENTITY" if column_name == "sales_sk" else "")
    for column_name, column_type in FACT_SALES_SCHEMA.items()
)

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {FACT_SALES_PATH} (
    {fact_sales_columns_ddl}
)
USING DELTA
""")

DataFrame[]

### Inserting New Rows into the Fact_sales Table (Insert-Only Merge)

In [0]:
# Insert-only merge keyed on (invoice_id, line): a source line whose key is
# already present in the target is skipped, every other source line is
# inserted. No existing target row is updated or deleted.
merge_condition = "tgt.invoice_id = src.invoice_id AND tgt.line = src.line"

# Columns written on insert. sales_sk is not listed, so it is not supplied
# from the source frame.
insert_column_names = [
    "date_sk",
    "customer_sk",
    "product_sk",
    "store_sk",
    "employee_sk",
    "discount_sk",
    "invoice_id",
    "line",
    "size",
    "unit_price",
    "quantity",
    "line_total",
    "invoice_total",
    "currency",
    "payment_method",
    "sale_date",
    "_created_at",
]
insert_values = {
    column_name: col(f"src.{column_name}")
    for column_name in insert_column_names
}

fact_sales_tbl = DeltaTable.forName(spark, FACT_SALES_PATH)

(
    fact_sales_tbl.alias("tgt")
    .merge(fact_sales_df.alias("src"), merge_condition)
    .whenNotMatchedInsert(values=insert_values)
    .execute()
)


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Preview five rows of the persisted fact table.
fact_sales_preview_df = spark.read.table(FACT_SALES_PATH).limit(5)
fact_sales_preview_df.display()

sales_sk,date_sk,customer_sk,product_sk,store_sk,employee_sk,discount_sk,invoice_id,line,size,unit_price,quantity,line_total,invoice_total,currency,payment_method,sale_date,_created_at
1,20241001,239186,8558,6,289,166,INV-CN-006-03045061,1,M,319.0,3,765.6,765.6,CNY,Cash,2024-10-01T10:03:00.000Z,2026-01-18T10:58:48.820Z
3,20241003,23664,1987,6,88,168,INV-CN-006-03047155,4,UnKnown,78.5,1,62.8,1186.0,CNY,Credit Card,2024-10-03T08:53:00.000Z,2026-01-18T10:58:48.820Z
5,20241003,1040906,14926,6,393,166,INV-CN-006-03048056,1,M,498.0,1,398.4,398.4,CNY,Credit Card,2024-10-03T12:25:00.000Z,2026-01-18T10:58:48.820Z
7,20241009,1074085,920,6,201,168,INV-CN-006-03050967,1,UnKnown,192.5,1,154.0,608.4,CNY,Credit Card,2024-10-09T08:54:00.000Z,2026-01-18T10:58:48.820Z
9,20240505,963986,6092,7,35,154,INV-CN-007-02226887,1,P,173.0,3,389.25,389.25,CNY,Cash,2024-05-05T20:28:00.000Z,2026-01-18T10:58:48.820Z


In [0]:
# Total number of rows currently stored in the fact table.
fact_sales_row_count = spark.read.table(FACT_SALES_PATH).count()
fact_sales_row_count

6077200