In [1]:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import UserDefinedFunction, col, lit
import pyspark.sql.functions as F
from typing import Dict, Any, List
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
spark = SparkSession.builder.appName('telco_data_processing').getOrCreate()

22/04/26 21:31:44 WARN Utils: Your hostname, nbb-173-74a resolves to a loopback address: 127.0.1.1; using 192.168.1.186 instead (on interface wlp0s20f3)
22/04/26 21:31:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/26 21:31:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df = spark.read.csv('../data/raw/dataset.csv', header=True)

In [4]:
df.take(1)

[Row(customerID='7590-VHVEG', gender='Female', SeniorCitizen='0', Partner='Yes', Dependents='No', tenure='1', PhoneService='No', MultipleLines='No phone service', InternetService='DSL', OnlineSecurity='No', OnlineBackup='Yes', DeviceProtection='No', TechSupport='No', StreamingTV='No', StreamingMovies='No', Contract='Month-to-month', PaperlessBilling='Yes', PaymentMethod='Electronic check', MonthlyCharges='29.85', TotalCharges='29.85', Churn='No')]

Unique values for every column

In [5]:
for column in df.columns:
    if column in ['tenure', 'MonthlyCharges', 'TotalCharges', 'customerID']:
        continue
    print(column, ":")
    print(df.select(column).distinct().collect())

gender :
[Row(gender='Female'), Row(gender='Male')]
SeniorCitizen :
[Row(SeniorCitizen='0'), Row(SeniorCitizen='1')]
Partner :
[Row(Partner='No'), Row(Partner='Yes')]
Dependents :
[Row(Dependents='No'), Row(Dependents='Yes')]
PhoneService :
[Row(PhoneService='No'), Row(PhoneService='Yes')]
MultipleLines :
[Row(MultipleLines='No phone service'), Row(MultipleLines='No'), Row(MultipleLines='Yes')]
InternetService :
[Row(InternetService='Fiber optic'), Row(InternetService='No'), Row(InternetService='DSL')]
OnlineSecurity :
[Row(OnlineSecurity='No'), Row(OnlineSecurity='Yes'), Row(OnlineSecurity='No internet service')]
OnlineBackup :
[Row(OnlineBackup='No'), Row(OnlineBackup='Yes'), Row(OnlineBackup='No internet service')]
DeviceProtection :
[Row(DeviceProtection='No'), Row(DeviceProtection='Yes'), Row(DeviceProtection='No internet service')]
TechSupport :
[Row(TechSupport='No'), Row(TechSupport='Yes'), Row(TechSupport='No internet service')]
StreamingTV :
[Row(StreamingTV='No'), Row(Stream

In [6]:
def make_dummy(df: DataFrame, column: str) -> DataFrame:
    data = df
    options = data.select(column).distinct().rdd.flatMap(lambda x: x).collect()
    options_expr = [F.when(F.col(column) == opt, 1).otherwise(0).alias(f"{column}_" + opt) for opt in options]
    data = data.select(*data.columns, *options_expr)
    data = data.drop(column)
    return data

In [7]:
def make_dummies(df: DataFrame, columns: List[str]) -> DataFrame:
    data = df
    for column in columns:
        data = make_dummy(data, column)
    return data

In [9]:
list(set(df.columns)
     - set(['tenure', 'MonthlyCharges', 'TotalCharges',
            'customerID', 'Churn']))

['PhoneService',
 'StreamingTV',
 'gender',
 'MultipleLines',
 'SeniorCitizen',
 'Contract',
 'Partner',
 'DeviceProtection',
 'OnlineSecurity',
 'StreamingMovies',
 'PaperlessBilling',
 'Dependents',
 'PaymentMethod',
 'OnlineBackup',
 'TechSupport',
 'InternetService']

In [8]:
df_dummed = make_dummies(df,list(set(df.columns) - set(['tenure', 'MonthlyCharges', 'TotalCharges', 'customerID', 'Churn']))
)

In [9]:
df_dummed.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- tenure: string (nullable = true)
 |-- MonthlyCharges: string (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)
 |-- SeniorCitizen_0: integer (nullable = false)
 |-- SeniorCitizen_1: integer (nullable = false)
 |-- StreamingTV_No: integer (nullable = false)
 |-- StreamingTV_Yes: integer (nullable = false)
 |-- StreamingTV_No internet service: integer (nullable = false)
 |-- OnlineBackup_No: integer (nullable = false)
 |-- OnlineBackup_Yes: integer (nullable = false)
 |-- OnlineBackup_No internet service: integer (nullable = false)
 |-- PaymentMethod_Credit card (automatic): integer (nullable = false)
 |-- PaymentMethod_Mailed check: integer (nullable = false)
 |-- PaymentMethod_Bank transfer (automatic): integer (nullable = false)
 |-- PaymentMethod_Electronic check: integer (nullable = false)
 |-- Dependents_No: integer (nullable = false)
 |-- Dependents_Yes: integer (nullable = false)


In [10]:
df_dummed.take(1)

22/04/26 14:42:37 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Row(customerID='7590-VHVEG', tenure='1', MonthlyCharges='29.85', TotalCharges='29.85', Churn='No', SeniorCitizen_0=1, SeniorCitizen_1=0, StreamingTV_No=1, StreamingTV_Yes=0, StreamingTV_No internet service=0, OnlineBackup_No=0, OnlineBackup_Yes=1, OnlineBackup_No internet service=0, PaymentMethod_Credit card (automatic)=0, PaymentMethod_Mailed check=0, PaymentMethod_Bank transfer (automatic)=0, PaymentMethod_Electronic check=1, Dependents_No=1, Dependents_Yes=0, Partner_No=0, Partner_Yes=1, InternetService_Fiber optic=0, InternetService_No=0, InternetService_DSL=1, PaperlessBilling_No=0, PaperlessBilling_Yes=1, MultipleLines_No phone service=1, MultipleLines_No=0, MultipleLines_Yes=0, gender_Female=1, gender_Male=0, StreamingMovies_No=1, StreamingMovies_Yes=0, StreamingMovies_No internet service=0, DeviceProtection_No=1, DeviceProtection_Yes=0, DeviceProtection_No internet service=0, TechSupport_No=1, TechSupport_Yes=0, TechSupport_No internet service=0, OnlineSecurity_No=1, OnlineSec

In [11]:
df_dummed = df_dummed.drop('customerID')

In [16]:
df_dummed.printSchema()

root
 |-- tenure: integer (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: double (nullable = true)
 |-- Churn: string (nullable = true)
 |-- SeniorCitizen_0: integer (nullable = false)
 |-- SeniorCitizen_1: integer (nullable = false)
 |-- StreamingTV_No: integer (nullable = false)
 |-- StreamingTV_Yes: integer (nullable = false)
 |-- StreamingTV_No internet service: integer (nullable = false)
 |-- OnlineBackup_No: integer (nullable = false)
 |-- OnlineBackup_Yes: integer (nullable = false)
 |-- OnlineBackup_No internet service: integer (nullable = false)
 |-- PaymentMethod_Credit card (automatic): integer (nullable = false)
 |-- PaymentMethod_Mailed check: integer (nullable = false)
 |-- PaymentMethod_Bank transfer (automatic): integer (nullable = false)
 |-- PaymentMethod_Electronic check: integer (nullable = false)
 |-- Dependents_No: integer (nullable = false)
 |-- Dependents_Yes: integer (nullable = false)
 |-- Partner_No: integer (nullable = fals

In [17]:
df_dummed = df_dummed.withColumn('tenure', df_dummed.tenure.cast('int'))

In [18]:
df_dummed = df_dummed.withColumn('MonthlyCharges', df_dummed.MonthlyCharges.cast('double'))

In [19]:
df_dummed = df_dummed.withColumn('TotalCharges', df_dummed.TotalCharges.cast('double'))

In [20]:
yn_mapping = {'Yes': 1, 'No': 0}
map_fn = UserDefinedFunction(lambda x: yn_mapping[x])
df_dummed = df_dummed.withColumn('Churn', map_fn('Churn'))

In [60]:
def map_column_values(df: DataFrame, column: str, mapping: Dict) -> DataFrame:
    map_fn = UserDefinedFunction(lambda x: mapping[x])
    return df.withColumn(column, map_fn(column))

In [64]:
def change_column_type(df: DataFrame, column: str, new_type: str) -> DataFrame:
    return df.withColumn(column, df[column].cast(new_type))

In [63]:
df_dummed = df_dummed.withColumn('Churn', df_dummed['Churn'].cast('int'))

In [62]:
df_dummed.printSchema()

root
 |-- tenure: integer (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: double (nullable = true)
 |-- Churn: float (nullable = true)
 |-- SeniorCitizen_0: integer (nullable = false)
 |-- SeniorCitizen_1: integer (nullable = false)
 |-- StreamingTV_No: integer (nullable = false)
 |-- StreamingTV_Yes: integer (nullable = false)
 |-- StreamingTV_No internet service: integer (nullable = false)
 |-- OnlineBackup_No: integer (nullable = false)
 |-- OnlineBackup_Yes: integer (nullable = false)
 |-- OnlineBackup_No internet service: integer (nullable = false)
 |-- PaymentMethod_Credit card (automatic): integer (nullable = false)
 |-- PaymentMethod_Mailed check: integer (nullable = false)
 |-- PaymentMethod_Bank transfer (automatic): integer (nullable = false)
 |-- PaymentMethod_Electronic check: integer (nullable = false)
 |-- Dependents_No: integer (nullable = false)
 |-- Dependents_Yes: integer (nullable = false)
 |-- Partner_No: integer (nullable = false

In [27]:
df_dummed = df_dummed.replace('?', None).dropna(how='any')

In [28]:
df_dummed.count()

7032

In [29]:
df_dummed.columns

['tenure',
 'MonthlyCharges',
 'TotalCharges',
 'Churn',
 'SeniorCitizen_0',
 'SeniorCitizen_1',
 'StreamingTV_No',
 'StreamingTV_Yes',
 'StreamingTV_No internet service',
 'OnlineBackup_No',
 'OnlineBackup_Yes',
 'OnlineBackup_No internet service',
 'PaymentMethod_Credit card (automatic)',
 'PaymentMethod_Mailed check',
 'PaymentMethod_Bank transfer (automatic)',
 'PaymentMethod_Electronic check',
 'Dependents_No',
 'Dependents_Yes',
 'Partner_No',
 'Partner_Yes',
 'InternetService_Fiber optic',
 'InternetService_No',
 'InternetService_DSL',
 'PaperlessBilling_No',
 'PaperlessBilling_Yes',
 'MultipleLines_No phone service',
 'MultipleLines_No',
 'MultipleLines_Yes',
 'gender_Female',
 'gender_Male',
 'StreamingMovies_No',
 'StreamingMovies_Yes',
 'StreamingMovies_No internet service',
 'DeviceProtection_No',
 'DeviceProtection_Yes',
 'DeviceProtection_No internet service',
 'TechSupport_No',
 'TechSupport_Yes',
 'TechSupport_No internet service',
 'OnlineSecurity_No',
 'OnlineSecurity

In [30]:
assembler = VectorAssembler(
    inputCols=list(set(df_dummed.columns) - set(['Churn'])),
    outputCol='features',
)

In [31]:
transformed_data = assembler.transform(df_dummed)

In [32]:
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')

In [33]:
transformed_data = scaler.fit(transformed_data).transform(transformed_data)