In [None]:
import os, math, random, time, copy
import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F

RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)
random.seed(RANDOM_STATE)
torch.manual_seed(RANDOM_STATE)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print("Torch:", torch.__version__, "| CUDA:", torch.cuda.is_available())


Torch: 2.6.0+cu124 | CUDA: False


In [None]:

class AdamOptimizer:
    def __init__(self, params, lr=0.001, betas=(0.9, 0.999), eps=1e-8):
        self.params = list(params)
        self.lr = lr
        self.beta1, self.beta2 = betas
        self.eps = eps
        self.t = 0
        self.m = [torch.zeros_like(p) for p in self.params]
        self.v = [torch.zeros_like(p) for p in self.params]

    def step(self):
        self.t += 1
        for i, p in enumerate(self.params):
            if p.grad is None:
                continue
            g = p.grad
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * g
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * (g * g)
            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)
            p.data -= self.lr * m_hat / (torch.sqrt(v_hat) + self.eps)

    def zero_grad(self):
        for p in self.params:
            if p.grad is not None:
                p.grad.zero_()


In [None]:

#@title Load Data
USE_UPLOAD = False #@param {type:"boolean"}
DATA_PATH = "cardio_train.csv" #@param {type:"string"}

if USE_UPLOAD:
  try:
    from google.colab import files
    uploaded = files.upload()
    DATA_PATH = list(uploaded.keys())[0]
  except Exception as e:
    print("Colab upload not available in this environment, set USE_UPLOAD=False and adjust DATA_PATH.")

# Fallback for this environment (you can delete in Colab)
fallback = "/mnt/data/synthetic_heart_data.csv"
if not os.path.exists(DATA_PATH) and os.path.exists(fallback):
  DATA_PATH = fallback

df = pd.read_csv(DATA_PATH)
print("Shape:", df.shape)
df.head()


Shape: (70000, 13)


Unnamed: 0,id,age,gender,height,weight,ap_hi,ap_lo,cholesterol,gluc,smoke,alco,active,cardio
0,0,18393,2,168,62.0,110,80,1,1,0,0,1,0
1,1,20228,1,156,85.0,140,90,3,1,0,0,1,1
2,2,18857,1,165,64.0,130,70,3,1,0,0,0,1
3,3,17623,2,169,82.0,150,100,1,1,0,0,1,1
4,4,17474,1,156,56.0,100,60,1,1,0,0,0,0


In [None]:
# Candidate target column names to check
candidate_targets = ["target", "label", "class", "Outcome", "cardio"]
target_col = None
for c in candidate_targets:
    if c in df.columns:
        target_col = c
        break

assert target_col is not None, f"Could not find target col in {candidate_targets}. Edit this cell and set target_col."

# Separate target and features
y = df[target_col].astype(int).values
X = df.drop(columns=[target_col])

# Detect numeric vs categorical columns
numeric_cols = X.select_dtypes(include=[np.number]).columns.tolist()
categorical_cols = [c for c in X.columns if c not in numeric_cols]

# Optionally treat low-cardinality integer columns as categorical
INT_AS_CAT = True
if INT_AS_CAT:
    for c in numeric_cols.copy():
        if str(X[c].dtype).startswith("int") and X[c].nunique() <= 10:
            categorical_cols.append(c)
            numeric_cols.remove(c)

print("Target column:", target_col)
print("Numeric columns count:", len(numeric_cols))
print("Categorical columns count:", len(categorical_cols))


Target column: cardio
Numeric columns count: 6
Categorical columns count: 6


In [None]:

#@title Preprocessing utilities (no sklearn)
def mode(series):
  return series.value_counts(dropna=True).idxmax() if series.notna().any() else np.nan

def impute_and_encode(df, numeric_cols, categorical_cols):
  df = df.copy()
  # Impute
  for c in numeric_cols:
    med = df[c].median()
    df[c] = df[c].fillna(med)
  for c in categorical_cols:
    m = mode(df[c])
    df[c] = df[c].fillna(m)

  # One-hot encode categoricals
  if categorical_cols:
    df_cat = pd.get_dummies(df[categorical_cols].astype("category"), dummy_na=False)
  else:
    df_cat = pd.DataFrame(index=df.index)

  # Standardize numerics
  df_num = df[numeric_cols].astype(float).copy()
  mu = df_num.mean(axis=0)
  sigma = df_num.std(axis=0).replace(0, 1.0)
  df_num = (df_num - mu) / sigma

  # Concatenate
  X_proc = pd.concat([df_num, df_cat], axis=1)
  meta = {
      "num_mean": mu.to_dict(),
      "num_std": sigma.to_dict(),
      "cat_levels": {c: sorted(df[c].astype("category").cat.categories.astype(str).tolist()) for c in categorical_cols},
      "ohe_cols": list(df_cat.columns),
      "numeric_cols": numeric_cols,
      "categorical_cols": categorical_cols,
  }
  return X_proc.values.astype(np.float32), meta

def transform_new(df_new, meta):
  df_new = df_new.copy()
  # Impute
  for c in meta["numeric_cols"]:
    med = df_new[c].median()
    df_new[c] = df_new[c].fillna(med)
  for c in meta["categorical_cols"]:
    m = mode(df_new[c])
    df_new[c] = df_new[c].fillna(m)

  # One-hot to the same columns
  if meta["categorical_cols"]:
    df_cat = pd.get_dummies(df_new[meta["categorical_cols"]].astype("category"), dummy_na=False)
  else:
    df_cat = pd.DataFrame(index=df_new.index)
  # Align to training columns
  for col in meta["ohe_cols"]:
    if col not in df_cat.columns:
      df_cat[col] = 0
  df_cat = df_cat[meta["ohe_cols"]]

  # Standardize numeric
  df_num = df_new[meta["numeric_cols"]].astype(float).copy()
  mu = pd.Series(meta["num_mean"])
  sigma = pd.Series(meta["num_std"]).replace(0, 1.0)
  df_num = (df_num - mu) / sigma

  X_proc = pd.concat([df_num, df_cat], axis=1)
  return X_proc.values.astype(np.float32)


In [None]:

#@title Outlier detection (IQR) — no sklearn
APPLY_IQR = True #@param {type:"boolean"}

X_raw = X.copy()
mask_keep = np.ones(len(X_raw), dtype=bool)

if APPLY_IQR:
  num = X_raw.select_dtypes(include=[np.number]).columns.tolist()
  if num:
    Q1 = X_raw[num].quantile(0.25)
    Q3 = X_raw[num].quantile(0.75)
    IQR = Q3 - Q1
    lower = Q1 - 1.5 * IQR
    upper = Q3 + 1.5 * IQR
    m = ~((X_raw[num] < lower) | (X_raw[num] > upper)).any(axis=1)
    mask_keep &= m.values

X_clean = X_raw[mask_keep].reset_index(drop=True)
y_clean = y[mask_keep]
print(f"Kept {X_clean.shape[0]} / {len(X)} rows after IQR filtering.")


Kept 38023 / 70000 rows after IQR filtering.


In [None]:

#@title Fit preprocessing on cleaned data (no sklearn)
X_ready, preprocess_meta = impute_and_encode(X_clean, numeric_cols, categorical_cols)
print("Final feature matrix shape:", X_ready.shape)


Final feature matrix shape: (38023, 15)


In [None]:

#@title Stratified train/val/test split (from scratch)
def stratified_split(X, y, test_size=0.15, val_size=0.15, seed=42):
  rng = np.random.default_rng(seed)
  X = np.asarray(X)
  y = np.asarray(y).astype(int)
  classes = np.unique(y)
  idx = np.arange(len(y))
  train_idx, val_idx, test_idx = [], [], []
  for c in classes:
    c_idx = idx[y == c]
    rng.shuffle(c_idx)
    n = len(c_idx)
    n_test = int(round(test_size * n))
    n_val = int(round(val_size * (n - n_test)))
    test_idx.extend(c_idx[:n_test])
    val_idx.extend(c_idx[n_test:n_test+n_val])
    train_idx.extend(c_idx[n_test+n_val:])
  rng.shuffle(train_idx); rng.shuffle(val_idx); rng.shuffle(test_idx)
  return (X[train_idx], y[train_idx],
          X[val_idx], y[val_idx],
          X[test_idx], y[test_idx])

X_train, y_train, X_val, y_val, X_test, y_test = stratified_split(X_ready, y_clean, test_size=0.15, val_size=0.15, seed=RANDOM_STATE)
print("Shapes — Train:", X_train.shape, "Val:", X_val.shape, "Test:", X_test.shape)


Shapes — Train: (27472, 15) Val: (4848, 15) Test: (5703, 15)


In [None]:

#@title Metrics from scratch
def sigmoid(z):
  return 1 / (1 + np.exp(-np.clip(z, -50, 50)))

def confusion_matrix_(y_true, y_pred):
  y_true = y_true.astype(int)
  y_pred = y_pred.astype(int)
  tp = np.sum((y_true==1)&(y_pred==1))
  tn = np.sum((y_true==0)&(y_pred==0))
  fp = np.sum((y_true==0)&(y_pred==1))
  fn = np.sum((y_true==1)&(y_pred==0))
  return np.array([[tn, fp],[fn, tp]])

def accuracy_score_(y_true, y_pred):
  return float(np.mean(y_true.astype(int)==y_pred.astype(int)))

def precision_score_(y_true, y_pred):
  cm = confusion_matrix_(y_true, y_pred)
  tp = cm[1,1]; fp = cm[0,1]
  return float(tp / (tp+fp+1e-12))

def recall_score_(y_true, y_pred):
  cm = confusion_matrix_(y_true, y_pred)
  tp = cm[1,1]; fn = cm[1,0]
  return float(tp / (tp+fn+1e-12))

def f1_score_(y_true, y_pred):
  p = precision_score_(y_true, y_pred)
  r = recall_score_(y_true, y_pred)
  return float(2*p*r/(p+r+1e-12))

def roc_auc_score_(y_true, y_score):
  # Trapezoidal ROC AUC from scratch
  y_true = y_true.astype(int)
  order = np.argsort(-y_score)
  y_true = y_true[order]
  y_score = y_score[order]
  P = np.sum(y_true==1); N = np.sum(y_true==0)
  if P==0 or N==0: return float("nan")
  tps = 0; fps = 0
  prev_score = None
  auc = 0.0; prev_fpr = 0.0; prev_tpr = 0.0
  for i in range(len(y_true)):
    if prev_score is None or y_score[i] != prev_score:
      # integrate
      auc += (fps/N - prev_fpr) * (tps/P + prev_tpr) / 2.0
      prev_fpr = fps/N
      prev_tpr = tps/P
      prev_score = y_score[i]
    if y_true[i]==1: tps += 1
    else: fps += 1
  auc += (fps/N - prev_fpr) * (tps/P + prev_tpr) / 2.0
  return float(auc)

def average_precision_(y_true, y_score):
  # PR AUC (Average Precision) from scratch
  y_true = y_true.astype(int)
  order = np.argsort(-y_score)
  y_true = y_true[order]
  P = np.sum(y_true==1)
  if P==0: return float("nan")
  tp = 0; fp = 0; prev_recall = 0.0; ap = 0.0
  for i in range(len(y_true)):
    if y_true[i]==1: tp += 1
    else: fp += 1
    precision = tp / (tp+fp)
    recall = tp / P
    ap += precision * (recall - prev_recall)
    prev_recall = recall
  return float(ap)

def evaluate_(y_true, y_prob, threshold=0.5, prefix=""):
  y_pred = (y_prob >= threshold).astype(int)
  cm = confusion_matrix_(y_true, y_pred)
  return {
      f"{prefix}Accuracy": accuracy_score_(y_true, y_pred),
      f"{prefix}Precision": precision_score_(y_true, y_pred),
      f"{prefix}Recall": recall_score_(y_true, y_pred),
      f"{prefix}F1": f1_score_(y_true, y_pred),
      f"{prefix}ROC_AUC": roc_auc_score_(y_true, y_prob),
      f"{prefix}PR_AUC(AP)": average_precision_(y_true, y_prob),
      f"{prefix}ConfusionMatrix": cm.tolist()
  }


In [None]:

#@title Random Forest (from scratch, NumPy)
class DecisionTree:
  def __init__(self, max_depth=None, min_samples_split=2, min_samples_leaf=1, mtry=None, random_state=42):
    self.max_depth = max_depth
    self.min_samples_split = int(min_samples_split)
    self.min_samples_leaf = int(min_samples_leaf)
    self.mtry = mtry
    self.random_state = random_state
    self.tree_ = None

  @staticmethod
  def gini(y, w=None):
    if w is None:
      w = np.ones_like(y, dtype=float)
    wsum = np.sum(w);
    if wsum==0: return 0.0
    p1 = np.sum(w*(y==1))/wsum
    p0 = 1.0 - p1
    return 1.0 - p1*p1 - p0*p0

  def best_split(self, X, y, w, feat_idxs):
    n, d = X.shape
    best = None
    base_imp = self.gini(y, w)
    for j in feat_idxs:
      xj = X[:, j]
      # sort unique thresholds
      order = np.argsort(xj)
      x_sorted = xj[order]
      y_sorted = y[order]
      w_sorted = w[order]
      # cumulative stats to evaluate splits between unique values
      wl = 0.0; wr = np.sum(w_sorted)
      yl = 0.0; yr = np.sum(w_sorted*(y_sorted==1))
      for i in range(0, n-1):
        wi = w_sorted[i]
        yi = 1.0 if y_sorted[i]==1 else 0.0
        wl += wi; wr -= wi
        yl += wi*yi; yr -= wi*yi
        if wl < self.min_samples_leaf or wr < self.min_samples_leaf:
          continue
        if x_sorted[i] == x_sorted[i+1]:
          continue
        p1_l = yl / wl if wl>0 else 0.0
        p1_r = yr / wr if wr>0 else 0.0
        gini_l = 1 - p1_l**2 - (1-p1_l)**2
        gini_r = 1 - p1_r**2 - (1-p1_r)**2
        imp = (wl/(wl+wr))*gini_l + (wr/(wl+wr))*gini_r
        gain = base_imp - imp
        if (best is None) or (gain > best["gain"]):
          thr = 0.5*(x_sorted[i] + x_sorted[i+1])
          best = {"feat": j, "thr": thr, "gain": gain}
    return best

  def _build(self, X, y, w, depth, rng):
    n, d = X.shape
    num_pos = np.sum(w*(y==1))
    num_neg = np.sum(w*(y==0))
    prob = (num_pos/(num_pos+num_neg+1e-12))
    node = {"prob": float(prob), "depth": depth}
    # stopping criteria
    if (self.max_depth is not None and depth >= self.max_depth) or n < self.min_samples_split or prob in (0.0,1.0):
      return node
    # feature subsampling
    mtry = self.mtry if self.mtry is not None else int(np.sqrt(d))
    feat_idxs = rng.choice(d, size=max(1, mtry), replace=False)
    split = self.best_split(X, y, w, feat_idxs)
    if split is None or split["gain"] <= 1e-12:
      return node
    j, thr = split["feat"], split["thr"]
    left_mask = X[:, j] <= thr
    right_mask = ~left_mask
    if left_mask.sum() < self.min_samples_leaf or right_mask.sum() < self.min_samples_leaf:
      return node
    node.update({"feat": int(j), "thr": float(thr)})
    node["left"] = self._build(X[left_mask], y[left_mask], w[left_mask], depth+1, rng)
    node["right"] = self._build(X[right_mask], y[right_mask], w[right_mask], depth+1, rng)
    return node

  def fit(self, X, y, sample_weight=None):
    rng = np.random.default_rng(self.random_state)
    if sample_weight is None:
      sample_weight = np.ones(len(y), dtype=float)
    self.tree_ = self._build(np.asarray(X), np.asarray(y).astype(int), np.asarray(sample_weight, float), 0, rng)
    return self

  def _predict_row(self, row, node):
    while True:
      if "feat" not in node:
        return node["prob"]
      if row[node["feat"]] <= node["thr"]:
        node = node["left"]
      else:
        node = node["right"]

  def predict_proba(self, X):
    X = np.asarray(X)
    return np.array([self._predict_row(x, self.tree_) for x in X])

class RandomForestScratch:
  def __init__(self, n_estimators=200, max_depth=None, min_samples_split=2, min_samples_leaf=1, mtry=None, random_state=42, bootstrap=True, class_weight="balanced"):
    self.n_estimators = n_estimators
    self.max_depth = max_depth
    self.min_samples_split = min_samples_split
    self.min_samples_leaf = min_samples_leaf
    self.mtry = mtry
    self.random_state = random_state
    self.bootstrap = bootstrap
    self.class_weight = class_weight
    self.trees = []

  def fit(self, X, y):
    rng = np.random.default_rng(self.random_state)
    n = len(y)
    # class weights
    if self.class_weight=="balanced":
      n_pos = np.sum(y==1); n_neg = np.sum(y==0)
      w_pos = n/(2*max(n_pos,1)); w_neg = n/(2*max(n_neg,1))
      base_w = np.where(y==1, w_pos, w_neg).astype(float)
    else:
      base_w = np.ones(n, dtype=float)

    self.trees = []
    for t in range(self.n_estimators):
      if self.bootstrap:
        idx = rng.choice(n, size=n, replace=True)
      else:
        idx = np.arange(n)
      tree = DecisionTree(max_depth=self.max_depth, min_samples_split=self.min_samples_split, min_samples_leaf=self.min_samples_leaf, mtry=self.mtry, random_state=int(rng.integers(1, 1e9)))
      tree.fit(X[idx], y[idx], sample_weight=base_w[idx])
      self.trees.append(tree)
    return self

  def predict_proba(self, X):
    probs = np.mean([tr.predict_proba(X) for tr in self.trees], axis=0)
    return np.clip(probs, 1e-6, 1-1e-6)

# Train RF from scratch
rf = RandomForestScratch(n_estimators=200, max_depth=None, min_samples_split=4, min_samples_leaf=2, mtry=None, random_state=RANDOM_STATE, bootstrap=True, class_weight="balanced")
rf.fit(X_train, y_train)
rf_val_prob = rf.predict_proba(X_val)
rf_test_prob = rf.predict_proba(X_test)
print("RF (scratch) — Val Metrics")
print(evaluate_(y_val, rf_val_prob, prefix="RF/Val/"))
print("\nRF (scratch) — Test Metrics")
print(evaluate_(y_test, rf_test_prob, prefix="RF/Test/"))


RF (scratch) — Val Metrics
{'RF/Val/Accuracy': 0.7421617161716172, 'RF/Val/Precision': 0.7568647029455813, 'RF/Val/Recall': 0.6652040368582709, 'RF/Val/F1': 0.7080803362909543, 'RF/Val/ROC_AUC': 0.8072970139976972, 'RF/Val/PR_AUC(AP)': 0.7770338680064336, 'RF/Val/ConfusionMatrix': [[2082, 487], [763, 1516]]}

RF (scratch) — Test Metrics
{'RF/Test/Accuracy': 0.7422409258285113, 'RF/Test/Precision': 0.7675651789659741, 'RF/Test/Recall': 0.6478925773964936, 'RF/Test/F1': 0.7026699029121246, 'RF/Test/ROC_AUC': 0.800472526352192, 'RF/Test/PR_AUC(AP)': 0.7770458029493591, 'RF/Test/ConfusionMatrix': [[2496, 526], [944, 1737]]}


In [None]:

#@title DNN (from-scratch training loop in PyTorch)
class DNN(nn.Module):
  def __init__(self, in_dim):
    super().__init__()
    self.fc1 = nn.Linear(in_dim, 256)
    self.bn1 = nn.BatchNorm1d(256)
    self.fc2 = nn.Linear(256, 128)
    self.bn2 = nn.BatchNorm1d(128)
    self.fc3 = nn.Linear(128, 1)
    self.drop = nn.Dropout(0.2)

  def forward(self, x):
    x = self.drop(F.relu(self.bn1(self.fc1(x))))
    x = self.drop(F.relu(self.bn2(self.fc2(x))))
    return self.fc3(x).squeeze(1)

def train_dnn(X_train, y_train, X_val, y_val, epochs=50, batch_size=512, lr=1e-3):
  Xtr = torch.tensor(X_train, dtype=torch.float32).to(device)
  ytr = torch.tensor(y_train, dtype=torch.float32).to(device)
  Xv = torch.tensor(X_val, dtype=torch.float32).to(device)
  yv = torch.tensor(y_val, dtype=torch.float32).to(device)

  model = DNN(X_train.shape[1]).to(device)
  pos = float(np.sum(y_train==1)); neg = float(np.sum(y_train==0))
  pos_weight = torch.tensor([neg/max(pos,1.0)], dtype=torch.float32).to(device)
  criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
  optim = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
  sched = torch.optim.lr_scheduler.ReduceLROnPlateau(optim, mode="max", factor=0.5, patience=3)

  best_auc = -1; best = copy.deepcopy(model.state_dict())
  patience = 8; no_improve = 0

  for epoch in range(epochs):
    model.train()
    # mini-batches
    idx = torch.randperm(Xtr.size(0))
    for i in range(0, Xtr.size(0), batch_size):
      b = idx[i:i+batch_size]
      xb, yb = Xtr[b], ytr[b]
      optim.zero_grad()
      logits = model(xb)
      loss = criterion(logits, yb)
      loss.backward()
      torch.nn.utils.clip_grad_norm_(model.parameters(), 3.0)
      optim.step()
    # validate
    model.eval()
    with torch.no_grad():
      val_prob = torch.sigmoid(model(Xv)).cpu().numpy()
      auc = roc_auc_score_(y_val, val_prob)
    sched.step(auc)
    if auc > best_auc + 1e-4:
      best_auc = auc; best = copy.deepcopy(model.state_dict()); no_improve = 0
    else:
      no_improve += 1
      if no_improve >= patience: break

  model.load_state_dict(best)
  with torch.no_grad():
    val_prob = torch.sigmoid(model(Xv)).cpu().numpy()
  return model, val_prob

dnn_model, dnn_val_prob = train_dnn(X_train, y_train, X_val, y_val, epochs=40, batch_size=512, lr=1e-3)
with torch.no_grad():
  dnn_test_prob = torch.sigmoid(dnn_model(torch.tensor(X_test, dtype=torch.float32).to(device))).cpu().numpy()
print("DNN — Val Metrics")
print(evaluate_(y_val, dnn_val_prob, prefix="DNN/Val/"))
print("\nDNN — Test Metrics")
print(evaluate_(y_test, dnn_test_prob, prefix="DNN/Test/"))


DNN — Val Metrics
{'DNN/Val/Accuracy': 0.7431930693069307, 'DNN/Val/Precision': 0.7492767598842812, 'DNN/Val/Recall': 0.6818780166739795, 'DNN/Val/F1': 0.7139903514812375, 'DNN/Val/ROC_AUC': 0.8097803817788377, 'DNN/Val/PR_AUC(AP)': 0.7795127058794199, 'DNN/Val/ConfusionMatrix': [[2049, 520], [725, 1554]]}

DNN — Test Metrics
{'DNN/Test/Accuracy': 0.7431176573733123, 'DNN/Test/Precision': 0.7563237774030351, 'DNN/Test/Recall': 0.6691533010070867, 'DNN/Test/F1': 0.7100732238269308, 'DNN/Test/ROC_AUC': 0.8006702557472942, 'DNN/Test/PR_AUC(AP)': 0.7782351008033003, 'DNN/Test/ConfusionMatrix': [[2444, 578], [887, 1794]]}


In [None]:

#@title DBN — Stacked RBMs (from scratch in PyTorch) + fine-tune
class RBM(nn.Module):
  def __init__(self, n_visible, n_hidden, k=1, lr=1e-3, momentum=0.5, weight_decay=1e-4):
    super().__init__()
    self.W = nn.Parameter(torch.randn(n_visible, n_hidden) * 0.01)
    self.vb = nn.Parameter(torch.zeros(n_visible))
    self.hb = nn.Parameter(torch.zeros(n_hidden))
    self.k = k; self.lr = lr; self.momentum = momentum; self.weight_decay = weight_decay
    # velocity for momentum
    self.W_m = torch.zeros_like(self.W)
    self.vb_m = torch.zeros_like(self.vb)
    self.hb_m = torch.zeros_like(self.hb)

  def sample_h(self, v):
    p = torch.sigmoid(v @ self.W + self.hb)
    return p, torch.bernoulli(p)

  def sample_v(self, h):
    p = torch.sigmoid(h @ self.W.t() + self.vb)
    return p, torch.bernoulli(p)

  def contrastive_divergence(self, v0):
    ph0, h0 = self.sample_h(v0)
    vk = v0; hk = h0
    for _ in range(self.k):
      pvk, vk = self.sample_v(hk)
      phk, hk = self.sample_h(vk)
    positive_grad = v0.t() @ ph0
    negative_grad = vk.t() @ phk
    dW = (positive_grad - negative_grad) / v0.size(0) - self.weight_decay * self.W
    dvb = torch.mean(v0 - vk, dim=0)
    dhb = torch.mean(ph0 - phk, dim=0)

    self.W_m = self.momentum * self.W_m + self.lr * dW
    self.vb_m = self.momentum * self.vb_m + self.lr * dvb
    self.hb_m = self.momentum * self.hb_m + self.lr * dhb

    self.W.data += self.W_m
    self.vb.data += self.vb_m
    self.hb.data += self.hb_m

  def forward_hidden(self, v):
    return torch.sigmoid(v @ self.W + self.hb)

def pretrain_dbn(X, layer_sizes=[256, 128], epochs=10, batch_size=256, k=1):
  data = torch.tensor(X, dtype=torch.float32)
  rbms = []
  vis = X.shape[1]
  for h in layer_sizes:
    rbm = RBM(vis, h, k=k, lr=1e-3, momentum=0.5, weight_decay=1e-4)
    rbm.train()
    for epoch in range(epochs):
      perm = torch.randperm(data.size(0))
      for i in range(0, data.size(0), batch_size):
        batch_idx = perm[i:i+batch_size]
        v0 = data[batch_idx]
        rbm.contrastive_divergence(v0)
    with torch.no_grad():
      data = rbm.forward_hidden(data)
    rbms.append(rbm); vis = h
  return rbms

class DBNClassifier(nn.Module):
  def __init__(self, input_dim, layer_sizes=[256,128]):
    super().__init__()
    layers = []; last = input_dim
    for h in layer_sizes:
      layers += [nn.Linear(last, h), nn.ReLU()]
      last = h
    self.feat = nn.Sequential(*layers)
    self.head = nn.Linear(last, 1)
  def forward(self, x):
    return self.head(self.feat(x)).squeeze(1)

def init_from_rbms(model, rbms):
  lin_layers = [m for m in model.feat if isinstance(m, nn.Linear)]
  for lin, rbm in zip(lin_layers, rbms):
    with torch.no_grad():
      lin.weight.copy_(rbm.W.t())
      lin.bias.copy_(rbm.hb)

# Pretrain + fine-tune
rbm_layers = [256, 128]
rbms = pretrain_dbn(X_train, layer_sizes=rbm_layers, epochs=8, batch_size=256, k=1)
dbn = DBNClassifier(X_train.shape[1], layer_sizes=rbm_layers).to(device)
init_from_rbms(dbn, rbms)

pos = float(np.sum(y_train==1)); neg = float(np.sum(y_train==0))
pos_weight = torch.tensor([neg/max(pos,1.0)], dtype=torch.float32).to(device)
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
optim = torch.optim.AdamW(dbn.parameters(), lr=1e-3, weight_decay=5e-4)
sched = torch.optim.lr_scheduler.ReduceLROnPlateau(optim, mode="max", factor=0.5, patience=3)

best_auc = -1; best = copy.deepcopy(dbn.state_dict())
patience = 8; no_improve = 0

Xtr_t = torch.tensor(X_train, dtype=torch.float32).to(device)
ytr_t = torch.tensor(y_train, dtype=torch.float32).to(device)
Xv_t = torch.tensor(X_val, dtype=torch.float32).to(device)

for epoch in range(40):
  dbn.train()
  idx = torch.randperm(Xtr_t.size(0))
  for i in range(0, Xtr_t.size(0), 512):
    b = idx[i:i+512]
    xb = Xtr_t[b]; yb = ytr_t[b]
    optim.zero_grad()
    logits = dbn(xb)
    loss = criterion(logits, yb)
    loss.backward()
    torch.nn.utils.clip_grad_norm_(dbn.parameters(), 3.0)
    optim.step()
  dbn.eval()
  with torch.no_grad():
    val_prob = torch.sigmoid(dbn(Xv_t)).cpu().numpy()
    auc = roc_auc_score_(y_val, val_prob)
  sched.step(auc)
  if auc > best_auc + 1e-4:
    best_auc = auc; best = copy.deepcopy(dbn.state_dict()); no_improve = 0
  else:
    no_improve += 1
    if no_improve >= patience: break

dbn.load_state_dict(best)
with torch.no_grad():
  dbn_val_prob = torch.sigmoid(dbn(Xv_t)).cpu().numpy()
  dbn_test_prob = torch.sigmoid(dbn(torch.tensor(X_test, dtype=torch.float32).to(device))).cpu().numpy()

print("DBN — Val Metrics")
print(evaluate_(y_val, dbn_val_prob, prefix="DBN/Val/"))
print("\nDBN — Test Metrics")
print(evaluate_(y_test, dbn_test_prob, prefix="DBN/Test/"))


DBN — Val Metrics
{'DBN/Val/Accuracy': 0.7436056105610561, 'DBN/Val/Precision': 0.7648261758691203, 'DBN/Val/Recall': 0.656428258007898, 'DBN/Val/F1': 0.706493506493009, 'DBN/Val/ROC_AUC': 0.8094971075627263, 'DBN/Val/PR_AUC(AP)': 0.7809143786514124, 'DBN/Val/ConfusionMatrix': [[2109, 460], [783, 1496]]}

DBN — Test Metrics
{'DBN/Test/Accuracy': 0.7385586533403472, 'DBN/Test/Precision': 0.7665770609318994, 'DBN/Test/Recall': 0.6381947034688547, 'DBN/Test/F1': 0.6965194382246209, 'DBN/Test/ROC_AUC': 0.7990852114951598, 'DBN/Test/PR_AUC(AP)': 0.7768939730556379, 'DBN/Test/ConfusionMatrix': [[2501, 521], [970, 1711]]}


In [None]:

#@title TabNet-like (from scratch in PyTorch; simplified)
class GhostBN(nn.Module):
  def __init__(self, dim, eps=1e-5, momentum=0.1):
    super().__init__()
    self.bn = nn.BatchNorm1d(dim, eps=eps, momentum=momentum)
  def forward(self, x):
    return self.bn(x)

class GLU(nn.Module):
  def __init__(self, dim):
    super().__init__()
    self.fc = nn.Linear(dim, dim*2)
    self.bn = GhostBN(dim*2)
  def forward(self, x):
    x = self.bn(self.fc(x))
    a, b = x.chunk(2, dim=1)
    return a * torch.sigmoid(b)

class FeatureTransformer(nn.Module):
  def __init__(self, dim, n_glu=2):
    super().__init__()
    self.layers = nn.ModuleList([GLU(dim) for _ in range(n_glu)])
    self.rescale = nn.Parameter(torch.ones(1, dim))
  def forward(self, x):
    for glu in self.layers:
      x = (x + glu(x)) * math.sqrt(0.5)
    return x * self.rescale

class AttentiveTransformer(nn.Module):
  def __init__(self, in_dim, out_dim):
    super().__init__()
    self.fc = nn.Linear(in_dim, out_dim)
    self.bn = GhostBN(out_dim)
  def forward(self, x, prior):
    x = self.bn(self.fc(x))
    x = x * prior
    return F.softmax(x, dim=1)

class TabNetLike(nn.Module):
  def __init__(self, input_dim, decision_dim=64, n_steps=3, relaxation=1.5):
    super().__init__()
    self.input_dim = input_dim
    self.decision_dim = decision_dim
    self.n_steps = n_steps
    self.relax = relaxation
    self.shared = FeatureTransformer(input_dim, n_glu=2)
    self.step_ft = nn.ModuleList([FeatureTransformer(decision_dim, n_glu=2) for _ in range(n_steps)])
    self.att = nn.ModuleList([AttentiveTransformer(decision_dim, input_dim) for _ in range(n_steps)])
    self.fc_in = nn.Linear(input_dim, decision_dim)
    self.bn_in = GhostBN(decision_dim)
    self.head = nn.Linear(decision_dim, 1)

  def forward(self, x):
    prior = torch.ones(x.size(0), self.input_dim, device=x.device)
    features = self.shared(x)
    h = F.relu(self.bn_in(self.fc_in(features)))
    out_agg = 0
    for s in range(self.n_steps):
      mask = self.att[s](h, prior)
      x_masked = x * mask
      h = self.step_ft[s](F.relu(self.bn_in(self.fc_in(x_masked))))
      out_agg = out_agg + h
      prior = prior * (self.relax - mask)
    logits = self.head(out_agg)
    return logits.squeeze(1)

def train_tabnet(X_train, y_train, X_val, y_val, epochs=50, batch_size=512, lr=1e-3):
  model = TabNetLike(input_dim=X_train.shape[1], decision_dim=64, n_steps=3, relaxation=1.5).to(device)
  pos = float(np.sum(y_train==1)); neg = float(np.sum(y_train==0))
  pos_weight = torch.tensor([neg/max(pos,1.0)], dtype=torch.float32).to(device)
  criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
  optim = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
  sched = torch.optim.lr_scheduler.ReduceLROnPlateau(optim, mode="max", factor=0.5, patience=3)

  Xtr = torch.tensor(X_train, dtype=torch.float32).to(device)
  ytr = torch.tensor(y_train, dtype=torch.float32).to(device)
  Xv = torch.tensor(X_val, dtype=torch.float32).to(device)

  best_auc = -1; best = copy.deepcopy(model.state_dict())
  patience = 8; no_improve = 0

  for epoch in range(epochs):
    model.train()
    idx = torch.randperm(Xtr.size(0))
    for i in range(0, Xtr.size(0), batch_size):
      b = idx[i:i+batch_size]
      xb = Xtr[b]; yb = ytr[b]
      optim.zero_grad()
      logits = model(xb)
      loss = criterion(logits, yb)
      loss.backward()
      torch.nn.utils.clip_grad_norm_(model.parameters(), 3.0)
      optim.step()
    model.eval()
    with torch.no_grad():
      val_prob = torch.sigmoid(model(Xv)).cpu().numpy()
      auc = roc_auc_score_(y_val, val_prob)
    sched.step(auc)
    if auc > best_auc + 1e-4:
      best_auc = auc; best = copy.deepcopy(model.state_dict()); no_improve = 0
    else:
      no_improve += 1
      if no_improve >= patience: break

  model.load_state_dict(best)
  with torch.no_grad():
    val_prob = torch.sigmoid(model(Xv)).cpu().numpy()
  return model, val_prob

tab_model, tab_val_prob = train_tabnet(X_train, y_train, X_val, y_val, epochs=50, batch_size=512, lr=1e-3)
with torch.no_grad():
  tab_test_prob = torch.sigmoid(tab_model(torch.tensor(X_test, dtype=torch.float32).to(device))).cpu().numpy()
print("TabNet-like — Val Metrics")
print(evaluate_(y_val, tab_val_prob, prefix="TabNet/Val/"))
print("\nTabNet-like — Test Metrics")
print(evaluate_(y_test, tab_test_prob, prefix="TabNet/Test/"))


TabNet-like — Val Metrics
{'TabNet/Val/Accuracy': 0.7341171617161716, 'TabNet/Val/Precision': 0.7915194346289749, 'TabNet/Val/Recall': 0.5897323387450634, 'TabNet/Val/F1': 0.6758863464918412, 'TabNet/Val/ROC_AUC': 0.7990973484611049, 'TabNet/Val/PR_AUC(AP)': 0.7631814274704951, 'TabNet/Val/ConfusionMatrix': [[2215, 354], [935, 1344]]}

TabNet-like — Test Metrics
{'TabNet/Test/Accuracy': 0.7271611432579345, 'TabNet/Test/Precision': 0.7937336814621406, 'TabNet/Test/Recall': 0.5669526296158148, 'TabNet/Test/F1': 0.6614447345512977, 'TabNet/Test/ROC_AUC': 0.7881598601428614, 'TabNet/Test/PR_AUC(AP)': 0.7669052891624447, 'TabNet/Test/ConfusionMatrix': [[2627, 395], [1161, 1520]]}


In [None]:

#@title Final comparison table (all from-scratch models)
def summarize(name, y_true, y_prob):
  m = evaluate_(y_true, y_prob)
  line = {
      "Model": name,
      "Accuracy": m["Accuracy"],
      "Precision": m["Precision"],
      "Recall": m["Recall"],
      "F1": m["F1"],
      "ROC_AUC": m["ROC_AUC"],
      "PR_AUC": m["PR_AUC(AP)"],
  }
  return line

summary = []
summary.append(summarize("RandomForest", y_test, rf_test_prob))
summary.append(summarize("DNN", y_test, dnn_test_prob))
summary.append(summarize("DBN", y_test, dbn_test_prob))
summary.append(summarize("TabNet", y_test, tab_test_prob))

pd.DataFrame(summary)


Unnamed: 0,Model,Accuracy,Precision,Recall,F1,ROC_AUC,PR_AUC
0,RandomForest,0.742241,0.767565,0.647893,0.70267,0.800473,0.777046
1,DNN,0.743118,0.756324,0.669153,0.710073,0.80067,0.778235
2,DBN,0.738559,0.766577,0.638195,0.696519,0.799085,0.776894
3,TabNet,0.727161,0.793734,0.566953,0.661445,0.78816,0.766905


In [None]:

from sklearn.metrics import roc_curve, auc, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

def plot_auc_roc(y_true, y_prob):
    fpr, tpr, _ = roc_curve(y_true, y_prob)
    roc_auc = auc(fpr, tpr)
    plt.figure(figsize=(6,6))
    plt.plot(fpr, tpr, color="blue", lw=2, label="ROC curve (AUC = %0.2f)" % roc_auc)
    plt.plot([0, 1], [0, 1], color="white", lw=2, linestyle="--")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title("Receiver Operating Characteristic (ROC)")
    plt.legend(loc="lower right")
    plt.show()

def plot_confusion_matrix(y_true, y_pred):
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(6,5))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False)
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.title("Confusion Matrix")
    plt.show()
