In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

In [None]:
!pip install pyspark==3.5.0

Collecting pyspark==3.5.0
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425346 sha256=8694390ea85e1d894857a9e54d78537e130bd6afd2bb1c9f20212a37fcad1dcd
  Stored in directory: /root/.cache/pip/wheels/84/40/20/65eefe766118e0a8f8e385cc3ed6e9eb7241c7e51cfc04c51a
Successfully built pyspark
Installing collected packages: pyspark
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.5.1
    Uninstalling pyspark-3.5.1:
      Successfully uninstalled pyspark-3.5.1
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the fo

In [None]:
from pyspark.sql import SparkSession

# SparkSession
spark = SparkSession.builder \
    .appName("TelcoChurn") \
    .master("local[*]") \
    .getOrCreate()

spark


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Drive CSV path
file_path = "/content/drive/MyDrive/datasets/telcoDataset/telcoDataset.csv"

# PySpark DataFrame load
df = spark.read.csv(file_path, header=True, inferSchema=True)

# 5 first rows
df.show(5)

# schema
df.printSchema()


+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|          No|No phone service|            DSL|            No|         Yes|              No|         No|    

In [None]:
from pyspark.sql.functions import col

# Handle empty or non-numeric values
df = df.withColumn("TotalCharges",
                   col("TotalCharges").cast("double"))

# Check for null values in the TotalCharges column
df.filter(col("TotalCharges").isNull()).count()


11

In [None]:
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# types transformation
# TotalCharges: string → double
df = df.withColumn("TotalCharges", col("TotalCharges").cast("double"))

# Check for missing values in TotalCharges
null_count = df.filter(col("TotalCharges").isNull()).count()
print(f"Missing values in TotalCharges: {null_count}")

# if nulls -> fill with median value
if null_count > 0:
    median_total = df.approxQuantile("TotalCharges", [0.5], 0.0)[0]
    df = df.na.fill({"TotalCharges": median_total})

# target transformation
# Churn Yes/No → label 1/0
churn_indexer = StringIndexer(inputCol="Churn", outputCol="label")
df = churn_indexer.fit(df).transform(df)

# delete customer id and churn collumns
df = df.drop("customerID", "Churn")

# Encoding categorical features
categorical_cols = ["gender", "Partner", "Dependents", "PhoneService", "MultipleLines",
                    "InternetService", "OnlineSecurity", "OnlineBackup", "DeviceProtection",
                    "TechSupport", "StreamingTV", "StreamingMovies", "Contract",
                    "PaperlessBilling", "PaymentMethod"]

# StringIndexer for all categorical collumns
indexers = [StringIndexer(inputCol=col_name, outputCol=col_name+"_Index")
            for col_name in categorical_cols]

# OneHotEncoder for all indexed collumns
encoder = OneHotEncoder(inputCols=[idx.getOutputCol() for idx in indexers],
                        outputCols=[col_name+"_OHE" for col_name in categorical_cols])

# indexers train
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=indexers + [encoder])
df = pipeline.fit(df).transform(df)

# show results
df.select(["label", "TotalCharges", "MonthlyCharges"] + [col+"_OHE" for col in categorical_cols]).show(5, truncate=False)


Missing values in TotalCharges: 11
+-----+------------+--------------+-------------+-------------+--------------+----------------+-----------------+-------------------+------------------+----------------+--------------------+---------------+---------------+-------------------+-------------+--------------------+-----------------+
|label|TotalCharges|MonthlyCharges|gender_OHE   |Partner_OHE  |Dependents_OHE|PhoneService_OHE|MultipleLines_OHE|InternetService_OHE|OnlineSecurity_OHE|OnlineBackup_OHE|DeviceProtection_OHE|TechSupport_OHE|StreamingTV_OHE|StreamingMovies_OHE|Contract_OHE |PaperlessBilling_OHE|PaymentMethod_OHE|
+-----+------------+--------------+-------------+-------------+--------------+----------------+-----------------+-------------------+------------------+----------------+--------------------+---------------+---------------+-------------------+-------------+--------------------+-----------------+
|0.0  |29.85       |29.85         |(1,[],[])    |(1,[],[])    |(1,[0],[1.0]) 

In [None]:
from pyspark.ml.feature import VectorAssembler

# Prepare list of feature columns
numeric_cols = ["MonthlyCharges", "TotalCharges"]

# One-hot encoded categorical columns
ohe_cols = [col+"_OHE" for col in ["gender", "Partner", "Dependents", "PhoneService", "MultipleLines",
                                   "InternetService", "OnlineSecurity", "OnlineBackup", "DeviceProtection",
                                   "TechSupport", "StreamingTV", "StreamingMovies", "Contract",
                                   "PaperlessBilling", "PaymentMethod"]]

all_features = numeric_cols + ohe_cols

# VectorAssembler
assembler = VectorAssembler(inputCols=all_features, outputCol="features")
df = assembler.transform(df)

# show results(five first labels after vectorization)
df.select("label", "features").show(5, truncate=False)

# Train/Test Split
train_df, test_df = df.randomSplit([0.7, 0.3], seed=42)
print(f"Train rows: {train_df.count()}, Test rows: {test_df.count()}")


+-----+----------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                    |
+-----+----------------------------------------------------------------------------------------------------------------------------+
|0.0  |(28,[0,1,4,9,10,13,14,16,18,20,22,24,25],[29.85,29.85,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])                         |
|0.0  |(28,[0,1,2,3,4,5,6,9,11,12,15,16,18,20,26],[56.95,1889.5,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])              |
|1.0  |(28,[0,1,2,3,4,5,6,9,11,13,14,16,18,20,22,24,26],[53.85,108.15,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|0.0  |(28,[0,1,2,3,4,9,11,12,15,17,18,20,27],[42.3,1840.75,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])                          |
|1.0  |(28,[0,1,3,4,5,6,8,10,12,14,16,18,20,22,24,25],[70.7,151.65,1.

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
import time

# Random Forest model creation
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=400,
    maxDepth=16,
    seed=42
)

# spark Pipeline
pipeline_rf = Pipeline(stages=[rf])

start_time = time.time()
print("Training Random Forest...")

rf_model = pipeline_rf.fit(train_df)

end_time = time.time()
print(f"Training completed in {end_time - start_time:.2f} seconds.")


print("Random Forest trained with {} trees.".format(rf_model.stages[0].getNumTrees))



Training Random Forest...
Training completed in 272.87 seconds.
Random Forest trained with 400 trees.


In [None]:
# Performance analysis

from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Prediction on test set
predictions = rf_model.transform(test_df)

# Prediction results(five first rows)
predictions.select("label", "prediction", "probability").show(5, truncate=False)

# Accuracy, F1-score, AUC
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator_acc.evaluate(predictions)

evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1 = evaluator_f1.evaluate(predictions)

evaluator_auc = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator_auc.evaluate(predictions)

print(f"Test Accuracy: {accuracy:.4f}")
print(f"Test F1-score: {f1:.4f}")
print(f"Test AUC: {auc:.4f}")

# Confusion Matrix
confusion = predictions.groupBy("label", "prediction").count().orderBy("label", "prediction")
print("\nConfusion Matrix:")
confusion.show()


+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|0.0  |1.0       |[0.19423605648653466,0.8057639435134654]|
|1.0  |0.0       |[0.5828724589587136,0.41712754104128646]|
|0.0  |0.0       |[0.6220733355366334,0.37792666446336654]|
|1.0  |1.0       |[0.4460315522681298,0.5539684477318703] |
|0.0  |0.0       |[0.7248097863292838,0.2751902136707162] |
+-----+----------+----------------------------------------+
only showing top 5 rows

Test Accuracy: 0.7982
Test F1-score: 0.7838
Test AUC: 0.8435

Confusion Matrix:
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       0.0| 1357|
|  0.0|       1.0|  113|
|  1.0|       0.0|  292|
|  1.0|       1.0|  245|
+-----+----------+-----+

