In [139]:
import torch
from torch import nn
from torchvision import datasets
import fastai 
from torchvision.transforms import ToTensor
# from fastai.data.core import DataLoader
from torch.utils.data import DataLoader
from fastai.data.core import DataLoaders
from fastai.callback.core import Callback
from fastai.vision.all import Learner, Metric
from fastai import optimizer
import torch.nn.functional as F


In [3]:
# Small two-layer MLP for flattened 28x28 images: 784 -> 30 -> 10 logits.
# Inputs must already be flattened to (..., 784); there is no Flatten layer here.
model = nn.Sequential(
    nn.Linear(28*28, 30),
    nn.ReLU(),
    # in_features must equal the 30 outputs of the previous layer
    # (it was 28*28, which made every forward pass fail with a shape mismatch).
    nn.Linear(30, 10)
)

In [4]:
# Fetch both MNIST splits from open datasets (downloaded into ./data when missing):
# the training split first, then the test split. Each split gets its own ToTensor
# transform.
training_data, test_data = (
    datasets.MNIST(
        root="data",
        train=is_train_split,
        download=True,
        transform=ToTensor(),
    )
    for is_train_split in (True, False)
)

In [5]:
batch_size = 256

# Create data loaders.
# The training loader reshuffles every epoch so that mini-batch SGD does not see the
# samples in the same fixed order each time; the evaluation loader keeps a stable order.
train_dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

# Peek at a single batch to confirm tensor shapes and the label dtype.
for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

Shape of X [N, C, H, W]: torch.Size([256, 1, 28, 28])
Shape of y: torch.Size([256]) torch.int64


In [6]:
# Choose the training device: CUDA when available, otherwise Apple MPS,
# otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

Using cuda device


In [143]:
class NeuralNetwork(nn.Module):
    """Fully connected classifier: flatten -> Linear/ReLU -> Linear/ReLU -> Linear.

    Args:
        in_features: Number of values per sample after flattening
            (default 28*28, i.e. one 28x28 single-channel image).
        hidden_features: Width of both hidden layers (default 512).
        num_classes: Number of output logits (default 10).

    The defaults reproduce the original fixed 784-512-512-10 architecture, and the
    submodule names (`flatten`, `linear_relu_stack`) are unchanged.
    """

    def __init__(self, in_features=28*28, hidden_features=512, num_classes=10):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(in_features, hidden_features),
            nn.ReLU(),
            nn.Linear(hidden_features, hidden_features),
            nn.ReLU(),
            nn.Linear(hidden_features, num_classes)
        )

    def forward(self, x):
        """Return raw (un-normalised) class logits for a batch `x`.

        Every dimension after the first is flattened before the linear stack is applied.
        """
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

# Instantiate the network and move it to the selected device. This rebinds `model`,
# replacing the nn.Sequential assigned to the same name in an earlier cell.
model = NeuralNetwork().to(device)
# Bare expression so the notebook displays the module structure.
model


NeuralNetwork(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear_relu_stack): Sequential(
    (0): Linear(in_features=784, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=512, bias=True)
    (3): ReLU()
    (4): Linear(in_features=512, out_features=10, bias=True)
  )
)

In [41]:
# Unfinished draft of a custom loss wrapper, kept commented out: it computes `curr`
# but never returns it, so it could not be used as a loss function as written.
# def loss_fn_custom(pred, y):
#     fn = nn.CrossEntropyLoss()
#     curr = fn(pred, y)

    

# Loss actually used by the notebook: the stock cross-entropy criterion.
loss_fn = nn.CrossEntropyLoss()
# Earlier alternative using the built-in optimizer, left disabled:
# optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

In [42]:
class SGDBasic:
    """Minimal plain-SGD optimizer: `p <- p - lr * grad`.

    Besides `step`/`zero_grad` it exposes `state`, `hypers` and `set_hypers`
    (presumably the attributes a fastai `Learner` reads from its optimizer — confirm
    against the fastai version in use).

    Args:
        params: Iterable of tensors to optimize; materialised into a list.
        lr: Learning rate (step size).
    """

    def __init__(self, params, lr):
        self.params, self.lr = list(params), lr
        # One (initially empty) state dict per parameter.
        self.state = {p: {} for p in self.params}
        # A single hyper-parameter group mirroring `self.lr`.
        self.hypers = [{'lr': lr}]

    def step(self, *args, **kwargs):
        """Update every parameter that has a gradient, in place.

        Parameters whose `.grad` is None (frozen, or not reached by the last backward
        pass) are skipped instead of raising AttributeError. Extra arguments are
        accepted and ignored.
        """
        # no_grad() keeps the in-place update out of the autograd graph without
        # going through the legacy `.data` attribute.
        with torch.no_grad():
            for p in self.params:
                if p.grad is None:
                    continue
                p.sub_(p.grad * self.lr)

    def zero_grad(self, *args, **kwargs):
        """Drop all gradients by resetting each parameter's `.grad` to None."""
        for p in self.params:
            p.grad = None

    def set_hypers(self, **kwargs):
        """Update the learning rate when an `lr` keyword is given; other keys are ignored."""
        if 'lr' in kwargs:
            self.lr = kwargs['lr']
            self.hypers[0]['lr'] = kwargs['lr']
            
# Instantiate the optimizer class defined in this cell. The class is named SGDBasic;
# `BasicOptimizer` is not defined in any visible cell and raised NameError on a fresh kernel.
optimizer = SGDBasic(model.parameters(), lr=1e-3)

In [142]:

class ProxSGD:
    """Proximal SGD with a soft-thresholding (L1) proximal step.

    Each update is `p <- softshrink(p - lr * grad, eta)`.

    Args:
        params: Iterable of tensors to optimize; materialised into a list.
        lr: Learning rate (step size).
        eta: Soft-threshold width applied after the gradient step. It is used as-is
            (not multiplied by `lr`). Defaults to 1e-5, the value that was previously
            hard-coded inside `step`.
    """

    def __init__(self, params, lr, eta=1e-5):
        self.params, self.lr = list(params), lr
        self.eta = eta
        # One (initially empty) state dict per parameter.
        self.state = {p: {} for p in self.params}
        # A single hyper-parameter group mirroring `self.lr`.
        self.hypers = [{'lr': lr}]

    def soft_threshold(self, x, eta):
        """Shrink every entry of `x` towards zero by `eta`; entries within [-eta, eta] become 0."""
        return F.softshrink(x, lambd=eta)

    def prox_operator(self, x, eta):
        """Proximal operator used by the update; currently plain soft-thresholding."""
        return self.soft_threshold(x, eta)

    def Gt(self, x, eta, x_grad):
        """Gradient mapping `(x - prox(x - lr * x_grad, eta)) / lr`.

        Kept for callers that want the mapping itself; `step` no longer goes through
        it. Undefined for `lr == 0` (division by zero).
        """
        return (1/self.lr) * (x - self.prox_operator(x - self.lr * x_grad, eta))

    def step(self, *args, **kwargs):
        """Apply one proximal-gradient update to every parameter that has a gradient.

        The parameter is overwritten directly with the prox result. Computing
        `p - lr * Gt(p)` is algebraically the same, but dividing by `lr` and
        multiplying back can leave rounding residue where the prox produced exact
        zeros, and fails for `lr == 0`. Extra arguments are accepted and ignored.
        """
        with torch.no_grad():
            for p in self.params:
                if p.grad is None:  # Ensure gradients exist
                    continue
                p.copy_(self.prox_operator(p - self.lr * p.grad, self.eta))

    def zero_grad(self, *args, **kwargs):
        """Drop all gradients by resetting each parameter's `.grad` to None."""
        for p in self.params:
            p.grad = None

    def set_hypers(self, **kwargs):
        """Update the learning rate when an `lr` keyword is given; other keys are ignored."""
        if 'lr' in kwargs:
            self.lr = kwargs['lr']
            self.hypers[0]['lr'] = kwargs['lr']


# Instantiate the optimizer class defined in this cell. `BasicOptimizer` is not defined
# in any visible cell and raised NameError on a fresh kernel; ProxSGD is the class above.
optimizer = ProxSGD(model.parameters(), lr=1e-3)
    

In [9]:
# def train(dataloader, model, loss_fn, optimizer):
#     size = len(dataloader.dataset)
#     model.train()
#     for batch, (X, y) in enumerate(dataloader):
#         X, y = X.to(device), y.to(device)

#         # Compute prediction error
#         pred = model(X)
#         loss = loss_fn(pred, y)

#         # Backpropagation
#         loss.backward()
#         optimizer.step()
#         optimizer.zero_grad()

#         if batch % 100 == 0:
#             print(f"BATCH: {batch} of {size/batch_size} batches")
#             loss, current = loss.item(), (batch + 1) * len(X)
#             print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")


In [11]:
# def test(dataloader, model, loss_fn):
#     size = len(dataloader.dataset)
#     num_batches = len(dataloader)
#     model.eval()
#     test_loss, correct = 0, 0
#     with torch.no_grad():
#         for X, y in dataloader:
#             X, y = X.to(device), y.to(device)
#             pred = model(X)
#             test_loss += loss_fn(pred, y).item()
#             correct += (pred.argmax(1) == y).type(torch.float).sum().item()
#     test_loss /= num_batches
#     correct /= size
#     print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    

In [43]:
# Bundle the two PyTorch loaders into a fastai DataLoaders object for the Learner.
# NOTE(review): fastai presumably treats the first loader as training and the second as
# validation — confirm against the fastai DataLoaders documentation.
dls = DataLoaders(train_dataloader, test_dataloader)

In [13]:
class CustomAccuracy1(Metric):
    """Running classification accuracy, reported as a percentage under the name "Accuracy"."""

    def __init__(self):
        self.correct, self.total = 0, 0

    def reset(self):
        """Zero both running counters."""
        self.correct, self.total = 0, 0

    def accumulate(self, learn):
        """Add the current batch (`learn.pred`, `learn.y`) to the running counters."""
        targets = learn.y
        # Predicted class = index of the largest score along the last dimension.
        hits = learn.pred.argmax(dim=-1) == targets
        self.correct += hits.sum().item()
        self.total += len(targets)

    @property
    def value(self):
        """Accuracy in percent, or None when nothing has been accumulated yet."""
        if self.total > 0:
            return (self.correct / self.total) * 100
        return None

    @property
    def name(self):
        """Label under which the metric is reported."""
        return "Accuracy"

In [14]:
class CustomAccuracy2(CustomAccuracy1):
    """Same percentage-accuracy metric as CustomAccuracy1, reported under a different name.

    The counters, `reset`, `accumulate` and `value` were a line-for-line copy of
    CustomAccuracy1, so they are inherited instead of duplicated; only `name` differs.
    Instances are still `Metric` instances through the parent class.
    """

    @property
    def name(self):
        """Label under which the metric is reported."""
        return "meowMeow2"

In [15]:
class MixAccuracy(Metric):
    """Composite metric that drives a CustomAccuracy1 and a CustomAccuracy2 side by side."""

    def __init__(self):
        self.c1, self.c2 = CustomAccuracy1(), CustomAccuracy2()

    def _parts(self):
        # The wrapped metrics, always in (c1, c2) order.
        return (self.c1, self.c2)

    def reset(self):
        """Reset both wrapped metrics."""
        for part in self._parts():
            part.reset()

    def accumulate(self, learn):
        """Forward the current batch to both wrapped metrics."""
        for part in self._parts():
            part.accumulate(learn)

    @property
    def value(self):
        """Tuple `(c1.value, c2.value)`."""
        first, second = self._parts()
        return first.value, second.value

    @property
    def name(self):
        """The two wrapped names joined by an underscore."""
        first, second = self._parts()
        return f"{first.name}_{second.name}"

In [103]:
class RegularizationCallback(Callback):
    """Scaffold for an L1 weight penalty; the penalty is computed but NOT applied.

    Args:
        lambda_reg: Strength of the L1 penalty. Defaults to 1e-3, the value that was
            previously hard-coded, so `RegularizationCallback()` behaves as before.
    """

    def __init__(self, lambda_reg=1e-3):
        super().__init__()
        self.lambda_reg = lambda_reg

    def after_loss(self):
        """Compute the L1 norm of all model parameters after the loss is available."""
        # L1 (not L2) regularization term: sum of absolute values of every parameter.
        l1_reg = 0
        for param in self.model.parameters():
            l1_reg += torch.sum(torch.abs(param))
        # NOTE(review): the penalty is still intentionally left out of the loss, exactly
        # as in the original code, so `l1_reg` is discarded. Re-enable the line below
        # (or remove this hook) once the intended behaviour is decided.
        # self.learn.loss += self.lambda_reg * l1_reg * self.lr

    def after_epoch(self):
        """Print the current loss value at the end of every epoch."""
        print(self.loss)
        
        

In [144]:
# Assemble the fastai Learner: ProxSGD is passed as the optimizer factory, the
# cross-entropy `loss_fn` as the loss, and CustomAccuracy1 (the class, not an instance)
# as the metric. RegularizationCallback is not registered here.
learner = Learner(dls, model, opt_func=ProxSGD, loss_func=loss_fn, metrics=CustomAccuracy1)

In [146]:
# Train for 20 epochs with a learning rate of 0.1.
# NOTE(review): `Learner.fit` appears to return nothing, so `df` is presumably None and
# the trailing expression displays no value — confirm, then drop the assignment.
df = learner.fit(20, lr=1e-1)
df

epoch,train_loss,valid_loss,Accuracy,time
0,0.842067,0.593232,81.2,00:06
1,0.399875,0.405525,87.71,00:06
2,0.333211,0.344204,89.74,00:06
3,0.294047,0.303387,90.93,00:06
4,0.261955,0.271596,91.91,00:06
5,0.234735,0.245626,92.62,00:06
6,0.212463,0.225291,93.44,00:06


KeyboardInterrupt: 

In [72]:
# L1 norm of the model: sum of the absolute values of every parameter.
s = sum(torch.sum(torch.abs(weights)) for weights in model.parameters())
print(s)

tensor(19129.4316, device='cuda:0', grad_fn=<AddBackward0>)


In [77]:
# Recompute the model's L1 norm (sum of absolute parameter values) for comparison
# with the earlier measurement.
per_tensor_norms = (torch.sum(torch.abs(tensor)) for tensor in model.parameters())
s = sum(per_tensor_norms)
print(s)

tensor(14883.8662, device='cuda:0', grad_fn=<AddBackward0>)


In [62]:
# Quick experiment with nn.Threshold: entries above `threshold` pass through unchanged,
# all other entries are replaced by `value`.
m = nn.Threshold(threshold=0.1, value=20)
input = torch.randn(2)
output = m(input)

In [64]:
# Display the thresholded result next to the original input for comparison.
output, input

(tensor([20.0000,  2.4018]), tensor([-1.2521,  2.4018]))