In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Load and preprocess the dataset
# NOTE(review): this is an absolute, machine-specific path; point DATA_PATH at
# your local copy of the CSV (ideally relative to a project data directory).
DATA_PATH = "/Users/shubhamgandhi/Desktop/aml/Churn-Prediction/preprocessed_with_smote.csv"
data = pd.read_csv(DATA_PATH)
data_cleaned = data.drop([], axis=1)  # placeholder: no columns are dropped yet

# Separate features and target variable
X = data_cleaned.drop("Exited", axis=1)
y = data_cleaned["Exited"]

# Split first so the scaler never sees the held-out rows
X_train_raw, X_test_raw, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42, stratify=y
)

# Scale features: fit on the training rows only, then apply everywhere
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_raw)
X_test = scaler.transform(X_test_raw)
X_scaled = scaler.transform(X)  # kept for any later cell that expects the full scaled matrix

# Initialize base models.
# The scikit-learn estimators imported above are used here; the previous names
# (RandomForest / GradientBoosting) are not defined when this cell runs on a
# fresh kernel. Seeds make the run reproducible.
random_forest = RandomForestClassifier(n_estimators=100, random_state=42)
gboost = GradientBoostingClassifier(n_estimators=100, random_state=42)

# Prepare stacking features
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
stack_train_list = []  # one DataFrame of out-of-fold predictions per fold
stack_test_preds = []  # one dict of test-set predictions per fold

# Train base models using k-fold cross-validation for stacking
for train_idx, valid_idx in skf.split(X_train, y_train):
    X_fold_train, X_fold_valid = X_train[train_idx], X_train[valid_idx]
    y_fold_train, y_fold_valid = y_train.iloc[train_idx], y_train.iloc[valid_idx]

    # scikit-learn estimators are refit from scratch on every call to fit()
    random_forest.fit(X_fold_train, y_fold_train)
    gboost.fit(X_fold_train, y_fold_train)

    # Out-of-fold predictions become the meta-model's training rows
    stack_train_list.append(pd.DataFrame({
        "rf_pred": random_forest.predict_proba(X_fold_valid)[:, 1],
        "gboost_pred": gboost.predict_proba(X_fold_valid)[:, 1],
        "target": y_fold_valid.values
    }))

    # Collect test predictions for averaging
    stack_test_preds.append({
        "rf_pred": random_forest.predict_proba(X_test)[:, 1],
        "gboost_pred": gboost.predict_proba(X_test)[:, 1],
    })

# Concatenate all stacking data into a single DataFrame
stack_train = pd.concat(stack_train_list, ignore_index=True)

# Average test predictions from each fold
n_folds = len(stack_test_preds)
stack_test = pd.DataFrame({
    "rf_pred": sum(pred["rf_pred"] for pred in stack_test_preds) / n_folds,
    "gboost_pred": sum(pred["gboost_pred"] for pred in stack_test_preds) / n_folds,
})

# Separate training data for the meta-model
meta_features = ["rf_pred", "gboost_pred"]
X_meta = stack_train[meta_features]
y_meta = stack_train["target"]

# Train Logistic Regression as the meta-model
logistic_regression = LogisticRegression(random_state=42)
logistic_regression.fit(X_meta, y_meta)

# Make predictions on the meta test set (same column order as in training)
meta_test_preds = logistic_regression.predict_proba(stack_test[meta_features])[:, 1]
meta_test_labels = (meta_test_preds > 0.5).astype(int)

# Evaluate the performance of the stacked model
stacked_accuracy = accuracy_score(y_test, meta_test_labels)
print(f"Stacked Model Accuracy: {stacked_accuracy:.2%}")

In [None]:
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
import matplotlib.pyplot as plt
import numpy as np

# Classification report
print("Classification Report:")
print(classification_report(y_test, meta_test_labels))

# Confusion matrix
print("Confusion Matrix:")
conf_matrix = confusion_matrix(y_test, meta_test_labels)
print(conf_matrix)

# Plot confusion matrix
# Draw into the figure created here rather than a hard-coded figure number:
# if another figure is already open, the new one is not number 1 and the
# matrix would otherwise land in a different figure.
conf_fig = plt.figure(figsize=(6, 6))
plt.matshow(conf_matrix, cmap='Blues', fignum=conf_fig.number)
plt.title("Confusion Matrix", pad=20)
plt.colorbar()
plt.xlabel("Predicted")
plt.ylabel("Actual")
# Annotate every cell with its count (matshow puts row i on the y axis)
for (i, j), value in np.ndenumerate(conf_matrix):
    plt.text(j, i, f"{value}", ha="center", va="center")
plt.show()

# ROC AUC score
roc_auc = roc_auc_score(y_test, meta_test_preds)
print(f"ROC AUC Score: {roc_auc:.2f}")

# Plot ROC curve
fpr, tpr, _ = roc_curve(y_test, meta_test_preds)
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=f"Stacked Model (AUC = {roc_auc:.2f})")
plt.plot([0, 1], [0, 1], linestyle="--", color="gray", label="Random Guess")
plt.title("ROC Curve")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.legend(loc="lower right")
plt.grid()
plt.show()

In [None]:
import numpy as np
import pandas as pd
import random
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
from sklearn.linear_model import LogisticRegression
from scipy.special import expit  # for sigmoid if needed

# -----------------------------------------------
# Decision Tree and PreSortedDecisionTree Classes
# -----------------------------------------------
class DecisionTree:
    """Base class for a threshold-split decision tree over pandas data.

    A fitted tree is a nested dict of the form
    ``{"feature", "threshold", "left", "right"}`` whose leaves are label
    values. Leaves created by the depth limit or by a failed split use
    ``np.round(np.mean(y))``, so impure leaves assume 0/1 labels.

    Subclasses provide the split search by overriding ``_best_split``.
    """

    def __init__(self, max_depth=None):
        # max_depth=None means the tree grows until nodes are pure or unsplittable.
        self.max_depth = max_depth
        self.tree = None

    def fit(self, X, y, depth=0):
        """Build the tree for ``X`` (DataFrame) and ``y`` (Series) and return it.

        When called with the default ``depth=0`` the resulting root is also
        stored in ``self.tree`` so ``predict`` works right after ``fit``.
        Callers that assign the return value to ``tree.tree`` themselves keep
        working unchanged.
        """
        node = self._grow(X, y, depth)
        if depth == 0:
            self.tree = node
        return node

    def _grow(self, X, y, depth):
        """Recursively build and return the subtree for the given rows."""
        if self.max_depth is not None and depth >= self.max_depth:
            return np.round(np.mean(y))  # Majority class as leaf

        if len(np.unique(y)) == 1:
            return y.iloc[0]

        if X.shape[1] == 0:
            return np.round(np.mean(y))

        best_feature, best_threshold = self._best_split(X, y)
        if best_feature is None:
            return np.round(np.mean(y))

        # Use a positional (NumPy) mask so X and y are split row-by-row even
        # if their index labels differ or contain duplicates.
        goes_left = (X[best_feature] <= best_threshold).to_numpy()
        goes_right = ~goes_left

        # A split that leaves one side empty makes no progress; stop here
        # instead of recursing on an empty partition.
        if not goes_left.any() or not goes_right.any():
            return np.round(np.mean(y))

        left_tree = self._grow(X[goes_left], y[goes_left], depth + 1)
        right_tree = self._grow(X[goes_right], y[goes_right], depth + 1)

        return {"feature": best_feature, "threshold": best_threshold, "left": left_tree, "right": right_tree}

    def _best_split(self, X, y):
        """Return ``(feature, threshold)`` for the best split, or ``(None, None)``."""
        raise NotImplementedError("This method should be implemented in the subclass.")

    def predict(self, X):
        """Return a Series with one predicted leaf value per row of ``X``."""
        return X.apply(self._predict_row, axis=1, tree=self.tree)

    def _predict_row(self, row, tree):
        """Walk ``tree`` for a single row and return the leaf value reached."""
        if isinstance(tree, dict):
            if row[tree["feature"]] <= tree["threshold"]:
                return self._predict_row(row, tree["left"])
            else:
                return self._predict_row(row, tree["right"])
        else:
            return tree


class PreSortedDecisionTree(DecisionTree):
    """Decision tree that picks splits by entropy-based information gain.

    For each feature the rows are sorted once; candidate thresholds are the
    midpoints between consecutive distinct values, and the class counts on
    each side of every candidate are read from a running (cumulative) count
    instead of re-scanning the data for each threshold.
    """

    def _best_split(self, X, y):
        """Return ``(feature, threshold)`` with the highest information gain.

        Returns ``(None, None)`` when no feature has two distinct values.
        Ties keep the first candidate found (first feature, lowest threshold).
        """
        best_gain = -1
        best_feature = None
        best_threshold = None

        labels = np.asarray(y)
        n_samples = labels.shape[0]
        if n_samples < 2:
            return best_feature, best_threshold

        # One-hot encode the labels so cumulative sums give per-class counts.
        _, codes = np.unique(labels, return_inverse=True)
        codes = codes.ravel()
        one_hot = np.zeros((n_samples, int(codes.max()) + 1))
        one_hot[np.arange(n_samples), codes] = 1.0
        parent_counts = one_hot.sum(axis=0)
        parent_entropy = self._entropy_from_counts(parent_counts[np.newaxis, :])[0]

        for feature in X.columns:
            values = X[feature].to_numpy()
            order = np.argsort(values, kind="stable")
            sorted_values = values[order]

            # Position i is a candidate when the value changes between i-1 and i;
            # the left side is then exactly the first i sorted rows.
            cuts = np.flatnonzero(sorted_values[1:] != sorted_values[:-1]) + 1
            if cuts.size == 0:
                continue

            lower = sorted_values[cuts - 1]
            upper = sorted_values[cuts]
            thresholds = (upper + lower) / 2
            # Keep only midpoints that really separate the two neighbours
            # (this also drops NaN thresholds, whose comparisons are False).
            usable = (thresholds >= lower) & (thresholds < upper)
            cuts, thresholds = cuts[usable], thresholds[usable]
            if cuts.size == 0:
                continue

            left_counts = np.cumsum(one_hot[order], axis=0)[cuts - 1]
            right_counts = parent_counts - left_counts
            n_left = cuts.astype(float)
            n_right = n_samples - n_left

            weighted_entropy = (
                (n_left / n_samples) * self._entropy_from_counts(left_counts)
                + (n_right / n_samples) * self._entropy_from_counts(right_counts)
            )
            gains = parent_entropy - weighted_entropy

            top = int(np.argmax(gains))  # first maximum, i.e. lowest threshold on ties
            if gains[top] > best_gain:
                best_gain = gains[top]
                best_feature = feature
                best_threshold = thresholds[top]

        return best_feature, best_threshold

    @staticmethod
    def _entropy_from_counts(counts):
        """Return the base-2 entropy of each row of a 2-D class-count array."""
        counts = np.asarray(counts, dtype=float)
        totals = counts.sum(axis=1, keepdims=True)
        probs = np.divide(counts, totals, out=np.zeros_like(counts), where=totals > 0)
        # log2 is only meaningful where p > 0; zero-probability classes contribute 0.
        safe = np.where(probs > 0, probs, 1.0)
        return -(probs * np.log2(safe)).sum(axis=1)

    @staticmethod
    def _information_gain(parent, left, right):
        """Return the entropy reduction from splitting ``parent`` into ``left``/``right``.

        Each argument is a collection of labels. An empty ``parent`` yields 0.0.
        """
        def entropy(y):
            _, counts = np.unique(np.asarray(y), return_counts=True)
            probabilities = counts / counts.sum() if counts.size else counts
            return -sum(p * np.log2(p) for p in probabilities if p > 0)

        n = len(parent)
        if n == 0:
            return 0.0

        parent_entropy = entropy(parent)
        left_entropy = entropy(left)
        right_entropy = entropy(right)

        n_left = len(left)
        n_right = len(right)

        weighted_avg_entropy = (n_left / n) * left_entropy + (n_right / n) * right_entropy
        return parent_entropy - weighted_avg_entropy

# ---------------------------------------------------
# Custom Random Forest Implementation with predict_proba
# ---------------------------------------------------
class RandomForest:
    """Bagged ensemble of ``PreSortedDecisionTree`` classifiers.

    Each tree is trained on a bootstrap sample of the rows and on a random
    subset of the columns. Class-1 probability is the fraction of trees that
    vote 1, so labels are expected to be 0/1.

    Parameters
    ----------
    n_trees : int
        Number of trees to grow.
    max_depth : int or None
        Depth limit passed to every tree.
    max_features : int or None
        Columns sampled per tree; ``None`` uses ``int(sqrt(n_features))``.
    random_state : int or None
        Seed for the bootstrap and feature sampling. ``None`` (default) draws
        from the global ``numpy.random`` / ``random`` state.
    """

    def __init__(self, n_trees=10, max_depth=None, max_features=None, random_state=None):
        self.n_trees = n_trees
        self.max_depth = max_depth
        self.max_features = max_features
        self.random_state = random_state
        self.trees = []  # list of (tree, feature_names) pairs

    @staticmethod
    def _as_frame(X):
        """Return ``X`` as a DataFrame (arrays get integer column names)."""
        return X if isinstance(X, pd.DataFrame) else pd.DataFrame(np.asarray(X))

    def fit(self, X, y):
        """Grow ``n_trees`` trees on bootstrap samples of ``(X, y)``.

        Any previously grown trees are discarded.

        Raises
        ------
        ValueError
            If ``X`` and ``y`` have different lengths.
        """
        X = self._as_frame(X)
        y = y if isinstance(y, pd.Series) else pd.Series(np.asarray(y))
        if len(X) != len(y):
            raise ValueError("X and y must contain the same number of rows.")

        self.trees = []
        n_samples, n_features = X.shape
        requested = self.max_features or int(np.sqrt(n_features))
        # Never ask for more columns than exist, and use at least one if possible.
        max_features = min(max(1, requested), n_features)

        if self.random_state is None:
            np_rng, py_rng = np.random, random
        else:
            np_rng = np.random.RandomState(self.random_state)
            py_rng = random.Random(self.random_state)

        for _ in range(self.n_trees):
            indices = np_rng.choice(n_samples, n_samples, replace=True)
            features = py_rng.sample(list(X.columns), max_features)

            # Bootstrap rows repeat index labels; reset them so X and y stay
            # aligned row-by-row inside the tree.
            X_sample = X.iloc[indices][features].reset_index(drop=True)
            y_sample = y.iloc[indices].reset_index(drop=True)

            tree = PreSortedDecisionTree(max_depth=self.max_depth)
            tree.tree = tree.fit(X_sample, y_sample)

            self.trees.append((tree, features))

    def _tree_votes(self, X):
        """Return a float array of shape (n_trees, n_samples) with each tree's prediction."""
        if not self.trees:
            raise RuntimeError("RandomForest is not fitted yet; call fit() first.")
        X = self._as_frame(X)
        return np.array([
            np.asarray(tree.predict(X[features]), dtype=float)
            for tree, features in self.trees
        ])

    def predict(self, X):
        """Return the majority vote per row (mean of tree votes, rounded)."""
        return np.round(self._tree_votes(X).mean(axis=0))

    def predict_proba(self, X):
        """Return an (n_samples, 2) array of ``[P(class 0), P(class 1)]``.

        ``P(class 1)`` is the mean of the trees' 0/1 predictions.
        """
        prob_class_1 = self._tree_votes(X).mean(axis=0)
        prob_class_0 = 1 - prob_class_1
        return np.column_stack((prob_class_0, prob_class_1))


# -------------------------------------------------
# OptimizedDecisionTree for Gradient Boosting
# -------------------------------------------------
class OptimizedDecisionTree:
    """
    Regression tree used as the weak learner for Gradient Boosting.

    Splits are searched over a small set of percentile thresholds per feature
    (``num_thresholds`` of them) and chosen to minimise the summed within-node
    variance. A fitted tree is a nested dict
    ``{"feature", "threshold", "left", "right"}`` whose leaves are the mean
    target of the rows that reached them.
    """

    def __init__(self, max_depth=3, min_samples_split=2, num_thresholds=10):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.num_thresholds = num_thresholds
        self.tree = None

    def fit(self, X, y, depth=0):
        """Build the tree on 2-D ``X`` and 1-D ``y`` and return it.

        When called with the default ``depth=0`` the root is also stored in
        ``self.tree``; callers that assign the return value to ``tree.tree``
        themselves keep working unchanged.

        Raises
        ------
        ValueError
            If ``X`` has no rows.
        """
        X = np.asarray(X)
        y = np.asarray(y, dtype=float)
        if X.shape[0] == 0:
            raise ValueError("Cannot fit a tree on an empty dataset.")

        node = self._grow(X, y, depth)
        if depth == 0:
            self.tree = node
        return node

    def _grow(self, X, y, depth):
        """Recursively build and return the subtree for the given rows."""
        n_samples, n_features = X.shape
        depth_reached = self.max_depth is not None and depth >= self.max_depth
        if depth_reached or n_samples < self.min_samples_split:
            return np.mean(y)  # Return leaf value

        best_split = self.find_best_split(X, y, n_features)
        if not best_split:
            return np.mean(y)

        feature, threshold, left_idx, right_idx = best_split
        left_tree = self._grow(X[left_idx], y[left_idx], depth + 1)
        right_tree = self._grow(X[right_idx], y[right_idx], depth + 1)

        return {"feature": feature, "threshold": threshold, "left": left_tree, "right": right_tree}

    def find_best_split(self, X, y, n_features):
        """Return ``(feature, threshold, left_mask, right_mask)`` or ``None``.

        The chosen split minimises ``var(left) * n_left + var(right) * n_right``.
        ``None`` is returned when no threshold separates the rows.
        """
        best_feature, best_threshold = None, None
        best_variance = float("inf")
        best_left_idx, best_right_idx = None, None
        n_samples = X.shape[0]
        percentiles = np.linspace(0, 100, self.num_thresholds)

        for feature in range(n_features):
            feature_values = X[:, feature]
            # Repeated percentile values would be scored again with the same
            # result, so evaluate each distinct threshold once.
            thresholds = np.unique(np.percentile(feature_values, percentiles))
            for threshold in thresholds:
                left_idx = feature_values <= threshold
                n_left = int(np.sum(left_idx))
                if n_left == 0 or n_left == n_samples:
                    continue
                right_idx = ~left_idx

                left_variance = np.var(y[left_idx]) * n_left
                right_variance = np.var(y[right_idx]) * (n_samples - n_left)
                total_variance = left_variance + right_variance

                if total_variance < best_variance:
                    best_variance = total_variance
                    best_feature = feature
                    best_threshold = threshold
                    best_left_idx = left_idx
                    best_right_idx = right_idx

        if best_feature is None:
            return None
        return best_feature, best_threshold, best_left_idx, best_right_idx

    def predict_single(self, x, tree):
        """Return the leaf value reached by a single feature vector ``x``."""
        if not isinstance(tree, dict):  # Leaf node
            return tree
        feature = tree["feature"]
        threshold = tree["threshold"]
        if x[feature] <= threshold:
            return self.predict_single(x, tree["left"])
        else:
            return self.predict_single(x, tree["right"])

    def predict(self, X):
        """Return a float array with the leaf value for every row of ``X``.

        Raises
        ------
        RuntimeError
            If ``self.tree`` has not been set.
        """
        if self.tree is None:
            raise RuntimeError("OptimizedDecisionTree is not fitted yet; call fit() first.")
        X = np.asarray(X)
        out = np.empty(X.shape[0], dtype=float)
        self._predict_into(X, np.arange(X.shape[0]), self.tree, out)
        return out

    def _predict_into(self, X, rows, node, out):
        """Route the row positions ``rows`` down ``node`` and write leaf values into ``out``."""
        if not isinstance(node, dict):
            out[rows] = node
            return
        goes_left = X[rows, node["feature"]] <= node["threshold"]
        self._predict_into(X, rows[goes_left], node["left"], out)
        self._predict_into(X, rows[~goes_left], node["right"], out)

# ---------------------------------------------
# Gradient Boosting Custom Class
# ---------------------------------------------
class GradientBoostingCustom:
    """Gradient boosting on squared-error residuals of 0/1 labels.

    The model starts from the mean label and repeatedly fits an
    ``OptimizedDecisionTree`` to the current residuals ``y - prediction``,
    adding ``learning_rate`` times each tree's output. Because the targets
    are the raw labels, the accumulated score is itself an estimate of
    P(class 1) and is used directly (clipped to [0, 1]) as the probability.
    """

    def __init__(self, n_estimators=100, learning_rate=0.01, max_depth=3):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.initial_prediction = None
        self.trees = []

    def fit(self, X, y):
        """Fit ``n_estimators`` residual trees on ``(X, y)``.

        Trees from any earlier call are discarded, so the same instance can
        safely be refit (e.g. once per cross-validation fold).

        Raises
        ------
        ValueError
            If ``X`` is empty or ``X`` and ``y`` differ in length.
        """
        X = np.array(X)
        y = np.array(y)
        if X.shape[0] == 0:
            raise ValueError("Cannot fit on an empty dataset.")
        if X.shape[0] != y.shape[0]:
            raise ValueError("X and y must contain the same number of rows.")

        # Start from a clean ensemble; otherwise trees from a previous fit
        # would keep contributing to later predictions.
        self.trees = []
        self.initial_prediction = np.mean(y)
        predictions = np.full(len(y), self.initial_prediction)

        for _ in range(self.n_estimators):
            residuals = y - predictions
            tree = OptimizedDecisionTree(max_depth=self.max_depth, num_thresholds=10)
            tree.tree = tree.fit(X, residuals)
            self.trees.append(tree)
            predictions += self.learning_rate * tree.predict(X)

    def _raw_score(self, X):
        """Return the accumulated prediction (initial mean + scaled tree outputs)."""
        if self.initial_prediction is None:
            raise RuntimeError("GradientBoostingCustom is not fitted yet; call fit() first.")
        X = np.array(X)
        scores = np.full(X.shape[0], self.initial_prediction, dtype=float)
        for tree in self.trees:
            scores += self.learning_rate * tree.predict(X)
        return scores

    def predict(self, X):
        """Return 0/1 labels by thresholding the raw score at 0.5."""
        return (self._raw_score(X) >= 0.5).astype(int)

    def predict_proba(self, X):
        """Return an (n_samples, 2) array of ``[P(class 0), P(class 1)]``.

        The raw score already lives on the label scale, so it is clipped to
        [0, 1] rather than passed through a sigmoid; this keeps
        ``predict_proba(X)[:, 1] >= 0.5`` consistent with ``predict(X)``.
        """
        prob_class_1 = np.clip(self._raw_score(X), 0.0, 1.0)
        prob_class_0 = 1 - prob_class_1
        return np.vstack((prob_class_0, prob_class_1)).T


# -------------------------------------------------
# Main code: Data loading, stacking, evaluation
# -------------------------------------------------
if __name__ == "__main__":
    # Seed both generators used by the custom RandomForest (bootstrap rows via
    # numpy.random, feature subsets via random) so the run is reproducible.
    np.random.seed(42)
    random.seed(42)

    # Load data
    # NOTE(review): absolute, machine-specific path; point this at your local
    # copy of the CSV (ideally relative to a project data directory).
    data = pd.read_csv("/Users/shubhamgandhi/Desktop/aml/Churn-Prediction/preprocessed_with_smote.csv")

    # Separate features and target
    X = data.drop("Exited", axis=1)
    y = data["Exited"]

    # Split first so the scaler never sees the held-out rows
    X_train_raw, X_test_raw, y_train, y_test = train_test_split(
        X, y, test_size=0.30, random_state=42, stratify=y
    )

    # Scale features: fit on the training rows only, then apply everywhere.
    # The original index is kept so X_* stays aligned with y_*.
    scaler = StandardScaler()
    X_train = pd.DataFrame(
        scaler.fit_transform(X_train_raw), columns=X.columns, index=X_train_raw.index
    )
    X_test = pd.DataFrame(
        scaler.transform(X_test_raw), columns=X.columns, index=X_test_raw.index
    )
    X_scaled = scaler.transform(X)
    X_df = pd.DataFrame(X_scaled, columns=X.columns)

    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    stack_train_list = []  # one DataFrame of out-of-fold predictions per fold
    stack_test_preds = []  # one dict of test-set predictions per fold

    # Stacking
    for train_idx, valid_idx in skf.split(X_train, y_train):
        X_fold_train, X_fold_valid = X_train.iloc[train_idx], X_train.iloc[valid_idx]
        y_fold_train, y_fold_valid = y_train.iloc[train_idx], y_train.iloc[valid_idx]

        # Fresh base models per fold: nothing learned on one fold's training
        # rows may carry over into another fold's out-of-fold predictions.
        random_forest = RandomForest(n_trees=100)
        gboost = GradientBoostingCustom(n_estimators=100, learning_rate=0.01, max_depth=3)

        # Fit base models
        random_forest.fit(X_fold_train, y_fold_train)
        gboost.fit(X_fold_train, y_fold_train)

        # Out-of-fold predictions become the meta-model's training rows
        stack_train_list.append(pd.DataFrame({
            "rf_pred": random_forest.predict_proba(X_fold_valid)[:, 1],
            "gboost_pred": gboost.predict_proba(X_fold_valid)[:, 1],
            "target": y_fold_valid.values
        }))

        # Predictions on the test set (averaged over folds below)
        stack_test_preds.append({
            "rf_pred": random_forest.predict_proba(X_test)[:, 1],
            "gboost_pred": gboost.predict_proba(X_test)[:, 1],
        })

    stack_train = pd.concat(stack_train_list, ignore_index=True)

    # Average test predictions from each fold
    n_folds = len(stack_test_preds)
    stack_test = pd.DataFrame({
        "rf_pred": sum(pred["rf_pred"] for pred in stack_test_preds) / n_folds,
        "gboost_pred": sum(pred["gboost_pred"] for pred in stack_test_preds) / n_folds,
    })

    # Train meta-model (Logistic Regression)
    meta_features = ["rf_pred", "gboost_pred"]
    X_meta = stack_train[meta_features]
    y_meta = stack_train["target"]

    logistic_regression = LogisticRegression(random_state=42)
    logistic_regression.fit(X_meta, y_meta)

    # Meta predictions (same column order as in training)
    meta_test_preds = logistic_regression.predict_proba(stack_test[meta_features])[:, 1]
    meta_test_labels = (meta_test_preds > 0.5).astype(int)

    # Evaluate
    stacked_accuracy = accuracy_score(y_test, meta_test_labels)
    print(f"Stacked Model Accuracy: {stacked_accuracy:.2%}")