<h1> CHURN PREDICTION for SaaS using ML </h1>



<h2> Project Objective : </h2>

To build and compare several machine learning models (Softmax Regression, SVM, Random Forest) to predict a customer's churn risk score (1-5) and identify the key drivers of churn.

Importing all necessary libraries

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score

<h2> Data loading and initial inspection </h2>

In [None]:
train_df = pd.read_csv("train.csv")
train_df.head()

In [None]:
train_df.describe()

In [None]:
train_df.info()

In [None]:
train_df.isnull().sum()

<h2> Exploratory Data Analysis (EDA) & Visualization </h2>

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Bar chart of the number of rows per churn_risk_score value.
fig, ax = plt.subplots(figsize=(10, 6))
sns.countplot(x='churn_risk_score', data=train_df, ax=ax)
ax.set_title('Distribution of Churn Risk Scores')
ax.set_xlabel('Churn Risk Score')
ax.set_ylabel('Number of Customers')
plt.show()

In [None]:
# Keep only the numeric columns, then compute their pairwise correlations.
numerical_df = train_df.select_dtypes(include=np.number)
correlation_matrix = numerical_df.corr()

# Annotated heatmap of the correlation matrix (two decimals per cell).
fig, ax = plt.subplots(figsize=(14, 12))
sns.heatmap(correlation_matrix, annot=True, fmt='.2f', cmap='coolwarm', ax=ax)
ax.set_title('Correlation Heatmap of Numerical Features')
plt.show()

<h2> Train-test split </h2>

In [None]:
from sklearn.model_selection import train_test_split

# Split the frame into the target column and everything else.
target_column = 'churn_risk_score'
y = train_df[target_column]
X = train_df.drop(columns=[target_column])

# Exclude every row whose churn_risk_score is -1.
has_valid_score = y != -1
X = X.loc[has_valid_score]
y = y.loc[has_valid_score]

# 80/20 split, stratified on the target so both parts keep the same
# class proportions; the seed makes the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
    stratify=y,
)

<h2> Data Preprocessing & Feature Engineering </h2>

To fill the null values, we impute categorical columns with the mode (the most frequent value) and numerical columns with the mean of the values that are present.

In [None]:
# Impute missing values after the split.
#
# The fill value for each column (mean for numeric columns, mode for all
# others) is computed from X_train only and then reused for X_test, so the
# test split is transformed with exactly the statistics the models were
# trained with instead of statistics of its own.
#
# Columns are reassigned (`df[col] = df[col].fillna(...)`) rather than filled
# with `inplace=True` on a selected column: the latter is chained assignment
# and does not update the parent frame when pandas Copy-on-Write is active.
for col in X_train.columns:
    if not (X_train[col].isnull().any() or X_test[col].isnull().any()):
        continue  # nothing to fill in either split

    if X_train[col].dtype.kind in 'iuf':
        # Numerical column: use the training mean.
        fill_value = X_train[col].mean()
    else:
        # Categorical/text column: use the most frequent training value.
        modes = X_train[col].mode()
        if modes.empty:
            continue  # column is entirely null in training; no value to learn
        fill_value = modes.iloc[0]

    X_train[col] = X_train[col].fillna(fill_value)
    X_test[col] = X_test[col].fillna(fill_value)

In [None]:
X_train.isnull().sum()

Dropping unnecessary columns -- customer_id, Name, security_no, referral_id

In [None]:
# Remove customer_id, Name, security_no and referral_id from both splits.
columns_to_drop = ['customer_id', 'Name', 'security_no', 'referral_id']
X_train = X_train.drop(columns=columns_to_drop)
X_test = X_test.drop(columns=columns_to_drop)

In [None]:
from datetime import datetime

def add_tenure(df, reference_date=None):
    """Replace ``joining_date`` with a ``tenure_days`` column.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing a ``joining_date`` column parseable by
        ``pd.to_datetime``. The input frame is not modified.
    reference_date : datetime-like or str, optional
        Date the tenure is measured against. Defaults to the current time
        (``datetime.now()``). Pass a fixed date to get the same feature
        values on every run.

    Returns
    -------
    pandas.DataFrame
        A new frame without ``joining_date`` and with ``tenure_days``
        (whole days between ``joining_date`` and ``reference_date``)
        appended as the last column.
    """
    if reference_date is None:
        reference_date = datetime.now()
    reference_date = pd.Timestamp(reference_date)

    joined = pd.to_datetime(df['joining_date'])
    # drop() returns a new frame, so the caller's frame stays untouched.
    result = df.drop(columns=['joining_date'])
    result['tenure_days'] = (reference_date - joined).dt.days
    return result


In [None]:
X_train = add_tenure(X_train)
X_test = add_tenure(X_test)

In [None]:
X_train.head()

We already have the days_since_last_login and tenure_days columns, so it is safe to drop the last_visit_time column: it would not provide any deeper insight for churn prediction in a SaaS business.

In [None]:
# Remove last_visit_time from both splits.
X_train, X_test = (
    frame.drop(columns=['last_visit_time']) for frame in (X_train, X_test)
)

<h2> Converting categorical columns to numerical </h2>

In [None]:
# Print the name of every column that is still stored with object dtype
# in the training split.
object_columns = [col for col in X_test.columns if X_train[col].dtype == 'object']
for col in object_columns:
    print(col)

One interesting column is feedback: we can use NLP techniques to convert this categorical text column into a numerical one.

In [None]:
import nltk
nltk.download('stopwords')
nltk.download('wordnet')
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
import re

In [None]:
# Copy the training feedback strings into a plain list, in row order.
paragraph = list(X_train['feedback'])

In [None]:
wordnet=WordNetLemmatizer()

In [None]:
# Normalise each feedback string: replace non-letters with spaces, lowercase,
# drop English stop words and lemmatise what is left.
#
# The stop-word set is built once here; rebuilding it inside the
# comprehension would re-read the whole stop-word list for every word.
english_stopwords = set(stopwords.words('english'))

corpus = []
for text in paragraph:
    tokens = re.sub('[^a-zA-Z]', ' ', text).lower().split()
    kept = [wordnet.lemmatize(tok) for tok in tokens if tok not in english_stopwords]
    corpus.append(' '.join(kept))

In [None]:
# One-column frame ('name') holding the cleaned feedback text.
xx = pd.DataFrame({'name': corpus})
xx.head()

In [None]:
xx['name'].nunique()

In [None]:
feedback=xx['name'].unique()
feedback

In [None]:
# Encode every distinct cleaned phrase as its position in `feedback`.
# The mapping is built from all entries of `feedback`, so it works for any
# number of distinct phrases (a fixed range(9) raises IndexError with fewer
# than nine phrases and leaves text behind with more than nine).
feedback_codes = {phrase: code for code, phrase in enumerate(feedback)}
xx['name'] = xx['name'].map(feedback_codes)
xx.head()

In [None]:
# Side-by-side check of the encoded value ('1') against the feedback column
# of X_train ('2'). `xx` carries a fresh 0..n-1 index while X_train keeps the
# shuffled row labels produced by train_test_split, so the two are combined
# by position (.values); combining the Series directly would join on index
# labels and pair up unrelated rows.
df1 = pd.DataFrame({'1': xx['name'].values, '2': X_train['feedback'].values})
df1.head(15)

In [None]:
# Write the encoded feedback back by position. `xx` is indexed 0..n-1 whereas
# X_train keeps its shuffled original row labels; assigning the Series itself
# would align on those labels, giving rows the wrong code and NaN wherever a
# label is missing from `xx`.
X_train['feedback'] = xx['name'].values

In [None]:
X_train.head()

doing the same thing with feedback column of test data frame

In [None]:
# Copy the test feedback strings into a plain list, in row order.
paragraph = list(X_test['feedback'])

In [None]:
# Apply the same text normalisation to the test feedback: replace non-letters
# with spaces, lowercase, drop English stop words and lemmatise the rest.
#
# The stop-word set is built once here; rebuilding it inside the
# comprehension would re-read the whole stop-word list for every word.
english_stopwords = set(stopwords.words('english'))

corpus = []
for text in paragraph:
    tokens = re.sub('[^a-zA-Z]', ' ', text).lower().split()
    kept = [wordnet.lemmatize(tok) for tok in tokens if tok not in english_stopwords]
    corpus.append(' '.join(kept))


In [None]:
xx=pd.DataFrame({'name':corpus})

In [None]:
# Encode the test feedback with the phrase -> code mapping defined by
# `feedback` (the distinct phrases of the training split), covering every
# entry of `feedback` rather than a fixed first nine.
# A test phrase that does not occur in `feedback` has no code and becomes NaN.
feedback_codes = {phrase: code for code, phrase in enumerate(feedback)}
xx['name'] = xx['name'].map(feedback_codes)
xx.head()

In [None]:
# Side-by-side check of the encoded value ('1') against the feedback column
# of X_test ('2'). `xx` carries a fresh 0..n-1 index while X_test keeps the
# shuffled row labels produced by train_test_split, so the two are combined
# by position (.values) instead of being joined on index labels.
df1 = pd.DataFrame({'1': xx['name'].values, '2': X_test['feedback'].values})
df1.head(15)

In [None]:
# Write the encoded feedback back by position. `xx` is indexed 0..n-1 whereas
# X_test keeps its shuffled original row labels; assigning the Series itself
# would align on those labels, giving rows the wrong code and NaN wherever a
# label is missing from `xx`.
X_test['feedback'] = xx['name'].values

In [None]:
X_test.head()

Now we check whether any categorical feature has more than 20 unique values. If it does, we drop it, because that much variety would make the dataset more complex and harder to predict correctly. The remaining categorical features are converted to integer codes.

In [None]:
# Encode the remaining object-dtype columns.
#   * A column with more than MAX_CATEGORIES distinct training values is
#     dropped from both splits.
#   * Any other column is replaced by integer codes ordered by descending
#     training frequency (0 = most frequent value in X_train).
MAX_CATEGORIES = 20

# Iterate over a snapshot of the column names because columns may be dropped
# inside the loop.
for col in list(X_train.columns):
    if X_train[col].dtype != 'object':
        continue

    if X_train[col].nunique() > MAX_CATEGORIES:
        X_train = X_train.drop(columns=[col])
        X_test = X_test.drop(columns=[col])
        continue

    # The mapping is learned from the training split only and applied to both
    # splits with a single map() call per frame.
    codes = {val: k for k, val in enumerate(X_train[col].value_counts().index)}
    X_train[col] = X_train[col].map(codes)

    test_codes = X_test[col].map(codes)
    # A category present in the test split but never seen in training has no
    # code; mark it with -1 so the column stays numeric. Genuine nulls stay NaN.
    unseen = test_codes.isna() & X_test[col].notna()
    X_test[col] = test_codes.mask(unseen, -1)

In [None]:
X_train.head()

Let's fill the remaining NaN values in the feedback column.

In [None]:
# Fill any NaN left in the encoded feedback column with the most frequent
# training code. The same training-derived value is used for both splits, and
# the column is reassigned instead of filled in place on a selected column
# (chained in-place fills do not update the frame under pandas Copy-on-Write).
feedback_mode = X_train['feedback'].mode()[0]
X_train['feedback'] = X_train['feedback'].fillna(feedback_mode)
X_test['feedback'] = X_test['feedback'].fillna(feedback_mode)

In [None]:
X_train.isnull().sum()

In [None]:
X_test.isnull().sum()

In [None]:
from sklearn.preprocessing import StandardScaler

# Columns to standardise: every numeric column of the training split.
numerical_cols = X_train.select_dtypes(include=np.number).columns

# Learn the scaling parameters from the training split only, then apply the
# same fitted scaler to both splits.
scaler = StandardScaler().fit(X_train[numerical_cols])
X_train[numerical_cols] = scaler.transform(X_train[numerical_cols])
X_test[numerical_cols] = scaler.transform(X_test[numerical_cols])

<h2> Encoding target variable </h2>

In [None]:
y_train.unique()

In [None]:
y_train = y_train - 1

In [None]:
y_train.unique()

In [None]:
y_test.unique()

In [None]:
y_test = y_test - 1

In [None]:
!pip install imblearn

<h2> Modeling </h2>

<h3> Softmax Regression implementation </h3>

In [None]:
#Scratch implementation of softmax regression

class SoftmaxRegression:
    """Multinomial logistic (softmax) regression trained by batch gradient descent.

    Parameters
    ----------
    learning_rate : float
        Step size of each gradient-descent update.
    n_iterations : int
        Number of full-batch gradient-descent steps performed by ``fit``.

    Attributes (set by ``fit``)
    ---------------------------
    weights : ndarray of shape (n_features, n_classes)
    bias : ndarray of shape (1, n_classes)
    n_classes : int
    classes_ : ndarray
        Sorted distinct training labels; column ``k`` of ``predict_proba``
        corresponds to ``classes_[k]``.
    """

    def __init__(self, learning_rate=0.01, n_iterations=1000):
        self.learning_rate = learning_rate
        self.n_iterations = n_iterations
        self.weights = None
        self.bias = None
        self.n_classes = None
        self.classes_ = None

    def _softmax(self, z):
        """Row-wise softmax of a 2-D score matrix."""
        # Subtracting the row maximum leaves the result unchanged but keeps
        # np.exp from overflowing on large scores.
        exp_z = np.exp(z - np.max(z, axis=1, keepdims=True))
        return exp_z / np.sum(exp_z, axis=1, keepdims=True)

    def _one_hot(self, y, n_classes):
        """Converts a 1D array of labels (0..n_classes-1) into a one-hot matrix."""
        y = np.asarray(y, dtype=int)
        one_hot_y = np.zeros((len(y), n_classes))
        one_hot_y[np.arange(len(y)), y] = 1
        return one_hot_y

    def fit(self, X, y):
        """Learn weights and bias from features ``X`` and labels ``y``.

        ``X`` may be any 2-D array-like (including a DataFrame) and ``y`` any
        1-D array-like of labels. Labels do not have to be 0..K-1: they are
        mapped to consecutive indices internally and mapped back by
        ``predict``.

        Raises
        ------
        ValueError
            If ``X`` is not 2-D, is empty, or its row count differs from ``y``.
        """
        X = np.asarray(X, dtype=float)
        y = np.asarray(y).ravel()
        if X.ndim != 2:
            raise ValueError("X must be a 2-D array of shape (n_samples, n_features).")
        if X.shape[0] == 0:
            raise ValueError("Cannot fit on an empty training set.")
        if X.shape[0] != y.shape[0]:
            raise ValueError("X and y must contain the same number of samples.")

        n_samples, n_features = X.shape
        # Encode labels as positions in the sorted list of distinct labels so
        # they can be used as column indices of the one-hot matrix.
        self.classes_, y_encoded = np.unique(y, return_inverse=True)
        self.n_classes = len(self.classes_)

        # Initialize parameters
        self.weights = np.zeros((n_features, self.n_classes))
        self.bias = np.zeros((1, self.n_classes))
        y_one_hot = self._one_hot(y_encoded, self.n_classes)

        # Gradient descent on the mean cross-entropy loss.
        for _ in range(self.n_iterations):
            y_predicted_proba = self._softmax(np.dot(X, self.weights) + self.bias)
            error = y_predicted_proba - y_one_hot

            dw = (1 / n_samples) * np.dot(X.T, error)
            db = (1 / n_samples) * np.sum(error, axis=0)

            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db

    def predict_proba(self, X):
        """Return class probabilities, one row per sample, one column per class.

        Raises
        ------
        RuntimeError
            If the model has not been fitted yet.
        """
        if self.weights is None:
            raise RuntimeError("SoftmaxRegression must be fitted before predicting.")
        X = np.asarray(X, dtype=float)
        return self._softmax(np.dot(X, self.weights) + self.bias)

    def predict(self, X):
        """Return the most probable label (in the original label space) per sample."""
        best_column = np.argmax(self.predict_proba(X), axis=1)
        return self.classes_[best_column]

In [None]:
print("softmax regression from scratch")

# Train the scratch implementation and score it on the held-out split.
model_softmax = SoftmaxRegression(learning_rate=0.1, n_iterations=1000)
model_softmax.fit(X_train, y_train)
y_pred_scratch = model_softmax.predict(X_test)

accuracy = accuracy_score(y_test, y_pred_scratch)
print(
    f"Accuracy: {accuracy:.4f}",
    "Classification Report:",
    classification_report(y_test, y_pred_scratch),
    sep="\n",
)

In [None]:
print("--- Training Logistic Regression (Scikit-learn) ---")
# The 'saga' solver is used with an L2 penalty; C=0.001 is a small inverse
# regularisation strength, i.e. strong regularisation.
# 'saga' shuffles the data, so random_state is fixed to make the fitted
# coefficients (and the reported scores) repeatable between runs.
model_sklearn_softmax = LogisticRegression(
    C=0.001,
    class_weight=None,
    max_iter=1000,
    penalty='l2',
    solver='saga',
    random_state=42,
)

model_sklearn_softmax.fit(X_train, y_train)
y_pred_sklearn = model_sklearn_softmax.predict(X_test)

accuracy = accuracy_score(y_test, y_pred_sklearn)
print(f"Accuracy: {accuracy:.4f}")
print("Classification Report:")
print(classification_report(y_test, y_pred_sklearn))

In [None]:
# Find the best parameters for softmax (multinomial logistic) regression
# with an exhaustive 5-fold cross-validated grid search.
param_grid = {
    'C': [0.001, 0.01, 0.1, 1, 10],
    'solver': ['lbfgs', 'saga'],
    'penalty': ['l2'],
    'max_iter': [500, 1000, 2000],
    'class_weight': [None, 'balanced']
}

grid = GridSearchCV(
    # `multi_class='multinomial'` is not passed: the argument is deprecated in
    # recent scikit-learn releases, and both solvers in the grid already fit a
    # multinomial model for multi-class targets by default.
    # random_state makes the 'saga' candidates repeatable.
    LogisticRegression(random_state=42),
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1
)

grid.fit(X_train, y_train)
print("Best params:", grid.best_params_)
print("Best score:", grid.best_score_)


<h3> Random Forest and CART decision tree </h3>

In [None]:

import numpy as np
from collections import Counter
from sklearn.metrics import accuracy_score, classification_report

class Node:
    """One node of the decision tree.

    A split node stores the feature index, the threshold and its two children;
    a leaf stores the predicted class in ``value`` (keyword-only).
    """

    def __init__(self, feature=None, threshold=None, left=None, right=None, *, value=None):
        # Split description (unused on leaves).
        self.feature, self.threshold = feature, threshold
        # Child nodes (unused on leaves).
        self.left, self.right = left, right
        # Predicted class; stays None on split nodes.
        self.value = value

    def is_leaf_node(self):
        """Return True when the node carries a prediction value."""
        return not (self.value is None)

class DecisionTreeMultiClass:
    """A from-scratch CART decision tree for multi-class classification.

    Splits are axis-aligned (``feature <= threshold``) and chosen by the
    largest reduction in Gini impurity. Labels must be non-negative integers
    because impurity is computed with ``np.bincount``.

    Parameters
    ----------
    min_samples_split : int
        A node with fewer samples than this becomes a leaf.
    max_depth : int
        Nodes at this depth become leaves.
    """

    def __init__(self, min_samples_split=2, max_depth=100):
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.root = None

    def _gini_impurity(self, y):
        """Calculates the Gini impurity of a set of labels."""
        # np.bincount is efficient for integer-labeled classes
        hist = np.bincount(y)
        ps = hist / len(y)
        return 1 - np.sum([p**2 for p in ps if p > 0])

    def _majority_class(self, y):
        """Return the most frequent label in ``y`` (the label only, not its count)."""
        return Counter(y).most_common(1)[0][0]

    def _best_split(self, X, y):
        """Find the (feature index, threshold) pair with the largest Gini gain.

        Returns ``(None, None)`` when there is at most one sample or when no
        threshold separates the samples into two non-empty groups.
        """
        n_samples, n_features = X.shape
        if n_samples <= 1:
            return None, None

        parent_gini = self._gini_impurity(y)
        best_gain = -1
        split_idx, split_thresh = None, None

        for feat_idx in range(n_features):
            X_column = X[:, feat_idx]
            for thr in np.unique(X_column):
                # Boolean masks avoid building two index arrays per threshold.
                goes_left = X_column <= thr
                goes_right = X_column > thr
                n_l, n_r = int(goes_left.sum()), int(goes_right.sum())

                # A threshold that leaves one side empty is not a split.
                if n_l == 0 or n_r == 0:
                    continue

                gini_l = self._gini_impurity(y[goes_left])
                gini_r = self._gini_impurity(y[goes_right])
                child_gini = (n_l / n_samples) * gini_l + (n_r / n_samples) * gini_r

                gain = parent_gini - child_gini
                if gain > best_gain:
                    best_gain, split_idx, split_thresh = gain, feat_idx, thr

        return split_idx, split_thresh

    def _grow_tree(self, X, y, depth=0):
        """Recursively builds the decision tree from a non-empty sample set."""
        n_samples = len(y)
        n_labels = len(np.unique(y))

        # Stopping criteria: depth limit, pure node, or too few samples.
        if depth >= self.max_depth or n_labels == 1 or n_samples < self.min_samples_split:
            return Node(value=self._majority_class(y))

        best_feat, best_thresh = self._best_split(X, y)

        # No threshold separates the samples: fall back to a leaf.
        if best_feat is None:
            return Node(value=self._majority_class(y))

        # Recursively grow child nodes. _best_split only returns splits whose
        # two sides are non-empty, so neither child receives zero samples.
        left_idxs = np.where(X[:, best_feat] <= best_thresh)[0]
        right_idxs = np.where(X[:, best_feat] > best_thresh)[0]

        left = self._grow_tree(X[left_idxs], y[left_idxs], depth + 1)
        right = self._grow_tree(X[right_idxs], y[right_idxs], depth + 1)
        return Node(best_feat, best_thresh, left, right)

    def fit(self, X, y):
        """Build the tree from features ``X`` and integer labels ``y``.

        ``X`` and ``y`` may be array-likes (for example a DataFrame and a
        Series); they are converted to NumPy arrays first.

        Raises
        ------
        ValueError
            If ``X`` is not 2-D, ``y`` is not 1-D with one label per row of
            ``X``, or the training set is empty. An empty set is rejected
            because it cannot yield a leaf value, and a valueless node is
            neither a leaf nor a usable split at prediction time.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        if X.ndim != 2:
            raise ValueError("X must be a 2-D array of shape (n_samples, n_features).")
        if y.ndim != 1 or len(y) != X.shape[0]:
            raise ValueError("y must be a 1-D array with one label per row of X.")
        if len(y) == 0:
            raise ValueError("Cannot fit a decision tree on an empty training set.")
        self.root = self._grow_tree(X, y)

    def _traverse_tree(self, x, node):
        """Traverses the tree to predict a label for a single sample."""
        if node is None:
            return None
        if node.is_leaf_node():
            return node.value

        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)

    def predict(self, X):
        """Predicts labels for a set of samples (one row of ``X`` per sample)."""
        # Convert first: iterating a DataFrame yields column names, not rows.
        X = np.asarray(X)
        return np.array([self._traverse_tree(x, self.root) for x in X])

In [None]:
print("--- Training Decision Tree (From Scratch) ---")

# The scratch tree slices its input with NumPy indexing, so both splits are
# handed over as arrays rather than DataFrames.
train_features, train_labels = X_train.values, y_train.values
model_scratch_tree = DecisionTreeMultiClass(max_depth=10)
model_scratch_tree.fit(train_features, train_labels)

y_pred_scratch = model_scratch_tree.predict(X_test.values)

accuracy = accuracy_score(y_test, y_pred_scratch)
print(
    f"Accuracy: {accuracy:.4f}",
    "Classification Report:",
    classification_report(y_test, y_pred_scratch),
    sep="\n",
)

In [None]:
print("--- Training Random Forest (Scikit-learn) ---")
# n_estimators is the number of trees in the forest.
# random_state is fixed (as for the other models in this notebook) so the
# bootstrap samples and feature subsets, and hence the scores, are repeatable.
model_sklearn_rf = RandomForestClassifier(
    n_estimators=181,
    max_depth=28,
    min_samples_split=4,
    min_samples_leaf=2,
    max_features='sqrt',
    random_state=42,
)
model_sklearn_rf.fit(X_train.values, y_train.values) # Convert to NumPy arrays
y_pred_sklearn = model_sklearn_rf.predict(X_test.values) # Convert to NumPy arrays

accuracy = accuracy_score(y_test, y_pred_sklearn)
print(f"Accuracy: {accuracy:.4f}")
print("Classification Report:")
print(classification_report(y_test, y_pred_sklearn))

In [None]:
!pip install optuna

In [None]:
import optuna
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.datasets import make_classification


def objective(trial):
    """Optuna objective: mean 3-fold CV accuracy of a random forest.

    The trial proposes n_estimators, max_depth, min_samples_split,
    min_samples_leaf and max_features; the forest is scored on the
    module-level X_train / y_train.
    """
    # Search space (sampled in this order).
    params = {
        "n_estimators": trial.suggest_int("n_estimators", 50, 300),
        "max_depth": trial.suggest_int("max_depth", 5, 30),
        "min_samples_split": trial.suggest_int("min_samples_split", 2, 10),
        "min_samples_leaf": trial.suggest_int("min_samples_leaf", 1, 10),
        "max_features": trial.suggest_categorical("max_features", ["sqrt", "log2", None]),
    }

    candidate = RandomForestClassifier(**params, random_state=42, n_jobs=-1)

    # Average accuracy over the cross-validation folds.
    fold_scores = cross_val_score(
        candidate, X_train, y_train, cv=3, scoring="accuracy", n_jobs=-1
    )
    return fold_scores.mean()


# --- Run Optuna Study ---
print("--- Starting Optuna Hyperparameter Optimization ---")
# TPE is Optuna's default sampler; seeding it makes the sequence of suggested
# hyperparameters (and therefore the best trial) repeatable between runs.
study = optuna.create_study(
    direction="maximize",
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=30, show_progress_bar=True)

# --- Results ---
print("\n--- Best Hyperparameters Found ---")
print(study.best_params)
print(f"Best cross-validation accuracy: {study.best_value:.4f}")

# --- Train Final Model using Best Hyperparameters ---
best_params = study.best_params
model_sklearn_rf = RandomForestClassifier(**best_params, random_state=42)
model_sklearn_rf.fit(X_train, y_train)

# --- Evaluate on Test Set ---
y_pred_sklearn = model_sklearn_rf.predict(X_test)
accuracy = accuracy_score(y_test, y_pred_sklearn)

from sklearn.metrics import classification_report
print(f"\nTest Accuracy: {accuracy:.4f}")
print("Classification Report:")
print(classification_report(y_test, y_pred_sklearn))

In [None]:
from imblearn.over_sampling import SMOTE
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score

# Step 1: oversample the training split with SMOTE. Only the training data
# is resampled; the test split is left as it is.
print("--- Applying SMOTE to balance the training data ---")
smote = SMOTE(random_state=42)
X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)

# Step 2: train a random forest on the resampled data. The hyperparameters
# are written out here as fixed values.
print("\n--- Training Final Model on Resampled Data ---")
best_params = dict(
    n_estimators=181,
    max_depth=28,
    min_samples_split=4,
    min_samples_leaf=2,
    max_features='sqrt',
)
final_model = RandomForestClassifier(random_state=42, **best_params)
final_model.fit(X_train_resampled, y_train_resampled)

# Step 3: score the model on the test split, which was not resampled.
print("\n--- Evaluating on the Original Test Set ---")
y_pred_final = final_model.predict(X_test)
final_accuracy = accuracy_score(y_test, y_pred_final)

print(
    f"Final Test Accuracy: {final_accuracy:.4f}",
    "Final Classification Report:",
    classification_report(y_test, y_pred_final),
    sep="\n",
)

## Feature Importance for Random Forest

In [None]:

# Pair each importance reported by the fitted forest with its column name
# and rank them from most to least important.
feature_names = X_train.columns
feature_importances = model_sklearn_rf.feature_importances_
feature_importance_series = pd.Series(feature_importances, index=feature_names)
sorted_feature_importances = feature_importance_series.sort_values(ascending=False)

print("Feature Importances (Random Forest):", sorted_feature_importances, sep="\n")

# Bar chart of the ranked importances.
fig, ax = plt.subplots(figsize=(10, 6))
sorted_feature_importances.plot(kind='bar', ax=ax)
ax.set_title("Feature Importances (Random Forest)")
ax.set_ylabel("Importance")
fig.tight_layout()
plt.show()

<h3> Support Vector Machine (SVM) </h3>

In [None]:
print("--- Training Support Vector Machine (Scikit-learn) ---")

# Baseline SVM: RBF kernel with regularisation parameter C = 1.0.
svm_settings = {'kernel': 'rbf', 'C': 1.0, 'random_state': 42}
model_sklearn_svm = SVC(**svm_settings)

# Both splits are passed as NumPy arrays.
model_sklearn_svm.fit(X_train.values, y_train.values)
y_pred_sklearn = model_sklearn_svm.predict(X_test.values)

accuracy = accuracy_score(y_test, y_pred_sklearn)
print(
    f"Accuracy: {accuracy:.4f}",
    "Classification Report:",
    classification_report(y_test, y_pred_sklearn),
    sep="\n",
)

In [None]:
import optuna
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score

# --- Define Objective Function for SVM ---
def objective_svm(trial):
    """Optuna objective: mean 3-fold CV accuracy of an RBF-kernel SVC.

    The trial proposes ``C`` in [1e-2, 1e2] and ``gamma`` in [1e-3, 1e-1],
    both sampled on a log scale; the model is scored on the module-level
    X_train_resampled / y_train_resampled.
    """
    # Log-scale search space. suggest_float(..., log=True) replaces the
    # deprecated suggest_loguniform and samples from the same distribution.
    C = trial.suggest_float('C', 1e-2, 1e2, log=True)
    gamma = trial.suggest_float('gamma', 1e-3, 1e-1, log=True)
    kernel = trial.suggest_categorical('kernel', ['rbf'])

    # Define model
    model = SVC(
        C=C,
        gamma=gamma,
        kernel=kernel,
        random_state=42
    )

    # Cross-validation on the SMOTE-resampled training data; requires
    # X_train_resampled and y_train_resampled from the SMOTE step.
    # NOTE(review): the data is oversampled before it is split into folds, so
    # a validation fold can contain synthetic points interpolated from rows in
    # the training folds and the CV accuracy is likely optimistic. Resampling
    # inside each fold would avoid this; confirm whether that is wanted.
    score = cross_val_score(model, X_train_resampled, y_train_resampled, cv=3, scoring="accuracy", n_jobs=-1).mean()

    return score


# --- Run Optuna Study ---
print("--- Starting Optuna Hyperparameter Optimization for SVM ---")
# TPE is Optuna's default sampler; seeding it makes the sequence of suggested
# hyperparameters (and therefore the best trial) repeatable between runs.
study_svm = optuna.create_study(
    direction="maximize",
    sampler=optuna.samplers.TPESampler(seed=42),
)
study_svm.optimize(objective_svm, n_trials=30, show_progress_bar=True)

# --- Results ---
print("\n--- Best Hyperparameters Found for SVM ---")
print(study_svm.best_params)
print(f"Best cross-validation accuracy: {study_svm.best_value:.4f}")

# --- Train Final SVM using Best Hyperparameters on Resampled Data ---
best_svm_params = study_svm.best_params
model_sklearn_svm = SVC(**best_svm_params, random_state=42)
model_sklearn_svm.fit(X_train_resampled, y_train_resampled)

# --- Evaluate on the Original Test Set ---
y_pred_svm = model_sklearn_svm.predict(X_test)
accuracy_svm = accuracy_score(y_test, y_pred_svm)

print(f"\nTest Accuracy (Tuned SVM): {accuracy_svm:.4f}")
print("Classification Report (Tuned SVM):")
print(classification_report(y_test, y_pred_svm))

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Confusion matrix of `final_model` (the random forest trained on the
# SMOTE-resampled data) on the test split X_test / y_test.
y_pred_final = final_model.predict(X_test)

# Fix the row/column order to the classes the model was trained on.
class_labels = final_model.classes_
cm = confusion_matrix(y_test, y_pred_final, labels=class_labels)

# The targets were shifted down by one before training (y - 1), so add one
# back for the tick labels: the axes then show the actual churn risk scores
# instead of the internal 0-based class indices.
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_labels + 1)
disp.plot(cmap=plt.cm.Blues)
plt.title('Confusion Matrix for Final Random Forest Model')
plt.show()

In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Final scores per model. These numbers are typed in by hand, so they must be
# updated whenever the models above are retrained.
model_scores = {
    'Softmax Regression': {'Accuracy': 0.57, 'Macro F1-Score': 0.50},
    'Tuned SVM': {'Accuracy': 0.62, 'Macro F1-Score': 0.51},
    'Tuned Random Forest': {'Accuracy': 0.73, 'Macro F1-Score': 0.67},
}

# One row per model, then reshape to long form (Model, Metric, Score) so the
# two metrics can be drawn as grouped bars.
scores_df = (
    pd.DataFrame.from_dict(model_scores, orient='index')
    .reset_index()
    .rename(columns={'index': 'Model'})
)
scores_df_melted = scores_df.melt(id_vars='Model', var_name='Metric', value_name='Score')

# Grouped bar chart: one group per model, one bar per metric.
fig, ax = plt.subplots(figsize=(12, 7))
sns.barplot(x='Model', y='Score', hue='Metric', data=scores_df_melted, ax=ax)
ax.set_title('Final Model Performance Comparison', fontsize=16)
ax.set_ylabel('Score')
ax.set_xlabel('Model')
ax.set_ylim(0, 1.0)  # scores lie between 0 and 1
plt.xticks(rotation=15)
ax.legend(title='Metric')
plt.show()