In [0]:

import sklearn as sk
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torch as pt
import missingno as msno
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

from pyspark.sql.functions import when, col, lit
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import LabelEncoder

from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from xgboost import XGBClassifier
from catboost import CatBoostClassifier
from sklearn import metrics
from sklearn.metrics import roc_curve
from sklearn.metrics import recall_score, confusion_matrix, precision_score, f1_score, accuracy_score, classification_report

# Get the active SparkSession, or create one, registered under this app name.
spark = (
    SparkSession.builder
    .appName("OHE Example")
    .getOrCreate()
)


In [0]:
# Location of the Delta data read by this notebook.
DATA_PATH = "/Volumes/workspace/bronze/bronzevolume/data/" # set via env or Path for prod
try:
    df = spark.read.format("delta").load(DATA_PATH)
except Exception as e:
    # Chain explicitly so the underlying Spark error is reported as the direct
    # cause of this one instead of as an incidental "during handling" context.
    raise RuntimeError(f"Failed loading delta at {DATA_PATH}: {e}") from e


# Basic sanity output: row count, schema, and a five-row preview.
print('Rows:', df.count())
df.printSchema()
# quick top rows
display(df.limit(5))

In [0]:
# Categorical columns to one-hot encode, written in lower case; they are
# matched against the actual schema case-insensitively below.
catog = [
    "multiplelines","internetservice","onlinesecurity","onlinebackup",
    "deviceprotection","techsupport","streamingtv","streamingmovies",
    "contract","paymentmethod"
]

# Map lower-cased name -> real column name and keep only columns present in df.
df_cols_lower = {c.lower(): c for c in df.columns}
existing_catog = [df_cols_lower[c] for c in catog if c in df_cols_lower]

# Collect every indicator expression first and add them with ONE select.
# Calling withColumn once per category value stacks a projection per call and
# produces a needlessly deep query plan. Keyed by output name so that, as with
# repeated withColumn calls, a later value that cleans to the same name wins.
dummy_exprs = {}
for cat_col in existing_catog:
    print(f"Processing column: {cat_col}")

    # Distinct non-null values of this column; sorted so the order of the
    # generated columns does not depend on the order distinct() returns rows.
    unique_vals = sorted(
        row[0]
        for row in df.select(cat_col).distinct().collect()
        if row[0] is not None
    )
    print(f"Unique values in {cat_col}: {unique_vals}")

    for val in unique_vals:
        # Make the value safe to embed in a column name.
        clean_val = str(val).replace(" ", "_").replace("-", "_").replace("(", "").replace(")", "")
        dummy_col = f"{cat_col}_{clean_val}"

        # 1 where the row holds this value, 0 otherwise (nulls included).
        dummy_exprs[dummy_col] = (
            when(col(cat_col) == val, 1)
            .otherwise(0)
            .cast(IntegerType())
            .alias(dummy_col)
        )

# Original columns first, then all indicator columns.
df_encoded = df.select("*", *dummy_exprs.values())
     

In [0]:
# Inspect the frame with the one-hot indicator columns added.
display(df_encoded)

In [0]:
# Normalise every column name to lower case; the data itself is untouched.
df_encoded = df_encoded.toDF(*[name.lower() for name in df_encoded.columns])


In [0]:
# Existing column names and the flag-style names that replace them,
# paired by position.
cols = ["gender","seniorcitizen","partner","dependents","phoneservice","paperlessbilling"]

renames = ["is_female","is_senior","has_partner","has_dependent","is_phoneservice","is_paperlessbilling"]

for old_name, new_name in zip(cols, renames):
    df_encoded = df_encoded.withColumnRenamed(old_name, new_name)


In [0]:
# Inspect the frame after the lower-casing and renaming cells.
display(df_encoded)

In [0]:


# Columns that hold "Yes"/"No" strings.
lis = ["is_paperlessbilling","has_partner","has_dependent","is_phoneservice","churn"]

# (column, value written as "1", value written as "0").
# The gender-derived column uses Female/Male instead of Yes/No.
flag_specs = [(name, "Yes", "No") for name in lis]
flag_specs.append(("is_female", "Female", "Male"))

for flag_col, one_value, zero_value in flag_specs:
    # Any value other than the two expected ones becomes null.
    recoded = (
        when(col(flag_col) == one_value, "1")
        .when(col(flag_col) == zero_value, "0")
        .otherwise(None)
    )
    df_encoded = df_encoded.withColumn(flag_col, recoded)


In [0]:
# List the current column names (used to hand-write the selection lists below).
display(df_encoded.columns)

In [0]:
# NOTE(review): identical to the previous cell; looks like a leftover duplicate.
display(df_encoded.columns)

In [0]:
# Flag columns, tenure and the target.
_base_cols = [
    'is_female', 'is_senior', 'has_partner', 'has_dependent', 'tenure',
    'is_phoneservice', 'is_paperlessbilling', 'churn',
]
_multiline_cols = ['multiplelines_No_phone_service', 'multiplelines_Yes', 'multiplelines_No']
_internet_cols = ['internetservice_DSL', 'internetservice_No', 'internetservice_Fiber_optic']
# The six add-on services share the same three levels, in this order.
_addon_cols = [
    f"{service}_{level}"
    for service in (
        'onlinesecurity', 'onlinebackup', 'deviceprotection',
        'techsupport', 'streamingtv', 'streamingmovies',
    )
    for level in ('No_internet_service', 'Yes', 'No')
]
_contract_cols = ['contract_One_year', 'contract_Month_to_month', 'contract_Two_year']
_payment_cols = [
    'paymentmethod_Electronic_check', 'paymentmethod_Mailed_check',
    'paymentmethod_Credit_card_automatic', 'paymentmethod_Bank_transfer_automatic',
]

# subset2: columns cast to int below.
# subset: everything kept for modelling, i.e. subset2 plus the two charge columns.
subset2 = (
    _base_cols + _multiline_cols + _internet_cols
    + _addon_cols + _contract_cols + _payment_cols
)
subset = subset2 + ["monthlycharges", "totalcharges"]

df_encoded = df_encoded.select(subset)
for int_col in subset2:
    df_encoded = df_encoded.withColumn(int_col, col(int_col).cast("int"))

display(df_encoded)
     

In [0]:
# Lower-case the column names again (the selection list above used mixed case).
df_encoded = df_encoded.toDF(*[name.lower() for name in df_encoded.columns])


In [0]:
# Show the rows whose totalcharges is a single blank character.
blank_total_rows = df_encoded.filter(col("totalcharges") == " ")
display(blank_total_rows)

In [0]:
# Charge columns that need converting to double.
lst2 = ["monthlycharges","totalcharges"]


for charge_col in lst2:
    # A single-space value becomes null; everything else is cast to double.
    blank_as_null = when(col(charge_col) == ' ', None).otherwise(col(charge_col))
    df_encoded = df_encoded.withColumn(charge_col, blank_as_null.cast("double"))

In [0]:

from pyspark.sql.functions import when, col

# Exclusive upper edges of the 10-wide bands: 10, 20, ..., 110.
# Bucket k (1-based) holds values below the k-th edge that did not fall into
# an earlier bucket; anything >= 110 goes to bucket 12.
monthly_edges = list(range(10, 111, 10))
monthly = col("monthlycharges")

# A null charge fails every comparison, so it used to fall through to the top
# bucket (12). Nulls are replaced by 0 before modelling (fillna(0) further
# down), so route them to bucket 1, where a charge of 0 belongs.
monthly_bucket = when(monthly.isNull() | (monthly < monthly_edges[0]), 1)
for bucket_no, upper_edge in enumerate(monthly_edges[1:], start=2):
    # when() clauses are evaluated in order, so "< upper_edge" here already
    # implies ">= the previous edge".
    monthly_bucket = monthly_bucket.when(monthly < upper_edge, bucket_no)

df_encoded = df_encoded.withColumn(
    "monthlychargesbucketed",
    monthly_bucket.otherwise(len(monthly_edges) + 1),
)


In [0]:
# Exclusive upper edges of the total-charge bands:
# 100, 300, 500, then every 500 from 1000 up to 8500 (19 edges -> buckets 1..19);
# anything >= 8500 goes to bucket 20.
total_edges = [100, 300, 500, *range(1000, 8501, 500)]
total = col("totalcharges")

# Blank totalcharges were turned into null by the earlier cast cell. A null
# fails every comparison, so those rows used to land in the TOP bucket (20)
# while fillna(0) later sets the raw value to 0. Route nulls to bucket 1 so the
# bucket agrees with the imputed value.
total_bucket = when(total.isNull() | (total < total_edges[0]), 1)
for bucket_no, upper_edge in enumerate(total_edges[1:], start=2):
    # when() clauses are evaluated in order, so "< upper_edge" here already
    # implies ">= the previous edge".
    total_bucket = total_bucket.when(total < upper_edge, bucket_no)

df_encoded = df_encoded.withColumn(
    "totalchargesbucketed",
    total_bucket.otherwise(len(total_edges) + 1),
)

In [0]:
# Columns to remove before modelling.
# NOTE(review): the earlier select(subset) does not include either name, so
# this drop looks like a no-op by this point -- confirm and remove if so.
lst = ["customerid","_rescued_data"]
df_encoded = df_encoded.drop(*lst)

In [0]:
# Inspect the Spark frame with the bucketed charge columns added.
display(df_encoded)

In [0]:
# Replace nulls with 0, then collect the Spark frame to the driver as a
# pandas DataFrame for scikit-learn.
df_encoded = df_encoded.fillna(value=0)
pdf = df_encoded.toPandas()

In [0]:

# Target is the churn column; every other column is a feature.
y = pdf["churn"]
X = pdf.drop(columns=["churn"])

# Hold out 20% of the rows for evaluation; fixed seed for a repeatable split.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

# Logistic regression with a raised iteration cap so the solver can converge.
regr = LogisticRegression(max_iter=2500)
regr.fit(X_train, y_train)

# Class predictions for the held-out rows.
y_pred = regr.predict(X_test)


In [0]:
# Signed logistic-regression coefficients, one per feature column.
feat_importances = pd.Series(regr.coef_[0], index=X.columns)

# Rank by |coefficient|. Taking nlargest on the signed values dropped the most
# negative coefficients, which are among the strongest predictors.
# NOTE(review): the features are not scaled before fitting, so magnitudes are
# only roughly comparable across columns with different units.
top_by_magnitude = feat_importances.abs().nlargest(40).index
top_features = feat_importances.loc[top_by_magnitude]

fig, ax = plt.subplots(figsize=(10, 12))  # (width, height) in inches
# barh draws the first entry at the bottom; reverse so the strongest is on top.
top_features.iloc[::-1].plot(kind='barh', ax=ax)
ax.set_title("Logistic regression coefficients (top 40 by absolute value)")
ax.set_xlabel("Coefficient")
plt.show()

In [0]:
# Pin the label order so the matrix is always 2x2 (rows/cols = [0, 1]) even if
# one class is missing from y_test / y_pred; otherwise ravel() yields fewer
# than four values and the unpacking fails.
tn, fp, fn, tp = confusion_matrix(y_test, y_pred, labels=[0, 1]).ravel()
print(f"TN: {tn}, FP: {fp} \nFN: {fn}, TP: {tp}")

In [0]:
# Headline metrics on the held-out rows.
accuracy = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)

print(f"Accuracy: {accuracy}")
print(f"F1 Score: {f1}")

# Per-class precision / recall / F1 table.
report = classification_report(y_test, y_pred)
print(report)