## **_Creating Schema And Loading Data_**

In [0]:
from pyspark.sql.types import *
import warnings
warnings.filterwarnings('ignore')

dim_customers_schema = StructType([
    StructField("customer_key",IntegerType()),
    StructField("customer_id",IntegerType()),
    StructField("customer_number",StringType()),
    StructField("first_name",StringType()),
    StructField("last_name",StringType()),
    StructField("country",StringType()),
    StructField("marital_status",StringType()),
    StructField("gender",StringType()),
    StructField("birthdate",DateType()),
    StructField("create_date",DateType()),
])

product_schema = StructType([
    StructField("product_key", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("product_number", StringType(), True),
    StructField("product_name", StringType(), True),
    StructField("category_id", StringType(), True),
    StructField("category", StringType(), True),
    StructField("subcategory", StringType(), True),
    StructField("maintenance", StringType(), True),
    StructField("cost", IntegerType(), True),
    StructField("product_line", StringType(), True),
    StructField("start_date", DateType(), True)
])

sales_Schema = StructType([
    StructField("order_number", StringType()),
    StructField("product_key", IntegerType()),
    StructField("customer_key", IntegerType()),
    StructField("order_date", DateType()),
    StructField("shipping_date", DateType()),
    StructField("due_date", DateType()),
    StructField("sales_amount", IntegerType()),
    StructField("quantity", IntegerType()),
    StructField("price", IntegerType())

])

customers_df = spark.read \
    .format("csv") \
    .option("header",True) \
    .schema(dim_customers_schema) \
    .load("/Volumes/workspace/default/sql_data/gold.dim_customers.csv")

product_df = spark.read \
    .format("csv") \
    .option("header",True) \
    .schema(product_schema) \
    .load("/Volumes/workspace/default/sql_data/gold.dim_products.csv")

sales_df = spark.read \
    .format("csv") \
    .option("header",True) \
    .schema(sales_Schema) \
    .load("/Volumes/workspace/default/sql_data/gold.fact_sales.csv")

## _**Magnitude Analysis**_

### **_Purpose :_**
- **_To quantify data and group results by specific dimensions._**
- **_For understanding data distribution across categories._**

### **_SQL Functions Used :_**
- **_Aggregate Functions: SUM(), COUNT(), AVG()_**
- **_GROUP BY, ORDER BY_**

In [0]:
# Find total customers by countries
from pyspark.sql.functions import *

customers_df.groupBy("country") \
    .agg(count("customer_id").alias("total_customer")) \
    .orderBy(col("total_customer").desc()) \
    .show()

+--------------+--------------+
|       country|total_customer|
+--------------+--------------+
| United States|          7482|
|     Australia|          3591|
|United Kingdom|          1913|
|        France|          1810|
|       Germany|          1780|
|        Canada|          1571|
|           n/a|           337|
+--------------+--------------+



In [0]:
#Find total customers by gender
customers_df.groupBy("gender") \
    .agg(count("customer_id").alias("total_customer")) \
    .orderBy(col("total_customer").desc()) \
    .show()

+------+--------------+
|gender|total_customer|
+------+--------------+
|  Male|          9341|
|Female|          9128|
|   n/a|            15|
+------+--------------+



In [0]:
# What is the total revenue generated for each category?

sales_df.join(product_df, sales_df.product_key==product_df.product_key,"left") \
    .groupBy("category") \
    .agg(sum("sales_amount").alias("total_revenue")) \
    .orderBy(col("total_revenue").desc()) \
    .show()

+-----------+-------------+
|   category|total_revenue|
+-----------+-------------+
|      Bikes|     28316272|
|Accessories|       700262|
|   Clothing|       339716|
+-----------+-------------+



In [0]:
# What is the total revenue generated by each customer?

sales_df.join(customers_df, sales_df.customer_key == customers_df.customer_key,"left") \
.groupBy(customers_df.customer_key,"first_name","last_name") \
.agg(sum("sales_amount").alias("total_revenue")) \
.orderBy(col("total_revenue").desc()) \
.show()

+------------+----------+---------+-------------+
|customer_key|first_name|last_name|total_revenue|
+------------+----------+---------+-------------+
|        1302|   Nichole|     Nara|        13294|
|        1133|   Kaitlyn|Henderson|        13294|
|        1309|  Margaret|       He|        13268|
|        1132|   Randall|Dominguez|        13265|
|        1301|   Adriana| Gonzalez|        13242|
|        1322|      Rosa|       Hu|        13215|
|        1125|    Brandi|     Gill|        13195|
|        1308|      Brad|      She|        13172|
|        1297| Francisco|     Sara|        13164|
|         434|   Maurice|     Shan|        12914|
|         440|     Janet|    Munoz|        12488|
|         242|      Lisa|      Cai|        11468|
|         418|     Lacey|    Zheng|        11248|
|         421|    Jordan|   Turner|        11200|
|         243|     Larry|    Munoz|        11067|
|        1656|     Larry|  Vazquez|        10899|
|        2264|      Kate|    Anand|        10871|


In [0]:
#-- What is the distribution of sold items across countries?

sales_df.join(customers_df,"customer_key","left") \
.groupBy("country") \
.agg(sum("quantity").alias("total_item_sold")) \
.orderBy(col("total_item_sold").desc()) \
.show()

+--------------+---------------+
|       country|total_item_sold|
+--------------+---------------+
| United States|          20481|
|     Australia|          13346|
|        Canada|           7630|
|United Kingdom|           6910|
|       Germany|           5626|
|        France|           5559|
|           n/a|            871|
+--------------+---------------+



##  **_Ranking Analysis_**

### **_Purpose :_**
- **_To rank items (e.g., products, customers) based on performance or other metrics._**
- **_To identify top performers or laggards._**

### **_SQL Functions Used :_**
- **_Window Ranking Functions: RANK(), DENSE_RANK(), ROW_NUMBER()_**
- **_Clauses: GROUP BY, ORDER BY_**

**_Which 5 products Generating the Highest Revenue?_**

**_Importing Library_**

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *

In [0]:
# Calculate total revenue for each product
total_rev = (
    sales_df.join(product_df, "product_key", "left")
        .groupBy("product_name")
        .agg(sum("sales_amount").alias("total_revenue"))
)

# Window for ranking by revenue (highest first)
window_spec = Window.orderBy(col("total_revenue").desc())

# Add rank and filter top 5 products
top_5_products = (
    total_rev.withColumn("rank", dense_rank().over(window_spec))
             .filter(col("rank") <= 5)
)

top_5_products.show()

+--------------------+-------------+----+
|        product_name|total_revenue|rank|
+--------------------+-------------+----+
|Mountain-200 Blac...|      1373454|   1|
|Mountain-200 Blac...|      1363128|   2|
|Mountain-200 Silv...|      1339394|   3|
|Mountain-200 Silv...|      1301029|   4|
|Mountain-200 Blac...|      1294854|   5|
+--------------------+-------------+----+



 **_What are the 5 worst-performing products in terms of sales?_**


In [0]:
# Calculate total revenue for each product
total_rev = (
    sales_df.join(product_df, "product_key", "left")
        .groupBy("product_name")
        .agg(sum("sales_amount").alias("total_revenue"))
)

# Window for ranking by revenue (Lowest first)
window_spec = Window.orderBy(col("total_revenue"))

# Add rank and filter Worst 5 products
top_5_products = (
    total_rev.withColumn("rank", dense_rank().over(window_spec))
             .filter(col("rank") <= 5)
)

top_5_products.show()

+--------------------+-------------+----+
|        product_name|total_revenue|rank|
+--------------------+-------------+----+
|     Racing Socks- L|         2430|   1|
|     Racing Socks- M|         2682|   2|
| Patch Kit/8 Patches|         6382|   3|
|Bike Wash - Disso...|         7272|   4|
|   Touring Tire Tube|         7440|   5|
+--------------------+-------------+----+



**_Find the top 10 customers who have generated the highest revenue_**

In [0]:
# Join sales and customer tables to calculate total revenue per customer.
# "customer_key" is the common column used for joining.

total_sales = (
    sales_df.join(customers_df, "customer_key", "left")
            .groupBy("customer_key", "first_name", "last_name")
            .agg(sum("sales_amount").alias("total_revenue"))
)

# Create a window specification ordered by total revenue in descending order.
# This will help in ranking customers based on their total revenue.

window_spec = Window.orderBy(col("total_revenue").desc())

# Rank customers using dense_rank() based on total revenue.
# Filter the top 10 ranked customers and select relevant columns.

top_10_cust = (
    total_sales
        .withColumn("cust_rank", dense_rank().over(window_spec))
        .filter(col("cust_rank") <= 10)
        .select("customer_key", "first_name", "last_name", "total_revenue")
)

# Display the top 10 customers with highest revenue.
top_10_cust.show()


+------------+----------+---------+-------------+
|customer_key|first_name|last_name|total_revenue|
+------------+----------+---------+-------------+
|        1302|   Nichole|     Nara|        13294|
|        1133|   Kaitlyn|Henderson|        13294|
|        1309|  Margaret|       He|        13268|
|        1132|   Randall|Dominguez|        13265|
|        1301|   Adriana| Gonzalez|        13242|
|        1322|      Rosa|       Hu|        13215|
|        1125|    Brandi|     Gill|        13195|
|        1308|      Brad|      She|        13172|
|        1297| Francisco|     Sara|        13164|
|         434|   Maurice|     Shan|        12914|
|         440|     Janet|    Munoz|        12488|
+------------+----------+---------+-------------+



## **_Change Over Time Analysis_**

### **_Purpose :_**
- **_To track trends, growth, and changes in key metrics over time._**
- **_For time-series analysis and identifying seasonality._**
- **_To measure growth or decline over specific periods._**

### **_SQL Functions Used :_**
- **_Date Functions: , date_format()_**
- **_Aggregate Functions: SUM(), COUNT(), AVG()_**


In [0]:
total = sales_df \
    .withColumn("order_year",year("order_date")) \
    .withColumn("order_month",month("order_date")) \
    .filter(col("order_date").isNotNull()) \
    .groupBy("order_year","order_month") \
        .agg(
                sum("sales_amount").alias("total_sales"),
                sum("quantity").alias("total_quantity"),
                countDistinct("customer_key").alias("total_customer")) \
                .orderBy("order_year","order_month")

total.show()

+----------+-----------+-----------+--------------+--------------+
|order_year|order_month|total_sales|total_quantity|total_customer|
+----------+-----------+-----------+--------------+--------------+
|      2010|         12|      43419|            14|            14|
|      2011|          1|     469795|           144|           144|
|      2011|          2|     466307|           144|           144|
|      2011|          3|     485165|           150|           150|
|      2011|          4|     502042|           157|           157|
|      2011|          5|     561647|           174|           174|
|      2011|          6|     737793|           230|           230|
|      2011|          7|     596710|           188|           188|
|      2011|          8|     614516|           193|           193|
|      2011|          9|     603047|           185|           185|
|      2011|         10|     708164|           221|           221|
|      2011|         11|     660507|           208|           

In [0]:
# ---------------------------------------------------------
# 1. TOTAL SALES, QUANTITY & CUSTOMERS BY YEAR & MONTH
# ---------------------------------------------------------
# - Extract year and month using year() and month()
# - Filter out records where order_date is null
# - Group by year and month
# - Calculate:
#       total_sales     → sum of sales_amount
#       total_quantity  → sum of quantity
#       total_customer  → count of distinct customer_key
# - Order the output by year and month
total = (
    sales_df
        .withColumn("order_year", year("order_date"))
        .withColumn("order_month", month("order_date"))
        .filter(col("order_date").isNotNull())
        .groupBy("order_year", "order_month")
        .agg(
            sum("sales_amount").alias("total_sales"),
            sum("quantity").alias("total_quantity"),
            countDistinct("customer_key").alias("total_customer")
        )
        .orderBy("order_year", "order_month")
)

total.show()

# ---------------------------------------------------------
# 2. TOTALS GROUPED BY YEAR (yyyy)
# ---------------------------------------------------------
# - Convert order_date to year format yyyy using date_format
# - Group by formatted year
# - Compute total_sales, total_quantity and total_customer
total_year = (
    sales_df
        .withColumn("order_date_f", date_format("order_date", "yyyy"))
        .filter(col("order_date").isNotNull())
        .groupBy("order_date_f")
        .agg(
            sum("sales_amount").alias("total_sales"),
            sum("quantity").alias("total_quantity"),
            countDistinct("customer_key").alias("total_customer")
        )
        .orderBy("order_date_f")
)

total_year.show()

# ---------------------------------------------------------
# 3. TOTALS GROUPED BY MONTH NAME (MMM)
# ---------------------------------------------------------
# - Convert order_date to 3-letter month name (Jan, Feb, Mar...)
# - Group by month name
# - Compute total_sales, total_quantity, total_customer
total_month = (
    sales_df
        .withColumn("order_date_f", date_format("order_date", "MMM"))
        .filter(col("order_date").isNotNull())
        .groupBy("order_date_f")
        .agg(
            sum("sales_amount").alias("total_sales"),
            sum("quantity").alias("total_quantity"),
            countDistinct("customer_key").alias("total_customer")
        )
        .orderBy("order_date_f")
)

total_month.show()


# ---------------------------------------------------------
# 4. TOTALS GROUPED BY YEAR-MONTH (yyyy-MMM)
# ---------------------------------------------------------
# - Create year-month format (e.g., "2023-Jan", "2024-Mar")
# - Group by the formatted value
# - Compute total metrics
year_month = (
    sales_df
        .withColumn("order_date_f", date_format("order_date", "yyyy-MMM"))
        .filter(col("order_date").isNotNull())
        .groupBy("order_date_f")
        .agg(
            sum("sales_amount").alias("total_sales"),
            sum("quantity").alias("total_quantity"),
            countDistinct("customer_key").alias("total_customer")
        )
        .orderBy("order_date_f")
)

year_month.show()


+----------+-----------+-----------+--------------+--------------+
|order_year|order_month|total_sales|total_quantity|total_customer|
+----------+-----------+-----------+--------------+--------------+
|      2010|         12|      43419|            14|            14|
|      2011|          1|     469795|           144|           144|
|      2011|          2|     466307|           144|           144|
|      2011|          3|     485165|           150|           150|
|      2011|          4|     502042|           157|           157|
|      2011|          5|     561647|           174|           174|
|      2011|          6|     737793|           230|           230|
|      2011|          7|     596710|           188|           188|
|      2011|          8|     614516|           193|           193|
|      2011|          9|     603047|           185|           185|
|      2011|         10|     708164|           221|           221|
|      2011|         11|     660507|           208|           

## **_Cumulative Analysis_**

### **_Purpose :_**
- **_To calculate running totals or moving averages for key metrics.._**
- **_To track performance over time cumulatively._**
- **_Useful for growth analysis or identifying long-term trends._**

### **_SQL Functions Used :_**
- **_Window Functions: SUM() OVER(), AVG() OVER()_**


In [0]:

# Calculate the total sales per month 
# and the running total of sales over time 

sales_data = (
    sales_df.withColumn("order_date",year("order_date"))
    .filter(col("order_date").isNotNull())
    .groupBy("order_date")
        .agg(sum("sales_amount").alias("total_sales"),
             round(avg("price"),2).alias('avg_price'))
        )

window_spec = Window.orderBy(col("order_date"))

(sales_data.withColumn("running_total",sum("total_sales").over(window_spec))
            .withColumn("moving_avg_price",avg("avg_price").over(window_spec))
).show()

+----------+-----------+---------+-------------+------------------+
|order_date|total_sales|avg_price|running_total|  moving_avg_price|
+----------+-----------+---------+-------------+------------------+
|      2010|      43419|  3101.36|        43419|           3101.36|
|      2011|    7075088|  3192.73|      7118507|          3147.045|
|      2012|    5842231|  1719.82|     12960738|2671.3033333333333|
|      2013|   16344878|   309.66|     29305616|         2080.8925|
|      2014|      45642|    23.17|     29351258|          1669.348|
+----------+-----------+---------+-------------+------------------+



### **_Performance Analysis (Year-over-Year, Month-over-Month)_**

In [0]:

"""
Analyze the yearly performance of products by comparing:
1. Each year's sales with the product's average sales across all years
2. Each year's sales with the previous year's sales
"""

from pyspark.sql.window import Window
from pyspark.sql.functions import *

# Window to calculate metrics across the entire history of each product
window_all_years = Window.partitionBy("product_name")

# Window to calculate year-over-year comparison (ordered by year)
window_year_order = Window.partitionBy("product_name").orderBy("order_year")

# ------------------ Main Calculation ------------------

yearly_product_sales = (
    sales_df
        # Join product master to get product_name
        .join(product_df, "product_key", "left")

        # Extract year from the order_date
        .withColumn("order_year", year("order_date"))

        # Aggregate yearly sales per product
        .groupBy("order_year", "product_name")
        .agg(sum("sales_amount").alias("current_sales"))

        # -------- Compare with average sales of that product --------
        # Calculate average sales of the product across all years
        .withColumn("avg_sales", avg("current_sales").over(window_all_years))

        # Difference vs average
        .withColumn("diff_avg", col("current_sales") - col("avg_sales"))

        # Above / Below / Exact Avg flag
        .withColumn(
            "avg_change",
            when(col("diff_avg") > 0, "Above Avg")
            .when(col("diff_avg") < 0, "Below Avg")
            .otherwise("Equal to Avg")
        )

        # -------- Compare with previous year's sales --------
        # Get last year's sales
        .withColumn("py_sales", lag("current_sales").over(window_year_order))

        # Difference from last year
        .withColumn("py_sales_diff", col("current_sales") - col("py_sales"))

        # Increase / Decrease / No Change flag
        .withColumn(
            "py_change",
            when(col("py_sales_diff") > 0, "Increase")
            .when(col("py_sales_diff") < 0, "Decrease")
            .otherwise("No Change")
        )

        # Final ordering
        .orderBy("product_name", "order_year")
)

yearly_product_sales.show(truncate=False)

+----------+----------------------+-------------+---------+---------+----------+--------+-------------+---------+
|order_year|product_name          |current_sales|avg_sales|diff_avg |avg_change|py_sales|py_sales_diff|py_change|
+----------+----------------------+-------------+---------+---------+----------+--------+-------------+---------+
|2012      |AWC Logo Cap          |72           |6570.0   |-6498.0  |Below_avg |NULL    |NULL         |No Change|
|2013      |AWC Logo Cap          |18891        |6570.0   |12321.0  |Above_avg |72      |18819        |Increase |
|2014      |AWC Logo Cap          |747          |6570.0   |-5823.0  |Below_avg |18891   |-18144       |Decrease |
|2012      |All-Purpose Bike Stand|159          |13197.0  |-13038.0 |Below_avg |NULL    |NULL         |No Change|
|2013      |All-Purpose Bike Stand|37683        |13197.0  |24486.0  |Above_avg |159     |37524        |Increase |
|2014      |All-Purpose Bike Stand|1749         |13197.0  |-11448.0 |Below_avg |37683   

## **_Data Segmentation Analysis_**

In [0]:
"""
Categorize products based on cost ranges and count how many products fall
into each cost bucket.
"""

from pyspark.sql.functions import *

# -------------- Cost Range Categorization & Aggregation ---------------- #

cost_range_summary = (
    product_df
        # Select required columns
        .select("product_key", "product_name", "cost")

        # Create cost range bucket for each product
        .withColumn(
            "cost_range",
            when(col("cost") < 100, "Below 100")
            .when(col("cost").between(100, 500), "100 - 500")
            .when(col("cost").between(500, 1000), "500 - 1000")
            .otherwise("Above 1000")
        )

        # Group by cost range and count number of products
        .groupBy("cost_range")
        .agg(count("product_key").alias("total_products"))

        # Optional: Sort cost ranges logically
        .orderBy("cost_range")
)

cost_range_summary.show(truncate=False)


+----------+-------------+
|cost_range|total_product|
+----------+-------------+
|500 - 1000|45           |
|100 - 500 |101          |
|Above 1000|39           |
|Below 100 |110          |
+----------+-------------+



In [0]:
"""
    Group customers into three segments based on their spending behavior:
	    - VIP: Customers with at least 12 months of history and spending more than €5,000.
	    - Regular: Customers with at least 12 months of history but spending €5,000 or less.
	    - New: Customers with a lifespan less than 12 months.
    And find the total number of customers by each group
"""
from pyspark.sql.functions import *

customer_segmentation = (
    sales_df
        # Join customer info
        .join(customers_df, "customer_key", "left")

        # Aggregate customer-level metrics
        .groupBy("customer_key")
        .agg(
            sum("sales_amount").alias("total_spending"),
            min("order_date").alias("first_order_date"),
            max("order_date").alias("last_order_date")
        )

        # Customer lifespan in months
        .withColumn(
            "lifespan",
            months_between(col("last_order_date"), col("first_order_date"))
                .cast("int")     # convert to integer months
        )

        # Customer segmentation logic
        .withColumn(
            "customer_segment",
            when((col("lifespan") >= 12) & (col("total_spending") > 5000), "VIP")
            .when((col("lifespan") >= 12) & (col("total_spending") <= 5000), "Regular")
            .otherwise("New")
        )

        # Count customers in each segment
        .groupBy("customer_segment")
        .agg(count("customer_key").alias("total_customers"))
        
)

customer_segmentation.show(truncate=False)


+----------------+---------------+
|customer_segment|total_customers|
+----------------+---------------+
|New             |14828          |
|Regular         |2037           |
|VIP             |1619           |
+----------------+---------------+

