##### I. Setup

In [None]:
# Java 11 and Spark 3.5.5 installation
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
!tar -xzf spark-3.5.5-bin-hadoop3.tgz
!pip install -q findspark

tar (child): spark-3.5.5-bin-hadoop3.tgz: Cannot open: No such file or directory
tar (child): Error is not recoverable: exiting now
tar: Child returned status 2
tar: Error is not recoverable: exiting now


In [None]:
import os
import findspark
from pyspark.sql import SparkSession, DataFrame
from google.colab import files
from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType
from functools import reduce

In [None]:
# Setup environment
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.5-bin-hadoop3"

In [None]:
# Initial SparkSession
findspark.init()
spark = SparkSession.builder.appName("DataCleaning").getOrCreate()
spark

FileNotFoundError: [Errno 2] No such file or directory: '/content/spark-3.5.5-bin-hadoop3/./bin/spark-submit'

##### II. Load data

In [None]:
files.upload()

In [None]:
df = spark.read.csv("OnlineRetail.csv", header=True, inferSchema=True)
df.show(5)

##### III. Data Profiling, Anomaly Detection, and Data Cleaning

###### 3.1 Check the number of columns and rows

In [None]:
# Check the number of columns and rows
print("Number of columns: ", len(df.columns))
print("Number of rows: ", df.count())

###### 3.2 Abnormal 1: Check and Correct data type

In [None]:
# Check schema
df.printSchema()

In [None]:
# Convert datatype of InvoiceDate to timestamp
df_cleaned = df.withColumn("InvoiceDate", to_timestamp("InvoiceDate", "M/d/yyyy H:mm"))

# Re-check schema
df_cleaned.printSchema()

###### 3.3 Abnormal 2: Check and handle missing values

In [None]:
# Check for missing values in original data
missing_values = df_cleaned.select([
    count(when(col(c).isNull(), c)).alias(c) for c in df_cleaned.columns
])
missing_values.show()

- `Description`: 1,454 --> remove since it is hard to guess the product description
- `CustomerID`: 135,080 --> might keep them, change `null` value into `Unknown`

In [None]:
df_cleaned.filter(df.Description.isNull()).select("StockCode").distinct().count()

In [None]:
missing_stockcodes = (
    df_cleaned
    .filter(col("Description").isNull())
    .select("StockCode")
    .distinct()
    .rdd.flatMap(lambda x: x)
    .collect()
)

In [None]:
# Delete the null values in Description
df_cleaned = df_cleaned.filter(df_cleaned.Description.isNotNull())

# Change the value in CustomerID into "Unknown" for null values
df_cleaned = df_cleaned.withColumn("CustomerID", when(df_cleaned.CustomerID.isNull(), "Unknown").otherwise(df_cleaned.CustomerID))

In [None]:
# Re-check for missing values in the cleaned data
missing_values = df_cleaned.select([
    count(when(col(c).isNull(), c)).alias(c) for c in df_cleaned.columns
])
missing_values.show()

###### 3.4 Abnormal 3: Check and handle duplicate rows

In [None]:
# Check the number of duplicate rows
duplicate_count = df_cleaned.count() - df_cleaned.dropDuplicates().count()
print("Number of duplicate rows: ", duplicate_count)

In [None]:
# Drop duplicate rows
df_cleaned = df_cleaned.dropDuplicates()

In [None]:
# Re-check the number of duplicate rows
duplicate_count = df_cleaned.count() - df_cleaned.dropDuplicates().count()
print("Number of duplicate rows: ", duplicate_count)

###### 3.5 Abnormal 4: Handle negative Quantity and UnitPrice

**Check the number of cancelled order:**

The InvoiceNo starts with C (has negative Quantity)

In [None]:
cancel = df_cleaned.filter(col("InvoiceNo").startswith("C"))
print("Number of canceled orders: ", cancel.count())
cancel.show(5)

**Check for invalid values in Quantity and UnitPrice:**
- Quantity < 0
- UnitPrice < 0

In [None]:
# Quantity has negative value
negative_Quantity = df_cleaned.filter(col("Quantity") < 0).count()
print("Number of negative Quantity: ", negative_Quantity)
df_cleaned.filter(col("Quantity") < 0).select("InvoiceNo", "Quantity").show(5)

In [None]:
# UnitPrice has negative value
negative_UnitPrice = df_cleaned.filter(col("UnitPrice") < 0).count()
print("Number of negative UnitPrice: ", negative_UnitPrice)
df_cleaned.filter(col("UnitPrice") < 0).select("InvoiceNo", "UnitPrice").show(5)

- We can see that cancelled orders also have the negative Quantity.
- The dataset contains 9,251 cancelled orders, while 9,725 orders have a negative quantity.
- Therefore, in the Data Cleaning section, when we remove orders with negative quantities, we also remove the cancelled orders at the same time.

**Remove abnormal Quantity and UnitPrice**

In [None]:
# Filter out negative Quantity and UnitPrice
df_cleaned = df_cleaned.filter((col("Quantity") > 0) & (col("UnitPrice") > 0))

# Check negative Quantity and UnitPrice again
print("Negative Quantity count:", df_cleaned.filter(col("Quantity") < 0).count())
print("Negative UnitPrice count:", df_cleaned.filter(col("UnitPrice") < 0).count())

# Check number of cancelled orders again
cancel = df_cleaned.filter(col("InvoiceNo").startswith("C"))
print("Number of cancelled orders: ", cancel.count())

###### 3.6 Abnormal 5: Identify abnormal `StockCode` - `Description` pairs that are not actual products

**Check abnormal StockCodes**

In [None]:
excluded_stockcodes = ["POST", "DOT", "M", "C2", "BANK CHARGES","S", "B", "AMAZONFEE",
                       "gift_0001_10", "gift_0001_20","gift_0001_30","gift_0001_40","gift_0001_50"]

# Identify rows with exclued StockCode
df_excluded = df_cleaned.filter(col("StockCode").isin(excluded_stockcodes))

# Show distinct excluded StockCode - Description pairs
df_excluded.select("StockCode", "Description").distinct().show(truncate=False)

**Handle abnormal `StockCode` and `Description` pairs that are not actual products**

In [None]:
df_cleaned = df_cleaned.filter(~col("StockCode").isin(excluded_stockcodes))

In [None]:
# Re-check the abnormal stock code
df_excluded = df_cleaned.filter(col("StockCode").isin(excluded_stockcodes))
df_excluded.select("StockCode", "Description").distinct().show(truncate=False)

##### IV. Data Cleaning results

In [None]:
# The number of rows before cleaning
rows_before_cleaning = df.count()
print(f"Number of rows before cleaning: {rows_before_cleaning}")

In [None]:
# Check the number of rows after cleaning
rows_after_cleaning = df_cleaned.count()
print(f"Number of rows after cleaning: {rows_after_cleaning}")

##### V. Feature Engineering

###### 5.1 Create Recency, Frequency, and Monetary (RFM) features

Convert InvoiceDate to DateType

In [None]:
from pyspark.sql.functions import col, to_date

# Convert InvoiceDate from string to DateType
df_fe = df_cleaned.withColumn("InvoiceDate", to_date(col("InvoiceDate"), "M/d/yyyy H:mm"))
df_fe.printSchema()
df_fe.show(5)

In [None]:
# Get the max date of the dataset
max_date = df_fe.agg(max("InvoiceDate")).collect()[0][0]
max_date

In [None]:
# Calculate recency
recency_df = df_fe.groupBy("CustomerID").agg(
    datediff(lit(max_date), max("InvoiceDate")).alias("Recency")
)
recency_df.show()

In [None]:
# Calculate frequency (number of transactions per customer)
frequency_df = df_fe.groupBy("CustomerID").agg(
    countDistinct("InvoiceNo").alias("Frequency")
)
frequency_df.show()

In [None]:
# Calculate Monetary (Total money spent by customer)
monetary_df = df_fe.withColumn("TotalPrice", col("Quantity") * col("UnitPrice")) \
    .groupBy("CustomerID") \
    .agg(round(sum("TotalPrice"), 3).alias("Monetary"))
monetary_df.show()

In [None]:
# Join all RFM features
dfs = [recency_df, frequency_df, monetary_df]
rfm_df = reduce(lambda df1, df2: df1.join(df2, "CustomerID"), dfs)
rfm_df.show()

In [None]:
from pyspark.sql.functions import mean

rfm_df.select(
    mean("Recency").alias("Mean_Recency"),
    mean("Frequency").alias("Mean_Frequency"),
    mean("Monetary").alias("Mean_Monetary")
).show()


In [None]:
# Define thresholds based on the mean values
recency_threshold = 95
frequency_threshold = 7

# Create a new column 'Churn' based on the thresholds
rfm_df = rfm_df.withColumn(
    "Churn",
    when((col("Recency") > recency_threshold) & (col("Frequency") <= frequency_threshold), 1).otherwise(0)
)

# Show the resulting DataFrame with Churn column
rfm_df.select("CustomerID", "Recency", "Frequency", "Monetary", "Churn").show(50)

In [None]:
# join data
final_df= df_fe.join(rfm_df,on= 'CustomerID', how= 'left')
final_df.show()

### Customer Churn & Value Analysis: A 3Ps-G Framework Approach
- This analysis aims to understand customer churn behavior and uncover opportunities to drive long-term growth by applying the 3Ps-G Framework — a strategic lens that evaluates customer dynamics through four key dimensions: Place, People, Product, and Growth.
- The 3P-G framework is not academic but has emerged from practical growth operating systems used by growth-stage startups and tech companies (inspired by frameworks like AARRR or McKinsey’s 7S). It gained popularity among product managers, data analysts, and growth teams as a way to structure root cause analysis around user problems or retention issues

### Overall RFM by churn
- This is also conducted through the 3Ps-G frameworks:

In [None]:
final_df.groupBy("Churn") \
    .agg(
        avg("Recency").alias("Avg_Recency"),
        avg("Frequency").alias("Avg_Frequency"),
        avg("Monetary").alias("Avg_Monetary"),
        countDistinct("CustomerID").alias("Num_Customers")
    ).show()

### Customer Behavior Breakdown By RFM metrics
- Based on the data, churned customers account for nearly one-third of the total customer base (1,376 out of 4,335). Their behavior suggests a pattern of one-time purchases, as indicated by significantly higher recency and very low frequency and monetary values. Active users, on the other hand, outperformed churn customers in engagement time, number of purchases, and the revenue they brought in.

- Avg_Recency: ~16 days (vs. ~197 days for churned)

- Avg_Frequency: ~390 purchases (vs. ~2.3)

- Avg_Monetary: ~$422,701 (vs. ~$963)

These metrics showed that active customers spend approximately 43,800% more on average than churned ones — highlighting the urgent need of rententention strategies.

### Are there certain products/StockCodes more common in orders with high Recency (long since last purchase)?

In [None]:
from pyspark.sql.functions import col, percentile_approx, first
recency_threshold = final_df.select(
    percentile_approx("Recency", 0.75).alias("recency_75th")
).collect()[0]["recency_75th"]
high_recency_customers = final_df.filter(col("Recency") >= recency_threshold) \
    .select(['CustomerID'])
high_recency_txns = high_recency_customers.join(final_df, on="CustomerID", how="inner")
high_recency_products = high_recency_txns.groupBy("StockCode") \
    .count() \
    .orderBy(col("count").desc())
product_desc = final_df.select("StockCode", "Description") \
    .dropna() \
    .dropDuplicates(["StockCode"])

high_recency_products_with_desc = high_recency_products.join(product_desc, on="StockCode", how="left")
high_recency_products_with_desc.select("StockCode", "Description", "count") \
    .show(20, truncate=False)


### Summary Of Insights Of Long-Time Customers:
- Old-time buyers are showing strong interest in aesthetically appealing,  and seasonal home decor and gift items with a vintage vibe.
- Products like ALARM CLOCK BAKELIKE PINK, RED RETROSPOT SMALL MILK JUG, and BOX OF 6 MINI VINTAGE CRACKERS suggest a consistent preference for retro-style or vintage-themed items.
- WOODLAND HEIGHT CHART STICKERS, CHRISTMAS STAR WISH LIST CHALKBOARD, and FELT TOADSTOOL LARGE indicate that buyers are actively purchasing home decor, likely for seasonal holiday such as Christmas.
- TOADSTOOL MONEY BOX, OFFICE MUG WARMER CHOC+BLUE, and DOOR HANGER MUM + DADS ROOM suggests a high interest from office workers for a personalized, me-style everyday items such as mug



### Are there any popular products in customers with low recency (new customers):

In [None]:
from pyspark.sql.functions import col, percentile_approx, row_number
from pyspark.sql.window import Window

recency_threshold = final_df.select(
    percentile_approx("Recency", 0.25).alias("recency_25th")
).collect()[0]["recency_25th"]

low_recency_customers = final_df.filter(col("Recency") <= recency_threshold) \
    .select(['CustomerID'])

low_recency_txns = low_recency_customers.join(final_df, on="CustomerID", how="inner")

low_recency_products = low_recency_txns.groupBy("StockCode") \
    .count() \
    .orderBy(col("count").desc())

windowSpec = Window.partitionBy("StockCode").orderBy("Description")

product_desc = final_df.select("StockCode", "Description") \
    .dropna() \
    .withColumn("row_num", row_number().over(windowSpec)) \
    .filter(col("row_num") == 1) \
    .drop("row_num")

low_recency_products_with_desc = low_recency_products.join(product_desc, on="StockCode", how="left")

low_recency_products_with_desc.select("StockCode", "Description", "count") \
    .show(20, truncate=False)


### Are churn rates higher in certain countries?

In [None]:
from pyspark.sql.functions import col, countDistinct, when
churn_by_country = final_df.groupBy("Country").agg(
    countDistinct(when(col("Churn") == 1, col("CustomerID"))).alias("Churned_Customers"),
    countDistinct(when(col("Churn") == 0, col("CustomerID"))).alias("Active_Customers")
)

churn_by_country = churn_by_country.withColumn(
    "Churn_Rate",
    col("Churned_Customers") / (col("Churned_Customers") + col("Active_Customers"))
)


In [None]:
churn_pd = churn_by_country.orderBy(col("Churn_Rate").desc()).toPandas()

In [None]:
print(churn_pd)

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

plt.figure(figsize=(12,6))
sns.barplot(data=churn_pd, x="Churn_Rate", y="Country", palette="Reds_r")
plt.title("Churn Rate by Country")
plt.xlabel("Churn Rate")
plt.ylabel("Country")
plt.grid(True, axis='x', linestyle='--', alpha=0.6)
plt.tight_layout()
plt.show()


### Churn Rate Breakdown By Countries
- United Kingdom — Largest Market: Customers: ~3,900 total (2706 active + 1211 churned) -> Main revenue driver, but churn rate is nearly 1/3 of total customers
- Countries like Singapore, Saudi Arabia, Lithuania have NaN churn rates due to missing active or churned customers
- Greece, Canada, Bahrain: Very high churn, but small customer base (1–3 active customers) — is this a outlier or data entry problem?
- Unspecified-> subscious entry-> maybe a data entry issue
- Austria: mid-size market, but notable churn rate
- Channel Islands: small size market, but notably high churn rate
- Finland, Norway, Germany, Spain: low churn rate, and mid-size market

### 3. People

### Which churned customers had high past value? Can they be targeted for reactivation?

In [None]:
quantiles = final_df.approxQuantile("Monetary", [0.75], 0.01)
high_value_threshold = quantiles[0]
high_value_churned = final_df.filter(
    (col("Churn") == 1) & (col("Monetary") >= high_value_threshold)
)
high_value_churned.select("CustomerID", "Recency", "Frequency", "Monetary").show(10)


 - No churned customers have past high values -> 1. they almost leave after 1 and second purchase maybe because of customer service or product-related problems.


### Which countries contribute the most revenue?

In [None]:
from pyspark.sql.functions import col, sum as _sum

# Compute revenue per country
revenue_by_country_df = final_df.withColumn("Revenue", col("Quantity") * col("UnitPrice")) \
    .groupBy("Country") \
    .agg(_sum("Revenue").alias("Total_Revenue")) \
    .orderBy(col("Total_Revenue").desc())
revenue_pd = revenue_by_country_df.toPandas()


In [None]:
import matplotlib.pyplot as plt

# Plot
plt.figure(figsize=(14, 7))
plt.bar(revenue_pd["Country"], revenue_pd["Total_Revenue"], color='skyblue')
plt.xticks(rotation=90)
plt.xlabel("Country")
plt.ylabel("Total Revenue")
plt.title("Revenue per Country")
plt.tight_layout()
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()

- The United Kingdom outperforming other countries in revenue-> suggesting a strong customer base in this country
- The revenue from the Netherlands, EIRE (Ireland), Germany, and France—while next in line—is negligible in comparison to the UK.
- Most top-performing countries are concentrated in the Europe

### 2. The YOY growth in revenue of each countries

In [None]:
from pyspark.sql.functions import year, lag, round
from pyspark.sql.window import Window

df_with_year = final_df.withColumn("Year", year("InvoiceDate"))
# revenue per year
revenue_per_year = df_with_year.groupBy("Country", "Year") \
    .agg(Fsum("Monetary").alias("YearlyRevenue"))

# yoy growth
window_spec = Window.partitionBy("Country").orderBy("Year")

revenue_growth = revenue_per_year.withColumn(
    "PreviousRevenue", lag("YearlyRevenue").over(window_spec)
).withColumn(
    "YoY_Growth", round(((col("YearlyRevenue") - col("PreviousRevenue")) / col("PreviousRevenue")) * 100, 2)
).orderBy("Country", "Year")

revenue_growth.show()


### YoY (Year Over Year Growth In Revenue) Walk Through:
- Channel Islands: +72,425% YoY (leading growing market)-> target them with new customer campaign to turn them into loyal customers
- Australia: +60,724% YoY (148K -> 90M) -> massive growing market
- Bahrain: -99.58% YoY -> almost frozen market
- Countries like Brazil, Canada, Czech Republic, European Community only start buying from us in 2011
- Austria: +2,403% YoY, Denmark: +1,406% YoY and Ireland (EIRE): +1,723% YoY. Grow slightly better than next year-> customer feedback analysis needed to discover how we can target these markets better

### Sales Trend Of Top 3 Countries In Revenue


In [None]:
from pyspark.sql.functions import col, sum as _sum

# Compute total revenue per country
top_countries_df = final_df.withColumn("Revenue", col("Quantity") * col("UnitPrice")) \
    .groupBy("Country") \
    .agg(_sum("Revenue").alias("Total_Revenue")) \
    .orderBy(col("Total_Revenue").desc()) \
    .limit(3)

top_countries = [row["Country"] for row in top_countries_df.collect()]
print("Top 3 Countries by Revenue:", top_countries)


In [None]:
from pyspark.sql.functions import month, year

# Filter only top 3 countries
filtered_df = final_df.filter(col("Country").isin(top_countries))

# Compute monthly revenue
monthly_revenue_trend = filtered_df.withColumn("Revenue", col("Quantity") * col("UnitPrice")) \
    .withColumn("Year", year("InvoiceDate")) \
    .withColumn("Month", month("InvoiceDate")) \
    .groupBy("Year", "Month", "Country") \
    .agg(_sum("Revenue").alias("Total_Revenue")) \
    .orderBy("Year", "Month", "Country")


In [None]:
# Convert to Pandas
monthly_pd = monthly_revenue_trend.toPandas()
monthly_pd["YearMonth"] = monthly_pd["Year"].astype(str) + "-" + monthly_pd["Month"].astype(str).str.zfill(2)

# Sort by time
monthly_pd = monthly_pd.sort_values(["Year", "Month"])


In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(14, 6))
for country in top_countries:
    country_data = monthly_pd[monthly_pd["Country"] == country]
    plt.plot(country_data["YearMonth"], country_data["Total_Revenue"], label=country, marker='o')

plt.xticks(rotation=45)
plt.title("Monthly Revenue Trend of Top 3 Countries")
plt.xlabel("Year-Month")
plt.ylabel("Total Revenue")
plt.legend()
plt.tight_layout()
plt.grid(True)
plt.show()


- The UK is the top 1 performer in revenue, and revenue of this country suddenly spike in 11/2011 and significantly decrease in the next month-> a season purchasing behavior, strongly linked to
- Other countries in top 3 revenue often have a plateu-like purchasing pattern, with a slightly increase in mid-year period (7-8/2011)

### What are the top 10 best-selling products by quantity?

In [None]:
from pyspark.sql.functions import sum as _sum
top_products = final_df.groupBy("Description") \
    .agg(_sum("Quantity").alias("TotalQuantity")) \
    .orderBy("TotalQuantity", ascending=False)
top_products.show(10, truncate=False)


- The top-selling product is "PAPER CRAFT , LITTLE BIRDIE" with over 80,000 units sold, indicating a strong customer interest in affordable, decorative, and possibly DIY-themed items
- Other best selling rpoducts such as ceramic jar, or asstd designs, or other decorations-> customers have preference for unique, niche decorative products.

### Which products churned customers mostly buy and when they buy them?

In [None]:
from pyspark.sql.functions import col, sum as _sum, desc, month, year, to_date
churned_df = final_df.filter(col("Churn") == 1)

top_products = churned_df.groupBy("StockCode", "Description") \
    .agg(_sum("Quantity").alias("TotalQuantity")) \
    .orderBy(desc("TotalQuantity")) \
    .limit(3)
top_products.show(truncate=False)

- Worth-noticing is that, Stock Code: 23166 appears both in best-selling lists, and top purchasing for churn customers
- Other products are also fragile, and decorative items-> investigate the quality of products through customer feedbacks

### 5. Growth

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

product_df = final_df.withColumn("Year", F.year("InvoiceDate"))

# Calculate total revenue per product per year
product_year_revenue = product_df.groupBy("StockCode", "Description", "Year") \
    .agg(F.sum(F.col("Quantity") * F.col("UnitPrice")).alias("YearlyRevenue"))
window_spec = Window.partitionBy("StockCode").orderBy("Year")

# Get previous year's revenue to calculate YoY growth
product_year_revenue = product_year_revenue.withColumn(
    "PreviousYearRevenue", F.lag("YearlyRevenue").over(window_spec))

# Calculate YoY growth %
product_year_revenue = product_year_revenue.withColumn(
    "YoY_Growth",
    (F.col("YearlyRevenue") - F.col("PreviousYearRevenue")) / F.col("PreviousYearRevenue") * 100)
product_year_revenue_filtered = product_year_revenue.filter(F.col("PreviousYearRevenue").isNotNull())
top_growth_products = product_year_revenue_filtered.orderBy(F.desc("YoY_Growth"))
top_growth_products.show(20, False)


- It is noticeble that specialize products for holidy events such as Christmas, are growing very fast. Our customers are also shifting their interest towards souvernirs, gifts, or special occassion items such as wedding day card.