In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

# Initialize Spark session
spark = SparkSession.builder.appName("ETL_FactTable").getOrCreate()

# Load SalesOrderHeader and SalesOrderDetail from local paths in DBFS
sales_order_header_df = spark.read.csv("dbfs:/FileStore/tables/production/salesorderheader.csv", header=True, inferSchema=True) \
    .filter(col("OnlineOrderFlag") == 1) \
    .select(
        col("SalesOrderID"),
        col("SalesOrderNumber"),
        to_date(col("OrderDate")).alias("OrderDate"),
        col("CustomerID"),
        col("TerritoryID"),
        col("TaxAmt"),
        col("Freight")
    )

sales_order_detail_df = spark.read.csv("dbfs:/FileStore/tables/production/salesorderdetails.csv", header=True, inferSchema=True)
product_df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/production/Production_product.csv")


In [0]:
# Sort data by SalesOrderID
sales_order_header_sorted = sales_order_header.orderBy("SalesOrderID")
sales_order_detail_sorted = sales_order_detail.orderBy("SalesOrderID")

# Merge Join based on SalesOrderID
merged_data = sales_order_header_sorted.join(
    sales_order_detail_sorted,
    "SalesOrderID",
    "inner"
)


In [0]:
# Load product dimension data
product_dim = spark.read.csv("dbfs:/FileStore/tables/dimension/dim_product_sale", header=True, inferSchema=True)

# 9.3 Lookup Transformation - Add product_sk from product_df based on ProductID
merged_data_with_product_sk = merged_data.join(
    product_df.select("productID", "product_sk"),
    merged_data.ProductID == product_df.product_id,
    "left"
).drop(product_df.product_id)

# 9.4 DRC - Replace nulls
merged_data_validated = merged_data_with_product_sk \
    .withColumn("TaxAmt", coalesce(col("TaxAmt"), lit(0))) \
    .withColumn("Freight", coalesce(col("Freight"), lit(0))) \
    .withColumn("product_sk", coalesce(col("product_sk"), lit(-1)))  # Use -1 for missing product_sk

# 9.5 DRC - Add additional fields like extended_sales and extended_cost
merged_data_final = merged_data_validated \
    .withColumn("extended_sales", col("OrderQty") * col("UnitPrice")) \
    .withColumn("extended_cost", col("OrderQty") * col("standard_cost"))


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2715838316518987>:6[0m
[1;32m      2[0m product_dim [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mcsv([38;5;124m"[39m[38;5;124mdbfs:/FileStore/tables/dimension/dim_product_sale[39m[38;5;124m"[39m, header[38;5;241m=[39m[38;5;28;01mTrue[39;00m, inferSchema[38;5;241m=[39m[38;5;28;01mTrue[39;00m)
[1;32m      4[0m [38;5;66;03m# 9.3 Lookup Transformation - Add product_sk from product_df based on ProductID[39;00m
[1;32m      5[0m merged_data_with_product_sk [38;5;241m=[39m merged_data[38;5;241m.[39mjoin(
[0;32m----> 6[0m     product_df[38;5;241m.[39mselect([38;5;124m"[39m[38;5;124mproductID[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mproduct_sk[39m[38;5;124m"[39m),
[1;32m      7[0m     merged_data[38;5;241m.[39mProductID [38;5;241m==[39m

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce, lit, to_date, expr

# Initialize Spark session
spark = SparkSession.builder.appName("FactTableETL").getOrCreate()

# Load the data into DataFrames
#product_df = spark.read.option("header", "true").csv("/dbfs/FileStore/tables/dim_product_table.csv", inferSchema=True)
#sales_order_header_df = spark.read.option("header", "true").csv("/dbfs/FileStore/tables/SalesOrderHeader.csv", inferSchema=True)
#sales_order_detail_df = spark.read.option("header", "true").csv("/dbfs/FileStore/tables/SalesOrderDetail.csv", inferSchema=True)

# 9.1 Extract Phase - Filter online sales from SalesOrderHeader
sales_order_header_filtered = sales_order_header_df.filter(col("OnlineOrderFlag") == 1) \
    .select(
        col("SalesOrderID"),
        col("SalesOrderNumber"),
        to_date(col("OrderDate")).alias("OrderDate"),
        col("CustomerID"),
        col("TerritoryID"),
        col("TaxAmt"),
        col("Freight")
    )

# 9.2 Transform Phase - Sort and Merge Join on SalesOrderID
# Sort both dataframes by SalesOrderID
sales_order_header_sorted = sales_order_header_filtered.orderBy("SalesOrderID")
sales_order_detail_sorted = sales_order_detail_df.orderBy("SalesOrderID")

# Merge join on SalesOrderID
merged_data = sales_order_header_sorted.join(
    sales_order_detail_sorted,
    "SalesOrderID",
    "inner"
)

# 9.3 Lookup Transformation - Add product_sk from product_df based on ProductID
merged_data_with_product_sk = merged_data.join(
    product_df.select("product_id", "product_sk"),
    merged_data.ProductID == product_df.product_id,
    "left"
).drop(product_df.product_id)

# 9.4 DRC - Replace nulls
merged_data_validated = merged_data_with_product_sk \
    .withColumn("TaxAmt", coalesce(col("TaxAmt"), lit(0))) \
    .withColumn("Freight", coalesce(col("Freight"), lit(0))) \
    .withColumn("product_sk", coalesce(col("product_sk"), lit(-1)))  # Use -1 for missing product_sk

# 9.5 DRC - Add additional fields like extended_sales and extended_cost
merged_data_final = merged_data_validated \
    .withColumn("extended_sales", col("OrderQty") * col("UnitPrice")) \
    .withColumn("extended_cost", col("OrderQty") * col("standard_cost"))

# 9.6 Load Phase - Select columns for the fact table
fact_table = merged_data_final.select(
    col("product_sk"),
    col("SalesOrderID").alias("sales_order_id"),
    col("SalesOrderDetailID").alias("line_number"),
    col("OrderQty").alias("quantity"),
    col("UnitPrice").alias("unit_price"),
    col("standard_cost").alias("unit_cost"),
    col("TaxAmt").alias("tax_amount"),
    col("Freight").alias("freight"),
    col("extended_sales"),
    col("OrderDate").alias("created_at"),
    col("extended_cost")
)

# Writing to a Fact Table - Replace placeholders with appropriate details
fact_table.write.format("jdbc").options(
    url="jdbc:sqlserver://<server>;database=<database>",
    dbtable="FactSalesTable",
    user="<user>",
    password="<password>"
).mode("append").save()

# Check the result
fact_table.show(5)


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2715838316518990>:38[0m
[1;32m     30[0m merged_data [38;5;241m=[39m sales_order_header_sorted[38;5;241m.[39mjoin(
[1;32m     31[0m     sales_order_detail_sorted,
[1;32m     32[0m     [38;5;124m"[39m[38;5;124mSalesOrderID[39m[38;5;124m"[39m,
[1;32m     33[0m     [38;5;124m"[39m[38;5;124minner[39m[38;5;124m"[39m
[1;32m     34[0m )
[1;32m     36[0m [38;5;66;03m# 9.3 Lookup Transformation - Add product_sk from product_df based on ProductID[39;00m
[1;32m     37[0m merged_data_with_product_sk [38;5;241m=[39m merged_data[38;5;241m.[39mjoin(
[0;32m---> 38[0m     product_df[38;5;241m.[39mselect([38;5;124m"[39m[38;5;124mproduct_id[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mproduct_sk[39m[38;5;124m"[39m),
[1;32m     39[0m     merged_data[38;5;241m

In [0]:
# Merge join on SalesOrderID
merged_data = sales_order_header_sorted.join(
    sales_order_detail_sorted,
    "SalesOrderID",
    "inner"
)

# 9.3 Lookup Transformation - Add product_sk from product_df based on ProductID
merged_data_with_product_sk = merged_data.join(
    product_df.select("product_id"),
    merged_data.Product_id == product_df.product_id,
    "left"
).drop(product_df.product_id)

# 9.4 DRC - Replace nulls
merged_data_validated = merged_data_with_product_sk \
    .withColumn("TaxAmt", coalesce(col("TaxAmt"), lit(0))) \
    .withColumn("Freight", coalesce(col("Freight"), lit(0))) \
    .withColumn("product_sk", coalesce(col("product_sk"), lit(-1)))  

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2715838316518993>:10[0m
[1;32m      2[0m merged_data [38;5;241m=[39m sales_order_header_sorted[38;5;241m.[39mjoin(
[1;32m      3[0m     sales_order_detail_sorted,
[1;32m      4[0m     [38;5;124m"[39m[38;5;124mSalesOrderID[39m[38;5;124m"[39m,
[1;32m      5[0m     [38;5;124m"[39m[38;5;124minner[39m[38;5;124m"[39m
[1;32m      6[0m )
[1;32m      8[0m [38;5;66;03m# 9.3 Lookup Transformation - Add product_sk from product_df based on ProductID[39;00m
[1;32m      9[0m merged_data_with_product_sk [38;5;241m=[39m merged_data[38;5;241m.[39mjoin(
[0;32m---> 10[0m     product_df[38;5;241m.[39mselect([38;5;124m"[39m[38;5;124mproduct_id[39m[38;5;124m"[39m),
[1;32m     11[0m     merged_data[38;5;241m.[39mProduct_id [38;5;241m==[39m product_df[38;5;241m.[39

In [0]:

# 9.6 Load Phase - Select columns for the fact table
fact_table = merged_data_final.select(
    col("product_sk"),
    col("SalesOrderID").alias("sales_order_id"),
    col("SalesOrderDetailID").alias("line_number"),
    col("OrderQty").alias("quantity"),
    col("UnitPrice").alias("unit_price"),
    col("standard_cost").alias("unit_cost"),
    col("TaxAmt").alias("tax_amount"),
    col("Freight").alias("freight"),
    col("extended_sales"),
    col("OrderDate").alias("created_at"),
    col("extended_cost")
)


In [0]:
product_dim.show(5)
sales_order_header.show(5)
sales_order_detail.show(5)

+----------+------------+-------------------+-------------+-------------+
|product_id|product_name|product_description|reorder_point|standard_cost|
+----------+------------+-------------------+-------------+-------------+
|         1|        null|               null|          750|          0.0|
|         2|        null|               null|          750|          0.0|
|         3|        null|               null|          600|          0.0|
|         4|        null|               null|          600|          0.0|
|       316|        null|               null|          600|          0.0|
+----------+------------+-------------------+-------------+-------------+
only showing top 5 rows

+------------+----------------+----------+----------+-----------+--------+-------+
|SalesOrderID|SalesOrderNumber| OrderDate|CustomerID|TerritoryID|  TaxAmt|Freight|
+------------+----------------+----------+----------+-----------+--------+-------+
|       43697|         SO43697|2011-05-31|     21768|       