In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession

# Create the Spark session used by the cells below, or reuse one that is
# already running in this process.
session_builder = SparkSession.builder.appName("TelcoChurnWithPySpark")
spark = session_builder.getOrCreate()

In [None]:
# Read the churn CSV: the first line supplies the column names and Spark
# infers each column's type from the data.
df = spark.read.csv(
    "WA_Fn-UseC_-Telco-Customer-Churn.csv",
    header=True,
    inferSchema=True,
)

# Preview the first five rows.
df.show(5)

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|          No|No phone service|            DSL|            No|         Yes|              No|         No|    

In [None]:
# Dataset overview: size, inferred schema, and how the Churn values are distributed.
n_rows, n_cols = df.count(), len(df.columns)
print("Rows:", n_rows, "Columns:", n_cols)

df.printSchema()

churn_counts = df.groupBy("Churn").count()
churn_counts.show()

Rows: 7043 Columns: 21
root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)

+-----+-----+
|Churn|count|
+-----+-----+
|   No| 5174|
| 

In [None]:
from pyspark.sql.functions import col

# TotalCharges is inferred as a string column, so it has to be cast to a
# numeric type. Values that cannot be parsed as a number become null in the
# cast, therefore the cast must happen BEFORE dropping nulls; otherwise those
# rows survive the cleaning step with a null TotalCharges.
# "double" matches MonthlyCharges and avoids float32 rounding artifacts
# (e.g. 29.85 showing up as 29.850000381469727 in the feature vector).
df = df.withColumn("TotalCharges", col("TotalCharges").cast("double"))

# Remove every row that still contains a null in any column.
df = df.na.drop()

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# String-typed columns are treated as categorical features, except for
# customerID and the Churn target, which are left out of the feature set.
excluded = ("customerID", "Churn")
categoricalCols = [
    name for name, dtype in df.dtypes
    if dtype == "string" and name not in excluded
]

# Per categorical column: index the string values ("<col>Idx"), then one-hot
# encode that index ("<col>Vec").
stages = []
for col_name in categoricalCols:
    indexed_name = col_name + "Idx"
    idx = StringIndexer(inputCol=col_name, outputCol=indexed_name)
    enc = OneHotEncoder(inputCols=[indexed_name], outputCols=[col_name + "Vec"])
    stages.extend([idx, enc])

# Index the target column into a numeric "label".
label_indexer = StringIndexer(inputCol="Churn", outputCol="label")
stages.append(label_indexer)

# Combine the one-hot vectors with every numeric column into one "features" vector.
numericCols = [name for name, dtype in df.dtypes if dtype in ("double", "int", "float")]
assemblerInputs = [name + "Vec" for name in categoricalCols] + numericCols
assembler = VectorAssembler(
    inputCols=assemblerInputs,
    outputCol="features",
    # "keep" lets rows with invalid (null/NaN) inputs through instead of
    # raising; such entries end up as NaN in the assembled vector.
    handleInvalid="keep",
)
stages.append(assembler)


In [None]:
from pyspark.ml import Pipeline

# Chain all preprocessing stages, fit them on the DataFrame, and apply the
# fitted pipeline to that same DataFrame.
pipeline = Pipeline(stages=stages)
model = pipeline.fit(df)
df_transformed = model.transform(df)

# Inspect the assembled feature vectors next to the indexed label.
preview = df_transformed.select("features", "label")
preview.show(5, truncate=False)


+-----------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                                                       |label|
+-----------------------------------------------------------------------------------------------------------------------------------------------+-----+
|(30,[2,7,8,11,12,14,16,18,20,22,23,27,28,29],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,29.85,29.850000381469727])                       |0.0  |
|(30,[0,1,2,3,4,7,9,10,13,14,16,18,24,27,28,29],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,34.0,56.95,1889.5])                        |0.0  |
|(30,[0,1,2,3,4,7,9,11,12,14,16,18,20,22,24,27,28,29],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,53.85,108.1500015258789])|1.0  |
|(30,[0,1,2,7,9,10,13,15,16,18,25,27,28,29],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0

In [None]:
# Random 70/30 split of the transformed data; the fixed seed keeps it repeatable.
split_weights = [0.7, 0.3]
train, test = df_transformed.randomSplit(split_weights, seed=42)

train_rows, test_rows = train.count(), test.count()
print("Train Rows:", train_rows, "Test Rows:", test_rows)


Train Rows: 5036 Test Rows: 2007


In [None]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator


In [None]:
from pyspark.ml.feature import StringIndexer

# `df_transformed` normally already carries a numeric "label" column, produced
# by the label indexer that is part of the preprocessing pipeline. Indexing
# Churn a second time and renaming the result to "label" would leave the
# DataFrame with TWO columns named "label", which makes every later reference
# to "label" (e.g. select("features", "label")) ambiguous.
# Reuse the existing column and only index Churn when no label exists yet.
if "label" in df_transformed.columns:
    df_indexed = df_transformed
else:
    indexer = StringIndexer(inputCol="Churn", outputCol="label")
    df_indexed = indexer.fit(df_transformed).transform(df_transformed)


In [None]:
# Random 70/30 split of the indexed data; the fixed seed keeps it repeatable.
split_weights = [0.7, 0.3]
train, test = df_indexed.randomSplit(split_weights, seed=42)

train_rows, test_rows = train.count(), test.count()
print("Train Rows:", train_rows, "Test Rows:", test_rows)


Train Rows: 5036 Test Rows: 2007


In [None]:
from pyspark.ml.feature import StringIndexer

# Convert Churn into numeric label (0/1), but only if that has not happened yet.
# StringIndexer refuses to write to an output column that already exists, so
# running it on a DataFrame that already has a "label" column raises an error.
# The guard makes this cell safe to run (and re-run) in either situation.
if "label" not in df_indexed.columns:
    indexer = StringIndexer(inputCol="Churn", outputCol="label")
    df_indexed = indexer.fit(df_indexed).transform(df_indexed)


In [None]:
# Keep only the assembled feature vector and the numeric label.
model_columns = ("features", "label")
df_final = df_indexed.select(*model_columns)


In [None]:
# Random 70/30 split of the final (features, label) data with a fixed seed.
split_weights = [0.7, 0.3]
train, test = df_final.randomSplit(split_weights, seed=42)


In [None]:
from pyspark.sql.functions import col

# Cast TotalCharges to double. With Spark's default cast semantics, values
# that cannot be parsed as a number become null in the process.
total_charges = col("TotalCharges").cast("double")

# Apply the cast, then remove every row that contains a null in any column.
df = df.withColumn("TotalCharges", total_charges).na.drop()


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# End-to-end training cell: load the CSV, clean it, index the categorical
# columns, assemble the feature vector, train a logistic regression model,
# report test accuracy, and save the fitted model.

# Start Spark session (reuses the running session if there is one)
spark = SparkSession.builder.appName("ChurnPrediction").getOrCreate()

# --- Load dataset ---
df = spark.read.csv("WA_Fn-UseC_-Telco-Customer-Churn.csv", header=True, inferSchema=True)

# --- Data Cleaning ---
# Cast TotalCharges to double first (unparseable values become null), then
# drop every row containing a null.
df = df.withColumn("TotalCharges", col("TotalCharges").cast("double"))
df = df.na.drop()

# --- Index categorical columns ---
# customerID is excluded together with the target: indexing an identifier
# column would feed an arbitrary per-row number into the model as a feature.
# The loop variable is named `name` so it does not shadow the imported `col`.
non_feature_cols = {"customerID", "Churn"}
categorical_cols = [
    name for name, dtype in df.dtypes
    if dtype == "string" and name not in non_feature_cols
]

indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_index").fit(df)
    for name in categorical_cols
]

for indexer in indexers:
    df = indexer.transform(df)

# --- Index target column (Churn) ---
label_indexer = StringIndexer(inputCol="Churn", outputCol="label").fit(df)
df = label_indexer.transform(df)

# --- Assemble features ---
# All numeric columns are used, including the integer SeniorCitizen flag,
# which was previously left out of the feature vector.
numeric_cols = ["SeniorCitizen", "tenure", "MonthlyCharges", "TotalCharges"]
feature_cols = [c + "_index" for c in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

# --- Train-test split ---
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

print(f"Train Rows: {train_data.count()} Test Rows: {test_data.count()}")

# --- Train Logistic Regression Model ---
lr = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_data)

# --- Predictions ---
predictions = lr_model.transform(test_data)

# --- Evaluate Model ---
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy:.4f}")

# --- Save Model ---
# A plain save() raises if the target path already exists, which breaks every
# re-run of this cell; overwrite() replaces the previously saved model instead.
model_path = "/content/model/logistic_regression_churn"
lr_model.write().overwrite().save(model_path)
print(f"✅ Model saved at {model_path}")


Train Rows: 5028 Test Rows: 2004
Test Accuracy = 0.8129
✅ Model saved at /content/model/logistic_regression_churn


In [None]:
from pyspark.sql import SparkSession, Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegressionModel
import random

# Smoke test for the saved model: load it, score a handful of synthetic rows,
# print the predictions, and shut the Spark session down.

# -------------------------
# Step 0: Start Spark session
# -------------------------
spark = SparkSession.builder.appName("LogisticRegressionTest").getOrCreate()

# -------------------------
# Step 1: Load pre-trained Logistic Regression model
# -------------------------
model_path = "/content/model/logistic_regression_churn"  # Update path if needed
model = LogisticRegressionModel.load(model_path)

# Get expected number of features
num_features = model.numFeatures
print(f"Model expects {num_features} features per sample.")

# -------------------------
# Step 2: Create synthetic test data
# -------------------------
# A dedicated, seeded generator makes the synthetic rows (and therefore the
# printed predictions) identical on every run of this cell.
rng = random.Random(42)

# Create 10 samples with correct feature size
data = []
for _ in range(10):
    features = [rng.uniform(0, 3) for _ in range(num_features)]
    label = float(rng.randint(0, 1))  # Ensure label is float
    data.append(Row(features=Vectors.dense(features), label=label))

test_df = spark.createDataFrame(data)

# -------------------------
# Step 3: Run predictions
# -------------------------
predictions = model.transform(test_df)

# -------------------------
# Step 4: Show results
# -------------------------
predictions.select("features", "label", "probability", "prediction").show(truncate=False)

# -------------------------
# Step 5: Stop Spark session
# -------------------------
# Note: after this call, `spark` can no longer be used by other cells.
spark.stop()


Model expects 19 features per sample.
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+-----------------------------------------+----------+
|features                                                                                                                                                                                                                                                                                                                                                                |label|probability                              |prediction|
+---------------------------------------------------------------------------------------------------------------------