# **MuOn optimizer - ready to run**

by Fantine MPACKO PRISO for EECS182

# Base Setup

In [None]:
import torch
from torch import nn
from torch.optim.optimizer import Optimizer #imports the base class for all optimizers

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print('We will be using', device)



We will be using cuda


# Newton-Schulz implementation

In [None]:
import math

def newton_schulz(M: torch.Tensor, epsilon=0.001, iters=2):
    """Approximately orthogonalize a matrix with Newton-Schulz iterations.

    The matrix is scaled to unit Frobenius norm, passed ``iters`` times
    through the cubic map ``X <- 0.5 * X @ (3I - X^T X)`` (the polynomial
    seen in class), and finally multiplied by the original Frobenius norm
    so the result is back on the input's scale.

    Args:
        M: Tensor to process. Anything that is not 2-D is returned as is.
        epsilon: Norm threshold. When the Frobenius norm of ``M`` is below
            it (or is not finite), ``M`` is returned as is so the
            normalization never divides by (almost) zero.
        iters: Number of Newton-Schulz iterations to run.

    Returns:
        A tensor with the same shape as ``M``. In the early-exit cases this
        is ``M`` itself, not a copy.
    """
    if M.ndim != 2:  # only matrices are orthogonalized
        return M
    frobenius_norm = torch.linalg.norm(M)
    if not torch.isfinite(frobenius_norm) or frobenius_norm < epsilon:
        return M

    X = M / frobenius_norm  # normalized M
    rows, cols = X.shape
    # X @ (3I - X^T X) == (3I - X X^T) @ X, so always build the Gram matrix
    # on the smaller side. For a wide matrix such as a (2, 30000) linear
    # weight this is a 2x2 product instead of a 30000x30000 one.
    wide = rows < cols
    I = torch.eye(min(rows, cols), device=X.device, dtype=X.dtype)
    for _ in range(iters):
        if wide:
            X = 0.5 * (3 * I - X @ X.T) @ X
        else:
            X = 0.5 * X @ (3 * I - X.T @ X)
    return X * frobenius_norm  # return to original scale




# MuOn optimizer

In [None]:
class MuOn(Optimizer):
    """Momentum optimizer whose update is passed through Newton-Schulz.

    For every parameter with a gradient the optimizer:

    1. shrinks the parameter by ``lr * weight_decay`` (decay applied directly
       to the weights, not added to the gradient),
    2. accumulates the gradient into a momentum buffer
       (``buf = momentum * buf + grad``),
    3. passes the buffer through ``newton_schulz`` and
    4. subtracts ``lr`` times the result from the parameter.

    ``newton_schulz`` returns non-2-D inputs unchanged, so such parameters
    (e.g. biases) receive a plain momentum update.

    Args:
        params: Iterable of parameters or parameter-group dicts.
        lr: Learning rate.
        momentum: Momentum coefficient applied to the buffer.
        weight_decay: Weight-decay coefficient (0 disables it).

    Raises:
        ValueError: If ``lr``, ``momentum`` or ``weight_decay`` is negative.
    """

    def __init__(self, params, lr=1e-3, momentum=0.9, weight_decay=0):
        if lr < 0:
            raise ValueError(f"Invalid learning rate: {lr}")
        if momentum < 0:
            raise ValueError(f"Invalid momentum value: {momentum}")
        if weight_decay < 0:
            raise ValueError(f"Invalid weight_decay value: {weight_decay}")
        defaults = dict(lr=lr, momentum=momentum, weight_decay=weight_decay)
        # The base Optimizer class builds param_groups and the state dict.
        super().__init__(params, defaults)

    @torch.no_grad()
    def step(self, closure=None):
        """Perform a single optimization step.

        Args:
            closure: Optional callable that re-evaluates the model and
                returns the loss. It is run with gradients enabled.

        Returns:
            The value returned by ``closure``, or ``None`` without one.
        """
        loss = None
        if closure is not None:
            # step() runs under no_grad; the closure needs autograd back on.
            with torch.enable_grad():
                loss = closure()

        for group in self.param_groups:
            lr = group['lr']
            mom = group['momentum']
            wd = group['weight_decay']
            for param in group['params']:
                if param.grad is None:  # no backward pass reached this parameter
                    continue
                grad = param.grad
                if wd != 0:
                    # Decoupled weight decay: param <- param * (1 - lr * wd).
                    param.add_(param, alpha=-lr * wd)
                state = self.state[param]
                if 'buf' not in state:  # momentum buffer, same shape as param
                    state['buf'] = torch.zeros_like(param)
                buf = state['buf']
                # Smooths gradients over time and accelerates in persistent directions.
                buf.mul_(mom).add_(grad)

                # Make the update closer to orthogonal for 2-D weight matrices.
                update = newton_schulz(buf, 1e-5)
                param.add_(update, alpha=-lr)

        return loss

# Demo: training a linear bag-of-words classifier (logistic regression)
(written with outside help for the implementation)

In [None]:


import re, torch
from torch import nn
from torch.optim import Optimizer
from torch.utils.data import DataLoader, Dataset
from datasets import load_dataset
from sklearn.metrics import accuracy_score, f1_score
import math


# Load a small slice of Yelp Polarity: the first 5,000 training reviews
# and the first 1,000 test reviews.
ds = load_dataset(
    "yelp_polarity",
    split={"train": "train[:5000]", "test": "test[:1000]"},
)

# Simple hashed bag-of-words featurization
V = 30000  # number of hash buckets used by bow_hash (feature dimension)
# Matches every character that is NOT a letter (including the listed accented
# ones), an apostrophe, whitespace or a hyphen.
token_re = re.compile(r"[^a-zàâäéèêëîïôöùûüç'\s-]", re.IGNORECASE)


def tokenize(s: str):
    """Lower-case ``s``, blank out disallowed characters and split on whitespace.

    Returns the list of resulting tokens (empty for an empty/blank string).
    """
    cleaned = token_re.sub(" ", s.lower())
    # str.split() with no argument never yields empty strings.
    return cleaned.split()

def bow_hash(tokens, num_buckets=None):
    """Turn a token list into a hashed bag-of-words count vector.

    Each token is mapped to a bucket with CRC-32 and that bucket's count is
    incremented. CRC-32 is used instead of the built-in ``hash()`` because
    string hashes are salted per interpreter process, which would make the
    feature layout differ from one run to the next.

    Args:
        tokens: Iterable of string tokens.
        num_buckets: Length of the output vector. Defaults to the
            module-level ``V``.

    Returns:
        A 1-D tensor of length ``num_buckets`` holding per-bucket counts.
    """
    import zlib  # local import keeps this function self-contained

    size = V if num_buckets is None else num_buckets
    v = torch.zeros(size)
    for t in tokens:
        v[zlib.crc32(t.encode("utf-8")) % size] += 1.0
    return v

def to_tensor(batch):
    """Featurize a batch of examples.

    Reads ``batch["text"]`` and ``batch["label"]`` and returns a dict with
    the stacked bag-of-words vectors under ``"X"`` and the labels as a
    ``torch.long`` tensor under ``"y"``.
    """
    features = [bow_hash(tokenize(text)) for text in batch["text"]]
    labels = torch.tensor(batch["label"], dtype=torch.long)
    return {"X": torch.stack(features), "y": labels}

# Featurize every split in batches of 256 rows and have it return torch tensors.
ds = {
    split_name: split.map(to_tensor, batched=True, batch_size=256).with_format("torch")
    for split_name, split in ds.items()
}

class HFDS(Dataset):
    """Adapter exposing an indexable collection of rows as ``(X, y)`` pairs.

    Args:
        d: Object supporting ``len()`` and integer indexing, where each
            item is a mapping with ``"X"`` and ``"y"`` entries.
    """

    def __init__(self, d):
        self.d = d

    def __len__(self):
        return len(self.d)

    def __getitem__(self, i):
        # Fetch the row once; indexing the underlying dataset twice would
        # materialize the same row twice per sample.
        row = self.d[i]
        return row["X"], row["y"]

# Batch both splits; only the training split is shuffled.
train_loader = DataLoader(
    HFDS(ds["train"]),
    batch_size=128,
    shuffle=True,
    num_workers=2,
)
test_loader = DataLoader(
    HFDS(ds["test"]),
    batch_size=512,
    shuffle=False,
    num_workers=2,
)


class BoWLogReg(nn.Module):
    """Single linear layer mapping a ``d_in``-dimensional input to 2 logits."""

    def __init__(self, d_in):
        super().__init__()
        self.fc = nn.Linear(in_features=d_in, out_features=2)

    def forward(self, x):
        """Return the two class logits for each row of ``x``."""
        logits = self.fc(x)
        return logits


# Model, optimizer and loss. The model and every batch must sit on the same device.
device = "cuda" if torch.cuda.is_available() else "cpu"
model = BoWLogReg(V).to(device)
# lr / momentum / weight_decay can be tuned here.
opt = MuOn(model.parameters(), lr=2e-2, momentum=0.9, weight_decay=1e-4)
lossfn = nn.CrossEntropyLoss()

EPOCHS = 4
for ep in range(1, EPOCHS + 1):
    # --- training pass over the whole training loader ---
    model.train()
    for xb, yb in train_loader:
        xb = xb.to(device)
        yb = yb.to(device)
        opt.zero_grad(set_to_none=True)
        logits = model(xb)
        loss = lossfn(logits, yb)
        loss.backward()
        opt.step()

    # --- evaluation pass: collect predictions and labels on the CPU ---
    model.eval()
    preds, ys = [], []
    with torch.no_grad():
        for xb, yb in test_loader:
            logits = model(xb.to(device))
            preds.append(logits.argmax(1).cpu())
            ys.append(yb)
    pred = torch.cat(preds)
    ytrue = torch.cat(ys)
    acc = accuracy_score(ytrue, pred)
    f1 = f1_score(ytrue, pred)
    print(f"[ep {ep:02d}] test acc={acc:.3f} f1={f1:.3f}")


Map:   0%|          | 0/5000 [00:00<?, ? examples/s]

Map:   0%|          | 0/1000 [00:00<?, ? examples/s]

[ep 01] test acc=0.774 f1=0.714
[ep 02] test acc=0.862 f1=0.847
[ep 03] test acc=0.851 f1=0.829
[ep 04] test acc=0.875 f1=0.867
