In [9]:
import os
import sys
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
import unittest
import json
import csv
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP

In [10]:
print(os.getcwd())

/content


In [15]:
import pandas as pd
from pyspark.sql import SparkSession

# Create Spark session if not already created
spark = SparkSession.builder \
    .appName("EcommerceAnalysis") \
    .getOrCreate()

base_path = "/content"

In [16]:
products_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(f"{base_path}/Products.csv")


In [17]:
display(products_df)

DataFrame[Product ID: string, Category: string, Sub-Category: string, Product Name: string, State: string, Price per product: string]

In [6]:
orders_df = spark.read \
    .format("json") \
    .option("multiLine", "true") \
    .load(f"{base_path}/Orders.json")

In [7]:
display(orders_df)

DataFrame[Customer ID: string, Discount: double, Order Date: string, Order ID: string, Price: double, Product ID: string, Profit: double, Quantity: bigint, Row ID: bigint, Ship Date: string, Ship Mode: string]

In [13]:
customer_pd = pd.read_excel(f"{base_path}/Customer.xlsx")
customer_df = spark.createDataFrame(customer_pd)

In [14]:
display(customer_df)

DataFrame[Customer ID: string, Customer Name: string, email: string, phone: string, address: string, Segment: string, Country: string, City: string, State: string, Postal Code: bigint, Region: string]

In [18]:
orders_df.show(5)

+-----------+--------+----------+--------------+------+---------------+------+--------+------+---------+--------------+
|Customer ID|Discount|Order Date|      Order ID| Price|     Product ID|Profit|Quantity|Row ID|Ship Date|     Ship Mode|
+-----------+--------+----------+--------------+------+---------------+------+--------+------+---------+--------------+
|   JK-15370|     0.3| 21/8/2016|CA-2016-122581|573.17|FUR-CH-10002961| 63.69|       7|     1|25/8/2016|Standard Class|
|   BD-11320|     0.0| 23/9/2017|CA-2017-117485|291.96|TEC-AC-10004659|102.19|       4|     2|29/9/2017|Standard Class|
|   LB-16795|     0.7| 6/10/2016|US-2016-157490|  17.0|OFF-BI-10002824|-14.92|       4|     3|7/10/2016|   First Class|
|   KB-16315|     0.2|  2/7/2015|CA-2015-111703| 15.55|OFF-PA-10003349|  5.64|       3|     4| 9/7/2015|Standard Class|
|   DO-13435|     0.2| 3/10/2014|CA-2014-108903|142.49|TEC-AC-10003023|  -3.0|       3|     5|3/10/2014|      Same Day|
+-----------+--------+----------+-------

In [None]:
#Task1

In [19]:

def create_raw_tables(customer_df, orders_df, products_df):
    """
    Task 1: Create raw tables (temporary views) for each source dataset
    """
    print("="*80)
    print("TASK 1: Creating Raw Tables")
    print("="*80)

    # Create temporary views (raw tables)
    customer_df.createOrReplaceTempView("raw_customers")
    orders_df.createOrReplaceTempView("raw_orders")
    products_df.createOrReplaceTempView("raw_products")

    print(f"✓ Created raw_customers table with {customer_df.count()} records")
    print(f"✓ Created raw_orders table with {orders_df.count()} records")
    print(f"✓ Created raw_products table with {products_df.count()} records")

    # Display schemas
    print("\n--- Raw Customers Schema ---")
    customer_df.printSchema()

    print("\n--- Raw Orders Schema ---")
    orders_df.printSchema()

    print("\n--- Raw Products Schema ---")
    products_df.printSchema()

    return {
        'customers': customer_df,
        'orders': orders_df,
        'products': products_df
    }


In [20]:
create_raw_tables(customer_df, orders_df, products_df)

TASK 1: Creating Raw Tables
✓ Created raw_customers table with 793 records
✓ Created raw_orders table with 9994 records
✓ Created raw_products table with 1851 records

--- Raw Customers Schema ---
root
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: long (nullable = true)
 |-- Region: string (nullable = true)


--- Raw Orders Schema ---
root
 |-- Customer ID: string (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Profit: double (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Row ID: l

{'customers': DataFrame[Customer ID: string, Customer Name: string, email: string, phone: string, address: string, Segment: string, Country: string, City: string, State: string, Postal Code: bigint, Region: string],
 'orders': DataFrame[Customer ID: string, Discount: double, Order Date: string, Order ID: string, Price: double, Product ID: string, Profit: double, Quantity: bigint, Row ID: bigint, Ship Date: string, Ship Mode: string],
 'products': DataFrame[Product ID: string, Category: string, Sub-Category: string, Product Name: string, State: string, Price per product: string]}

In [23]:
#Task2

In [25]:
def create_enriched_customers(customer_df):
    """
    Task 2.1: Create enriched customers table with additional derived fields
    """
    print("\n" + "="*80)
    print("TASK 2.1: Creating Enriched Customers Table")
    print("="*80)

    # Note: Using backticks for column names with spaces
    enriched_customers = customer_df \
        .withColumn("Customer_Name_Clean", F.trim(F.col("`Customer Name`"))) \
        .withColumn("Country_Clean", F.trim(F.col("Country"))) \
        .withColumn("City_Clean", F.trim(F.col("City"))) \
        .withColumn("State_Clean", F.trim(F.col("State"))) \
        .withColumn("Full_Address",
                   F.concat_ws(", ", F.col("City"), F.col("State"), F.col("Country"))) \
        .withColumn("Customer_Segment_Priority",
                   F.when(F.col("Segment") == "Consumer", 1)
                    .when(F.col("Segment") == "Corporate", 2)
                    .when(F.col("Segment") == "Home Office", 3)
                    .otherwise(0)) \
        .withColumn("Has_Email",
                   F.when(F.col("email").isNotNull(), 1).otherwise(0)) \
        .withColumn("Has_Phone",
                   F.when(F.col("phone").isNotNull(), 1).otherwise(0)) \
        .withColumn("processed_date", F.current_date()) \
        .dropDuplicates(["Customer ID"])

    # Create temporary view
    enriched_customers.createOrReplaceTempView("enriched_customers")

    print(f"✓ Created enriched_customers with {enriched_customers.count()} unique customers")
    print("\nSample enriched customer records:")
    enriched_customers.select("`Customer ID`", "Customer_Name_Clean", "Full_Address",
                             "Customer_Segment_Priority").show(5, truncate=False)

    # Show segment distribution
    print("\nCustomer Segment Distribution:")
    enriched_customers.groupBy("Segment").count().orderBy("count", ascending=False).show()

    return enriched_customers

In [26]:
create_enriched_customers(customer_df)


TASK 2.1: Creating Enriched Customers Table
✓ Created enriched_customers with 793 unique customers

Sample enriched customer records:
+-----------+-------------------+--------------------------------------+-------------------------+
|Customer ID|Customer_Name_Clean|Full_Address                          |Customer_Segment_Priority|
+-----------+-------------------+--------------------------------------+-------------------------+
|AA-10315   |Alex Avila         |Round Rock, Texas, United States      |1                        |
|AA-10375   |Allen Armold       |Atlanta, Georgia, United States       |1                        |
|AA-10480   |Andrew Allen       |Concord, North Carolina, United States|1                        |
|AA-10645   |Anna Andreadi      |Chester, Pennsylvania, United States  |1                        |
|AB-10015   |Aaron Bergman      |Arlington, Texas, United States       |1                        |
+-----------+-------------------+--------------------------------------+-

DataFrame[Customer ID: string, Customer Name: string, email: string, phone: string, address: string, Segment: string, Country: string, City: string, State: string, Postal Code: bigint, Region: string, Customer_Name_Clean: string, Country_Clean: string, City_Clean: string, State_Clean: string, Full_Address: string, Customer_Segment_Priority: int, Has_Email: int, Has_Phone: int, processed_date: date]

In [28]:
# Task 3
def create_enriched_orders(orders_df, customer_df, products_df):
    """
    Task 3: Create enriched orders table by joining orders, customers, and products
    """
    print("\n" + "="*80)
    print("TASK 3: Creating Enriched Orders Table")
    print("="*80)

    # Join orders with enriched customers
    orders_with_customer = orders_df.join(
        customer_df,
        on="Customer ID",
        how="inner"
    ).select(
        orders_df["*"], # Select all columns from orders
        customer_df["Customer Name"],
        customer_df["Country"]
    )

    # Join with products
    enriched_orders = orders_with_customer.join(
        products_df,
        on="Product ID",
        how="inner"
    ).select(
        orders_with_customer["*"], # Select all columns from previous join
        products_df["Category"],
        products_df["Sub-Category"]
    )

    # Round Profit to 2 decimal places and cast to Decimal
    enriched_orders = enriched_orders.withColumn(
        "Profit_Rounded",
        F.col("Profit").cast(DecimalType(10, 2))
    )

    # Create temporary view
    enriched_orders.createOrReplaceTempView("enriched_orders")

    print(f"✓ Created enriched_orders with {enriched_orders.count()} records")
    print("\nSample enriched order records:")
    enriched_orders.select(
        "Order ID",
        "Customer ID",
        "Customer Name",
        "Country",
        "Product ID",
        "Category",
        "Sub-Category",
        "Profit",
        "Profit_Rounded"
    ).show(5, truncate=False)

    return enriched_orders



In [29]:
enriched_orders_df = create_enriched_orders(orders_df, customer_df, products_df)


TASK 3: Creating Enriched Orders Table
✓ Created enriched_orders with 10133 records

Sample enriched order records:
+--------------+-----------+-------------+-------------+---------------+---------------+------------+------+--------------+
|Order ID      |Customer ID|Customer Name|Country      |Product ID     |Category       |Sub-Category|Profit|Profit_Rounded|
+--------------+-----------+-------------+-------------+---------------+---------------+------------+------+--------------+
|US-2015-145422|PW-19240   |Pierre Wener |United States|FUR-BO-10002213|Furniture      |Bookcases   |-29.61|-29.61        |
|US-2015-145422|PW-19240   |Pierre Wener |United States|FUR-BO-10002213|Furniture      |Bookcases   |-29.61|-29.61        |
|US-2015-114839|PW-19240   |Pierre Wener |United States|FUR-CH-10004086|Furniture      |Chairs      |-5.83 |-5.83         |
|CA-2015-144274|PW-19240   |Pierre Wener |United States|OFF-PA-10003441|Office Supplies|Paper       |19.0  |19.00         |
|CA-2014-164210

In [33]:
# Task 4
def create_profit_by_year_category_customer(enriched_orders_df):
    """
    Task 4: Create an aggregate table that shows profit by Year, Product Category,
            Product Sub Category, and Customer
    """
    print("\n" + "="*80)
    print("TASK 4: Creating Profit by Year, Category, Sub-Category, and Customer Table")
    print("="*80)

    spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")

    profit_by_agg = enriched_orders_df \
        .withColumn("parsed_date",
                    F.coalesce(
                        F.to_date(F.col("Order Date"), "d/M/yyyy"),
                        F.to_date(F.col("Order Date"), "dd/MM/yyyy"),
                        F.to_date(F.col("Order Date"), "M/d/yyyy"),
                        F.to_date(F.col("Order Date"), "MM/dd/yyyy")
                    )
                   ) \
        .withColumn("Order_Year", F.year(F.col("parsed_date")))

    print("\nRows with NULL Order_Year after attempting multiple formats:")
    profit_by_agg.filter(F.col("Order_Year").isNull()).select("Order Date", "parsed_date").distinct().show(truncate=False)


    profit_by_agg = profit_by_agg.groupBy("Order_Year", "Category", "Sub-Category", "Customer ID", "Customer Name") \
        .agg(F.sum("Profit_Rounded").alias("Total_Profit")) \
        .orderBy("Order_Year", "Category", "Sub-Category", "Customer ID")


    profit_by_agg.createOrReplaceTempView("profit_by_year_category_customer")

    print(f"✓ Created profit_by_year_category_customer with {profit_by_agg.count()} records")
    print("\nSample records from profit_by_year_category_customer:")
    profit_by_agg.show(5, truncate=False)

    return profit_by_agg

profit_by_agg_df = create_profit_by_year_category_customer(enriched_orders_df)


TASK 4: Creating Profit by Year, Category, Sub-Category, and Customer Table

Rows with NULL Order_Year after attempting multiple formats:
+----------+-----------+
|Order Date|parsed_date|
+----------+-----------+
+----------+-----------+

✓ Created profit_by_year_category_customer with 7937 records

Sample records from profit_by_year_category_customer:
+----------+---------+------------+-----------+-------------+------------+
|Order_Year|Category |Sub-Category|Customer ID|Customer Name|Total_Profit|
+----------+---------+------------+-----------+-------------+------------+
|2014      |Furniture|Bookcases   |AF-10885   |Art Foster   |-216.98     |
|2014      |Furniture|Bookcases   |BD-11605   |Brian Dahlen |3.93        |
|2014      |Furniture|Bookcases   |BM-11650   |Brian Moss   |20.52       |
|2014      |Furniture|Bookcases   |BS-11590   |Brendan Sweed|-53.29      |
|2014      |Furniture|Bookcases   |BT-11440   |Bobby Trafton|-44.20      |
+----------+---------+------------+---------

In [36]:
# Task 5
print("\n" + "="*80)
print("TASK 5: SQL Aggregations")
print("="*80)

# 5.1 Profit by Year
print("\n--- Profit by Year ---")
spark.sql("""
    SELECT
        Order_Year,
        SUM(Total_Profit) AS Total_Profit
    FROM profit_by_year_category_customer
    GROUP BY Order_Year
    ORDER BY Order_Year
""").show()

# 5.2 Profit by Year + Product Category
print("\n--- Profit by Year and Product Category ---")
spark.sql("""
    SELECT
        Order_Year,
        Category,
        SUM(Total_Profit) AS Total_Profit
    FROM profit_by_year_category_customer
    GROUP BY Order_Year, Category
    ORDER BY Order_Year, Category
""").show()

# 5.3 Profit by Customer
print("\n--- Profit by Customer ---")
spark.sql("""
    SELECT
        `Customer ID`,
        `Customer Name`,
        SUM(Total_Profit) AS Total_Profit
    FROM profit_by_year_category_customer
    GROUP BY `Customer ID`, `Customer Name`
    ORDER BY Total_Profit DESC
""").show(truncate=False)

# 5.4 Profit by Customer + Year
print("\n--- Profit by Customer and Year ---")
spark.sql("""
    SELECT
        `Customer ID`,
        `Customer Name`,
        Order_Year,
        SUM(Total_Profit) AS Total_Profit
    FROM profit_by_year_category_customer
    WHERE Order_Year IS NOT NULL -- Exclude rows with NULL year if any remain
    GROUP BY `Customer ID`, `Customer Name`, Order_Year
    ORDER BY `Customer ID`, Order_Year
""").show(truncate=False)


TASK 5: SQL Aggregations

--- Profit by Year ---
+----------+------------+
|Order_Year|Total_Profit|
+----------+------------+
|      2014|    40975.40|
|      2015|    65706.25|
|      2016|    68161.13|
|      2017|   127175.23|
+----------+------------+


--- Profit by Year and Product Category ---
+----------+---------------+------------+
|Order_Year|       Category|Total_Profit|
+----------+---------------+------------+
|      2014|      Furniture|    -5174.65|
|      2014|Office Supplies|    22663.76|
|      2014|     Technology|    23486.29|
|      2015|      Furniture|     3392.12|
|      2015|Office Supplies|    25490.34|
|      2015|     Technology|    36823.79|
|      2016|      Furniture|     7750.15|
|      2016|Office Supplies|    35973.60|
|      2016|     Technology|    24437.38|
|      2017|      Furniture|     3361.76|
|      2017|Office Supplies|    45330.39|
|      2017|     Technology|    78483.08|
+----------+---------------+------------+


--- Profit by Customer