**Setup Spark Session**

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from datetime import datetime

In [3]:
# Entry point for all DataFrame and SQL work in this notebook.
# getOrCreate() hands back an already-running session when there is one.
spark = (
    SparkSession.builder
    .appName("CustomerPurchaseAnalytics")
    .getOrCreate()
)


**Load Raw Data**

In [4]:
# Colab-only step: prompt for a local file and save it into the runtime's
# working directory, where the read cell below picks it up by relative name.
from google.colab import files
uploaded = files.upload()

Saving customer_shopping_behavior.csv to customer_shopping_behavior.csv


In [5]:
# Load the uploaded CSV: the first row supplies column names and Spark samples
# the data to infer each column's type.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("customer_shopping_behavior.csv")
)

In [6]:
# Sanity-check the inferred column names and types before cleaning.
df.printSchema()

root
 |-- Customer ID: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Item Purchased: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Purchase Amount (USD): integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Season: string (nullable = true)
 |-- Review Rating: double (nullable = true)
 |-- Subscription Status: string (nullable = true)
 |-- Shipping Type: string (nullable = true)
 |-- Discount Applied: string (nullable = true)
 |-- Promo Code Used: string (nullable = true)
 |-- Previous Purchases: integer (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Frequency of Purchases: string (nullable = true)



**Data Cleaning**

In [7]:
# Remove duplicates: with no column subset given, only rows that match on
# every column are treated as duplicates.
df = df.dropDuplicates()

Column Renaming

In [8]:
# Source headers that later cells reference, mapped to snake_case names.
# Columns not listed here keep their original headers.
column_renames = {
    "Customer ID": "customer_id",
    "Purchase Amount (USD)": "purchase_amount",
    "Previous Purchases": "previous_purchases",
    "Review Rating": "review_rating",
    "Subscription Status": "subscription_status",
    "Frequency of Purchases": "purchase_frequency_type",
}

for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)


In [9]:
# Remove invalid data (targeted filters rather than a blanket dropna):
# keep rows with a positive purchase amount and a known customer.
# A null purchase_amount does not satisfy `> 0`, so those rows are dropped too.
has_positive_amount = col("purchase_amount") > 0
has_customer_id = col("customer_id").isNotNull()

df = df.filter(has_positive_amount & has_customer_id)

**Customer Revenue & Frequency**

In [10]:
# One row per customer. Note that sum/avg/max here are the Spark column
# functions pulled in by the star import, not the Python builtins.
purchases_by_customer = df.groupBy("customer_id")

customer_metrics = purchases_by_customer.agg(
    sum("purchase_amount").alias("total_revenue"),
    avg("purchase_amount").alias("avg_order_value"),
    # Highest reported prior-purchase count stands in for purchase frequency.
    max("previous_purchases").alias("purchase_frequency"),
)

Window Functions

In [11]:
# Global ranking by revenue, highest first. There is no partitionBy, so the
# rank is computed across every customer; equal revenues share a rank.
window_spec = Window.orderBy(col("total_revenue").desc())

# Running total in rank order. The frame is written out explicitly; it is the
# same range frame Spark applies by default to an ordered window, which means
# customers tied on revenue_rank all receive the same cumulative value.
running_window = Window.orderBy("revenue_rank").rangeBetween(
    Window.unboundedPreceding, Window.currentRow
)

customer_metrics = (
    customer_metrics
    .withColumn("revenue_rank", rank().over(window_spec))
    .withColumn("cumulative_revenue", sum("total_revenue").over(running_window))
)


**Category Revenue**

In [12]:
# Total purchase amount per product category.
category_revenue = (
    df.groupBy("Category")
    .agg(sum("purchase_amount").alias("category_revenue"))
)


In [13]:
# Revenue by gender: total purchase amount per Gender value.
gender_revenue = (
    df.groupBy("Gender")
    .agg(sum("purchase_amount").alias("gender_revenue"))
)


In [14]:
# Subscription impact: total purchase amount per subscription status.
subscription_revenue = (
    df.groupBy("subscription_status")
    .agg(sum("purchase_amount").alias("subscription_revenue"))
)


In [15]:
# Review-rating impact: average spend and number of orders at each rating.
orders_by_rating = df.groupBy("review_rating")

rating_analysis = orders_by_rating.agg(
    avg("purchase_amount").alias("avg_purchase"),
    count("*").alias("total_orders"),
)


In [16]:
# Payment method analysis: total purchase amount per payment method.
# "Payment Method" was not renamed above, so it keeps its original header.
payment_analysis = (
    df.groupBy("Payment Method")
    .agg(sum("purchase_amount").alias("payment_revenue"))
)


**Customer Segmentation (Behavior-Based)**

In [17]:
# Segment rules, evaluated top to bottom; the first matching rule wins.
is_high_value = (col("total_revenue") > 300) & (col("purchase_frequency") > 20)
is_loyal = col("purchase_frequency") > 10
is_at_risk = col("purchase_frequency") < 5

# Because of the ordering, a customer with frequency > 20 but revenue <= 300
# lands in "Loyal", and frequencies from 5 through 10 fall through to "Regular".
# NOTE(review): the 300 / 20 / 10 / 5 thresholds are hard-coded; confirm they
# suit the value ranges in this dataset.
segment_label = (
    when(is_high_value, "High-Value")
    .when(is_loyal, "Loyal")
    .when(is_at_risk, "At-Risk")
    .otherwise("Regular")
)

customer_segment = customer_metrics.withColumn("segment", segment_label)


**Spark SQL**

In [18]:
# Expose the cleaned DataFrame to Spark SQL under the name "transactions".
df.createOrReplaceTempView("transactions")

# Same category totals as the DataFrame-API cell above, expressed in SQL and
# sorted so the highest-revenue category is listed first.
category_revenue_query = """
SELECT Category,
       SUM(purchase_amount) AS revenue
FROM transactions
GROUP BY Category
ORDER BY revenue DESC
"""

spark.sql(category_revenue_query).show()


+-----------+-------+
|   Category|revenue|
+-----------+-------+
|   Clothing| 104264|
|Accessories|  74200|
|   Footwear|  36093|
|  Outerwear|  18524|
+-----------+-------+



**Retention Risk Customers**

In [19]:
# Customers labelled "At-Risk" by the segmentation rules (purchase_frequency < 5
# and not matched by an earlier rule).
at_risk_customers = customer_segment.filter(col("segment") == "At-Risk")


**What-If Revenue Uplift (Example: 15% Retention Improvement)**

In [20]:
# Share of at-risk revenue the what-if scenario assumes is recovered (15%).
RETENTION_UPLIFT_RATE = 0.15

# Name the result with alias() directly on the expression. Renaming afterwards
# by Spark's auto-generated column string ("(sum(total_revenue) * 0.15)") is
# fragile: withColumnRenamed silently does nothing when that string does not
# match, which would leave the column with its generated name.
uplift = at_risk_customers.agg(
    (sum("total_revenue") * RETENTION_UPLIFT_RATE).alias("estimated_revenue_uplift")
)

uplift.show()


+------------------------+
|estimated_revenue_uplift|
+------------------------+
|      3015.2999999999997|
+------------------------+



**Caching**

In [21]:
# Mark customer_segment for caching, since the Parquet write, the pandas export
# and the dashboard join below all reuse it. cache() is lazy: nothing is stored
# until the next action runs.
customer_segment.cache()


DataFrame[customer_id: int, total_revenue: bigint, avg_order_value: double, purchase_frequency: int, revenue_rank: int, cumulative_revenue: bigint, segment: string]

**Save as Parquet**

In [22]:
# Persist the segmented customers, one sub-directory per segment value.
# "overwrite" replaces whatever a previous run left at the target path.
(
    customer_segment.write
    .mode("overwrite")
    .partitionBy("segment")
    .parquet("analytics/customer_segments")
)

# Category totals are small, so they are written without partitioning.
(
    category_revenue.write
    .mode("overwrite")
    .parquet("analytics/category_revenue")
)


**Export for Power BI**

In [24]:
# Collect each result to the driver as a pandas frame and write a single CSV
# (no index column) that Power BI can import directly.
csv_exports = (
    (customer_segment, "/content/customer_segments.csv"),
    (category_revenue, "/content/category_revenue.csv"),
)

for spark_frame, csv_path in csv_exports:
    spark_frame.toPandas().to_csv(csv_path, index=False)

In [25]:
from google.colab import files

# Send both exported CSVs from the Colab runtime to the local browser.
for csv_path in ("/content/customer_segments.csv", "/content/category_revenue.csv"):
    files.download(csv_path)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

**Download the Cleaned Dataset for the Dashboard**

In [26]:
# Enrich every cleaned transaction row with its customer's aggregate metrics.
# cumulative_revenue is deliberately left out of the joined columns.
segment_columns = [
    "customer_id",
    "total_revenue",
    "avg_order_value",
    "purchase_frequency",
    "revenue_rank",
    "segment",
]
customer_lookup = customer_segment.select(*segment_columns)

# Left join keeps every transaction row even if no metrics row matches.
final_dashboard_df = df.join(customer_lookup, on="customer_id", how="left")

final_dashboard_df.show(5)

+-----------+---+------+--------------+-----------+---------------+-------------+----+------+------+-------------+-------------------+-------------+----------------+---------------+------------------+--------------+-----------------------+-------------+---------------+------------------+------------+-------+
|customer_id|Age|Gender|Item Purchased|   Category|purchase_amount|     Location|Size| Color|Season|review_rating|subscription_status|Shipping Type|Discount Applied|Promo Code Used|previous_purchases|Payment Method|purchase_frequency_type|total_revenue|avg_order_value|purchase_frequency|revenue_rank|segment|
+-----------+---+------+--------------+-----------+---------------+-------------+----+------+------+-------------+-------------------+-------------+----------------+---------------+------------------+--------------+-----------------------+-------------+---------------+------------------+------------+-------+
|         72| 36|  Male|         Dress|   Clothing|             48|   

In [27]:
# Collapse to one partition so Spark emits a single part file, then write it
# as CSV with a header row, replacing any previous export in the directory.
single_partition_df = final_dashboard_df.coalesce(1)

(
    single_partition_df.write
    .mode("overwrite")
    .option("header", True)
    .csv("final_dashboard_dataset")
)

In [28]:
import os
# Show what Spark wrote: the part-*.csv data file plus marker/checksum files.
os.listdir("final_dashboard_dataset")

['part-00000-71ee9740-e691-4f65-a473-4010d5761124-c000.csv',
 '_SUCCESS',
 '.part-00000-71ee9740-e691-4f65-a473-4010d5761124-c000.csv.crc',
 '._SUCCESS.crc']

In [30]:
import glob
import os

from google.colab import files

# Spark puts a freshly generated UUID in the part-file name on every write, so
# a hard-coded file name stops working as soon as the export cell is re-run.
# Look the file up in the output directory instead.
part_files = sorted(glob.glob(os.path.join("final_dashboard_dataset", "part-*.csv")))

# The export coalesces to one partition, so exactly one data file is expected;
# anything else means the write did not complete as intended.
if len(part_files) != 1:
    raise FileNotFoundError(
        "Expected exactly one part-*.csv in 'final_dashboard_dataset', "
        f"found {len(part_files)}: {part_files}"
    )

files.download(part_files[0])

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>