**Create dimension table for product.**

In [3]:
# Create dimension table - product
from pyspark.sql.functions import *

# Project the product attributes that the gold-layer product dimension exposes.
product_sql = """
    SELECT ProductID, Name, ProductSubcategoryID
    FROM silver_production_product
    """
df_product = spark.sql(product_sql)

# Persist as a Delta table, replacing whatever an earlier run wrote.
(
    df_product.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_dim_product")
)

# display(df_product.head(10))


StatementMeta(, f6c2fba1-c44d-455a-91fe-ceb82848f4d8, 5, Finished, Available, Finished)

**Create dimension table for Category**

In [9]:
# Create dimension for Category


# Flatten the category hierarchy: one row per subcategory joined to its parent
# category. The INNER JOIN keeps only subcategories whose ProductCategoryID
# matches a category row.
product_category_sql = """
    SELECT ps.ProductsubcategoryID, pc.ProductCategoryID, ps.Name AS Subcategory, pc.Name AS Category
    FROM silver_production_productcategory pc
    INNER JOIN silver_production_productsubcategory ps ON pc.ProductCategoryID = ps.ProductCategoryID
    """
df_product_category = spark.sql(product_category_sql)

# Persist as a Delta table, replacing whatever an earlier run wrote.
(
    df_product_category.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_dim_product_category")
)

# display(df_product_category)


StatementMeta(, f6c2fba1-c44d-455a-91fe-ceb82848f4d8, 11, Finished, Available, Finished)

**Create dimension table for vendor**

In [19]:

# Vendor dimension: the vendor key (BusinessEntityID) and its display name.
vendor_sql = """
    SELECT BusinessEntityID, Name
    FROM silver_purchasing_vendor
    """
df_vendor = spark.sql(vendor_sql)

# Persist as a Delta table, replacing whatever an earlier run wrote.
(
    df_vendor.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_dim_vendor")
)

# display(df_vendor)

StatementMeta(, f6c2fba1-c44d-455a-91fe-ceb82848f4d8, 21, Finished, Available, Finished)

**Create dimension table for Purchase Order Header**

In [12]:
# Purchase order header attributes: the order key, its vendor and ship-method
# keys, and the ship/order dates.
orderheader_sql = """
    SELECT PurchaseOrderID, VendorID, ShipMethodID, ShipDate, OrderDate
    FROM silver_purchasing_purchaseorderheader
    """
df_orderheader = spark.sql(orderheader_sql)

# Persist as a Delta table, replacing whatever an earlier run wrote.
(
    df_orderheader.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_dim_orderheader")
)

# display(df_orderheader)

StatementMeta(, f6c2fba1-c44d-455a-91fe-ceb82848f4d8, 14, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, dff887c2-e649-4c30-8861-b68f0431783a)

**Create dimension table for vendor lead times**

In [14]:
# Average lead time per (product, vendor) pair, taken from the product-vendor
# table.
vendorleadtime_sql = """
    SELECT ProductID, BusinessEntityID, AverageLeadTime
    FROM silver_purchasing_productvendor
    """
df_vendorleadtimes = spark.sql(vendorleadtime_sql)

# Persist as a Delta table, replacing whatever an earlier run wrote.
(
    df_vendorleadtimes.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_dim_vendorleadtime")
)

# Render the DataFrame in the notebook output for a visual check.
display(df_vendorleadtimes)


StatementMeta(, f6c2fba1-c44d-455a-91fe-ceb82848f4d8, 16, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 23e9bbc8-bdae-4911-9848-5f179503729c)

**Create dimension table for dates**

In [15]:
from pyspark.sql.functions import col, min as spark_min, max as spark_max, least, greatest

# Collect every date the date dimension has to cover into a single `date`
# column: ShipDate and OrderDate from the purchase order headers, plus DueDate
# from the purchase order details. UNION ALL is enough because only the
# overall minimum and maximum are needed, so duplicates do not matter.
date_ranges = spark.sql("""
    SELECT ShipDate as date FROM silver_purchasing_purchaseorderheader
    UNION ALL
    SELECT OrderDate as date FROM silver_purchasing_purchaseorderheader
    UNION ALL 
    SELECT DueDate as date FROM silver_purchasing_purchaseorderdetail
""")

# Reduce to the global bounds in one aggregation. An aggregation without
# grouping always yields exactly one row, so indexing [0] is safe.
result = date_ranges.agg(
    spark_min("date").alias("min_date"),
    spark_max("date").alias("max_date")
).collect()[0]

max_date = result['max_date']
min_date = result['min_date']

# min/max skip NULLs and return NULL (None here) when there is no non-null
# date at all, e.g. when the silver tables are empty. Fail here with a clear
# message instead of letting the date arithmetic in a later cell raise an
# opaque TypeError on None.
if min_date is None or max_date is None:
    raise ValueError(
        "No non-null ShipDate, OrderDate or DueDate values were found in the "
        "silver purchasing tables; cannot derive the date dimension range."
    )


StatementMeta(, f6c2fba1-c44d-455a-91fe-ceb82848f4d8, 17, Finished, Available, Finished)

In [16]:
from pyspark.sql.functions import col, expr, monotonically_increasing_id
from pyspark.sql.types import DateType
from datetime import datetime, timedelta

# Enumerate every day from min_date through max_date (both inclusive) on the
# driver; min_date and max_date must already be defined in the session.
span_days = (max_date - min_date).days
date_list = [min_date + timedelta(days=offset) for offset in range(span_days + 1)]

# Build a single-column DataFrame; each value is wrapped in a 1-tuple so that
# it forms one row.
date_rows = [(day_value,) for day_value in date_list]
date_df = spark.createDataFrame(date_rows, ["Date"])

# Derive the calendar attributes from the Date column with Spark SQL
# expressions ('E' is the abbreviated day name, 'MMMM' the full month name).
date_dim = (
    date_df
    .withColumn("Year", expr("year(Date)"))
    .withColumn("Month", expr("month(Date)"))
    .withColumn("Day", expr("day(Date)"))
    .withColumn("Weekday", expr("date_format(Date, 'E')"))
    .withColumn("MonthName", expr("date_format(Date, 'MMMM')"))
    .withColumn("Quarter", expr("quarter(Date)"))
)

# Persist as a Delta table, replacing whatever an earlier run wrote.
# NOTE(review): this table name has no "gold_" prefix, unlike the other tables
# written by this notebook -- confirm whether that is intentional before
# renaming, since consumers may already read "dim_date".
(
    date_dim.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_date")
)

StatementMeta(, f6c2fba1-c44d-455a-91fe-ceb82848f4d8, 18, Finished, Available, Finished)

**Create Fact table**

In [1]:
# Create a temp view that labels each vendor 'Reliable (Below 3.5%)' when its rejected value is under 3.5% of its purchase value, otherwise 'Needs Review'
from pyspark.sql.functions import *

# Per-vendor quality summary. For each vendor the query totals the rejected
# value (RejectedQty * UnitPrice) and the purchase value (OrderQty * UnitPrice)
# over all of its order lines, then labels the vendor by the ratio of the two:
#   ratio < 0.035  -> 'Reliable (Below 3.5%)'
#   anything else  -> 'Needs Review'
# NULLIF turns a zero purchase total into NULL, so such vendors produce a NULL
# ratio and fall into the ELSE branch ('Needs Review') instead of raising a
# division-by-zero error.
vendor_flags_sql = """
    SELECT 
        poh.VendorID,
        v.Name AS VendorName,
        SUM(pod.RejectedQty * pod.UnitPrice) AS TotalRejectedValue,
        SUM(pod.OrderQty * pod.UnitPrice) AS TotalPurchaseValue,
        CASE 
            WHEN SUM(pod.RejectedQty * pod.UnitPrice) / NULLIF(SUM(pod.OrderQty * pod.UnitPrice), 0) < 0.035 
                THEN 'Reliable (Below 3.5%)'
            ELSE 'Needs Review'
        END AS VendorQualityFlag
    FROM silver_purchasing_purchaseorderdetail pod
    JOIN silver_purchasing_purchaseorderheader poh 
        ON pod.PurchaseOrderID = poh.PurchaseOrderID
    JOIN silver_purchasing_vendor v 
        ON poh.VendorID = v.BusinessEntityID
    GROUP BY poh.VendorID, v.Name
"""
df_vendor_flags = spark.sql(vendor_flags_sql)

# Register the result under a session-scoped name so later SQL in this
# notebook can join against it.
df_vendor_flags.createOrReplaceTempView("vendor_flags_temp")


StatementMeta(, fcfba68a-4227-4350-8f55-0db4814a67c3, 3, Finished, Available, Finished)

In [3]:
# Procurement fact table: one row per purchase order detail line, enriched with
# header dates, the vendor quality flag and derived measures.
#
# - Requires the temp view `vendor_flags_temp` to exist in the current session.
# - Detail lines are inner-joined to their header; the vendor flags are
#   LEFT-joined, so a line is kept even when no flag row matches its vendor
#   (VendorName and VendorQualityFlag are then NULL).
# - Both percentage columns use NULLIF on the denominator, so a zero
#   denominator yields NULL rather than an error.
# - DeliveryDuration is DATEDIFF(ShipDate, OrderDate), i.e. the number of days
#   from OrderDate to ShipDate.
# NOTE(review): DeliveryStatus is 'Late' only when ShipDate > DueDate evaluates
# to true; if either date is NULL the comparison is NULL and the row falls into
# ELSE, i.e. 'On Time' -- confirm this is the intended treatment.
fact_procurement_sql = """
    SELECT 
        pod.PurchaseOrderID,
        pod.PurchaseOrderDetailID,
        poh.OrderDate,
        poh.ShipDate,
        pod.DueDate,
        pod.ProductID,
        poh.VendorID,
        vf.VendorName,
        vf.VendorQualityFlag,
        pod.OrderQty,
        pod.UnitPrice,
        pod.ReceivedQty,
        pod.RejectedQty,

        -- TotalPurchases = OrderQty * UnitPrice
        (pod.OrderQty * pod.UnitPrice) AS TotalPurchases,

        -- RejectedValue = RejectedQty * UnitPrice
        (pod.RejectedQty * pod.UnitPrice) AS RejectedValue,

        -- % Items Rejected = RejectedQty / ReceivedQty
        ROUND(pod.RejectedQty / NULLIF(pod.ReceivedQty, 0), 4) AS PercentItemsRejected,

        -- % Rejected Value = RejectedValue / TotalPurchases
        ROUND(
            (pod.RejectedQty * pod.UnitPrice) / NULLIF((pod.OrderQty * pod.UnitPrice), 0),
            4
        ) AS PercentRejectedValue,

        -- Late/Not Late
        CASE 
            WHEN poh.ShipDate > pod.DueDate THEN 'Late'
            ELSE 'On Time'
        END AS DeliveryStatus,

        -- DeliveryDuration
        DATEDIFF(poh.ShipDate, poh.OrderDate) AS DeliveryDuration

    FROM silver_purchasing_purchaseorderdetail pod
    JOIN silver_purchasing_purchaseorderheader poh
        ON pod.PurchaseOrderID = poh.PurchaseOrderID
    LEFT JOIN vendor_flags_temp vf
        ON poh.VendorID = vf.VendorID
"""
df_gold = spark.sql(fact_procurement_sql)

# Persist the fact table as Delta, replacing earlier contents. overwriteSchema
# lets the overwrite also replace the table schema if the column set changed.
(
    df_gold.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("gold_fact_procurement")
)


StatementMeta(, fcfba68a-4227-4350-8f55-0db4814a67c3, 5, Finished, Available, Finished)