In [1]:
import glob
import random
import numpy as np
from PIL import Image


import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.transforms import ToTensor
from torch.utils.data import Dataset, DataLoader

In [2]:
lr = 0.001

In [3]:
def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

set_seed()

In [4]:
set_seed()

train_images = glob.glob("data/training/*/*")
test_images = glob.glob("data/testing/*/*")

random.shuffle(train_images)
random.shuffle(test_images)

train_images = train_images[:500]
test_images = test_images[:500]

len(train_images), len(test_images)

(500, 500)

In [5]:
train_images[:2]

['data/training/0/49200.png', 'data/training/6/15591.png']

In [6]:
test_images[:2]

['data/testing/9/6655.png', 'data/testing/6/9938.png']

In [7]:
def read_image(path):
    img_pil = Image.open(path)
    img = np.array(img_pil)
    return img

In [8]:
class MNISTDataset(Dataset):
    def __init__(self, img_dir, transform=None):
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        return len(self.img_dir)

    def __getitem__(self, idx):
        img_path = self.img_dir[idx]
        image = read_image(img_path)
        label = int(img_path.split("/")[-2])

        if self.transform:
            image = self.transform(image)

        return image, label

In [9]:
training_data = MNISTDataset(train_images, ToTensor())
test_data = MNISTDataset(test_images, ToTensor())

train_dataloader = DataLoader(training_data, batch_size=4, shuffle=False)
test_dataloader = DataLoader(test_data, batch_size=4, shuffle=False)

In [10]:
for x, y in train_dataloader:
    print(x.shape)
    print(y)
    break

torch.Size([4, 1, 28, 28])
tensor([0, 6, 6, 4])


In [104]:
device = "cuda" if torch.cuda.is_available() else "cpu"

print(f"Using {device} device")

Using cuda device


In [124]:
class ReLU(nn.Module):
    
    def __init__(self):
        super().__init__()
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

    def relu(self, x):
        zero = torch.tensor([0]).to(self.device)
        x = x.to(device)
        return torch.max(zero, x)
    
    def forward(self, x):
        return self.relu(x)

In [125]:
class Linear(nn.Module):

    def __init__(self, in_features, out_features, bias=True):
        super().__init__()
        device = "cuda" if torch.cuda.is_available() else "cpu"
        root_k = (1 / in_features) ** 0.5
        self.in_features = in_features
        self.out_features = out_features
        self.bias = bias
        self.w = nn.Parameter(torch.empty(in_features, out_features, device=device))
        self.b = nn.Parameter(torch.empty(out_features, device=device)) if self.bias else None
        
        nn.init.uniform_(self.w, -root_k, root_k)
        nn.init.uniform_(self.b, -root_k, root_k)

    
    def forward(self, x):
        x = x @ self.w
        if self.b is not None:
            x += self.b
        return x
    
    def __repr__(self):
        return f"Linear(in_features={self.in_features}, out_features={self.out_features}, bias={self.bias})"

In [126]:
class Sequential(nn.Module):
    def __init__(self, *args):
        super().__init__()
        self.layers = nn.ModuleList(args)

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

In [127]:
class Flatten(nn.Module):
        
    def forward(self, x):
        size = x.size
        x = x.view(size(0), -1)
        return x

In [128]:
set_seed()

class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = Flatten()
        self.layers = Sequential(
            Linear(28*28, 512),
            nn.BatchNorm1d(512),
            ReLU(),
            Linear(512, 512),
            nn.BatchNorm1d(512),
            ReLU(),
            Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        x = self.layers(x)
        return x

model = Net().to(device)
print(model)

Net(
  (flatten): Flatten()
  (layers): Sequential(
    (layers): ModuleList(
      (0): Linear(in_features=784, out_features=512, bias=True)
      (1): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
      (3): Linear(in_features=512, out_features=512, bias=True)
      (4): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (5): ReLU()
      (6): Linear(in_features=512, out_features=10, bias=True)
    )
  )
)


In [129]:
# tensor(2.2775, device='cuda:0', grad_fn=<NllLossBackward0>)
# after custom Linear: tensor(2.3203, device='cuda:0', grad_fn=<DivBackward0>)

In [130]:
class CrossEntropyLoss:
    
    def log_softmax(self, x, dim):
        softmax = torch.exp(x) / torch.exp(x).sum(axis=dim, keepdims=True)
        return torch.log(softmax)

    def __call__(self, pred, y):
        '''custom cross entropy loss'''
        batch_size = y.size(0)
        log_softmax = self.log_softmax(pred, dim=1)
        per_batch_ce = [log_softmax[i][y[i]] for i in range(batch_size)]
        summed = sum(per_batch_ce)
        ce = -summed / batch_size
        return ce

loss_fn = CrossEntropyLoss()

In [131]:
x, y = next(iter(test_dataloader))
x, y = x.to(device), y.to(device)
pred = model(x)
loss = loss_fn(pred, y)
loss

tensor(2.6374, device='cuda:0', grad_fn=<DivBackward0>)

In [132]:
def train(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        pred = model(X)
        loss = loss_fn(pred, y)

        for p in model.parameters():
            p.grad = None

        loss.backward()

        for p in model.parameters():
            p.data += -lr * p.grad

        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [133]:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [134]:
epochs = 2
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn)
    test(test_dataloader, model, loss_fn)
print("Done!")

Epoch 1
-------------------------------
loss: 2.194601  [    4/  500]
loss: 1.846271  [  404/  500]
Test Error: 
 Accuracy: 56.4%, Avg loss: 1.794078 

Epoch 2
-------------------------------
loss: 1.608880  [    4/  500]
loss: 1.258173  [  404/  500]
Test Error: 
 Accuracy: 68.6%, Avg loss: 1.496804 

Done!


In [135]:
# Accuracy: 18.2%, Avg loss: 2.294005
# after custom Linear: Accuracy: 19.6%, Avg loss: 2.289190

In [136]:
total_correct = 0
total_no = 0
for x, y in test_dataloader:
    x = x.to(device)
    pred = model(x).argmax(1).cpu()
    correct = (pred == y).long().sum().item()
    total_correct += correct
    total_no += x.shape[0]

accuracy = total_correct / total_no
print(f"Accuracy is: {accuracy*100:.2f}%")

Accuracy is: 68.60%
