<a href="https://colab.research.google.com/github/ayoraheem0000/Micrsoft-Fabric-Data-Transformed/blob/main/MedallionDataArchitecture.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Medallion Data Architecture with Google Cola**b**

In [1]:
# ================================
# STEP 1: Install & Start Spark
# ================================
!pip install pyspark

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MedallionArchitecture") \
    .getOrCreate()


# ================================
# STEP 2: Bronze Layer (Raw Data)
# ================================
# Define schema headers manually since CSVs have no header row
columns = [
    "OrderNumber",
    "OrderQty",
    "OrderDate",
    "CustomerName",
    "CustomerEmail",
    "ProductName",
    "Quantity",
    "UnitPrice",
    "LineTotal"
]

# Load multiple CSV files into one Bronze DataFrame
bronze_df = spark.read.csv(
    ["/content/sample_data/Data/2019.csv", "/content/sample_data/Data/2020.csv", "/content/sample_data/Data/2021.csv"],
    header=False,
    inferSchema=True
).toDF(*columns)

print("=== Bronze Layer Sample ===")
bronze_df.show(5)
bronze_df.printSchema()

=== Bronze Layer Sample ===
+-----------+--------+----------+--------------+--------------------+--------------------+--------+---------+---------+
|OrderNumber|OrderQty| OrderDate|  CustomerName|       CustomerEmail|         ProductName|Quantity|UnitPrice|LineTotal|
+-----------+--------+----------+--------------+--------------------+--------------------+--------+---------+---------+
|    SO49171|       1|2021-01-01| Mariah Foster|mariah21@adventur...|  Road-250 Black, 48|       1|2181.5625|  174.525|
|    SO49172|       1|2021-01-01|  Brian Howard|brian23@adventure...|    Road-250 Red, 44|       1|  2443.35|  195.468|
|    SO49173|       1|2021-01-01| Linda Alvarez|linda19@adventure...|Mountain-200 Silv...|       1|2071.4196| 165.7136|
|    SO49174|       1|2021-01-01|Gina Hernandez|gina4@adventure-w...|Mountain-200 Silv...|       1|2071.4196| 165.7136|
|    SO49178|       1|2021-01-01|     Beth Ruiz|beth4@adventure-w...|Road-550-W Yellow...|       1|1000.4375|   80.035|
+-----------

In [2]:
# ================================
# STEP 3: Silver Layer (Clean & Transform)
# ================================
from pyspark.sql.functions import col, to_date
from pyspark.sql.functions import split

silver_df = bronze_df.dropna(subset=["OrderNumber", "OrderDate", "CustomerEmail"]) \
    .withColumn("Quantity", col("Quantity").cast("double")) \
    .withColumn("UnitPrice", col("UnitPrice").cast("double")) \
    .withColumn("LineTotal", col("LineTotal").cast("double")) \
    .withColumn("OrderDate", to_date(col("OrderDate"), "yyyy-MM-dd"))

print("=== Silver Layer Sample ===")

silver_df.show(5)
silver_df.printSchema()

=== Silver Layer Sample ===
+-----------+--------+----------+--------------+--------------------+--------------------+--------+---------+---------+
|OrderNumber|OrderQty| OrderDate|  CustomerName|       CustomerEmail|         ProductName|Quantity|UnitPrice|LineTotal|
+-----------+--------+----------+--------------+--------------------+--------------------+--------+---------+---------+
|    SO49171|       1|2021-01-01| Mariah Foster|mariah21@adventur...|  Road-250 Black, 48|     1.0|2181.5625|  174.525|
|    SO49172|       1|2021-01-01|  Brian Howard|brian23@adventure...|    Road-250 Red, 44|     1.0|  2443.35|  195.468|
|    SO49173|       1|2021-01-01| Linda Alvarez|linda19@adventure...|Mountain-200 Silv...|     1.0|2071.4196| 165.7136|
|    SO49174|       1|2021-01-01|Gina Hernandez|gina4@adventure-w...|Mountain-200 Silv...|     1.0|2071.4196| 165.7136|
|    SO49178|       1|2021-01-01|     Beth Ruiz|beth4@adventure-w...|Road-550-W Yellow...|     1.0|1000.4375|   80.035|
+-----------

In [3]:

# ================================
# STEP 4: Gold Layer (Business-Ready Views)
# ================================
from pyspark.sql.functions import avg

# Gold Orders (fact table)
gold_orders = silver_df.select(
    "OrderNumber", "OrderDate", "CustomerName", "CustomerEmail",
    "ProductName", "Quantity", "UnitPrice", "LineTotal"
)

# Gold Customers (dimension table)
gold_customers = silver_df.select(
    "CustomerName", "CustomerEmail"
).dropDuplicates()

# Gold Products (dimension table)
gold_products = silver_df.groupBy("ProductName") \
    .agg(
        avg("UnitPrice").alias("AvgUnitPrice")
    )

print("=== Gold Orders Sample ===")
gold_orders.show(5)

print("=== Gold Customers Sample ===")
gold_customers.show(5)

print("=== Gold Products Sample ===")
gold_products.show(5)

=== Gold Orders Sample ===
+-----------+----------+--------------+--------------------+--------------------+--------+---------+---------+
|OrderNumber| OrderDate|  CustomerName|       CustomerEmail|         ProductName|Quantity|UnitPrice|LineTotal|
+-----------+----------+--------------+--------------------+--------------------+--------+---------+---------+
|    SO49171|2021-01-01| Mariah Foster|mariah21@adventur...|  Road-250 Black, 48|     1.0|2181.5625|  174.525|
|    SO49172|2021-01-01|  Brian Howard|brian23@adventure...|    Road-250 Red, 44|     1.0|  2443.35|  195.468|
|    SO49173|2021-01-01| Linda Alvarez|linda19@adventure...|Mountain-200 Silv...|     1.0|2071.4196| 165.7136|
|    SO49174|2021-01-01|Gina Hernandez|gina4@adventure-w...|Mountain-200 Silv...|     1.0|2071.4196| 165.7136|
|    SO49178|2021-01-01|     Beth Ruiz|beth4@adventure-w...|Road-550-W Yellow...|     1.0|1000.4375|   80.035|
+-----------+----------+--------------+--------------------+--------------------+----

In [4]:
# ================================
# STEP 5: Save Bronze, Silver, Gold Outputs
# ================================
bronze_df.write.mode("overwrite").parquet("/content/bronze")
silver_df.write.mode("overwrite").parquet("/content/silver")
gold_orders.write.mode("overwrite").parquet("/content/gold_orders")
gold_customers.write.mode("overwrite").parquet("/content/gold_customers")
gold_products.write.mode("overwrite").parquet("/content/gold_products")

print("✅ Medallion pipeline completed and saved in /content/")

✅ Medallion pipeline completed and saved in /content/


In [5]:
from pyspark.sql.functions import split

# Split CustomerName into FirstName and LastName
silver_df = silver_df \
    .withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)) \
    .withColumn("LastName", split(col("CustomerName"), " ").getItem(1))


In [6]:
# Gold Customers (dimension table with split names)
gold_customers = silver_df.select(
    "FirstName", "LastName", "CustomerEmail"
).dropDuplicates()

print("=== Gold Customers with First & Last Name ===")
gold_customers.show(5)


=== Gold Customers with First & Last Name ===
+---------+--------+--------------------+
|FirstName|LastName|       CustomerEmail|
+---------+--------+--------------------+
|    Chloe|  Turner|chloe2@adventure-...|
|   Walter| Navarro|walter3@adventure...|
|   Morgan|    Long|morgan79@adventur...|
|  Natalie|Gonzalez|natalie60@adventu...|
|   Bianca|      Lu|bianca9@adventure...|
+---------+--------+--------------------+
only showing top 5 rows



In [9]:
silver_df.write.mode("overwrite").parquet("/content/silver")
gold_customers.write.mode("overwrite").parquet("/content/gold_customers")

In [13]:
load_df = spark.read.parquet("/content/gold_customers")

print("=== Load Gold Customers Sample ===")
load_df.show(100)

=== Load Gold Customers Sample ===
+-----------+---------+--------------------+
|  FirstName| LastName|       CustomerEmail|
+-----------+---------+--------------------+
|      Chloe|   Turner|chloe2@adventure-...|
|     Walter|  Navarro|walter3@adventure...|
|     Morgan|     Long|morgan79@adventur...|
|    Natalie| Gonzalez|natalie60@adventu...|
|     Bianca|       Lu|bianca9@adventure...|
|      Logan|  Jenkins|logan9@adventure-...|
|   Samantha|   Martin|samantha16@advent...|
|      Julia|  Simmons|julia81@adventure...|
|      Molly|    Lopez|molly15@adventure...|
|     Xavier|   Rogers|xavier87@adventur...|
|    Abigail|    Smith|abigail47@adventu...|
|       Tara|     Yuan|tara7@adventure-w...|
|       Kate|     Yuan|kate5@adventure-w...|
|   Terrance|    Lopez|terrance14@advent...|
|      Colin|      She|colin23@adventure...|
|     Steven|   Morgan|steven22@adventur...|
|  Priscilla|     Raje|priscilla12@adven...|
|       Seth|    Clark|seth18@adventure-...|
|    Tiffany|    Zha

In [18]:
gold_products = spark.read.parquet("/content/gold_products")

print("=== Gold Products Sample ===")
gold_products.show(5)



=== Gold Products Sample ===
+--------------------+------------------+
|         ProductName|      AvgUnitPrice|
+--------------------+------------------+
|Mountain-200 Blac...|2176.4802922680383|
|Touring-1000 Yell...|2384.0700000000043|
|Touring-1000 Blue...| 2384.070000000004|
|Short-Sleeve Clas...|53.989999999999824|
|Women's Mountain ...| 69.98999999999984|
+--------------------+------------------+
only showing top 5 rows

