In [0]:
%sql
-- Preview the four source tables.
-- Names are fully qualified (catalog.schema.table) so the result does not depend on
-- the session's current catalog; the rest of this notebook addresses the
-- `workspace` catalog explicitly (e.g. workspace.datax.customers).
SELECT * FROM workspace.datax.customers;
SELECT * FROM workspace.datax.products;
SELECT * FROM workspace.datax.orders;
SELECT * FROM workspace.datax.order_items;


In [0]:
# Load the four source CSVs (header row, column types inferred by Spark).
# NOTE(review): this cell used to read from a personal workspace folder
# ("dbfs:/Workspace/Users/<user email>/..."), which embeds one user's email address
# in the notebook and only works for that user. It now reads the same file names
# from the shared volume used by the following cells. Those cells re-read all four
# files anyway, so this cell is redundant and is a candidate for removal.
VOLUME_DIR = "dbfs:/Volumes/workspace/datax/my_catalog"

customers_df = spark.read.csv(f"{VOLUME_DIR}/customers.csv", header=True, inferSchema=True)
products_df = spark.read.csv(f"{VOLUME_DIR}/products.csv", header=True, inferSchema=True)
orders_df = spark.read.csv(f"{VOLUME_DIR}/orders.csv", header=True, inferSchema=True)
order_items_df = spark.read.csv(f"{VOLUME_DIR}/order_items.csv", header=True, inferSchema=True)


In [0]:
# Read each source CSV from the volume into its own DataFrame.
# Every file has a header row; Spark infers the column types.
VOLUME_DIR = "dbfs:/Volumes/workspace/datax/my_catalog"
csv_options = {"header": True, "inferSchema": True}

customers_df = spark.read.csv(f"{VOLUME_DIR}/customers.csv", **csv_options)
products_df = spark.read.csv(f"{VOLUME_DIR}/products.csv", **csv_options)
orders_df = spark.read.csv(f"{VOLUME_DIR}/orders.csv", **csv_options)
order_items_df = spark.read.csv(f"{VOLUME_DIR}/order_items.csv", **csv_options)


In [0]:
# List the contents of the source volume to confirm the CSV files are present.
volume_files = dbutils.fs.ls("dbfs:/Volumes/workspace/datax/my_catalog")
volume_files


In [0]:
def _read_source_csv(file_name):
    """Read one CSV from the source volume (header row, inferred column types)."""
    return spark.read.csv(
        "dbfs:/Volumes/workspace/datax/my_catalog/" + file_name,
        header=True,
        inferSchema=True,
    )


# Read CSVs into DataFrames
customers_df = _read_source_csv("customers.csv")
order_items_df = _read_source_csv("order_items.csv")
orders_df = _read_source_csv("orders.csv")
products_df = _read_source_csv("products.csv")

# Quick check: print the first five rows of the customers and orders frames
for preview_df in (customers_df, orders_df):
    preview_df.show(5)


In [0]:
%sql
-- Preview the `customers` table in the workspace.datax schema.
SELECT *
FROM workspace.datax.customers;


In [0]:
# Bronze layer: persist each raw DataFrame as a table, replacing any previous
# contents. The "_csv" suffix marks tables loaded directly from the source files.
bronze_tables = {
    "workspace.datax.customers_csv": customers_df,
    "workspace.datax.orders_csv": orders_df,
    "workspace.datax.order_items_csv": order_items_df,
    "workspace.datax.products_csv": products_df,
}
for table_name, raw_df in bronze_tables.items():
    raw_df.write.mode("overwrite").saveAsTable(table_name)


In [0]:
%sql
-- List the tables registered in the workspace.datax schema.
SHOW TABLES FROM workspace.datax;


In [0]:
%sql
-- Sample 10 rows from each Bronze table (no ORDER BY, so which rows appear is arbitrary).
SELECT *
FROM workspace.datax.customers_csv
LIMIT 10;

SELECT *
FROM workspace.datax.orders_csv
LIMIT 10;

SELECT *
FROM workspace.datax.order_items_csv
LIMIT 10;

SELECT *
FROM workspace.datax.products_csv
LIMIT 10;


In [0]:
# Early Silver pass for customers: drop rows with a NULL key, then keep one row
# per customer_id, and overwrite the customers_silver table.
# The NULL-key filter matches the Silver Layer cell further down; without it a
# single NULL customer_id row would survive deduplication and reach Silver.
silver_customers = (
    spark.table("workspace.datax.customers_csv")
    .na.drop(subset=["customer_id"])       # remove nulls in key
    .dropDuplicates(["customer_id"])       # no ordering given: which duplicate survives is not guaranteed
)

silver_customers.write.mode("overwrite").saveAsTable("workspace.datax.customers_silver")


In [0]:
%sql
-- Tables currently registered in the schema
SHOW TABLES FROM workspace.datax;

-- First 10 rows (arbitrary, no ORDER BY) of each Bronze table
SELECT *
FROM workspace.datax.customers_csv
LIMIT 10;

SELECT *
FROM workspace.datax.orders_csv
LIMIT 10;

SELECT *
FROM workspace.datax.order_items_csv
LIMIT 10;

SELECT *
FROM workspace.datax.products_csv
LIMIT 10;


# 🥈 Create Silver Layer

In [0]:
def _write_silver(bronze_table, silver_table, key_column):
    """Clean one Bronze table on its key column and save it as a Silver table.

    Keeps one row per ``key_column`` value, drops rows whose key is null, and
    overwrites ``silver_table`` with the result.

    Returns the cleaned DataFrame.
    """
    cleaned = (
        spark.table(bronze_table)
        .dropDuplicates([key_column])        # remove duplicates
        .na.drop(subset=[key_column])        # remove nulls in key
    )
    cleaned.write.mode("overwrite").saveAsTable(silver_table)
    return cleaned


customers_silver = _write_silver(
    "workspace.datax.customers_csv", "workspace.datax.customers_silver", "customer_id"
)
orders_silver = _write_silver(
    "workspace.datax.orders_csv", "workspace.datax.orders_silver", "order_id"
)
order_items_silver = _write_silver(
    "workspace.datax.order_items_csv", "workspace.datax.order_items_silver", "order_item_id"
)
products_silver = _write_silver(
    "workspace.datax.products_csv", "workspace.datax.products_silver", "product_id"
)


In [0]:
%sql
-- Show the columns plus the extended table metadata of the customers Silver table.
DESC TABLE EXTENDED workspace.datax.customers_silver;


In [0]:
from pyspark.sql import functions as F

# Second Silver pass: extra per-table quality rules, applied ON TOP OF the
# key-based cleaning (non-null key, one row per key) from the Silver Layer cell.
# Fix: this cell overwrites the same *_silver tables, and previously applied only
# the extra rules. That silently undid key deduplication and null-key removal
# (e.g. orders_silver could again hold duplicate order_id rows, double-counting
# revenue in the Gold joins), so both rule sets are now applied here.

# Read Bronze (already saved as tables)
customers_bronze = spark.table("workspace.datax.customers_csv")
orders_bronze = spark.table("workspace.datax.orders_csv")
products_bronze = spark.table("workspace.datax.products_csv")
order_items_bronze = spark.table("workspace.datax.order_items_csv")

# Cleaning rules (you can add more rules)
customers_silver = (
    customers_bronze
    .filter("customer_id IS NOT NULL AND email IS NOT NULL")
    .dropDuplicates(["customer_id"])
    .dropDuplicates(["email"])          # at most one customer row per email address
)
orders_silver = (
    orders_bronze
    .filter("order_id IS NOT NULL AND status IS NOT NULL")
    .dropDuplicates(["order_id"])
)
products_silver = (
    products_bronze
    .filter("product_id IS NOT NULL")
    .dropDuplicates(["product_id"])
)
order_items_silver = (
    order_items_bronze
    # `quantity > 0` is NULL (not true) for a NULL quantity, so those rows are dropped too
    .filter("order_item_id IS NOT NULL AND quantity > 0")
    .dropDuplicates(["order_item_id"])
)

# Save as Silver Tables
customers_silver.write.mode("overwrite").saveAsTable("workspace.datax.customers_silver")
orders_silver.write.mode("overwrite").saveAsTable("workspace.datax.orders_silver")
products_silver.write.mode("overwrite").saveAsTable("workspace.datax.products_silver")
order_items_silver.write.mode("overwrite").saveAsTable("workspace.datax.order_items_silver")


# 🥇 Gold Layer

In [0]:
## 1️⃣ Total Sales by Product

from pyspark.sql import functions as F

# Total sales per product = sum over its order lines of quantity * price.
# Fix: group by product_id as well as product_name, so two distinct products that
# share a name are not merged into one row. The result is now also saved as a
# table, like the other Gold results.
# NOTE(review): price is taken from the products table. If order_items records the
# unit price at the time of sale, that column is probably the right one to use.
gold_product_sales = (
    spark.table("workspace.datax.order_items_silver").alias("oi")
    .join(spark.table("workspace.datax.products_silver").alias("p"),
          F.col("oi.product_id") == F.col("p.product_id"))
    .groupBy("p.product_id", "p.product_name")
    .agg(F.sum(F.col("oi.quantity") * F.col("p.price")).alias("total_sales"))
)

gold_product_sales.write.mode("overwrite").saveAsTable("workspace.datax.gold_product_sales")

gold_product_sales.show()



In [0]:
##2️⃣ Customer Revenue

from pyspark.sql import functions as F

# Silver tables, aliased so columns of the same name can be told apart after the joins.
customers = spark.table("workspace.datax.customers_silver").alias("c")
orders = spark.table("workspace.datax.orders_silver").alias("o")
order_items = spark.table("workspace.datax.order_items_silver").alias("oi")
products = spark.table("workspace.datax.products_silver").alias("p")

# Revenue per customer = sum over the customer's order lines of quantity * price.
# Inner joins: customers without orders (or orders without items) do not appear.
# Fix: group by customer_id as well as the name columns, so two different customers
# who share a first and last name are no longer merged into one row.
gold_customer_revenue = (
    customers
    .join(orders, F.col("c.customer_id") == F.col("o.customer_id"))
    .join(order_items, F.col("o.order_id") == F.col("oi.order_id"))
    .join(products, F.col("oi.product_id") == F.col("p.product_id"))
    .groupBy("c.customer_id", "c.first_name", "c.last_name")
    .agg(F.sum(F.col("oi.quantity") * F.col("p.price")).alias("total_revenue"))
)

# overwriteSchema: the table may already exist from earlier runs without the
# customer_id column; allow the overwrite to replace its schema.
(
    gold_customer_revenue.write.mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("workspace.datax.gold_customer_revenue")
)

gold_customer_revenue.show()


In [0]:
## 3️⃣ Top Cities by Revenue

from pyspark.sql import functions as F

# Re-read the Silver tables here (same aliases as in the Customer Revenue cell) so
# this cell does not depend on variables left over from running an earlier cell.
customers = spark.table("workspace.datax.customers_silver").alias("c")
orders = spark.table("workspace.datax.orders_silver").alias("o")
order_items = spark.table("workspace.datax.order_items_silver").alias("oi")
products = spark.table("workspace.datax.products_silver").alias("p")

# Revenue per customer city (quantity * price summed over order lines), highest first.
# Inner joins: cities whose customers have no order lines do not appear; customers
# with a NULL city are grouped together into one NULL row.
gold_city_revenue = (
    customers
    .join(orders, F.col("c.customer_id") == F.col("o.customer_id"))
    .join(order_items, F.col("o.order_id") == F.col("oi.order_id"))
    .join(products, F.col("oi.product_id") == F.col("p.product_id"))
    .groupBy("c.city")
    .agg(F.sum(F.col("oi.quantity") * F.col("p.price")).alias("total_revenue"))
    .orderBy(F.desc("total_revenue"))
)

# The ordering applies to this DataFrame's output (e.g. .show()); row order is not
# guaranteed when the saved table is read back, so sort again when querying it.
gold_city_revenue.write.mode("overwrite").saveAsTable("workspace.datax.gold_city_revenue")

gold_city_revenue.show()