In [None]:
# Feedforward Assignment Notebook
# This is a single-file Python notebook script intended to be run as a Jupyter notebook
# It contains: data generation class, model class with flexible depth/width, training loop
# using PyTorch DataLoader and Adam, visualizations for train/val sets similar to Figure 1,
# experiments over depths and widths, parameter counts, and BCE loss explanation.

# NOTE: The original assignment PDF is included at: /mnt/data/DL_Assignment_1_2025.pdf
# (This path was provided in the conversation.)

# --- Cell 1: Imports and helpers ---
import os
import math
import random
import time
from typing import Tuple, List, Dict

import numpy as np
import matplotlib.pyplot as plt
from matplotlib import cm

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader, TensorDataset

# for saving snapshots / movie
import imageio

In [None]:
# Reproducibility: one shared seed for every RNG this notebook draws from
# (Python's `random`, NumPy's legacy global generator, and PyTorch).
RSEED = 42
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    _seed_fn(RSEED)

# Run on the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
_device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
DEVICE = torch.device(_device_name)
print('Using device:', DEVICE)

In [None]:
# --- Cell 2: Noisy XOR Dataset class ---
class NoisyXORDataset(Dataset):
    """Synthetic dataset for the noisy-XOR binary classification problem.

    Every sample picks one of the four corners of the unit square
    ((0,0), (0,1), (1,0), (1,1)) uniformly at random, adds Gaussian noise with
    standard deviation ``s`` to both coordinates, and is labelled with the XOR
    of the chosen corner's two bits.

    Args:
        n_samples: number of points to generate.
        s: standard deviation of the Gaussian noise added to each coordinate.
        seed: optional integer seed. When given, the samples are drawn from a
            private ``np.random.RandomState(seed)``, so constructing a dataset
            does not reseed or advance NumPy's global generator. The draws are
            the same ones the global generator would produce right after
            ``np.random.seed(seed)``. When ``None``, the global NumPy
            generator is used.

    Attributes:
        x: float32 tensor of shape ``(n_samples, 2)`` holding the noisy points.
        y: float32 tensor of shape ``(n_samples, 1)`` holding labels 0.0 / 1.0.
    """
    def __init__(self, n_samples: int = 1000, s: float = 0.1, seed: int = None):
        super().__init__()
        # A private generator keeps dataset creation free of global side effects;
        # the `np.random` module itself exposes the same randint/randn API.
        rng = np.random if seed is None else np.random.RandomState(seed)
        # cluster centers
        centers = np.array([[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [1.0, 1.0]])
        # pick centers uniformly (drawn before the noise, which fixes the stream order)
        indices = rng.randint(0, centers.shape[0], size=n_samples)
        chosen = centers[indices]
        samples = chosen + s * rng.randn(n_samples, 2)
        # label = XOR of the two (noise-free) center bits
        bits = chosen.astype(int)
        labels = (bits[:, 0] ^ bits[:, 1]).astype(np.float32)
        self.x = torch.tensor(samples, dtype=torch.float32)
        # trailing unit dimension so targets line up with a (batch, 1) model output
        self.y = torch.tensor(labels, dtype=torch.float32).unsqueeze(1)

    def __len__(self):
        """Return the number of generated samples."""
        return len(self.x)

    def __getitem__(self, idx):
        """Return the ``(point, label)`` pair stored at ``idx``."""
        return self.x[idx], self.y[idx]

# quick plot function like Figure 1
def plot_dataset(dataset: NoisyXORDataset, ax=None, title=None):
    """Scatter-plot a noisy-XOR dataset with one series per class.

    Args:
        dataset: dataset whose ``x`` (points) and ``y`` (labels) tensors are plotted.
        ax: matplotlib axes to draw on; a new 4x4 inch figure is created when omitted.
        title: optional axes title (skipped when falsy).

    Returns:
        The axes that were drawn on.
    """
    points = dataset.x.numpy()
    targets = dataset.y.numpy().squeeze()
    if ax is None:
        fig, ax = plt.subplots(figsize=(4,4))
    # Class 0 is drawn first, then class 1, so the default colour cycle is stable.
    for class_value, class_label in ((0, 'Class 0'), (1, 'Class 1')):
        mask = targets == class_value
        ax.scatter(points[mask, 0], points[mask, 1], label=class_label, alpha=0.6)
    # Fixed window around the unit square so successive plots are comparable.
    ax.set(xlim=(-0.5, 1.5), ylim=(-0.5, 1.5), xlabel='x1', ylabel='x2')
    if title:
        ax.set_title(title)
    ax.legend()
    return ax

In [None]:
# --- Cell 3: Flexible feed-forward network class ---
class FeedForwardNet(nn.Module):
    """Fully connected network with a configurable stack of hidden layers.

    Args:
        in_features: size of each input sample.
        out_features: size of the output (raw logits, no final activation).
        hidden_layers: width of each hidden layer, in order; an empty list
            yields a single linear map from input to output.
        activation: zero-argument callable (e.g. an ``nn.Module`` class) that is
            instantiated once after every hidden linear layer.
    """
    def __init__(self, in_features:int=2, out_features:int=1, hidden_layers:List[int]=[3], activation=nn.Tanh):
        super().__init__()
        # widths[i] -> widths[i+1] describes the i-th hidden linear layer
        widths = [in_features, *hidden_layers]
        modules = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            modules += [nn.Linear(fan_in, fan_out), activation()]
        # output head stays linear: the loss (BCEWithLogitsLoss) applies the sigmoid
        modules.append(nn.Linear(widths[-1], out_features))
        self.net = nn.Sequential(*modules)

    def forward(self, x):
        """Map a batch of inputs to raw output logits."""
        return self.net(x)

# helper to count parameters
def count_parameters(model: nn.Module) -> int:
    """Return the total number of scalar entries across the model's trainable parameters.

    Parameters with ``requires_grad`` set to False are not counted.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

In [None]:
# --- Cell 4: Training & evaluation utilities ---

def train_one_epoch(model, loader, criterion, optimizer, device=DEVICE):
    """Run one optimisation pass over ``loader`` and return the mean training loss.

    Args:
        model: network to train; switched to training mode.
        loader: iterable yielding ``(inputs, targets)`` batches.
        criterion: loss called as ``criterion(model(inputs), targets)``.
        optimizer: optimizer stepped once per batch.
        device: device each batch is moved to before the forward pass.

    Returns:
        The per-sample average loss. Each batch loss is weighted by its batch
        size (this assumes ``criterion`` returns a batch mean) and the sum is
        divided by the number of samples actually processed. Counting processed
        samples, rather than using ``len(loader.dataset)``, keeps the average
        correct when the loader drops the last batch or samples only part of
        the dataset.

    Raises:
        ZeroDivisionError: if ``loader`` yields no samples.
    """
    model.train()
    running_loss = 0.0
    n_seen = 0
    for xb, yb in loader:
        xb, yb = xb.to(device), yb.to(device)
        n_batch = xb.size(0)
        optimizer.zero_grad()
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * n_batch
        n_seen += n_batch
    return running_loss / n_seen


def evaluate(model, loader, criterion, device=DEVICE):
    """Compute the mean loss and classification accuracy of ``model`` on ``loader``.

    The model is switched to evaluation mode and gradients are disabled.
    Outputs are treated as logits: a sample is predicted as class 1 when
    ``sigmoid(logit) >= 0.5``.

    Args:
        model: network to evaluate.
        loader: iterable yielding ``(inputs, targets)`` batches.
        criterion: loss called as ``criterion(model(inputs), targets)``.
        device: device each batch is moved to before the forward pass.

    Returns:
        ``(avg_loss, accuracy)``. Both are normalised by the number of samples
        actually processed rather than ``len(loader.dataset)``, so they stay
        correct when the loader drops the last batch or samples only part of
        the dataset. The loss weighting assumes ``criterion`` returns a batch
        mean.

    Raises:
        ZeroDivisionError: if ``loader`` yields no samples.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    n_seen = 0
    with torch.no_grad():
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)
            n_batch = xb.size(0)
            logits = model(xb)
            loss = criterion(logits, yb)
            running_loss += loss.item() * n_batch
            probs = torch.sigmoid(logits)
            preds = (probs >= 0.5).float()
            correct += (preds == yb).sum().item()
            n_seen += n_batch
    avg_loss = running_loss / n_seen
    accuracy = correct / n_seen
    return avg_loss, accuracy

# decision boundary plotting
def plot_decision_boundary(model, ax=None, title=None, device=DEVICE, resolution=200):
    """Draw the model's predicted class-1 probability over the square [-0.5, 1.5]^2.

    The probability surface is rendered as filled contours and the 0.5 level
    (the decision boundary) as a black line.

    Args:
        model: network mapping ``(N, 2)`` float inputs to ``(N, 1)`` logits.
            It is moved to ``device``; its training/eval mode is restored
            before returning.
        ax: matplotlib axes to draw on; a new 4x4 inch figure is created when omitted.
        title: optional axes title (skipped when falsy).
        device: device used for the forward pass.
        resolution: number of grid points along each axis.

    Returns:
        The axes that were drawn on.
    """
    if ax is None:
        fig, ax = plt.subplots(figsize=(4,4))
    xx = np.linspace(-0.5, 1.5, resolution)
    yy = np.linspace(-0.5, 1.5, resolution)
    # meshgrid's default 'xy' indexing gives rows indexed by yy and columns by xx,
    # which is the layout contour/contourf expect for Z.
    grid = np.stack(np.meshgrid(xx, yy), axis=-1).reshape(-1,2)
    was_training = model.training
    model.to(device)
    # Evaluate in eval mode so layers such as dropout/batch-norm (if any) behave
    # deterministically; the caller's mode is restored afterwards.
    model.eval()
    try:
        with torch.no_grad():
            logits = model(torch.tensor(grid, dtype=torch.float32, device=device))
            # torch.sigmoid avoids the float32 overflow warning that a hand-rolled
            # 1 / (1 + exp(-z)) raises for large negative logits.
            probs = torch.sigmoid(logits).cpu().numpy().reshape(resolution, resolution)
    finally:
        model.train(was_training)
    ax.contourf(xx, yy, probs, levels=50, cmap=cm.RdBu, alpha=0.6)
    ax.contour(xx, yy, probs, levels=[0.5], colors='k')
    ax.set_xlim(-0.5, 1.5)
    ax.set_ylim(-0.5, 1.5)
    if title:
        ax.set_title(title)
    return ax


In [None]:

# --- Cell 5: Experiment runner for depths/widths ---

def run_experiment(seed:int=RSEED, s:float=0.1, n_samples_train:int=1000, n_samples_val:int=500,
                   batch_size:int=64, lr:float=1e-3, epochs:int=200,
                   depths:List[int]=[0,1,2,3], widths:List[int]=[1,2,3], runs_per_setting:int=5):
    """Sweep network depth and width on the noisy-XOR task and summarise validation results.

    For every ``(depth, width)`` pair, ``runs_per_setting`` networks are trained
    with Adam and ``BCEWithLogitsLoss`` for ``epochs`` epochs on freshly
    generated data, then evaluated once on a validation set.

    Args:
        seed: base seed for dataset generation; run ``r`` uses ``seed + r`` for
            the training set and ``seed + r + 1000`` for the validation set.
            Model initialisation and batch shuffling are not seeded here.
        s: noise standard deviation passed to ``NoisyXORDataset``.
        n_samples_train: number of training samples per run.
        n_samples_val: number of validation samples per run.
        batch_size: batch size of both data loaders.
        lr: Adam learning rate.
        epochs: number of training epochs per run.
        depths: numbers of hidden layers to try; 0 means a purely linear model.
            A depth of 0 is still repeated for every entry of ``widths`` even
            though the architecture is then identical.
        widths: hidden-layer widths to try (every hidden layer of a model has
            the same width).
        runs_per_setting: number of independent runs per ``(depth, width)`` pair.

    Returns:
        A list with one dict per ``(depth, width)`` pair, with keys ``depth``,
        ``width``, ``mean_val_loss``, ``std_val_loss``, ``mean_val_acc``,
        ``std_val_acc`` and ``param_counts``.

    Raises:
        ValueError: if ``runs_per_setting`` is smaller than 1 (no runs means
            there is nothing to average).
    """
    if runs_per_setting < 1:
        raise ValueError(f"runs_per_setting must be >= 1, got {runs_per_setting}")
    results = []
    for depth in depths:
        for width in widths:
            setting_losses = []
            setting_accs = []
            param_counts = []
            for run in range(runs_per_setting):
                rs = seed + run
                # generate datasets (validation seed is offset so it never matches a training seed
                # for fewer than 1000 runs)
                train_ds = NoisyXORDataset(n_samples=n_samples_train, s=s, seed=rs)
                val_ds = NoisyXORDataset(n_samples=n_samples_val, s=s, seed=rs+1000)
                train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
                val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False)
                # build architecture: depth=0 means no hidden layers -> direct linear model
                hidden = [] if depth == 0 else [width] * depth
                model = FeedForwardNet(in_features=2, out_features=1, hidden_layers=hidden).to(DEVICE)
                param_counts.append(count_parameters(model))
                # use BCEWithLogitsLoss for numerical stability
                criterion = nn.BCEWithLogitsLoss()
                optimizer = torch.optim.Adam(model.parameters(), lr=lr)
                # train for given epochs; only the final validation metrics are recorded
                for epoch in range(epochs):
                    train_one_epoch(model, train_loader, criterion, optimizer, device=DEVICE)
                val_loss, val_acc = evaluate(model, val_loader, criterion, device=DEVICE)
                setting_losses.append(val_loss)
                setting_accs.append(val_acc)
            mean_loss = float(np.mean(setting_losses))
            std_loss = float(np.std(setting_losses))
            mean_acc = float(np.mean(setting_accs))
            std_acc = float(np.std(setting_accs))
            results.append({
                'depth': depth,
                'width': width,
                'mean_val_loss': mean_loss,
                'std_val_loss': std_loss,
                'mean_val_acc': mean_acc,
                'std_val_acc': std_acc,
                # every run of a setting builds the same architecture, so the mean is that count
                'param_counts': int(np.mean(param_counts))
            })
            # "\u00b1" is the plus-minus sign, escaped so it survives encoding round-trips
            print(f"Depth {depth} Width {width}: mean loss {mean_loss:.4f} \u00b1 {std_loss:.4f}, mean acc {mean_acc:.3f}")
    return results

In [None]:
# --- Cell 6: Run a small, quick experiment (this is adjustable) ---
if __name__ == '__main__':
    # WARNING: full experiments can be compute heavy. The settings below are a
    # deliberately modest sweep (fewer samples, epochs and runs than the defaults).
    quick_config = dict(
        seed=RSEED, s=0.1, n_samples_train=500, n_samples_val=300,
        batch_size=64, lr=1e-3, epochs=120,
        depths=[0,1,2,3], widths=[1,2,3], runs_per_setting=3,
    )
    results = run_experiment(**quick_config)

    # Tabulate the per-setting summary, show it, and persist it next to the notebook.
    import pandas as pd
    df = pd.DataFrame(results)
    print('\nSummary of experiments:')
    print(df)
    df.to_csv('experiment_results.csv', index=False)



In [None]:
# --- Cell 7: Train minimal XOR network and save snapshots for decision boundary movie ---
# We'll train 1 hidden layer with 2 units (tanh) and save snapshots over epochs.


def train_and_snapshot(hidden_layers=[2], s=0.1, n_samples_train=500, n_samples_val=300,
                       batch_size=64, lr=1e-3, epochs=200, snapshot_freq=10, out_dir='snapshots'):
    """Train a network on noisy XOR and record its decision boundary as PNG frames and a GIF.

    A frame is written for epoch 0 (the untrained model) and then for every
    epoch that is a multiple of ``snapshot_freq``. Each frame shows the decision
    boundary with the training points overlaid.

    Args:
        hidden_layers: hidden-layer widths passed to ``FeedForwardNet``.
        s: noise standard deviation of the generated datasets.
        n_samples_train: number of training samples (generated with seed 0).
        n_samples_val: number of validation samples (generated with seed 100);
            the validation split is built but not consumed by this function.
        batch_size: batch size of the data loaders.
        lr: Adam learning rate.
        epochs: number of training epochs.
        snapshot_freq: a frame is saved whenever ``epoch % snapshot_freq == 0``.
        out_dir: directory for the frames and the GIF; created if missing.

    Returns:
        ``(model, snapshots, gif_path)``: the trained model, the list of saved
        frame paths in epoch order, and the path of the assembled GIF.
    """
    os.makedirs(out_dir, exist_ok=True)

    # data
    train_ds = NoisyXORDataset(n_samples=n_samples_train, s=s, seed=0)
    val_ds = NoisyXORDataset(n_samples=n_samples_val, s=s, seed=100)
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False)

    # model, loss and optimiser
    model = FeedForwardNet(in_features=2, out_features=1, hidden_layers=hidden_layers).to(DEVICE)
    criterion = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    def render_frame(epoch_idx):
        """Draw the current decision boundary over the training data and save it to disk."""
        frame_path = os.path.join(out_dir, f'snapshot_{epoch_idx:04d}.png')
        fig, ax = plt.subplots(figsize=(4,4))
        plot_decision_boundary(model, ax=ax, title=f'Epoch {epoch_idx}')
        plot_dataset(train_ds, ax=ax)  # overlay training points
        fig.savefig(frame_path)
        plt.close(fig)  # release the figure so repeated frames do not accumulate in memory
        return frame_path

    snapshots = []
    for epoch in range(epochs + 1):
        # epoch 0 is the untrained baseline: snapshot only, no optimisation step
        if epoch > 0:
            train_one_epoch(model, train_loader, criterion, optimizer, device=DEVICE)
        if epoch % snapshot_freq == 0:
            fname = render_frame(epoch)
            snapshots.append(fname)
            print('Saved', fname)

    # create gif from the saved frames, in epoch order
    frames = list(map(imageio.imread, snapshots))
    gif_path = os.path.join(out_dir, 'decision_evolution.gif')
    imageio.mimsave(gif_path, frames, fps=2)
    print('Saved gif to', gif_path)
    return model, snapshots, gif_path



In [None]:

# If running interactively, uncomment the following to run snapshot saving (it may take time):
# model, snaps, gif = train_and_snapshot(hidden_layers=[2], epochs=200, snapshot_freq=10)


# --- Cell 8: Discussion / Answer to subquestion (a) ---
# (a) There are two versions of the binary cross entropy loss function in PyTorch.
#
# 1) torch.nn.BCELoss
#    - This expects probabilities (i.e., the output should already have a sigmoid applied).
#    - Numerically less stable when used after a separate sigmoid layer: for extreme logits the
#      sigmoid saturates to exactly 0 or 1 in floating point, so the subsequent log loses all
#      precision (PyTorch clamps the log term at -100 to keep the loss finite).
#
# 2) torch.nn.BCEWithLogitsLoss
#    - This combines a sigmoid layer and the binary cross entropy loss in a single class.
#    - It takes raw logits (no sigmoid activation on the model output) and evaluates the loss
#      with the log-sum-exp trick, which stays accurate even for large-magnitude logits.
#
# Recommendation: Use BCEWithLogitsLoss when your network outputs raw logits (recommended).
# If you intentionally output probabilities (after sigmoid), use BCELoss, but beware of
# numerical stability for very large/small logits.


# --- Cell 9: Notes and reproducibility ---
# - The notebook is written to be modular: adjust the run_experiment parameters for larger sweeps.
# - The 'train_and_snapshot' function addresses the request to record the training of a minimal
#   XOR network (1 hidden layer, 2 units): it saves decision-boundary snapshots as PNG images and
#   assembles them into a GIF movie. Note that the model weights themselves are not written to disk.
# - The experiments compute mean/std for loss and accuracy across multiple runs per setting.
# - The code uses BCEWithLogitsLoss for stability (see discussion above).


# Save this script as a Jupyter notebook or run the cells interactively.


# Closing status message, printed once this last cell has executed.
print('\nNotebook script loaded. To execute the experiments, run this file as a notebook and use the functions provided.')