In [10]:
!pip install torch wandb huggingface_hub



In [11]:
##################################################
# 1) Imports & Basic Setup
##################################################
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms

# Best-effort: copy secrets from Colab's secret store into environment
# variables. Outside Colab the import fails and we rely on whatever is already
# set in the environment.
try:
    from google.colab import userdata
except ImportError:
    userdata = None  # not running inside Google Colab

if userdata is not None:
    # The secret name doubles as the environment variable name.
    for secret_name in ("HF_TOKEN", "wandb"):
        try:
            secret_value = userdata.get(secret_name)
        except Exception as exc:
            # A missing secret or denied notebook access must not abort the
            # notebook; report it and move on to the next secret.
            print(f"Could not read colab secret {secret_name!r}: {exc}")
            continue
        if secret_value:
            os.environ[secret_name] = secret_value
            print(f"Set {secret_name} from colab secrets.")

hf_token  = os.environ.get("HF_TOKEN", None)
wandb_key = os.environ.get("wandb", None)
# Only print whether the secrets are present, never their values.
print("hf_token:", bool(hf_token))
print("wandb_key:", bool(wandb_key))

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

##################################################
# 2) Larger MLP (~9.6k params)
##################################################
class LargeMLP(nn.Module):
    """
    Three-layer fully connected classifier with ReLU activations.

    With the defaults (MNIST downsampled to 16x16 => input dim=256, two hidden
    layers of 32 units, 10 classes) the model has ~9.6k parameters:

      fc1: (256 * 32) + 32 = 8192 + 32 = 8224
      fc2: (32 * 32) + 32  = 1024 + 32 = 1056
      fc3: (32 * 10) + 10  = 320 + 10 = 330
      total => 8224 + 1056 + 330 = 9610

    Args:
        input_dim: number of features per sample after flattening.
        hidden_dim: width of both hidden layers.
        num_classes: number of output logits.
    """
    def __init__(self, input_dim=256, hidden_dim=32, num_classes=10):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Return raw logits of shape [B, num_classes] for a batch x of shape [B, ...]."""
        # flatten() (unlike view()) also accepts non-contiguous inputs.
        x = x.flatten(start_dim=1)   # => [B, input_dim]
        x = torch.relu(self.fc1(x))  # => [B, hidden_dim]
        x = torch.relu(self.fc2(x))  # => [B, hidden_dim]
        x = self.fc3(x)              # => [B, num_classes]
        return x

##################################################
# 3) Naive Newton Optimizer
##################################################
class NaiveNewtonOptimizer(torch.optim.Optimizer):
    """
    Block-diagonal Newton step: for every parameter tensor p, build the dense
    Hessian of the loss w.r.t. p alone and update

        p <- p - lr * H^{-1} grad(p)

    Dense Hessian => O(N^3) per tensor. Must call step(closure).
    Great for demonstration with small networks only.

    The closure must run the forward pass and return the loss tensor WITHOUT
    calling .backward(); the optimizer differentiates the loss itself.
    Gradients are obtained with torch.autograd.grad, so `p.grad` is neither
    read nor written by this optimizer.

    Args:
        params: iterable of parameters (or parameter groups) to optimize.
        lr: scale applied to the Newton direction (1.0 = full Newton step).
        tol: tensors whose gradient norm is below this value are left untouched.
    """
    def __init__(self, params, lr=1.0, tol=1e-6):
        defaults = dict(lr=lr, tol=tol)
        super().__init__(params, defaults)

    @staticmethod
    def _newton_direction(flat_grad, p):
        """Solve H dx = grad for one tensor; fall back to the plain gradient."""
        grad = flat_grad.detach()
        if not flat_grad.requires_grad:
            # The gradient does not depend on any parameter, so the Hessian is
            # identically zero and there is nothing to solve.
            return grad

        # Row i of the Hessian is d(grad_i)/dp. autograd.grad returns it
        # directly instead of accumulating into the .grad of every parameter.
        rows = []
        for g_i in flat_grad.unbind(0):
            (row,) = torch.autograd.grad(g_i, p, retain_graph=True, allow_unused=True)
            rows.append(torch.zeros_like(grad) if row is None else row.reshape(-1))
        hessian = torch.stack(rows, dim=0)  # => [n, n]

        try:
            dx = torch.linalg.solve(hessian, grad)
        except RuntimeError:
            # Singular or unsupported Hessian (LinAlgError is a RuntimeError).
            return grad
        if not torch.isfinite(dx).all():
            return grad
        return dx

    def step(self, closure=None):
        """
        Perform one Newton update on every parameter tensor.

        Args:
            closure: callable that recomputes and returns the loss tensor.

        Returns:
            The loss tensor returned by the closure.

        Raises:
            RuntimeError: if no closure is given.
        """
        if closure is None:
            raise RuntimeError("NaiveNewtonOptimizer needs a closure returning the loss (tensor).")

        with torch.enable_grad():
            loss = closure()

        params = [p for group in self.param_groups for p in group['params'] if p.requires_grad]
        if not params:
            return loss

        # create_graph=True keeps the gradients differentiable so that second
        # derivatives can be taken from them.
        grads = torch.autograd.grad(loss, params, create_graph=True, allow_unused=True)
        grad_of = {id(p): g for p, g in zip(params, grads)}

        # Compute every update first and apply them afterwards: changing a
        # parameter in place while the graph is still in use would silently
        # alter the Hessians of the tensors processed later.
        pending = []
        for group in self.param_groups:
            lr = group['lr']
            tol = group['tol']

            for p in group['params']:
                g = grad_of.get(id(p))
                if g is None:
                    continue
                flat_grad = g.reshape(-1)
                if flat_grad.norm() < tol:
                    continue

                dx = self._newton_direction(flat_grad, p)
                pending.append((p, (lr * dx).view(p.shape)))

        with torch.no_grad():
            for p, delta in pending:
                p.sub_(delta)

        return loss

##################################################
# 3b) Naive Gradient Descent Optimizer
##################################################
class NaiveGradientDescent(torch.optim.Optimizer):
    """
    Plain gradient descent without momentum or any other state:

        p <- p - lr * grad(p)

    step() requires a closure that clears the gradients, runs the forward
    pass and returns the loss tensor. The optimizer itself calls .backward()
    on that loss (first derivatives only), so the closure must not.
    """
    def __init__(self, params, lr=0.01):
        super().__init__(params, dict(lr=lr))

    def step(self, closure=None):
        """Evaluate the closure, backpropagate, update all parameters, return the loss."""
        if closure is None:
            raise RuntimeError("NaiveGradientDescent needs a closure returning the loss (tensor).")

        # Forward pass followed by an ordinary first-order backward pass.
        loss = closure()
        loss.backward()

        for param_group in self.param_groups:
            step_size = param_group['lr']
            trainable = (w for w in param_group['params'] if w.grad is not None)
            for w in trainable:
                # Update through .data so autograd does not record the step.
                w.data.sub_(step_size * w.grad)

        return loss


##################################################
# 4) DataLoaders with downsampling to 16x16
##################################################
def get_mnist_loaders(batch_size=64):
    """
    Build the MNIST train and test DataLoaders.

    Every image is resized to 16x16, converted to a tensor and normalised
    with mean 0.5 / std 0.5. The data is downloaded into the current
    directory if needed. Only the training loader shuffles.

    Returns:
        (train_loader, test_loader)
    """
    preprocess = transforms.Compose([
        transforms.Resize((16,16)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])
    train_ds, test_ds = (
        datasets.MNIST(root=".", train=is_train, download=True, transform=preprocess)
        for is_train in (True, False)
    )
    make_loader = torch.utils.data.DataLoader
    train_loader = make_loader(train_ds, batch_size=batch_size, shuffle=True)
    test_loader = make_loader(test_ds, batch_size=batch_size, shuffle=False)
    return train_loader, test_loader

##################################################
# 5) Train function
##################################################
def _evaluate(model, loader, criterion):
    """Return (mean per-sample loss, accuracy in percent) of `model` over `loader`."""
    model.eval()
    correct = 0
    loss_sum = 0.0
    with torch.no_grad():
        for data, targets in loader:
            data, targets = data.to(device), targets.to(device)
            out = model(data)
            # criterion returns the batch mean; weight it by the batch size so
            # a smaller last batch is not over-represented.
            loss_sum += criterion(out, targets).item() * data.size(0)
            _, pred = out.max(1)
            correct += pred.eq(targets).sum().item()
    n_samples = len(loader.dataset)
    return loss_sum / n_samples, 100. * correct / n_samples


def train_model(
    optimizer_name: str = "",
    learning_rate: float = 1e-2,
    epochs: int = 2,
    batch_size: int = 64,
    wandb_project: str = "",
    log_every: int = 1
):
    """
    Train a LargeMLP on downsampled MNIST and evaluate it after every epoch.

    We'll:
      - Create ~9.6k param MLP (16x16 input => 2 hidden layers of 32)
      - Use "gd", "newton", or "sgd"
      - Possibly log each step to wandb

    Args:
        optimizer_name: "gd", "newton" or "sgd" (case-insensitive).
        learning_rate: learning rate handed to the optimizer.
        epochs: number of passes over the training set.
        batch_size: batch size of both data loaders.
        wandb_project: W&B project name; logging is enabled only when this is
            non-empty AND the "wandb" environment variable holds an API key.
        log_every: print the training loss every this many steps
            (1 = every step, values < 1 disable the per-step print).

    Returns:
        The trained model.

    Raises:
        ValueError: if `optimizer_name` is not one of the supported names.
    """
    model = LargeMLP().to(device)

    # pick optimizer first, so a bad name fails before any download / W&B run
    optimizer_key = optimizer_name.lower()
    if optimizer_key == "gd":
        opt = NaiveGradientDescent(model.parameters(), lr=learning_rate)
    elif optimizer_key == "newton":
        opt = NaiveNewtonOptimizer(model.parameters(), lr=learning_rate)
    elif optimizer_key == "sgd":
        opt = torch.optim.SGD(model.parameters(), lr=learning_rate)
    else:
        raise ValueError(f"Unknown optimizer {optimizer_name}")

    # The custom optimizers differentiate the loss themselves, but
    # torch.optim.SGD expects the closure to have populated .grad already.
    # Without this the "sgd" run would never update a single weight.
    closure_runs_backward = optimizer_key == "sgd"

    train_loader, test_loader = get_mnist_loaders(batch_size)
    criterion = nn.CrossEntropyLoss()

    # W&B
    wandb_key = os.environ.get("wandb", None)
    do_wandb = (wandb_project != "") and (wandb_key is not None)
    if do_wandb:
        import wandb
        wandb.login(key=wandb_key)
        wandb.init(project=wandb_project, config={
            "optimizer_name": optimizer_name,
            "learning_rate": learning_rate,
            "epochs": epochs,
            "batch_size": batch_size
        })

    try:
        global_step = 0
        for epoch in range(epochs):
            model.train()
            total_loss = 0.0

            for batch_idx, (data, targets) in enumerate(train_loader):
                data, targets = data.to(device), targets.to(device)

                def closure():
                    # zero_grad INSIDE the closure for every optimizer
                    opt.zero_grad()
                    outputs = model(data)
                    loss_t = criterion(outputs, targets)
                    if closure_runs_backward:
                        loss_t.backward()
                    return loss_t

                loss_tensor = opt.step(closure=closure)
                loss_val = loss_tensor.item()

                total_loss += loss_val
                global_step += 1

                if log_every > 0 and global_step % log_every == 0:
                    print(f"Epoch[{epoch}/{epochs}] Step[{global_step}] Loss={loss_val:.4f}")

                if do_wandb:
                    wandb.log({"train_loss": loss_val}, step=global_step)

            # Evaluate
            test_loss, accuracy = _evaluate(model, test_loader, criterion)
            epoch_loss = total_loss / max(len(train_loader), 1)

            print(f"Epoch {epoch+1}/{epochs} => TrainLoss={epoch_loss:.4f} TestLoss={test_loss:.4f} Acc={accuracy:.2f}%")

            if do_wandb:
                wandb.log({
                    "epoch": epoch,
                    "train_loss_epoch": epoch_loss,
                    "test_loss": test_loss,
                    "test_accuracy": accuracy
                }, step=global_step)
    finally:
        # Close the run even when training is interrupted or raises.
        if do_wandb:
            wandb.finish()

    return model


Set HF_TOKEN from colab secrets.
Set wandb from colab secrets.
hf_token: True
wandb_key: True
Using device: cuda


In [12]:
##################################################
# 6) Main
##################################################
def main(
    optimizer_name="gd",
    lr=1.0,
    epochs=2,
    batch_size=16,
    wandb_project="AAH-IA__newton-rhapson__",
):
    """
    Notebook entry point: report which secrets are available, echo the run
    configuration, train via train_model() and return the trained model.

    Args:
        optimizer_name: optimizer name forwarded to train_model().
        lr: learning rate, forwarded as `learning_rate`.
        epochs: number of training epochs.
        batch_size: mini-batch size.
        wandb_project: W&B project name forwarded to train_model().
    """
    # Only the presence of each secret is printed, never its value.
    for label, env_name in (("HF token:", "HF_TOKEN"), ("W&B key:", "wandb")):
        print(label, bool(os.environ.get(env_name, None)))
    print(f"Using {optimizer_name} with ~9.6k param model. LR={lr}, epochs={epochs}, batch_size={batch_size}")

    trained = train_model(
        optimizer_name=optimizer_name,
        learning_rate=lr,
        epochs=epochs,
        batch_size=batch_size,
        wandb_project=wandb_project,
    )
    print("Done with main(). Returning model.")
    return trained


# Run one training session right away when this cell/script is executed.
if __name__ == "__main__":
    run_config = dict(
        optimizer_name="gd",
        lr=0.01,
        epochs=2,
        batch_size=16,
        # main()'s default project is "AAH-IA__newton-rhapson__"
        wandb_project="AAH-IA__gradient-descent__",
    )
    model = main(**run_config)

[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc


HF token: True
W&B key: True
Using gd with ~9.6k param model. LR=0.01, epochs=2, batch_size=16


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Epoch[0/2] Step[2503] Loss=0.7506
Epoch[0/2] Step[2504] Loss=0.8466
Epoch[0/2] Step[2505] Loss=0.4546
Epoch[0/2] Step[2506] Loss=0.2943
Epoch[0/2] Step[2507] Loss=0.6110
Epoch[0/2] Step[2508] Loss=0.4485
Epoch[0/2] Step[2509] Loss=0.2769
Epoch[0/2] Step[2510] Loss=0.6344
Epoch[0/2] Step[2511] Loss=0.2812
Epoch[0/2] Step[2512] Loss=0.5307
Epoch[0/2] Step[2513] Loss=0.1942
Epoch[0/2] Step[2514] Loss=0.8236
Epoch[0/2] Step[2515] Loss=0.4240
Epoch[0/2] Step[2516] Loss=0.5177
Epoch[0/2] Step[2517] Loss=0.4554
Epoch[0/2] Step[2518] Loss=0.2865
Epoch[0/2] Step[2519] Loss=0.4628
Epoch[0/2] Step[2520] Loss=0.4715
Epoch[0/2] Step[2521] Loss=0.2599
Epoch[0/2] Step[2522] Loss=0.2047
Epoch[0/2] Step[2523] Loss=0.5134
Epoch[0/2] Step[2524] Loss=0.6395
Epoch[0/2] Step[2525] Loss=0.3021
Epoch[0/2] Step[2526] Loss=0.3673
Epoch[0/2] Step[2527] Loss=0.2934
Epoch[0/2] Step[2528] Loss=0.1952
Epoch[0/2] Step[2529] Loss=0.9152
Epoch[0/2] Step[2

0,1
epoch,▁█
test_accuracy,▁█
test_loss,█▁
train_loss,█▆▆▇▃▂▅▂▆▂▃▄▂▁▂▃▃▂▂▃▂▂▃▄▁▂▂▁▁▃▁▂▁▄▂▃▃▃▂▃
train_loss_epoch,█▁

0,1
epoch,1.0
test_accuracy,92.03
test_loss,0.26934
train_loss,0.09931
train_loss_epoch,0.32975


Done with main(). Returning model.


In [13]:
import wandb

# Manually close the active W&B run, if one is still open in this session
# (presumably a safety net for runs where train_model() did not reach its own
# wandb.finish() call — confirm before relying on it).
wandb.finish()