In [1]:
# Cell M1 - Imports and file paths
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image

# Paths (change if needed)
mnist_train_path = 'train.csv'
mnist_test_path  = 'test.csv'

# Quick check: collect every expected CSV that is absent, then warn about each
# one. This is only a warning; nothing is raised here.
missing_paths = [path for path in (mnist_train_path, mnist_test_path) if not os.path.exists(path)]
for p in missing_paths:
    print(f"Warning: {p} not found. Please upload the file or change the path.")

print("Imports ready")


Imports ready


In [2]:
# Cell M2 - Evaluation metrics implemented from scratch

def confusion_matrix(y_true, y_pred):
    """
    Build a confusion matrix from true and predicted labels.

    Parameters:
        y_true: array-like of true labels.
        y_pred: array-like of predicted labels, same shape as y_true.

    Returns:
        (cm, classes) where `classes` is the sorted array of distinct labels
        found in either input and `cm[i, j]` counts the samples whose true
        label is `classes[i]` and whose predicted label is `classes[j]`.

    Raises:
        ValueError: if y_true and y_pred do not have the same shape.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    # Pairing the two sequences with zip() would silently drop the unmatched
    # tail of the longer one, so a length mismatch is rejected explicitly.
    if y_true.shape != y_pred.shape:
        raise ValueError(
            f"y_true and y_pred must have the same shape, got {y_true.shape} and {y_pred.shape}"
        )
    y_true = y_true.ravel()
    y_pred = y_pred.ravel()

    # Encode both label vectors against one shared, sorted class list.
    classes, encoded = np.unique(np.concatenate([y_true, y_pred]), return_inverse=True)
    encoded = np.asarray(encoded).ravel()
    true_idx = encoded[:y_true.size]
    pred_idx = encoded[y_true.size:]

    cm = np.zeros((len(classes), len(classes)), dtype=int)
    # np.add.at accumulates correctly when the same (row, col) pair repeats;
    # plain fancy-index "+=" would count each repeated pair only once.
    np.add.at(cm, (true_idx, pred_idx), 1)
    return cm, classes

def accuracy(y_true, y_pred):
    """
    Fraction of positions where the predicted label equals the true label.

    Parameters:
        y_true: array-like of true labels.
        y_pred: array-like of predicted labels, same shape as y_true.

    Returns:
        The mean of the element-wise equality between the two inputs.

    Raises:
        ValueError: if y_true and y_pred do not have the same shape.
    """
    # Coerce first: comparing two plain Python lists with == yields a single
    # bool (whole-sequence equality), not an element-wise comparison.
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    if y_true.shape != y_pred.shape:
        raise ValueError(
            f"y_true and y_pred must have the same shape, got {y_true.shape} and {y_pred.shape}"
        )
    return np.mean(y_true == y_pred)

def precision_recall_f1(y_true, y_pred, average='macro'):
    """
    Compute precision, recall and F1 from the confusion matrix of the inputs.

    Parameters:
        y_true: true labels.
        y_pred: predicted labels.
        average: 'macro' averages the per-class scores with equal weight;
            'micro' pools true/false positives/negatives over all classes
            before computing the scores.

    Returns:
        (precision, recall, f1)

    Raises:
        ValueError: if `average` is neither 'macro' nor 'micro'.
    """
    cm, classes = confusion_matrix(y_true, y_pred)

    def guarded_ratio(numerator, denominator):
        # A score with an empty denominator is defined as 0.0.
        return numerator / denominator if denominator > 0 else 0.0

    def harmonic_f1(p, r):
        return 2*p*r / (p + r) if (p + r) > 0 else 0.0

    hits = np.diag(cm)                 # true positives per class
    predicted_totals = cm.sum(axis=0)  # column sums: tp + fp per class
    actual_totals = cm.sum(axis=1)     # row sums:    tp + fn per class

    per_class = []
    for k in range(len(classes)):
        p_k = guarded_ratio(hits[k], predicted_totals[k])
        r_k = guarded_ratio(hits[k], actual_totals[k])
        per_class.append((p_k, r_k, harmonic_f1(p_k, r_k)))

    if average == 'macro':
        return (
            np.mean([scores[0] for scores in per_class]),
            np.mean([scores[1] for scores in per_class]),
            np.mean([scores[2] for scores in per_class]),
        )
    if average == 'micro':
        tp = hits.sum()
        fp = predicted_totals.sum() - tp
        fn = actual_totals.sum() - tp
        prec = guarded_ratio(tp, tp + fp)
        rec = guarded_ratio(tp, tp + fn)
        return prec, rec, harmonic_f1(prec, rec)
    raise ValueError("average must be 'macro' or 'micro'")


In [3]:
# Cell M3 - Load MNIST train/test from CSV (format: label,pixel0,...,pixel783)
# Stop immediately unless both CSV files are present.
if not (os.path.exists(mnist_train_path) and os.path.exists(mnist_test_path)):
    raise FileNotFoundError("MNIST CSV files not found at provided paths. Update paths and retry.")

mnist_train = pd.read_csv(mnist_train_path)
mnist_test  = pd.read_csv(mnist_test_path)

print("Train shape:", mnist_train.shape)
print("Test shape:", mnist_test.shape)

# Training file: column 0 is the label, the remaining columns are pixel
# intensities, which are rescaled from [0, 255] to [0, 1] as float32.
X_train = mnist_train.iloc[:, 1:].values.astype(np.float32) / 255.0
y_train = mnist_train.iloc[:, 0].values

# A 785-column test file carries a label column; any other width is treated
# as pixels only, in which case y_test is None and we can only predict.
test_has_labels = mnist_test.shape[1] == 785
if test_has_labels:
    X_test = mnist_test.iloc[:, 1:].values.astype(np.float32) / 255.0
    y_test = mnist_test.iloc[:, 0].values
else:
    X_test = mnist_test.values.astype(np.float32) / 255.0
    y_test = None

print("X_train shape:", X_train.shape, "y_train shape:", y_train.shape)
if y_test is None:
    print("X_test shape:", X_test.shape, " -- test labels not provided")
else:
    print("X_test shape:", X_test.shape, "y_test shape:", y_test.shape)


Train shape: (42000, 785)
Test shape: (28000, 784)
X_train shape: (42000, 784) y_train shape: (42000,)
X_test shape: (28000, 784)  -- test labels not provided


In [4]:
# Cell M4 - Multiclass logistic regression using one-vs-rest (from scratch)

class LogisticOVR:
    """
    Multiclass logistic regression using one-vs-rest (from scratch).

    One binary logistic model per class is trained with full-batch gradient
    descent. After `fit`, `classes_` holds the sorted distinct labels and `W`
    holds one weight row per class (intercept first when `fit_intercept`).

    Parameters:
        lr: gradient-descent step size.
        n_iter: number of gradient steps per class.
        fit_intercept: prepend a constant-1 column to the features.
        reg_lambda: L2 penalty strength (0 disables it); the intercept is
            never penalised.
        verbose: print the binary cross-entropy about five times per class.
    """

    def __init__(self, lr=0.5, n_iter=1000, fit_intercept=True, reg_lambda=0.0, verbose=False):
        self.lr = lr
        self.n_iter = n_iter
        self.fit_intercept = fit_intercept
        self.reg_lambda = reg_lambda
        self.verbose = verbose

    def _add_intercept(self, X):
        """Return X with a leading column of ones (X unchanged if no intercept)."""
        if not self.fit_intercept:
            return X
        return np.hstack([np.ones((X.shape[0], 1), dtype=X.dtype), X])

    def _sigmoid(self, z):
        """Logistic function; z is clipped to [-500, 500] before exponentiation."""
        z = np.clip(z, -500, 500)
        return 1.0 / (1.0 + np.exp(-z))

    def fit(self, X, y):
        """
        Train one binary classifier per distinct label in y.

        Parameters:
            X: 2-D array-like of shape (n_samples, n_features).
            y: 1-D array-like of n_samples labels.

        Returns:
            self
        """
        X = np.asarray(X)
        y = np.asarray(y)
        self.classes_ = np.unique(y)
        n_samples, n_features = X.shape
        n_params = n_features + (1 if self.fit_intercept else 0)
        self.W = np.zeros((len(self.classes_), n_params))

        # The design matrix is identical for every class, so build it once
        # instead of re-stacking the intercept column inside the class loop.
        Xb = self._add_intercept(X)
        log_every = max(1, self.n_iter // 5)

        for idx, cls in enumerate(self.classes_):
            # binary labels for this class
            y_bin = (y == cls).astype(np.float32)
            w = np.zeros(n_params, dtype=np.float32)
            for i in range(self.n_iter):
                z = Xb.dot(w)
                p = self._sigmoid(z)
                error = p - y_bin
                grad = (Xb.T @ error) / n_samples
                # L2 regularization (do not regularize intercept)
                if self.reg_lambda > 0:
                    reg = np.concatenate(([0.0], self.reg_lambda * w[1:] / n_samples)) if self.fit_intercept else self.reg_lambda * w / n_samples
                    grad += reg
                w -= self.lr * grad
                if self.verbose and (i % log_every == 0):
                    # p was computed before this step's update, so the loss
                    # printed here is the loss at the start of iteration i.
                    loss = -np.mean(y_bin*np.log(p+1e-12) + (1-y_bin)*np.log(1-p+1e-12))
                    print(f"Class {cls} iter {i}/{self.n_iter} loss {loss:.4f}")
            self.W[idx] = w
        return self

    def predict_proba(self, X):
        """
        Return pseudo-probabilities of shape (n_samples, n_classes).

        The per-class one-vs-rest logits are normalised with a softmax, so
        every row sums to 1 and the largest logit gets the largest value.
        """
        Xb = self._add_intercept(np.asarray(X))
        logits = Xb @ self.W.T  # shape (n_samples, n_classes)
        # Subtracting the row maximum keeps every exponent <= 0, so np.exp
        # cannot overflow however large the logits are.
        exp_logits = np.exp(logits - np.max(logits, axis=1, keepdims=True))
        return exp_logits / exp_logits.sum(axis=1, keepdims=True)

    def predict(self, X):
        """Return, for each row of X, the class with the highest probability."""
        probs = self.predict_proba(X)
        idx = np.argmax(probs, axis=1)
        return self.classes_[idx]


In [5]:
# Cell M5 - Train OVR logistic on MNIST (can be slow; tune n_iter if needed)
# Hyperparameters: moderate defaults. Raising n_iter improves convergence but
# lengthens training.
lr = 0.5
n_iter = 800
reg_lambda = 0.01

logreg = LogisticOVR(lr=lr, n_iter=n_iter, reg_lambda=reg_lambda, verbose=False)
print("Training logistic OVR (may take several minutes depending on dataset size and n_iter)...")
logreg.fit(X_train, y_train)
print("Training completed.")

# Predictions are always produced; metrics are reported only when the test
# set came with labels.
y_pred_test = logreg.predict(X_test)
if y_test is None:
    print("Test labels not provided; predictions available in y_pred_test array.")
else:
    acc = accuracy(y_test, y_pred_test)
    prec, rec, f1 = precision_recall_f1(y_test, y_pred_test, average='macro')
    print(f"MNIST Test metrics (macro): Accuracy={acc:.4f}, Precision={prec:.4f}, Recall={rec:.4f}, F1={f1:.4f}")
    cm, classes = confusion_matrix(y_test, y_pred_test)
    print("Confusion matrix shape:", cm.shape)


Training logistic OVR (may take several minutes depending on dataset size and n_iter)...
Training completed.
Test labels not provided; predictions available in y_pred_test array.


In [6]:
# Cell M6 - Convert array->image and single-sample predict wrapper

def array_to_image(arr, size=(28,28), scale=255, cmap='gray'):
    """
    Turn a flat pixel array into an 8-bit grayscale PIL image.

    Parameters:
        arr: array-like of pixel values that can be reshaped to `size`.
        size: target shape passed to reshape.
        scale: multiplier applied when the largest value is <= 1.0, i.e. when
            the input looks like it is already normalised to [0, 1].
        cmap: accepted for call compatibility; not used by this function.

    Returns:
        PIL.Image in mode 'L'.
    """
    # np.array makes its own float32 copy, so the caller's data is not modified.
    pixels = np.array(arr, dtype=np.float32)
    looks_normalised = pixels.max() <= 1.0
    if looks_normalised:
        pixels = pixels * scale
    # Shape first, then clamp into the uint8 range before the cast.
    grid = np.clip(pixels.reshape(size), 0, 255)
    return Image.fromarray(grid.astype(np.uint8), mode='L')

def predict_single_image(arr784, model):
    """
    Classify one flattened image and also return it as a PIL image.

    Parameters:
        arr784: 1D array of pixel intensities, either 0-255 or 0-1.
        model: object exposing predict(X) for a (1, n_features) array,
            e.g. a trained LogisticOVR instance.

    Returns:
        (PIL image, predicted label as int)
    """
    pixels = np.array(arr784, dtype=np.float32).reshape(1, -1)
    # Values above 1.0 indicate the 0-255 range; bring them to [0, 1]
    # because the model was trained on scaled inputs.
    sample = pixels / 255.0 if pixels.max() > 1.0 else pixels
    predicted = model.predict(sample)[0]
    image = array_to_image(sample.ravel(), size=(28,28))
    return image, int(predicted)

# Example usage if you want to test on the first test sample:
# img, label_pred = predict_single_image(X_test[0]*255.0, logreg)  # pass original scale optionally
# display(img); print("Predicted:", label_pred)


In [7]:
# Cell B1 - Load bank-full.csv (semicolon-separated) into bank_df
bank_path = 'bank-full.csv'
if not os.path.exists(bank_path):
    # Report the path that was actually checked, so the message stays correct
    # when bank_path is edited.
    raise FileNotFoundError(f"{bank_path} not found. Upload the file or change bank_path.")

bank_df = pd.read_csv(bank_path, sep=';')
print("Bank dataset shape:", bank_df.shape)
bank_df.head()


Bank dataset shape: (45211, 17)


Unnamed: 0,age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y
0,58,management,married,tertiary,no,2143,yes,no,unknown,5,may,261,1,-1,0,unknown,no
1,44,technician,single,secondary,no,29,yes,no,unknown,5,may,151,1,-1,0,unknown,no
2,33,entrepreneur,married,secondary,no,2,yes,yes,unknown,5,may,76,1,-1,0,unknown,no
3,47,blue-collar,married,unknown,no,1506,yes,no,unknown,5,may,92,1,-1,0,unknown,no
4,33,unknown,single,unknown,no,1,no,no,unknown,5,may,198,1,-1,0,unknown,no


In [8]:
# Cell B2 - Preprocessing for Naive Bayes

def preprocess_bank(df, drop_duration=True):
    """
    Prepare the bank marketing frame for the Naive Bayes classifier.

    Works on a copy of `df`: optionally drops the 'duration' column, maps the
    target 'y' from 'no'/'yes' to 0/1, and fills missing values ('unknown'
    for object columns, the column median for numeric columns).

    Returns:
        (processed frame, categorical column names, numeric column names);
        'y' appears in neither list.
    """
    work = df.copy()
    if drop_duration and 'duration' in work.columns:
        work = work.drop(columns=['duration'])

    # Encode the target before inspecting dtypes, so 'y' counts as numeric
    # and is not picked up as a categorical feature.
    work['y'] = work['y'].map({'no':0, 'yes':1})

    cat_cols = list(work.select_dtypes(include=['object']).columns)
    num_cols = list(work.select_dtypes(include=[np.number]).columns)
    num_cols.remove('y')

    # One fill value per feature column: the literal 'unknown' for
    # categoricals, the column median for numerics.
    fill_values = [(c, 'unknown') for c in cat_cols]
    fill_values += [(n, work[n].median()) for n in num_cols]
    for column, value in fill_values:
        work[column] = work[column].fillna(value)
    return work, cat_cols, num_cols

# Preprocess once (without 'duration') and report which columns fall in each group.
bank_proc, cat_cols, num_cols = preprocess_bank(bank_df, drop_duration=True)
for label, cols in (("Categorical cols:", cat_cols), ("Numeric cols:", num_cols)):
    print(label, cols)


Categorical cols: ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'month', 'poutcome']
Numeric cols: ['age', 'balance', 'day', 'campaign', 'pdays', 'previous']


In [9]:
# Cell B3 - create 80:20 split (same as earlier assignment)
from sklearn.model_selection import train_test_split

# Shuffled 80:20 split with a fixed seed so the partition is reproducible.
train_df, test_df = train_test_split(bank_proc, test_size=0.2, shuffle=True, random_state=42)
print("Train shape:", train_df.shape, "Test shape:", test_df.shape)

# Feature frames drop the target and get a fresh 0..n-1 index; targets are
# extracted as plain NumPy arrays.
X_train_bank, X_test_bank = (
    part.drop(columns=['y']).reset_index(drop=True) for part in (train_df, test_df)
)
y_train_bank, y_test_bank = (part['y'].values for part in (train_df, test_df))


Train shape: (36168, 16) Test shape: (9043, 16)


In [10]:
# Cell B4 - Hybrid Naive Bayes classifier (from scratch)

class HybridNaiveBayes:
    """
    Hybrid Naive Bayes classifier (from scratch).

    Categorical features use Laplace-smoothed frequency estimates and numeric
    features use one Gaussian per class; per-feature log-likelihoods are
    summed with the log prior, and the highest-scoring class is predicted.

    Parameters:
        categorical_features: column names treated as categorical.
        numeric_features: column names treated as Gaussian.
        laplace: additive smoothing constant for the categorical estimates.

    Attributes set by fit:
        classes_: sorted array of distinct labels.
        class_priors: dict[class] = prior probability.
        cat_cond_prob: dict[class][feature][value] = probability.
        num_params: dict[class][feature] = (mean, var).
    """

    def __init__(self, categorical_features, numeric_features, laplace=1.0):
        self.cat_features = list(categorical_features)
        self.num_features = list(numeric_features)
        self.laplace = laplace
        self.class_priors = {}
        self.cat_cond_prob = {}  # dict[class][feature][value] = prob
        self.num_params = {}     # dict[class][feature] = (mean, var)
        self.classes_ = None

    def fit(self, X_df, y):
        """
        Estimate priors, categorical tables and Gaussian parameters.

        Parameters:
            X_df: pandas DataFrame containing every configured feature column.
            y: array-like of labels, one per row of X_df (matched by position).

        Returns:
            self

        Raises:
            ValueError: if y and X_df have different numbers of rows.
        """
        y = np.asarray(y)
        if y.shape[0] != X_df.shape[0]:
            raise ValueError(
                f"X_df has {X_df.shape[0]} rows but y has {y.shape[0]} labels"
            )
        self.classes_, counts = np.unique(y, return_counts=True)
        n = len(y)

        # Start from empty tables so a second fit() cannot keep entries for
        # classes that only existed in an earlier training set.
        self.class_priors = {}
        self.cat_cond_prob = {}
        self.num_params = {}

        # Distinct values of each categorical feature over the WHOLE training
        # set (needed for smoothing); identical for every class, so computed once.
        feature_values = {f: np.unique(X_df[f]) for f in self.cat_features}

        for cls, cnt in zip(self.classes_, counts):
            self.class_priors[cls] = cnt / n
            Xc = X_df[y == cls]
            total_count = Xc.shape[0]

            # categorical conditionals
            self.cat_cond_prob[cls] = {}
            for f in self.cat_features:
                all_vals = feature_values[f]
                # One counting pass per (class, feature) instead of one
                # full-column scan per distinct value.
                seen_vals, seen_counts = np.unique(Xc[f], return_counts=True)
                count_of = dict(zip(seen_vals, seen_counts))
                denom = total_count + self.laplace * len(all_vals)
                self.cat_cond_prob[cls][f] = {
                    v: (count_of.get(v, 0) + self.laplace) / denom for v in all_vals
                }

            # numeric parameters (Gaussian)
            self.num_params[cls] = {}
            for f in self.num_features:
                vals = Xc[f].astype(float).values
                mean = np.mean(vals) if len(vals) > 0 else 0.0
                var = np.var(vals) if len(vals) > 0 else 1.0
                # ensure non-zero variance
                if var == 0.0:
                    var = 1e-6
                self.num_params[cls][f] = (mean, var)
        return self

    def _gaussian_logpdf(self, x, mean, var):
        """Log density of a normal distribution; x may be a scalar or an array."""
        return -0.5 * np.log(2*np.pi*var) - ((x-mean)**2) / (2*var)

    def _joint_log_likelihood(self, X_df):
        """Return an (n_rows, n_classes) array of unnormalised log posteriors."""
        n = X_df.shape[0]
        scores = np.empty((n, len(self.classes_)), dtype=float)
        # Pull each column out of the frame once; the class loop reuses them.
        cat_columns = {f: X_df[f].tolist() for f in self.cat_features}
        num_columns = {f: X_df[f].to_numpy(dtype=float) for f in self.num_features}

        for j, cls in enumerate(self.classes_):
            # log prior
            logp = np.full(n, np.log(self.class_priors[cls] + 1e-12))
            # categorical features
            for f in self.cat_features:
                table = self.cat_cond_prob[cls][f]
                log_table = {v: np.log(p + 1e-12) for v, p in table.items()}
                # A value never seen in training gets 1/(K+1), K being the
                # number of known values. K is the same for every class, so an
                # unseen value shifts all classes equally. Written without
                # `laplace` so that laplace=0 cannot divide by zero.
                log_unseen = np.log(1.0 / (len(table) + 1) + 1e-12)
                logp += np.array(
                    [log_table.get(v, log_unseen) for v in cat_columns[f]], dtype=float
                )
            # numeric features (Gaussian logpdf)
            for f in self.num_features:
                mean, var = self.num_params[cls][f]
                logp += self._gaussian_logpdf(num_columns[f], mean, var)
            scores[:, j] = logp
        return scores

    def predict(self, X_df):
        """
        Predict the most probable class for every row.

        Parameters:
            X_df: pandas DataFrame, same columns as train.

        Returns:
            Array of labels drawn from `classes_` (so non-integer labels are
            returned as-is); ties go to the class that sorts first.
        """
        scores = self._joint_log_likelihood(X_df)
        return self.classes_[np.argmax(scores, axis=1)]


In [11]:
# Cell B5 - Train and evaluate the Hybrid Naive Bayes
# fit() returns the estimator itself, so construction and training are chained.
nb = HybridNaiveBayes(
    categorical_features=cat_cols,
    numeric_features=num_cols,
    laplace=1.0,
).fit(X_train_bank, y_train_bank)
print("Naive Bayes training complete.")

y_pred_bank = nb.predict(X_test_bank)

# The three evaluations are independent of one another; all compare the
# held-out labels with the predictions.
cm_b, _ = confusion_matrix(y_test_bank, y_pred_bank)
prec_b, rec_b, f1_b = precision_recall_f1(y_test_bank, y_pred_bank, average='macro')
acc_b = accuracy(y_test_bank, y_pred_bank)

print("Bank dataset (Naive Bayes) evaluation (macro):")
print(f"Accuracy = {acc_b:.4f}")
print(f"Precision = {prec_b:.4f}")
print(f"Recall = {rec_b:.4f}")
print(f"F1-score = {f1_b:.4f}")
print("Confusion matrix (rows=true classes in order 0,1):")
print(cm_b)


Naive Bayes training complete.
Bank dataset (Naive Bayes) evaluation (macro):
Accuracy = 0.8404
Precision = 0.6317
Recall = 0.6400
F1-score = 0.6356
Confusion matrix (rows=true classes in order 0,1):
[[7190  762]
 [ 681  410]]
