Customer churn is a critical business metric.
This project aims to:


*   Understand customer behavior through session, transaction, and service data
*   Engineer meaningful churn predictors
*   Build a scalable churn prediction model using Spark ML



In [None]:
# Mount Google Drive at /content/drive so later cells can read and write files there.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
pip install pyspark



In [None]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook (getOrCreate reuses an active one if present).
spark = SparkSession.builder.appName('Session1').getOrCreate()

# Location of the raw dataset on the mounted Google Drive.
drive_csv_path = '/content/drive/MyDrive/Colab Notebooks/Digital_Marketing/data1.csv'

# The file is semicolon-delimited with a header row; column types are inferred.
csv_reader = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("sep", ";")
)
df = csv_reader.csv(drive_csv_path)

# Sanity check: print the inferred schema, then a small sample of rows.
print("Schema of DataFrame read from Drive:")
df.printSchema()
print("First 5 rows of DataFrame read from Drive:")
df.show(5)

Schema of DataFrame read from Drive:
root
 |-- account length: integer (nullable = true)
 |-- location code: integer (nullable = true)
 |-- user id: integer (nullable = true)
 |-- credit card info save: string (nullable = true)
 |-- push status: string (nullable = true)
 |-- add to wishlist: integer (nullable = true)
 |-- desktop sessions: integer (nullable = true)
 |-- app sessions: integer (nullable = true)
 |-- desktop transactions: integer (nullable = true)
 |-- total product detail views: integer (nullable = true)
 |-- session duration: integer (nullable = true)
 |-- promotion clicks: integer (nullable = true)
 |-- avg order value: string (nullable = true)
 |-- sale product views: integer (nullable = true)
 |-- discount rate per visited products: string (nullable = true)
 |-- product detail view per app session: string (nullable = true)
 |-- app transactions: integer (nullable = true)
 |-- add to cart per session: string (nullable = true)
 |-- customer service calls: integer (null

In [None]:
from pyspark.sql.functions import col, count

# Row count per location code, largest group first, to spot skewed keys.
rows_per_location = df.groupBy("location code").agg(count("*").alias("cnt"))
rows_per_location.orderBy(col("cnt").desc()).show()

+-------------+----+
|location code| cnt|
+-------------+----+
|          415|1655|
|          510| 840|
|          408| 838|
+-------------+----+



In [None]:
# Relative-behavior feature: the mean number of app sessions within each location code.
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

# One window partition per location code; every row in a partition receives the same mean.
window_loc = Window.partitionBy("location code")
location_mean_app_sessions = avg("app sessions").over(window_loc)

df = df.withColumn("avg_app_sessions_by_location", location_mean_app_sessions)

In [None]:
# Session efficiency: total transactions divided by total sessions (desktop + app).

from pyspark.sql.functions import col, try_divide

total_transactions = col("desktop transactions") + col("app transactions")
total_sessions = col("desktop sessions") + col("app sessions")

# try_divide yields NULL rather than raising when the session total is zero.
df = df.withColumn(
    "transaction_per_session",
    try_divide(total_transactions, total_sessions),
)

In [None]:
# Derive a coarse churn-risk label from the number of customer service calls.

from pyspark.sql.functions import when
from pyspark.sql.functions import col

service_calls = col("customer service calls")

# Branches are evaluated in order: >= 4 is "High", then >= 2 is "Medium", anything else "Low".
risk_label = (
    when(service_calls >= 4, "High")
    .when(service_calls >= 2, "Medium")
    .otherwise("Low")
)
df = df.withColumn("churn_risk", risk_label)

# Spot-check the label against its source column.
df.select("customer service calls", "churn_risk").show(5)

+----------------------+----------+
|customer service calls|churn_risk|
+----------------------+----------+
|                     1|       Low|
|                     1|       Low|
|                     0|       Low|
|                     2|    Medium|
|                     3|    Medium|
+----------------------+----------+
only showing top 5 rows


In [None]:
# Encode the categorical string columns as numeric indices for modeling.

from pyspark.ml.feature import StringIndexer

categorical_cols = ["credit card info save", "push status"]
indexed_cols = ["cc_saved_idx", "push_status_idx"]

# Fit the label-to-index mapping on df, then apply it to produce df_encoded.
indexer = StringIndexer(inputCols=categorical_cols, outputCols=indexed_cols)
indexer_model = indexer.fit(df)
df_encoded = indexer_model.transform(df)

# Show each source column next to its index to verify the mapping.
df_encoded.select("credit card info save", "cc_saved_idx", "push status", "push_status_idx").show(5)

+---------------------+------------+-----------+---------------+
|credit card info save|cc_saved_idx|push status|push_status_idx|
+---------------------+------------+-----------+---------------+
|                   no|         0.0|        yes|            1.0|
|                   no|         0.0|        yes|            1.0|
|                   no|         0.0|         no|            0.0|
|                  yes|         1.0|         no|            0.0|
|                  yes|         1.0|         no|            0.0|
+---------------------+------------+-----------+---------------+
only showing top 5 rows


In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# This cell reads df_encoded and then reassigns it with two extra columns. On a
# re-run the frame would already contain those columns and StringIndexer rejects
# an output column that already exists. Dropping them first (a no-op when they
# are absent, i.e. on the first run) keeps the cell safely re-runnable.
df_base = df_encoded.drop("churn_risk_indexed", "churn_risk_one_hot")

# Step 1: StringIndexer converts the 'churn_risk' labels into numerical indices.
indexer_churn_risk = StringIndexer(
    inputCol="churn_risk",
    outputCol="churn_risk_indexed"
)

df_indexed = indexer_churn_risk.fit(df_base).transform(df_base)

# Step 2: OneHotEncoder converts the indexed column into a one-hot vector.
encoder = OneHotEncoder(
    inputCol="churn_risk_indexed",
    outputCol="churn_risk_one_hot"
)

df_encoded = encoder.fit(df_indexed).transform(df_indexed)

# Display the original, indexed, and one-hot encoded columns to verify.
df_encoded.select("churn_risk", "churn_risk_indexed", "churn_risk_one_hot").show(5, truncate=False)

+----------+------------------+------------------+
|churn_risk|churn_risk_indexed|churn_risk_one_hot|
+----------+------------------+------------------+
|Low       |0.0               |(2,[0],[1.0])     |
|Low       |0.0               |(2,[0],[1.0])     |
|Low       |0.0               |(2,[0],[1.0])     |
|Medium    |1.0               |(2,[1],[1.0])     |
|Medium    |1.0               |(2,[1],[1.0])     |
+----------+------------------+------------------+
only showing top 5 rows


In [None]:
# Correlate selected numeric features with the 'churn' column.

from pyspark.sql.functions import col, corr, regexp_replace
from pyspark.sql.types import FloatType


def _comma_decimal_to_float(column_name):
    """Return a FloatType Column parsed from a string column that uses ',' as the decimal mark."""
    return regexp_replace(col(column_name), ",", ".").cast(FloatType())


# Add numeric versions of the two string columns that hold comma-decimal values.
df_encoded = (
    df_encoded
    .withColumn(
        "discount_rate_per_visited_products_numeric",
        _comma_decimal_to_float("discount rate per visited products"),
    )
    .withColumn(
        "avg_order_value_numeric",
        _comma_decimal_to_float("avg order value"),
    )
)

# One row holding the correlation of each feature with 'churn'.
churn_correlations = df_encoded.select(
    corr("customer service calls", "churn").alias("cs_calls_corr"),
    corr("discount_rate_per_visited_products_numeric", "churn").alias("discount_corr"),
    corr("avg_order_value_numeric", "churn").alias("aov_corr"),
)
churn_correlations.show()

+------------------+-------------------+--------------------+
|     cs_calls_corr|      discount_corr|            aov_corr|
+------------------+-------------------+--------------------+
|0.2087499987837943|0.03549555698872654|0.035492855996288104|
+------------------+-------------------+--------------------+



In [None]:
# Preview the first five rows of the encoded frame, including the derived columns.
df_encoded.show(5)

+--------------+-------------+-------+---------------------+-----------+---------------+----------------+------------+--------------------+--------------------------+----------------+----------------+---------------+------------------+----------------------------------+-----------------------------------+----------------+-----------------------+----------------------+-----+----------------------------+-----------------------+----------+------------+---------------+------------------+------------------+------------------------------------------+-----------------------+
|account length|location code|user id|credit card info save|push status|add to wishlist|desktop sessions|app sessions|desktop transactions|total product detail views|session duration|promotion clicks|avg order value|sale product views|discount rate per visited products|product detail view per app session|app transactions|add to cart per session|customer service calls|churn|avg_app_sessions_by_location|transaction_per_sess

In [None]:
# Remove the raw categorical columns and the one-hot vector column (noted as
# unsupported for this CSV export) before writing.
columns_to_drop = ["credit card info save", "push status", "churn_risk", "churn_risk_one_hot"]
df_dropped = df_encoded.drop(*columns_to_drop)

# Reduce to at most 10 partitions so the export does not produce many small files,
# then write with a header row, replacing any previous output at this path.
output_path = "/content/drive/MyDrive/Colab Notebooks/Digital_Marketing/optimized_output.csv"
df_dropped.coalesce(10).write.csv(output_path, mode="overwrite", header=True)