In [46]:
## Load MNIST
import torch
from torchvision import datasets
from torch.utils.data import DataLoader, random_split
from torchvision.transforms import ToTensor

SPLIT = 0.8
BATCH_SIZE = 64

train_dataset = datasets.MNIST("data", train=True, download=True, transform=ToTensor())
test_dataset = datasets.MNIST("data", train=False, download=True, transform=ToTensor())
train_dataset, valid_dataset = random_split(train_dataset, [int(SPLIT*len(train_dataset)), len(train_dataset)-int(SPLIT*len(train_dataset))], generator=torch.Generator().manual_seed(42))

train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_dataloader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False) # Note that shuffle is turned off for the test set

print(f"> Data Loaded: \n  {len(train_dataset):,} train\n  {len(valid_dataset):,} valid\n  {len(test_dataset):,} test")


> Data Loaded: 
  48,000 train
  12,000 valid
  10,000 test


In [49]:
next(iter(train_dataloader))

RuntimeError: Numpy is not available

In [None]:
## Show image batch
import matplotlib.pyplot as plt
import numpy as np

def show_image_batch(dataloader, n_rows=8, figsize=(10,10)):
    images, labels = next(iter(dataloader))
    N = len(images)
    fig, axs = plt.subplots(n_rows, N//n_rows, figsize=(10,10), constrained_layout=True)
    
    for image, label, ax in zip(images, labels, axs.ravel()[:N]):
        ax.set_title(label.item())
        ax.set_xticks([])
        ax.set_yticks([])
        ax.imshow(image.squeeze())

show_image_batch(train_dataloader, n_rows=8)

In [52]:
## Device
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {DEVICE} device")

Using cpu device


In [None]:
## Custom Dataset
import os
import pandas as pd
from torch.utils.data import Dataset
from torchvision.io import read_image

class CustomImageDataset(Dataset):
    def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
        self.img_labels = pd.read_csv(annotations_file)
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0])
        image = read_image(img_path)
        label = self.img_labels.iloc[idx, 1]
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label

In [None]:
## Simple Neural Network
import torch.nn as nn
from collections import OrderedDict

class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            OrderedDict([
                ("linear1", nn.Linear(28*28, 512)),
                ("relu1", nn.ReLU()),
                ("linear2", nn.Linear(512, 512)),
                ("relu2", nn.ReLU()),
                ("linear3", nn.Linear(512, 10)),
            ])
            
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

In [None]:
## LeNet
from collections import OrderedDict

model = nn.Sequential(OrderedDict([
    # First Convolution Block
    ("conv1", nn.Conv2d(1, 6, 5, padding=2)),
    ("relu1", nn.ReLU()),
    ("avgpool1", nn.AvgPool2d(2, stride=2)),
    
    # Second Convolution Block
    ("conv2", nn.Conv2d(6, 16, 5, padding=2)),
    ("relu2", nn.ReLU()),
    ("avgpool2", nn.AvgPool2d(2, stride=2)),
    
    # Classifier
    ("flatten", nn.Flatten()),
    ("linear1", nn.Linear(784, 120)),
    ("relu3", nn.ReLU()),
    ("linear2", nn.Linear(120, 84)),
    ("relu4", nn.ReLU()),
    ("linear3", nn.Linear(84, 10)),
]))

In [None]:
## Loss function
import torch.nn as nn

loss_fn = nn.CrossEntropyLoss()

In [53]:
## Optimizer
import torch

LEARNING_RATE = 0.001
optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE)

NameError: name 'model' is not defined

In [None]:
## Simple Training loop

def train_model(model, dataloader, loss_fn, optimizer):
    size = len(dataloader.dataset)
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction and loss
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [78]:
a = time.perf_counter()
time.sleep(2)
b = time.perf_counter()
b-a

2.0057309999974677

In [79]:
## Training loop
from tqdm.notebook import tqdm, trange
import time

def train_step(model, dataloader, loss_fn, optimizer, current_epoch, num_epochs):
    
    # tqdm progressbar
    pbar = tqdm(total=len(dataloader.dataset), desc=f"Epoch {current_epoch}/{num_epochs} ")
    
    # For every batch
    for batch, (X, y) in enumerate(dataloader):
        
        # Compute prediction and loss
        pred = model(X)
        train_loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        train_loss.backward()
        optimizer.step()
        
        # Calculate losses
        train_loss = train_loss.item()
        valid_loss = train_loss + 0.003
        
        # Update tqdm progressbar
        pbar.set_postfix({"Train Loss":f"{train_loss:.6f}", "Valid Loss":f"{valid_loss:.6f}"})
        pbar.update(len(X))
        
    return train_loss, valid_loss

def train_model(model, train_dataloader, loss_fn, optimizer, epochs, weights_path="model.pt"):
    # Track time
    start_time = time.perf_counter()
    
    # Best model and loss
    best_valid_loss = float('inf')
    best_model = model
    
    # Loss histories for plotting
    train_loss_history = []
    valid_loss_history = []
    
    # For every epoch
    for i in range(epochs):
        # Train step
        train_loss, valid_loss = train_step(model, train_dataloader, loss_fn, optimizer, current_epoch=i+1, num_epochs=epochs)
        
        # Track losses
        train_loss_history.append(train_loss)
        valid_loss_history.append(valid_loss)
        
        # Save best model
        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            best_model = copy.deepcopy(model)
            torch.save(best_model.state_dict(), weights_path)
            tqdm.write(f"> Best Model Saved: {weights_path}")
    
    print(f"Training Done in {time.perf_counter() - start_time} seconds.")
    print(f"best Validation Loss: {best_valid_loss:.6f}")
    
    return best_model, train_loss_history, valid_loss_history

best_model, best_loss = train_model(model, train_dataloader, loss_fn, optimizer, epochs=4, weights_path="model.pt")


NameError: name 'model' is not defined

In [None]:
## Plot loss

def plot_loss(train_loss_history, valid_loss_history):
    plt.figure()
    plt.title("Loss")
    plt.xlabel("epoch")
    plt.ylabel("Loss")
    
    plt.plot(np.arange(len(train_loss_history)), train_loss_history)
    plt.plot(np.arange(len(valid_loss_history)), valid_loss_history)
    
    plt.legend(["Train Loss","Valid Loss"])
    plt.show()

plot_loss(train_loss_history, valid_loss_history)

In [None]:
model = Neural_Network()
model.to(DEVICE)
model.load_state_dict(torch.load('/content/best-weights.pt'))

In [None]:
## Torchvision transforms

from torchvision import transforms as T 

train_augs = T.Compose([
    T.RandomHorizontalFlip(p = 0.5),
    T.RandomRotation(degrees=(-20, +20)),
    T.ToTensor()

])

valid_augs = T.Compose([
    T.ToTensor()
])

In [None]:
# Load datasets using ImageFolder
from torchvision.datasets import ImageFolder

trainset = ImageFolder("data/train", transform=train_augs)
validset = ImageFolder("data/valid", transform=valid_augs)

In [None]:
data_transforms = {
    'train': T.Compose([
        T.RandomResizedCrop(INPUT_SIZE),
        T.RandomHorizontalFlip(),
        T.ToTensor(),
        T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'valid': T.Compose([
        T.Resize(INPUT_SIZE),
        T.CenterCrop(INPUT_SIZE),
        T.ToTensor(),
        T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}