In [24]:
import torch 
import torch.nn as nn
import torch.nn.functional as F 
from torch.utils.data import DataLoader, random_split, TensorDataset
from torchvision import datasets, transforms
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler


In [None]:
def set_seed(seed: int = 42):
    """Seed the random number generators used by this notebook.

    Seeds Python's built-in ``random`` module and PyTorch's global RNG, and,
    when CUDA is available, every CUDA device as well.

    Args:
        seed: Integer seed applied to every generator.

    Returns:
        None.
    """
    # Local import keeps this cell runnable without touching the import cell.
    import random

    # Cover non-torch randomness too (e.g. any code that uses `random` directly).
    random.seed(seed)
    torch.manual_seed(seed)

    # Explicitly seed all CUDA devices when a CUDA runtime is present.
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

    # NOTE: determinism on MPS is not guaranteed for every operation, but
    # seeding still makes runs more stable and easier to compare.

In [3]:
def get_device():
    """Pick the compute device, preferring MPS, then CUDA, then CPU.

    Returns:
        torch.device: ``"mps"`` when the MPS backend is both available and
        built, otherwise ``"cuda"`` when CUDA is available, otherwise ``"cpu"``.
    """
    mps_backend = torch.backends.mps

    if mps_backend.is_available() and mps_backend.is_built():
        device_name = "mps"
    elif torch.cuda.is_available():
        device_name = "cuda"
    else:
        device_name = "cpu"

    return torch.device(device_name)


In [16]:

class MLP(nn.Module):
    """Configurable multilayer perceptron classifier.

    The network is ``Flatten`` followed, for every entry of ``hidden_dims``, by
    ``Linear -> [BatchNorm1d] -> activation -> [Dropout]``, and finally a
    ``Linear`` layer producing ``out_dim`` outputs.

    Args:
        in_dim: Number of input features after flattening.
        hidden_dims: Iterable with the width of each hidden layer.
        out_dim: Number of outputs of the final linear layer.
        activation: ``"relu"``, ``"tanh"`` or ``"sigmoid"`` (case-insensitive).
        use_batchnorm: Insert ``BatchNorm1d`` after each hidden linear layer.
        dropout_p: Dropout probability; no dropout layer is added unless > 0.

    Raises:
        ValueError: If a hidden layer is built with an unknown activation name.
    """

    def __init__(
        self,
        in_dim: int = 28*28,
        hidden_dims=(512, 256),
        out_dim: int = 10,
        activation: str = "relu",
        use_batchnorm: bool = True,
        dropout_p: float = 0.3,
    ):
        super().__init__()

        activation_factories = {
            "relu": nn.ReLU,
            "tanh": nn.Tanh,
            "sigmoid": nn.Sigmoid,
        }

        def build_activation(name: str) -> nn.Module:
            # Resolved per hidden layer, so the name is only checked when a
            # hidden layer is actually created.
            name = name.lower()
            factory = activation_factories.get(name)
            if factory is None:
                raise ValueError(f"Unknown activation: {name}")
            return factory()

        # Flatten first so image batches such as [B, 1, 28, 28] become [B, 784].
        modules = [nn.Flatten()]

        # Consecutive (fan_in, fan_out) pairs describe the hidden linear layers.
        widths = [in_dim, *hidden_dims]
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            modules.append(nn.Linear(fan_in, fan_out))
            if use_batchnorm:
                modules.append(nn.BatchNorm1d(fan_out))
            modules.append(build_activation(activation))
            if dropout_p > 0.0:
                modules.append(nn.Dropout(dropout_p))

        # Output head: maps the last width to out_dim raw scores.
        modules.append(nn.Linear(widths[-1], out_dim))
        self.net = nn.Sequential(*modules)

    def forward(self, x):
        """Run ``x`` through the sequential stack and return its output."""
        return self.net(x)


In [5]:
@torch.no_grad()
def evaluate_classification(model, loader, device):
    """Compute mean cross-entropy loss and accuracy of ``model`` over ``loader``.

    The model is switched to eval mode and is left in eval mode on return.
    Gradients are disabled by the ``torch.no_grad`` decorator.

    Args:
        model: Module mapping a ``[batch, features]`` tensor to class logits.
        loader: Iterable yielding ``(inputs, labels)`` batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        tuple[float, float]: ``(mean_loss, accuracy)``, both averaged per sample.

    Raises:
        ZeroDivisionError: If ``loader`` yields no samples.
    """
    model.eval()

    loss_fn = nn.CrossEntropyLoss()

    total_loss = 0.0
    total_correct = 0
    total_samples = 0

    for x, y in loader:
        x = x.to(device)
        y = y.to(device)

        # Flatten each sample to a vector, e.g. [batch, 1, 28, 28] -> [batch, 784].
        # The first view() argument must be the integer batch size: passing the
        # whole torch.Size (x.size()) is rejected by view() with a TypeError.
        batch_size = x.size(0)
        x = x.view(batch_size, -1)

        logits = model(x)
        loss = loss_fn(logits, y)

        # loss is a per-batch mean, so weight it by the batch size to obtain a
        # correct per-sample average even when the last batch is smaller.
        total_loss += loss.item() * batch_size
        total_correct += (logits.argmax(dim=1) == y).sum().item()
        total_samples += batch_size

    return total_loss / total_samples, total_correct / total_samples
    

In [6]:
def train_classification(
    model,
    train_loader,
    val_loader,
    device,
    optimizer_name="adam",
    lr=1e-3,
    weight_decay=1e-4,   # L2 regularization via optimizer
    l1_lambda=0.0,       # optional L1 (manual addition to loss)
    epochs=10,
    grad_clip=None,      # optional: e.g., 1.0
):
    """Train a classifier with cross-entropy loss and print per-epoch metrics.

    Args:
        model: Module producing class logits from flattened inputs.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of validation batches, scored after every epoch.
        device: Device the model and every batch are moved to.
        optimizer_name: ``"sgd"`` (built with momentum 0.9), ``"rmsprop"`` or
            ``"adam"``.
        lr: Learning rate handed to the optimizer.
        weight_decay: Weight decay handed to the optimizer.
        l1_lambda: When > 0, ``l1_lambda * sum(|p|)`` over parameters with at
            least two dimensions is added to the loss.
        epochs: Number of passes over ``train_loader``.
        grad_clip: When not ``None``, maximum gradient norm for clipping.

    Returns:
        None. One line of metrics is printed per epoch.

    Raises:
        KeyError: If ``optimizer_name`` is not one of the supported names.

    Note:
        The reported training loss includes the L1 penalty when it is enabled,
        while the validation loss never does.
    """
    model.to(device)
    criterion = nn.CrossEntropyLoss()

    # Resolve the optimizer class first so an unsupported name fails early.
    optimizer_factory = {
        "sgd": torch.optim.SGD,
        "rmsprop": torch.optim.RMSprop,
        "adam": torch.optim.Adam,
    }[optimizer_name]

    # Only SGD receives a momentum term; every optimizer gets lr and weight decay.
    optimizer_kwargs = {"lr": lr, "weight_decay": weight_decay}
    if optimizer_name == "sgd":
        optimizer_kwargs["momentum"] = 0.9
    optimizer = optimizer_factory(model.parameters(), **optimizer_kwargs)

    for epoch in range(1, epochs + 1):
        # Training mode: dropout active, BatchNorm uses batch statistics.
        model.train()

        loss_sum = 0.0
        correct_count = 0
        seen = 0

        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            # The MLP consumes one flat feature vector per sample.
            batch_size = inputs.size(0)
            inputs = inputs.view(batch_size, -1)

            # Clear gradients from the previous step.
            optimizer.zero_grad(set_to_none=True)

            logits = model(inputs)
            loss = criterion(logits, labels)

            if l1_lambda > 0.0:
                # L1 penalty over weight tensors only (dim >= 2 skips biases).
                penalty = sum(
                    (p.abs().sum() for p in model.parameters() if p.dim() >= 2),
                    0.0,
                )
                loss = loss + l1_lambda * penalty

            loss.backward()

            # Optionally cap the gradient norm before the parameter update.
            if grad_clip is not None:
                torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)

            optimizer.step()

            # Batch loss is a mean, so weight it by the batch size.
            loss_sum += loss.item() * batch_size
            correct_count += (logits.argmax(dim=1) == labels).sum().item()
            seen += batch_size

        train_loss = loss_sum / seen
        train_acc = correct_count / seen

        val_loss, val_acc = evaluate_classification(model, val_loader, device)

        print(
            f"epoch {epoch:02d} | "
            f"train_loss={train_loss:.4f} train_acc={train_acc:.4f} | "
            f"val_loss={val_loss:.4f} val_acc={val_acc:.4f}"
        )


In [7]:
def get_fashion_mnist_loaders(batch_size=128, val_ratio=0.1, seed=42):
    """Build train/validation DataLoaders from the Fashion-MNIST training split.

    The dataset is stored under ``./data`` (downloaded when requested by the
    ``download=True`` flag) and split with a dedicated, seeded generator.

    Args:
        batch_size: Batch size used by both loaders.
        val_ratio: Fraction of the training split held out for validation.
        seed: Seed passed to ``set_seed`` and to the split generator.

    Returns:
        tuple: ``(train_loader, val_loader)``; only the training loader shuffles.
    """
    set_seed(seed)

    # ToTensor turns each PIL image into a float tensor; Normalize then
    # subtracts 0.5 and divides by 0.5 on the single channel.
    preprocessing = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ])

    full_train = datasets.FashionMNIST(
        root="./data",
        train=True,
        download=True,
        transform=preprocessing,
    )

    total = len(full_train)
    val_size = int(total * val_ratio)
    split_sizes = [total - val_size, val_size]

    # A dedicated generator makes the split independent of the global RNG state.
    split_rng = torch.Generator().manual_seed(seed)
    train_subset, val_subset = random_split(full_train, split_sizes, generator=split_rng)

    # num_workers=0 loads in the main process, the most compatible setting.
    shared_options = {"batch_size": batch_size, "num_workers": 0}
    train_loader = DataLoader(train_subset, shuffle=True, **shared_options)
    val_loader = DataLoader(val_subset, shuffle=False, **shared_options)

    return train_loader, val_loader

In [17]:
def main():
    """Train the MLP classifier on Fashion-MNIST and print per-epoch metrics.

    NOTE(review): a later cell defines another ``main``; once that cell runs,
    the name ``main`` no longer refers to this function.
    """
    device = get_device()
    print("Device:", device)

    loaders = get_fashion_mnist_loaders(batch_size=128, val_ratio=0.1, seed=42)
    train_loader, val_loader = loaders

    # Model knobs: change one at a time to study its effect.
    model_config = dict(
        in_dim=28 * 28,
        hidden_dims=(512, 256),
        out_dim=10,
        activation="relu",      # try: "tanh", "sigmoid"
        use_batchnorm=True,     # try: False
        dropout_p=0.3,          # try: 0.0, 0.1, 0.5
    )

    # Optimisation knobs.
    training_config = dict(
        optimizer_name="adam",  # try: "sgd", "rmsprop", "adam"
        lr=1e-3,                # tune per optimizer
        weight_decay=1e-4,      # L2 regularization
        l1_lambda=0.0,          # optional L1, e.g. 1e-7 or 1e-6 (start tiny)
        epochs=10,
        grad_clip=None,
    )

    model = MLP(**model_config)

    train_classification(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        device=device,
        **training_config,
    )


# Entry point: run the Fashion-MNIST classification experiment defined by
# main() above when this code executes as the main module.
if __name__ == "__main__":
    main()

Device: mps
epoch 01 | train_loss=0.4936 train_acc=0.8242 | val_loss=0.4007 val_acc=0.8548
epoch 02 | train_loss=0.3788 train_acc=0.8604 | val_loss=0.3587 val_acc=0.8717
epoch 03 | train_loss=0.3437 train_acc=0.8737 | val_loss=0.3432 val_acc=0.8737
epoch 04 | train_loss=0.3231 train_acc=0.8795 | val_loss=0.3361 val_acc=0.8765
epoch 05 | train_loss=0.3106 train_acc=0.8847 | val_loss=0.3219 val_acc=0.8818
epoch 06 | train_loss=0.2993 train_acc=0.8879 | val_loss=0.3150 val_acc=0.8847
epoch 07 | train_loss=0.2874 train_acc=0.8926 | val_loss=0.3148 val_acc=0.8838
epoch 08 | train_loss=0.2806 train_acc=0.8956 | val_loss=0.3087 val_acc=0.8898
epoch 09 | train_loss=0.2727 train_acc=0.8993 | val_loss=0.3266 val_acc=0.8838
epoch 10 | train_loss=0.2671 train_acc=0.8995 | val_loss=0.2976 val_acc=0.8912


In [19]:

class MLPRegressor(nn.Module):
    """Configurable multilayer perceptron with a single regression output.

    For every entry of ``hidden_dims`` the network stacks
    ``Linear -> [BatchNorm1d] -> activation -> [Dropout]``, followed by a final
    ``Linear`` layer with one output.

    Args:
        in_dim: Number of input features.
        hidden_dims: Iterable with the width of each hidden layer.
        activation: ``"relu"``, ``"tanh"`` or ``"sigmoid"``; matched
            case-insensitively, consistent with ``MLP``.
        use_batchnorm: Insert ``BatchNorm1d`` after each hidden linear layer.
        dropout_p: Dropout probability; no dropout layer is added unless > 0.

    Raises:
        KeyError: If ``activation`` is not a supported name.
    """

    def __init__(
        self,
        in_dim: int,
        hidden_dims=(128, 64),
        activation: str = "relu",
        use_batchnorm: bool = True,
        dropout_p: float = 0.1,
    ):
        super().__init__()

        # Normalise the name so "ReLU" / "Tanh" behave like "relu" / "tanh".
        # Non-string values are looked up unchanged and still raise KeyError.
        activation_key = activation.lower() if isinstance(activation, str) else activation

        act_layer = {
            "relu": nn.ReLU,
            "tanh": nn.Tanh,
            "sigmoid": nn.Sigmoid,
        }[activation_key]

        layers = []
        prev = in_dim

        for h in hidden_dims:
            layers.append(nn.Linear(prev, h))
            if use_batchnorm:
                layers.append(nn.BatchNorm1d(h))
            # A fresh activation module instance per hidden layer.
            layers.append(act_layer())
            if dropout_p > 0.0:
                layers.append(nn.Dropout(dropout_p))
            prev = h

        # Regression output: a single continuous value per sample.
        layers.append(nn.Linear(prev, 1))

        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return the network output for ``x``; the last dimension has size 1."""
        return self.net(x)



In [20]:
@torch.no_grad()
def rmse(pred, target):
    """Return the root-mean-square error between ``pred`` and ``target`` as a float."""
    squared_error = (pred - target).pow(2)
    return squared_error.mean().sqrt().item()

@torch.no_grad()
def mae(pred, target):
    """Return the mean absolute error between ``pred`` and ``target`` as a float."""
    absolute_error = (pred - target).abs()
    return absolute_error.mean().item()




In [21]:
@torch.no_grad()
def evaluate_regression(model, loader, device):
    """Score a regression model on ``loader``.

    The model is switched to eval mode and is left in eval mode on return.

    Args:
        model: Module mapping a feature batch to predictions.
        loader: Iterable yielding ``(features, targets)`` batches.
        device: Device the batches are moved to.

    Returns:
        tuple[float, float, float]: ``(mse, rmse, mae)``. The MSE is the
        sample-weighted mean of the per-batch losses; RMSE and MAE are computed
        once over all predictions concatenated together.
    """
    model.eval()
    criterion = nn.MSELoss()

    weighted_loss = 0.0
    sample_count = 0

    # (prediction, target) pairs kept per batch for the global metrics.
    collected = []

    for features, targets in loader:
        features = features.to(device)
        targets = targets.to(device)

        outputs = model(features)
        batch_size = features.size(0)

        # Batch loss is a mean, so weight it by the batch size.
        weighted_loss += criterion(outputs, targets).item() * batch_size
        sample_count += batch_size

        collected.append((outputs, targets))

    all_preds = torch.cat([outputs for outputs, _ in collected], dim=0)
    all_targets = torch.cat([targets for _, targets in collected], dim=0)

    mean_loss = weighted_loss / sample_count
    return mean_loss, rmse(all_preds, all_targets), mae(all_preds, all_targets)



In [22]:

def train_regression(
    model,
    train_loader,
    val_loader,
    device,
    optimizer_name="adam",
    lr=1e-3,
    weight_decay=1e-4,   # L2 regularization
    l1_lambda=0.0,       # optional L1
    epochs=50,
    grad_clip=None
):
    """Train a regression model with MSE loss and print per-epoch metrics.

    Args:
        model: Module mapping feature batches to predictions.
        train_loader: Iterable of ``(features, targets)`` training batches.
        val_loader: Iterable of validation batches, scored after every epoch.
        device: Device the model and every batch are moved to.
        optimizer_name: ``"sgd"`` (built with momentum 0.9), ``"rmsprop"`` or
            ``"adam"``.
        lr: Learning rate handed to the optimizer.
        weight_decay: Weight decay handed to the optimizer.
        l1_lambda: When > 0, ``l1_lambda * sum(|p|)`` over parameters with at
            least two dimensions is added to the loss.
        epochs: Number of passes over ``train_loader``.
        grad_clip: When not ``None``, maximum gradient norm for clipping.

    Returns:
        None. One line of metrics is printed per epoch.

    Raises:
        KeyError: If ``optimizer_name`` is not one of the supported names.

    Note:
        The reported training MSE includes the L1 penalty when it is enabled.
    """
    model.to(device)
    criterion = nn.MSELoss()

    # Resolve the optimizer class first so an unsupported name fails early.
    optimizer_factory = {
        "sgd": torch.optim.SGD,
        "rmsprop": torch.optim.RMSprop,
        "adam": torch.optim.Adam,
    }[optimizer_name]

    # Only SGD receives a momentum term; every optimizer gets lr and weight decay.
    optimizer_kwargs = {"lr": lr, "weight_decay": weight_decay}
    if optimizer_name == "sgd":
        optimizer_kwargs["momentum"] = 0.9
    optimizer = optimizer_factory(model.parameters(), **optimizer_kwargs)

    for epoch in range(1, epochs + 1):
        model.train()
        loss_sum = 0.0
        seen = 0

        for features, targets in train_loader:
            features = features.to(device)
            targets = targets.to(device)

            # Clear gradients from the previous step.
            optimizer.zero_grad(set_to_none=True)

            outputs = model(features)
            loss = criterion(outputs, targets)

            if l1_lambda > 0.0:
                # L1 penalty over weight tensors only (dim >= 2 skips biases).
                penalty = sum(
                    (p.abs().sum() for p in model.parameters() if p.dim() >= 2),
                    0.0,
                )
                loss = loss + l1_lambda * penalty

            loss.backward()

            # Optionally cap the gradient norm before the parameter update.
            if grad_clip is not None:
                torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)

            optimizer.step()

            # Batch loss is a mean, so weight it by the batch size.
            batch_size = features.size(0)
            loss_sum += loss.item() * batch_size
            seen += batch_size

        train_mse = loss_sum / seen
        val_mse, val_rmse, val_mae = evaluate_regression(model, val_loader, device)

        print(
            f"epoch {epoch:03d} | "
            f"train_mse={train_mse:.5f} | "
            f"val_mse={val_mse:.5f} val_rmse={val_rmse:.5f} val_mae={val_mae:.5f}"
        )


In [23]:
def get_california_housing_loaders(batch_size=128, val_ratio=0.2, seed=42):
    """Build train/validation DataLoaders for the California housing dataset.

    Features are standardised with statistics fitted on the training portion
    only; targets are kept as a column of shape ``[N, 1]``.

    Args:
        batch_size: Batch size used by both loaders.
        val_ratio: Fraction of the samples held out for validation.
        seed: ``random_state`` for the train/validation split.

    Returns:
        tuple: ``(train_loader, val_loader, n_features)``; only the training
        loader shuffles.
    """
    housing = fetch_california_housing()
    features = housing.data
    targets = housing.target.reshape(-1, 1)  # column vector, shape [N, 1]

    feat_train, feat_val, tgt_train, tgt_val = train_test_split(
        features,
        targets,
        test_size=val_ratio,
        random_state=seed,
        shuffle=True,
    )

    # Fit the scaler on training rows only, then apply it to both portions.
    standardizer = StandardScaler()
    feat_train = standardizer.fit_transform(feat_train)
    feat_val = standardizer.transform(feat_val)

    def as_float_tensor(array):
        # float32 tensors for both features and targets.
        return torch.tensor(array, dtype=torch.float32)

    train_set = TensorDataset(as_float_tensor(feat_train), as_float_tensor(tgt_train))
    val_set = TensorDataset(as_float_tensor(feat_val), as_float_tensor(tgt_val))

    # num_workers=0 loads in the main process, the most compatible setting.
    shared_options = {"batch_size": batch_size, "num_workers": 0}
    train_loader = DataLoader(train_set, shuffle=True, **shared_options)
    val_loader = DataLoader(val_set, shuffle=False, **shared_options)

    n_features = feat_train.shape[1]
    return train_loader, val_loader, n_features


In [28]:
def main():
    """Train the MLP regressor on California housing and print per-epoch metrics.

    NOTE(review): this redefines ``main`` from the earlier Fashion-MNIST cell,
    so after this cell runs the name refers to the regression experiment only.
    """
    set_seed(42)
    device = get_device()
    print("Device:", device)

    loaders = get_california_housing_loaders(
        batch_size=128,
        val_ratio=0.2,
        seed=42,
    )
    train_loader, val_loader, in_dim = loaders

    # Model knobs: change one at a time to study its effect.
    model_config = dict(
        in_dim=in_dim,
        hidden_dims=(128, 64),
        activation="tanh",     # try: "tanh", "sigmoid"
        use_batchnorm=True,    # try: False
        dropout_p=0.1,         # try: 0.0, 0.2
    )

    # Optimisation knobs.
    training_config = dict(
        optimizer_name="adam",  # try: "sgd", "rmsprop", "adam"
        lr=3e-4,
        weight_decay=1e-4,      # L2
        l1_lambda=0.0,          # optional L1, start tiny: 1e-7 or 1e-6
        epochs=50,
        grad_clip=None,
    )

    model = MLPRegressor(**model_config)

    train_regression(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        device=device,
        **training_config,
    )


# Entry point: run the California housing regression experiment defined by
# main() above when this code executes as the main module.
if __name__ == "__main__":
    main()


Device: mps
epoch 001 | train_mse=4.55459 | val_mse=3.61438 val_rmse=1.90115 val_mae=1.75046
epoch 002 | train_mse=3.10202 | val_mse=2.15097 val_rmse=1.46662 val_mae=1.26562
epoch 003 | train_mse=1.87413 | val_mse=1.05130 val_rmse=1.02533 val_mae=0.79759
epoch 004 | train_mse=1.14297 | val_mse=0.67626 val_rmse=0.82235 val_mae=0.61206
epoch 005 | train_mse=0.82561 | val_mse=0.51963 val_rmse=0.72085 val_mae=0.52879
epoch 006 | train_mse=0.70214 | val_mse=0.48182 val_rmse=0.69413 val_mae=0.51605
epoch 007 | train_mse=0.63389 | val_mse=0.46443 val_rmse=0.68149 val_mae=0.50169
epoch 008 | train_mse=0.59966 | val_mse=0.42112 val_rmse=0.64894 val_mae=0.46240
epoch 009 | train_mse=0.56448 | val_mse=0.41030 val_rmse=0.64054 val_mae=0.46063
epoch 010 | train_mse=0.54817 | val_mse=0.40634 val_rmse=0.63745 val_mae=0.45413
epoch 011 | train_mse=0.54150 | val_mse=0.41062 val_rmse=0.64080 val_mae=0.46121
epoch 012 | train_mse=0.52444 | val_mse=0.41419 val_rmse=0.64358 val_mae=0.46928
epoch 013 | trai