In [1]:
# Installing PySpark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=252e9f4f028246e54dc155afc9d0babd6c6699fb2c5de4539bd9d64b711bd687
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [9]:
# Importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, GBTClassifier, LinearSVC
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

In [3]:
# Creating a SparkSession
spark = SparkSession.builder.appName("TelecomChurnPrediction").getOrCreate()
# Loading  the dataset
dataset = spark.read.csv("/content/telecom_dataset (1).csv", header=True, inferSchema=True)

# Data Preprocessing
dataset = dataset.dropna()  # Drop rows with missing values

In [10]:
# Calculating call duration (assuming call_start_time and call_end_time columns are present)
dataset = dataset.withColumn("call_duration", (col("TotalCharges") - col("MonthlyCharges")) / 60)

# Calculating average monthly spend
dataset = dataset.withColumn("average_monthly_spend", col("MonthlyCharges"))

In [11]:
# Encodinging categorical variables
categorical_cols = ['Contract', 'Churn','Gender']
indexers = [StringIndexer(inputCol=col, outputCol=col+'_index').fit(dataset) for col in categorical_cols]
pipeline = Pipeline(stages=indexers)
dataset = pipeline.fit(dataset).transform(dataset)
# Feature scaling
assembler = VectorAssembler(inputCols=['Age', 'average_monthly_spend', 'call_duration', 'Gender_index'], outputCol='features')
dataset = assembler.transform(dataset)

scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
scaler_model = scaler.fit(dataset)
dataset = scaler_model.transform(dataset)

In [12]:
# Splitting the data into training and testing sets
(train_data, test_data) = dataset.randomSplit([0.8, 0.2], seed=42)
# Model training and evaluation
lr = LogisticRegression(labelCol='Churn_index', featuresCol='scaled_features')

# Model selection and training
classifiers = [
    LogisticRegression(labelCol='Churn_index', featuresCol='scaled_features'),
    RandomForestClassifier(labelCol='Churn_index', featuresCol='scaled_features'),
    GBTClassifier(labelCol='Churn_index', featuresCol='scaled_features'),
    LinearSVC(labelCol='Churn_index', featuresCol='scaled_features')
]

In [13]:
# Defining the parameter grid for each classifier
paramGrids = [
    ParamGridBuilder()
        .addGrid(LogisticRegression.regParam, [0.1, 0.01])
        .addGrid(LogisticRegression.elasticNetParam, [0.0, 0.5, 1.0])
        .build(),
    ParamGridBuilder()
        .addGrid(RandomForestClassifier.numTrees, [10, 20, 30])
        .addGrid(RandomForestClassifier.featureSubsetStrategy, ['auto', 'sqrt'])
        .build(),
    ParamGridBuilder()
        .addGrid(GBTClassifier.maxDepth, [5, 10])
        .addGrid(GBTClassifier.maxIter, [20, 30])
        .build(),
    ParamGridBuilder()
        .addGrid(LinearSVC.maxIter, [10, 20])
        .addGrid(LinearSVC.regParam, [0.1, 0.01])
        .build()
]

In [14]:
evaluator = BinaryClassificationEvaluator(labelCol='Churn_index')

best_model = None
best_accuracy = 0.0

# Iterating over classifiers and parameter grids
for classifier, paramGrid in zip(classifiers, paramGrids):
    pipeline = Pipeline(stages=[classifier])
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=5)
    cv_model = crossval.fit(train_data)

In [20]:
# Model evaluation on test data
predictions = cv_model.transform(test_data)
accuracy = evaluator.evaluate(predictions)

print(f"Accuracy for {classifier.__class__.__name__}: {accuracy}")

if accuracy > best_accuracy:
        best_model = cv_model.bestModel
        best_accuracy = accuracy




Accuracy for LinearSVC: 0.2285714285714286


In [21]:
# Getting the best model and its parameters
print("Best Model:")
print(best_model.stages[0])

# Use the best model for predictions
best_predictions = best_model.transform(test_data)

# Performing evaluation on the best model
best_accuracy = evaluator.evaluate(best_predictions)
print("Best Model in Terms of Accuracy:", best_accuracy)


Best Model:
LinearSVCModel: uid=LinearSVC_bd86d0035d18, numClasses=2, numFeatures=4
Best Model in Terms of Accuracy: 0.2285714285714286


In [18]:
# Creating a MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol='Churn_index', predictionCol='prediction')

# Evaluating the best model on test data
accuracy = evaluator.evaluate(best_predictions, {evaluator.metricName: 'accuracy'})
precision = evaluator.evaluate(best_predictions, {evaluator.metricName: 'weightedPrecision'})
recall = evaluator.evaluate(best_predictions, {evaluator.metricName: 'weightedRecall'})
f1_score = evaluator.evaluate(best_predictions, {evaluator.metricName: 'f1'})

In [19]:
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1-Score: {f1_score}")

Accuracy: 0.4
Precision: 0.16
Recall: 0.4
F1-Score: 0.2285714285714286
