In [31]:
import math

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torchvision
from sklearn.compose import ColumnTransformer
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
    precision_score,
    recall_score,
    f1_score,
)
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler, OneHotEncoder
from torch.utils.data import DataLoader, Dataset, TensorDataset, random_split
from torchvision import datasets, transforms

In [None]:
# Read the MNIST training CSV and rescale the pixel columns by 1/255
df = pd.read_csv("mnist_train.csv")
print("Shape (raw):", df.shape)

# Column 0 is the digit label; all remaining columns are treated as pixels
label_col = df.columns[0]
y = df[label_col].astype(np.int64)
X = df.drop(columns=[label_col]).astype(np.float32)

# Dividing by 255 maps the pixels onto [0, 1] (assumes raw intensities are 0-255;
# the Min/Max print below lets the reader confirm this)
X_norm = X / 255.0

print("X_norm shape:", X_norm.shape, "y shape:", y.shape)
pixel_values = X_norm.to_numpy()
print("Min/Max after norm:", float(pixel_values.min()), float(pixel_values.max()))

In [None]:
# Fixed seed so the train/val/test partition is identical on every run
# (without random_state, train_test_split reshuffles differently each time).
SPLIT_SEED = 42

# First, split off a stratified 20% test set from the full dataset
X_temp, X_test, y_temp, y_test = train_test_split(
    X_norm, y, test_size=0.20, stratify=y, random_state=SPLIT_SEED
)

# From the remaining 80%, take 25% as validation (0.25 * 0.80 = 0.20 overall),
# again stratified by label so class proportions match across splits
X_train, X_val, y_train, y_val = train_test_split(
    X_temp, y_temp, test_size=0.25, stratify=y_temp, random_state=SPLIT_SEED
)

print(
    "Shapes ->",
    f"train: {X_train.shape}",
    f"val: {X_val.shape}",
    f"test: {X_test.shape}",
)

In [None]:
BATCH_SIZE = 64

def make_loader(X, y, batch_size=BATCH_SIZE, shuffle=False):
    """Wrap features and integer class labels in a ``DataLoader``.

    Args:
        X: Feature table with one row per sample. Any array-like that
            ``np.asarray`` understands (pandas DataFrame, NumPy array,
            nested list).
        y: Class labels, one per row of ``X``. Any 1-D array-like
            (pandas Series, NumPy array, list).
        batch_size: Number of samples per batch.
        shuffle: Whether the loader reshuffles the samples every epoch.

    Returns:
        A ``DataLoader`` over a ``TensorDataset`` that yields
        ``(features, labels)`` batches with dtypes float32 and int64.
    """
    # np.asarray gives the same array as `.values` for pandas inputs while
    # also accepting plain arrays/lists; torch.tensor then copies the data.
    tensor_X = torch.tensor(np.asarray(X), dtype=torch.float32)
    tensor_y = torch.tensor(np.asarray(y), dtype=torch.long)
    ds = TensorDataset(tensor_X, tensor_y)
    return DataLoader(ds, batch_size=batch_size, shuffle=shuffle)

# Only the training split is shuffled; validation and test keep a fixed order
train_loader = make_loader(X_train, y_train, shuffle=True)
val_loader, test_loader = (
    make_loader(features, labels)
    for features, labels in ((X_val, y_val), (X_test, y_test))
)

# Report how many mini-batches each split produces
print(f"Num train batches: {len(train_loader)}")
print(f"Num val batches: {len(val_loader)}")
print(f"Num test batches: {len(test_loader)}")

In [None]:
# Restrict the data to the binary task: keep only samples labelled 0 or 1.
# Operates on the normalized features (X_norm) and labels (y) built earlier.
mask01 = (y == 0) | (y == 1)
X01 = X_norm.loc[mask01].reset_index(drop=True)
y01 = y.loc[mask01].reset_index(drop=True)

print("Shapes (0/1 only):", X01.shape, y01.shape)
print("Class counts:\n", y01.value_counts().sort_index())


In [None]:
# Fixed seed so the binary train/val/test partition is identical on every run
# (without random_state, train_test_split reshuffles differently each time).
SPLIT_SEED_01 = 42

# First, split off a stratified 20% test set from the 0/1 subset
X_temp_01, X_test_01, y_temp_01, y_test_01 = train_test_split(
    X01, y01, test_size=0.20, stratify=y01, random_state=SPLIT_SEED_01
)

# From the remaining 80%, take 25% as validation (0.25 * 0.80 = 0.20 overall),
# again stratified so both classes keep their proportions in every split
X_train_01, X_val_01, y_train_01, y_val_01 = train_test_split(
    X_temp_01, y_temp_01, test_size=0.25, stratify=y_temp_01, random_state=SPLIT_SEED_01
)

print(
    "Shapes ->",
    f"train: {X_train_01.shape}",
    f"val: {X_val_01.shape}",
    f"test: {X_test_01.shape}",
)

In [None]:
def sigmoid(z: torch.Tensor) -> torch.Tensor:
    """Apply the element-wise logistic function to ``z``.

    Returns a tensor of the same shape as ``z`` with values in [0, 1].
    """
    return z.sigmoid()

In [None]:
BATCH_SIZE_BIN = 64

def make_loader_bin(X_df, y_series, batch_size=BATCH_SIZE_BIN, shuffle=False):
    """Wrap features and binary targets in a ``DataLoader`` for BCE training.

    Args:
        X_df: Feature table with one row per sample. Any array-like that
            ``np.asarray`` understands (pandas DataFrame, NumPy array,
            nested list).
        y_series: Targets, one per row of ``X_df``. Any 1-D array-like
            (pandas Series, NumPy array, list).
        batch_size: Number of samples per batch.
        shuffle: Whether the loader reshuffles the samples every epoch.

    Returns:
        A ``DataLoader`` over a ``TensorDataset`` that yields
        ``(features, targets)`` batches, both float32, with targets shaped
        ``(batch, 1)`` so they line up with a single-column model output.
    """
    # np.asarray gives the same array as `.values` for pandas inputs while
    # also accepting plain arrays/lists; torch.tensor then copies the data.
    X_t = torch.tensor(np.asarray(X_df), dtype=torch.float32)
    y_t = torch.tensor(np.asarray(y_series), dtype=torch.float32).view(-1, 1)  # (N,1)
    ds = TensorDataset(X_t, y_t)
    return DataLoader(ds, batch_size=batch_size, shuffle=shuffle)

# Only the training split is shuffled; validation and test keep a fixed order
train_01_loader_bin = make_loader_bin(X_train_01, y_train_01, shuffle=True)
val_01_loader_bin, test_01_loader_bin = (
    make_loader_bin(features, targets)
    for features, targets in ((X_val_01, y_val_01), (X_test_01, y_test_01))
)

# Cell output: number of mini-batches in the (train, val, test) loaders
len(train_01_loader_bin), len(val_01_loader_bin), len(test_01_loader_bin)


In [None]:
# Torch forward functions: logit and probability

def logit_torch(X: torch.Tensor, W: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
    """Compute the linear score ``X @ W + b`` (the pre-sigmoid logit).

    With ``X`` of shape (N, D), ``W`` of shape (D, 1) and a broadcastable
    bias ``b``, the result has shape (N, 1).
    """
    weighted_sum = torch.matmul(X, W)
    return weighted_sum + b

def predict_proba(X: torch.Tensor, W: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
    """Return the sigmoid of the linear score produced by ``logit_torch``.

    The output has the same shape as ``logit_torch(X, W, b)`` and holds
    values in [0, 1].
    """
    scores = logit_torch(X, W, b)
    return scores.sigmoid()


In [None]:
# Initialize trainable parameters W (INPUT_DIM, 1) and b (1,), both requires_grad=True
INPUT_DIM = X_train_01.shape[1]

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Weights drawn from torch.rand lie in [0, 1); summed over INPUT_DIM non-negative
# features they give very large positive logits, so the sigmoid saturates at 1.0
# and gradients vanish from the first step. Use small zero-mean weights and a
# zero bias instead, drawn from a seeded generator so a rerun starts identically.
INIT_SEED = 0
INIT_STD = 0.01
_init_gen = torch.Generator().manual_seed(INIT_SEED)

# Sample on CPU (same values whatever the device), move, then mark as a leaf
# tensor that tracks gradients.
W = (
    (INIT_STD * torch.randn((INPUT_DIM, 1), dtype=torch.float32, generator=_init_gen))
    .to(device)
    .requires_grad_(True)
)
b = torch.zeros((1,), dtype=torch.float32, requires_grad=True, device=device)



In [None]:
# Binary cross-entropy loss (PyTorch tensors) and manual GD training (lr=0.01)


# Kept so any code that still references EPS keeps working; bce_loss itself
# no longer clamps probabilities with it.
EPS = 1e-7

def bce_loss(probs: torch.Tensor, targets: torch.Tensor) -> torch.Tensor:
    """Mean binary cross-entropy between predicted probabilities and targets.

    Args:
        probs: Predicted probabilities in [0, 1] (e.g. sigmoid outputs).
        targets: Float targets with the same shape as ``probs``.

    Returns:
        A scalar tensor holding the loss averaged over all elements.

    Notes:
        The previous version clamped ``probs`` into ``[EPS, 1 - EPS]`` before
        taking logs. ``torch.clamp`` has zero gradient outside that range, so
        samples predicted confidently wrong (probability below ``EPS`` for a
        positive target) contributed nothing to the update. The built-in
        ``binary_cross_entropy`` bounds the log terms instead, which keeps
        the loss finite while still propagating a gradient.
    """
    return torch.nn.functional.binary_cross_entropy(probs, targets)

LR = 0.01
MAX_EPOCHS = 100
PATIENCE = 5
MIN_DELTA = 1e-5  # smallest decrease in val_loss that counts as an improvement

best_val_loss = float("inf")
patience_left = PATIENCE
# Snapshot of the parameters at the best validation loss seen so far. Without it,
# early stopping would leave W/b at the last epoch, i.e. after PATIENCE epochs
# without improvement, rather than at the best epoch.
best_W = W.detach().clone()
best_b = b.detach().clone()
history = {"epoch": [], "train_loss": [], "val_loss": [], "train_acc": [], "val_acc": []}

for epoch in range(1, MAX_EPOCHS + 1):
    # ---- Train: one mini-batch gradient-descent pass over the training loader ----
    W.grad = None
    b.grad = None
    model_train_loss_sum = 0.0
    train_correct = 0
    num_train_samples = 0

    for xb, yb in train_01_loader_bin:
        xb = xb.to(device)
        yb = yb.to(device)

        logits = logit_torch(xb, W, b)
        probs = torch.sigmoid(logits)
        loss = bce_loss(probs, yb)

        loss.backward()
        # Manual parameter update; no_grad keeps the in-place step out of autograd
        with torch.no_grad():
            W -= LR * W.grad
            b -= LR * b.grad
            # backward() accumulates into .grad, so reset before the next batch
            W.grad.zero_()
            b.grad.zero_()

        # Training metrics use the predictions made before this batch's update
        preds = (probs >= 0.5).float()
        train_correct += (preds == yb).sum().item()

        # loss is a per-batch mean; weight by batch size to get a per-sample average
        model_train_loss_sum += loss.item() * xb.size(0)
        num_train_samples += xb.size(0)

    train_loss = model_train_loss_sum / max(1, num_train_samples)
    train_acc = train_correct / max(1, num_train_samples)

    # ---- Validation: same metrics, no gradient tracking, no updates ----
    val_loss_sum = 0.0
    val_correct = 0
    val_total = 0
    with torch.no_grad():
        for xb, yb in val_01_loader_bin:
            xb = xb.to(device)
            yb = yb.to(device)
            logits = logit_torch(xb, W, b)
            probs = torch.sigmoid(logits)
            loss = bce_loss(probs, yb)
            val_loss_sum += loss.item() * xb.size(0)

            preds = (probs >= 0.5).float()
            val_correct += (preds == yb).sum().item()
            val_total += yb.numel()

    val_loss = val_loss_sum / max(1, val_total)
    val_acc = val_correct / max(1, val_total)

    history["epoch"].append(epoch)
    history["train_loss"].append(train_loss)
    history["val_loss"].append(val_loss)
    history["train_acc"].append(train_acc)
    history["val_acc"].append(val_acc)

    print(f"Epoch {epoch:03d} | train_loss={train_loss:.4f} | val_loss={val_loss:.4f} | train_acc={train_acc:.4f} | val_acc={val_acc:.4f}")

    # Early stopping on val_loss
    if val_loss + MIN_DELTA < best_val_loss:
        best_val_loss = val_loss
        patience_left = PATIENCE
        # Remember the parameters that produced this best validation loss
        best_W = W.detach().clone()
        best_b = b.detach().clone()
    else:
        patience_left -= 1
        if patience_left <= 0:
            print("Early stopping: no improvement in validation loss.")
            break

# Roll W/b back to the best-validation snapshot so later cells evaluate that
# model rather than the parameters from the final epoch.
with torch.no_grad():
    W.copy_(best_W)
    b.copy_(best_b)

history


In [None]:
# Plot training/validation loss and accuracy curves
import matplotlib.pyplot as plt

# Two side-by-side panels: loss on the left, accuracy on the right
epochs = history["epoch"]
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 4))

# Loss
loss_ax.plot(epochs, history["train_loss"], label="Train Loss")
loss_ax.plot(epochs, history["val_loss"], label="Val Loss")
loss_ax.set_xlabel("Epoch")
loss_ax.set_ylabel("Loss")
loss_ax.set_title("Loss Curves")
loss_ax.legend()

# Accuracy
acc_ax.plot(epochs, history["train_acc"], label="Train Acc")
acc_ax.plot(epochs, history["val_acc"], label="Val Acc")
acc_ax.set_xlabel("Epoch")
acc_ax.set_ylabel("Accuracy")
acc_ax.set_title("Accuracy Curves")
acc_ax.legend()

fig.tight_layout()
plt.show()


In [None]:
# Final test accuracy and confusion matrix
from sklearn.metrics import accuracy_score, confusion_matrix

# Running tallies plus per-batch targets/predictions for the sklearn metrics
model_test_correct = 0
model_test_total = 0
all_true, all_pred = [], []

# Inference only: no gradients are needed while scoring the test loader
with torch.no_grad():
    for xb, yb in test_01_loader_bin:
        xb, yb = xb.to(device), yb.to(device)
        probs = logit_torch(xb, W, b).sigmoid()
        # Threshold the probabilities at 0.5 to get hard 0/1 predictions
        preds = probs.ge(0.5).float()
        model_test_correct += preds.eq(yb).sum().item()
        model_test_total += yb.numel()

        all_true.append(yb.cpu())
        all_pred.append(preds.cpu())

# Stack the (batch, 1) tensors row-wise and flatten to 1-D arrays for sklearn
y_true = torch.cat(all_true, dim=0).numpy().reshape(-1)
y_pred = torch.cat(all_pred, dim=0).numpy().reshape(-1)

acc = accuracy_score(y_true, y_pred)
cm = confusion_matrix(y_true, y_pred)

print(f"Test Accuracy: {acc:.4f}")
print("Confusion Matrix (rows: true, cols: pred):\n", cm)
acc, cm
