In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import round
from pyspark.sql.types import IntegerType
from datetime import date, timedelta

In [0]:
ClientID = dbutils.secrets.get(scope = "ecom-key_vault_new", key = "ClientID")
appsecret = dbutils.secrets.get(scope = "ecom-key_vault_new", key = "appsecret")
tenentId = dbutils.secrets.get(scope = "ecom-key_vault_new", key = "tenentId")

In [0]:
spark.conf.set("fs.azure.account.auth.type.datalakeecomproject.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.datalakeecomproject.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.datalakeecomproject.dfs.core.windows.net", ClientID)
spark.conf.set("fs.azure.account.oauth2.client.secret.datalakeecomproject.dfs.core.windows.net", appsecret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.datalakeecomproject.dfs.core.windows.net", "https://login.microsoftonline.com/"+tenentId+"/oauth2/token")


In [0]:
customers_bronze = spark.read.format('csv')\
            .option("header",True)\
            .option("inferSchema",True)\
            .load('abfss://bronze@datalakeecomproject.dfs.core.windows.net/customers.csv')


products_bronze = spark.read.format('csv')\
            .option("header",True)\
            .option("inferSchema",True)\
            .load('abfss://bronze@datalakeecomproject.dfs.core.windows.net/products.csv')

orders_bronze = spark.read.format('csv')\
            .option("header",True)\
            .option("inferSchema",True)\
            .load('abfss://bronze@datalakeecomproject.dfs.core.windows.net/orders.csv')


order_items_bronze = spark.read.format('csv')\
            .option("header",True)\
            .option("inferSchema",True)\
            .load('abfss://bronze@datalakeecomproject.dfs.core.windows.net/order_items.csv')

In [0]:
customers_silver = (
    customers_bronze
    .dropDuplicates(["customer_id"])
    .withColumn("signup_date", F.to_date("signup_date"))
)


In [0]:
products_silver = (
    products_bronze
    .withColumn("unit_price", F.col("unit_price").cast("double"))
    .dropDuplicates(["product_id"])
)


In [0]:
orders_silver = (
    orders_bronze
    .withColumn("order_date", F.to_date("order_date"))
    .dropDuplicates(["order_id"])
)


In [0]:
order_items_silver = (
    order_items_bronze
    .withColumn("unit_price", F.col("unit_price").cast("double"))
    .withColumn("discount", F.col("discount").cast("double"))
    .withColumn("quantity", F.col("quantity").cast("int"))
    .withColumn("total_price", round(F.col("quantity") * F.col("unit_price") - F.col("discount"),2))
)
order_items_silver.display()


order_item_id,order_id,product_id,quantity,unit_price,discount,total_price
1,1001,101,1,15.99,0.0,15.99
2,1001,104,2,8.5,0.5,16.5
3,1002,102,1,45.5,0.0,45.5
4,1002,103,1,29.9,2.0,27.9
5,1003,105,3,5.2,0.0,15.6
6,1004,101,2,15.99,1.0,30.98
7,1004,103,1,29.9,0.0,29.9
8,1005,104,3,8.5,0.0,25.5
9,1005,105,2,5.2,0.2,10.2


Create Data Modeling Fact and Dimensions table in gold layer

In [0]:
#dim_customer
dim_customer = customers_silver.select(
    F.col("customer_id").alias("customer_key"),
    "first_name",
    "last_name",
    "email",
    "city",
    "country",
    "signup_date"
)


In [0]:
#dim_product
dim_product = products_silver.select(
    F.col("product_id").alias("product_key"),
    "product_name",
    "category",
    "unit_price"
)


In [0]:
#dim_date
start_date = date(2023, 1, 1)
end_date = date(2025, 12, 31)

date_list = [(start_date + timedelta(days=i)) for i in range((end_date-start_date).days)]

df = spark.createDataFrame([(d,) for d in date_list], ["date"])

dim_date = (
    df
    .withColumn("date_key", F.date_format("date", "yyyyMMdd").cast(IntegerType()))
    .withColumn("day", F.dayofmonth("date"))
    .withColumn("month", F.month("date"))
    .withColumn("year", F.year("date"))
    .withColumn("weekday", F.date_format("date", "E"))
)


In [0]:
#dim_order_status
dim_order_status = (
    orders_silver.select("order_status")
    .distinct()
    .withColumn("order_status_key", F.monotonically_increasing_id())
)


In [0]:
#fact_order

fact_order = (
    order_items_silver.alias("oi")
    .join(orders_silver.alias("o"), "order_id")
    .join(customers_silver.alias("c"), "customer_id")
    .join(products_silver.alias("p"), "product_id")
    .join(dim_date.alias("d"), F.to_date("o.order_date") == F.col("d.date"))
    .select(
        F.col("oi.order_item_id").alias("order_item_key"),
        "oi.order_id",
        F.col("d.date_key").alias("order_date_key"),
        F.col("c.customer_id").alias("customer_key"),
        F.col("p.product_id").alias("product_key"),
        "oi.quantity",
        "oi.unit_price",
        "oi.discount",
        "oi.total_price",
        "o.order_status"
    )
)


In [0]:
fact_order.display()

order_item_key,order_id,order_date_key,customer_key,product_key,quantity,unit_price,discount,total_price,order_status
1,1001,20240320,1,101,1,15.99,0.0,15.99,Completed
2,1001,20240320,1,104,2,8.5,0.5,16.5,Completed
5,1003,20240321,3,105,3,5.2,0.0,15.6,Cancelled
3,1002,20240321,2,102,1,45.5,0.0,45.5,Completed
4,1002,20240321,2,103,1,29.9,2.0,27.9,Completed
6,1004,20240322,4,101,2,15.99,1.0,30.98,Completed
7,1004,20240322,4,103,1,29.9,0.0,29.9,Completed
8,1005,20240325,5,104,3,8.5,0.0,25.5,Completed
9,1005,20240325,5,105,2,5.2,0.2,10.2,Completed


In [0]:
dim_customer.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://gold@datalakeecomproject.dfs.core.windows.net/dim_customer")\
            .save()

In [0]:
dim_product.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://gold@datalakeecomproject.dfs.core.windows.net/dim_product")\
            .save()

In [0]:
dim_date.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://gold@datalakeecomproject.dfs.core.windows.net/dim_date")\
            .save()

In [0]:
fact_order.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://gold@datalakeecomproject.dfs.core.windows.net/fact_order")\
            .save()