
# CW1 - E-commerce Customer Churn Analysis with PySpark

In this assignment you will explore a dataset of anonymized behavioral, demographic, and transactional data for 50,000 customers of a global e-commerce subscription platform. Complete the exercises in the Google Colab notebook below. This assignment will be graded using CodeGrade.

In [None]:
# CodeGrade Tag Init1

from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# CodeGrade Tag Init2

# Apache Spark uses Java, so first we must install that
!sudo apt-get update -qq
!sudo apt-get install -y openjdk-8-jdk-headless -qq

# Unpack Spark from google drive
!tar xzf /content/drive/MyDrive/spark-3.3.0-bin-hadoop3.tgz

# Set up environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.3.0-bin-hadoop3"

# Install findspark, which helps Python locate the PySpark module files
!pip install -q findspark
import findspark
findspark.init()

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [None]:
# Finally, we initialise a "SparkSession", which handles the computations
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

# pyspark.sql.functions contains the column functions you will need
# (aggregations such as count/mean/sum, and conditionals such as when/otherwise)

from pyspark.sql import functions as F
from pyspark.sql.functions import col

In [None]:
# Load the ecommerce_customer_churn_dataset.csv file into your notebook as a PySpark DataFrame

CsvPath = '/content/drive/MyDrive/ecommerce_customer_churn_dataset.csv'

# Load the .csv with a header row, ',' separators and an inferred schema
churnDF = spark.read\
                     .option('header', 'True')\
                     .option('sep', ',')\
                     .option('inferSchema', 'True')\
                     .csv(CsvPath)

In [None]:
# We drop all rows containing null values

churnDF = churnDF.na.drop()

In [None]:
# CodeGrade Tag Init3

churnDF.printSchema()
churnDF.show()

root
 |-- Age: double (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Membership_Years: double (nullable = true)
 |-- Login_Frequency: double (nullable = true)
 |-- Session_Duration_Avg: double (nullable = true)
 |-- Pages_Per_Session: double (nullable = true)
 |-- Cart_Abandonment_Rate: double (nullable = true)
 |-- Wishlist_Items: double (nullable = true)
 |-- Total_Purchases: double (nullable = true)
 |-- Average_Order_Value: double (nullable = true)
 |-- Days_Since_Last_Purchase: double (nullable = true)
 |-- Discount_Usage_Rate: double (nullable = true)
 |-- Returns_Rate: double (nullable = true)
 |-- Email_Open_Rate: double (nullable = true)
 |-- Customer_Service_Calls: double (nullable = true)
 |-- Product_Reviews_Written: double (nullable = true)
 |-- Social_Media_Engagement_Score: double (nullable = true)
 |-- Mobile_App_Usage: double (nullable = true)
 |-- Payment_Method_Diversity: double 

# Exercise 1

In this exercise, we will compute a basic Key Performance Indicator (KPI) table split by ``Churned``.

For each churn class, you must show the following columns:

* number of customers
* mean LTV (where LTV = ``Lifetime_Value``)
* mean recency (where we take "recency" to mean ``Days_Since_Last_Purchase``)
* mean number of purchases (use ``Total_Purchases``)
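To make the aggregation concrete, the sketch below mirrors the `groupBy("Churned").agg(...)` step in plain Python on a handful of hypothetical rows. The values are made up and do not come from the dataset, and the helper name `kpi_by_churn` is illustrative only.

```python
from collections import defaultdict

# Hypothetical toy rows: (Churned, Lifetime_Value, Days_Since_Last_Purchase, Total_Purchases)
rows = [
    (1, 1200.0, 40.0, 10.0),
    (1, 1800.0, 34.0, 14.0),
    (0, 1500.0, 25.0, 15.0),
    (0, 1400.0, 29.0, 13.0),
    (0, 1600.0, 27.0, 14.0),
]

def kpi_by_churn(rows):
    """Return {churn_class: (n_customers, mean_LTV, mean_recency, mean_purchases)}."""
    # Group rows by churn class, like groupBy("Churned")
    groups = defaultdict(list)
    for churned, ltv, recency, purchases in rows:
        groups[churned].append((ltv, recency, purchases))

    table = {}
    for churned, members in groups.items():
        n = len(members)  # like F.count("*")
        # Column-wise means within the group, like F.mean(...)
        mean_ltv = sum(m[0] for m in members) / n
        mean_recency = sum(m[1] for m in members) / n
        mean_purchases = sum(m[2] for m in members) / n
        table[churned] = (n, mean_ltv, mean_recency, mean_purchases)
    return table

print(kpi_by_churn(rows))
```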

In [None]:
# CodeGrade Tag Ex1


exercise1_df = (
    churnDF.groupBy("Churned")
           .agg(
               F.count("*").alias("n_customers"),
               F.mean("Lifetime_Value").alias("mean_LTV"),
               F.mean("Days_Since_Last_Purchase").alias("mean_recency"),
               F.mean("Total_Purchases").alias("mean_purchases")
           )
)

exercise1_df.show()

+-------+-----------+------------------+------------------+------------------+
|Churned|n_customers|          mean_LTV|      mean_recency|    mean_purchases|
+-------+-----------+------------------+------------------+------------------+
|      1|       4932|1523.1739902676402| 37.09712084347121|12.052493917274928|
|      0|      13051|1485.4851329400074|26.994406558884375|14.225224120757025|
+-------+-----------+------------------+------------------+------------------+



# Exercise 2

In this exercise we will identify the countries with the highest churn.

Create a dataframe grouped by ``Country`` whose columns show the number of customers, churn rate, and average LTV. Filter out small groups (fewer than 200 customers) and sort by churn rate in descending order.
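Because ``Churned`` is a 0/1 flag, its mean within a group is exactly the share of churned customers. That is why `F.mean("Churned")` can serve as the churn rate. The plain-Python sketch below uses hypothetical countries and counts and a hypothetical helper `rank_by_churn`. It applies the same minimum-size filter and descending sort, and shows how a tiny group with an extreme rate is excluded.

```python
# Hypothetical per-country churn flags (1 = churned, 0 = retained); not real data
flags_by_country = {
    "A": [1, 0, 0, 1] * 60,  # 240 customers, half churned
    "B": [1, 0, 0, 0] * 70,  # 280 customers, a quarter churned
    "C": [1, 1, 1],          # only 3 customers, all churned
}

def rank_by_churn(flags_by_country, min_customers=200):
    """Return [(country, n_customers, churn_rate)] for groups with at least
    min_customers rows, sorted by churn_rate descending."""
    table = []
    for country, flags in flags_by_country.items():
        n = len(flags)
        churn_rate = sum(flags) / n  # mean of a 0/1 flag = share churned
        if n >= min_customers:       # drop small, noisy groups
            table.append((country, n, churn_rate))
    return sorted(table, key=lambda r: r[2], reverse=True)

print(rank_by_churn(flags_by_country))
```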

In [None]:
# CodeGrade Tag Ex2


exercise2_df = (
    churnDF.groupBy("Country")
           .agg(
               F.count("*").alias("n_customers"),
               F.mean("Churned").alias("churn_rate"),
               F.mean("Lifetime_Value").alias("avg_LTV")
           )
           .filter(col("n_customers") >= 200)
           .orderBy(col("churn_rate").desc())
)

exercise2_df.show()


+---------+-----------+-------------------+------------------+
|  Country|n_customers|         churn_rate|           avg_LTV|
+---------+-----------+-------------------+------------------+
|Australia|       1477| 0.2870683818551117|1516.8457210561955|
|  Germany|       1739| 0.2823461759631972|1499.7421564117324|
|    India|       1254| 0.2783094098883573|  1539.74418660287|
|   Canada|       2143| 0.2753149790013999|1481.6044983667773|
|      USA|       6207| 0.2751731915579185|1490.7388706299319|
|       UK|       2679| 0.2732362821948488| 1495.419059350505|
|    Japan|        981| 0.2640163098878695|1497.4046687054024|
|   France|       1503|0.25216234198270127|1474.9250765136408|
+---------+-----------+-------------------+------------------+



# Exercise 3

In this exercise we will compare churn across ``Signup_Quarter`` and tenure buckets derived from ``Membership_Years``.

### a) Feature engineering

Create a new column called ``tenure_bin`` with the following bins:

* "<1y", if ``Membership_Years`` < 1
* "1–3y", if 1 <= ``Membership_Years`` < 3
* "3–5y", if 3 <= ``Membership_Years`` < 5
* "5y+", if ``Membership_Years`` >= 5
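The bins are half-open intervals, so each boundary value belongs to the higher bin. For example, exactly 3.0 years falls in "3–5y". The plain-Python version of the rule below, with a hypothetical function name `tenure_bin`, makes the edge cases easy to check. In the Spark `when`/`otherwise` chain in the solution cell, a null ``Membership_Years`` would fall through to `otherwise("5y+")`. That cannot happen here only because null rows were dropped earlier.

```python
def tenure_bin(membership_years: float) -> str:
    """Map Membership_Years to the tenure bins defined above.

    Bins are half-open: [0, 1), [1, 3), [3, 5), [5, inf).
    Labels use the same en dash as the Spark code ("1–3y", "3–5y").
    """
    if membership_years < 1:
        return "<1y"
    if membership_years < 3:
        return "1–3y"
    if membership_years < 5:
        return "3–5y"
    return "5y+"

# Boundary values land in the higher bin
for years in [0.5, 1.0, 2.9, 3.0, 5.0, 12.0]:
    print(years, tenure_bin(years))
```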

### b) Cohort table

For each pair (``Signup_Quarter``, ``tenure_bin``), compute:

* number of customers
* average churn rate
* mean LTV

Finally, sort by ``Signup_Quarter`` then ``tenure_bin``.
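Sorting by ``tenure_bin`` as a plain string compares characters, not tenure. Because `"<"` (U+003C) comes after the digits `1`, `3` and `5`, "<1y" sorts after "5y+". This matches the row order in the output of the solution cell. If a tenure-ordered table is preferred, one option is to sort on an explicit rank. The plain-Python sketch below shows both orderings. The `TENURE_ORDER` mapping is a hypothetical helper; in Spark the same idea could be expressed with an `F.when(...)` rank column.

```python
# Tenure bin labels, listed in natural tenure order
labels = ["<1y", "1–3y", "3–5y", "5y+"]

# Lexicographic order: '<' sorts after the digits '1', '3', '5'
print(sorted(labels))

# Hypothetical explicit rank giving the natural tenure order
TENURE_ORDER = {"<1y": 0, "1–3y": 1, "3–5y": 2, "5y+": 3}

# Sort (Signup_Quarter, tenure_bin) pairs by quarter, then by tenure rank
pairs = [("Q2", "5y+"), ("Q1", "<1y"), ("Q1", "3–5y"), ("Q2", "<1y"), ("Q1", "1–3y")]
print(sorted(pairs, key=lambda p: (p[0], TENURE_ORDER[p[1]])))
```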

In [None]:
# CodeGrade Tag Ex3 a


churnDF = churnDF.withColumn(
    "tenure_bin",
    F.when(col("Membership_Years") < 1, "<1y")
     .when((col("Membership_Years") >= 1) & (col("Membership_Years") < 3), "1–3y")
     .when((col("Membership_Years") >= 3) & (col("Membership_Years") < 5), "3–5y")
     .otherwise("5y+")
)
# CodeGrade Tag Ex3 b

exercise3_df = (
    churnDF.groupBy("Signup_Quarter", "tenure_bin")
           .agg(
               F.count("*").alias("n_customers"),
               F.mean("Churned").alias("churn_rate"),
               F.mean("Lifetime_Value").alias("mean_LTV")
           )
           .orderBy("Signup_Quarter", "tenure_bin")
)

exercise3_df.show()


+--------------+----------+-----------+-------------------+------------------+
|Signup_Quarter|tenure_bin|n_customers|         churn_rate|          mean_LTV|
+--------------+----------+-----------+-------------------+------------------+
|            Q1|      1–3y|       2067| 0.2607643928398645| 1498.885384615386|
|            Q1|      3–5y|       1138|0.28910369068541303|1525.3039630931455|
|            Q1|       5y+|        761| 0.2706964520367937|1513.6032325886983|
|            Q1|       <1y|        610| 0.2901639344262295|1473.1064098360657|
|            Q2|      1–3y|       2036|0.29764243614931235|1477.2672642436169|
|            Q2|      3–5y|       1109|0.28043282236248873|1476.5880613165011|
|            Q2|       5y+|        713| 0.2805049088359046|1509.7086535764363|
|            Q2|       <1y|        625|             0.3056|1516.3123839999994|
|            Q3|      1–3y|       2050| 0.2629268292682927|1491.3493951219518|
|            Q3|      3–5y|       1168| 0.2688356164

# Exercise 4

For each pair (``Country``, ``City``), compute churn rate and compare it to the country’s overall churn rate.
Return only cities with enough volume and unusually high churn relative to their country.

Output columns must be: ``Country``, ``City``, ``n_customers``, ``city_churn``, ``country_churn``, ``churn_delta``
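The comparison amounts to attaching each city's churn rate to its country's baseline and subtracting. The plain-Python sketch below uses hypothetical rows and a hypothetical function name `city_vs_country`. It assumes the same thresholds as the solution cell that follows: at least 200 customers and a positive `churn_delta`. The country baseline is computed over all of the country's customers, including cities that are later filtered out for low volume.

```python
from collections import defaultdict

def city_vs_country(rows, min_customers=200):
    """rows: iterable of (country, city, churned) with churned in {0, 1}.

    Returns [(country, city, n_customers, city_churn, country_churn, churn_delta)]
    for cities with at least min_customers rows and churn above their country's,
    sorted by churn_delta descending.
    """
    country_flags = defaultdict(list)
    city_flags = defaultdict(list)
    for country, city, churned in rows:
        country_flags[country].append(churned)
        city_flags[(country, city)].append(churned)

    # Country baseline over all of the country's customers
    country_churn = {c: sum(f) / len(f) for c, f in country_flags.items()}

    out = []
    for (country, city), flags in city_flags.items():
        n = len(flags)
        city_churn = sum(flags) / n
        base = country_churn[country]  # plays the role of the join on Country
        delta = city_churn - base
        if n >= min_customers and delta > 0:
            out.append((country, city, n, city_churn, base, delta))
    return sorted(out, key=lambda r: r[5], reverse=True)

# Hypothetical country "X": city P churns more than average, Q less, R is too small
rows = (
    [("X", "P", 1)] * 100 + [("X", "P", 0)] * 100
    + [("X", "Q", 1)] * 50 + [("X", "Q", 0)] * 150
    + [("X", "R", 1)] * 10
)
print(city_vs_country(rows))
```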


In [None]:
# CodeGrade Tag Ex4


# Country churn baseline
country_df = (
    churnDF.groupBy("Country")
           .agg(
               F.mean("Churned").alias("country_churn")
           )
)

# City churn rates
city_df = (
    churnDF.groupBy("Country", "City")
           .agg(
               F.count("*").alias("n_customers"),
               F.mean("Churned").alias("city_churn")
           )
)

# Compare city churn vs country churn
exercise4_df = (
    city_df.join(country_df, on="Country")
           .withColumn("churn_delta", col("city_churn") - col("country_churn"))
           .filter(col("n_customers") >= 200)
           .filter(col("churn_delta") > 0)
           .select(
               "Country", "City", "n_customers",
               "city_churn", "country_churn", "churn_delta"
           )
           .orderBy(col("churn_delta").desc())
)

exercise4_df.show()


+---------+---------+-----------+-------------------+-------------------+--------------------+
|  Country|     City|n_customers|         city_churn|      country_churn|         churn_delta|
+---------+---------+-----------+-------------------+-------------------+--------------------+
|    India|    Delhi|        248| 0.3225806451612903| 0.2783094098883573| 0.04427123527293303|
|   Canada|  Toronto|        449|0.31403118040089084| 0.2753149790013999|0.038716201399490946|
|   France|    Paris|        287| 0.2857142857142857|0.25216234198270127| 0.03355194373158443|
|Australia|   Sydney|        310| 0.3193548387096774| 0.2870683818551117|0.032286456854565715|
|Australia|Melbourne|        271|0.31365313653136534| 0.2870683818551117|0.026584754676253652|
|  Germany|   Berlin|        343|0.30612244897959184| 0.2823461759631972|0.023776273016394622|
|  Germany|  Cologne|        371| 0.3045822102425876| 0.2823461759631972| 0.02223603427939036|
|   Canada| Montreal|        451|0.297117516629711

# Exercise 5

Reading churn rate alone can mislead, because it counts every customer equally regardless of their value. In this exercise, you must build a table, grouped by signup quarter, that measures the value lost to churn.

First, group by ``Signup_Quarter``, then compute the following aggregates: number of customers, churn rate (i.e. average churn), total LTV, churned LTV (i.e. sum of LTV among churned customers), and the fraction churned (i.e. churned LTV divided by total LTV).
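The made-up cohort below, with a hypothetical helper `value_lost`, shows how far churn rate and fraction churned can diverge. Only 1 customer in 4 churns, yet that customer holds 70% of the cohort's LTV. The `churned_ltv` line follows the same idea as the `F.sum(F.when(...).otherwise(0))` expression in the solution cell.

```python
def value_lost(rows):
    """rows: list of (churned, lifetime_value). Returns the Exercise 5 metrics
    as (n_customers, churn_rate, total_LTV, churned_LTV, fraction_churned)."""
    n = len(rows)
    churn_rate = sum(ch for ch, _ in rows) / n
    total_ltv = sum(ltv for _, ltv in rows)
    # Sum LTV only over churned customers, counting 0 for everyone else
    churned_ltv = sum(ltv if ch == 1 else 0.0 for ch, ltv in rows)
    return n, churn_rate, total_ltv, churned_ltv, churned_ltv / total_ltv

# Hypothetical cohort: one high-value churner, three low-value stayers
rows = [(1, 7000.0), (0, 1000.0), (0, 1000.0), (0, 1000.0)]
print(value_lost(rows))
```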



In [None]:
# CodeGrade Tag Ex5


exercise5_df = (
    churnDF.groupBy("Signup_Quarter")
           .agg(
               F.count("*").alias("n_customers"),
               F.mean("Churned").alias("churn_rate"),
               F.sum("Lifetime_Value").alias("total_LTV"),
               F.sum(
                   F.when(col("Churned") == 1, col("Lifetime_Value"))
                    .otherwise(0)
               ).alias("churned_LTV")
           )
           .withColumn(
               "fraction_churned",
               col("churned_LTV") / col("total_LTV")
           )
           .orderBy("Signup_Quarter")
)

exercise5_df.show()


+--------------+-----------+-------------------+-----------------+------------------+-------------------+
|Signup_Quarter|n_customers|         churn_rate|        total_LTV|       churned_LTV|   fraction_churned|
+--------------+-----------+-------------------+-----------------+------------------+-------------------+
|            Q1|       4576|0.27338286713286714|6884438.970000012| 1923971.310000001|   0.27946668107365|
|            Q2|       4483|0.29176890475128264|6669369.820000012|1956843.5799999996|0.29340756815311764|
|            Q3|       4552| 0.2662565905096661|6776289.480000011|1819802.0300000021| 0.2685543519607724|
|            Q4|       4372|0.26555352241537056|6569262.320000003|1811677.2000000004|0.27578091903627916|
+--------------+-----------+-------------------+-----------------+------------------+-------------------+

