In [1]:
#|export
import pickle,gzip,math,os,time,shutil,torch,matplotlib as mpl,numpy as np,matplotlib.pyplot as plt
from pathlib import Path
from torch import tensor,nn
import torch.nn.functional as F

In [2]:
from fastcore.test import test_close

# Notebook-wide tensor print formatting, torch RNG seed and default image colormap.
# Only torch's RNG is seeded here; Python's `random` module (used by Sampler further down) is not.
torch.set_printoptions(precision=2, linewidth=140, sci_mode=False)
torch.manual_seed(1)
mpl.rcParams['image.cmap'] = 'gray'

# Location of the gzipped pickle, relative to the notebook's working directory.
path_data = Path('../data')
path_gz = path_data/'mnist.pkl.gz'
# NOTE(security): pickle.load can execute arbitrary code - only open archives from a trusted source.
# The pickle is unpacked as a 3-tuple: (train inputs, train labels), (valid inputs, valid labels),
# and a third element that is discarded.
with gzip.open(path_gz, 'rb') as f: ((x_train, y_train), (x_valid, y_valid), _) = pickle.load(f, encoding='latin-1')
# Convert the four loaded objects to torch tensors.
x_train, y_train, x_valid, y_valid = map(tensor, [x_train, y_train, x_valid, y_valid])

In [3]:
# Shapes of the training inputs and the training labels.
x_train.shape, y_train.shape

(torch.Size([50000, 784]), torch.Size([50000]))

### Creating a simple model

In [4]:
# Number of training rows and number of input features per row, read off the training tensor.
n_train_data, n_features = x_train.shape
# Width of the hidden layer.
n_hidden_activations = 50
# Width of the output layer (one activation per class).
n_out_activation = 10

In [5]:
class Model(nn.Module):
    """Two-layer perceptron: Linear -> ReLU -> Linear.

    Args:
        n_in: number of input features; defaults to the notebook-level `n_features`.
        n_hidden: hidden-layer width; defaults to the notebook-level `n_hidden_activations`.
        n_out: number of outputs; defaults to the notebook-level `n_out_activation`.

    The layers are stored in an `nn.ModuleList` rather than a plain Python list so that
    `nn.Module` registers them: `parameters()`, `state_dict()`, `.to(...)`, `.train()` and
    `.eval()` then reach every layer. `self.layers` is still iterable in order.
    """
    def __init__(self, n_in=None, n_hidden=None, n_out=None):
        super().__init__()
        # Resolve the fallbacks at call time so that `Model()` keeps its original meaning.
        n_in = n_features if n_in is None else n_in
        n_hidden = n_hidden_activations if n_hidden is None else n_hidden
        n_out = n_out_activation if n_out is None else n_out
        self.layers = nn.ModuleList([nn.Linear(n_in,n_hidden),
                                     nn.ReLU(),
                                     nn.Linear(n_hidden,n_out)])

    def forward(self,X):
        """Apply the layers in order to `X` and return the final activations."""
        # None of the layers modifies its input in place, so no defensive copy of X is needed.
        x = X
        for layer in self.layers:
            x = layer(x)
        return x

In [6]:
# Sanity check: forward the whole training set through a freshly constructed model
# and display the shape of the predictions.
model = Model()
preds = model(x_train)
preds.shape

torch.Size([50000, 10])

### Training step

In [7]:
# Hyper-parameters shared by the training loops below.
batch_size = 100  # rows per mini-batch
lr = 0.5 # learning rate
epochs = 3  # full passes over the training set

In [8]:
def accuracy(preds,targets):
    """Return, as a scalar tensor, the fraction of rows whose arg-max along dim 1 equals the target."""
    predicted_class = preds.argmax(dim=1)
    is_correct = predicted_class == targets
    # Booleans are cast to float so that the mean is the hit rate.
    return is_correct.float().mean()

In [9]:
# Hand-written SGD: each epoch walks the training set in consecutive mini-batches.
for epoch in range(epochs):
    for i in range(0,n_train_data,batch_size):
        s = slice(i,min(n_train_data,i+batch_size))
        x_batch,y_batch = x_train[s],y_train[s]
        preds = model(x_batch)
        loss = F.cross_entropy(preds,y_batch)
        loss.backward()
        # The parameter updates themselves must not be tracked by autograd.
        with torch.no_grad():
            for layer in model.layers:
                # Layers without a `weight` attribute (the ReLU) have nothing to update.
                if not hasattr(layer,"weight"): continue
                for param in (layer.weight,layer.bias):
                    param -= lr*param.grad
                    param.grad.zero_()
    # Both reported numbers are computed on the last mini-batch of the epoch only.
    acc = accuracy(preds,y_batch)
    print(f"current epoch= {epoch}, current loss = {loss:.2f}, accuracy = {acc:.2f}")

current epoch= 0, current loss = 0.15, accuracy = 0.95
current epoch= 1, current loss = 0.12, accuracy = 0.96
current epoch= 2, current loss = 0.11, accuracy = 0.95


### Using parameters and Optim

In [10]:
class MyModule:
    """Minimal re-implementation of `nn.Module`'s submodule registration.

    Every attribute whose name does not start with an underscore is recorded in
    `self._modules`, which is what `__repr__` prints and what `parameters()` walks.
    """
    def __init__(self):
        # `_modules` has to exist before any public attribute is assigned,
        # because `__setattr__` writes into it.
        self._modules = {}
        self.l1 = nn.Linear(n_features,n_hidden_activations)
        self.l2 = nn.Linear(n_hidden_activations,n_out_activation)

    def __setattr__(self,attr_name,value):
        """Register public attributes in `_modules`, then set the attribute as usual."""
        if not attr_name.startswith("_"):
            self._modules[attr_name] = value
        super().__setattr__(attr_name,value)

    def __repr__(self):
        return f"{self._modules}"

    def parameters(self):
        """Yield the parameters of every registered layer, in registration order.

        The parameter objects themselves are yielded (not their `.data`): `.data` is a
        detached tensor whose `.grad` is never populated, so an optimizer built on it
        could not read any gradients.
        """
        for layer in self._modules.values():
            yield from layer.parameters()

In [11]:
# Build an instance; its two Linear layers are registered through __setattr__.
module = MyModule()

In [12]:
# Display the instance: __repr__ prints the dict of registered layers.
module

{'l1': Linear(in_features=784, out_features=50, bias=True), 'l2': Linear(in_features=50, out_features=10, bias=True)}

In [13]:
# Walk every tensor yielded by parameters() and print its shape.
for parameter in module.parameters():
    print(parameter.shape)

torch.Size([50, 784])
torch.Size([50])
torch.Size([10, 50])
torch.Size([10])


In [14]:
### creating the optim class
class Optimizer:
    """Bare-bones stochastic gradient descent.

    Args:
        parameters: iterable of tensors to update; it is consumed once and stored as a list.
        lr: step size applied to each gradient.
    """
    def __init__(self,parameters,lr=0.5):
        # Materialise the iterable so that a generator can be walked again on every call.
        self.parameters = list(parameters)
        self.lr = lr

    def step(self):
        """Update each parameter in place: p <- p - lr * p.grad."""
        # The update itself must not be recorded by autograd.
        with torch.no_grad():
            for parameter in self.parameters:
                # A parameter that took no part in the last backward pass has no gradient yet.
                if parameter.grad is None: continue
                parameter -= self.lr * parameter.grad

    def zero_grad(self):
        """Reset every existing gradient to zero, in place."""
        with torch.no_grad():
            for parameter in self.parameters:
                # Nothing to reset before the first backward pass or for unused parameters.
                if parameter.grad is not None:
                    parameter.grad.zero_()
            

In [15]:
# Same Linear -> ReLU -> Linear stack as before, this time built with nn.Sequential.
model = nn.Sequential( 
    nn.Linear(n_features,n_hidden_activations),
    nn.ReLU(),
    nn.Linear(n_hidden_activations,n_out_activation)
)

In [16]:
# Hand the model's parameters to the hand-written Optimizer (lr left at its default of 0.5).
opt = Optimizer(model.parameters())

In [17]:
# Mini-batch training loop in which the parameter update and the gradient reset
# are delegated to `opt` instead of being written out per layer.
for epoch in range(epochs):
    for i in range(0,n_train_data,batch_size):
        s = slice(i,min(n_train_data,i+batch_size))
        xb,yb = x_train[s], y_train[s]
        preds = model(xb)
        loss = F.cross_entropy(preds,yb)
        loss.backward()
        opt.step()
        # Reset the gradients before the next mini-batch.
        opt.zero_grad()
    # NOTE: `loss` and `acc` describe the final mini-batch of the epoch only;
    # they are not averaged over the epoch.
    acc = accuracy(preds,yb)
    print(f"current epoch= {epoch}, current loss = {loss:.2f}, accuracy = {acc:.2f}")

current epoch= 0, current loss = 0.17, accuracy = 0.93
current epoch= 1, current loss = 0.10, accuracy = 0.96
current epoch= 2, current loss = 0.06, accuracy = 1.00


We can now use PyTorch's built-in optimizer.

In [18]:
from torch import optim

In [19]:
def get_model(lr=0.5, n_in=None, n_hidden=None, n_out=None):
    """Build a fresh Linear -> ReLU -> Linear network together with an SGD optimizer for it.

    Args:
        lr: learning rate handed to `optim.SGD` (0.5 by default, as before).
        n_in: number of input features; defaults to the notebook-level `n_features`.
        n_hidden: hidden-layer width; defaults to the notebook-level `n_hidden_activations`.
        n_out: number of outputs; defaults to the notebook-level `n_out_activation`.

    Returns:
        A `(model, opt)` tuple.
    """
    # Resolve the fallbacks at call time so that `get_model()` keeps its original meaning.
    n_in = n_features if n_in is None else n_in
    n_hidden = n_hidden_activations if n_hidden is None else n_hidden
    n_out = n_out_activation if n_out is None else n_out
    model = nn.Sequential(
        nn.Linear(n_in,n_hidden),
        nn.ReLU(),
        nn.Linear(n_hidden,n_out)
    )
    opt = optim.SGD(model.parameters(),lr=lr)
    return model,opt
    

In [89]:
# Fresh model and SGD optimizer for the next training run.
# NOTE(review): this cell's execution count (89) is out of sequence with its neighbours -
# re-run the notebook top to bottom to confirm the recorded outputs still reproduce.
model,opt = get_model()

In [21]:
# Mini-batch training loop driven by the optimizer returned from get_model().
for epoch in range(epochs):
    for i in range(0,n_train_data,batch_size):
        s = slice(i,min(n_train_data,i+batch_size))
        xb,yb = x_train[s], y_train[s]
        preds = model(xb)
        loss = F.cross_entropy(preds,yb)
        loss.backward()
        opt.step()
        # Reset the gradients before the next mini-batch.
        opt.zero_grad()
    # NOTE: `loss` and `acc` describe the final mini-batch of the epoch only;
    # they are not averaged over the epoch.
    acc = accuracy(preds,yb)
    print(f"current epoch= {epoch}, current loss = {loss:.2f}, accuracy = {acc:.2f}")

current epoch= 0, current loss = 0.15, accuracy = 0.96
current epoch= 1, current loss = 0.08, accuracy = 0.98
current epoch= 2, current loss = 0.06, accuracy = 0.99


### Reimplementing Datasets and Dataloader

In [22]:
class Dataset:
    """Pairs an indexable collection of inputs with a collection of labels indexed the same way."""
    def __init__(self,train_data,label_data):
        self.train_data,self.label_data = train_data,label_data

    def __len__(self):
        # The size is taken from the inputs alone.
        return len(self.train_data)

    def __getitem__(self,i):
        # The same index (an int, a slice, ...) is applied to both collections.
        inputs = self.train_data[i]
        labels = self.label_data[i]
        return inputs,labels

In [23]:
# Wrap the training and validation tensors as (inputs, labels) datasets.
train_ds = Dataset(x_train,y_train)
val_ds = Dataset(x_valid,y_valid)

In [24]:
# Slicing the dataset returns an (inputs, labels) pair covering the first five items.
train_ds[:5]

(tensor([[0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.]]),
 tensor([5, 0, 4, 1, 9]))

In [25]:
class Dataloader:
    """Iterates over a sliceable dataset in consecutive chunks of `batch_size` items."""
    def __init__(self,dataset,batch_size=50):
        self.dataset,self.batch_size = dataset,batch_size
        # The length is read once, at construction time.
        self.n = len(dataset)

    def __iter__(self):
        # One slice per batch start; the final slice is clipped to the dataset length.
        yield from (self.dataset[start:min(self.n,start+self.batch_size)]
                    for start in range(0,self.n,self.batch_size))

In [26]:
# Loaders over both datasets, using the default batch size of 50.
train_dl = Dataloader(train_ds)
val_dl = Dataloader(val_ds)

In [27]:
### training using our implemented dataloader
# Each epoch is one optimisation pass over `train_dl` followed by one evaluation pass over `val_dl`.
# The reported loss/accuracy are unweighted means of the per-batch values.
for epoch in range(epochs):
    # The validation pass below leaves the model in eval mode, so switch back every epoch.
    model.train()
    count = 0
    train_loss = 0.
    train_accuracy = 0.
    for xb,yb in train_dl:
        preds = model(xb)
        loss = F.cross_entropy(preds,yb)
        loss.backward()
        opt.step()
        opt.zero_grad()
        # .item() stores plain floats, so the running totals do not keep
        # every batch's autograd graph alive.
        train_loss += loss.item()
        train_accuracy += accuracy(preds,yb).item()
        count+=1

    # max(..., 1) avoids a division by zero when the loader yields no batches.
    train_loss/=max(count,1)
    train_accuracy/=max(count,1)

    model.eval()
    val_loss = 0.
    count = 0
    val_accuracy = 0.

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for xb,yb in val_dl:
            val_preds = model(xb)
            val_loss += F.cross_entropy(val_preds,yb).item()
            val_accuracy += accuracy(val_preds,yb).item()
            count+=1

    val_loss/=max(count,1)
    val_accuracy/=max(count,1)

    print(f"current epoch= {epoch} ")
    print(f"current train loss = {train_loss:.2f}, train accuracy = {train_accuracy:.2f}")
    # This line reports validation metrics, so it is labelled "val accuracy".
    print(f"current val loss = {val_loss:.2f}, val accuracy = {val_accuracy:.2f} \n")
    

current epoch= 0 
current train loss = 0.12, train accuracy = 0.96
current val loss = 0.13, train accuracy = 0.96 

current epoch= 1 
current train loss = 0.09, train accuracy = 0.97
current val loss = 0.13, train accuracy = 0.97 

current epoch= 2 
current train loss = 0.07, train accuracy = 0.98
current val loss = 0.14, train accuracy = 0.96 



### Random sampling Dataloader

In [28]:
import random

In [31]:
class Sampler:
    """Produces the indices 0..n-1 of a dataset, optionally in random order."""
    def __init__(self,dataset,shuffle=False):
        # Only the dataset's length is kept, not the dataset itself.
        self.n,self.shuffle = len(dataset),shuffle

    def __iter__(self):
        # A new index list is built on every call to iter().
        indices = list(range(self.n))
        if not self.shuffle:
            return iter(indices)
        # In-place shuffle driven by the global `random` module state.
        random.shuffle(indices)
        return iter(indices)

In [87]:
# Index sampler over the training set with shuffling enabled.
# NOTE(review): Python's `random` module is not seeded in any visible cell, so the order
# presumably differs between runs - TODO seed it if reproducibility matters.
train_sampler = Sampler(train_ds,True)

In [81]:
from itertools import islice

In [None]:
# NOTE(review): unexecuted scratch cell - zip() with no arguments only builds an empty iterator.
# Looks like unfinished work; TODO complete it or delete the cell.
zip()