
#### Step 1: Create Silver Schema and Volume

- 🗂️ Creates the **`silver_ecommerce`** schema within the catalog to **store cleaned and enriched data**.
- 📦 Creates a **volume** inside the Silver schema to **hold intermediate files**, typically transformed from raw Bronze data.


In [0]:
%sql
-- Create the schema if it doesn't exist
CREATE SCHEMA IF NOT EXISTS dataengineer_databricks_ecommerce.silver_ecommerce;

-- Create a volume inside the schema
CREATE VOLUME IF NOT EXISTS dataengineer_databricks_ecommerce.silver_ecommerce.silver_volume;


#### Step 2: Load Data from Bronze Volume with Schema Inference

- 📥 Loads all CSV files from the **Bronze volume** using Spark with header recognition and automatic schema inference.
- 🧾 Uses `display(df)` to **visually inspect the structured DataFrame**, making it easier to understand the data types and contents.


In [0]:
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/Volumes/dataengineer_databricks_ecommerce/bronze_ecommerce/bronze_volume")
)
display(df)

In [0]:
# printSchema() prints the schema itself and returns None, so it needs no display() wrapper
df.printSchema()

In [0]:
display(df.limit(10))

#### Step 3: Rename Columns (Optional but Recommended)

- 🧹 Standardizes column names by **removing spaces, special characters, or inconsistent casing**.
- ✅ Improves **readability, consistency, and compatibility** when performing transformations or writing SQL queries.


In [0]:
df=df.withColumnRenamed("Assigned Supervisor","Assigned_Supervisor")
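
The single rename above can be generalized when several headers need cleaning. The following is a minimal sketch in plain Python, not part of the original notebook: `normalize_column_name` is a hypothetical helper, and the sample header list (including `" Total Sales ($) "`) is made up for illustration. It keeps the original casing so that names such as `Assigned_Supervisor` match the rest of this notebook.

```python
import re


def normalize_column_name(name: str) -> str:
    """Return a name made only of letters, digits, and underscores."""
    # Replace each run of other characters (spaces, punctuation) with one underscore.
    cleaned = re.sub(r"[^0-9A-Za-z_]+", "_", name.strip())
    # Drop underscores left over at either end, e.g. from a trailing "($)".
    return cleaned.strip("_")


# Hypothetical header row used only to illustrate the helper.
header = ["Order_Number", "Assigned Supervisor", " Total Sales ($) "]
new_names = [normalize_column_name(c) for c in header]
print(new_names)
```

Assuming the helper is defined in the notebook, it could be applied to every column at once with `df = df.toDF(*[normalize_column_name(c) for c in df.columns])`.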

#### Step 4: Handle Nulls and Missing Values

- 🚫 **Identify rows** where any column contains a null value, to gauge how complete the data is.
- ⚠️ **Drop rows** with nulls in critical fields like `Order_Number` or `Order_Date` to maintain data integrity.
- 🛠️ **Fill or flag** non-critical nulls using default values, imputation, or indicators for further review.


In [0]:
from pyspark.sql.functions import col

# Select the rows that have a null in at least one column, for inspection
df_na = df.filter(
    col("Order_Number").isNull() |
    col("State_Code").isNull() |
    col("Customer_Name").isNull() |
    col("Order_Date").isNull() |
    col("Status").isNull() |
    col("Product").isNull() |
    col("Category").isNull() |
    col("Brand").isNull() |
    col("Cost").isNull() |
    col("Sales").isNull() |
    col("Quantity").isNull() |
    col("Total_Cost").isNull() |
    col("Total_Sales").isNull() |
    col("Assigned_Supervisor").isNull()  # column was renamed in Step 3
)

df_na.display()
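
The column-by-column filter above has to be updated by hand whenever a column is added or renamed. As a hedged alternative, the sketch below builds the same "any column is null" condition as a SQL string from a list of names. `any_null_condition` is a hypothetical helper, not part of the original notebook, and it assumes that column names contain no backtick characters.

```python
def any_null_condition(columns):
    """Build a SQL predicate that is true when at least one listed column is NULL."""
    # Backticks keep names that contain spaces or punctuation valid in Spark SQL.
    return " OR ".join(f"`{name}` IS NULL" for name in columns)


condition = any_null_condition(["Order_Number", "Order_Date", "Assigned_Supervisor"])
print(condition)
```

Because `DataFrame.filter` also accepts a SQL expression string, the result could be used as `df_na = df.filter(any_null_condition(df.columns))`.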


In [0]:
df=df.dropna(subset=["Order_Number","Order_Date"])

In [0]:
from pyspark.sql.functions import col

# Re-check which rows still have a null in at least one column after the drop above
df_na = df.filter(
    col("Order_Number").isNull() |
    col("State_Code").isNull() |
    col("Customer_Name").isNull() |
    col("Order_Date").isNull() |
    col("Status").isNull() |
    col("Product").isNull() |
    col("Category").isNull() |
    col("Brand").isNull() |
    col("Cost").isNull() |
    col("Sales").isNull() |
    col("Quantity").isNull() |
    col("Total_Cost").isNull() |
    col("Total_Sales").isNull() |
    col("Assigned_Supervisor").isNull()  # column was renamed in Step 3
)

# Display the filtered DataFrame containing rows with at least one null value
df_na.display()


In [0]:
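# Filter the DataFrame to exclude rows where the 'Assigned_Supervisor' column has the value 'unassigned'.
# Rows where 'Assigned_Supervisor' is null are excluded as well, because the comparison evaluates to null.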
df_unassigned = df.filter(df["Assigned_Supervisor"] != "unassigned")

# Display the filtered DataFrame in the Databricks notebook
display(df_unassigned)


#### Step 5: Convert Data Types (if needed)

- 🔢 Ensure that **numeric fields** (e.g., prices, quantities) are properly typed as integers or floats.
- 🧪 Helps avoid **calculation errors** and improves performance during aggregations and transformations.



In [0]:
from pyspark.sql.functions import col

df = df.withColumn("Cost", col("Cost").cast("double")) \
       .withColumn("Sales", col("Sales").cast("double")) \
       .withColumn("Quantity", col("Quantity").cast("int")) \
       .withColumn("Total_Cost", col("Total_Cost").cast("double")) \
       .withColumn("Total_Sales", col("Total_Sales").cast("double"))
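
The chain of `withColumn` calls above can also be driven by a mapping, which keeps the target types in one place. The sketch below is one possible way to do that and is not part of the original notebook: `TARGET_TYPES` and `cast_expressions` are hypothetical names, and the mapping simply mirrors the casts shown above.

```python
# Hypothetical mapping from column name to target Spark SQL type (mirrors the casts above).
TARGET_TYPES = {
    "Cost": "double",
    "Sales": "double",
    "Quantity": "int",
    "Total_Cost": "double",
    "Total_Sales": "double",
}


def cast_expressions(columns, target_types):
    """Return one SQL select expression per column, casting only the mapped columns."""
    expressions = []
    for name in columns:
        if name in target_types:
            sql_type = target_types[name].upper()
            expressions.append(f"CAST(`{name}` AS {sql_type}) AS `{name}`")
        else:
            # Unmapped columns pass through unchanged, in their original position.
            expressions.append(f"`{name}`")
    return expressions


exprs = cast_expressions(["Order_Number", "Cost", "Quantity"], TARGET_TYPES)
print(exprs)
```

Assuming the helper is available, the casts could then be applied in a single pass with `df = df.selectExpr(*cast_expressions(df.columns, TARGET_TYPES))`, which preserves the column order.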


#### Step 6: Add Derived Columns

- ➕ Add **calculated fields** such as `profit` (e.g., revenue - cost) to enrich the dataset with business insights.
- 📅 Include **dynamic columns** like today’s date to track data processing time or for audit purposes.


In [0]:
from pyspark.sql.functions import expr,current_date

df = df.withColumn("Profit", expr("Total_Sales - Total_Cost"))
df_silver = df.withColumn("Load_Date", current_date())



#### Step 7: Save DataFrame as a CSV File to the Volume

- 💾 Save the transformed DataFrame (`df_silver`) as **CSV** to the Silver volume for downstream processing or analysis. Spark writes a directory of part files at the target path rather than a single file.
- 📂 This step ensures the **cleaned and enriched data** is persisted in a volume governed by Unity Catalog.


In [0]:
# Write df_silver (not df) so that the Load_Date column added in Step 6 is included
df_silver.write.mode("overwrite").option("header", "true").csv(
    "/Volumes/dataengineer_databricks_ecommerce/silver_ecommerce/silver_volume/orders_cleaned"
)
