##  Retail Sales Project — Databricks PySpark Implementation

###  Objective

The goal of this project is to analyze **Superstore retail sales data** using **PySpark in Databricks**,  
and build an interactive analytical dashboard that provides **monthly and yearly business insights** to stakeholders.

> Since the dataset represents historical data (e.g., past years), the notebook focuses on **trend analysis** rather than live weekly refresh.

---

###  Business Requirements

Stakeholders want to monitor and analyze the following **key business metrics**:

| Metric ID | Description |
|------------|--------------|
| 1 | Total number of unique customers |
| 2 | Total number of orders |
| 3 | Total sales amount (for the selected period) |
| 4 | Total profit |
| 5 | Top sales by country |
| 6 | Most profitable region and country |
| 7 | Top sales by product category |
| 8 | Top 10 products by sub-category |
| 9 | Most frequently ordered product (by quantity) |
| 10 | Top customer based on total sales per city |

**Note:**  
The Databricks notebook supports **manual refresh and dynamic filtering** by **date range**, **month**, and **year** using widgets.  
Future automation (e.g., weekly/monthly updates) can be enabled if live data is introduced.

---

###  Project Workflow

```plaintext
        ┌──────────────────────┐
        │   One-Pager Req.     │  ← (Business / PM / Stakeholders)
        └─────────┬────────────┘
                  │
        ┌─────────▼────────────┐
        │      Data Team       │  ← (Data Engineers receive requirement)
        └─────────┬────────────┘
                  │
        ┌─────────▼────────────┐
        │  Data Understanding  │  ← (Schema review, profiling, cleaning)
        └─────────┬────────────┘
                  │
        ┌─────────▼────────────┐
        │   Dev / QA / Prod    │  ← (Pipeline deployment in Databricks)
        └──────────────────────┘
```

---

###  Technical Stack

| Layer           | Tool / Technology                        |
| --------------- | ---------------------------------------- |
| Data Processing | **Apache Spark (PySpark)**               |
| Platform        | **Databricks**                           |
| Data Storage    | **Delta Lake / Parquet**                 |
| Visualization   | **Databricks SQL / Power BI / Tableau**  |
| Scheduling      | **Databricks Jobs / Airflow (optional)** |
| Version Control | **GitHub**                               |
| Environment     | **Dev → QA → Prod**                      |

---

###  Data Understanding

**Dataset Schema:**

| Column Name   | Data Type | Description                                |
| ------------- | --------- | ------------------------------------------ |
| ID            | Integer   | Record identifier                          |
| Order_ID      | String    | Unique order identifier                    |
| Order_Date    | String    | Order date (string, to be cast as `date`)  |
| Ship_Date     | Date      | Date the order was shipped                 |
| Ship_Mode     | String    | Shipping method                            |
| Customer_ID   | String    | Unique customer identifier                 |
| Customer_Name | String    | Full name of the customer                  |
| Segment       | String    | Market segment (Consumer, Corporate, etc.) |
| Country       | String    | Country of the order                       |
| City          | String    | City of the order                          |
| State         | String    | State or province                          |
| Postal_Code   | Integer   | Postal code                                |
| Region        | String    | Geographic region                          |
| Product_ID    | String    | Product identifier                         |
| Category      | String    | Product category                           |
| Sub_Category  | String    | Product sub-category                       |
| Product_Name  | String    | Product name                               |
| Sales         | String    | Sales amount (convert to numeric)          |
| Quantity      | String    | Quantity ordered (convert to integer)      |
| Discount      | String    | Discount applied                           |
| Profit        | Double    | Profit from the sale                       |
| User_ID       | Double    | User identifier (if applicable)            |
| State_ID      | Double    | State identifier (if applicable)           |
| Order_S       | String    | Additional order reference or remarks      |

> **Note:** Some numeric fields (e.g., `Sales`, `Quantity`, `Discount`) are currently stored as strings and must be cast to appropriate numeric types during transformation.

---

###  Data Transformation Steps

1. **Data Ingestion**

   * Load the CSV data using PySpark (`spark.read.csv`); Excel sources would first need to be exported to CSV or read through a separate connector.
   * Store the raw data in the **Bronze Layer**.

2. **Data Cleaning**

   * Handle missing values, duplicates, and invalid records.
   * Convert data types (e.g., `Order_Date` → `date` using `to_date()`).

3. **Data Transformation**

   * Compute KPIs using `groupBy()` and aggregation functions.
   * Derive new columns such as `year`, `month`, and `quarter` for time-based analysis.

4. **Data Enrichment**

   * Add regional or categorical hierarchies (Country → Region → City).
   * Aggregate sales and profit by category, customer, and geography.

5. **Data Storage**

   * Save curated data into the **Silver Layer** (clean data).
   * Publish analytical results to the **Gold Layer** for dashboards.
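
The time-based columns mentioned in step 3 come down to simple calendar arithmetic. The sketch below shows that derivation in plain Python, which may be handy for checking expectations outside a cluster. The helper name `derive_time_parts` is hypothetical; inside the notebook the equivalent would typically be the `year`, `month`, and `quarter` functions from `pyspark.sql.functions`.

```python
from datetime import date


def derive_time_parts(d: date) -> dict:
    """Return the year, month, and calendar quarter for a date."""
    # Months 1-3 map to quarter 1, 4-6 to quarter 2, and so on
    quarter = (d.month - 1) // 3 + 1
    return {"year": d.year, "month": d.month, "quarter": quarter}


parts = derive_time_parts(date(2023, 5, 17))
print(parts)
```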

---

###  Example PySpark KPI Computations

```python
from pyspark.sql.functions import sum as spark_sum, countDistinct, col

# Total number of unique customers
total_customers = df.select(countDistinct("Customer_ID")).collect()[0][0]

# Total orders
total_orders = df.select(countDistinct("Order_ID")).collect()[0][0]

# Total sales (Sales is stored as a string, so cast before summing)
total_sales = df.agg(spark_sum(col("Sales").cast("double"))).collect()[0][0]

# Total profit
total_profit = df.agg(spark_sum("Profit")).collect()[0][0]

# Top sales by country
top_country_sales = (
    df.groupBy("Country")
      .agg(spark_sum(col("Sales").cast("double")).alias("Total_Sales"))
      .orderBy(col("Total_Sales").desc())
)
```

---

### Expected Outputs

* Databricks SQL or Power BI dashboard with:

  * Total Customers, Orders, Sales, and Profit
  * Top Performing Countries and Regions
  * Top 10 Products by Category and Sub-Category
  * Top Customers by Sales per City
  * Monthly and Yearly trend visualizations

---

### 🗓️ Temporal Analysis

* **Monthly Analysis:** Track sales, profit, and customer growth per month.
* **Quarterly Analysis:** Identify top-performing quarters.
* **Yearly Summary:** Compare metrics across years (if available).

---

### Automation & Refresh

* This project uses **historical data**, so no real-time weekly refresh is required.
* However, the notebook includes widgets for:

  * 📅 **Date range filtering**
  * 🗓️ **Monthly or yearly selection**
* The notebook can be easily extended for **scheduled jobs** in case future data ingestion pipelines are added.

---


In [0]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("SuperstoreAnalysis").getOrCreate()


In [0]:

# Read CSV file
df = spark.read.csv("/Volumes/workspace/dataset/dataset-store/superstore.csv", 
                    header=True,       # use first row as column names
                    inferSchema=True)  # automatically infer data types



In [0]:
# Display all columns 
# df.columns
df.printSchema()

### Prepare and Clean Data
- Before answering the business questions, make sure the date and numeric columns are cast to the correct types.

In [0]:
from pyspark.sql.functions import when, expr, col, trim

df_clean = (
    df.withColumn("Order_Date", 
        when(col("Order_Date").rlike(r"^\d{2}/\d{2}/\d{4}$"), expr("try_to_date(Order_Date, 'dd/MM/yyyy')"))
        .when(col("Order_Date").rlike(r"^\d{4}-\d{2}-\d{2}$"), expr("try_to_date(Order_Date, 'yyyy-MM-dd')"))
        .otherwise(None)
    )
    .withColumn("Ship _Date", 
        when(col("Ship _Date").rlike(r"^\d{2}/\d{2}/\d{4}$"), expr("try_to_date(`Ship _Date`, 'dd/MM/yyyy')"))
        .when(col("Ship _Date").rlike(r"^\d{4}-\d{2}-\d{2}$"), expr("try_to_date(`Ship _Date`, 'yyyy-MM-dd')"))
        .otherwise(None)
    )
    .withColumn("Sales", expr("try_cast(trim(Sales) as double)"))
    .withColumn("Quantity", expr("try_cast(trim(Quantity) as int)"))
    .withColumn("Profit", expr("try_cast(trim(Profit) as double)"))
)
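
The date handling in the cell above accepts two layouts (`dd/MM/yyyy` and `yyyy-MM-dd`) and turns everything else into null. As a rough illustration of that rule, here is a plain-Python sketch that can be run without Spark, for example to test sample values before a full run. The function name `parse_mixed_date` is hypothetical and not part of the notebook.

```python
import re
from datetime import date, datetime
from typing import Optional

# Accepted layouts, mirroring the two patterns checked in the cleaning cell
_FORMATS = [
    (re.compile(r"\d{2}/\d{2}/\d{4}"), "%d/%m/%Y"),
    (re.compile(r"\d{4}-\d{2}-\d{2}"), "%Y-%m-%d"),
]


def parse_mixed_date(value: Optional[str]) -> Optional[date]:
    """Parse dd/MM/yyyy or yyyy-MM-dd; return None for anything else."""
    if value is None:
        return None
    for pattern, fmt in _FORMATS:
        if pattern.fullmatch(value):
            try:
                return datetime.strptime(value, fmt).date()
            except ValueError:
                # Right shape but not a real calendar date (e.g. 31/02/2022)
                return None
    return None


print(parse_mixed_date("25/12/2022"), parse_mixed_date("12-25-2022"))
```

Returning `None` rather than raising keeps the behaviour close to the null-on-failure approach used in the notebook.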


#### Save the cleaned DataFrame as CSV

In [0]:
# Save cleaned DataFrame as multiple CSV files (folder)
df_clean.write.mode("overwrite") \
    .option("header", True) \
    .option("sep", ",") \
    .csv("/Volumes/workspace/dataset/dataset-store/cleaned_superstore")



In [0]:
# Optional: check schema and preview data
df_clean.printSchema()

In [0]:

display(df_clean.limit(5))

In [0]:
# createOrReplaceTempView registers a DataFrame as a SQL view that can be
# queried for the lifetime of the current Spark session. The view is
# session-scoped; sharing it across sessions would need
# createOrReplaceGlobalTempView instead.
# Register the cleaned DataFrame (not the raw df) under the cleaned name.
df_clean.createOrReplaceTempView("cleaned_superstore")

In [0]:
%sql
SELECT * FROM cleaned_superstore LIMIT 4;


In [0]:
%sql
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = 'cleaned_superstore';


In [0]:
%sql
DESCRIBE TABLE cleaned_superstore;


##### CLEANING SUPERSTORE DATASET

In [0]:
from pyspark.sql.functions import col, regexp_replace, avg, when, lit
from pyspark.sql.types import DoubleType, IntegerType

# Numeric columns stored as strings
numeric_columns_str = ["Sales", "Quantity", "Discount"]
numeric_columns_double = ["Profit", "user_id", "state_id"]
numeric_columns_int = ["Postal_Code"]

# Text columns to fill nulls with "Unknown"
text_columns = ["Order_id", "Order_Date", "Ship _Date", "Ship_Mode",
                "Customer_id", "Customer_Name", "Segment", "Country",
                "City", "State", "Region", "Product_ ID", "Category",
                "Sub_Category", "Product_Name", "order_s"]

# Step 1: Convert numeric string columns to double
df_cleaned = df_clean
for c in numeric_columns_str:
    df_cleaned = df_cleaned.withColumn(
        c + "_num",
        regexp_replace(col(c), "[^0-9.]", "").cast(DoubleType())
    )

# Step 2: Fill nulls in numeric columns with their average
for c in numeric_columns_str:
    avg_value = df_cleaned.select(avg(col(c + "_num"))).first()[0]
    df_cleaned = df_cleaned.withColumn(
        c + "_num",
        when(col(c + "_num").isNull(), lit(avg_value)).otherwise(col(c + "_num"))
    )

for c in numeric_columns_double:
    avg_value = df_cleaned.select(avg(col(c))).first()[0]
    df_cleaned = df_cleaned.withColumn(
        c,
        when(col(c).isNull(), lit(avg_value)).otherwise(col(c))
    )

for c in numeric_columns_int:
    avg_value = df_cleaned.select(avg(col(c))).first()[0]
    df_cleaned = df_cleaned.withColumn(
        c,
        when(col(c).isNull(), lit(int(avg_value))).otherwise(col(c))
    )

# Step 3: Fill nulls in text columns with "Unknown"
df_cleaned = df_cleaned.fillna("Unknown", subset=text_columns)

# Step 4: Optional - drop old numeric string columns if you want
# df_cleaned = df_cleaned.drop(*numeric_columns_str)

# Step 5: Show top 5 rows
df_cleaned.limit(5).toPandas()
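
The null-filling in Step 2 is mean imputation: each missing value is replaced with the average of the values that are present. A minimal plain-Python sketch of the same rule follows; the helper name `fill_nulls_with_mean` is hypothetical and the sample values are made up for illustration.

```python
from typing import List, Optional


def fill_nulls_with_mean(values: List[Optional[float]]) -> List[Optional[float]]:
    """Replace None entries with the mean of the non-None entries."""
    present = [v for v in values if v is not None]
    if not present:
        # Nothing to average over, so leave the column unchanged
        return list(values)
    mean = sum(present) / len(present)
    return [mean if v is None else v for v in values]


filled = fill_nulls_with_mean([10.0, None, 30.0])
print(filled)
```

One consequence worth keeping in mind: the column average is unchanged by this fill, but totals computed afterwards include the imputed values, so sums will be higher than sums over the observed rows alone.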


In [0]:
from pyspark.sql.functions import col, sum as spark_sum

null_counts = df_cleaned.select([
    spark_sum(col(c).isNull().cast("int")).alias(c) for c in df_cleaned.columns
])

# null_counts.show(truncate=False)

null_counts.limit(5).toPandas()


#### Data Cleaning Overview: Identifying Missing Values and “Unknown” Entries in Text Columns

In [0]:
from functools import reduce
from pyspark.sql.functions import col

# Only string/text columns
string_columns = [c for c, t in df_cleaned.dtypes if t == "string"]

# Filter rows where any string column has "Unknown"
unknown_rows = df_cleaned.filter(
    reduce(lambda a, b: a | b, [(col(c) == "Unknown") for c in string_columns])
)

# Count the number of such rows (count the filtered result, not the full DataFrame)
num_unknown_rows = unknown_rows.count()
print(f"Number of rows with 'Unknown' in any string column: {num_unknown_rows}")

In [0]:
# Filter rows where ALL string columns are NOT "Unknown"
valid_rows = df_cleaned.filter(
    ~reduce(lambda a, b: a | b, [(col(c) == "Unknown") for c in string_columns])
)

# Count the number of such rows
num_valid_rows = valid_rows.count()
print(f"Number of rows with no 'Unknown' in any string column: {num_valid_rows}")


In [0]:
from functools import reduce
from pyspark.sql.functions import col

df_cleaned.createOrReplaceTempView("cleaned_superstore")

string_columns = [c for c, t in df_cleaned.dtypes if t == "string"]

unknown_sql_condition = " OR ".join([f"`{c}` = 'Unknown'" for c in string_columns])

sql_query = f"""
SELECT 'Rows WITH Unknown' AS Metric, COUNT(*) AS Count
FROM cleaned_superstore
WHERE {unknown_sql_condition}

UNION ALL

SELECT 'Rows WITHOUT Unknown' AS Metric, COUNT(*) AS Count
FROM cleaned_superstore
WHERE NOT ({unknown_sql_condition})

UNION ALL

SELECT 'Total Rows' AS Metric, COUNT(*) AS Count
FROM cleaned_superstore
"""

display(spark.sql(sql_query))
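
One subtlety in the query above: a `WHERE` clause keeps only rows whose condition evaluates to TRUE, and comparing a NULL with `'Unknown'` yields NULL rather than FALSE. If any string column still contains NULLs, those rows can fall into neither the WITH nor the WITHOUT count. The plain-Python sketch below imitates SQL three-valued logic to show the effect; the helper names and the sample rows are hypothetical.

```python
from typing import Optional


def sql_or(a: Optional[bool], b: Optional[bool]) -> Optional[bool]:
    """SQL-style OR, where None stands for NULL."""
    if a is True or b is True:
        return True
    if a is None or b is None:
        return None
    return False


def sql_not(a: Optional[bool]) -> Optional[bool]:
    """SQL-style NOT: NOT NULL is still NULL."""
    return None if a is None else (not a)


def is_unknown(value: Optional[str]) -> Optional[bool]:
    """Equivalent of `value = 'Unknown'` under SQL NULL semantics."""
    return None if value is None else value == "Unknown"


# Made-up (segment, discount) rows; the last one has a NULL discount
rows = [("Unknown", "0.2"), ("Consumer", "0.1"), ("Corporate", None)]

with_unknown = 0
without_unknown = 0
for segment, discount in rows:
    cond = sql_or(is_unknown(segment), is_unknown(discount))
    if cond is True:
        with_unknown += 1
    if sql_not(cond) is True:
        without_unknown += 1

print(with_unknown, without_unknown, len(rows))
```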



In [0]:
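# Counts for the summary table (string_columns, reduce, and col come from earlier cells)
unknown_condition = reduce(lambda a, b: a | b, [(col(c) == "Unknown") for c in string_columns])
total_rows = df_cleaned.count()
count_with_unknown = df_cleaned.filter(unknown_condition).count()
# Everything that is not flagged as "Unknown" is treated as "without"
count_without_unknown = total_rows - count_with_unknown

summary_df = spark.createDataFrame([
    ("Rows WITH 'Unknown'", count_with_unknown, round(count_with_unknown / total_rows * 100, 2)),
    ("Rows WITHOUT 'Unknown'", count_without_unknown, round(count_without_unknown / total_rows * 100, 2)),
    ("Total Rows", total_rows, 100.0)
], ["Metric", "Count", "Percentage"])

summary_df.show(truncate=False)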


In [0]:
# .drop(*columns_to_drop)
df_cleaned.limit(5).toPandas()

In [0]:
from pyspark.sql.functions import col, sum as _sum, when

text_columns = ["Order_id", "Order_Date", "Ship _Date", "Ship_Mode",
                "Customer_id", "Customer_Name", "Segment", "Country",
                "City", "State", "Region", "Product_ ID", "Category",
                "Sub_Category", "Product_Name", "order_s"]

# Count nulls per text column
null_counts_text = df_cleaned.select([
    _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c + "_nulls")
    for c in text_columns
])

null_counts_text.limit(5).toPandas()


In [0]:
# Step 4: Show cleaned DataFrame
df_cleaned.limit(5).toPandas()

In [0]:
from pyspark.sql.functions import col, sum as _sum, when

# Calculate total nulls per column
null_counts = df_cleaned.select([
    _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c + "_nulls")
    for c in df_cleaned.columns
])

null_counts.limit(5).toPandas()


In [0]:
df_cleaned.limit(1).toPandas()


In [0]:
from pyspark.sql.functions import col, avg, when, lit

# Compute average of Sales, ignoring nulls
avg_sales = df_cleaned.select(avg(col("Sales"))).first()[0]

# Fill nulls in Sales with the average
df_final_cleaned  = df_cleaned.withColumn(
    "Sales",
    when(col("Sales").isNull(), lit(avg_sales)).otherwise(col("Sales"))
)

# Check that nulls are filled
df_final_cleaned.filter(col("Sales").isNull()).count()


#### Final Cleaned Superstore Dataset: Nulls Filled & CSV Export

In [0]:
# Save as multiple CSV files (folder)
df_final_cleaned.write.mode("overwrite") \
    .option("header", True) \
    .option("sep", ",") \
    .csv("/Volumes/workspace/dataset/dataset-store/cleaned_superstore_final")


In [0]:
# Python cell
df_final_cleaned.createOrReplaceTempView("cleaned_superstore_final_view")


In [0]:
%sql
SELECT * 
FROM cleaned_superstore_final_view
LIMIT 5;


In [0]:
%sql
-- 1. Minimum and Maximum Order_Date
SELECT 
    MIN(Order_Date) AS min_order_date,
    MAX(Order_Date) AS max_order_date
FROM cleaned_superstore_final_view;

In [0]:
%sql
-- 2. List all distinct years in the dataset
SELECT DISTINCT YEAR(Order_Date) AS order_year
FROM cleaned_superstore_final_view
ORDER BY order_year;


##### Counting the total number of unique customers in the Superstore dataset

In [0]:

%sql select count(distinct customer_id)  AS total_customers from cleaned_superstore_final_view


##### Counting the number of unique customers who placed orders in the year 2022

In [0]:

%sql
SELECT COUNT(DISTINCT customer_id) AS total_customers_2022
FROM cleaned_superstore_final_view
WHERE order_date BETWEEN '2022-01-01' AND '2022-12-31';


## Automating Data Retrieval

#### 1. Widgets with Date Range and Exception Handling  

Databricks does not provide a native “date picker” widget.  
However, we can easily create **text widgets** for selecting **start** and **end** dates while enforcing the `YYYY-MM-DD` format.  

In this section, we’ll build a **combined interactive system** that allows you to:  
- **Filter data by a specific start and end date**, **or**  
- **Select a specific year and month** for monthly reporting.  


In [0]:
from datetime import datetime

try:
    # Remove previous widgets safely
    dbutils.widgets.removeAll()

    # ===============================
    # Mode Selector (Choose Filter Type)
    # ===============================
    dbutils.widgets.dropdown(
        name="filter_mode",
        defaultValue="date_range",
        choices=["date_range", "monthly"],
        label="🔍 Select Filter Mode"
    )

    filter_mode = dbutils.widgets.get("filter_mode")

    # ===============================
    #  DATE RANGE FILTER MODE
    # ===============================
    if filter_mode == "date_range":
        dbutils.widgets.text(
            name="start_date",
            defaultValue="2023-01-01",
            label=" Start Date (YYYY-MM-DD)"
        )
        dbutils.widgets.text(
            name="end_date",
            defaultValue="2023-12-31",
            label=" End Date (YYYY-MM-DD)"
        )

        start_date = dbutils.widgets.get("start_date")
        end_date = dbutils.widgets.get("end_date")

        # Validate date inputs
        try:
            start_date_obj = datetime.strptime(start_date, "%Y-%m-%d")
            end_date_obj = datetime.strptime(end_date, "%Y-%m-%d")

            if start_date_obj > end_date_obj:
                raise ValueError("🚫 Start Date cannot be after End Date!")

            print(f"✅ Date range selected: {start_date} → {end_date}")

        except ValueError as ve:
            print(f"⚠️ Invalid date input: {ve}")
            raise

    # ===============================
    #  MONTHLY FILTER MODE
    # ===============================
    elif filter_mode == "monthly":
        # Year selector
        dbutils.widgets.dropdown(
            name="year",
            defaultValue="2023",
            choices=["2017", "2018", "2019", "2020", "2021", "2022", "2023"],
            label=" Select Year"
        )

        # Month selector
        dbutils.widgets.dropdown(
            name="month",
            defaultValue="01",
            choices=["01","02","03","04","05","06","07","08","09","10","11","12"],
            label=" Select Month"
        )

        year = dbutils.widgets.get("year")
        month = dbutils.widgets.get("month")

        print(f"✅ Monthly filter selected: {year}-{month}")

except Exception as e:
    print(f"❌ Error creating widgets or reading values: {e}")
    raise
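
The monthly mode can also be expressed as a date range, which lets downstream cells use a single start/end pair regardless of the selected mode. The sketch below, in plain Python, converts the `year` and `month` widget strings into the first and last day of that month. The helper name `month_bounds` is hypothetical and not part of the notebook.

```python
import calendar
from datetime import date
from typing import Tuple


def month_bounds(year: str, month: str) -> Tuple[date, date]:
    """Return the first and last calendar day for widget-style year/month strings."""
    y, m = int(year), int(month)
    # monthrange returns (weekday of first day, number of days in month)
    last_day = calendar.monthrange(y, m)[1]
    return date(y, m, 1), date(y, m, last_day)


start, end = month_bounds("2023", "02")
print(start.isoformat(), end.isoformat())
```

Using `calendar.monthrange` handles leap years, so February is covered without special cases.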


#### 2. Retrieve Widget Values in Python

In [0]:
start_date = dbutils.widgets.get("start_date")
end_date = dbutils.widgets.get("end_date")

print(f"Filtering data between {start_date} and {end_date}")


#### 3. Use the Date Range in a SQL Query (Python + Spark SQL)

In [0]:
%sql
-- Metric 1: Total Unique Customers, Sales, and Profit within a date range
SELECT  
    format_number(COUNT(DISTINCT Customer_ID), 0) AS total_customers,
    format_number(SUM(try_cast(Sales AS DOUBLE)), 2) AS total_sales,
    format_number(SUM(try_cast(Profit AS DOUBLE)), 2) AS total_profit
FROM cleaned_superstore_final_view
WHERE Order_Date BETWEEN getArgument('start_date') AND getArgument('end_date');


In [0]:
%sql
-- Metric 1: Total Unique Customers, Sales, and Profit within a date range
SELECT  
    format_number(COUNT(DISTINCT Customer_ID), 0) AS total_customers,
    format_number(SUM(try_cast(Sales AS DOUBLE)), 2) AS total_sales,
    format_number(SUM(try_cast(Profit AS DOUBLE)), 2) AS total_profit
FROM cleaned_superstore_final_view
WHERE CAST(Order_Date AS DATE) 
      BETWEEN CAST(getArgument('start_date') AS DATE) 
      AND CAST(getArgument('end_date') AS DATE);


In [0]:
query = f"""
SELECT 
    COUNT(DISTINCT Customer_ID) AS total_customers,
    SUM(COALESCE(try_cast(Sales AS DOUBLE), 0)) AS total_sales,
    SUM(COALESCE(try_cast(Profit AS DOUBLE), 0)) AS total_profit
FROM cleaned_superstore_final_view
WHERE Order_Date BETWEEN '{start_date}' AND '{end_date}'
"""
display(spark.sql(query))



In [0]:
query = f"""
SELECT 
    COUNT(DISTINCT Customer_ID) AS total_customers,
    SUM(try_cast(Sales AS DOUBLE)) AS total_sales,
    SUM(try_cast(Profit AS DOUBLE)) AS total_profit
FROM cleaned_superstore_final_view
WHERE Order_Date BETWEEN '{start_date}' AND '{end_date}'
"""
display(spark.sql(query))


#### 4. KPI 1: Customer, Sales, and Profit Overview
- Calculates total unique customers, total sales, and total profit, either by date range or by month, with a dynamic, descriptive title.


In [0]:
# Base query
base_query = """
SELECT  
    format_number(COUNT(DISTINCT Customer_ID), 0) AS total_customers,
    format_number(SUM(try_cast(Sales AS DOUBLE)), 2) AS total_sales,
    format_number(SUM(try_cast(Profit AS DOUBLE)), 2) AS total_profit
FROM cleaned_superstore_final_view
"""

if filter_mode == "date_range":
    query = f"""
    {base_query}
    WHERE Order_Date BETWEEN '{start_date}' AND '{end_date}'
    """
elif filter_mode == "monthly":
    query = f"""
    {base_query}
    WHERE month(Order_Date) = {int(month)} AND year(Order_Date) = {int(year)}
    """

# Execute and display
display(spark.sql(query))
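
Because widget values are interpolated directly into the SQL text, it can be worth validating them before the query is built. The following plain-Python sketch shows one possible way to do that: it parses the inputs, rejects anything malformed, and returns the `WHERE` clause for either mode. The function name `build_where_clause` is hypothetical; the column name `Order_Date` and the mode names are taken from the notebook.

```python
from datetime import datetime


def build_where_clause(filter_mode, start_date=None, end_date=None, year=None, month=None):
    """Return a validated WHERE clause for the selected filter mode."""
    if filter_mode == "date_range":
        # strptime raises ValueError for anything that is not a plain date
        start = datetime.strptime(start_date, "%Y-%m-%d").date()
        end = datetime.strptime(end_date, "%Y-%m-%d").date()
        if start > end:
            raise ValueError("start_date must not be after end_date")
        return f"WHERE Order_Date BETWEEN '{start.isoformat()}' AND '{end.isoformat()}'"
    if filter_mode == "monthly":
        y, m = int(year), int(month)
        if not 1 <= m <= 12:
            raise ValueError("month must be between 1 and 12")
        return f"WHERE year(Order_Date) = {y} AND month(Order_Date) = {m}"
    raise ValueError(f"Unsupported filter_mode: {filter_mode!r}")


print(build_where_clause("date_range", start_date="2023-01-01", end_date="2023-12-31"))
print(build_where_clause("monthly", year="2023", month="01"))
```

Only re-serialised dates and integers reach the SQL string, so stray quotes or extra text in a widget value raise an error instead of altering the query.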


In [0]:
if filter_mode == "date_range":
    title = f" Total Number of Unique Customers, Sales, and Profit from {start_date} to {end_date}"
elif filter_mode == "monthly":
    title = f" Total Number of Unique Customers, Sales, and Profit for {year}-{month}"

print(title)


#### KPI Total Unique Customers within a date range

In [0]:
%sql
-- Metric 1: Total Unique Customers within a date range
SELECT  
    format_number(COUNT(DISTINCT Customer_ID), 0) AS total_customers
FROM cleaned_superstore_final_view
WHERE Order_Date BETWEEN getArgument('start_date') AND getArgument('end_date');


In [0]:
from pyspark.sql.functions import countDistinct

total_customers = df_final_cleaned.select(countDistinct("Customer_id").alias("unique_customers")).first()["unique_customers"]

print("\n Metric 1: Total Unique Customers")
print(f"   {total_customers:,}")


In [0]:
# Load the cleaned data from the temp view (adjust the view name if yours differs)
df = spark.table("cleaned_superstore_final_view")

# Apply dynamic date filtering
df_filtered = df.filter(
    (col("order_date") >= start_date) & 
    (col("order_date") <= end_date)
)

# Show record count after filtering
filtered_count = df_filtered.count()
print(f"\n Total records in selected period: {filtered_count:,}\n")

if filtered_count == 0:
    print(" No data found for the selected period!")
    dbutils.notebook.exit("No data available")


##### METRIC 2: Total Number of Orders

In [0]:
# ===============================
# METRIC 2: Total Number of Orders
# ===============================
print("\n" + "="*70)
print(" METRIC 2: Total Number of Orders")
print("="*70)
total_orders = df_final_cleaned.select(countDistinct("Order_id")).collect()[0][0]
print(f"Total Orders: {total_orders:,}")


In [0]:
%sql
SELECT 
    'Total Orders' AS Metric,
    format_number(COUNT(DISTINCT Order_ID), 0) AS total_orders
FROM cleaned_superstore_final_view;



In [0]:
# ===============================
# METRIC 2: Total Number of Orders using SQL
# ===============================
print("\n" + "="*70)
print(" METRIC 2: Total Number of Orders")
print("="*70)

# Use Spark SQL to get total orders
total_orders_sql = spark.sql("""
    SELECT COUNT(DISTINCT Order_id) AS total_orders
    FROM cleaned_superstore_final_view
""").collect()[0]["total_orders"]

print(f"Total Orders: {total_orders_sql:,}")


In [0]:
%sql
SELECT * FROM cleaned_superstore_final_view LIMIT 2;

- Inspect the Sales column to check whether any values still need their numeric parts extracted:

In [0]:
%sql
-- Get first 100 Sales values
SELECT Sales
FROM cleaned_superstore_final_view
LIMIT 100;


In [0]:
# Get first 100 sales values
sales_list = [row.Sales for row in df_final_cleaned.select("Sales").limit(100).collect()]
print(sales_list)

In [0]:
from pyspark.sql.functions import sum as _sum

# ===============================
# METRIC 3: Total Sales Amount
# ===============================
print("\n" + "="*70)
print(" METRIC 3: Total Sales Amount")
print("="*70)

total_sales = df_final_cleaned.select(_sum("Sales")).collect()[0][0]

if total_sales is None:
    total_sales = 0
print(f"Total Sales: ${total_sales:,.2f}")


In [0]:
%sql
SELECT 
    'Total Sales' AS Metric,
    FORMAT_NUMBER(SUM(TRY_CAST(Sales AS DOUBLE)), 2) AS total_sales
FROM cleaned_superstore_final_view;



##### Avoiding collect()[0][0] by using .first() and a column alias

In [0]:
# Make sure the DataFrame is accessible as a SQL view
df_final_cleaned.createOrReplaceTempView("cleaned_superstore_final_view")

# SQL query to sum Sales
query = """
SELECT SUM(Sales) AS total_sales
FROM cleaned_superstore_final_view
"""

# Execute the query and read the aliased column from the first row
total_sales = spark.sql(query).first()["total_sales"] or 0

# Display with formatting
print(f"Total Sales: ${total_sales:,.2f}")


In [0]:
# ===============================
# METRIC 4: Total Profit
# ===============================
print("\n" + "="*70)
print(" METRIC 4: Total Profit")
print("="*70)
total_profit = df_final_cleaned.select(_sum("profit")).collect()[0][0]
if total_profit is None:
    total_profit = 0
profit_margin = (total_profit / total_sales * 100) if total_sales > 0 else 0
print(f"Total Profit: ${total_profit:,.2f}")
print(f"Profit Margin: {profit_margin:.2f}%")

In [0]:
%sql
SELECT 
    FORMAT_NUMBER(SUM(TRY_CAST(Profit AS DOUBLE)), 2) AS total_profit,
    FORMAT_NUMBER(SUM(TRY_CAST(Sales AS DOUBLE)), 2) AS total_sales,
    ROUND(
        CASE WHEN SUM(TRY_CAST(Sales AS DOUBLE)) > 0 
             THEN SUM(TRY_CAST(Profit AS DOUBLE)) / SUM(TRY_CAST(Sales AS DOUBLE)) * 100
             ELSE 0
        END,
        2
    ) AS profit_margin_percentage
FROM cleaned_superstore_final_view;


In [0]:
from pyspark.sql.functions import sum as _sum, col

# Use the existing filter_mode and date/month widgets
filter_mode = dbutils.widgets.get("filter_mode")

if filter_mode == "date_range":
    start_date = dbutils.widgets.get("start_date")
    end_date = dbutils.widgets.get("end_date")
    
    df_filtered_dynamic = df_final_cleaned.filter(
        (col("Order_Date") >= start_date) & (col("Order_Date") <= end_date)
    )

elif filter_mode == "monthly":
    year = dbutils.widgets.get("year")
    month = dbutils.widgets.get("month")
    
    # Filter by month and year
    df_filtered_dynamic = df_final_cleaned.filter(
        (col("Order_Date").substr(1, 4) == year) &
        (col("Order_Date").substr(6, 2) == month)
    )

# ===============================
# Metric 4: Total Profit (dynamic)
# ===============================
total_profit = df_filtered_dynamic.select(_sum("Profit")).collect()[0][0] or 0
total_sales = df_filtered_dynamic.select(_sum("Sales")).collect()[0][0] or 0

profit_margin = (total_profit / total_sales * 100) if total_sales > 0 else 0

print("\n" + "="*70)
print(" METRIC 4: Total Profit (Dynamic)")
print("="*70)
print(f"Total Profit: ${total_profit:,.2f}")
print(f"Profit Margin: {profit_margin:.2f}%")
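
The margin calculation relies on two guards: a missing aggregate is treated as zero, and the division is skipped when sales are not positive. A small plain-Python sketch of that rule is shown below so the edge cases can be checked in isolation. The function name `profit_margin_pct` is hypothetical and the input figures are made up.

```python
from typing import Optional


def profit_margin_pct(total_profit: Optional[float], total_sales: Optional[float]) -> float:
    """Profit as a percentage of sales; 0.0 when sales are missing or not positive."""
    profit = total_profit or 0.0
    sales = total_sales or 0.0
    return (profit / sales * 100) if sales > 0 else 0.0


print(profit_margin_pct(25.0, 200.0))
print(profit_margin_pct(None, None))
```

A negative profit still produces a (negative) margin, which is usually the desired behaviour for loss-making periods.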


##### METRIC 5: Top Sales by Country

In [0]:
from pyspark.sql.functions import desc, count

# ===============================
# METRIC 5: Top Sales by Country
# ===============================
print("\n" + "="*70)
print(" METRIC 5: Top Sales by Country")
print("="*70)
sales_by_country = df_filtered.groupBy("country") \
    .agg(
        _sum("sales").alias("total_sales"),
        count("order_id").alias("order_count")
    ) \
    .orderBy(desc("total_sales")) \
    .limit(10)

display(sales_by_country)

In [0]:
filter_mode = dbutils.widgets.get("filter_mode")

if filter_mode == "date_range":
    start_date = dbutils.widgets.get("start_date")
    end_date = dbutils.widgets.get("end_date")

    query = f"""
    SELECT 
        Country,
        FORMAT_NUMBER(SUM(TRY_CAST(Sales AS DOUBLE)), 2) AS total_sales,
        COUNT(Order_ID) AS order_count
    FROM cleaned_superstore_final_view
    WHERE Order_Date BETWEEN '{start_date}' AND '{end_date}'
    GROUP BY Country
    ORDER BY SUM(TRY_CAST(Sales AS DOUBLE)) DESC
    LIMIT 10
    """

elif filter_mode == "monthly":
    year = dbutils.widgets.get("year")
    month = dbutils.widgets.get("month")

    query = f"""
    SELECT 
        Country,
        FORMAT_NUMBER(SUM(TRY_CAST(Sales AS DOUBLE)), 2) AS total_sales,
        COUNT(Order_ID) AS order_count
    FROM cleaned_superstore_final_view
    WHERE YEAR(Order_Date) = {year} AND MONTH(Order_Date) = {month}
    GROUP BY Country
    ORDER BY SUM(TRY_CAST(Sales AS DOUBLE)) DESC
    LIMIT 10
    """

display(spark.sql(query))
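
A point to keep in mind when ranking by a formatted total: `FORMAT_NUMBER` returns a string, so sorting on its output compares text character by character rather than by numeric value. The plain-Python sketch below reproduces the effect with the same thousands-separator formatting; the country labels and amounts are made-up placeholders.

```python
# Hypothetical totals, chosen so that text order and numeric order disagree
totals = {"CountryA": 9500.0, "CountryB": 120000.0, "CountryC": 30000.5}

# Same presentation as FORMAT_NUMBER(x, 2): thousands separators, two decimals
formatted = {name: f"{value:,.2f}" for name, value in totals.items()}

# Sorting the formatted strings compares characters, so "9,500.00" ranks first
by_string = sorted(formatted, key=formatted.get, reverse=True)

# Sorting the raw numbers gives the intended ranking
by_number = sorted(totals, key=totals.get, reverse=True)

print(by_string)
print(by_number)
```

The usual remedy is to sort on the numeric aggregate and apply formatting only for display.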


##### METRIC 6: Most Profitable Region and Country

In [0]:
# ===============================
# METRIC 6: Most Profitable Region and Country
# ===============================
print("\n" + "="*70)
print(" METRIC 6: Most Profitable Region and Country")
print("="*70)
most_profitable = df_final_cleaned.groupBy("region", "country") \
    .agg(
        _sum("profit").alias("total_profit"),
        _sum("sales").alias("total_sales")
    ) \
    .orderBy(desc("total_profit")) \
    .limit(5)

display(most_profitable)

In [0]:
query = """
SELECT 
    region,
    country,
    SUM(profit) AS total_profit,
    SUM(sales) AS total_sales
FROM cleaned_superstore_final_view
GROUP BY region, country
ORDER BY total_profit DESC
LIMIT 5
"""

display(spark.sql(query))


##### METRIC 7: Top Sales by Product Category

In [0]:
# ===============================
# METRIC 7: Top Sales by Product Category
# ===============================
print("\n" + "="*70)
print(" METRIC 7: Top Sales by Product Category")
print("="*70)
sales_by_category = df_final_cleaned.groupBy("category") \
    .agg(
        _sum("sales").alias("total_sales"),
        _sum("profit").alias("total_profit"),
        count("order_id").alias("order_count")
    ) \
    .orderBy(desc("total_sales"))

display(sales_by_category)


##### METRIC 8: Top 10 Products by Sub-Category

In [0]:
from pyspark.sql.functions import regexp_replace, col, sum as _sum

# ===============================
# METRIC 8: Top 10 Products by Sub-Category
# ===============================

# Sales is already numeric after cleaning, so it can be aggregated directly.
# (Stripping characters from a double risks mangling values that are
# rendered in scientific notation, such as 1.0E7.)
top10 = df_final_cleaned.groupBy("Sub_Category").agg(_sum("Sales").alias("total_sales"))

# Show the 10 sub-categories with the highest total sales
top10.orderBy(col("total_sales").desc()).show(10)
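
The cell above ranks sub-categories as a whole. If the requirement is instead read as the top products within each sub-category, the logic becomes a grouped top-N. The plain-Python sketch below illustrates that interpretation on made-up rows; the function name `top_products_by_sub_category` is hypothetical, and in Spark the same idea would normally be expressed with a window function.

```python
import heapq
from collections import defaultdict


def top_products_by_sub_category(rows, n=10):
    """rows: iterable of (sub_category, product_name, sales) tuples."""
    # Total sales per (sub-category, product) pair
    totals = defaultdict(float)
    for sub_category, product, sales in rows:
        totals[(sub_category, product)] += sales

    # Collect product totals under their sub-category
    grouped = defaultdict(list)
    for (sub_category, product), total in totals.items():
        grouped[sub_category].append((product, total))

    # Keep the n best-selling products in each sub-category
    return {
        sub_category: heapq.nlargest(n, items, key=lambda item: item[1])
        for sub_category, items in grouped.items()
    }


sample = [
    ("Chairs", "A", 100.0), ("Chairs", "A", 50.0),
    ("Chairs", "B", 120.0), ("Chairs", "C", 10.0),
    ("Phones", "P", 300.0),
]
print(top_products_by_sub_category(sample, n=2))
```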


#### METRIC 9: Most Frequently Ordered Product (by Quantity)

In [0]:
%sql
-- Preview the first 5 rows
SELECT *
FROM cleaned_superstore_final_view
LIMIT 5;

In [0]:

# =============================== 
# METRIC 9: Most Frequently Ordered Product (by Quantity) 
# ===============================
from pyspark.sql.functions import col, regexp_replace, sum as _sum, count, desc, coalesce, lit, when

# Aggregate top products safely
most_ordered_product = df_final_cleaned.groupBy("Product_Name", "Category") \
    .agg(
        _sum("Quantity").alias("total_quantity"),
        count("Order_id").alias("order_count"),
        _sum("Sales").alias("total_sales")
    ) \
    .orderBy(desc("total_quantity")) \
    .limit(10)

display(most_ordered_product)



##### METRIC 10: Top Customer by Total Sales per City


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, sum as _sum, count, desc, col


# ===============================
# METRIC 10: Top Customer by Total Sales per City
# ===============================
print("\n" + "="*70)
print(" METRIC 10: Top Customer by Total Sales per City")
print("="*70)

# Calculate total sales per customer per city
customer_sales_by_city = df_final_cleaned.groupBy("city", "customer_id", "customer_name") \
    .agg(
        _sum("sales").alias("total_sales"),
        count("order_id").alias("order_count")
    )

# Rank customers within each city
window_spec = Window.partitionBy("city").orderBy(desc("total_sales"))
top_customer_per_city = customer_sales_by_city.withColumn(
    "rank", row_number().over(window_spec)
).filter(col("rank") == 1) \
 .drop("rank") \
 .orderBy(desc("total_sales")) \
 .limit(20)

display(top_customer_per_city)
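
The window-based ranking above amounts to two steps: total the sales per customer within each city, then keep the customer with the largest total per city. The plain-Python sketch below walks through the same steps on made-up rows, which can help when checking the expected result on a small sample. The function name `top_customer_per_city_local` is hypothetical.

```python
from collections import defaultdict


def top_customer_per_city_local(rows):
    """rows: iterable of (city, customer_id, sales) tuples."""
    # Step 1: total sales per (city, customer)
    totals = defaultdict(float)
    for city, customer_id, sales in rows:
        totals[(city, customer_id)] += sales

    # Step 2: keep the customer with the highest total in each city
    best = {}
    for (city, customer_id), total in totals.items():
        if city not in best or total > best[city][1]:
            best[city] = (customer_id, total)
    return best


sample = [
    ("Paris", "C1", 100.0),
    ("Paris", "C2", 60.0),
    ("Paris", "C2", 70.0),
    ("Lyon", "C3", 40.0),
]
print(top_customer_per_city_local(sample))
```

In this sketch a tie keeps whichever customer was seen first. With `row_number()` the choice between tied customers is not deterministic unless a tie-breaking column is added to the window ordering.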


##### FINAL SUMMARY DASHBOARD

In [0]:
# ===============================
#  FINAL SUMMARY DASHBOARD
# ===============================
print("\n" + "="*70)
print("📋 EXECUTIVE SUMMARY")
print("="*70)
print(f"📅 Period: {start_date} to {end_date}")
print(f"🔍 Filter Mode: {filter_mode.upper()}")
print("-"*70)
print(f"👥 Unique Customers:     {total_customers:,}")
print(f"📦 Total Orders:         {total_orders:,}")
print(f"💰 Total Sales:          ${total_sales:,.2f}")
print(f"💵 Total Profit:         ${total_profit:,.2f}")
print(f"📊 Profit Margin:        {profit_margin:.2f}%")
if total_orders > 0:
    print(f"🛒 Avg Order Value:      ${(total_sales/total_orders):,.2f}")
if total_customers > 0:
    print(f"👤 Avg Sales/Customer:   ${(total_sales/total_customers):,.2f}")
print("="*70)