In [None]:
%%sql

CREATE SCHEMA BRONZE;

In [None]:
%%sql

CREATE SCHEMA SILVER;

In [10]:
from pyspark.sql import functions as F

customers_bronze_df = spark.table("BRONZE.customers")
orders_bronze_df = spark.table("BRONZE.orders")
products_bronze_df = spark.table("BRONZE.products")


# Step 1: Clean and Standardize Customers Data
# Remove duplicates based on CustomerID and standardize phone numbers and emails
customers_silver_df = (
    customers_bronze_df
    .dropDuplicates(["CustomerID"])
    .filter(F.col("Email").contains("@"))  # Basic email validation
    .withColumn("PhoneNumber", F.regexp_replace("PhoneNumber", r"[^\d]", ""))  # Remove non-numeric characters
)

# Step 2: Clean and Standardize Orders Data
# Remove any records with null OrderID, CustomerID, ProductID, and ensure valid quantity and total amount
orders_silver_df = (
    orders_bronze_df
    .dropDuplicates(["OrderID"])
    .filter(F.col("CustomerID").isNotNull() & F.col("ProductID").isNotNull())
    .filter((F.col("Quantity") > 0) & (F.col("TotalAmount") > 0))  # Ensure valid values
    .withColumn("OrderDate", F.to_date("OrderDate"))  # Standardize date format
)

# Step 3: Clean and Standardize Products Data
# Remove duplicates based on ProductID and ensure positive stock and price
products_silver_df = (
    products_bronze_df
    .dropDuplicates(["ProductID"])
    .filter(F.col("Price") > 0)  # Ensure positive price
    .filter(F.col("Stock") >= 0)  # Stock should not be negative
)

# Define the schema name
schema_name = "SILVER"  # Replace with your actual schema name

# Save DataFrames as tables within the schema
customers_silver_df.write.mode("overwrite").saveAsTable(f"{schema_name}.customers")
orders_silver_df.write.mode("overwrite").saveAsTable(f"{schema_name}.orders")
products_silver_df.write.mode("overwrite").saveAsTable(f"{schema_name}.products")


StatementMeta(, 185771ef-4b9f-46f3-af70-7245b610258e, 12, Finished, Available, Finished)

In [13]:
customers_bronze_df.show(3)
customers_silver_df.show(3)

StatementMeta(, 185771ef-4b9f-46f3-af70-7245b610258e, 15, Finished, Available, Finished)

+----------+--------------+--------------------+-----------------+--------------------+---------+
|CustomerID|  CustomerName|               Email|      PhoneNumber|             Address|  Country|
+----------+--------------+--------------------+-----------------+--------------------+---------+
|        50|Joseph Coleman|  carl51@example.net| 001-696-854-5725|284 Ortiz Drive A...| Ethiopia|
|        95|Joseph Coleman|crystal52@example...|     931-370-2960|30802 Rebecca Str...|  Germany|
|       117|Joseph Coleman|robersonmaria@exa...|233.388.5398x2613|30802 Rebecca Str...|Nicaragua|
+----------+--------------+--------------------+-----------------+--------------------+---------+
only showing top 3 rows

+----------+--------------+--------------------+----------------+--------------------+---------+
|CustomerID|  CustomerName|               Email|     PhoneNumber|             Address|  Country|
+----------+--------------+--------------------+----------------+--------------------+---------

In [14]:
%%sql
CREATE SCHEMA GOLD;

StatementMeta(, 185771ef-4b9f-46f3-af70-7245b610258e, 16, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [17]:
print(f"customers_silver_df: {customers_silver_df.describe()}")
print(f"orders_silver_df: {orders_silver_df.describe()}")
print(f"products_silver_df: {products_silver_df.describe()}")

StatementMeta(, 185771ef-4b9f-46f3-af70-7245b610258e, 19, Finished, Available, Finished)

customers_silver_df: DataFrame[summary: string, CustomerID: string, CustomerName: string, Email: string, PhoneNumber: string, Address: string, Country: string]
orders_silver_df: DataFrame[summary: string, OrderID: string, CustomerID: string, ProductID: string, Quantity: string, TotalAmount: string]
products_silver_df: DataFrame[summary: string, ProductID: string, ProductName: string, Category: string, Price: string, Stock: string]


In [28]:
%%sql
CREATE OR REPLACE TABLE GOLD.orders_enriched AS
SELECT o.OrderID,
       o.CustomerID,
       o.ProductID,
       o.Quantity,
       o.TotalAmount,
       o.OrderDate,
       c.CustomerName,
       c.Email,
       c.Country,
       p.ProductName,
       p.Category
FROM SILVER.orders o
LEFT JOIN SILVER.customers c ON o.CustomerID = c.CustomerID
LEFT JOIN SILVER.products p ON o.ProductID = p.ProductID;


StatementMeta(, 185771ef-4b9f-46f3-af70-7245b610258e, 30, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [29]:
%%sql
CREATE OR REPLACE  TABLE GOLD.customer_summary AS
SELECT CustomerID,
       CustomerName,
       Email,
       Country,
       COUNT(OrderID) AS TotalOrders,
       SUM(Quantity) AS TotalQuantityPurchased,
       SUM(TotalAmount) AS TotalAmountSpent
FROM GOLD.orders_enriched
GROUP BY CustomerID, CustomerName, Email, Country;

StatementMeta(, 185771ef-4b9f-46f3-af70-7245b610258e, 31, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [30]:
%%sql
CREATE OR REPLACE  TABLE GOLD.product_summary AS
SELECT ProductID,
       ProductName,
       Category,
       SUM(Quantity) AS TotalQuantitySold,
       SUM(TotalAmount) AS TotalRevenue
FROM GOLD.orders_enriched
GROUP BY ProductID, ProductName, Category;

StatementMeta(, 185771ef-4b9f-46f3-af70-7245b610258e, 32, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [31]:
%%sql
CREATE OR REPLACE  TABLE GOLD.daily_sales_summary AS
SELECT OrderDate,
       COUNT(OrderID) AS TotalOrders,
       SUM(Quantity) AS TotalQuantitySold,
       SUM(TotalAmount) AS TotalRevenue
FROM GOLD.orders_enriched
GROUP BY OrderDate;


StatementMeta(, 185771ef-4b9f-46f3-af70-7245b610258e, 33, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>