### Libraries

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import count, sum



### Creating folders 


In [0]:
# Create one folder per pipeline layer inside the volume: the raw landing
# folder plus the bronze, silver and gold layer folders.
# NOTE(review): "pipeine" looks like a misspelling of "pipeline", but every
# path in this notebook uses the same spelling — confirm the volume name
# before changing it.
dbutils.fs.mkdirs("/Volumes/workspace/default/pipeine/raw")
dbutils.fs.mkdirs("/Volumes/workspace/default/pipeine/bronze")
dbutils.fs.mkdirs("/Volumes/workspace/default/pipeine/silver")
dbutils.fs.mkdirs("/Volumes/workspace/default/pipeine/gold")


### Spark Session

In [0]:
# Obtain the Spark session for this notebook, creating it if none exists yet.
spark = (
    SparkSession.builder
    .appName("RetailOrders")
    .getOrCreate()
)

# Load the first-batch raw files and the region list for a first look.
customers = spark.read.load(
    "/Volumes/workspace/default/pipeine/raw/customer_first.parquet",
    format="parquet",
)
orders = spark.read.load(
    "/Volumes/workspace/default/pipeine/raw/orders_first.parquet",
    format="parquet",
)
regions = spark.read.load(
    "/Volumes/workspace/default/pipeine/raw/regions.parquet",
    format="parquet",
)
products = spark.read.load(
    "/Volumes/workspace/default/pipeine/raw/products_first.parquet",
    format="parquet",
)

# Render each frame so the raw contents can be inspected.
display(customers)
display(orders)
display(regions)
display(products)



### VERIFYING EVERYTHING FIRST

In [0]:
# List the contents of the raw folder; as the last expression in the cell,
# the listing is what the cell shows as its output.
dbutils.fs.ls("/Volumes/workspace/default/pipeine/raw")


### Base path


In [0]:
# Root folder of the pipeline volume; the layer cells below build their
# raw/bronze/silver/gold paths from it.
# NOTE(review): "pipeine" looks like a misspelling of "pipeline" — confirm
# against the actual volume name before renaming anything.
base_path = "/Volumes/workspace/default/pipeine"


### LookUp Layer

In [0]:

# Load both batches of every entity here, so this cell does not rely on
# variables left over from an earlier kernel session. The *_first / *_second
# frames were previously referenced without being defined anywhere in the
# notebook, which raised NameError on a fresh Restart & Run All.
customers_first = spark.read.parquet(f"{base_path}/raw/customer_first.parquet")
customers_second = spark.read.parquet(f"{base_path}/raw/customers_second.parquet")

orders_first = spark.read.parquet(f"{base_path}/raw/orders_first.parquet")
orders_second = spark.read.parquet(f"{base_path}/raw/orders_second.parquet")

products_first = spark.read.parquet(f"{base_path}/raw/products_first.parquet")
products_second = spark.read.parquet(f"{base_path}/raw/products_second.parquet")

# Stack the two batches of each entity, matching columns by name.
lookup_customers = customers_first.unionByName(customers_second)
lookup_orders = orders_first.unionByName(orders_second)
lookup_products = products_first.unionByName(products_second)
# Regions arrive as a single file, already loaded into `regions` above.
lookup_regions = regions

# Publish each lookup frame as a table, replacing any previous version.
lookup_customers.write.mode("overwrite").saveAsTable("lookup_customers")
lookup_orders.write.mode("overwrite").saveAsTable("lookup_orders")
lookup_products.write.mode("overwrite").saveAsTable("lookup_products")
lookup_regions.write.mode("overwrite").saveAsTable("lookup_regions")


### Bronze Layer – Raw Ingestion

In [0]:
# Raw ingestion: read every parquet file from the raw folder.
# Customers, orders and products each arrive as a first and a second batch.
customer_1 = spark.read.load(f"{base_path}/raw/customer_first.parquet", format="parquet")
customer_2 = spark.read.load(f"{base_path}/raw/customers_second.parquet", format="parquet")

orders_1 = spark.read.load(f"{base_path}/raw/orders_first.parquet", format="parquet")
orders_2 = spark.read.load(f"{base_path}/raw/orders_second.parquet", format="parquet")

products_1 = spark.read.load(f"{base_path}/raw/products_first.parquet", format="parquet")
products_2 = spark.read.load(f"{base_path}/raw/products_second.parquet", format="parquet")

# Regions come as a single file.
regions = spark.read.load(f"{base_path}/raw/regions.parquet", format="parquet")


### Combine First + Second Files

In [0]:
# Stack the first and second batch of each entity into one bronze frame.
# unionByName lines columns up by name, not by position.
# NOTE(review): presumably both batches share the same columns — confirm,
# since no allowMissingColumns option is passed here.
customers_bronze = customer_1.unionByName(customer_2)
orders_bronze = orders_1.unionByName(orders_2)
products_bronze = products_1.unionByName(products_2)


### Bronze Delta Tables

In [0]:
# Persist each bronze frame in Delta format under the bronze folder.
# "overwrite" replaces whatever an earlier run stored at the same path.
customers_bronze.write.save(
    f"{base_path}/bronze/customers", format="delta", mode="overwrite"
)

orders_bronze.write.save(
    f"{base_path}/bronze/orders", format="delta", mode="overwrite"
)

products_bronze.write.save(
    f"{base_path}/bronze/products", format="delta", mode="overwrite"
)

regions.write.save(
    f"{base_path}/bronze/regions", format="delta", mode="overwrite"
)


### Silver Layer

In [0]:
# Reload the bronze Delta data as the input for the cleaning step.
# These assignments rebind customers / orders / products / regions.
customers = spark.read.load(f"{base_path}/bronze/customers", format="delta")
orders = spark.read.load(f"{base_path}/bronze/orders", format="delta")
products = spark.read.load(f"{base_path}/bronze/products", format="delta")
regions = spark.read.load(f"{base_path}/bronze/regions", format="delta")


### Clean data

In [0]:
# Import only the helpers this cell uses. A star import of
# pyspark.sql.functions would also replace Python built-ins such as max, min,
# abs and round with Spark column functions in every cell that runs later.
from pyspark.sql.functions import col, to_date

# One row per customer_id; rows without a customer_id are discarded.
customers_silver = (
    customers
    .dropDuplicates(["customer_id"])
    .filter(col("customer_id").isNotNull())
)

# One row per order_id, with order_date converted by to_date.
orders_silver = (
    orders
    .dropDuplicates(["order_id"])
    .withColumn("order_date", to_date("order_date"))
)

# Products and regions only need de-duplication on their key column.
products_silver = products.dropDuplicates(["product_id"])
regions_silver = regions.dropDuplicates(["region_id"])


### Writing silver tables

In [0]:
# Store the cleaned frames in Delta format under the silver folder,
# overwriting the output of any previous run.
customers_silver.write.save(
    f"{base_path}/silver/customers", format="delta", mode="overwrite"
)

orders_silver.write.save(
    f"{base_path}/silver/orders", format="delta", mode="overwrite"
)

products_silver.write.save(
    f"{base_path}/silver/products", format="delta", mode="overwrite"
)

regions_silver.write.save(
    f"{base_path}/silver/regions", format="delta", mode="overwrite"
)


### Gold layer

In [0]:
# Read the silver Delta data back as the input for the gold aggregates.
# These assignments rebind customers / orders / products / regions again.
customers = spark.read.load(f"{base_path}/silver/customers", format="delta")
orders = spark.read.load(f"{base_path}/silver/orders", format="delta")
products = spark.read.load(f"{base_path}/silver/products", format="delta")
regions = spark.read.load(f"{base_path}/silver/regions", format="delta")


### Join & Aggregate
Sales by Region

In [0]:
# Order count and summed order amount per region: orders are joined to
# customers on customer_id, then to regions on region_id, and grouped by
# region_name.
# NOTE(review): this cell reads `order_amount` and `region_name`, while the
# mapping-based aggregate in the next cell reads `total_amount` and `region`
# from the same silver tables — confirm which column names the schema has.
# NOTE(review): assumes the customers table carries `region_id` — TODO confirm.
sales_by_region = (
    orders
    .join(customers, on="customer_id")
    .join(regions, on="region_id")
    .groupBy("region_name")
    .agg(
        count("order_id").alias("total_orders"),
        sum("order_amount").alias("total_sales"),
    )
)


In [0]:
# Hand-written example of a customer -> region assignment.
# TODO: the list covers only four customers; it must be extended to all of
# them (or replaced by a real mapping table), because join() defaults to an
# inner join and orders of unlisted customers drop out of the result.
customer_region = spark.createDataFrame(
    [
        ("C00710", "R01"),
        ("C00954", "R02"),
        ("C01578", "R03"),
        ("C00962", "R04"),
        # ... for all customers
    ],
    ["customer_id", "region_id"],
)

# Join orders -> customer_region -> regions (the customers table is not
# involved), then count orders and sum total_amount per region.
# Despite its name, the result holds one aggregated row per region.
orders_with_region = (
    orders
    .join(customer_region, on="customer_id")
    .join(regions, on="region_id")
    .groupBy("region")
    .agg(
        count("order_id").alias("total_orders"),
        sum("total_amount").alias("total_sales"),
    )
)




### Displaying Data

In [0]:
# Render the per-region aggregate (total_orders and total_sales per region).
display(orders_with_region)