In [3]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-4.0.0.tar.gz (434.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m434.1/434.1 MB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m00:01[0m00:04[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.9 (from pyspark)
  Downloading py4j-0.10.9.9-py2.py3-none-any.whl.metadata (1.3 kB)
Downloading py4j-0.10.9.9-py2.py3-none-any.whl (203 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-4.0.0-py2.py3-none-any.whl size=434741239 sha256=754f5908cb161df7900b66812ac1d7610b2a065c0ea64e910f6ffd14237752c4
  Stored in directory: /Users/madhav/Library/Caches/pip/wheels/2d/77/9b/12660be70f7f447940a0caede37ae208b2e0d1c8487dce52a6
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.9 pyspark-4.0.0
Note: you may need to restart the kernel to use updated packages.

In [4]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import LogisticRegression, GBTClassifier
from pyspark.ml import Pipeline

In [5]:
# Start (or reuse, via getOrCreate) the Spark session used by every later cell.
spark = (
    SparkSession.builder
    .appName("ChurnPrediction")
    .getOrCreate()
)

# Read the raw customer CSV: first row is the header, column types are inferred by Spark.
df = spark.read.options(header=True, inferSchema=True).csv("customer_data.csv")
df.show(5)  # quick preview of the first five rows

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/03 09:24:33 WARN Utils: Your hostname, Madhavs-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.29.26 instead (on interface en0)
25/09/03 09:24:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/03 09:24:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+-----------+------+------------------+----------------------+--------------+------------------+-----+
|customer_id|tenure|   monthly_charges|customer_service_calls| contract_type|       total_spend|churn|
+-----------+------+------------------+----------------------+--------------+------------------+-----+
|      49360|    28| 83.10578497022318|                     5|      Two year|2421.3282252871136|    0|
|     579739|     4|115.51495708804909|                     2|Month-to-month| 544.0921915409324|    1|
|     176055|    28| 82.07744951126622|                     4|Month-to-month| 2564.718560335923|    1|
|     644304|    71|138.45030504249746|                     6|      Two year| 9198.776074232932|    1|
|     382184|    95| 80.25744812322104|                     8|      Two year| 8078.927108991743|    0|
+-----------+------+------------------+----------------------+--------------+------------------+-----+
only showing top 5 rows


In [6]:
# Prepare the data for modeling
# 1. Handle categorical variables: map each string column to a numeric index, then one-hot encode it.
categorical_cols = ['contract_type']

# handleInvalid="keep": with the default ("error"), a null or previously unseen category
# (e.g. one present only in the held-out split or in future scoring data) makes the fitted
# pipeline raise at transform time. "keep" sends such values to an extra index instead;
# the encoder is told to accept that extra index as well rather than reject it.
indexers = [
    StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_indexed", handleInvalid="keep")
    for cat_col in categorical_cols
]
encoders = [
    OneHotEncoder(inputCol=f"{cat_col}_indexed", outputCol=f"{cat_col}_vec", handleInvalid="keep")
    for cat_col in categorical_cols
]

In [7]:
# 2. Assemble the numeric columns plus each one-hot encoded categorical into a single "features" vector.
feature_cols = ['tenure', 'monthly_charges', 'customer_service_calls', 'total_spend']
encoded_cols = [f"{name}_vec" for name in categorical_cols]
feature_cols_final = [*feature_cols, *encoded_cols]
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols_final)

In [8]:
# 3. Define the two candidate classifiers for the A/B comparison.
#    Both read the assembled "features" vector and predict the "churn" label column.
shared_cols = dict(labelCol="churn", featuresCol="features")
lr = LogisticRegression(**shared_cols)           # Model A (Control)
gbt = GBTClassifier(maxIter=10, **shared_cols)   # Model B (Challenger)

In [9]:
# 4. Wrap each model in its own pipeline; both reuse the same preprocessing stages.
shared_stages = [*indexers, *encoders, assembler]
pipeline_lr = Pipeline(stages=shared_stages + [lr])
pipeline_gbt = Pipeline(stages=shared_stages + [gbt])

# Hold out 20% of rows for evaluation; the fixed seed keeps the split repeatable for the same input data.
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

In [11]:
# Fit both pipelines on the same training split (LR first, then GBT) so A and B see identical data.
model_A, model_B = (candidate.fit(train_df) for candidate in (pipeline_lr, pipeline_gbt))

In [12]:
# Persist each fitted PipelineModel to its own directory, replacing any copy from an earlier run.
for save_path, fitted in (("model_A", model_A), ("model_B", model_B)):
    fitted.write().overwrite().save(save_path)

In [13]:
# Persist the held-out split as Parquet (overwriting any previous copy) so the models can be evaluated later.
test_df.write.mode("overwrite").parquet("test_data.parquet")

print("Models trained and saved.")

Models trained and saved.
