# Customer Churn Project — Part 1: Data Preparation and Cleaning

This notebook is the starting point for the Customer Churn Analysis Project. Its primary goals are to ingest, explore, clean, and augment the customer activity logs to build a robust dataset for understanding user behavior and churn.

Specifically, in this notebook, I:

1. Load the raw Customer Interaction Log Data (JSON format) into a Spark DataFrame.
2. Explore and inspect the data schema and content.
3. Engineer a churn label for each user by identifying cancellation events, and add date, period (month and year), and location (state and city) columns.
4. Validate the processed dataset to ensure it is ready for downstream exploratory data analysis (see [02_EDA.ipynb](02_EDA.ipynb)).

The focus here is on data integrity, reproducibility, and preparing a clean analytical foundation for modeling customer retention and churn in later stages of the project.

In [18]:
import json
import pandas as pd
import os
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import from_unixtime, col, month, year, to_date

### I. Data Loading

In [13]:
spark = SparkSession.builder \
    .appName("CustomerChurn") \
    .master("local[*]") \
    .getOrCreate()

data = spark.read.json("../data/customer_churn_data.json")

# check schema and rows
data.printSchema()
data.show(5)
print(data.count(), "rows")

                                                                                

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+--------------------+---------+---------+------+-------------+--------+----------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+-------+
|              artist|     auth|firstName|gender|itemInSession|lastName|    length|



26259199 rows


                                                                                

### II. Creating New Variables

#### 1. Churn Flag

I first collect the unique user IDs of customers who triggered the “Cancellation Confirmation” event in the activity logs into `churn_users`.
This set identifies the churned users: left-joining it onto the main dataset and filling the non-matching rows with 0 adds a binary churn indicator, `churn_flag` (1 = churned, 0 = not churned).

In [14]:
# users who canceled their subscription
churn_users = (
    data.filter(F.col("page") == "Cancellation Confirmation")
      .select("userId")
      .distinct()
)

data = (
    data.join(churn_users.withColumn("churn_flag", F.lit(1)), on="userId", how="left")
      .fillna({"churn_flag": 0})
)

# churn counts per user
data.select("userId", "churn_flag").distinct().groupBy("churn_flag").count().show()


+----------+-----+
|churn_flag|count|
+----------+-----+
|         1| 5003|
|         0|17275|
+----------+-----+



                                                                                

The records below are the “Cancellation Confirmation” events themselves. The users who triggered them are treated as churned: they have canceled their subscriptions and are expected to stop using the application afterward.

In [16]:
data.filter(F.col("page") == "Cancellation Confirmation") \
  .show(10, truncate=False)

                                                                                

+-------+------+---------+---------+------+-------------+---------+------+-----+------------------------------------------+------+-------------------------+-------------+---------+----+------+-------------+--------------------------------------------------------------------------------------------------------------------------+----------+
|userId |artist|auth     |firstName|gender|itemInSession|lastName |length|level|location                                  |method|page                     |registration |sessionId|song|status|ts           |userAgent                                                                                                                 |churn_flag|
+-------+------+---------+---------+------+-------------+---------+------+-----+------------------------------------------+------+-------------------------+-------------+---------+----+------+-------------+----------------------------------------------------------------------------------------------------------------

                                                                                

#### 2. State and City

In [17]:
# Add 'state' and 'city' columns to main dataset 'data'
data = data.withColumn("state", F.split(F.col("location"), ", ").getItem(1))
data = data.withColumn("city", F.split(F.col("location"), ", ").getItem(0))
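
To make the effect of the split concrete, here is a minimal plain-Python sketch of the same logic. The sample location strings are hypothetical and only assume the `City, ST` pattern; the helper name `split_location` is illustrative and not part of the notebook. A metro-area value produces a hyphenated multi-state string, and a value without `", "` produces no state at all, which is worth keeping in mind when grouping by `state` later.

```python
def split_location(location):
    """Mirror F.split(location, ", ") followed by getItem(0) / getItem(1)."""
    if location is None:
        return None, None
    parts = location.split(", ")
    city = parts[0]
    # No second element means there is no state to extract
    state = parts[1] if len(parts) > 1 else None
    return city, state

# Hypothetical sample values, not taken from the dataset
samples = [
    "Bakersfield, CA",
    "New York-Newark-Jersey City, NY-NJ-PA",
    "Unknown",
    None,
]
for loc in samples:
    print(loc, "->", split_location(loc))
```

If multi-state values do occur in the data, one option is to keep only the first state code, but that is a modeling decision rather than something the split itself resolves.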

#### 3. Date, Month and Year

In [19]:
data = data.withColumn("datetime", from_unixtime(col("ts")/1000, "yyyy-MM-dd HH:mm:ss")) \
           .withColumn("date", to_date(from_unixtime(col("ts")/1000))) \
           .withColumn("month", month(from_unixtime(col("ts")/1000))) \
           .withColumn("year", year(from_unixtime(col("ts")/1000)))
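
The division by 1000 is there because `ts` holds epoch time in milliseconds, while `from_unixtime` expects seconds. The plain-Python sketch below shows the same derivation for a single, hypothetical timestamp. It fixes the time zone to UTC as an assumption; `from_unixtime` renders values in the Spark session time zone, so the notebook's output can differ by the local offset.

```python
from datetime import datetime, timezone

def derive_date_parts(ts_ms):
    """Convert an epoch timestamp in milliseconds into the derived columns."""
    dt = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return {
        "datetime": dt.strftime("%Y-%m-%d %H:%M:%S"),
        "date": dt.date().isoformat(),
        "month": dt.month,
        "year": dt.year,
    }

# Hypothetical timestamp, used only for illustration
parts = derive_date_parts(1538352117000)
print(parts)
# {'datetime': '2018-10-01 00:01:57', 'date': '2018-10-01', 'month': 10, 'year': 2018}
```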

In [23]:
data.columns

['userId',
 'artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'churn_flag',
 'state',
 'city',
 'datetime',
 'date',
 'month',
 'year']
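
Before saving, a few sanity checks on the engineered columns can catch join or fill mistakes early. The sketch below is plain Python over hypothetical rows and is not the notebook's actual validation; the function name `validate_rows` and the specific checks are assumptions chosen for illustration. It verifies that `churn_flag` is always 0 or 1, that `userId` is present, and that each user carries a single flag value.

```python
def validate_rows(rows):
    """Return a list of human-readable problems found in the rows."""
    problems = []
    flags_by_user = {}
    for i, row in enumerate(rows):
        if row["churn_flag"] not in (0, 1):
            problems.append(f"row {i}: churn_flag is {row['churn_flag']!r}")
        if not row["userId"]:
            problems.append(f"row {i}: missing userId")
        flags_by_user.setdefault(row["userId"], set()).add(row["churn_flag"])
    # The flag is a per-user label, so every user should have exactly one value
    for user, flags in flags_by_user.items():
        if len(flags) > 1:
            problems.append(f"user {user!r}: inconsistent churn_flag values")
    return problems

# Hypothetical rows for illustration
good_rows = [
    {"userId": "u1", "churn_flag": 1},
    {"userId": "u1", "churn_flag": 1},
    {"userId": "u2", "churn_flag": 0},
]
bad_rows = [
    {"userId": "u1", "churn_flag": 1},
    {"userId": "u1", "churn_flag": 0},
    {"userId": "", "churn_flag": None},
]
print(validate_rows(good_rows))
print(validate_rows(bad_rows))
```

On the full dataset the same checks would be expressed as Spark aggregations rather than a Python loop, since collecting 26 million rows to the driver is not practical.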

### III. Save data

In [24]:
data.write.mode("overwrite").parquet("../data/cleaned_churn_data.parquet")

25/10/13 14:12:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


Checking whether users stopped using the app after cancellation
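
One way to approach this check is to find each churned user's cancellation timestamp and then look for any of their events with a later `ts`. The sketch below does this in plain Python over hypothetical events; the function name and the sample page values other than “Cancellation Confirmation” are assumptions for illustration, not the notebook's actual code.

```python
def users_active_after_cancel(events):
    """Return the set of userIds with any event later than their cancellation."""
    cancel_ts = {}
    for e in events:
        if e["page"] == "Cancellation Confirmation":
            # Keep the earliest cancellation timestamp per user
            prev = cancel_ts.get(e["userId"])
            cancel_ts[e["userId"]] = e["ts"] if prev is None else min(prev, e["ts"])
    return {
        e["userId"]
        for e in events
        if e["userId"] in cancel_ts and e["ts"] > cancel_ts[e["userId"]]
    }

# Hypothetical events for illustration
events = [
    {"userId": "u1", "page": "NextSong", "ts": 100},
    {"userId": "u1", "page": "Cancellation Confirmation", "ts": 200},
    {"userId": "u2", "page": "Cancellation Confirmation", "ts": 150},
    {"userId": "u2", "page": "Home", "ts": 300},
    {"userId": "u3", "page": "NextSong", "ts": 400},
]
print(users_active_after_cancel(events))
```

In Spark, the equivalent would likely be a `groupBy("userId")` with `F.min("ts")` over the cancellation events, joined back to `data` and filtered on `ts` greater than the cancellation timestamp. An empty result would support treating the cancellation event as the end of a user's activity.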