<a href="https://colab.research.google.com/github/Danh1905/Scalable-Project/blob/main/scalableProj.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

To upload a file from your local system, you can use `google.colab.files.upload()`:

In [6]:
from google.colab import files
uploaded = files.upload()

for fn in uploaded.keys():
  print(f'User uploaded file "{fn}" with length {len(uploaded[fn])} bytes')

Saving Customer-Churn-Prediction.csv to Customer-Churn-Prediction.csv
User uploaded file "Customer-Churn-Prediction.csv" with length 977501 bytes


Install pyspark


In [7]:
!pip install pyspark py4j



In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Read CSV").getOrCreate()
churn_rate = spark.read.csv("/content/Customer-Churn-Prediction.csv", header=True, inferSchema=True)
churn_rate.printSchema()


root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



**Preprocessing**

In [9]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, sum, when, trim, lit

#Drop the useless ID
churn_rate = churn_rate.drop("customerID")

# Fix TotalCharges (Convert String to Number)
# Replace empty strings with nulls first, then cast to Double, and fill actual nulls with 0
churn_rate = churn_rate.withColumn("TotalCharges",
                                   when(trim(col("TotalCharges")) == "", lit(None)).otherwise(col("TotalCharges")))
churn_rate = churn_rate.withColumn("TotalCharges", col("TotalCharges").cast(DoubleType()))
churn_rate = churn_rate.fillna(0, subset=["TotalCharges"])

# Check null values
def count_nulls(churn_rate):
    null_counts = churn_rate.select([
        sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
        for c in churn_rate.columns
    ])
    return null_counts

null_churn_rate = count_nulls(churn_rate)
null_churn_rate.show()

+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|     0|            0|      0|         0|     0|           0|            0|              0|             0|           0|               0|          0|          0|              0|       0|               0|            0|             0|           0|   

In [10]:
# ENCODE ALL TEXT COLUMNS

# List of columns that are text (categorical) and need to be numbers
# Note: I excluded 'Churn' because we handle it separately as the label
categorical_cols = [
    "gender", "Partner", "Dependents", "PhoneService", "MultipleLines",
    "InternetService", "OnlineSecurity", "OnlineBackup", "DeviceProtection",
    "TechSupport", "StreamingTV", "StreamingMovies", "Contract",
    "PaperlessBilling", "PaymentMethod"
]

# This loop converts each text column into a number column
# Example: "gender" becomes "gender_index" (0 or 1)
indexers = []
for column in categorical_cols:
    indexer = StringIndexer(inputCol=column, outputCol=column + "_index")
    churn_rate = indexer.fit(churn_rate).transform(churn_rate)

# Don't forget to encode the Target variable (Churn) too!
label_indexer = StringIndexer(inputCol="Churn", outputCol="label")
churn_rate = label_indexer.fit(churn_rate).transform(churn_rate)



In [11]:
# ASSEMBLE

# Gather all the NEW numbered columns + the original numerical columns
input_features = [c + "_index" for c in categorical_cols] + \
                 ["SeniorCitizen", "tenure", "MonthlyCharges", "TotalCharges"]

# Squash them into one vector
assembler = VectorAssembler(inputCols=input_features, outputCol="features")
final_data = assembler.transform(churn_rate)

# Show the result
final_data.select("features", "label").show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(19,[0,1,3,4,5,7,...|  0.0|
|(19,[5,6,8,12,13,...|  0.0|
|(19,[5,6,7,14,16,...|  1.0|
|[0.0,0.0,0.0,1.0,...|  0.0|
|(19,[0,16,17,18],...|  1.0|
+--------------------+-----+
only showing top 5 rows


In [12]:
from pyspark.ml.feature import StandardScaler

# SCALE THE FEATURES

scaler = StandardScaler(inputCol="features", outputCol="scaled_features",
                        withStd=True, withMean=False)

# Compute the scaling statistics
scalerModel = scaler.fit(final_data)

# Apply the scaling
data_ready = scalerModel.transform(final_data)

# Show the difference
data_ready.select("features", "scaled_features").show(5, truncate=False)

+-----------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                           |scaled_features                                                                                                                                                                                                                                                |
+-----------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Split data

In [14]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [15]:


# STEP 5: SPLIT DATA

#80% (train & 20% (test)
train_data, test_data = data_ready.randomSplit([0.8, 0.2], seed=42)

print(f"Train: {train_data.count()}")
print(f"Test: {test_data.count()}")

Train: 5698
Test: 1345


In [19]:
lr = LogisticRegression(featuresCol="scaled_features", labelCol="label")

In [20]:
lrm =lr.fit(train_data)

In [22]:
trainingSummary = lrm.summary

In [30]:
predictions = lrm.transform(test_data)

print("Result of prediction :")
predictions.select("prediction", "label", "probability").show(5)

evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print(f"Accuracy of the Model : {accuracy:.2%}")

Result of prediction :
+----------+-----+--------------------+
|prediction|label|         probability|
+----------+-----+--------------------+
|       1.0|  0.0|[0.43998226671737...|
|       1.0|  1.0|[0.38879493641823...|
|       0.0|  0.0|[0.64019648497660...|
|       0.0|  0.0|[0.58344828683453...|
|       0.0|  1.0|[0.57250268816098...|
+----------+-----+--------------------+
only showing top 5 rows
Accuracy of the Model : 80.74%


In [33]:
# Accuracy in train dataset
print(f"Accuracy: {trainingSummary.accuracy:.2%}")

# Area Under ROC (càng gần 1 càng tốt)
print(f"Area Under ROC: {trainingSummary.areaUnderROC:.4f}")

# FPR: False Positive Rate, TPR: True Positive Rate
print("ROC:")
trainingSummary.roc.show(5)

# 4.Objective History
print("Objective History:")
for i, loss in enumerate(trainingSummary.objectiveHistory):
    print(f"Iteration {i}: {loss}")

Accuracy: 80.84%
Area Under ROC: 0.8430
ROC:
+--------------------+--------------------+
|                 FPR|                 TPR|
+--------------------+--------------------+
|                 0.0|                 0.0|
|                 0.0| 0.00333555703802535|
|                 0.0|  0.0066711140760507|
|2.381519409383186...|0.009339559706470981|
|2.381519409383186...|0.012675116744496331|
+--------------------+--------------------+
only showing top 5 rows
Objective History:
Iteration 0: 0.5762485159619298
Iteration 1: 0.5327361899350359
Iteration 2: 0.45399520952730466
Iteration 3: 0.4378860236276876
Iteration 4: 0.4324420932275601
Iteration 5: 0.4236087304550611
Iteration 6: 0.42196237200370273
Iteration 7: 0.4215173468459716
Iteration 8: 0.42115565106767644
Iteration 9: 0.420752534740688
Iteration 10: 0.4203338204634572
Iteration 11: 0.4194805358269721
Iteration 12: 0.41867967494528674
Iteration 13: 0.4178052349780378
Iteration 14: 0.41758867515712506
Iteration 15: 0.41757474381

In [36]:
#Confusion Matrix
print("Confusion Matrix:")
predictions.groupby('label', 'prediction').count().show()

Confusion Matrix:
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|  197|
|  0.0|       1.0|   86|
|  1.0|       0.0|  173|
|  0.0|       0.0|  889|
+-----+----------+-----+

