In [6]:
import os
import random
import argparse
from typing import Tuple

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, roc_auc_score
import torch
from torch import nn
from torch.utils.data import TensorDataset, DataLoader

# ---------------------------
# Reproducibility & device
# ---------------------------
def set_seed(seed: int = 42):
    """Seed every random number generator this file relies on.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch,
    and additionally seeds all CUDA devices when CUDA is available.

    Args:
        seed: Integer seed applied to each generator.
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)
    # The explicit CUDA seeding is only attempted when a GPU is usable.
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# ---------------------------
# Model
# ---------------------------
class MLP(nn.Module):
    """Feed-forward binary classifier that emits one raw logit per sample.

    Every hidden layer is a ``Linear -> ReLU -> Dropout`` triple; a final
    ``Linear`` layer maps to a single output unit. No sigmoid is applied, so
    the output is meant to be paired with a logit-based loss such as
    ``nn.BCEWithLogitsLoss``.

    Args:
        in_dim: Number of input features.
        hidden: Iterable of hidden-layer widths, in order. An empty iterable
            yields a single linear layer.
        dropout: Dropout probability applied after every hidden activation.
    """

    # The default for ``hidden`` is an immutable tuple so that it cannot be
    # shared and mutated across instances the way a list default could.
    def __init__(self, in_dim: int, hidden=(64, 32), dropout=0.3):
        super().__init__()
        layers = []
        prev = in_dim
        for h in hidden:
            layers.append(nn.Linear(prev, h))
            layers.append(nn.ReLU(inplace=True))
            layers.append(nn.Dropout(dropout))
            prev = h
        layers.append(nn.Linear(prev, 1))  # single logit for BCEWithLogits
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return the logits with the trailing size-1 dimension removed."""
        return self.net(x).squeeze(-1)  # [batch]

# ---------------------------
# Data helpers
# ---------------------------
def load_data(csv_path: str, target_col: str = "Outcome", test_size=0.2, val_size=0.1, random_state=42) -> Tuple:
    """Read a CSV and produce standardized, stratified train/val/test splits.

    Every column except ``target_col`` is used as a feature (cast to
    ``float32``); ``target_col`` is cast to ``int64`` labels. The scaler is
    fitted on the training split only and then applied to the other two.

    Args:
        csv_path: Path of the CSV file to read.
        target_col: Name of the label column.
        test_size: Fraction of all rows assigned to the test split.
        val_size: Fraction of all rows assigned to the validation split.
        random_state: Seed passed to both ``train_test_split`` calls.

    Returns:
        ``((X_train, y_train), (X_val, y_val), (X_test, y_test), scaler)``
        where ``scaler`` is the fitted ``StandardScaler``.

    Raises:
        AssertionError: If ``target_col`` is not a column of the CSV.
    """
    df = pd.read_csv(csv_path)
    assert target_col in df.columns, f"{target_col} not in CSV"
    X = df.drop(columns=[target_col]).values.astype(np.float32)
    y = df[target_col].values.astype(np.int64)

    # First carve off the combined validation + test hold-out.
    holdout = test_size + val_size
    X_train, X_tmp, y_train, y_tmp = train_test_split(X, y, test_size=holdout,
                                                      random_state=random_state, stratify=y)

    # train_test_split returns (train_part, test_part) and ``test_size`` sizes
    # the SECOND part. The second part is unpacked into X_test below, so the
    # fraction must be the test share of the hold-out; using the validation
    # share here would silently swap the sizes of the two splits.
    rel_test = test_size / holdout
    X_val, X_test, y_val, y_test = train_test_split(X_tmp, y_tmp, test_size=rel_test,
                                                    random_state=random_state, stratify=y_tmp)

    scaler = StandardScaler()
    # Cast defensively so the arrays stay float32 even if the scaler upcasts;
    # copy=False makes this a no-op when the dtype is already float32.
    X_train = scaler.fit_transform(X_train).astype(np.float32, copy=False)
    X_val = scaler.transform(X_val).astype(np.float32, copy=False)
    X_test = scaler.transform(X_test).astype(np.float32, copy=False)

    return (X_train, y_train), (X_val, y_val), (X_test, y_test), scaler

def make_loader(X, y, batch_size=32, shuffle=True):
    """Wrap NumPy feature and label arrays in a ``DataLoader``.

    Both arrays are converted to ``float32`` tensors. Features are coerced as
    well as labels so that a ``float64`` feature array does not produce
    double-precision batches.

    Args:
        X: NumPy array of features, one row per sample.
        y: NumPy array of labels, one entry per sample.
        batch_size: Number of samples per batch.
        shuffle: Whether the loader reshuffles the samples every epoch.

    Returns:
        A ``DataLoader`` over a ``TensorDataset`` of ``(features, labels)``.
    """
    tX = torch.from_numpy(X).float()  # no copy when X is already float32
    ty = torch.from_numpy(y).float()  # BCEWithLogits expects float targets
    ds = TensorDataset(tX, ty)
    return DataLoader(ds, batch_size=batch_size, shuffle=shuffle)

# ---------------------------
# Train / Eval
# ---------------------------
def train_one_epoch(model, loader, optimizer, criterion):
    """Run one optimization pass over ``loader`` and return the mean loss.

    Args:
        model: Module being trained; it is switched to training mode.
        loader: Iterable of ``(features, targets)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss callable invoked as ``criterion(logits, targets)``.

    Returns:
        Sum of per-batch losses weighted by batch size, divided by
        ``len(loader.dataset)``.
    """
    model.train()
    running = 0.0
    for features, targets in loader:
        features = features.to(device)
        targets = targets.to(device)
        batch_loss = criterion(model(features), targets)  # model output is [batch]
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()
        # Weight by the batch size so a smaller final batch does not skew
        # the per-sample average.
        running += batch_loss.item() * features.size(0)
    n_samples = len(loader.dataset)
    return running / n_samples

def evaluate(model, loader):
    """Compute accuracy and ROC AUC of ``model`` over ``loader``.

    The model is put in eval mode and run without gradient tracking. Logits
    are passed through a sigmoid; a probability of at least 0.5 counts as the
    positive class.

    Args:
        model: Module returning one logit per sample.
        loader: Iterable of ``(features, labels)`` batches.

    Returns:
        ``(accuracy, auc)``. ``auc`` is ``nan`` when ``roc_auc_score`` rejects
        the inputs with a ``ValueError``.
    """
    model.eval()
    ys, preds, probs = [], [], []
    with torch.no_grad():
        for Xb, yb in loader:
            Xb = Xb.to(device)
            logits = model(Xb)
            p = torch.sigmoid(logits).cpu().numpy()
            pred = (p >= 0.5).astype(int)
            probs.extend(p.tolist())
            preds.extend(pred.tolist())
            # .cpu() is a no-op for CPU tensors and keeps .numpy() valid if a
            # loader ever yields labels that live on another device.
            ys.extend(yb.cpu().numpy().astype(int).tolist())
    acc = accuracy_score(ys, preds)
    try:
        auc = roc_auc_score(ys, probs)
    except ValueError:
        # Only the "metric is undefined for these inputs" failure is treated
        # as NaN; any other exception is a real bug and must surface.
        auc = float("nan")
    return acc, auc

# ---------------------------
# Main
# ---------------------------
def main(args):
    """Train the MLP, checkpoint the best epoch and report test metrics.

    Reads these attributes from ``args``: ``seed``, ``csv``, ``target``,
    ``test_size``, ``val_size``, ``batch_size``, ``h1``, ``h2``, ``dropout``,
    ``lr``, ``save_dir`` and ``epochs``.

    After every epoch the validation AUC is compared with the best seen so
    far; on improvement the weights and the fitted scaler are written to
    ``<save_dir>/diabetes_mlp_best.pt``. The test metrics are computed with
    those best weights, not with whatever the final epoch produced.
    """
    set_seed(args.seed)
    (X_train, y_train), (X_val, y_val), (X_test, y_test), scaler = load_data(
        args.csv, target_col=args.target, test_size=args.test_size,
        val_size=args.val_size, random_state=args.seed)
    train_loader = make_loader(X_train, y_train, batch_size=args.batch_size, shuffle=True)
    val_loader = make_loader(X_val, y_val, batch_size=args.batch_size, shuffle=False)
    test_loader = make_loader(X_test, y_test, batch_size=args.batch_size, shuffle=False)

    model = MLP(in_dim=X_train.shape[1], hidden=[args.h1, args.h2], dropout=args.dropout).to(device)
    criterion = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=args.lr)

    os.makedirs(args.save_dir, exist_ok=True)
    ckpt = os.path.join(args.save_dir, "diabetes_mlp_best.pt")

    # Start below any reachable AUC so the first epoch with a defined AUC is
    # always checkpointed; a NaN AUC never compares greater and is skipped.
    best_val_auc = float("-inf")
    best_state = None
    for epoch in range(1, args.epochs + 1):
        train_loss = train_one_epoch(model, train_loader, optimizer, criterion)
        val_acc, val_auc = evaluate(model, val_loader)
        print(f"[Epoch {epoch}] train_loss={train_loss:.4f} val_acc={val_acc:.4f} val_auc={val_auc:.4f}")

        if val_auc > best_val_auc:
            best_val_auc = val_auc
            # Clone the tensors: state_dict() returns references that later
            # optimizer steps would otherwise keep modifying.
            best_state = {k: v.detach().clone() for k, v in model.state_dict().items()}
            # NOTE(review): the scaler is stored as a pickled Python object, so
            # reading this file back probably needs weights_only=False on
            # recent PyTorch versions - confirm before relying on it.
            torch.save({"model_state": best_state, "scaler": scaler}, ckpt)
            print(f"  Saved best checkpoint to {ckpt}")

    # Restore the best-validation weights so the reported test metrics belong
    # to the checkpointed model rather than to the last epoch.
    if best_state is not None:
        model.load_state_dict(best_state)
    test_acc, test_auc = evaluate(model, test_loader)
    print(f"Test acc={test_acc:.4f} auc={test_auc:.4f}")

In [7]:
if __name__ == "__main__":
    class Args:
        """Notebook stand-in for the parsed arguments consumed by ``main``."""

        def __init__(self):
            # Dataset location. Defaults to a file next to the notebook; set
            # the DIABETES_CSV environment variable to point elsewhere.
            self.csv = os.environ.get("DIABETES_CSV", "diabetes.csv")
            self.target = "Outcome"      # label column in the CSV
            self.epochs = 30
            self.batch_size = 32
            self.lr = 1e-3               # Adam learning rate
            self.h1 = 64                 # width of the first hidden layer
            self.h2 = 32                 # width of the second hidden layer
            self.dropout = 0.3
            self.save_dir = "checkpoints"
            self.seed = 42
            self.test_size = 0.2         # fraction of rows held out for testing
            self.val_size = 0.1          # fraction of rows held out for validation

    args = Args()
    # Fail early with an actionable message instead of a bare traceback from
    # deep inside the CSV reader when the dataset is not where it is expected.
    if not os.path.isfile(args.csv):
        raise FileNotFoundError(
            f"Dataset not found: {os.path.abspath(args.csv)!r}. "
            "Place the CSV at that path or set the DIABETES_CSV environment "
            "variable to its location."
        )
    main(args)

FileNotFoundError: [Errno 2] No such file or directory: 'diabetes.csv'