# Customer Behaviour Analysis

# Objective
In this case study, you will be working on E-commerce Customer Behavior Analysis using Apache Spark, a powerful distributed computing framework designed for big data processing. This assignment aims to give you hands-on experience in analyzing large-scale e-commerce datasets using PySpark. You will apply techniques learned in data analytics to clean, transform, and explore customer behavior data, drawing meaningful insights to support business decision-making. Apart from understanding how big data tools can optimize performance on a single machine and across clusters, you will develop a structured approach to analyzing customer segmentation, purchase patterns, and behavioral trends.

# Business Value
E-commerce businesses operate in a highly competitive market where understanding customer behavior is critical to driving growth and retention. To stay ahead, companies must leverage data-driven insights to optimize marketing strategies, personalize customer experiences, and improve product offerings. In this assignment, you will analyze e-commerce transaction data to uncover patterns in purchasing behavior, customer preferences, and sales performance. With Apache Spark's ability to handle large datasets efficiently, businesses can process vast amounts of customer interactions in real-time, helping them make faster and more informed decisions.
As an analyst at an e-commerce company, your task is to examine historical transaction records and customer survey data to derive actionable insights that can drive business growth. Your analysis will help identify high-value customers, segment users based on behavior, and uncover trends in product demand and customer engagement. By leveraging big data analytics, businesses can enhance customer satisfaction, improve retention rates, and maximize revenue opportunities.


# Assignment Tasks
1. Data Preparation
2. Data Cleaning
3. Exploratory Data Analysis
4. Customer Segmentation (RFM Analysis) and Business Insights
5. Evaluation and Conclusion


# Dataset Overview
The dataset can be accessed at the following [link](https://drive.google.com/drive/folders/1mBgC5tvZrh1bIBvpXVP_j-au5LFUAwOZ?usp=sharing).

The dataset used in this analysis comprises longitudinal purchase records from 5,027 Amazon.com users in the United States, spanning 2018 to 2022.

It is structured into three CSV files (amazon-purchases.csv, survey.csv, and fields.csv) that capture transactional data, demographic profiles, and survey responses.

Collected with informed consent, the dataset enables analysis of customer behavior, product preferences, and demographic trends.

**NOTE**: Personal identifiers (PII) were removed to ensure privacy, and all data were preprocessed by users before submission.

`Data Dictionary:`

| **Attribute** | **Description** |
|---------------|-----------------|
| **Order Dates** | The specific dates when orders were placed, enabling chronological analysis of sales trends. |
| **Title** | The name of the product purchased. |
| **Category** | The classification or group to which the product belongs, facilitating category-wise analysis. |
| **Pricing** | The cost per unit of each product, essential for revenue calculations and pricing strategy assessments. |
| **Quantities** | The number of units of each product ordered in a transaction, aiding in inventory and demand analysis. |
| **Shipping States** | The states to which products were shipped, useful for geographical sales distribution analysis. |
| **Survey ResponseID** | A unique identifier linking purchases to customer survey responses, enabling correlation between purchasing behavior and customer feedback. |



# Loading the Datasets

In [1]:
%pip install pyspark==3.5.4 datasets==3.3.2 pandas==1.5.3 matplotlib==3.8.4 seaborn==0.13.2 numpy==1.26.4 tqdm==4.67.1

Defaulting to user installation because normal site-packages is not writeable
Collecting pyspark==3.5.4
  Downloading pyspark-3.5.4.tar.gz (317.3 MB)
  Preparing metadata (setup.py) ... done
Collecting datasets==3.3.2
  Downloading datasets-3.3.2-py3-none-any.whl (485 kB)
Collecting pandas==1.5.3
  Downloading pandas-1.5.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.2 MB)
Collecting matplotlib==3.8.4
  Downloading matplotlib-3.8.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (11.6 MB)

In [2]:
# Import necessary libraries
from pyspark.sql import SparkSession

# Initialise Spark session
spark = SparkSession.builder \
    .appName("Customer Behavior Analysis") \
    .getOrCreate()


# Load the paths to the datasets/csv files
amazon_purchases_path = "s3a://customeranalysis123/amazon-purchases.csv"
survey_path = "s3a://customeranalysis123/survey.csv"
fields_path = "s3a://customeranalysis123/fields.csv"

# Load datasets into PySpark DataFrames
amazon_purchases = spark.read.csv(amazon_purchases_path, header=True, inferSchema=True)
survey = spark.read.csv(survey_path, header=True, inferSchema=True)
fields = spark.read.csv(fields_path, header=True, inferSchema=True)


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1752833681866_0001,pyspark,idle,Link,Link,,✔



SparkSession available as 'spark'.



In [3]:
sc = spark.sparkContext # access SparkContext from SparkSession

In [4]:
sc.install_pypi_package("seaborn")
sc.install_pypi_package("matplotlib")
sc.install_pypi_package("scikit-learn")
sc.install_pypi_package("boto3")

Collecting seaborn
  Downloading seaborn-0.12.2-py3-none-any.whl (293 kB)
Collecting pandas>=0.25
  Downloading pandas-1.3.5-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (11.3 MB)
Collecting typing_extensions; python_version < "3.8"
  Downloading typing_extensions-4.7.1-py3-none-any.whl (33 kB)
Collecting matplotlib!=3.6.1,>=3.1
  Downloading matplotlib-3.5.3-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl (11.2 MB)
Collecting python-dateutil>=2.7.3
  Downloading python_dateutil-2.9.0.post0-py2.py3-none-any.whl (229 kB)
Collecting packaging>=20.0
  Downloading packaging-24.0-py3-none-any.whl (53 kB)
Collecting pyparsing>=2.2.1
  Downloading pyparsing-3.1.4-py3-none-any.whl (104 kB)
Collecting cycler>=0.10
  Downloading cycler-0.11.0-py3-none-any.whl (6.4 kB)
Collecting kiwisolver>=1.0.1
  Downloading kiwisolver-1.4.5-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.1 MB)
Collecting pillow>=6.2.0
  Downloading Pillow-9.5.0-cp37-cp37m-manylinux_2_17_x86_64.many

In [5]:
import boto3
s3 = boto3.client('s3')

In [6]:
amazon_purchases.printSchema()
survey.printSchema()
fields.printSchema()

root
 |-- Order Date: date (nullable = true)
 |-- Purchase Price Per Unit: double (nullable = true)
 |-- Quantity: double (nullable = true)
 |-- Shipping Address State: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- ASIN/ISBN (Product Code): string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Survey ResponseID: string (nullable = true)

root
 |-- Survey ResponseID: string (nullable = true)
 |-- Q-demos-age: string (nullable = true)
 |-- Q-demos-hispanic: string (nullable = true)
 |-- Q-demos-race: string (nullable = true)
 |-- Q-demos-education: string (nullable = true)
 |-- Q-demos-income: string (nullable = true)
 |-- Q-demos-gender: string (nullable = true)
 |-- Q-sexual-orientation: string (nullable = true)
 |-- Q-demos-state: string (nullable = true)
 |-- Q-amazon-use-howmany: string (nullable = true)
 |-- Q-amazon-use-hh-size: string (nullable = true)
 |-- Q-amazon-use-how-oft: string (nullable = true)
 |-- Q-substance-use-cigarettes: string 

In [7]:
# Rename the join key to a consistent snake_case name.
# amazon_purchases has no "Field ID" column (see the schema above), so only the key is renamed there.
amazon_purchases = amazon_purchases.withColumnRenamed("Survey ResponseID", "response_id")
survey = survey.withColumnRenamed("Survey ResponseID", "response_id")
fields = fields.withColumnRenamed("Field ID", "field_id")

In [8]:
# Merge the datasets
merged_data =  amazon_purchases\
    .join(survey, on="response_id",how="inner")

# Display the merged data
merged_data.show(5)

+-----------------+----------+-----------------------+--------+----------------------+--------------------+------------------------+-------------+-------------+----------------+--------------------+-----------------+-----------------+--------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------------+-------------------------+-----------------------+-------------------+---------------------+--------------+--------------------+--------------------+---------------+------------+------------------+
|      response_id|Order Date|Purchase Price Per Unit|Quantity|Shipping Address State|               Title|ASIN/ISBN (Product Code)|     Category|  Q-demos-age|Q-demos-hispanic|        Q-demos-race|Q-demos-education|   Q-demos-income|Q-demos-gender|Q-sexual-orientation|Q-demos-state|Q-amazon-use-howmany|Q-amazon-use-hh-size|Q-amazon-use-how-oft|Q-substance-use-cigarettes|Q-substance-use-marijuana|Q-substance-use-alcohol|Q-pe

# 1. Data Preparation

Before analysis, the data needs to be prepared to ensure consistency and efficiency.
- Check for data consistency and ensure all columns are correctly formatted.
- Structure and prepare the dataset for further processing, ensuring that relevant features are retained.


In [9]:
from pyspark.sql.functions import sum as spark_sum, col

# Check for missing values in the merged dataset
null_counts=merged_data.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in merged_data.columns])

null_counts.show()


+-----------+----------+-----------------------+--------+----------------------+-----+------------------------+--------+-----------+----------------+------------+-----------------+--------------+--------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------------+-------------------------+-----------------------+-------------------+---------------------+--------------+----------------+--------------------+---------------+------------+------------------+
|response_id|Order Date|Purchase Price Per Unit|Quantity|Shipping Address State|Title|ASIN/ISBN (Product Code)|Category|Q-demos-age|Q-demos-hispanic|Q-demos-race|Q-demos-education|Q-demos-income|Q-demos-gender|Q-sexual-orientation|Q-demos-state|Q-amazon-use-howmany|Q-amazon-use-hh-size|Q-amazon-use-how-oft|Q-substance-use-cigarettes|Q-substance-use-marijuana|Q-substance-use-alcohol|Q-personal-diabetes|Q-personal-wheelchair|Q-life-changes|Q-sell-YOUR-data|Q-sell-consu

# 2. Data Cleaning <font color = red>[20 marks]</font> <br>

Prepare the data for further analysis by cleaning it: treat missing values, fix the data schema, analyse outliers, and apply relevant feature engineering techniques.

## 2.1 Handling Missing values <font color = red>[10 marks]</font> <br>
Handle missing values in the data.
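
One way to think about the fill step is per column type: text columns get a placeholder label and numeric columns get a numeric default. The plain-Python sketch below illustrates that idea on a few hypothetical rows. The row values and the helper name `fill_missing` are assumptions made for illustration; they are not part of the dataset or of the PySpark API.

```python
# Hypothetical toy rows; None stands in for a missing (null) value.
rows = [
    {"Q-demos-gender": "Male", "Quantity": 1.0, "Category": None},
    {"Q-demos-gender": None, "Quantity": None, "Category": "BOOK"},
]


def fill_missing(rows, text_cols, numeric_cols):
    """Return copies of the rows with None replaced by 'Unknown' (text) or 0 (numeric)."""
    filled = []
    for row in rows:
        new_row = dict(row)  # copy so the input rows are left untouched
        for c in text_cols:
            if new_row[c] is None:
                new_row[c] = "Unknown"
        for c in numeric_cols:
            if new_row[c] is None:
                new_row[c] = 0
        filled.append(new_row)
    return filled


filled_rows = fill_missing(rows, text_cols=["Q-demos-gender"], numeric_cols=["Quantity"])

# Count the nulls left in a column that was deliberately not filled.
remaining_nulls = sum(1 for r in filled_rows if r["Category"] is None)
print(filled_rows)
print("Category nulls remaining:", remaining_nulls)
```

Columns that are left unfilled, such as `Category` here, still need a decision later (drop, label, or keep as null), which is why counting the remaining nulls is a useful final check.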

In [10]:
# Import necessary functions
from pyspark.sql.functions import col, sum as spark_sum

# Fill missing (null) values with a technique suited to each column's type.
# Categorical (string) columns: fill with 'Unknown'.
# Q-demos-age and Q-demos-income are string columns in the survey schema, so they
# are filled here; fillna(0) ignores subset columns whose type is not numeric.
categorical_cols = ['Q-demos-gender','Q-demos-education','Q-demos-race','Q-demos-age','Q-demos-income']
merged_data = merged_data.fillna('Unknown',subset=categorical_cols)

# Numerical columns: fill with 0
numerical_cols = ['Quantity','Purchase Price Per Unit']
merged_data = merged_data.fillna(0,subset=numerical_cols)

# Aggregate and count missing values (nulls) for each column that was not filled
all_columns= merged_data.columns
remaining_columns = list(set(all_columns)-set(categorical_cols)-set(numerical_cols))

remaining_null_counts = merged_data.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in remaining_columns])

# Display the count of remaining missing values in each column

null_dict = remaining_null_counts.first().asDict()
print("Remaining columns with missing (null) values (excluding already filled):")
for col_name,n_nulls in null_dict.items():
    if n_nulls>0:
        print(f"{col_name}:{n_nulls} null(s)")

Remaining columns with missing (null) values (excluding already filled):
Category:89435 null(s)
Q-life-changes:1212958 null(s)
Title:89740 null(s)
ASIN/ISBN (Product Code):951 null(s)
Shipping Address State:86832 null(s)

In [11]:
from pyspark.sql.functions import col, month, year, to_date

# Perform appropriate feature engineering, e.g. extract the order date, month and year and cast them to appropriate types
# 1. Make sure the order date is a DateType column. inferSchema already read "Order Date"
#    as a date (yyyy-MM-dd), so no custom format string is needed.
merged_data = merged_data.withColumn("order_date_parsed",to_date(col("Order Date")))
# 2. Extract month and year

merged_data = merged_data.withColumn("order_month",month(col("order_date_parsed")))\
.withColumn("order_year",year(col("order_date_parsed")))

# Display the updated dataset
merged_data.select("Order Date","order_date_parsed","order_month","order_year").show(10)

+----------+-----------------+-----------+----------+
|Order Date|order_date_parsed|order_month|order_year|
+----------+-----------------+-----------+----------+
|2018-12-04|       2018-12-04|         12|      2018|
|2018-12-22|       2018-12-22|         12|      2018|
|2018-12-24|       2018-12-24|         12|      2018|
|2018-12-25|       2018-12-25|         12|      2018|
|2018-12-25|       2018-12-25|         12|      2018|
|2019-02-18|       2019-02-18|          2|      2019|
|2019-02-18|       2019-02-18|          2|      2019|
|2019-04-23|       2019-04-23|          4|      2019|
|2019-04-23|       2019-04-23|          4|      2019|
|2019-05-02|       2019-05-02|          5|      2019|
+----------+-----------------+-----------+----------+
only showing top 10 rows

In [12]:
from pyspark.sql.functions import create_map, lit
from itertools import chain

# Map categorical income to numerical values
income_mapping = {
    'Less than $25,000': 0,
    '$25,000 - $49,999': 1,
    '$50,000 - $74,999': 2,
    '$75,000 - $99,999': 3,
    '$100,000 - $149,999': 4,
    '$150,000 or more': 5
}

income_map_expr = create_map([lit(x) for x in chain (*income_mapping.items())])

merged_data = merged_data.withColumn("Q-demos-income-mapped",income_map_expr[col("Q-demos-income")])

# Map gender to numerical values

gender_mapping = {
    'Male' : 0,
    'Female':1,
    'Non-Binary':2,
    'Unknown':-1
}

gender_map_expr = create_map([lit(x) for x in chain (*gender_mapping.items())])
merged_data = merged_data.withColumn("Q-demos-gender-mapped",gender_map_expr[col("Q-demos-gender")])

# Display the updated dataset


merged_data.select("Q-demos-income","Q-demos-income-mapped","Q-demos-gender","Q-demos-gender-mapped").show(10)

+-----------------+---------------------+--------------+---------------------+
|   Q-demos-income|Q-demos-income-mapped|Q-demos-gender|Q-demos-gender-mapped|
+-----------------+---------------------+--------------+---------------------+
|$25,000 - $49,999|                    1|          Male|                    0|
|$25,000 - $49,999|                    1|          Male|                    0|
|$25,000 - $49,999|                    1|          Male|                    0|
|$25,000 - $49,999|                    1|          Male|                    0|
|$25,000 - $49,999|                    1|          Male|                    0|
|$25,000 - $49,999|                    1|          Male|                    0|
|$25,000 - $49,999|                    1|          Male|                    0|
|$25,000 - $49,999|                    1|          Male|                    0|
|$25,000 - $49,999|                    1|          Male|                    0|
|$25,000 - $49,999|                    1|          M

## 2.3 Data Cleaning <font color = red>[5 marks]</font> <br>
Apply data cleaning techniques such as removing duplicate rows and dropping unnecessary values.
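
The duplicate check boils down to comparing the row count before and after removing exact repeats. A minimal plain-Python sketch of that arithmetic follows; the three rows are hypothetical and only stand in for the merged dataset.

```python
# Hypothetical toy rows as tuples: (response_id, order_date, title).
rows = [
    ("R_1", "2018-12-04", "USB cable"),
    ("R_1", "2018-12-04", "USB cable"),   # exact duplicate of the first row
    ("R_2", "2019-02-18", "Phone case"),
]

# dict.fromkeys keeps the first occurrence of each row and preserves order.
deduplicated = list(dict.fromkeys(rows))

# Number of duplicates = rows before minus rows after deduplication.
num_duplicates = len(rows) - len(deduplicated)
print("Number of duplicates:", num_duplicates)
print("Rows kept:", deduplicated)
```

Only rows that match on every field count as duplicates here, which mirrors comparing whole rows rather than a subset of columns.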

In [13]:
# Check for duplicates
print("Number of Duplicates:", merged_data.count() - merged_data.dropDuplicates().count())


# Remove duplicates
merged_data = merged_data.dropDuplicates()


# Verify duplicates after cleaning
print("Number of Duplicates After Cleaning:", merged_data.count() - merged_data.dropDuplicates().count())

Number of Duplicates: 11516
Number of Duplicates After Cleaning: 0

In [14]:
cleaned_data_path = "s3a://customeranalysis123/cleaned_customer_data"

merged_data.write.csv(cleaned_data_path, header=True, mode='overwrite')

# Load the cleaned dataset from the location
cleaned_data = spark.read.csv(cleaned_data_path, header=True, inferSchema=True)

# Display the first few rows
print("Cleaned Data:")
cleaned_data.show(5)

Cleaned Data:
+-----------------+----------+-----------------------+--------+----------------------+--------------------+------------------------+----------------+-------------+----------------+------------------+--------------------+-----------------+--------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------------+-------------------------+-----------------------+-------------------+---------------------+--------------------+--------------------+--------------------+---------------+------------+------------------+-----------------+-----------+----------+---------------------+---------------------+
|      response_id|Order Date|Purchase Price Per Unit|Quantity|Shipping Address State|               Title|ASIN/ISBN (Product Code)|        Category|  Q-demos-age|Q-demos-hispanic|      Q-demos-race|   Q-demos-education|   Q-demos-income|Q-demos-gender|Q-sexual-orientation|Q-demos-state|Q-amazon-use-howmany|Q-amazon-u

# 3. Exploratory Data Analysis <font color = red>[55 marks]</font> <br>

## 3.1 Analyse purchases by hour, day and month <font color = red>[5 marks]</font> <br>

Examine overall trends in purchases over time and analyse the trends by hour, day, month.
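
Two details are easy to miss when bucketing by time. A date-only value carries no time of day, so an hour-level breakdown is only informative when a timestamp is available. Spark's `dayofweek` numbers the days from 1 (Sunday) to 7 (Saturday), which differs from Python's conventions. The sketch below reproduces that numbering in plain Python on a few hypothetical order dates; the helper name `spark_style_dayofweek` is an assumption for illustration.

```python
from collections import Counter
from datetime import date


def spark_style_dayofweek(d: date) -> int:
    """Return 1 for Sunday through 7 for Saturday, matching Spark's dayofweek numbering."""
    return d.isoweekday() % 7 + 1


# Hypothetical sample of date-only order dates.
sample_dates = [
    date(2018, 12, 4),   # Tuesday
    date(2018, 12, 22),  # Saturday
    date(2018, 12, 24),  # Monday
    date(2018, 12, 25),  # Tuesday
    date(2018, 12, 25),  # Tuesday
]

# Count purchases per day-of-week bucket and per calendar month.
by_day = Counter(spark_style_dayofweek(d) for d in sample_dates)
by_month = Counter(d.month for d in sample_dates)

print(sorted(by_day.items()))
print(sorted(by_month.items()))
```

With this numbering, tick labels for a day-of-week chart should start at Sunday rather than Monday.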

In [15]:
# Purchase Distribution by Hour, Day, and Month

from pyspark.sql.functions import hour, dayofweek, month
import seaborn as sns
import matplotlib.pyplot as plt

# Extract hour, day, and month.
# Note: order_date_parsed is a date with no time component, so hour() is 0 for every row.
# dayofweek() returns 1 for Sunday through 7 for Saturday.
df_time = merged_data.withColumn("hour",hour("order_date_parsed"))\
.withColumn("day_of_week",dayofweek("order_date_parsed"))\
.withColumn("month",month("order_date_parsed"))

# Group and count purchases by time factors, then convert to Pandas for visualisation
hourly_count = df_time.groupBy("hour").count().orderBy("hour").toPandas()
daily_count = df_time.groupBy("day_of_week").count().orderBy("day_of_week").toPandas()
monthly_count = df_time.groupBy("month").count().orderBy("month").toPandas()

# Plot the data
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
sns.barplot(x="hour",y="count",data=hourly_count,ax=axes[0])
axes[0].set_title("Purchases By Hour")

sns.barplot(x="day_of_week",y="count",data=daily_count,ax=axes[1])
axes[1].set_title("Purchases By Day of Week")
# Bars are ordered 1..7 (Sunday first); this labelling assumes all seven days are present
axes[1].set_xticks(range(7))
axes[1].set_xticklabels(['Sun','Mon','Tue','Wed','Thu','Fri','Sat'])

sns.barplot(x="month",y="count",data=monthly_count,ax=axes[2])
axes[2].set_title("Purchases By Month")

plt.tight_layout()
plt.savefig("month_count.png")
s3.upload_file('month_count.png', 'customeranalysis123', 'month_count.png')
plt.show()

In [16]:
# Monthly Purchase Trends

from pyspark.sql.functions import date_format

# Extract month and year from 'Order Date'

data_monthly = cleaned_data.withColumn("year_month", date_format("order_date_parsed", "yyyy-MM"))
# Group by month and count purchases

monthly_counts = data_monthly.groupBy("year_month").count().orderBy("year_month")

# Convert to Pandas for visualisation
monthly_counts_pandas = monthly_counts.toPandas()

# Plot
plt.figure(figsize=(16, 7))
sns.lineplot(x="year_month", y="count", data=monthly_counts_pandas, marker="o", color="Blue")

plt.title("Monthly Purchase Trends")
plt.xlabel("Month")
plt.ylabel("Total Purchases")
plt.xticks(rotation=45)
plt.grid(True)
plt.tight_layout()
plt.savefig("monthly_trend.png")
s3.upload_file('monthly_trend.png', 'customeranalysis123', 'monthly_trend.png')
plt.show()

In [17]:
# Yearly Purchase Trends

from pyspark.sql.functions import date_format

# Group by Year and count purchases
data_yearly = cleaned_data.withColumn("order_year", date_format("order_date_parsed", "yyyy"))
yearly_counts = data_yearly.groupBy("order_year").count().orderBy("order_year")

# Convert to Pandas for visualisation
yearly_counts_pandas = yearly_counts.toPandas()

# Plot
plt.figure(figsize=(10, 5))
sns.barplot(x="order_year", y="count", data=yearly_counts_pandas)

plt.title("Yearly Purchase Trends")
plt.xlabel("Year")
plt.ylabel("Total Purchases")
plt.grid(axis='y')
plt.tight_layout()
plt.savefig("yearly_trend.png")
s3.upload_file('yearly_trend.png', 'customeranalysis123', 'yearly_trend.png')
plt.show()

## 3.2 Customer Demographics vs Purchase Frequency <font color = red>[5 marks]</font> <br>
Analyse the trends between customer demographics and purchase frequency.

In [18]:
# Correlation Between Demographics and Purchase Frequency

from pyspark.sql.functions import count

# Group by demographic attributes and count purchases
gender_counts = cleaned_data.groupBy("Q-demos-gender").count()

age_counts = cleaned_data.groupBy("Q-demos-age").count()
income_counts = cleaned_data.groupBy("Q-demos-income").count()
state_counts = cleaned_data.groupBy("Q-demos-state").count()


# Convert to Pandas for visualisation

gender_counts_pd = gender_counts.toPandas()
age_counts_pd = age_counts.toPandas()
income_counts_pd = income_counts.toPandas()
state_counts_pd = state_counts.toPandas()

# Plot
# Gender
plt.figure(figsize=(6, 4))
sns.barplot(data=gender_counts_pd, x="Q-demos-gender", y="count", palette="pastel")
plt.title("Purchases by Gender")
plt.xlabel("Gender")
plt.ylabel("Number of Purchases")
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig("gender_trend.png")
s3.upload_file('gender_trend.png', 'customeranalysis123', 'gender_trend.png') 
plt.show()

#Age
plt.figure(figsize=(8, 4))
sns.barplot(data=age_counts_pd, x="Q-demos-age", y="count", palette="Set2")
plt.title("Purchases by Age Group")
plt.xlabel("Age Group")
plt.ylabel("Number of Purchases")
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig("age_trend.png")
s3.upload_file('age_trend.png', 'customeranalysis123', 'age_trend.png')
plt.show()

#income
plt.figure(figsize=(10, 4))
sns.barplot(data=income_counts_pd, x="Q-demos-income", y="count", palette="Blues_d")
plt.title("Purchases by Income ")
plt.xlabel("Income Range")
plt.ylabel("Number of Purchases")
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig("income_trend.png")
s3.upload_file('income_trend.png', 'customeranalysis123', 'income_trend.png')
plt.show()

## 3.3 Purchase behavior weekend vs weekday <font color = red>[5 marks]</font> <br>

Compare the purchase behavior of customers on weekdays vs. weekends.
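
The comparison can be sketched in plain Python as a classification step followed by a grouped summary. The rows below are hypothetical, and the helper name `day_type` is an assumption for illustration; spend per row is taken as unit price multiplied by quantity.

```python
from collections import defaultdict
from datetime import date

# Hypothetical toy rows: (customer_id, order_date, unit_price, quantity).
rows = [
    ("R_1", date(2018, 12, 22), 10.0, 2),  # Saturday
    ("R_1", date(2018, 12, 24), 5.0, 1),   # Monday
    ("R_2", date(2018, 12, 23), 8.0, 1),   # Sunday
    ("R_3", date(2018, 12, 25), 20.0, 1),  # Tuesday
]


def day_type(d: date) -> str:
    """Classify a date as 'Weekend' (Saturday or Sunday) or 'Weekday'."""
    return "Weekend" if d.weekday() >= 5 else "Weekday"


# Accumulate transaction count, revenue, and the set of customers per day type.
summary = defaultdict(lambda: {"transactions": 0, "revenue": 0.0, "customers": set()})
for customer, order_date, price, qty in rows:
    bucket = summary[day_type(order_date)]
    bucket["transactions"] += 1
    bucket["revenue"] += price * qty
    bucket["customers"].add(customer)

for name, stats in sorted(summary.items()):
    avg_spend = stats["revenue"] / stats["transactions"]
    print(name, stats["transactions"], stats["revenue"], avg_spend, len(stats["customers"]))
```

Because weekdays cover five days and weekends two, raw totals favour weekdays; per-transaction averages or per-day rates give a fairer comparison.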

In [19]:
# Weekday vs. Weekend Purchase Behavior

# Import Spark's sum under an alias so Python's built-in sum is not shadowed
from pyspark.sql.functions import col, date_format, when, count, avg, countDistinct, sum as _sum
import matplotlib.pyplot as plt

# Define weekdays and weekends.
# date_format(..., "E") returns abbreviated day names such as "Sat" and "Sun"; the match is case-sensitive.
merged_data = merged_data.withColumn("day_of_week",date_format("order_date_parsed","E"))\
.withColumn("day_type",when(col("day_of_week").isin("Sat","Sun"),"Weekend").otherwise("Weekday"))


# Group and count purchases.
# Spend per row is the unit price multiplied by the quantity.
line_total = col("Purchase Price Per Unit") * col("Quantity")

summary_merged_data = merged_data.groupBy("day_type").agg(count("*").alias("total_transactions"),
                                                         _sum(line_total).alias("total_revenue"),
                                                         avg(line_total).alias("average_purchase"),
                                                         countDistinct("response_id").alias("unique_customers"))
# Convert to Pandas for visualisation

summary_pd = summary_merged_data.toPandas().sort_values("day_type")

# Plot total transactions, average purchase amount and unique customers

plt.figure(figsize=(14,5))

# Total transactions

plt.subplot(1,3,1)
plt.bar(summary_pd["day_type"],summary_pd["total_transactions"],color=["skyblue","orange"])
plt.title("Total Transactions")
plt.ylabel("Count")

# Average purchase amount

plt.subplot(1,3,2)
plt.bar(summary_pd["day_type"],summary_pd["average_purchase"],color=["green","red"])
plt.title("Average Purchase Amount")
plt.ylabel("Amount")

# Unique customers
plt.subplot(1,3,3)
plt.bar(summary_pd["day_type"],summary_pd["unique_customers"],color=["purple","pink"])
plt.title("Unique Customers")
plt.ylabel("Count")

plt.suptitle("Weekday Vs Weekend Purchase Behavior",fontsize=16)
plt.tight_layout()
plt.savefig('Weekday_Vs_Weekend.png')
s3.upload_file('Weekday_Vs_Weekend.png', 'customeranalysis123', 'Weekday_Vs_Weekend.png')
plt.show()

## 3.4 Frequently purchased product pairs <font color = red>[5 marks]</font> <br>

Analyze how frequently products are purchased together (also known as Market Basket Analysis).
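
At its simplest, pair counting means listing every unordered pair of items in each basket and tallying how often each pair appears across baskets. The plain-Python sketch below shows this on three hypothetical baskets; the customer IDs and product names are made up for illustration.

```python
from collections import Counter
from itertools import combinations

# Hypothetical baskets: the distinct items bought by each customer.
baskets = {
    "R_1": {"USB cable", "Phone case", "Screen protector"},
    "R_2": {"USB cable", "Phone case"},
    "R_3": {"Phone case", "Screen protector"},
}

pair_counts = Counter()
for items in baskets.values():
    # Sorting gives each pair a canonical order, so (A, B) and (B, A) are counted together.
    pair_counts.update(combinations(sorted(items), 2))

for (item1, item2), n in pair_counts.most_common():
    print(f"{item1} & {item2}: {n}")
```

The number of pairs grows quadratically with basket size (a basket of n items yields n(n-1)/2 pairs), which is why large baskets are often capped or sampled before pairing.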


In [20]:
# Frequently Purchased Product Pairs (Market Basket Analysis)

from pyspark.sql.functions import collect_set, col
from itertools import combinations
from pyspark.sql import Row

# Group purchases by customer and collect the distinct items each customer bought.
# Baskets are per customer here, not per order; collect_set ignores null titles.
customer_merged_data = merged_data.groupBy("response_id").agg(collect_set("Title").alias("items"))

# Build item pairs. Each basket is capped at its first 10 titles in alphabetical order
# to bound the number of pairs, so pairs involving later titles are not counted.

pairs_rdd = customer_merged_data.rdd.flatMap(lambda row:[Row(item1=a,item2=b) for a,b in combinations(sorted(row['items'])[:10],2)])

pair_df=spark.createDataFrame(pairs_rdd)


# Count co-occurrences of items
pair_counts = pair_df.groupBy("item1","item2").count().orderBy(col("count").desc())

# Convert to Pandas for visualisation

top_pair_pd = pair_counts.limit(10).toPandas()

# Plot

import matplotlib.pyplot as plt

plt.figure(figsize=(10,6))
plt.barh(
y=[f"{a} & {b}" for a,b in zip(top_pair_pd["item1"],top_pair_pd["item2"])],width=top_pair_pd["count"])
plt.xlabel("Frequency")
plt.title("Top 10 Most Frequently Purchased Product Pairs")
plt.gca().invert_yaxis()
plt.tight_layout()
plt.savefig('Top_10_purchased_pair.png')
s3.upload_file('Top_10_purchased_pair.png', 'customeranalysis123', 'Top_10_purchased_pair.png')
plt.show()

## 3.5 Examine Product Performance <font color = red>[5 marks]</font> <br>

Examine the performance of products by calculating revenue and item popularity.
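
Revenue for a row is the unit price multiplied by the quantity, and popularity can be measured as the total quantity sold. The sketch below totals both per category in plain Python; the category names and figures are hypothetical.

```python
from collections import defaultdict

# Hypothetical toy rows: (category, unit_price, quantity).
rows = [
    ("BOOK", 12.0, 2),
    ("BOOK", 8.0, 1),
    ("PET_FOOD", 20.0, 3),
]

revenue = defaultdict(float)
quantity = defaultdict(int)
for category, price, qty in rows:
    revenue[category] += price * qty   # revenue contribution of this row
    quantity[category] += qty          # units sold in this category

# Rank categories by total revenue, highest first.
ranked = sorted(revenue, key=revenue.get, reverse=True)
for category in ranked:
    print(category, revenue[category], quantity[category])
```

Ranking by revenue and ranking by quantity can disagree: here both categories sell three units, yet one earns almost twice the revenue.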

In [21]:
from pyspark.sql.functions import col, sum as _sum

# Contribution of Product Categories (Top 25)

category_performance= merged_data.groupBy("Category").agg(_sum(col("Purchase Price Per Unit")* col("Quantity")).alias("Total_Revenue"),
                                                         _sum("Quantity").alias("Total_Quantity"))

# Convert to Pandas for visualisation
top_25_categories = category_performance.orderBy(col("Total_Revenue").desc()).limit(25)
top_25_pd= top_25_categories.toPandas()

# Plot
import matplotlib.pyplot as plt

plt.figure(figsize=(12,7))
top_25_pd["Category"] = top_25_pd["Category"].astype(str)
plt.barh(top_25_pd["Category"],top_25_pd["Total_Revenue"],color='skyblue',label="Revenue")
plt.xlabel("Total Revenue")
plt.title("Top 25 Product Categories By Revenue")
plt.gca().invert_yaxis()
plt.tight_layout()
plt.savefig('Top_25_Category_Revenue.png')
s3.upload_file('Top_25_Category_Revenue.png', 'customeranalysis123', 'Top_25_Category_Revenue.png')
plt.show()

## 3.6 Top products by quantity <font color = red>[5 marks]</font> <br>

Identify the most frequently purchased products.

In [22]:
# Top 10 Products by Quantity

from pyspark.sql.functions import col, sum as _sum
import matplotlib.pyplot as plt

# Group by product title and sum 'Quantity'
top_products_qty= (
    merged_data.groupBy("Title")
    .agg(_sum("Quantity").alias("Total_Quantity"))
    .orderBy(col("Total_Quantity").desc())
    .limit(10) 
)
# Convert to Pandas for visualisation

top_products_pd=top_products_qty.toPandas()

top_products_pd["Title"]= top_products_pd["Title"].astype(str)

# Plot

plt.figure(figsize=(12,6))
plt.barh(top_products_pd["Title"],top_products_pd["Total_Quantity"],color='salmon')
plt.xlabel("Total Quantity Sold")
plt.title("Top 10 Products By Quantity")
plt.gca().invert_yaxis()
plt.tight_layout()
plt.savefig('Top_10_Products_by_Quantity.png')
s3.upload_file('Top_10_Products_by_Quantity.png', 'customeranalysis123', 'Top_10_Products_by_Quantity.png')
plt.show()

## 3.7 Distribution of Purchases by State <font color = red>[5 marks]</font> <br>

Analyze the distribution of purchases across states and categories.

In [23]:
# Distribution of Purchases by State (Top 25)
from pyspark.sql.functions import col, sum as _sum
import matplotlib.pyplot as plt

state_distribution= (
    merged_data.groupBy("Shipping Address State")
    .agg(_sum("Quantity").alias("Total_Quantity"))
    .orderBy(col("Total_Quantity").desc())
    .limit(25) 
)

# Convert to Pandas for visualisation

state_pd = state_distribution.toPandas()

state_pd["Shipping Address State"]= state_pd["Shipping Address State"].astype(str)

# Plot

plt.figure(figsize=(12,6))
plt.barh(state_pd["Shipping Address State"],state_pd["Total_Quantity"],color='lightgreen')
plt.xlabel("Total Quantity Purchased")
plt.title("Top 25 States By Purchase Quantity")
plt.gca().invert_yaxis()
plt.tight_layout()
plt.savefig('Top_25_states_by_Quantity.png')
s3.upload_file('Top_25_states_by_Quantity.png', 'customeranalysis123', 'Top_25_states_by_Quantity.png')
plt.show()


## 3.8 Price vs Product Quantity <font color = red>[5 marks]</font> <br>

Identify the relationship between price and quantity.

In [24]:
# Relationship Between Price and Quantity
import matplotlib.pyplot as plt

# Convert to Pandas for visualisation
price_qty_pd = merged_data.select("Purchase Price Per Unit","Quantity").toPandas()


# removes rows with missing value

price_qty_pd = price_qty_pd.dropna()

# Plot

plt.figure(figsize=(10,6))
plt.scatter(price_qty_pd["Purchase Price Per Unit"],price_qty_pd["Quantity"],alpha=0.5,color='purple')
plt.xlabel("Purchase Price Per Unit")
plt.ylabel("Quantity")
plt.title("Relationship Between Price and Quantity")
plt.tight_layout()
plt.savefig('Price_Vs_Quantity.png')
s3.upload_file('Price_Vs_Quantity.png', 'customeranalysis123', 'Price_Vs_Quantity.png')
plt.show()


## 3.9 Analyse the spending KPIs <font color = red>[5 marks]</font> <br>


A popular KPI is average spend per customer. Calculate this metric as the total transaction amount from non-recurring payments divided by the total number of customers who made a purchase.
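
As a minimal sketch of the KPI defined above, the snippet below divides total spend by the number of distinct purchasing customers. It uses plain Python so it runs without a Spark session; the toy `orders` rows and the function name `average_spend_per_customer` are illustrative assumptions, not part of the assignment dataset.

```python
def average_spend_per_customer(orders):
    """Return total spend divided by the number of distinct customers.

    `orders` is an iterable of (customer_id, unit_price, quantity) tuples.
    """
    total_spend = 0.0
    customers = set()
    for customer_id, unit_price, quantity in orders:
        total_spend += unit_price * quantity
        customers.add(customer_id)
    if not customers:
        return 0.0
    return total_spend / len(customers)


# Hypothetical toy data: two customers, three order lines.
orders = [
    ("c1", 10.0, 2),  # spend 20.0
    ("c1", 5.0, 1),   # spend 5.0
    ("c2", 15.0, 1),  # spend 15.0
]
kpi = average_spend_per_customer(orders)  # (20 + 5 + 15) / 2 customers
print(kpi)
```

Note that dividing by the number of customers differs from averaging over order lines, which would divide the same total by 3 in this toy example.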

In [25]:
# Average Spend per Customer

from pyspark.sql.functions import avg, col, sum as _sum, round

import matplotlib.pyplot as plt

merged_data= merged_data.withColumn("Total_Spend",col("Purchase Price Per Unit")*col("Quantity"))

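# Total spend per customer, carrying along the demographic columns used below
# (assumes each response_id has a single value for each demographic column)

spend_per_customer = merged_data.groupBy(
    "response_id", "Q-demos-gender", "Q-demos-education", "Q-demos-income"
).agg(_sum("Total_Spend").alias("Customer_Spend"))

# Average spend per customer within each demographic group

avg_spend_by_gender = spend_per_customer.groupBy("Q-demos-gender")\
     .agg(round(avg("Customer_Spend"),2).alias("Avg_Total_Spend"))\
     .orderBy("Avg_Total_Spend",ascending = False)

avg_spend_by_education = spend_per_customer.groupBy("Q-demos-education")\
     .agg(round(avg("Customer_Spend"),2).alias("Avg_Total_Spend"))\
     .orderBy("Avg_Total_Spend",ascending = False)

avg_spend_by_income = spend_per_customer.groupBy("Q-demos-income")\
     .agg(round(avg("Customer_Spend"),2).alias("Avg_Total_Spend"))\
     .orderBy("Avg_Total_Spend",ascending = False)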

# Convert to Pandas for visualisation
avg_spend_pd = avg_spend_by_gender.toPandas()

# Plot
plt.figure(figsize=(6,4))
plt.bar(avg_spend_pd["Q-demos-gender"],avg_spend_pd["Avg_Total_Spend"],color='mediumseagreen')
plt.xlabel("Gender")
plt.ylabel("Average Spend")
plt.title("Average Spend Per Customer By Gender")
plt.tight_layout()
plt.savefig('Average_Spend_Per_Customer_by_gender.png')
s3.upload_file('Average_Spend_Per_Customer_by_gender.png', 'customeranalysis123', 'Average_Spend_Per_Customer_by_gender.png')
plt.show()


Analyse the Repeat Purchase Behavior of Customers

In [26]:
# Repeat Purchase Analysis Behavior Per Customers
from pyspark.sql.functions import count, countDistinct, col, when, avg, round
import matplotlib.pyplot as plt

# Step 1: Calculate Total Spend per row
merged_data = merged_data.withColumn(
    "Total_Spend",
    col("Purchase Price Per Unit") * col("Quantity")
)

# Step 2: Count how many purchases (order lines) each customer made
purchase_counts = merged_data.groupBy("response_id")\
    .agg(count("*").alias("num_purchases"))

# Step 3: Label each customer as Repeat or One-time buyer
purchase_counts = purchase_counts.withColumn(
    "purchase_type",
    when(col("num_purchases") > 1, "Repeat").otherwise("One-time")
)

# Step 4: Join this info back to main data
merged_with_type = merged_data.join(purchase_counts, on="response_id", how="inner")

# Step 5: Group by purchase_type to get the number of distinct customers
# (the joined data has one row per order line, so a plain count would count rows)
# and the average spend per order line
repeat_stats = merged_with_type.groupBy("purchase_type") \
    .agg(
        countDistinct("response_id").alias("num_customers"),
        round(avg("Total_Spend"), 2).alias("avg_spend")
    )

# Step 6: Convert to Pandas for visualization
repeat_stats_pd = repeat_stats.toPandas()

# === Plot 1: Distribution of Repeat vs One-time Customers ===
plt.figure(figsize=(6, 4))
plt.bar(repeat_stats_pd["purchase_type"], repeat_stats_pd["num_customers"], color='steelblue')
plt.title("Customer Distribution by Purchase Behavior")
plt.xlabel("Purchase Type")
plt.ylabel("Number of Customers")
plt.tight_layout()
plt.savefig('Customer_Distribution_Purchase_Behavior.png')
s3.upload_file('Customer_Distribution_Purchase_Behavior.png', 'customeranalysis123', 'Customer_Distribution_Purchase_Behavior.png')
plt.show()

# === Plot 2: Average Spend by Purchase Type ===
plt.figure(figsize=(6, 4))
plt.bar(repeat_stats_pd["purchase_type"], repeat_stats_pd["avg_spend"], color='seagreen')
plt.title("Average Spend: Repeat vs One-time Buyers")
plt.xlabel("Purchase Type")
plt.ylabel("Average Spend ($)")
plt.tight_layout()
plt.savefig('Average_Spend_By_Purchase_Type.png')
s3.upload_file('Average_Spend_By_Purchase_Type.png', 'customeranalysis123', 'Average_Spend_By_Purchase_Type.png')
plt.show()


Analyse the top 10 high-engagement customers

In [27]:
# Top 10 High-Engagement Customers

from pyspark.sql.functions import col, sum as spark_sum
import matplotlib.pyplot as plt

# Step 1: Calculate Total Spend column
merged_data = merged_data.withColumn(
    "Total_Spend",
    col("Purchase Price Per Unit") * col("Quantity")
)

# Step 2: Aggregate total spend per customer
customer_spend = merged_data.groupBy("response_id") \
    .agg(
        spark_sum("Total_Spend").alias("total_spend")
    )

# Step 3: Get Top 10 High-Spending Customers
top_customers = customer_spend.orderBy(col("total_spend").desc()).limit(10)

# Step 4: Convert to Pandas for plotting
top_customers_pd = top_customers.toPandas()

# Step 5: Plot
plt.figure(figsize=(10, 6))
plt.barh(top_customers_pd["response_id"].astype(str), top_customers_pd["total_spend"], color='darkorange')
plt.xlabel("Total Spend ($)")
plt.ylabel("Customer ID (response_id)")
plt.title("Top 10 High-Engagement Customers by Total Spend")
plt.gca().invert_yaxis()  # Highest spender on top
plt.tight_layout()
plt.savefig("Top_10_High_Engagement_Customers.png")
s3.upload_file("Top_10_High_Engagement_Customers.png", 'customeranalysis123', 'Top_10_High_Engagement_Customers.png')
plt.show()


## 3.10 Seasonal trends in product purchases and their impact on revenues <font color = red>[5 marks]</font> <br>

Investigate the seasonal trends in product purchases and their impact on the overall revenue.

In [28]:
# Seasonal Trends in Product Purchases and Their Impact on Revenue

from pyspark.sql.functions import year, month, col, sum as spark_sum
import matplotlib.pyplot as plt

merged_data = merged_data.withColumn(
    "Total_Spend",
    col("Purchase Price Per Unit") * col("Quantity")
)

#  Extract year and month
merged_data = merged_data.withColumn("year", year(col("Order Date"))) \
                         .withColumn("month", month(col("Order Date")))

# Group by year and month, summing total revenue
monthly_revenue = merged_data.groupBy("year", "month") \
    .agg(spark_sum("Total_Spend").alias("monthly_revenue")) \
    .orderBy("year", "month")


# Convert to Pandas for visualisation
monthly_revenue_pd = monthly_revenue.toPandas()

# Plot
plt.figure(figsize=(12, 6))

for yr in sorted(monthly_revenue_pd["year"].unique()):
    data = monthly_revenue_pd[monthly_revenue_pd["year"] == yr]
    plt.plot(data["month"], data["monthly_revenue"], marker='o', label=str(yr))

plt.title("Seasonal Trends in Monthly Revenue")
plt.xlabel("Month")
plt.ylabel("Revenue ($)")
plt.xticks(range(1, 13))
plt.legend(title="Year")
plt.grid(True)
plt.tight_layout()
plt.savefig("Seasonal_Trends_Revenue.png")
s3.upload_file("Seasonal_Trends_Revenue.png", 'customeranalysis123', 'Seasonal_Trends_Revenue.png')
plt.show()


## 3.11 Customer location vs purchasing behavior <font color = red>[5 marks]</font> <br>

Examine the relationship between customers' locations and their purchasing behavior.

In [29]:

from pyspark.sql.functions import col, sum as spark_sum
import matplotlib.pyplot as plt

merged_data = merged_data.withColumn(
    "Total_Spend", 
    col("Purchase Price Per Unit") * col("Quantity")
)


# Relationship Between Customer Location and Purchase Behavior

# Group purchases by state and total spend

location_spend = merged_data.groupBy("Q-demos-state") \
    .agg(spark_sum("Total_Spend").alias("Total_Revenue")) \
    .orderBy(col("Total_Revenue").desc())

# Convert to Pandas for visualisation

location_spend_pd = location_spend.toPandas()

# Plot revenue by state

top_states = location_spend_pd.head(10)

plt.figure(figsize=(10, 6))
plt.barh(top_states["Q-demos-state"], top_states["Total_Revenue"], color="slateblue")
plt.xlabel("Total Revenue ($)")
plt.ylabel("State")
plt.title("Top 10 States by Total Purchase Revenue")
plt.gca().invert_yaxis()
plt.tight_layout()
plt.savefig("Revenue_By_Location.png")
s3.upload_file("Revenue_By_Location.png", "customeranalysis123", "Revenue_By_Location.png")
plt.show()


# 4. Customer Segmentation and Insights <font color = red>[45 marks]</font> <br>


## 4.1 Perform RFM Analysis <font color = red>[10 marks]</font> <br>

RFM Analysis is a powerful customer segmentation technique used to evaluate and quantify customer value based on three key dimensions:
- **Recency**,
- **Frequency**,
- **Monetary**.

This method is particularly effective in identifying high-value customers, optimizing marketing strategies, and improving customer retention in the e-commerce industry.


### 1. Recency (R)
Recency measures how recently a customer made a purchase. Customers who have purchased more recently are more likely to respond to promotions and offers.
- **Application:** By ranking customers based on the number of days since their last transaction, you can prioritize those who are most engaged.

### 2. Frequency (F)
Frequency counts the number of purchases a customer has made over a given period.
Frequent purchasers tend to be more loyal and are often a source of recurring revenue.
- **Application:** Analyzing purchase frequency helps in identifying consistent buyers and understanding their buying patterns.

### 3. Monetary (M)
Monetary value represents the total amount of money a customer has spent.
Customers who spend more are often more profitable, making them ideal targets for retention and upsell strategies.
- **Application:** By assessing the monetary contribution, you can distinguish between high-value and low-value customers.
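
The three dimensions above can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions: the toy `orders` list and the helper name `compute_rfm` are hypothetical, and recency is measured against the latest order date in the data. It is not the notebook's Spark implementation.

```python
from collections import defaultdict
from datetime import date


def compute_rfm(orders, reference_date):
    """Return {customer_id: (recency_days, frequency, monetary)}.

    `orders` is an iterable of (customer_id, order_date, amount) tuples.
    """
    last_order = {}
    frequency = defaultdict(int)
    monetary = defaultdict(float)
    for customer_id, order_date, amount in orders:
        # Track the most recent order date per customer.
        if customer_id not in last_order or order_date > last_order[customer_id]:
            last_order[customer_id] = order_date
        frequency[customer_id] += 1
        monetary[customer_id] += amount
    return {
        cid: ((reference_date - last_order[cid]).days, frequency[cid], monetary[cid])
        for cid in last_order
    }


# Hypothetical toy data: (customer_id, order_date, amount).
orders = [
    ("c1", date(2023, 1, 5), 20.0),
    ("c1", date(2023, 3, 1), 30.0),
    ("c2", date(2023, 2, 10), 15.0),
]
# Use the latest order date in the data as the reference point for recency.
reference_date = max(order_date for _, order_date, _ in orders)
rfm = compute_rfm(orders, reference_date)
print(rfm)
```

A lower recency value means a more recent purchase, so when ranking customers recency is usually ordered in the opposite direction to frequency and monetary value.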


### Prepare data for RFM Analysis <font color = red>[2 marks]</font> <br>


In [30]:
from pyspark.sql.functions import datediff, max as spark_max, count, sum as spark_sum, col, lit, to_date

merged_data = merged_data.withColumn(
    "Order Date", to_date(col("Order Date"), "yyyy-MM-dd")
)

merged_data = merged_data.withColumn(
    "Total_Spend", col("Purchase Price Per Unit") * col("Quantity")
)

# Get the latest order date in the dataset

max_date = merged_data.agg(spark_max("Order Date").alias("max_date")).collect()[0]["max_date"]

# Calculate RFM metrics

rfm_df = merged_data.groupBy("response_id").agg(
    datediff(lit(max_date), spark_max("Order Date")).alias("Recency"),
    count("*").alias("Frequency"),
    spark_sum("Total_Spend").alias("Monetary")
)

# Filter out customers with no purchases
rfm_df = rfm_df.filter(col("Monetary") > 0)

# Show RFM data

rfm_df.show()


In [31]:
from pyspark.sql.functions import log1p
import pandas as pd

# Apply log transformation to skewed features

rfm_log = rfm_df.withColumn("log_Recency", log1p(col("Recency"))) \
                .withColumn("log_Frequency", log1p(col("Frequency"))) \
                .withColumn("log_Monetary", log1p(col("Monetary")))

# Convert to Pandas DataFrame (for scikit-learn compatibility)

rfm_pd = rfm_log.select("log_Recency", "log_Frequency", "log_Monetary").toPandas()

# Scale features using StandardScaler
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
rfm_scaled = scaler.fit_transform(rfm_pd)
rfm_scaled_df = pd.DataFrame(rfm_scaled, columns=["Recency_scaled", "Frequency_scaled", "Monetary_scaled"])
rfm_scaled_df.head()


   Recency_scaled  Frequency_scaled  Monetary_scaled
0       -0.139780          0.482647         0.398730
1       -0.622933         -0.556361        -0.258022
2       -0.407676          0.261067         0.329777
3       -0.709675          1.518122         1.430251
4       -0.797902         -0.211111        -0.090658

In [32]:
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt

wcss = []

# Calculate the Within-Cluster Sum of Squares (WCSS)

for i in range(1, 11):
    kmeans = KMeans(n_clusters=i, random_state=42, n_init=10)
    kmeans.fit(rfm_scaled)
    wcss.append(kmeans.inertia_)

# Plot the elbow curve with the number of clusters on the x-axis and WCSS on the y-axis

plt.figure(figsize=(8, 5))
plt.plot(range(1, 11), wcss, marker='o', linestyle='--', color='teal')
plt.title('Elbow Method - Optimal Number of Clusters')
plt.xlabel('Number of Clusters')
plt.ylabel('WCSS (Within-Cluster Sum of Squares)')
plt.xticks(range(1, 11))
plt.grid(True)
plt.tight_layout()
plt.savefig("elbow_curve_rfm.png")
s3.upload_file("elbow_curve_rfm.png", "customeranalysis123", "elbow_curve_rfm.png")
plt.show()


In [33]:
from sklearn.cluster import KMeans
import pandas as pd

# Fit the K-Means model using the optimal number of clusters identified from the elbow plot

kmeans = KMeans(n_clusters=4, random_state=42, n_init=10)
rfm_pd['Cluster'] = kmeans.fit_predict(rfm_scaled)

print(rfm_pd.head())

# Add the assigned cluster labels to the Pandas DataFrame and convert back to PySpark if needed

rfm_clustered_spark = spark.createDataFrame(rfm_pd)

rfm_clustered_spark.show(10, truncate=False)


   log_Recency  log_Frequency  log_Monetary  Cluster
0     6.383507       5.866468      8.929807        0
1     6.289716       4.574711      8.098199        0
2     6.331502       5.590987      8.842496        0
3     6.272877       7.153834     10.235964        2
4     6.255750       5.003946      8.310122        0
+------------------+------------------+------------------+-------+
|log_Recency       |log_Frequency     |log_Monetary      |Cluster|
+------------------+------------------+------------------+-------+
|6.3835066348840055|5.8664680569332965|8.929807350718853 |0      |
|6.289715570908998 |4.574710978503383 |8.098198976267856 |0      |
|6.331501849893691 |5.5909869805108565|8.842496140272365 |0      |
|6.272877006546167 |7.153833801578843 |10.235963959309123|2      |
|6.255750041753367 |5.003946305945459 |8.310122280422137 |0      |
|6.459904454377535 |5.934894195619588 |8.745569428320573 |0      |
|6.285998094508865 |6.246106765481563 |9.152696428521098 |2      |
|6.270988431

In [34]:
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

# Convert the full RFM dataset from PySpark DataFrame to Pandas DataFrame for visualisation

rfm_pd = rfm_df.select("Recency", "Frequency", "Monetary").toPandas()

# Log transformation to reduce skewness
rfm_pd["Recency_log"] = np.log1p(rfm_pd["Recency"])
rfm_pd["Frequency_log"] = np.log1p(rfm_pd["Frequency"])
rfm_pd["Monetary_log"] = np.log1p(rfm_pd["Monetary"])

# Generate a pairplot to visualise the relationships between the numeric RFM columns
sns.pairplot(rfm_pd[["Recency_log", "Frequency_log", "Monetary_log"]], diag_kind="kde")
plt.suptitle("RFM Feature Relationships (Log Transformed)", y=1.02)
plt.tight_layout()
plt.savefig("RFM_Pairplot.png")
s3.upload_file("RFM_Pairplot.png", "customeranalysis123", "RFM_Pairplot.png")
plt.show()
plt.close()


### Behavioral Trends Analysis <font color = red>[8 marks]</font> <br>

Perform RFM analysis to study customer behavior and tailor marketing strategies.
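
Before clustering, skewed RFM values are typically log-transformed and standardised so that no single dimension dominates the distance calculation. The sketch below shows that transformation in plain Python, using the population standard deviation (dividing by n), which is the convention scikit-learn's `StandardScaler` follows. The helper name `log1p_standardize` and the sample values are assumptions for illustration only.

```python
import math


def log1p_standardize(values):
    """Apply log(1 + x), then rescale to zero mean and unit variance."""
    logged = [math.log1p(v) for v in values]
    mean = sum(logged) / len(logged)
    # Population variance (divide by n).
    variance = sum((x - mean) ** 2 for x in logged) / len(logged)
    std = math.sqrt(variance)
    if std == 0:
        # All values identical: nothing to scale.
        return [0.0 for _ in logged]
    return [(x - mean) / std for x in logged]


# Hypothetical, heavily skewed spend values.
monetary = [10.0, 100.0, 1000.0, 10000.0]
scaled = log1p_standardize(monetary)
print([round(z, 3) for z in scaled])
```

The transformation preserves the ordering of customers while compressing the long right tail of the spend distribution.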

In [35]:
# Import necessary PySpark functions for data processing

from pyspark.sql.functions import col, to_date, max as spark_max, datediff, countDistinct, sum as spark_sum, lit
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
import seaborn as sns

# Prepare the dataset for grouping by 'response_id' (the survey response ID) to calculate RFM (Recency, Frequency, Monetary) metrics
merged_data = merged_data.withColumn("Order Date", to_date(col("Order Date"), "yyyy-MM-dd"))
merged_data = merged_data.withColumn("Total_Spend", col("Purchase Price Per Unit") * col("Quantity"))

max_date = merged_data.agg(spark_max("Order Date").alias("max_date")).collect()[0]["max_date"]

# Compute 'Recency' as the difference between the latest date and the most recent order date

# Compute 'Frequency' as the count of distinct product purchases (ASIN/ISBN)

# Compute 'Monetary' as the total spending sum for each customer


# By Assuming 'Title' is the product identifier instead of 'ASIN/ISBN'
rfm_df = merged_data.groupBy("response_id").agg(
    datediff(lit(max_date), spark_max("Order Date")).alias("Recency"),
    countDistinct("Title").alias("Frequency"),
    spark_sum("Total_Spend").alias("Monetary")
).filter(col("Monetary") > 0)


rfm_pd = rfm_df.toPandas()

# Log-transform the RFM columns to reduce skew

rfm_pd["Recency_log"] = np.log1p(rfm_pd["Recency"])
rfm_pd["Frequency_log"] = np.log1p(rfm_pd["Frequency"])
rfm_pd["Monetary_log"] = np.log1p(rfm_pd["Monetary"])

# Standardise the log-transformed features for clustering with scikit-learn

scaler = StandardScaler()
rfm_scaled = scaler.fit_transform(rfm_pd[["Recency_log", "Frequency_log", "Monetary_log"]])

rfm_scaled_df = pd.DataFrame(rfm_scaled, columns=["Recency_scaled", "Frequency_scaled", "Monetary_scaled"])


In [36]:
# Apply K-Means clustering

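kmeans = KMeans(n_clusters=4, random_state=42, n_init=10)


# Fit the K-Means model on the scaled features only and predict cluster labels for each customer

feature_cols = ["Recency_scaled", "Frequency_scaled", "Monetary_scaled"]
rfm_scaled_df["Cluster"] = kmeans.fit_predict(rfm_scaled_df[feature_cols])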

# Add the predicted cluster labels to the Pandas DataFrame

rfm_pd["Cluster"] = rfm_scaled_df["Cluster"]

# Convert the Pandas DataFrame back to a PySpark DataFrame

rfm_spark_df = spark.createDataFrame(rfm_pd)


Analyse the Cluster Distribution by Income <font color = red>[2 marks]</font> <br>


In [37]:
print(rfm_spark_df.columns)
print(survey.columns)


['response_id', 'Recency', 'Frequency', 'Monetary', 'Recency_log', 'Frequency_log', 'Monetary_log', 'Cluster']
['response_id', 'Q-demos-age', 'Q-demos-hispanic', 'Q-demos-race', 'Q-demos-education', 'Q-demos-income', 'Q-demos-gender', 'Q-sexual-orientation', 'Q-demos-state', 'Q-amazon-use-howmany', 'Q-amazon-use-hh-size', 'Q-amazon-use-how-oft', 'Q-substance-use-cigarettes', 'Q-substance-use-marijuana', 'Q-substance-use-alcohol', 'Q-personal-diabetes', 'Q-personal-wheelchair', 'Q-life-changes', 'Q-sell-YOUR-data', 'Q-sell-consumer-data', 'Q-small-biz-use', 'Q-census-use', 'Q-research-society']

In [38]:
#Trend 1: Cluster Distribution by Income

# Import the necessary function for counting records in PySpark

from pyspark.sql.functions import count
import seaborn as sns
import matplotlib.pyplot as plt


# Join the RFM dataset with the survey dataset using a common key

survey_income = survey.select("response_id", "Q-demos-income").dropDuplicates()

rfm_with_income = rfm_spark_df.join(survey_income, on="response_id", how="inner")


# Aggregate data to count the number of customers per Cluster-Income group

cluster_income_counts = rfm_with_income.groupBy("Cluster", "Q-demos-income").agg(
    count("*").alias("Customer_Count")
)

# Convert the aggregated data from PySpark DataFrame to Pandas DataFrame for visualisation

cluster_income_pd = cluster_income_counts.toPandas()

# Plot
if not cluster_income_pd.empty:
    plt.figure(figsize=(12, 6))
    sns.barplot(data=cluster_income_pd, x="Q-demos-income", y="Customer_Count", hue="Cluster")
    plt.title("Customer Cluster Distribution by Income Group")
    plt.xlabel("Income Group")
    plt.ylabel("Number of Customers")
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig("Cluster_vs_Income.png")
    s3.upload_file("Cluster_vs_Income.png", "customeranalysis123", "Cluster_vs_Income.png")
    plt.show()
else:
    print("No data available to plot. Check if join returned empty result.")


[Figure: "Customer Cluster Distribution by Income Group"; x-axis: Income Group ($75,000 - $99,999, $25,000 - $49,999, $150,000 or more, Less than $25,000, $100,000 - $149,999, $50,000 - $74,999, Prefer not to say); y-axis: Number of Customers]

Analyse the Average Spending by Cluster <font color = red>[2 marks]</font> <br>


In [39]:
#Trend 2: Average Spending by Cluster

# Import the required function for calculating averages in PySpark

from pyspark.sql.functions import avg
import matplotlib.pyplot as plt
import seaborn as sns

# Compute the average values of 'Recency_log', 'Frequency', and 'Monetary_log' for each customer cluster

cluster_avg = rfm_spark_df.groupBy("Cluster").agg(
    avg("Recency_log").alias("Avg_Recency_log"),
    avg("Frequency").alias("Avg_Frequency"),
    avg("Monetary_log").alias("Avg_Monetary_log")
)

# Convert the aggregated cluster summary from PySpark DataFrame to Pandas DataFrame for visualisation

cluster_avg_pd = cluster_avg.toPandas()

# Generate a bar plot to visualise the average monetary spending per cluster
plt.figure(figsize=(10, 6))
sns.barplot(data=cluster_avg_pd, x="Cluster", y="Avg_Monetary_log", palette="viridis")
plt.title("Average Monetary Spending by Cluster")
plt.xlabel("Cluster")
plt.ylabel("Average Log Monetary Value")
plt.tight_layout()
plt.savefig("Avg_Monetary_Spending_by_Cluster.png")
s3.upload_file("Avg_Monetary_Spending_by_Cluster.png", "customeranalysis123", "Avg_Monetary_Spending_by_Cluster.png")
plt.show()


Analyse the Purchase Frequency vs. Recency <font color = red>[2 marks]</font> <br>


In [40]:
import matplotlib.pyplot as plt
import seaborn as sns
#Trend 3: Purchase Frequency vs. Recency

# Convert the RFM dataset from PySpark DataFrame to Pandas DataFrame for visualisation

rfm_pd = rfm_spark_df.toPandas()

# Generate a scatter plot to analyse the relationship between Purchase Frequency and Recency

plt.figure(figsize=(10, 6))
sns.scatterplot(data=rfm_pd, x="Recency", y="Frequency", hue="Cluster", palette="deep")
plt.title("Purchase Frequency vs. Recency")
plt.xlabel("Recency (Days since Last Purchase)")
plt.ylabel("Purchase Frequency")
plt.legend(title="Cluster")
plt.tight_layout()
plt.savefig("Frequency_vs_Recency.png")
s3.upload_file("Frequency_vs_Recency.png", "customeranalysis123", "Frequency_vs_Recency.png")
plt.show()




Analyse the top categories by clusters <font color = red>[2 marks]</font> <br>


In [41]:
#Trend 4: Top Categories by Cluster

# Import the necessary function to calculate the sum in PySpark

from pyspark.sql.functions import sum as _sum, row_number
from pyspark.sql.window import Window
import matplotlib.pyplot as plt
import seaborn as sns


# Join the merged dataset with the RFM dataset to associate each customer with their respective cluster

merged_with_cluster = merged_data.join(
    rfm_spark_df.select("response_id", "Cluster"),
    on="response_id",
    how="inner"
)

# Group the joined data by 'Cluster' and 'Category' and compute the total spending in each group
category_cluster_spending = merged_with_cluster.groupBy("Cluster", "Category").agg(
    _sum("Total_Spend").alias("Total_Spend")
)

# Order the categories by total spending in descending order and select the top 5 highest spending categories

windowSpec = Window.partitionBy("Cluster").orderBy(category_cluster_spending["Total_Spend"].desc())

top_categories = category_cluster_spending.withColumn(
    "rank", row_number().over(windowSpec)
).filter("rank <= 5")

# Convert the top categories dataset from PySpark DataFrame to Pandas DataFrame for visualisation

top_categories_pd = top_categories.toPandas()

top_categories_pd = top_categories_pd.sort_values(by=["Cluster", "Total_Spend"], ascending=[True, False])


# Plot the cluster
if top_categories_pd.empty:
    print("No top category data available to plot.")
else:
    plt.figure(figsize=(12, 6))
    sns.barplot(data=top_categories_pd, x="Category", y="Total_Spend", hue="Cluster")
    plt.title("Top 5 Spending Categories by Cluster")
    plt.xlabel("Category")
    plt.ylabel("Total Spending")
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig("Top_Categories_by_Cluster.png")
    s3.upload_file("Top_Categories_by_Cluster.png", "customeranalysis123", "Top_Categories_by_Cluster.png")
    plt.show()


[Figure: "Top 5 Spending Categories by Cluster"; x-axis: Category (ABIS_BOOK, GIFT_CARD, HEADPHONES, SHOES, PERSONAL_COMPUTER, PET_FOOD, NUTRITIONAL_SUPPLEMENT); y-axis: Total Spending]

## 4.2 Insights <font color = red>[35 marks]</font> <br>


### 4.2.1 When to schedule effective promotions. <font color = red>[3 marks]</font> <br>

Compare sales across weekdays to schedule effective promotions.
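
As a rough illustration of the weekday comparison, the sketch below sums order amounts per weekday and picks the strongest day. It is plain Python on hypothetical toy data; the helper name `sales_by_weekday` is an assumption, not something defined in the notebook.

```python
from collections import defaultdict
from datetime import date

WEEKDAY_NAMES = [
    "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday",
]


def sales_by_weekday(orders):
    """Sum order amounts per weekday name.

    `orders` is an iterable of (order_date, amount) tuples.
    """
    totals = defaultdict(float)
    for order_date, amount in orders:
        # date.weekday() returns 0 for Monday through 6 for Sunday.
        totals[WEEKDAY_NAMES[order_date.weekday()]] += amount
    return dict(totals)


# Hypothetical toy data: (order_date, amount).
orders = [
    (date(2023, 1, 2), 40.0),  # a Monday
    (date(2023, 1, 7), 25.0),  # a Saturday
    (date(2023, 1, 9), 10.0),  # a Monday
]
totals = sales_by_weekday(orders)
best_day = max(totals, key=totals.get)
print(totals, best_day)
```

Python's `date.weekday()` numbers days from Monday = 0, which differs from the 1 = Sunday, 7 = Saturday convention of Spark's `dayofweek`, so the label mapping has to match whichever function produced the numbers.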

In [42]:
print(merged_data.columns)


['response_id', 'Order Date', 'Purchase Price Per Unit', 'Quantity', 'Shipping Address State', 'Title', 'ASIN/ISBN (Product Code)', 'Category', 'Q-demos-age', 'Q-demos-hispanic', 'Q-demos-race', 'Q-demos-education', 'Q-demos-income', 'Q-demos-gender', 'Q-sexual-orientation', 'Q-demos-state', 'Q-amazon-use-howmany', 'Q-amazon-use-hh-size', 'Q-amazon-use-how-oft', 'Q-substance-use-cigarettes', 'Q-substance-use-marijuana', 'Q-substance-use-alcohol', 'Q-personal-diabetes', 'Q-personal-wheelchair', 'Q-life-changes', 'Q-sell-YOUR-data', 'Q-sell-consumer-data', 'Q-small-biz-use', 'Q-census-use', 'Q-research-society', 'order_date_parsed', 'order_month', 'order_year', 'Q-demos-income-mapped', 'Q-demos-gender-mapped', 'day_of_week', 'day_type', 'Total_Spend', 'year', 'month']

In [43]:
#Compare sales across weekdays to schedule effective promotions
from pyspark.sql.functions import dayofweek, sum as _sum, to_date, col, when, date_format
import matplotlib.pyplot as plt
import seaborn as sns

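# "Order Date" was already converted to a date (yyyy-MM-dd) in the RFM cells above,
# so re-parsing it with "MM/dd/yyyy" would not match; to_date() without a format keeps it as a date.
merged_data = merged_data.withColumn("order_date_parsed", to_date(col("Order Date")))
# Extract day of the week (1 = Sunday, 7 = Saturday)
merged_data = merged_data.withColumn("Weekday", dayofweek("order_date_parsed"))

# Classify as 'Weekday' or 'Weekend'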
merged_data = merged_data.withColumn(
    "day_type",
    when(col("Weekday").isin(1, 7), "Weekend").otherwise("Weekday")
)

# Group by weekday and sum total sales
weekday_sales = merged_data.groupBy("Weekday").agg(
    _sum("Total_Spend").alias("Total_Sales")
).orderBy("Weekday")

# Convert to pandas DataFrame
weekday_sales_pd = weekday_sales.toPandas()

# label
weekday_labels = {
    1: "Sunday", 2: "Monday", 3: "Tuesday", 4: "Wednesday",
    5: "Thursday", 6: "Friday", 7: "Saturday"
}
weekday_sales_pd["Weekday_Name"] = weekday_sales_pd["Weekday"].map(weekday_labels)

# Plot
plt.figure(figsize=(10, 6))
sns.barplot(data=weekday_sales_pd, x="Weekday_Name", y="Total_Sales", order=[
    "Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"
])
plt.title("Total Sales by Weekday")
plt.xlabel("Weekday")
plt.ylabel("Total Sales")
plt.tight_layout()
plt.savefig("Sales_by_Weekday.png")
s3.upload_file("Sales_by_Weekday.png", "customeranalysis123", "Sales_by_Weekday.png")
plt.show()


### 4.2.2 Top-selling Products <font color = red>[2 marks]</font> <br>

Identify top-selling products by considering revenue and engagement metrics.

In [44]:
from pyspark.sql.functions import col, sum as _sum
import matplotlib.pyplot as plt
import seaborn as sns


#Identify top-selling products using revenue and engagement metrics

# Group by product and sum revenue

top_products = merged_data.groupBy("Title").agg(
    _sum("Total_Spend").alias("Total_Revenue")
)

# Get top 10 products by revenue

top_10_products = top_products.orderBy(col("Total_Revenue").desc()).limit(10)

# Convert to Pandas for visualisation

top_10_products_pd = top_10_products.toPandas()

top_10_products_pd = top_10_products_pd.sort_values(by="Total_Revenue", ascending=False)


# Plot top products by revenue

plt.figure(figsize=(12, 6))
sns.barplot(data=top_10_products_pd, x="Total_Revenue", y="Title", palette="viridis")
plt.title("Top 10 Best-Selling Products by Revenue")
plt.xlabel("Total Revenue")
plt.ylabel("Product Title")
plt.tight_layout()
plt.savefig("Top_10_Products_Revenue.png")
s3.upload_file("Top_10_Products_Revenue.png", "customeranalysis123", "Top_10_Products_Revenue.png")
plt.show()
plt.close()




### 4.2.3 State-wise revenue Distribution <font color = red>[5 marks]</font> <br>

Assess state-wise revenue to focus on high-growth areas.

In [45]:
#Assess state-wise revenue to focus on high-growth areas
from pyspark.sql.functions import col, sum as _sum
import matplotlib.pyplot as plt
import seaborn as sns

merged_data = merged_data.withColumn(
    "Total_Spend", 
    col("Purchase Price Per Unit") * col("Quantity")
)


# Group by state and sum revenue
state_revenue = merged_data.groupBy("Q-demos-state").agg(
    _sum("Total_Spend").alias("Total_Revenue")
).withColumnRenamed("Q-demos-state", "State")

# Convert to Pandas for visualisation

state_revenue_pd = state_revenue.toPandas().sort_values(by="Total_Revenue", ascending=False)


# Plot revenue by state

plt.figure(figsize=(14, 6))
sns.barplot(data=state_revenue_pd, x="State", y="Total_Revenue", palette="mako")
plt.title("Revenue by State")
plt.xlabel("State")
plt.ylabel("Total Revenue")
plt.xticks(rotation=45, ha="right")
plt.tight_layout()
plt.savefig("Revenue_by_State.png")
s3.upload_file("Revenue_by_State.png", "customeranalysis123", "Revenue_by_State.png")
plt.show()
plt.close()
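
Total revenue shows which states are large, not which are growing. A minimal sketch of a year-over-year growth calculation per state, assuming a year column such as the `order_year` derived in section 4.2.6 is available. The rows are made up and `yoy_growth` is a hypothetical helper written in plain Python so it runs without Spark.

```python
from collections import defaultdict

# Toy (state, order_year, Total_Spend) rows; values are made up.
rows = [
    ("Texas", 2021, 100.0), ("Texas", 2022, 150.0),
    ("Ohio", 2021, 200.0), ("Ohio", 2022, 210.0),
]

def yoy_growth(rows, prev_year, curr_year):
    """Return {state: fractional revenue growth from prev_year to curr_year}."""
    totals = defaultdict(float)
    for state, yr, spend in rows:
        totals[(state, yr)] += spend
    growth = {}
    for state in {s for s, _ in totals}:
        prev = totals.get((state, prev_year), 0.0)
        curr = totals.get((state, curr_year), 0.0)
        if prev > 0:  # skip states with no baseline revenue
            growth[state] = (curr - prev) / prev
    return growth

growth = yoy_growth(rows, 2021, 2022)
for state, g in sorted(growth.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{state}: {g:.1%}")
```

In this toy data the smaller state grows faster, which a chart of total revenue alone would not show.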

### 4.2.4 Repeat Purchase Behavior <font color = red>[5 marks]</font> <br>

Examine repeat purchase behavior to enhance retention initiatives.

In [46]:
#Examine repeat purchase behavior to enhance retention initiatives

from pyspark.sql.functions import count, col

# Count total purchases per customer

customer_purchases = merged_data.groupBy("response_id").agg(
    count("*").alias("total_purchases")
)

# Filter for repeat customers (those with more than one purchase)

repeat_customers = customer_purchases.filter(col("total_purchases") > 1)

# Show sample data

repeat_customers.show(10)

+-----------------+---------------+
|      response_id|total_purchases|
+-----------------+---------------+
|R_2xLvxRKX2448l9k|            352|
|R_2eP9RaoDUn9qgIR|             96|
|R_s4I8huB9BOcWI5H|            267|
|R_1dKBjc0UTA0ZiYb|           1278|
|R_10r2RIJOCZFOra8|            148|
|R_2BaEdBiHdzJwaqI|            377|
|R_2QrmzNCNpcA7B9p|            515|
|R_3NFhCSnuJa54Jn3|            476|
|R_30tLi6WqLJxToYS|            185|
|R_OGUcDEj85m0paaR|            154|
+-----------------+---------------+
only showing top 10 rows
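
Note that `count("*")` counts rows, so a customer who bought two items in one order already looks like a repeat buyer. A minimal sketch of a stricter definition, assuming that distinct order dates are an acceptable proxy for separate orders (the dataset's order identifier, if any, is not shown here). The rows are made up and `repeat_rate` is a hypothetical helper in plain Python.

```python
from collections import defaultdict

# Toy (response_id, Order Date) rows; values are made up.
rows = [
    ("R_1", "01/05/2021"), ("R_1", "01/05/2021"),  # two items, one day
    ("R_2", "02/10/2021"), ("R_2", "03/15/2021"),  # two separate days
    ("R_3", "04/01/2021"),
]

def repeat_rate(rows):
    """Share of customers who ordered on more than one distinct date."""
    dates = defaultdict(set)
    for customer, order_date in rows:
        dates[customer].add(order_date)
    repeaters = sorted(c for c, d in dates.items() if len(d) > 1)
    return len(repeaters) / len(dates), repeaters

rate, repeaters = repeat_rate(rows)
print(f"repeat rate: {rate:.1%}, repeat customers: {repeaters}")
```

In PySpark the per-customer figure would be `countDistinct("Order Date")` in place of `count("*")`.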

### 4.2.5 Flagging Potential Fraud <font color = red>[5 marks]</font> <br>

Identify irregular transaction patterns to flag potential fraud.

In [51]:
#Identify irregular transaction patterns to flag potential fraud

from pyspark.sql.functions import col, avg, stddev

# Calculate the threshold for unusually high spending
stats = merged_data.select(
    avg("Total_Spend").alias("mean_spend"),
    stddev("Total_Spend").alias("std_spend")
).collect()[0]

mean_spend = stats["mean_spend"]
std_spend = stats["std_spend"]

# Consider spending to be unusually high if the total spent is greater than the mean + 3 * std dev

threshold = mean_spend + 3 * std_spend

# Filter transactions that exceed the threshold

suspicious_transactions = merged_data.filter(col("Total_Spend") > threshold)

# Show suspicious transactions

suspicious_transactions.select("response_id", "Title", "Total_Spend", "Q-demos-state").show(10)

+-----------------+--------------------+-----------+-------------+
|      response_id|               Title|Total_Spend|Q-demos-state|
+-----------------+--------------------+-----------+-------------+
|R_1DonBwcSlvKQtt9|Acer Aspire 5 Sli...|     313.41|        Texas|
|R_1FzkhYhOpeojsYt|ASUS AM4 TUF Gami...|      189.0|       Oregon|
|R_1QLfbB3Lho273wG|Barbie DreamHouse...|      179.0|        Texas|
|R_1eEsLZ6SYLsIG0s|2 Pcs 8 x 3 x 1 F...|     179.99|     New York|
|R_1lgeraQD2mZ2SDP|Full-Automatic Wa...|     199.99|     Kentucky|
|R_1M5mnTcAVcSRcli|                null|     205.56|     Kentucky|
|R_1cYOJAbPFoatHTg|                null|     289.99|      Georgia|
|R_1hMV4AHCk04WO5V|INSIGNIA 32-inch ...|     199.99|New Hampshire|
|R_1i7Zoa74FWb4o89|VASAGLE TV Stand ...|     219.99|         Ohio|
|R_1mkjLIewa5Obhi4|Paging Zone-Canon...|      499.0|     Missouri|
+-----------------+--------------------+-----------+-------------+
only showing top 10 rows
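
A small worked example of the mean + 3 standard deviations rule used above, on made-up numbers. When most transactions are small, the threshold sits well below the largest values. This may be why ordinary big-ticket items appear in the output, so flagged rows are candidates for review rather than confirmed fraud. `statistics.stdev` is the sample standard deviation, which is also what Spark's `stddev` computes.

```python
import statistics

# Toy Total_Spend values: mostly small orders plus one large one (made up).
spend = [10.0] * 19 + [200.0]

mean_spend = statistics.mean(spend)
std_spend = statistics.stdev(spend)  # sample standard deviation
threshold = mean_spend + 3 * std_spend

# Keep only the values that exceed the threshold.
flagged = [s for s in spend if s > threshold]
print(f"mean={mean_spend:.2f} std={std_spend:.2f} threshold={threshold:.2f}")
print("flagged:", flagged)
```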

### 4.2.6 Demand Variations across product categories <font color = red>[5 marks]</font> <br>

Monitor demand variations across product categories to support inventory management.

In [48]:
#Monitor demand variations across product categories (Top 25) for inventory management

from pyspark.sql.functions import col, month, sum as _sum, to_date, year
import matplotlib.pyplot as plt
import seaborn as sns

# Parse the order date and derive month and year columns

merged_data = merged_data.withColumn("order_date_parsed", to_date(col("Order Date"), "MM/dd/yyyy"))
merged_data = merged_data.withColumn("order_month", month(col("order_date_parsed")))
merged_data = merged_data.withColumn("order_year", year(col("order_date_parsed")))


# Sum revenue per category and month, then total it per category

category_trends = merged_data.groupBy("Category", "order_year", "order_month").agg(
    _sum("Total_Spend").alias("Total_Revenue")
)

category_total = category_trends.groupBy("Category").agg(
    _sum("Total_Revenue").alias("Category_Total_Revenue")
)

# Get the top 25 categories by total revenue

top_25_categories = category_total.orderBy(col("Category_Total_Revenue").desc()).limit(25)


# Filter category_trends to include only top 25 categories

top_25_category_list = [row["Category"] for row in top_25_categories.collect()]
category_trends_filtered = category_trends.filter(col("Category").isin(top_25_category_list))


# Convert to Pandas for visualisation

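category_trends_pd = category_trends_filtered.toPandas()

category_trends_pd["Month_Year"] = category_trends_pd["order_year"].astype(str) + "-" + \
                                    category_trends_pd["order_month"].astype(str).str.zfill(2)

# Sort so that the x-axis runs in chronological order (Spark does not guarantee row order)
category_trends_pd = category_trends_pd.sort_values("Month_Year")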


# Plot revenue trends for top 25 categories

plt.figure(figsize=(16, 10))
sns.lineplot(data=category_trends_pd, x="Month_Year", y="Total_Revenue", hue="Category", marker="o")
plt.title("Monthly Revenue Trends for Top 25 Product Categories")
plt.xlabel("Month-Year")
plt.ylabel("Total Revenue")
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig("Top_25_Category_Trends.png")
s3.upload_file("Top_25_Category_Trends.png", "customeranalysis123", "Top_25_Category_Trends.png")
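
With 25 lines on one chart, it can be hard to see which categories fluctuate most. A minimal sketch that summarises demand variation as the coefficient of variation (standard deviation divided by mean) of monthly revenue per category. The choice of this metric is an assumption, not part of the assignment; the rows are made up and `demand_variability` is a hypothetical helper in plain Python.

```python
import statistics
from collections import defaultdict

# Toy (Category, Month_Year, Total_Revenue) rows; values are made up.
rows = [
    ("BOOK", "2021-01", 100.0), ("BOOK", "2021-02", 110.0), ("BOOK", "2021-03", 90.0),
    ("TOY", "2021-01", 20.0), ("TOY", "2021-02", 30.0), ("TOY", "2021-03", 250.0),
]

def demand_variability(rows):
    """Return {category: coefficient of variation of monthly revenue}."""
    monthly = defaultdict(list)
    for category, _month, revenue in rows:
        monthly[category].append(revenue)
    return {
        c: statistics.pstdev(v) / statistics.mean(v)
        for c, v in monthly.items()
        if len(v) > 1 and statistics.mean(v) > 0
    }

cv = demand_variability(rows)
for category, value in sorted(cv.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{category}: CV = {value:.2f}")
```

Categories with a high value have spiky demand and may need more safety stock; categories with a low value can be replenished on a steadier schedule.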

### 4.2.7 Assess how bulk purchases affect revenue and supply chain operations <font color = red>[5 marks]</font> <br>

Analyse how bulk purchasing behavior affects revenue and overall supply chain operations.

In [49]:
#Assess how bulk purchases affect revenue and supply chain operations
from pyspark.sql.functions import col, sum as _sum
import matplotlib.pyplot as plt
import seaborn as sns

# Filter bulk purchases (Quantity > 5) and compute total revenue per category

bulk_purchases = merged_data.filter(col("Quantity") > 5)

bulk_category_revenue = bulk_purchases.groupBy("Category").agg(
    _sum("Total_Spend").alias("Bulk_Total_Revenue")
)

# Select the top 25 categories by total revenue

top_25_bulk_categories = bulk_category_revenue.orderBy(col("Bulk_Total_Revenue").desc()).limit(25)


# Convert to Pandas for visualisation

top_25_bulk_pd = top_25_bulk_categories.toPandas().sort_values(by="Bulk_Total_Revenue", ascending=False)


# Plot revenue from bulk purchases (Top 25 categories)

plt.figure(figsize=(14, 6))
sns.barplot(data=top_25_bulk_pd, y="Category", x="Bulk_Total_Revenue", palette="coolwarm")
plt.title("Top 25 Categories by Revenue from Bulk Purchases (Quantity > 5)")
plt.xlabel("Bulk Purchase Revenue")
plt.ylabel("Product Category")
plt.tight_layout()
plt.savefig("Bulk_Purchase_Top25_Categories.png")
s3.upload_file("Bulk_Purchase_Top25_Categories.png", "customeranalysis123", "Bulk_Purchase_Top25_Categories.png")
plt.show()
plt.close()
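
The chart shows where bulk revenue comes from, but not how much bulk orders matter overall. A minimal sketch of the share of order lines and of revenue that comes from bulk purchases, using the same `Quantity > 5` cut-off as above. The rows are made up and `bulk_share` is a hypothetical helper in plain Python.

```python
# Toy (Quantity, Total_Spend) rows; values are made up.
rows = [(1, 20.0), (2, 30.0), (6, 60.0), (10, 90.0)]

BULK_MIN_QUANTITY = 5  # same cut-off as the filter above: Quantity > 5

def bulk_share(rows, min_quantity=BULK_MIN_QUANTITY):
    """Return (share of order lines that are bulk, share of revenue from bulk)."""
    total_revenue = sum(spend for _, spend in rows)
    bulk = [(q, s) for q, s in rows if q > min_quantity]
    bulk_revenue = sum(s for _, s in bulk)
    return len(bulk) / len(rows), bulk_revenue / total_revenue

line_share, revenue_share = bulk_share(rows)
print(f"bulk lines: {line_share:.0%}, bulk revenue: {revenue_share:.0%}")
```

A revenue share well above the line share would suggest that a small number of large orders drives a disproportionate part of revenue and stock movement.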

### 4.2.8 Compare lifecycle strategies <font color = red>[5 marks]</font> <br>

Compare new and established products to inform lifecycle strategies.

In [52]:
#Compare new and established products to inform lifecycle strategies

from pyspark.sql.functions import year, min as _min, sum as _sum, col
import matplotlib.pyplot as plt
import seaborn as sns

merged_data = merged_data.withColumn("order_year", year(col("order_date_parsed")))

# Compute "Launch Year" as the first recorded sale year for each product

product_launch_year = merged_data.groupBy("Title").agg(
    _min("order_year").alias("Launch_Year")
)

# Join this back to the main dataset

merged_with_launch = merged_data.join(product_launch_year, on="Title", how="left")


# Now, we can compute revenue by launch year

revenue_by_launch_year = merged_with_launch.groupBy("Launch_Year").agg(
    _sum("Total_Spend").alias("Total_Revenue")
).orderBy("Launch_Year")


# Convert to Pandas

revenue_by_launch_year_pd = revenue_by_launch_year.toPandas()

# Plot revenue vs. launch year

plt.figure(figsize=(10, 6))
sns.barplot(data=revenue_by_launch_year_pd, x="Launch_Year", y="Total_Revenue", palette="crest")
plt.title("Revenue by Product Launch Year")
plt.xlabel("Launch Year")
plt.ylabel("Total Revenue")
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig("Revenue_by_Launch_Year.png")
s3.upload_file("Revenue_by_Launch_Year.png", "customeranalysis123", "Revenue_by_Launch_Year.png")
plt.show()
plt.close()
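
The chart groups revenue by launch year but does not label products as new or established. A minimal sketch of one way to make that split, assuming a product is "new" if its first recorded sale falls within one year of the latest year in the data. That rule, the made-up rows and the helper `lifecycle_revenue` are all illustrative. As in the cell above, the first recorded sale is only a proxy for the launch date: products already on sale before the dataset begins will appear to launch in its first year.

```python
from collections import defaultdict

# Toy (Title, order_year, Total_Spend) rows; values are made up.
rows = [
    ("Kettle", 2019, 40.0), ("Kettle", 2022, 60.0),
    ("Headphones", 2022, 80.0),
    ("Notebook", 2021, 10.0), ("Notebook", 2022, 10.0),
]

def lifecycle_revenue(rows, new_within_years=1):
    """Split total revenue between 'new' and 'established' products."""
    # First recorded sale year per product, used as a proxy for launch year.
    launch = {}
    for title, yr, _ in rows:
        launch[title] = min(yr, launch.get(title, yr))
    latest = max(yr for _, yr, _ in rows)
    totals = defaultdict(float)
    for title, _, spend in rows:
        is_new = latest - launch[title] <= new_within_years
        totals["new" if is_new else "established"] += spend
    return dict(totals)

split = lifecycle_revenue(rows)
print(split)
```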

# 5 Conclusion <font color = red>[10 marks]</font> <br>

Write your conclusion.

In this project, we analysed customer purchase behavior with PySpark on a large-scale e-commerce dataset. The key objectives were to uncover trends in revenue, product performance, state-wise demand, and potential fraud indicators. The major findings and recommendations are:

- a) Weekly Sales Trends: Schedule marketing campaigns and promotions around Sundays and Mondays to maximize customer engagement.
- b) Top-Selling Products: Focus inventory and marketing efforts on the top 10 products by revenue for better ROI.
- c) State-wise Revenue Distribution: Invest in targeted ads and logistics in high-performing states, and explore growth strategies in underperforming regions.
- d) Bulk Purchase Behaviour: Tailor bulk discount campaigns to the top bulk-buying categories to optimize inventory turnover.
- e) Repeat Purchase Analysis: Develop retention initiatives such as loyalty programs or email follow-ups to convert one-time buyers into repeat customers. The per-customer purchase counts are useful input for this.
- f) Suspicious Transaction Detection: Investigate flagged transactions further to distinguish genuine high-value or bulk purchases from fraud attempts.
- g) Category-wise Demand Trends: Use the monthly category trends for inventory planning and seasonal promotions that match demand cycles.
- h) Product Lifecycle Analysis: Maintain support for high-performing established products while strategically investing in the launch and promotion of new items.

This analysis provides valuable direction for improving:

- Marketing strategies (based on time and region)
- Inventory management (through category trends and bulk behavior)
- Customer retention
- Fraud prevention