# Customer Behaviour Analysis

## By Aayush Singh

# Objective
In this case study, you will be working on E-commerce Customer Behavior Analysis using Apache Spark, a powerful distributed computing framework designed for big data processing. This assignment aims to give you hands-on experience in analyzing large-scale e-commerce datasets using PySpark. You will apply techniques learned in data analytics to clean, transform, and explore customer behavior data, drawing meaningful insights to support business decision-making. Apart from understanding how big data tools can optimize performance on a single machine and across clusters, you will develop a structured approach to analyzing customer segmentation, purchase patterns, and behavioral trends.

# Business Value
E-commerce businesses operate in a highly competitive market where understanding customer behavior is critical to driving growth and retention. To stay ahead, companies must leverage data-driven insights to optimize marketing strategies, personalize customer experiences, and improve product offerings. In this assignment, you will analyze e-commerce transaction data to uncover patterns in purchasing behavior, customer preferences, and sales performance. With Apache Spark's ability to handle large datasets efficiently, businesses can process vast amounts of customer interactions in real-time, helping them make faster and more informed decisions.
As an analyst at an e-commerce company, your task is to examine historical transaction records and customer survey data to derive actionable insights that can drive business growth. Your analysis will help identify high-value customers, segment users based on behavior, and uncover trends in product demand and customer engagement. By leveraging big data analytics, businesses can enhance customer satisfaction, improve retention rates, and maximize revenue opportunities.


# Tasks
1. Data Preparation
2. Data Cleaning
3. Exploratory Data Analysis
4. Evaluation and Conclusion


# Dataset Overview
The dataset can be accessed the following [link](https://drive.google.com/drive/folders/1mBgC5tvZrh1bIBvpXVP_j-au5LFUAwOZ?usp=sharing).

The dataset used in this analysis comprises longitudinal purchase records from 5,027 Amazon.com users in the United States, spanning 2018 to 2022.

It is structured into three CSV files (amazon-purchases.csv, survey.csv, and fields.csv) that capture transactional data, demographic profiles, and survey responses.

Collected with informed consent, the dataset enables analysis of customer behavior, product preferences, and demographic trends.

**NOTE**: Personal identifiers (PII) were removed to ensure privacy, and all data were preprocessed by users before submission.

`Data Dictionary:`

| **Attribute**          | **Description** |
|------------------------|----------------|
| **Order Dates**        | The specific dates when orders were placed, enabling chronological analysis of sales trends. |
| **Title** |The name of the product purchased. |
|**Category** | The classification or group to which the product belongs, facilitating category-wise analysis. |
| **Pricing** | The cost per unit of each product, essential for revenue calculations and pricing strategy assessments. |
| **Quantities** | The number of units of each product ordered in a transaction, aiding in inventory and demand analysis. |
| **Shipping States**    | The states to which products were shipped, useful for geographical sales distribution analysis. |
| **Survey ResponseID**  | A unique identifier linking purchases to customer survey responses, enabling correlation between purchasing behavior and customer feedback. |



# Loading the Datasets

In [1]:
## Installing the libraries if required
!pip install --quiet pyspark==3.5.4 datasets==3.3.2 pandas==2.2.2 matplotlib==3.10.0 seaborn==0.13.2 numpy==1.26.4 tqdm==4.67.1


[notice] A new release of pip is available: 24.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
# Import necessary libraries
from pyspark.sql import SparkSession

# Initialise Spark session
spark = SparkSession.builder \
    .appName("Customer Behavior Analysis") \
    .getOrCreate()

In [3]:
spark

In [4]:
# Load the paths to the datasets/csv files
amazon_purchases_path = "amazon-purchases.csv"
survey_path = "survey.csv"
fields_path = "fields.csv"

# Load datasets into PySpark DataFrames
amazon_purchases = spark.read.csv(amazon_purchases_path, header=True, inferSchema=True)
survey = spark.read.csv(survey_path, header=True, inferSchema=True)
fields = spark.read.csv(fields_path, header=True, inferSchema=True)

# Merge the datasets
merged_data = amazon_purchases.join(survey, "Survey ResponseID", "inner") 
                              


# Display the merged data
merged_data.show()

+-----------------+----------+-----------------------+--------+----------------------+--------------------+------------------------+--------------------+-------------+----------------+--------------------+-----------------+-----------------+--------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------------+-------------------------+-----------------------+-------------------+---------------------+--------------+--------------------+--------------------+---------------+------------+------------------+
|Survey ResponseID|Order Date|Purchase Price Per Unit|Quantity|Shipping Address State|               Title|ASIN/ISBN (Product Code)|            Category|  Q-demos-age|Q-demos-hispanic|        Q-demos-race|Q-demos-education|   Q-demos-income|Q-demos-gender|Q-sexual-orientation|Q-demos-state|Q-amazon-use-howmany|Q-amazon-use-hh-size|Q-amazon-use-how-oft|Q-substance-use-cigarettes|Q-substance-use-marijuana|Q-substance-us

In [5]:
merged_data.printSchema()

root
 |-- Survey ResponseID: string (nullable = true)
 |-- Order Date: date (nullable = true)
 |-- Purchase Price Per Unit: double (nullable = true)
 |-- Quantity: double (nullable = true)
 |-- Shipping Address State: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- ASIN/ISBN (Product Code): string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Q-demos-age: string (nullable = true)
 |-- Q-demos-hispanic: string (nullable = true)
 |-- Q-demos-race: string (nullable = true)
 |-- Q-demos-education: string (nullable = true)
 |-- Q-demos-income: string (nullable = true)
 |-- Q-demos-gender: string (nullable = true)
 |-- Q-sexual-orientation: string (nullable = true)
 |-- Q-demos-state: string (nullable = true)
 |-- Q-amazon-use-howmany: string (nullable = true)
 |-- Q-amazon-use-hh-size: string (nullable = true)
 |-- Q-amazon-use-how-oft: string (nullable = true)
 |-- Q-substance-use-cigarettes: string (nullable = true)
 |-- Q-substance-use-marijuana: strin

#1. Data Preparation

Before analysis, the data needs to be prepared to ensure consistency and efficiency.
- Check for data consistency and ensure all columns are correctly formatted.
- Structure and prepare the dataset for further processing, ensuring that relevant features are retained


In [6]:
from pyspark.sql.functions import sum as spark_sum, col

# Check for missing values in the merged dataset
merged_data.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in merged_data.columns]).show()

+-----------------+----------+-----------------------+--------+----------------------+-----+------------------------+--------+-----------+----------------+------------+-----------------+--------------+--------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------------+-------------------------+-----------------------+-------------------+---------------------+--------------+----------------+--------------------+---------------+------------+------------------+
|Survey ResponseID|Order Date|Purchase Price Per Unit|Quantity|Shipping Address State|Title|ASIN/ISBN (Product Code)|Category|Q-demos-age|Q-demos-hispanic|Q-demos-race|Q-demos-education|Q-demos-income|Q-demos-gender|Q-sexual-orientation|Q-demos-state|Q-amazon-use-howmany|Q-amazon-use-hh-size|Q-amazon-use-how-oft|Q-substance-use-cigarettes|Q-substance-use-marijuana|Q-substance-use-alcohol|Q-personal-diabetes|Q-personal-wheelchair|Q-life-changes|Q-sell-YOUR-data|

In [7]:
# Total number of columns.
len(merged_data.columns)

30

In [8]:
# Total number of rows in the merged dataset.
merged_data.count()

1811238

In [9]:
null_counts = merged_data.select([
    spark_sum(col(c).isNull().cast("int")).alias(c) for c in merged_data.columns
]).collect()[0].asDict()

non_zero_nulls = {k: v for k, v in null_counts.items() if v > 0}

sorted_nulls = dict(sorted(non_zero_nulls.items(), key=lambda item: item[1], reverse=True))

for col_name, count in sorted_nulls.items():
    print(f"{col_name}: {count}")


Q-life-changes: 1212958
Title: 89740
Category: 89435
Shipping Address State: 86832
ASIN/ISBN (Product Code): 951


# 2. Data Cleaning.

Prepare the data for further analysis by performing data cleaning such as missing value treatment, handle data schema, outlier analysis, and relevant feature engineering techniques.

## 2.1 Handling Missing values.
Handle missing values in the data

In [10]:
# Import necessary functions
from pyspark.sql.functions import col, desc

# Fill missing (null) values with the appropriate techniques as required by the analysis

# Aggregate and count missing values (nulls) for each column after replacement

# Display the count of remaining missing values in each column


In [11]:
# Your code here
# Dropping the column "Q-life-changes" as about 67% values are null.
merged_data = merged_data.drop("Q-life-changes")

In [12]:
# Dropping rows where the "ASIN/ISBN (Product Code)" column value is null
merged_data = merged_data.na.drop(subset=["ASIN/ISBN (Product Code)"])

In [13]:
# Filling the missing values in the remaining 3 categorical columns with their respective modes. 
null_columns = ['Title', 'Category', 'Shipping Address State']
for i in null_columns:
    mode_row = merged_data.groupBy(i).count().orderBy(desc("count")).first()
    
    # Check if the mode is None (for instance, if the column has only None values)
    if mode_row and mode_row[0] is not None:
        mode_value = mode_row[0]
        # Fill missing values with the mode
        merged_data = merged_data.fillna({i: mode_value})
    else:
        # If the mode is "None".
        merged_data = merged_data.fillna({i: "Unknown"})

In [14]:
# Checking for any null values after handling them.
null_counts = merged_data.select([
    spark_sum(col(c).isNull().cast("int")).alias(c) for c in merged_data.columns
]).collect()[0].asDict()

# Filter to show only columns with null values.
non_zero_nulls = {k: v for k, v in null_counts.items() if v > 0}

# Sort by null count (optional)
sorted_nulls = dict(sorted(non_zero_nulls.items(), key=lambda item: item[1], reverse=True))

# Print in a neat format
for col_name, count in sorted_nulls.items():
    print(f"{col_name}: {count}")

### There are no more columns in the dataframe containing null values, as they are either filled with the mode or "Unknown". 

## 2.2 Feature Engineering.
Perform feature engineering on the dataset to extract relevant/ create new features as required and map specific data types.

In [15]:
from pyspark.sql.functions import col, month, year, dayofmonth

# Perform appropriate feature engineering. Eg. Extract order date, month, year and cast to the appropriate values
merged_data = merged_data.withColumn("year", year("Order Date"))
merged_data = merged_data.withColumn("month", month("Order Date"))
merged_data = merged_data.withColumn("date", dayofmonth("Order Date"))
# Display the updated dataset
merged_data.columns

['Survey ResponseID',
 'Order Date',
 'Purchase Price Per Unit',
 'Quantity',
 'Shipping Address State',
 'Title',
 'ASIN/ISBN (Product Code)',
 'Category',
 'Q-demos-age',
 'Q-demos-hispanic',
 'Q-demos-race',
 'Q-demos-education',
 'Q-demos-income',
 'Q-demos-gender',
 'Q-sexual-orientation',
 'Q-demos-state',
 'Q-amazon-use-howmany',
 'Q-amazon-use-hh-size',
 'Q-amazon-use-how-oft',
 'Q-substance-use-cigarettes',
 'Q-substance-use-marijuana',
 'Q-substance-use-alcohol',
 'Q-personal-diabetes',
 'Q-personal-wheelchair',
 'Q-sell-YOUR-data',
 'Q-sell-consumer-data',
 'Q-small-biz-use',
 'Q-census-use',
 'Q-research-society',
 'year',
 'month',
 'date']

In [16]:
from pyspark.sql.functions import create_map, lit
from itertools import chain

# Map categorical income to numerical values
income_mapping = {
    'Less than $25,000': 0,
    '$25,000 - $49,999': 1,
    '$50,000 - $74,999': 2,
    '$75,000 - $99,999': 3,
    '$100,000 - $149,999': 4,
    '$150,000 or more': 5
}

income_map_expr = create_map([lit(x) for x in chain(*income_mapping.items())])

# Map gender to numerical values
gender_mapping = {
    'Male': 1,
    'Female': 2,
    'Other': 3,
    'Prefer not to say': 4
}

gender_map_expr = create_map([lit(x) for x in chain(*gender_mapping.items())])

# Display the updated dataset
merged_data = merged_data.withColumn("income_numeric", income_map_expr.getItem("income")) \
              .withColumn("gender_numeric", gender_map_expr.getItem("gender"))

merged_data.columns

['Survey ResponseID',
 'Order Date',
 'Purchase Price Per Unit',
 'Quantity',
 'Shipping Address State',
 'Title',
 'ASIN/ISBN (Product Code)',
 'Category',
 'Q-demos-age',
 'Q-demos-hispanic',
 'Q-demos-race',
 'Q-demos-education',
 'Q-demos-income',
 'Q-demos-gender',
 'Q-sexual-orientation',
 'Q-demos-state',
 'Q-amazon-use-howmany',
 'Q-amazon-use-hh-size',
 'Q-amazon-use-how-oft',
 'Q-substance-use-cigarettes',
 'Q-substance-use-marijuana',
 'Q-substance-use-alcohol',
 'Q-personal-diabetes',
 'Q-personal-wheelchair',
 'Q-sell-YOUR-data',
 'Q-sell-consumer-data',
 'Q-small-biz-use',
 'Q-census-use',
 'Q-research-society',
 'year',
 'month',
 'date',
 'income_numeric',
 'gender_numeric']

## 2.3 Data Cleaning.
Handle data cleaning techniques such as data duplication, dropping unnecessary values etc.

In [17]:
# Check for duplicates
print("Number of Duplicates:", merged_data.count() - merged_data.dropDuplicates().count())

# Remove duplicates
merged_data = merged_data.dropDuplicates()

# Verify duplicates after cleaning
print("Number of Duplicates After Cleaning:", merged_data.count() - merged_data.dropDuplicates().count())

Number of Duplicates: 11506
Number of Duplicates After Cleaning: 0


In [18]:
# cleaned_data_path =  r"C:\Users\Aayush Singh"# Save the cleaned dataset locally
# merged_data.write.csv(cleaned_data_path, header=True, mode='overwrite')

# # Load the cleaned dataset from the location
# cleaned_data = spark.read.csv(cleaned_data_path, header=True, inferSchema=True)

# # Display the first few rows
# print("Cleaned Data:")
# cleaned_data.show(5)

# 3. Exploratory Data Analysis.

## 3.1 Analyse purchases by hour, day and month.

Examine overall trends in purchases over time and analyse the trends by hour, day, month.

In [None]:
# Purchase Distribution by Hour, Day, and Month

from pyspark.sql.functions import hour, dayofweek, month
import seaborn as sns
import matplotlib.pyplot as plt

# Extract hour, day, and month
merged_data = merged_data.withColumn('day_of_week', dayofweek('Order Date'))
merged_data = merged_data.withColumn('month', month('Order Date'))

# Group and count purchases by time factors
daily_data = merged_data.groupBy('day_of_week').count().orderBy('day_of_week')
monthly_data = merged_data.groupBy('month').count().orderBy('month')

# Convert to Pandas for visualisation
daily_df = daily_data.toPandas()
monthly_df = monthly_data.toPandas()

fig, axes = plt.subplots(1, 3, figsize=(18, 5))

# Daily distribution plot
sns.barplot(x='day_of_week', y='count', data=daily_df, ax=axes[1])
axes[1].set_title('Purchases by Day of Week')
axes[1].set_xlabel('Day of Week (1=Monday, 7=Sunday)')
axes[1].set_ylabel('Number of Purchases')

# Monthly distribution plot
sns.barplot(x='month', y='count', data=monthly_df, ax=axes[2])
axes[2].set_title('Purchases by Month')
axes[2].set_xlabel('Month')
axes[2].set_ylabel('Number of Purchases')

# Plot the data
plt.tight_layout()
plt.show()

In [None]:
# Monthly Purchase Trends

from pyspark.sql.functions import date_format

# Extract month and year from 'Order Date'
merged_data = merged_data.withColumn('Month_Year', date_format('Order Date', 'yyyy-MM'))

# Group by month and count purchases
monthly_purchases = merged_data.groupBy('Month_Year').count().orderBy('Month_Year')

# Convert to Pandas for visualization
monthly_purchases_pd = monthly_purchases.toPandas()

# Plot
plt.figure(figsize=(10, 6))
plt.plot(monthly_purchases_pd['Month_Year'], monthly_purchases_pd['count'], marker='o')
plt.xticks(rotation=45)
plt.title('Monthly Purchase Trends')
plt.xlabel('Month-Year')
plt.ylabel('Number of Purchases')
plt.grid(True)
plt.tight_layout()
plt.show()


In [None]:
# Yearly Purchase Trends
# Group by Year and count purchases
yearly_data = merged_data.groupBy('year').count().orderBy('year')

# Convert to Pandas for visualisation
yearly_df = yearly_data.toPandas()

# Plot
plt.figure(figsize=(10, 6))
plt.plot(yearly_df['year'], yearly_df['count'], marker='o')
plt.xticks(rotation=45)
plt.title('Yearly Purchase Trends')
plt.xlabel('Year')
plt.ylabel('Number of Purchases')
plt.grid(True)
plt.tight_layout()
plt.show()

## 3.2 Customer Demographics vs Purchase Frequency.
Analyse the trends between the customer deographics and the purchase frequency

In [None]:
# Correlation Between Demographics and Purchase Frequency

from pyspark.sql import functions as F

# Group by demographic attributes and count purchases
demographic_purchases = merged_data.groupBy('Q-demos-gender').agg(F.count('*').alias('purchase_count'))


# Convert to Pandas for visualisation
demographic_purchases_pd = demographic_purchases.toPandas()


# Plot
plt.figure(figsize=(10, 6))
plt.bar(demographic_purchases_pd['Q-demos-gender'], demographic_purchases_pd['purchase_count'], color='skyblue')
plt.title('Purchase Frequency by Demographic Group')
plt.xlabel('Demographic Group (Gender)')
plt.ylabel('Number of Purchases')
plt.xticks(rotation=45)
plt.grid(True)
plt.tight_layout()
plt.show()

## 3.3 Purchase behavior weekend vs weekday.

Compare the purchase behavior of customer's on weekdays vs. weekends.

In [None]:
# Weekday vs. Weekend Purchase Behavior

from pyspark.sql.functions import when, col

# Define weekdays and weekends
merged_data = merged_data.withColumn("Day_Type", when(col("Order Date").rlike(".*[0-6]$"), "Weekend")
                   .otherwise("Weekday"))

# Group and count purchases
purchase_counts = merged_data.groupBy("Day_Type").count()

# Convert to Pandas for visualisation
purchase_counts_pd = purchase_counts.toPandas()

# Plot
purchase_counts_pd.set_index("Day_Type").plot(kind='bar', legend=False, color=['skyblue', 'lightgreen'])
plt.title('Weekday vs Weekend Purchase Behavior')
plt.ylabel('Number of Purchases')
plt.xlabel('Day Type')
plt.xticks(rotation=0)
plt.show()

## 3.4 Frequently purchased product pairs.

Analyze how frequently products are purchased together (also known as Market Basket Analysis)


In [None]:
# Frequently Purchased Product Pairs (Market Basket Analysis)

from pyspark.sql.functions import collect_set, explode

# Group purchases by customer and collect items bought together
df_grouped = merged_data.groupBy("Survey ResponseID").agg(collect_set("Title").alias("Title"))


# Explode item pairs
df_exploded = df_grouped.withColumn("item", explode("Title")).select("Survey ResponseID", "item")
df_exploded.createOrReplaceTempView("exploded")

df_pairs = spark.sql("""
    SELECT a.`Survey ResponseID`, a.item AS item1, b.item AS item2
    FROM exploded a
    JOIN exploded b
    ON a.`Survey ResponseID` = b.`Survey ResponseID`
    AND a.item < b.item
""")

# Count co-occurrences of items
df_pair_counts = df_pairs.groupBy("item1", "item2").count().sort("count", ascending=False)

# Convert to Pandas for visualisation
pandas_df = df_pair_counts.toPandas()

# Plot
plt.figure(figsize=(10, 6))
plt.barh(top_pairs['item1'] + " & " + top_pairs['item2'], top_pairs['count'], color='skyblue')
plt.xlabel('Count')
plt.title('Top 10 Most Frequently Bought Item Pairs')
plt.gca().invert_yaxis()  # To have the highest values at the top
plt.show()

## 3.5 Examine Product Performance.

Examine the performance of products by calculating revenue and item popularity.

In [None]:
# Contribution of Product Categories (Top 25)
df_with_sales = merged_data.withColumn("total_sales", col("Purchase Price Per Unit") * col("Quantity"))
category_contribution = df_with_sales.groupBy("Category").sum("total_sales").withColumnRenamed("sum(total_sales)", "total_sales")
category_contribution = category_contribution.orderBy("total_sales", ascending=False)

# Show the top 25 categories
top_25_contributions = category_contribution.limit(25)

# Convert to Pandas for visualisation
top_25_contributions_pd = top_25_contributions.toPandas()

# Plot
plt.figure(figsize=(10, 6))
sns.barplot(x="total_sales", y="Category", data=top_25_contributions_pd, palette="viridis")

# Set labels and title
plt.xlabel("Total Sales")
plt.ylabel("Product Category")
plt.title("Top 25 Product Categories by Total Sales")
plt.tight_layout()

# Show plot
plt.show()

## 3.6 Top products by quantity.

Identify the most frequently purchased products.

In [None]:
# Top 10 Products by Quantity

# Group by product title and sum 'Quantity'
product_quantity = merged_data.groupBy("Title").sum("Quantity").withColumnRenamed("sum(quantity)", "total_quantity")
product_quantity = product_quantity.orderBy("total_quantity", ascending=False)
top_10_products = product_quantity.limit(10)


# Convert to Pandas for visualisation
top_10_products_pd = top_10_products.toPandas()

# Plot
plt.figure(figsize=(10, 6))
sns.barplot(x="total_quantity", y="Title", data=top_10_products_pd, palette="Blues_d")

# Set labels and title
plt.xlabel("Total Quantity Sold")
plt.ylabel("Product Title")
plt.title("Top 10 Products by Quantity Sold")
plt.tight_layout()

# Show the plot
plt.show()

## 3.7 Distribution of Purchases by State.

Analyze the distribution of purchases across states and categories.

In [None]:
# Distribution of Purchases by State (Top 25)
state_purchases = merged_data.groupBy("Shipping Address State").sum("Quantity").withColumnRenamed("sum(quantity)", "total_purchases")
state_purchases = state_purchases.orderBy("total_purchases", ascending=False)
top_25_states = state_purchases.limit(25)

# Convert to Pandas for visualisation
top_25_states_pd = top_25_states.toPandas()

# Plot
plt.figure(figsize=(10, 6))
sns.barplot(x="total_purchases", y="Shipping Address State", data=top_25_states_pd, palette="viridis")

# Set labels and title
plt.xlabel("Total Purchases")
plt.ylabel("State")
plt.title("Top 25 States by Total Purchases")
plt.tight_layout()

# Show the plot
plt.show()

## 3.8 Price vs Product Quantity.

Identify the Relationship between Price and Quantity

In [None]:
# Relationship Between Price and Quantity
df_price_quantity = merged_data.select("Purchase Price Per Unit", "Quantity")


# Convert to Pandas for visualisation
df_price_quantity_pd = df_price_quantity.toPandas()

# Plot
plt.figure(figsize=(10, 6))
sns.scatterplot(x="Purchase Price Per Unit", y="Quantity", data=df_price_quantity_pd, color='blue', alpha=0.6)

# Set labels and title
plt.xlabel("Price")
plt.ylabel("Quantity Sold")
plt.title("Relationship Between Price and Quantity")
plt.tight_layout()

# Show the plot
plt.show()

## 3.9 Analyse the spending KPIs.


A popular KPI is average spend per customer. Calculate this metric as the ratio of total transaction amount from non-recurring payments divided by the total number of customers who made a purchase

In [None]:
# Average Spend per Customer

from pyspark.sql.functions import avg

# Group by customer and calculate average spend
df_with_spend = merged_data.withColumn("total_spend", col("Purchase Price Per Unit") * col("Quantity"))
customer_spend = df_with_spend.groupBy("Survey ResponseID").sum("total_spend").withColumnRenamed("sum(total_spend)", "total_spend")
average_spend_per_customer = customer_spend.agg(avg("total_spend").alias("avg_spend"))

# Convert to Pandas for visualisation
average_spend_df = average_spend_per_customer.toPandas()

# Plot
plt.figure(figsize=(10, 6))
sns.histplot(customer_spend.toPandas()["total_spend"], kde=True, color="blue")

# Set labels and title
plt.xlabel("Total Spend per Customer ($)")
plt.ylabel("Frequency")
plt.title("Distribution of Spend per Customer")
plt.tight_layout()

# Show the plot
plt.show()

Analyse the Repeat Purchase Behavior of Customers

In [None]:
# Repeat Purchase Analysis Behavior Per Customers
customer_purchase_count = merged_data.groupBy("Survey ResponseID").count().withColumnRenamed("count", "purchase_count")
repeat_customers = customer_purchase_count.filter(col("purchase_count") > 1)

# Convert to Pandas for visualisation
repeat_customers_pd = repeat_customers.toPandas()

# Plot
plt.figure(figsize=(10, 6))
sns.histplot(repeat_customers_pd["purchase_count"], kde=False, bins=range(1, repeat_customers_pd["purchase_count"].max() + 1), color="blue")

# Set labels and title
plt.xlabel("Number of Purchases")
plt.ylabel("Number of Customers")
plt.title("Distribution of Repeat Purchases per Customer")
plt.tight_layout()

# Show the plot
plt.show()

Analyse the top 10 high-engagement customers

In [None]:
# Top 10 High-Engagement Customers
df_with_spend = merged_data.withColumn("total_spend", col("Purchase Price Per Unit") * col("Quantity"))
customer_spend = df_with_spend.groupBy("Survey ResponseID").sum("total_spend").withColumnRenamed("sum(total_spend)", "total_spend")
top_10_customers = customer_spend.orderBy("total_spend", ascending=False).limit(10)

# Convert to Pandas for visualisation
top_10_customers_pd = top_10_customers.toPandas()

# Plot
plt.figure(figsize=(10, 6))
sns.barplot(x="total_spend", y="Survey ResponseID", data=top_10_customers_pd, palette="viridis")

# Set labels and title
plt.xlabel("Total Spend ($)")
plt.ylabel("Customer ID")
plt.title("Top 10 High-Engagement Customers by Total Spend")
plt.tight_layout()

# Show the plot
plt.show()

## 3.10 Seasonal trends in product purchases and their impact on revenues.

Investigate the seasonal trends in product purchases and their impact on the overall revenue.

In [None]:
# Seasonal Trends in Product Purchases and Their Impact on Revenue

from pyspark.sql.functions import year

# Extract year and month


# Group by year and month, summing total revenue
df_with_revenue = merged_data.withColumn("total_revenue", col("Purchase Price Per Unit") * col("Quantity"))

# Group by 'year' and 'month' and calculate total revenue per month
monthly_revenue = df_with_revenue.groupBy("year", "month").sum("total_revenue").withColumnRenamed("sum(total_revenue)", "total_revenue")

# Convert to Pandas for visualisation
monthly_revenue_pd = monthly_revenue.toPandas()

# Plot
monthly_revenue_pd["year_month"] = monthly_revenue_pd["year"].astype(str) + "-" + monthly_revenue_pd["month"].astype(str).str.zfill(2)

# Plotting the seasonal trends in total revenue
plt.figure(figsize=(12, 6))
sns.lineplot(x="year_month", y="total_revenue", data=monthly_revenue_pd, marker='o', color="blue")

# Set labels and title
plt.xlabel("Year-Month")
plt.ylabel("Total Revenue ($)")
plt.title("Seasonal Trends in Product Purchases and Their Impact on Revenue")
plt.xticks(rotation=45)
plt.tight_layout()

# Show the plot
plt.show()

## 3.11 Customer location vs purchasing behavior.

Examine the relationship between customer's location and their purchasing behaviors

In [None]:
# Relationship Between Customer Location and Purchase Behavior
df_with_revenue = merged_data.withColumn("total_revenue", col("Purchase Price Per Unit") * col("Quantity"))

# Group purchases by state and total spend
state_revenue = df_with_revenue.groupBy("Shipping Address State").sum("total_revenue").withColumnRenamed("sum(total_revenue)", "total_revenue")

# Convert to Pandas for visualisation
state_revenue_pd = state_revenue.toPandas()

# Plot revenue by state
plt.figure(figsize=(12, 6))
sns.barplot(x="total_revenue", y="Shipping Address State", data=state_revenue_pd, palette="viridis")

# Set labels and title
plt.xlabel("Total Revenue ($)")
plt.ylabel("State")
plt.title("Total Revenue by State")
plt.tight_layout()

# Show the plot
plt.show()

## 3. Customer Segmentation and Insights.


## 3.1 Perform RFM Analysis.

RFM Analysis is a powerful customer segmentation technique used to evaluate and quantify customer value based on three key dimensions:
- **Recency**,
- **Frequency**,
- **Monetary**.

This method is particularly effective in identifying high-value customers, optimizing marketing strategies, and improving customer retention in the e-commerce industry.


### 1. Recency (R)
Recency measures how recently a customer made a purchase Customers who have purchased more recently are more likely to respond to promotions and offers.
- **Application:** By ranking customers based on the number of days since their last transaction, you can prioritize those who are most engaged.

### 2. Frequency (F)
Frequency counts the number of purchases a customer has made over a given period.
Frequent purchasers tend to be more loyal and are often a source of recurring revenue.
- **Application:** Analyzing purchase frequency helps in identifying consistent buyers and understanding their buying patterns.

### 3. Monetary (M)
Monetary value represents the total amount of money a customer has spent.
Customers who spend more are often more profitable, making them ideal targets for retention and upsell strategies.
- **Application:** By assessing the monetary contribution, you can distinguish between high-value and low-value customers.


### Prepare data for RFM Analysis.


In [None]:
from pyspark.sql.functions import datediff, max, count, sum, lit

# Get the latest order date in the dataset
latest_order_date = merged_data.agg(max("Order Date")).collect()[0][0]
print(f"Latest Order Date: {latest_order_date}")

# Calculate RFM metrics
rfm_data = merged_data.groupBy("Survey ResponseID") \
    .agg(
        # Recency: Difference between the latest date and the last purchase date
        datediff(lit(latest_order_date), max("Order Date")).alias("Recency"),
        
        # Frequency: Count of orders per customer
        count("*").alias("Frequency"),
        
        # Monetary: Sum of total spend (price * quantity)
        sum(col("Purchase Price Per Unit") * col("Quantity")).alias("Monetary")
    )

# Filter out customers with no purchases
rfm_data_filtered = rfm_data.filter(col("Frequency") > 0)

# Show RFM data
rfm_data.show()
rfm_data_filtered.show()

In [None]:
from pyspark.sql.functions import log1p

# Apply log transformation to skewed features
skewed_columns = ['Purchase Price Per Unit', 'Quantity']

df_log_transformed = merged_data
for column in skewed_columns:
    df_log_transformed = df_log_transformed.withColumn(column, log1p(col(column)))
    
# Convert to Pandas DataFrame (for scikit-learn compatibility)
df_log_transformed_pd = df_log_transformed.toPandas()

# Scale features using StandardScaler
from sklearn.preprocessing import StandardScaler
features = df_log_transformed_pd[['Purchase Price Per Unit', 'Quantity']]
scaler = StandardScaler()

# Fit and transform the features
scaled_features = scaler.fit_transform(features)

# Convert the scaled features back to a DataFrame
scaled_df = pd.DataFrame(scaled_features, columns=features.columns)

In [None]:
from sklearn.cluster import KMeans
X = scaled_df.values
# Calculate the Within-Cluster Sum of Squares (WCSS)
wcss = []

for i in range(1, 11):
    kmeans = KMeans(n_clusters=i, init='k-means++', max_iter=300, n_init=10, random_state=42)
    kmeans.fit(X)
    wcss.append(kmeans.inertia_)
    
# Plot the elbow curve with the number of clusters on the x-axis and WCSS on the y-axis
plt.figure(figsize=(8,6))
plt.plot(range(1, 11), wcss, marker='o')
plt.title('Elbow Method for Optimal Number of Clusters')
plt.xlabel('Number of Clusters')
plt.ylabel('WCSS (Within-Cluster Sum of Squares)')
plt.grid(True)
plt.show()

### 3.2.1 When to schedule effective promotions.

Compare sales across weekdays to schedule effective promotions

In [None]:
#Compare sales across weekdays to schedule effective promotions

from pyspark.sql.functions import dayofweek
df_with_sales = merged_data.withColumn('total_sales', F.col('Purchase Price Per Unit') * F.col('Quantity'))

# Extract day of the week (1 = Sunday, 7 = Saturday)
df_with_weekday = df_with_sales.withColumn('weekday', F.dayofweek('Order Date'))

# Group by weekday and sum total sales
df_sales_by_weekday = df_with_weekday.groupBy('weekday').agg(F.sum('total_sales').alias('total_sales'))

# Convert to Pandas for visualisation
df_sales_by_weekday_pd = df_sales_by_weekday.toPandas()

weekday_map = {
    1: 'Sunday',
    2: 'Monday',
    3: 'Tuesday',
    4: 'Wednesday',
    5: 'Thursday',
    6: 'Friday',
    7: 'Saturday'
}

df_sales_by_weekday_pd['weekday'] = df_sales_by_weekday_pd['weekday'].map(weekday_map)

# Sort by weekday for visualization
df_sales_by_weekday_pd = df_sales_by_weekday_pd.sort_values('weekday', key=lambda x: x.map(weekday_map))

# Plot sales by weekday
plt.figure(figsize=(8, 6))
plt.bar(df_sales_by_weekday_pd['weekday'], df_sales_by_weekday_pd['total_sales'])
plt.title('Sales by Weekday')
plt.xlabel('Weekday')
plt.ylabel('Total Sales')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

### 3.2.2 Top-selling Products.

Identify top-selling products by considering revenue and engagement metrics

In [None]:
#Identify top-selling products using revenue and engagement metrics
df_with_revenue = merged_data.withColumn('total_revenue', F.col('Purchase Price Per Unit') * F.col('Quantity'))

# Group by product and sum revenue
df_revenue_by_product = df_with_revenue.groupBy('Title').agg(F.sum('total_revenue').alias('total_revenue'))

# Get top 10 products by revenue
df_top_10_products = df_revenue_by_product.orderBy(F.col('total_revenue').desc()).limit(10)

# Convert to Pandas for visualisation
df_top_10_products_pd = df_top_10_products.toPandas()

# Plot top products by revenue
plt.figure(figsize=(10, 6))
plt.bar(df_top_10_products_pd['Title'], df_top_10_products_pd['total_revenue'])
plt.title('Top 10 Products by Revenue')
plt.xlabel('Product Name')
plt.ylabel('Total Revenue')
plt.xticks(rotation=45, ha='right')  # Rotate x-axis labels for readability
plt.tight_layout()
plt.show()

### 3.2.3 State-wise revenue Distribution.

Assess state-wise revenue to focus on high-growth areas

In [None]:
#Assess state-wise revenue to focus on high-growth areas
df_with_revenue = merged_data.withColumn('total_revenue', F.col('Purchase Price Per Unit') * F.col('Quantity'))

# Group by state and sum revenue
df_revenue_by_state = df_with_revenue.groupBy('Shipping Address State').agg(F.sum('total_revenue').alias('total_revenue'))

# Convert to Pandas for visualisation
df_revenue_by_state_pd = df_revenue_by_state.toPandas()

# Plot revenue by state
plt.figure(figsize=(12, 8))
plt.bar(df_revenue_by_state_pd['Shipping Address State'], df_revenue_by_state_pd['total_revenue'])
plt.title('Revenue by State')
plt.xlabel('State')
plt.ylabel('Total Revenue')
plt.xticks(rotation=45, ha='right')  # Rotate x-axis labels for readability
plt.tight_layout()
plt.show()

### 3.2.4 Repeat Purchase Behavior.

Examine repeat purchase behavior to enhance retention initiatives.

In [None]:
#Examine repeat purchase behavior to enhance retention initiatives

from pyspark.sql.functions import count, col

# Count total purchases per customer
df_purchases_per_customer = merged_data.groupBy('Survey ResponseID').agg(F.count('*').alias('purchase_count'))

# Filter for repeat customers (those with more than one purchase)
df_repeat_customers = df_purchases_per_customer.filter(F.col('purchase_count') > 1)

# Show sample data
df_repeat_customers.show(10)

### 3.2.5 Flagging Potential Fraud.

Identify irregular transaction patterns to flag potential fraud.

In [None]:
#Identify irregular transaction patterns to flag potential fraud

from pyspark.sql.functions import col, avg, stddev

# Calculate the threshold for unusually high spending
df_with_spending = merged_data.withColumn('total_spent', F.col('Purchase Price Per Unit') * F.col('Quantity'))

mean_spent, stddev_spent = df_with_spending.select(
    F.avg('total_spent').alias('mean_spent'),
    F.stddev('total_spent').alias('stddev_spent')
).first()

threshold = mean_spent + 3 * stddev_spent

df_suspicious_transactions = df_with_spending.filter(F.col('total_spent') > threshold)

df_suspicious_transactions.show(10)

### 3.2.6 Demand Variations across product categories.

Perform inventory management by monitoring demand variations across product categories.

In [None]:
#Monitor demand variations across product categories (Top 25) for inventory management

from pyspark.sql.functions import col, sum
import matplotlib.pyplot as plt
import seaborn as sns

df_with_revenue = merged_data.withColumn('total_revenue', F.col('Purchase Price Per Unit') * F.col('Quantity'))

df_with_revenue = df_with_revenue.withColumn('year_month', F.date_format('Order Date', 'yyyy-MM'))

# Group by category and month, summing total revenue
category_trends = df_with_revenue.groupBy('Category', 'year_month').agg(F.sum('total_revenue').alias('total_revenue'))

# Compute total revenue per category
category_total_revenue = category_trends.groupBy('Category').agg(F.sum('total_revenue').alias('total_revenue'))

# Get the top 25 categories by total revenue
top_25_categories = category_total_revenue.orderBy(F.col('total_revenue').desc()).limit(25)

# Filter category_trends to include only top 25 categories
top_25_category_trends = category_trends.join(top_25_categories, on='Category', how='inner')

# Convert to Pandas for visualisation
top_25_category_trends_pd = top_25_category_trends.toPandas()

# Plot revenue trends for top 25 categories
plt.figure(figsize=(14, 8))
sns.lineplot(data=top_25_category_trends_pd, x='year_month', y='total_revenue', hue='Category', marker='o')

### 3.2.7 Assess how bulk purchases affect revenue and supply chain operations.

Analyse the impact of how bulk purchasing behavior affects revenue and the overall supply chain operations.

In [None]:
#Assess how bulk purchases affect revenue and supply chain operations
df_with_revenue = merged_data.withColumn('total_revenue', F.col('Purchase Price Per Unit') * F.col('Quantity'))

# Filter bulk purchases (Quantity > 5) and compute total revenue per category
df_bulk_purchases = df_with_revenue.filter(F.col('Quantity') > 5)
bulk_purchases_by_category = df_bulk_purchases.groupBy('Category').agg(F.sum('total_revenue').alias('total_revenue'))

# Select the top 25 categories by total revenue
top_25_bulk_categories = bulk_purchases_by_category.orderBy(F.col('total_revenue').desc()).limit(25)

# Convert to Pandas for visualisation
top_25_bulk_categories_pd = top_25_bulk_categories.toPandas()

# Plot revenue from bulk purchases (Top 25 categories)
plt.figure(figsize=(12, 8))
sns.barplot(data=top_25_bulk_categories_pd, x='Category', y='total_revenue')

### 4. Conclusion

Write your conclusion.

In [None]:
•	The number of purchases made by customers has been mostly at consistent levels throughout the week with the peak at Tuesday.
•	When it comes to purchases throughout the months the most purchases were made in the month of December and the lowest in February.
•	The highest volume of orders was made from the state of California followed by Texas and Florida. On the other hand, Nevada, Kentucky recorded the least purchase amount.
•	If we observe the top sold product categories from amazon then ABIS_BOOK, GIFT_CARD and PET_FOOD were leading the list with TABLET_COMPUTER, ELECTRONIC_CABLE and MATTRESS in the end of the list.
•	Extending the third point, the Total Revenue by States was naturally highest for California, Texas and Florida. 
•	The number of products ordered is inversely proportional to number of customer using amazon service.
•	Upon visualizing the Purchase Frequency based on the gender demographics of the customers, we found that females make more frequent purchases on amazon compared to males and other genders.
•	Another significant observation was that quantity of orders by customers was way more during weekends as opposed to weekdays, which is understandable as people have more time to make various purchases in the weekends compared to working weekdays.