In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import LocalOutlierFactor
from sklearn.model_selection import train_test_split, GridSearchCV, cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import  classification_report, accuracy_score, confusion_matrix, roc_curve, auc, precision_recall_curve, average_precision_score, fbeta_score, roc_auc_score
from xgboost import XGBClassifier
from imblearn.over_sampling import SMOTE
import joblib

In [None]:
# Load the raw Telco churn CSV and build the working frame `df`, which is the
# loaded data without the customerID column.
try:
    data = pd.read_csv('../data/raw/WA_Fn-UseC_-Telco-Customer-Churn.csv')
except FileNotFoundError:
    # Every later cell needs `df`; re-raise so the notebook stops here instead
    # of failing further down with an unrelated NameError.
    print("No data set found")
    raise

churn_data = pd.DataFrame(data)
# drop() returns a new frame, so `churn_data` still has the customerID column.
df = churn_data.drop(columns="customerID")
print("Data set loaded.")

In [None]:
# Preview the first five rows of the working frame.
df.head(n=5)

In [None]:
# Keep the column index of the working frame in `columns_name` and print it.
columns_name = df.columns
print(columns_name)

In [None]:
# Display summary statistics of the working frame (DataFrame.describe()).
df.describe()

In [None]:
# Print the frame overview produced by DataFrame.info().
df.info()

In [None]:
# Convert TotalCharges to a numeric column; errors='coerce' turns entries that
# cannot be parsed as numbers into NaN instead of raising.
total_charges_numeric = pd.to_numeric(df['TotalCharges'], errors='coerce')
df['TotalCharges'] = total_charges_numeric

In [None]:
# Count missing values per column.
df.isna().sum()

In [None]:
# Show the rows whose TotalCharges is NaN.
df.loc[df['TotalCharges'].isna()]

In [None]:
# Recode SeniorCitizen from 0/1 to "No"/"Yes".
# The dtype guard makes the cell safe to re-run: mapping the already recoded
# strings through the 0/1 dictionary would turn every value into NaN.
if pd.api.types.is_numeric_dtype(df["SeniorCitizen"]):
    df["SeniorCitizen"] = df["SeniorCitizen"].map({0: "No", 1: "Yes"})

In [None]:
# Bar chart of how many customers fall into each Churn value
# ("Yes" marks the customers who cancelled their subscription).
fig, ax = plt.subplots(figsize=(6, 4))
sns.countplot(data=df, x="Churn", ax=ax)
ax.set_title("Churn Class Distribution")
fig.savefig('../results/plots/Churn_class_distribution.png')
plt.show()

In [None]:
target = "Churn"

# Split the feature columns (everything except the target) by dtype.
# is_numeric_dtype is used instead of comparing against "object" so that text
# columns are still treated as categorical when pandas stores them with a
# dedicated string dtype rather than object.
categorical_cols, numerical_cols = [], []
for col in df.columns:
    if col == target:
        continue
    if pd.api.types.is_numeric_dtype(df[col]):
        numerical_cols.append(col)
    else:
        categorical_cols.append(col)

print("Categorical Columns:", categorical_cols)
print("Numerical Columns:", numerical_cols)


def _plot_filename(title):
    """Return a file-name stem for a plot title.

    The title is lower-cased, spaces become underscores, and the characters
    '-', '/', '(' and ')' are removed.
    """
    return title.lower().replace(" ", "_").translate(str.maketrans("", "", "-/()"))


# One count plot per categorical column, split by the target, saved as a PNG.
for col in categorical_cols:
    fig = plt.figure(figsize=(6, 3))
    sns.countplot(x=col, hue=target, data=df)

    title = f"Distribution of {col} by Churn"
    plt.title(title)
    safe_title = _plot_filename(title)

    plt.xticks(rotation=45)
    plt.tight_layout()  # Prevent label cutoff

    plt.savefig(f'../results/plots/{safe_title}.png',
                dpi=300,
                bbox_inches='tight')

    plt.show()
    # Release the figure explicitly so figures do not pile up across
    # iterations on backends where show() does not close them.
    plt.close(fig)

In [None]:
# For every categorical column, print the normalised Churn value counts
# within each of its levels.
for col in categorical_cols:
    churn_by_level = df.groupby(col)['Churn']
    col_churn = churn_by_level.value_counts(normalize=True)
    print(col_churn)

In [None]:
# Scatter of tenure against MonthlyCharges, coloured by the target column.
fig, ax = plt.subplots(figsize=(8, 4))
sns.scatterplot(x="tenure", y="MonthlyCharges", hue=target, alpha=0.6, data=df, ax=ax)
ax.set_title("Tenure and MonthlyCharges with Churn")
fig.tight_layout()
fig.savefig('../results/plots/Tenure_and_monthlyCharges_scatterplot.png', bbox_inches='tight')
plt.show()

In [None]:
# Bucket MonthlyCharges into 5 bins; the helper column is stored on df.
df['MonthlyChargesGroup'] = pd.cut(df['MonthlyCharges'], bins=5)

# Both pivot tables share the same layout: PaymentMethod rows by
# MonthlyChargesGroup columns, aggregating the Churn column.
pivot_layout = dict(
    values='Churn',
    index='PaymentMethod',
    columns='MonthlyChargesGroup',
    observed=False,
)

# Churn rate pivot table: share of 'Yes' in each cell.
churn_rate = df.pivot_table(aggfunc=lambda x: (x == 'Yes').mean(), **pivot_layout)

# Customer number pivot table: row count in each cell.
customer_count = df.pivot_table(aggfunc='count', **pivot_layout)

# Annotation text per cell: rounded rate on the first line, count in parentheses below.
churn_rate_rounded = churn_rate.round(2)
combined_data = churn_rate_rounded.astype(str) + "\n(" + customer_count.astype(int).astype(str) + ")"

# Heatmap coloured by churn rate and annotated with the combined text.
fig, ax = plt.subplots(figsize=(12, 6))
sns.heatmap(
    churn_rate,
    annot=combined_data,
    fmt="",
    cmap='coolwarm',
    cbar_kws={'label': 'Churn Rate'},
    ax=ax,
)
ax.set_title('Average Churn Rate and Number of Customers with MonthlyCharges and PaymentMethod')
ax.set_xlabel('MonthlyCharges Group')
ax.set_ylabel('PaymentMethod')
fig.savefig('../results/plots/Churn_rate_heatmap.png', bbox_inches='tight')
plt.show()

In [None]:
# Remove the temporary binning column again. errors='ignore' keeps the cell
# re-runnable: without it a second run raises KeyError because the column is
# already gone.
df = df.drop(columns=['MonthlyChargesGroup'], errors='ignore')

In [None]:
# Characters removed from a title when it is turned into a file name.
strip_chars = str.maketrans("", "", "-/()")

# One box plot per numerical column, grouped by the target, saved as a PNG.
for col in numerical_cols:
    fig, ax = plt.subplots(figsize=(8, 4))
    sns.boxplot(data=df, x=target, y=col, ax=ax)

    title = f"Distribution of {col} by Churn"
    ax.set_title(title)

    # File name: lower-case title, spaces -> underscores, '-', '/', '(', ')' removed.
    safe_title = title.lower().replace(" ", "_").translate(strip_chars)

    plt.xticks(rotation=45)
    plt.tight_layout()  # Prevent label cutoff

    plt.savefig(f'../results/plots/{safe_title}.png', dpi=300, bbox_inches='tight')
    plt.show()

In [None]:
# Histogram (30 bins, with KDE overlay) and skewness for each numerical column.
for col in numerical_cols:
    fig, ax = plt.subplots(figsize=(8, 4))
    sns.histplot(df[col], kde=True, bins=30, ax=ax)
    ax.set_title(col)
    plt.show()

    skewness = df[col].skew()
    print(f"(Skewness): {skewness}")

In [None]:
# Encode the target as 1 ('Yes') / 0 ('No').
# The dtype guard makes the cell safe to re-run: mapping the already encoded
# 0/1 values through the 'Yes'/'No' dictionary would turn the column into NaN.
if not pd.api.types.is_numeric_dtype(df['Churn']):
    df['Churn'] = df['Churn'].map({'Yes': 1, 'No': 0})

In [None]:
# Preview the first five rows after encoding the target.
df.head(n=5)

In [None]:
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from imblearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE
from xgboost import XGBClassifier

# Relies on df, numerical_cols and categorical_cols from the earlier cells.

# Target vector and feature matrix.
y = df['Churn']
X = df.drop(columns=['Churn'])

# Hold out 20% of the rows for testing; the split is stratified on y and
# seeded for repeatability.
split_options = dict(test_size=0.2, random_state=42, stratify=y)
X_train, X_test, y_train, y_test = train_test_split(X, y, **split_options)

class HighRiskFeatureGenerator(BaseEstimator, TransformerMixin):
    """Add a binary ``HighRiskCustomers`` column to a DataFrame.

    A row is flagged with 1 when its ``PaymentMethod`` equals
    ``'Electronic check'`` and its ``MonthlyCharges`` is strictly greater than
    the 0.8 quantile of ``MonthlyCharges`` observed during ``fit``; all other
    rows get 0.

    The transformer takes no constructor parameters.

    Attributes
    ----------
    train_80th_ :
        0.8 quantile of ``MonthlyCharges`` in the data passed to ``fit``.
        It is created by ``fit`` only, so an unfitted instance is recognised
        as unfitted instead of carrying a ``None`` threshold.
    """

    def fit(self, X, y=None):
        """Learn the MonthlyCharges threshold from ``X`` and return ``self``.

        ``X`` must support ``X['MonthlyCharges'].quantile``; ``y`` is ignored.
        """
        self.train_80th_ = X['MonthlyCharges'].quantile(0.8)
        return self

    def transform(self, X):
        """Return a copy of ``X`` with the ``HighRiskCustomers`` column added.

        Raises ``sklearn.exceptions.NotFittedError`` when called before
        ``fit``.
        """
        # Local import keeps this class self-contained within the cell.
        from sklearn.utils.validation import check_is_fitted

        check_is_fitted(self, 'train_80th_')

        X = X.copy()  # never modify the caller's frame
        X['HighRiskCustomers'] = (
            (X['PaymentMethod'] == 'Electronic check') &
            (X['MonthlyCharges'] > self.train_80th_)
        ).astype(int)
        return X

class DataFrameImputer(BaseEstimator, TransformerMixin):
    """Impute selected columns of a DataFrame and return a DataFrame.

    Wraps ``SimpleImputer`` so that only ``numerical_cols`` are imputed and
    the result keeps the frame structure needed by later pipeline steps.

    Parameters
    ----------
    numerical_cols :
        Column labels to impute.
    strategy : str, default 'median'
        Passed to ``SimpleImputer(strategy=...)``.

    Attributes
    ----------
    imputer_ : SimpleImputer
        The fitted inner imputer. It is built in ``fit`` from the current
        ``strategy`` so that ``set_params(strategy=...)`` (for example from a
        grid search) actually takes effect; building it in ``__init__`` would
        freeze the strategy chosen at construction time.
    """

    def __init__(self, numerical_cols, strategy='median'):
        # __init__ only stores the parameters.
        self.numerical_cols = numerical_cols
        self.strategy = strategy

    def fit(self, X, y=None):
        """Fit the inner imputer on ``X[numerical_cols]`` and return ``self``."""
        self.imputer_ = SimpleImputer(strategy=self.strategy)
        self.imputer_.fit(X[self.numerical_cols])
        return self

    def transform(self, X):
        """Return a copy of ``X`` with ``numerical_cols`` imputed.

        Raises ``sklearn.exceptions.NotFittedError`` when called before
        ``fit``.
        """
        # Local import keeps this class self-contained within the cell.
        from sklearn.utils.validation import check_is_fitted

        check_is_fitted(self, 'imputer_')

        X = X.copy()  # never modify the caller's frame
        X[self.numerical_cols] = self.imputer_.transform(X[self.numerical_cols])
        return X

# Column-wise preprocessing: scale the numerical columns, one-hot encode the
# categorical ones (unknown categories are ignored), and pass every other
# column through unchanged.
column_preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numerical_cols),
        ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_cols),
    ],
    remainder='passthrough',
)

# Full modelling pipeline: impute -> add HighRiskCustomers -> preprocess ->
# SMOTE resampling -> XGBoost classifier.
pipeline = Pipeline(steps=[
    ('imputer', DataFrameImputer(numerical_cols, strategy='median')),
    ('feature_engineer', HighRiskFeatureGenerator()),
    ('preprocessor', column_preprocessor),
    ('smote', SMOTE(random_state=42)),
    ('classifier', XGBClassifier()),
])


In [None]:
# Fit every step of the pipeline on the training split.
pipeline.fit(X=X_train, y=y_train)

In [None]:
# Evaluate the fitted baseline pipeline on the held-out test split.
y_pred_xgb = pipeline.predict(X_test)
y_prob_xgb = pipeline.predict_proba(X_test)[:, 1]  # Probabilities for positive class

report = classification_report(y_test, y_pred_xgb)
auc_score = roc_auc_score(y_test, y_prob_xgb)
# Computed once and reused for both the printout and the saved metadata.
accuracy = accuracy_score(y_test, y_pred_xgb)

# Print classification metrics
print(f"\nClassification Report (AUC-ROC = {auc_score:.3f}):\n{report}")
print(f"Accuracy: {accuracy:.3f}")

# Confusion matrix, drawn on its own figure so it never lands on a figure
# left open by an earlier cell.
cm = confusion_matrix(y_test, y_pred_xgb)
plt.figure()
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False)
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.tight_layout()
plt.show()

# ROC curve (computed once; the legend reports the roc_auc_score value above).
fpr, tpr, thresholds = roc_curve(y_test, y_prob_xgb)
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=f'AUC = {auc_score:.2f}')
plt.plot([0, 1], [0, 1], 'k--')  # diagonal reference line
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend()
plt.tight_layout()
plt.show()

# Precision-Recall Curve
precision, recall, _ = precision_recall_curve(y_test, y_prob_xgb)
avg_precision = average_precision_score(y_test, y_prob_xgb)

plt.figure()
plt.plot(recall, precision, color='blue', lw=2,
         label=f'Precision-Recall Curve (AP = {avg_precision:.2f})')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc="lower left")
plt.tight_layout()
plt.show()

# Persist the fitted pipeline together with its input column names and the
# headline metrics.
# NOTE(review): the pipeline holds instances of the transformer classes
# defined in this notebook; pickle stores classes by reference, so loading
# this file elsewhere presumably needs those class definitions available
# under the same module name - confirm against the loading code.
joblib.dump({
    'pipeline': pipeline,
    'feature_names': X_train.columns.tolist(),
    'metrics': {'auc': auc_score, 'accuracy': accuracy}
}, 'churn_model_metadata.pkl')

In [None]:
from sklearn.base import clone
from sklearn.model_selection import StratifiedShuffleSplit

# Tune an unfitted copy so the already fitted baseline `pipeline` is untouched.
new_pipeline = clone(pipeline)

# SMOTE inside the pipeline balances the classes, so the classifier's
# positive-class weight is kept neutral. The value is fed into the grid below
# instead of being repeated there as a separate literal.
scale_pos_weight_resampled = 1.0

# Single stratified 80/20 shuffle split of the training data, used as the
# validation scheme for every candidate.
cv = StratifiedShuffleSplit(
    n_splits=1,
    test_size=0.2,
    random_state=42
)

# Define grid (keys address parameters of the 'classifier' pipeline step)
param_grid = {
    'classifier__n_estimators': [100, 200],
    'classifier__max_depth': [3, 5],
    'classifier__learning_rate': [0.05, 0.1],
    'classifier__subsample': [0.8, 1.0],
    'classifier__colsample_bytree': [0.8, 1.0],
    'classifier__gamma': [0, 0.2],
    'classifier__reg_alpha': [0, 0.5],
    'classifier__scale_pos_weight': [scale_pos_weight_resampled]
}

# Initialize GridSearchCV: candidates are ranked by ROC AUC, all cores are used.
grid_search = GridSearchCV(
    estimator=new_pipeline,
    param_grid=param_grid,
    scoring='roc_auc',
    cv=cv,
    verbose=2,
    n_jobs=-1
)

# Execute grid search
grid_search.fit(X_train, y_train)

In [None]:
# Best pipeline found by the grid search.
best_grid_model = grid_search.best_estimator_

# Fitted classifier step and the feature names produced by the preprocessor step.
classifier = best_grid_model.named_steps['classifier']
preprocessor = best_grid_model.named_steps['preprocessor']
feature_names = preprocessor.get_feature_names_out()

# Test-set predictions of the tuned model.
y_pred_grid = best_grid_model.predict(X_test)
y_prob_grid = best_grid_model.predict_proba(X_test)[:, 1]

# Mean of y_test, drawn below as the horizontal baseline.
baseline = y_test.mean()

plt.figure(figsize=(8, 6))

# (model, line colour, legend label) for each curve to draw.
curves = [
    (pipeline, 'blue', 'Original'),
    (best_grid_model, 'red', 'Grid Tuned'),
]
for model, color, label in curves:
    # Use class-1 probabilities when available, otherwise decision scores.
    y_prob = (
        model.predict_proba(X_test)[:, 1]
        if hasattr(model, 'predict_proba')
        else model.decision_function(X_test)
    )

    precision, recall, _ = precision_recall_curve(y_test, y_prob)
    average_precision = average_precision_score(y_test, y_prob)

    plt.plot(
        recall, precision,
        color=color, lw=2,
        label=f'{label} (AP = {average_precision:.2f})',
    )

plt.hlines(
    y=baseline, xmin=0, xmax=1,
    colors='k', linestyles='--',
    label=f'Baseline (AP = {baseline:.2f})',
)

plt.title('Precision-Recall Curve Comparison')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.legend(loc='best')
plt.grid(True)
plt.xlim([0.0, 1.05])
plt.ylim([0.0, 1.05])
plt.tight_layout()
plt.show()

In [None]:
# Metrics of the tuned model on the test split.
report = classification_report(y_test, y_pred_grid)
auc_score = roc_auc_score(y_test, y_prob_grid)

# Print classification metrics
print(f"\nClassification Report (AUC-ROC = {auc_score:.3f}):\n{report}")
print(f"Accuracy: {accuracy_score(y_test, y_pred_grid):.3f}")

# ROC curves of the baseline and the tuned pipeline on one figure.
plt.figure(figsize=(8, 6))
roc_models = {
    'Original': ('blue', pipeline),
    'Grid Tuned': ('red', best_grid_model),
}
for label, (color, model) in roc_models.items():
    positive_scores = model.predict_proba(X_test)[:, 1]
    fpr, tpr, _ = roc_curve(y_test, positive_scores)
    roc_auc = auc(fpr, tpr)
    plt.plot(fpr, tpr, color=color, lw=2, label=f'{label} (AUC = {roc_auc:.2f})')

plt.plot([0, 1], [0, 1], 'k--')  # diagonal reference line
plt.title('ROC Curve Comparison')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.legend()
plt.show()

In [None]:
# Confusion matrix of the tuned model, on its own figure.
grid_cm = confusion_matrix(y_test, y_pred_grid)

fig, ax = plt.subplots(figsize=(6, 5))
sns.heatmap(grid_cm, annot=True, fmt='d', cmap='Blues', ax=ax)
ax.set_title('Grid Tuned Confusion Matrix')
ax.set_ylabel('Actual')
ax.set_xlabel('Predicted')
plt.show()


In [None]:
# Importance scores of the tuned classifier; argsort orders them ascending.
importances = classifier.feature_importances_
sorted_idx = importances.argsort()

# Plot top 15 features (the last `top_n` positions of the ascending order).
top_n = 15
top_idx = sorted_idx[-top_n:]

plt.figure(figsize=(10, 8))
plt.barh(feature_names[top_idx], importances[top_idx], color='skyblue')
plt.title(f'Top {top_n} Feature Importance')
plt.xlabel('Importance Score')
plt.tight_layout()
plt.show()

In [None]:
# Persist the tuned pipeline to disk with joblib.
joblib.dump(value=best_grid_model, filename='xgboost_grid_tuned.pkl')