# Manual Gaussian Naive Bayes (No scikit-learn)

This notebook re-implements a Gaussian Naive Bayes classifier from scratch to compare performance with vs. without the engineered `purity_score` feature.

Goals:
1) Load `fetal_health_with_purity.csv`
2) Implement utilities: stratified split, metrics (accuracy, confusion matrix, per-class precision/recall/F1)
3) Implement Gaussian Naive Bayes manually: priors, per-class feature means/variances, and log-likelihood scoring with numerical stability
4) Run two experiments: with `purity_score` and without
5) Compare metrics and optionally persist the better model

Notes:
- No scikit-learn is used for modeling or splitting; only pandas, numpy, and standard library
- Variance smoothing (epsilon) added to avoid divide-by-zero
- Computations are vectorized for clarity and performance

In [None]:
# === Manual Gaussian Naive Bayes training & evaluation ===
# - No scikit-learn
# - Two experiments: with and without `purity_score`

import os
import math
import pickle
import numpy as np
import pandas as pd
from collections import defaultdict

# -----------------------------
# Data loading
# -----------------------------
print("Loading dataset with purity score...")
df = pd.read_csv('fetal_health_with_purity.csv')
print(f"Dataset shape: {df.shape}")

target_col = 'fetal_health' if 'fetal_health' in df.columns else df.columns[-1]
feature_cols_all = [c for c in df.columns if c != target_col]
if 'purity_score' not in feature_cols_all:
    raise RuntimeError("purity_score not found. Ensure the feature engineering notebook saved fetal_health_with_purity.csv.")

feature_cols_without = [c for c in feature_cols_all if c != 'purity_score']

# -----------------------------
# Utilities: stratified split
# -----------------------------
def stratified_train_test_split(X: np.ndarray, y: np.ndarray, test_size=0.2, random_state=42):
    rng = np.random.RandomState(random_state)
    y = np.asarray(y)
    classes = np.unique(y)
    train_idx, test_idx = [], []
    for c in classes:
        idx = np.where(y == c)[0]
        rng.shuffle(idx)
        n_test = max(1, int(round(len(idx) * test_size)))
        test_idx.extend(idx[:n_test].tolist())
        train_idx.extend(idx[n_test:].tolist())
    # Shuffle final indices for randomness
    rng.shuffle(train_idx)
    rng.shuffle(test_idx)
    return X[train_idx], X[test_idx], y[train_idx], y[test_idx]

# -----------------------------
# Metrics: accuracy, confusion matrix, report
# -----------------------------
def accuracy(y_true, y_pred):
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    return (y_true == y_pred).mean()

def confusion_matrix_(y_true, y_pred, labels=None):
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    if labels is None:
        labels = np.unique(np.concatenate([y_true, y_pred]))
    label_to_idx = {lbl: i for i, lbl in enumerate(labels)}
    cm = np.zeros((len(labels), len(labels)), dtype=int)
    for t, p in zip(y_true, y_pred):
        cm[label_to_idx[t], label_to_idx[p]] += 1
    return cm, labels

def classification_report_(y_true, y_pred, labels=None):
    cm, labels = confusion_matrix_(y_true, y_pred, labels)
    report_rows = []
    for i, lbl in enumerate(labels):
        tp = cm[i, i]
        fp = cm[:, i].sum() - tp
        fn = cm[i, :].sum() - tp
        prec = tp / (tp + fp) if (tp + fp) > 0 else 0.0
        rec = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        f1 = 2 * prec * rec / (prec + rec) if (prec + rec) > 0 else 0.0
        support = cm[i, :].sum()
        report_rows.append((lbl, prec, rec, f1, int(support)))
    # Macro averages
    macro_prec = np.mean([r[1] for r in report_rows])
    macro_rec = np.mean([r[2] for r in report_rows])
    macro_f1 = np.mean([r[3] for r in report_rows])
    return report_rows, (macro_prec, macro_rec, macro_f1)

# -----------------------------
# Manual Gaussian Naive Bayes
# -----------------------------
class ManualGaussianNB:
    def __init__(self, var_smoothing=1e-9, prior_smoothing=0.0):
        self.var_smoothing = var_smoothing
        self.prior_smoothing = prior_smoothing
        self.classes_ = None
        self.class_log_prior_ = None
        self.theta_ = None  # means per class per feature [n_classes, n_features]
        self.sigma_ = None  # variances per class per feature [n_classes, n_features]

    def fit(self, X: np.ndarray, y: np.ndarray):
        X = np.asarray(X, dtype=float)
        y = np.asarray(y)
        self.classes_, y_idx = np.unique(y, return_inverse=True)
        n_classes = len(self.classes_)
        n_features = X.shape[1]

        # Priors with optional smoothing
        class_counts = np.bincount(y_idx, minlength=n_classes).astype(float)
        if self.prior_smoothing > 0:
            class_counts += self.prior_smoothing
        class_priors = class_counts / class_counts.sum()
        self.class_log_prior = np.log(class_priors)

        # Means and variances per class
        theta = np.zeros((n_classes, n_features), dtype=float)
        sigma = np.zeros((n_classes, n_features), dtype=float)
        for c_i in range(n_classes):
            Xi = X[y_idx == c_i]
            if Xi.shape[0] == 0:
                # Handle empty class (should not happen with stratified split)
                theta[c_i, :] = 0.0
                sigma[c_i, :] = 1.0
            else:
                theta[c_i, :] = Xi.mean(axis=0)
                # Unbiased variance can be noisy for tiny classes; use population variance
                v = Xi.var(axis=0)
                # Variance smoothing to avoid zero
                v = np.maximum(v, self.var_smoothing)
                sigma[c_i, :] = v
        self.theta_ = theta
        self.sigma_ = sigma
        return self

    def _joint_log_likelihood(self, X: np.ndarray):
        X = np.asarray(X, dtype=float)
        # Using Gaussian log pdf per feature and summing (naive independence)
        # log P(x|c) = sum_j [ -0.5*log(2*pi*sigma_cj) - (x_j - mu_cj)^2 / (2*sigma_cj) ]
        n_classes, n_features = self.theta_.shape
        # Precompute constants
        log_prob = []
        for c_i in range(n_classes):
            mu = self.theta_[c_i]
            var = self.sigma_[c_i]
            # Avoid division by zero (already smoothed)
            log_det = -0.5 * np.sum(np.log(2.0 * np.pi * var))
            # Quadratic term
            quad = -0.5 * np.sum(((X - mu) ** 2) / var, axis=1)
            log_prob.append(self.class_log_prior[c_i] + log_det + quad)
        # Shape -> [n_samples, n_classes]
        return np.vstack(log_prob).T

    def predict(self, X: np.ndarray):
        jll = self._joint_log_likelihood(X)
        idx = np.argmax(jll, axis=1)
        return self.classes_[idx]

    def predict_proba(self, X: np.ndarray):
        jll = self._joint_log_likelihood(X)
        # Convert log-likelihoods to probabilities with log-sum-exp stabilization
        max_log = np.max(jll, axis=1, keepdims=True)
        exp_shifted = np.exp(jll - max_log)
        probs = exp_shifted / np.sum(exp_shifted, axis=1, keepdims=True)
        return probs

# -----------------------------
# Prepare data and run experiments
# -----------------------------
# Ensure numeric target if needed
labels = df[target_col].values
# If target is float but actually categorical like 1.0, 2.0, 3.0, keep as-is

# Two feature matrices
X_with = df[feature_cols_all].values.astype(float)
X_without = df[feature_cols_without].values.astype(float)

# Split with stratification
Xw_tr, Xw_te, yw_tr, yw_te = stratified_train_test_split(X_with, labels, test_size=0.2, random_state=42)
Xo_tr, Xo_te, yo_tr, yo_te = stratified_train_test_split(X_without, labels, test_size=0.2, random_state=42)

# Train models
nb_with = ManualGaussianNB(var_smoothing=1e-9, prior_smoothing=0.0).fit(Xw_tr, yw_tr)
nb_without = ManualGaussianNB(var_smoothing=1e-9, prior_smoothing=0.0).fit(Xo_tr, yo_tr)

# Predictions
yp_with = nb_with.predict(Xw_te)
yp_without = nb_without.predict(Xo_te)

# Metrics
acc_with = accuracy(yw_te, yp_with)
acc_without = accuracy(yo_te, yp_without)

rep_with, macro_with = classification_report_(yw_te, yp_with)
rep_without, macro_without = classification_report_(yo_te, yp_without)

cm_with, labels_with = confusion_matrix_(yw_te, yp_with)
cm_without, labels_without = confusion_matrix_(yo_te, yp_without)

print("\n=== Results: With purity_score ===")
print(f"Accuracy: {acc_with:.4f}")
print("Per-class (label, precision, recall, f1, support):")
for row in rep_with:
    print(row)
print(f"Macro avg (precision, recall, f1): {macro_with}")
print("Confusion matrix (rows=true, cols=pred):\n", cm_with)

print("\n=== Results: Without purity_score ===")
print(f"Accuracy: {acc_without:.4f}")
print("Per-class (label, precision, recall, f1, support):")
for row in rep_without:
    print(row)
print(f"Macro avg (precision, recall, f1): {macro_without}")
print("Confusion matrix (rows=true, cols=pred):\n", cm_without)

# -----------------------------
# Save best model (optional)
# -----------------------------
best_tag = 'with_purity' if acc_with >= acc_without else 'without_purity'
best_model = nb_with if best_tag == 'with_purity' else nb_without
os.makedirs('model', exist_ok=True)
model_path = f"model/gaussian_nb_manual_{best_tag}.pkl"
with open(model_path, 'wb') as f:
    pickle.dump({
        'model': best_model,
        'feature_cols': feature_cols_all if best_tag=='with_purity' else feature_cols_without,
        'target_col': target_col,
        'var_smoothing': best_model.var_smoothing,
        'classes_': best_model.classes_
    }, f)
print(f"\nSaved best manual model to: {model_path}")

# Quick sanity predictions on a few samples
n_show = min(5, len(Xw_te))
idxs = np.random.choice(len(Xw_te), n_show, replace=False)
print("\nSample predictions (with purity): True vs Pred")
for i in idxs:
    print(int(yw_te[i]), int(yp_with[i]))
