<a href="https://colab.research.google.com/github/Jwanisha27/jwanisha-/blob/main/Medallian_Architecture.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

BRONZE LAYER IN MEDALLIAN ARCHITECTURE:

In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, current_timestamp
from pyspark.sql.functions import col, year, month,dayofmonth, current_timestamp, expr

In [None]:
spark = SparkSession.builder \
    .appName("SuperstoreBronzeLayer") \
    .getOrCreate()

In [None]:
raw_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/content/train.csv")


In [None]:
raw_df.columns

['Row ID',
 'Order ID',
 'Order Date',
 'Ship Date',
 'Ship Mode',
 'Customer ID',
 'Customer Name',
 'Segment',
 'Country',
 'City',
 'State',
 'Postal Code',
 'Region',
 'Product ID',
 'Category',
 'Sub-Category',
 'Product Name',
 'Sales']

In [None]:
raw_df.printSchema()

root
 |-- Row ID: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)



In [None]:
raw_df.select("Order Date").show(5)

+----------+
|Order Date|
+----------+
|08/11/2017|
|08/11/2017|
|12/06/2017|
|11/10/2016|
|11/10/2016|
+----------+
only showing top 5 rows


In [None]:
from pyspark.sql.functions import (
    col, year, month,dayofmonth, current_timestamp,
    to_date, when
)

In [None]:
bronze_df = raw_df \
    .withColumn(
        "order_date_parsed",
        when(
            col("Order Date").rlike("^[0-9]{2}/[0-9]{2}/[0-9]{4}$"),
            when(
                col("Order Date").substr(1, 2).cast("int") <= 12,
                to_date(col("Order Date"), "MM/dd/yyyy")
            ).otherwise(
                to_date(col("Order Date"), "dd/MM/yyyy")
            )
        )
    ) \
    .withColumn("ingestion_time", current_timestamp()) \
    .withColumn("year", year(col("order_date_parsed"))) \
    .withColumn("month", month(col("order_date_parsed"))) \
    .withColumn("day", dayofmonth(col("order_date_parsed")))

In [None]:
bronze_df.select(
    "Order Date",
    "order_date_parsed",
    "year",
    "month",
    "day"
).show(10, truncate=False)

+----------+-----------------+----+-----+---+
|Order Date|order_date_parsed|year|month|day|
+----------+-----------------+----+-----+---+
|08/11/2017|2017-08-11       |2017|8    |11 |
|08/11/2017|2017-08-11       |2017|8    |11 |
|12/06/2017|2017-12-06       |2017|12   |6  |
|11/10/2016|2016-11-10       |2016|11   |10 |
|11/10/2016|2016-11-10       |2016|11   |10 |
|09/06/2015|2015-09-06       |2015|9    |6  |
|09/06/2015|2015-09-06       |2015|9    |6  |
|09/06/2015|2015-09-06       |2015|9    |6  |
|09/06/2015|2015-09-06       |2015|9    |6  |
|09/06/2015|2015-09-06       |2015|9    |6  |
+----------+-----------------+----+-----+---+
only showing top 10 rows


In [None]:
bronze_df.write \
    .mode("overwrite") \
    .partitionBy("year", "month", "day") \
    .parquet("/content/bronze_superstore_sales")
bronze_df.show()

+------+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+--------------------+--------+-----------------+--------------------+----+-----+---+
|Row ID|      Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|     Customer Name|    Segment|      Country|           City|         State|Postal Code| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|order_date_parsed|      ingestion_time|year|month|day|
+------+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+--------------------+--------+-----------------+--------------------+----+-----+---+
|     1|CA-2017-152156|08/11/2017|11/11/2017|  Second Class|   CG-12520|       Claire Gute|   Consumer|Uni

In [None]:
new_data = [
    ("CA-2022-100001", "12/05/2022", "California", 500),
    ("CA-2022-100002", "15/12/2022", "Texas", 750),
    ("CA-2022-100003", "10/12/2022", "Texas", 750)
]

new_columns = ["Order ID", "Order Date", "State", "Sales"]

new_raw_df = spark.createDataFrame(new_data, new_columns)
new_raw_df.show()

+--------------+----------+----------+-----+
|      Order ID|Order Date|     State|Sales|
+--------------+----------+----------+-----+
|CA-2022-100001|12/05/2022|California|  500|
|CA-2022-100002|15/12/2022|     Texas|  750|
|CA-2022-100003|10/12/2022|     Texas|  750|
+--------------+----------+----------+-----+



In [None]:
new_bronze_df = new_raw_df \
    .withColumn("Sales", col("Sales").cast("string")) \
    .withColumn(
        "order_date_parsed",
        when(
            col("Order Date").substr(1, 2).cast("int") <= 12,
            to_date(col("Order Date"), "MM/dd/yyyy")
        ).otherwise(
            to_date(col("Order Date"), "dd/MM/yyyy")
        )
    ) \
    .withColumn("ingestion_time", current_timestamp()) \
    .withColumn("year", year(col("order_date_parsed"))) \
    .withColumn("month", month(col("order_date_parsed"))) \
    .withColumn("day", dayofmonth(col("order_date_parsed")))

In [None]:
!rm -rf /content/bronze_superstore_sales

bronze_df.write \
    .mode("overwrite") \
    .partitionBy("year", "month", "day") \
    .parquet("/content/bronze_superstore_sales")

new_bronze_df.write \
    .mode("append") \
    .partitionBy("year", "month", "day") \
    .parquet("/content/bronze_superstore_sales")

In [None]:
df=spark.read.parquet("/content/bronze_superstore_sales/year=2017/month=10/day=1")
df.show()

+------+--------------+----------+----------+--------------+-----------+-------------+---------+-------------+--------+----------+-----------+------+---------------+----------+------------+--------------------+-----+-----------------+--------------------+
|Row ID|      Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|Customer Name|  Segment|      Country|    City|     State|Postal Code|Region|     Product ID|  Category|Sub-Category|        Product Name|Sales|order_date_parsed|      ingestion_time|
+------+--------------+----------+----------+--------------+-----------+-------------+---------+-------------+--------+----------+-----------+------+---------------+----------+------------+--------------------+-----+-----------------+--------------------+
|  4667|CA-2017-116547|10/01/2017|17/01/2017|Standard Class|   KB-16585|    Ken Black|Corporate|United States| Seattle|Washington|      98115|  West|FUR-FU-10000076| Furniture| Furnishings|24-Hour Round Wal...|79.92|       2017-10-0

In [None]:
from pyspark.sql.functions import col

df = spark.read.parquet("/content/bronze_superstore_sales") \
    .filter(
        (col("year") == 2017) &
        (col("month") == 1) &
        (col("day") == 10)
    )

df.show()

+------+--------------+----------+----------+--------------+-----------+---------------+--------+-------------+-------------+----------+-----------+-------+---------------+---------------+------------+--------------------+-------+-----------------+--------------------+----+-----+---+
|Row ID|      Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|  Customer Name| Segment|      Country|         City|     State|Postal Code| Region|     Product ID|       Category|Sub-Category|        Product Name|  Sales|order_date_parsed|      ingestion_time|year|month|day|
+------+--------------+----------+----------+--------------+-----------+---------------+--------+-------------+-------------+----------+-----------+-------+---------------+---------------+------------+--------------------+-------+-----------------+--------------------+----+-----+---+
|  1068|CA-2017-157686|01/10/2017|02/10/2017|   First Class|   BD-11620|Brian DeCherney|Consumer|United States|San Francisco|California|      941

SILVER LAYER IN MEDALLIAN ARCHITECTURE:

In [None]:
bronze_df = spark.read.parquet("/content/bronze_superstore_sales")

In [None]:
from pyspark.sql.functions import col, expr

silver_df = bronze_df \
    .filter(col("order_date_parsed").isNotNull()) \
    .dropDuplicates(["Order ID"]) \
    .withColumn("Sales", expr("try_cast(Sales as double)")) \
    .select(
        "Order ID",
        "order_date_parsed",
        "State",
        "Category",
        "Sub-Category",
        "Sales",
        "ingestion_time",
        "year",
        "month",
        "day"
    )

In [None]:
spark.sql("CREATE DATABASE IF NOT EXISTS superstore_silver")

DataFrame[]

In [None]:
silver_df.write \
    .mode("overwrite") \
    .partitionBy("year", "month", "day") \
    .saveAsTable("superstore_silver.sales_cleaned")

In [None]:
spark.sql("SELECT * FROM superstore_silver.sales_cleaned LIMIT 5").show()

+--------------+-----------------+------------+---------------+------------+------+--------------------+----+-----+---+
|      Order ID|order_date_parsed|       State|       Category|Sub-Category| Sales|      ingestion_time|year|month|day|
+--------------+-----------------+------------+---------------+------------+------+--------------------+----+-----+---+
|CA-2017-110366|       2017-05-09|Pennsylvania|      Furniture| Furnishings|  82.8|2026-02-03 05:09:...|2017|    5|  9|
|CA-2017-115756|       2017-05-09|    Michigan|      Furniture| Furnishings| 12.22|2026-02-03 05:09:...|2017|    5|  9|
|CA-2017-133319|       2017-05-09|    New York|Office Supplies|       Paper|192.16|2026-02-03 05:09:...|2017|    5|  9|
|CA-2017-136231|       2017-05-09|   Tennessee|Office Supplies|   Envelopes|23.472|2026-02-03 05:09:...|2017|    5|  9|
|CA-2017-136595|       2017-05-09|       Texas|      Furniture| Furnishings|21.204|2026-02-03 05:09:...|2017|    5|  9|
+--------------+-----------------+------

In [None]:
spark.sql("""
SELECT *
FROM superstore_silver.sales_cleaned
WHERE year=2017 AND month=1 AND day=10
""").show()

+--------------+-----------------+----------+---------------+------------+-------+--------------------+----+-----+---+
|      Order ID|order_date_parsed|     State|       Category|Sub-Category|  Sales|      ingestion_time|year|month|day|
+--------------+-----------------+----------+---------------+------------+-------+--------------------+----+-----+---+
|CA-2017-111794|       2017-01-10|     Texas|     Technology| Accessories| 79.512|2026-02-03 05:09:...|2017|    1| 10|
|CA-2017-116918|       2017-01-10|   Florida|Office Supplies|     Binders|  5.388|2026-02-03 05:09:...|2017|    1| 10|
|CA-2017-139157|       2017-01-10|  New York|      Furniture|      Tables|330.588|2026-02-03 05:09:...|2017|    1| 10|
|CA-2017-155992|       2017-01-10|   Indiana|     Technology|      Phones|   69.9|2026-02-03 05:09:...|2017|    1| 10|
|CA-2017-157686|       2017-01-10|California|      Furniture|      Chairs|194.848|2026-02-03 05:09:...|2017|    1| 10|
+--------------+-----------------+----------+---

In [None]:
bronze_df = spark.read.parquet("/content/bronze_superstore_sales")

In [None]:
last_time = spark.sql("""
SELECT max(ingestion_time)
FROM superstore_silver.sales_cleaned
""").collect()[0][0]
print("Last processed time:", last_time)

Last processed time: 2026-02-03 05:09:53.224247


In [None]:
incremental_df = bronze_df.filter(
    col("ingestion_time") > last_time
)

In [None]:
silver_new = incremental_df \
    .filter(col("order_date_parsed").isNotNull()) \
    .dropDuplicates(["Order ID"]) \
    .withColumn("Sales", col("Sales").cast("double")) \
    .select(
        "Order ID",
        "order_date_parsed",
        "State",
        "Category",
        "Sub-Category",
        "Sales",
        "year",
        "month",
        "day",
        "ingestion_time"
    )

In [None]:
silver_new.write \
    .mode("append") \
    .partitionBy("year", "month", "day") \
    .saveAsTable("superstore_silver.sales_cleaned")

GOLD LAYER IN MEDALLIAN ARCHITECTURE:

In [None]:
from pyspark.sql.functions import sum, avg, count, col

In [None]:
spark.sql("CREATE DATABASE IF NOT EXISTS superstore_gold")

DataFrame[]

In [None]:
monthly_sales_df = silver_df \
    .groupBy("year", "month") \
    .agg(
        sum("Sales").alias("total_sales"),
        count("Order ID").alias("total_orders"),
        avg("Sales").alias("avg_sales")
    ) \
    .orderBy("year", "month")
monthly_sales_df.show(5)

+----+-----+------------------+------------+------------------+
|year|month|       total_sales|total_orders|         avg_sales|
+----+-----+------------------+------------+------------------+
|2015|    1|14183.755999999998|          55| 262.6621481481481|
|2015|    2| 6525.511999999999|          46|148.30709090909087|
|2015|    3|19169.484999999997|          77| 255.5931333333333|
|2015|    4|         10434.738|          60|          173.9123|
|2015|    5|15208.611999999997|          75|205.52178378378375|
+----+-----+------------------+------------+------------------+
only showing top 5 rows


In [None]:
monthly_sales_df.write.mode("overwrite") \
    .saveAsTable("superstore_gold.monthly_sales")

In [None]:
state_sales_df = silver_df \
    .groupBy("State") \
    .agg(sum("Sales").alias("state_sales")) \
    .orderBy(col("state_sales").desc())
state_sales_df.show(5)

state_sales_df.write.mode("overwrite") \
    .saveAsTable("superstore_gold.state_sales")

+------------+------------------+
|       State|       state_sales|
+------------+------------------+
|  California|206275.40849999987|
|    New York|145312.66700000002|
|       Texas|        78480.8442|
|Pennsylvania| 54253.78499999998|
|  Washington|52869.380000000005|
+------------+------------------+
only showing top 5 rows


In [None]:
category_sales_df = silver_df \
    .groupBy("Category") \
    .agg(sum("Sales").alias("category_sales")) \
    .orderBy(col("category_sales").desc())
category_sales_df.show(5)

category_sales_df.write.mode("overwrite") \
    .saveAsTable("superstore_gold.category_sales")

+---------------+------------------+
|       Category|    category_sales|
+---------------+------------------+
|     Technology|378159.83200000046|
|      Furniture| 360793.3967000012|
|Office Supplies| 332117.6720000004|
|           NULL|            2000.0|
+---------------+------------------+



In [None]:
top_products_df = silver_df \
    .groupBy("Sub-Category") \
    .agg(sum("Sales").alias("sales")) \
    .orderBy(col("sales").desc()) \
    .limit(10)
top_products_df.show(5)

top_products_df.write.mode("overwrite") \
    .saveAsTable("superstore_gold.top_products")

+------------+------------------+
|Sub-Category|             sales|
+------------+------------------+
|      Chairs| 162876.8319999999|
|      Phones|161950.18999999992|
|     Binders|101384.09200000003|
|      Tables|100529.32800000004|
|     Storage| 99929.26799999984|
+------------+------------------+
only showing top 5 rows


In [None]:
region_data = [
    ("California", "West"),
    ("Texas", "Central"),
    ("New York", "East")
]

region_df = spark.createDataFrame(region_data, ["State", "Region"])
region_df.show(3)


+----------+-------+
|     State| Region|
+----------+-------+
|California|   West|
|     Texas|Central|
|  New York|   East|
+----------+-------+



In [None]:
region_sales_df = silver_df \
    .join(region_df, on="State", how="left") \
    .groupBy("Region") \
    .agg(sum("Sales").alias("region_sales")) \
    .orderBy(col("region_sales").desc())
region_sales_df.show(5)

region_sales_df.write.mode("overwrite") \
    .saveAsTable("superstore_gold.region_sales")


+-------+------------------+
| Region|      region_sales|
+-------+------------------+
|   NULL| 643001.9809999997|
|   West|206275.40849999987|
|   East|145312.66700000002|
|Central|        78480.8442|
+-------+------------------+



In [None]:
pivot_df = silver_df \
    .groupBy("year", "month") \
    .pivot("Category") \
    .sum("Sales")

pivot_df.write.mode("overwrite") \
    .saveAsTable("superstore_gold.category_pivot")
pivot_df.show()

+----+-----+-----+------------------+------------------+------------------+
|year|month| null|         Furniture|   Office Supplies|        Technology|
+----+-----+-----+------------------+------------------+------------------+
|2022|   10|750.0|              NULL|              NULL|              NULL|
|2015|    2| NULL|            321.92|          2309.024|          3894.568|
|2017|    3| NULL| 8798.804999999998|11022.254000000003| 4444.896000000001|
|2017|    8| NULL| 8241.460000000001| 4775.705000000001| 8794.127999999999|
|2017|   10| NULL| 8955.258999999998|          4628.226|          8074.069|
|2018|   10| NULL|          8480.077|10951.885999999993|16983.357000000004|
|2016|    7| NULL|4040.7690000000002| 3375.747000000001| 5107.920000000001|
|2015|   12| NULL|15064.658000000001| 7875.075999999998|10852.232000000002|
|2016|   11| NULL|         10520.581| 7784.954999999998| 5278.836999999999|
|2018|    1| NULL|          3647.885| 9817.210000000001| 9708.940000000002|
|2018|    3|

In [None]:
spark.sql("""
SELECT *
FROM superstore_gold.top_products
""").show()


+------------+------------------+
|Sub-Category|             sales|
+------------+------------------+
|      Chairs| 162876.8319999999|
|      Phones|161950.18999999992|
|     Binders|101384.09200000003|
|      Tables|100529.32800000004|
|     Storage| 99929.26799999984|
|    Machines|         75844.422|
| Accessories| 70236.19000000003|
|     Copiers|          70129.03|
|   Bookcases|54547.458699999996|
|  Appliances| 52438.48600000005|
+------------+------------------+



In [None]:
spark.sql("""
SELECT year, SUM(total_sales) AS yearly_sales
FROM superstore_gold.monthly_sales
GROUP BY year
ORDER BY year
""").show()

+----+------------------+
|year|      yearly_sales|
+----+------------------+
|2015|204533.68599999996|
|2016|212474.82299999997|
|2017|       305938.1918|
|2018|348124.19989999995|
|2022|            2000.0|
+----+------------------+

