In [1]:
import urllib.request
import gzip
import numpy as np
from tqdm import trange

from giagrad import Tensor
import giagrad.nn as nn
import giagrad.optim as optim 
from giagrad.display import draw_dot

from collections import OrderedDict

from typing import List

## Load data

In [2]:
def fetch(url, type_data = None):
    # Extract the dataset from the compressed file
    with gzip.open(urllib.request.urlopen(url)) as f:
        if type_data == 'label':
            data = np.frombuffer(f.read(), np.uint8, offset=8)
        else:
            data = np.frombuffer(f.read(), np.uint8, offset=16).reshape(-1, 28, 28)
    return data

X_train_all = fetch("http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz")
Y_train_all = fetch("http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz", 'label')
X_test_all = fetch("http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz")
Y_test_all = fetch("http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz", 'label')

print(X_train_all.shape)
print(X_test_all.shape)

(60000, 28, 28)
(10000, 28, 28)


### Resize and reshape

In [3]:
n_train = 5000
n_test = 250

# Every row is a flattened image
X_train = Tensor(X_train_all[:n_train].astype(np.float32)).unsqueeze(axis=1)
Y_train = Tensor(Y_train_all[:n_train])

X_test = Tensor(X_test_all[:n_test].astype(np.float32)).unsqueeze(axis=1)
Y_test = Tensor(Y_test_all[:n_test])

## CNN

### Model Creation

In [15]:
class Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2D(
            out_channels=16,
            kernel_size=(10,10),
            padding=20,
            stride=2
        ) 
        self.linear1 = nn.Linear(
            in_features=16,
            out_features=64
        )
        self.linear2 = nn.Linear(
            in_features=64,
            out_features=10
        )
        
#         self.dropout = nn.Dropout(p=0.4)
        
    def __call__(self, x):
        x = self.conv1(x).relu()
#         x = self.dropout(x)
        x = x.max(axis=(-2, -1)) # global max pooling
        x = self.linear1(x).relu()
#         x = self.dropout(x)
        return self.linear2(x)


In [16]:
model = Model()
print(model)

Model
	conv1: Conv2D(16, kernel_size=(10, 10), stride=(2, 2), dilation=(1, 1), padding=(20, 20), bias=True)
	linear1: Layer(in=16, out=64, bias=True)
	linear2: Layer(in=64, out=10, bias=True)


### Training

In [17]:
criterion = nn.CrossEntropyLoss(reduction='mean')
optimizer = optim.Adam(
    model.parameters(),
    lr=0.0001,
    amsgrad=True
)

In [18]:
EPOCHS = 50

model.train()

for ite in (prog_bar := trange(EPOCHS)):
    # Zero gradient 
    optimizer.zero_grad() 
    # Pass data through the network
    output = model(X_train)
    # Calculate loss
    loss = criterion(output, Y_train)
    # Backpropagate
    loss.backward()
    # Update weights
    optimizer.step()

    prog_bar.set_description(f"loss: {loss.data}")

loss: 32.37689971923828:   4%|▊                  | 2/50 [00:19<07:41,  9.62s/it]


KeyboardInterrupt: 

### Evaluation

In [8]:
model.eval()

print(f'train accuracy: {(model(X_train).softmax(axis=1).data.argmax(axis=1) == Y_train).mean() * 100} %')
print(f'test accuracy: {(model(X_test).softmax(axis=1).data.argmax(axis=1) == Y_test).mean() * 100} %')

train accuracy: tensor: 10.679999 fn: Mul %
test accuracy: tensor: 8.8 fn: Mul %


## PyTorch

In [9]:
import torch 
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [10]:
X_train = torch.from_numpy(X_train.data.copy())
Y_train = torch.from_numpy(Y_train.data.copy())

X_test = torch.from_numpy(X_test.data.copy())
Y_test = torch.from_numpy(Y_test.data.copy())

In [11]:
class Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.LazyConv2d(
            out_channels=16,
            kernel_size=(2,2),
            stride=2
        ) 
        self.linear1 = nn.Linear(
            in_features=16,
            out_features=64
        )
        self.linear2 = nn.Linear(
            in_features=64,
            out_features=10
        )
        
#         self.dropout = nn.Dropout(p=0.4)
        
    def __call__(self, x):
        x = F.relu(self.conv1(x))
#         x = self.dropout(x)
        x = x.reshape(*x.shape[0:2], -1)
        x = torch.max(x, dim=-1)[0] # global max pooling
        x = F.relu(self.linear1(x))
#         x = self.dropout(x)
        return self.linear2(x)

In [12]:
model = Model()
print(model)

Model(
  (conv1): LazyConv2d(0, 16, kernel_size=(2, 2), stride=(2, 2))
  (linear1): Linear(in_features=16, out_features=64, bias=True)
  (linear2): Linear(in_features=64, out_features=10, bias=True)
)




In [13]:
criterion = nn.CrossEntropyLoss(reduction='mean')
optimizer = optim.Adam(
    model.parameters(),
    lr=0.0001,
    amsgrad=True
)

In [14]:
EPOCHS = 50

model.train()

for ite in (prog_bar := trange(EPOCHS)):
    # Zero gradient 
    optimizer.zero_grad() 
    # Pass data through the network
    output = model(X_train)
    # Calculate loss
    loss = criterion(output, Y_train)
    # Backpropagate
    loss.backward()
    # Update weights
    optimizer.step()

    prog_bar.set_description(f"loss: {loss.data}")

loss: 26.59394073486328: 100%|██████████████████| 50/50 [00:05<00:00,  9.58it/s]


In [15]:
model.eval()

print(f'train accuracy: {(F.softmax(model(X_train), dim=1).argmax(dim=1) == Y_train).double().mean() * 100} %')
print(f'test accuracy: {(F.softmax(model(X_test), dim=1).argmax(dim=1) == Y_test).double().mean() * 100} %')

train accuracy: 10.5 %
test accuracy: 4.8 %
