# Deep Learning Crash Course
## PyTorch tutorial

## Test title

In [None]:
import numpy as np
import torch

# A bare expression is only echoed when it is the last line of a cell,
# so print the version explicitly.
print(torch.__version__)

## Tensors

In [None]:
# Target shape: a 3 x 4 x 5 tensor.
size = [3, 4, 5]
# torch.randn fills the tensor with samples from a standard normal distribution.
x = torch.randn(size)
print(x.shape)

In [None]:
# Tensor of ones with the shape given by `size`.
x = torch.ones(size)
x

In [None]:
# Tensor of zeros with the shape given by `size`.
x = torch.zeros(size)
print(x)

In [None]:
# Random 2 x 3 NumPy array with values drawn uniformly from [0, 1).
nx = np.random.rand(2, 3)
# from_numpy wraps the array without copying: the tensor shares memory with nx.
x = torch.from_numpy(nx)
x

In [None]:
# Build a 1-D tensor directly from a Python list (4 elements).
x = torch.tensor([2, 3, 4 ,5]) 
x.shape

In [None]:
# A nested list gives a 2-D tensor: 2 rows x 2 columns.
x = torch.tensor([[2, 3], [4, 5]]) 
print(x.shape)
print(x)

In [None]:
# Convert the tensor back to a NumPy array (for a CPU tensor the memory is shared, not copied).
nx = x.numpy()
print(nx)

In [None]:
# 3 rows x 2 columns.
x = torch.tensor([[2, 3], [4, 5], [6, 7]]) 
print(x.shape)
print(x)

# NumPy-style slicing: keep the first two rows and every column.
y = x[0:2, :]
print(y)

In [None]:
# x holds integers; adding 0.2 makes y a floating-point tensor of the same 3 x 2 shape.
x = torch.tensor([[2, 3], [4, 5], [6, 7]])
y = torch.tensor([[2, 3], [4, 5], [6, 7]])+0.2
# Concatenate along axis 1 (columns): two 3 x 2 tensors give a 3 x 4 tensor.
z = torch.cat((x, y), axis=1)
print(z.shape)

For the full list of tensor operations, see the PyTorch documentation: https://pytorch.org/docs/stable/tensors.html

## Autograd

In [None]:
# requires_grad=True asks autograd to track operations on x.
x = torch.tensor(1.0, requires_grad = True)
z = x ** 3
# Back-propagate from z: dz/dx = 3 * x**2, which is 3 at x = 1.
z.backward()
print(x.grad.data)

In [None]:
# Leaf tensors: created directly by the user, all tracked by autograd.
a = torch.randn((3,3), requires_grad = True)

w1 = torch.randn((3,3), requires_grad = True)
w2 = torch.randn((3,3), requires_grad = True)
w3 = torch.randn((3,3), requires_grad = True)
w4 = torch.randn((3,3), requires_grad = True)

# b is a matrix product, c is an element-wise product.
b = w1@a 
c = w2*a

d = w3*b + w4*c 
# d is an intermediate (non-leaf) result; retain_grad() asks autograd to keep
# its gradient after backward() so it can be inspected later.
d.retain_grad()

# Scalar loss: mean over all elements of (10 - d).
L = torch.mean(10 - d)

print(L)

In [None]:
# Back-propagate from the scalar loss; gradients accumulate in the .grad of each tracked tensor.
L.backward()

# A gradient has the same shape as the tensor it belongs to.
print(a.grad.data.shape)
print(w1.grad.data.shape)

In [None]:
# Only the last bare expression of a cell is echoed, so print both values.
# d is the result of operations on other tensors, so it is not a leaf;
# a was created directly by the user, so it is a leaf.
print(d.is_leaf)
print(a.is_leaf)

In [None]:
# d depends on tensors that require gradients, so it requires gradients too.
print(d.requires_grad)
# d.grad is available here only because d.retain_grad() was called before backward().
print(d.grad.data)

In [None]:
# retain_grad() must be called before backward() to have an effect on that pass;
# it was already called when d was created, so this call changes nothing for the
# gradients computed above.
d.retain_grad()

To learn more, read the PyTorch autograd tutorial: https://pytorch.org/tutorials/beginner/blitz/autograd_tutorial.html

## Data Loader

In [None]:
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt


# Training split of FashionMNIST. Files are stored under ./data and downloaded
# on first use; ToTensor converts every image to a float tensor.
training_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor()
)

# Test split, loaded with the same root directory and transform.
test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor()
)

In [None]:
# Print the dataset's own summary.
print(training_data)

In [None]:
from torch.utils.data import DataLoader

# Wrap the datasets in loaders that yield shuffled mini-batches of 64 samples.
train_dataloader = DataLoader(training_data, batch_size=64, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=64, shuffle=True)

In [None]:
# Pull a single mini-batch out of the training loader.
train_features, train_labels = next(iter(train_dataloader))
print(f"Feature batch shape: {train_features.size()}")
print(f"Labels batch shape: {train_labels.size()}")
# Take sample 12 of the batch; squeeze() drops size-1 dimensions so imshow
# receives a plain 2-D image.
img = train_features[12].squeeze()
label = train_labels[12]
plt.imshow(img, cmap="gray")
plt.show()
print(f"Label: {label}")

### Write your own data loader

In [None]:
def plot_image_array(im, label, label_names, columns=4, figsize=[32, 32]):
    """Plot a stack of images as a grid of titled panels.

    Args:
        im ([H, W, C, N] or [H, W, C]): images to plot; a single 3-D image is
            treated as a stack holding one image.
        label ([N]): integer class index of every image, used to look up the
            panel title in label_names
        label_names (sequence of str): class names, indexed by label
        columns (int, optional): number of columns in the plot. Defaults to 4.
        figsize (list, optional): figure size. Defaults to [32, 32].

    Returns:
        fig : handle to figure
    """
    im = np.asarray(im)
    if im.ndim == 3:
        # promote a single image to a stack of one so the loop below is uniform
        im = im[:, :, :, np.newaxis]
    label = np.atleast_1d(label)

    num_images = im.shape[3]

    # add_subplot needs an integer row count; np.ceil would return a float.
    rows = max(1, -(-num_images // columns))

    fig = plt.figure(figsize=figsize)
    for i in range(num_images):
        # subplot positions are 1-based
        ax = fig.add_subplot(rows, columns, i + 1)
        ax.imshow(im[:, :, :, i])
        ax.set_title(label_names[label[i]], fontsize=20)
        ax.axis('off')
    plt.show()

    return fig

In [None]:
import sys
from pathlib import Path
import numpy as np
import matplotlib 

import torch
import torch.nn as nn
import torchvision.transforms as T
from torch.utils.data import Dataset, DataLoader, sampler
import torchvision.transforms as transforms

Project_DIR = '.'

sys.path.append(Project_DIR)

from util import *

class Cifar10Dataset(Dataset):
    """Dataset for cifar-10.

    Images are stored as [N, C, H, W]; every item is an (image, label) tuple.
    """

    def __init__(self, X, Y, transform=None):
        """Initialize the dataset

        Store the X and Y into self.images and self.labels.
        self.images is kept in the dimension [N, C, H, W].

        Args:
            X ([H, W, C, N]): images, e.g. [32, 32, 3, N] for cifar-10
            Y ([N]): labels
            transform (callable, optional): applied to one [H, W, C] image
                every time a sample is fetched. Defaults to None.

        Raises:
            ValueError: if X and Y do not hold the same number of samples
        """
        # [H, W, C, N] -> [N, C, H, W]
        self.images = np.transpose(X, (3, 2, 0, 1))
        self.labels = Y
        if self.images.shape[0] != len(self.labels):
            raise ValueError(
                "number of images (%d) does not match number of labels (%d)"
                % (self.images.shape[0], len(self.labels))
            )

        self.transform = transform

    def __len__(self):
        """Get the number of samples in this dataset.

        Returns:
            number of samples
        """
        return self.images.shape[0]

    def __getitem__(self, idx):
        """Get the idx sample

        Args:
            idx (int): the index of sample to get; first sample has idx being 0.
                Negative indices count from the end.

        Returns:
            sample : a tuple (image, label)

        Raises:
            IndexError: if idx is outside the dataset
        """
        num_samples = len(self)

        # IndexError (not a bare string) is what Python's sequence protocol expects.
        if idx >= num_samples or idx < -num_samples:
            raise IndexError(
                "invalid index %d for dataset with %d samples" % (idx, num_samples)
            )

        im = self.images[idx, :, :, :]
        if self.transform:
            # note the torchvision requires input image in [H, W, C]
            im = self.transform(np.transpose(im, (1, 2, 0)))

        return (im, self.labels[idx])

    def __str__(self):
        """Return a multi-line, human-readable summary of the dataset."""
        lines = [
            "Cifar 10 Dataset",
            "  Number of images: %d" % self.images.shape[0],
            "  Number of labels: %d" % len(self.labels),
            "  transform : %s" % (self.transform),
            "  image shape: %d %d %d" % self.images.shape[1:],
        ]
        # every line, including the last one, ends with a newline
        return "\n".join(lines) + "\n"

In [None]:
# os and pickle are used below; import them explicitly instead of relying on
# whatever `from util import *` happens to re-export.
import os
import pickle

# enable the interactive plotting
matplotlib.use("tkagg")

# load dataset
cifar10_dataset = load_and_prepare_data(os.path.join(Project_DIR, "data"), subtract_mean=False)

# NOTE: unpickling can execute arbitrary code; only load a batches.meta file
# that comes from a trusted copy of the dataset.
with open(os.path.join(Project_DIR, "data", "batches.meta"), "rb") as f:
    label_names = pickle.load(f, encoding="latin1")

In [None]:
# The loaded dataset is a dict; list its entries and the array shapes of the training split.
print(cifar10_dataset.keys())
print(cifar10_dataset['X_train'].shape, cifar10_dataset['Y_train'].shape)

In [None]:
# plot the data: the first 16 training images (last axis indexes the samples) in a 4-column grid
f = plot_image_array(cifar10_dataset['X_train'][:,:,:,0:16], cifar10_dataset['Y_train'][0:16], label_names['label_names'], columns=4, figsize=[16, 16])

In [None]:
# test dataset: wrap both splits without any transform and print their summaries
# (print calls Cifar10Dataset.__str__)
train_set = Cifar10Dataset(cifar10_dataset['X_train'], cifar10_dataset['Y_train'], transform=None)
print("Information for training set ... ", train_set)
test_set = Cifar10Dataset(cifar10_dataset['X_test'], cifar10_dataset['Y_test'], transform=None)
print("Information for test set ... ", test_set)

In [None]:
# directly get one sample
im, label = train_set[12]
print("Get one sample ", im.shape)

# create and load a batch
batch_size = 16
num_validation = 1000

dataset_size = len(train_set)
dataset_indices = list(range(dataset_size))

# shuffle the sample indices, then hold out the first num_validation of them
# for validation and keep the remainder for training
np.random.shuffle(dataset_indices)
train_idx, val_idx = dataset_indices[num_validation:], dataset_indices[:num_validation]

# both loaders read from train_set; the samplers restrict each one to its own
# disjoint subset of indices and draw from it in random order
loader_for_train = DataLoader(train_set, batch_size=batch_size, sampler=sampler.SubsetRandomSampler(train_idx), pin_memory=True)
loader_for_val = DataLoader(train_set, batch_size=batch_size, sampler=sampler.SubsetRandomSampler(val_idx), pin_memory=True)

# no need to shuffle the test set
loader_for_test = DataLoader(test_set, batch_size=batch_size, pin_memory=True)

In [None]:
# plot a batch
# Use the built-in next(); iterators have no .next() method in Python 3.
# Batches come out as [B, C, H, W]; plot_image_array expects [H, W, C, B].
iter_train = iter(loader_for_train)
images, labels = next(iter_train)
f = plot_image_array(np.transpose(images.numpy(), (2,3,1,0)), labels.numpy(), label_names['label_names'], columns=4, figsize=[32, 32])

iter_val = iter(loader_for_val)
images, labels = next(iter_val)
f = plot_image_array(np.transpose(images.numpy(), (2,3,1,0)), labels.numpy(), label_names['label_names'], columns=4, figsize=[32, 32])

iter_test = iter(loader_for_test)
images, labels = next(iter_test)
f = plot_image_array(np.transpose(images.numpy(), (2,3,1,0)), labels.numpy(), label_names['label_names'], columns=4, figsize=[32, 32])

In [None]:
# now, add some random transformation
transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomVerticalFlip(p=0.5),
        transforms.RandomRotation(degrees=90.0)
])

# set the transform
train_set.transform = transform

# now plot a batch
# Use the built-in next(); iterators have no .next() method in Python 3.
images, labels = next(iter_train)
f = plot_image_array(np.transpose(images.numpy(), (2,3,1,0)), labels.numpy(), label_names['label_names'], columns=4, figsize=[32, 32])

## Train a model

In [None]:
def compute_test_accuracy(loader, model, loss_func, device=torch.device('cpu')):
    """Evaluate a classifier on every batch produced by a loader.

    The model is switched to eval mode and is left in that mode on return.

    Args:
        loader : iterable yielding (x, y) batches
        model : module mapping a float32 batch to per-class scores [B, num_classes]
        loss_func : callable computing the loss as loss_func(scores, y)
        device (torch.device, optional): where to run the evaluation. Defaults to cpu.

    Returns:
        loss : mean of the per-batch losses
        accu : fraction of samples whose highest-scoring class equals the label

    Raises:
        ValueError: if the loader yields no samples
    """
    loss_sum = 0.0
    num_batches = 0
    total = 0
    correct = 0

    model.eval()
    # no gradients are needed for evaluation
    with torch.no_grad():
        for x, y in loader:
            x = x.to(device=device, dtype=torch.float32)
            y = y.to(device=device, dtype=torch.long)

            y_hat = model(x)
            loss_sum += loss_func(y_hat, y).item()

            # predicted class = index of the largest score in every row
            predicted = y_hat.argmax(dim=1)
            total += y.size(0)
            correct += (predicted == y).sum().item()

            num_batches += 1

    if num_batches == 0 or total == 0:
        raise ValueError("loader yielded no samples to evaluate")

    return loss_sum / num_batches, correct / total

In [None]:
# Hyper-parameters read as globals by run_training.
num_epochs = 30                                # passes over the training subset
num_hidden_layers = [300, 200, 200, 200, 100]  # width of each hidden layer of the MLP
batch_size = 4096                              # samples per mini-batch
reg = 1e-4                                     # weight decay handed to the optimizer
learning_rate = 0.1                            # SGD learning rate
use_gpu = True                                 # set to False to force training on the CPU
one_batch_training = False                     # True: train on only batch_size samples

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class PytorchMLP(nn.Module):
    """Multi-layer perceptron: flatten -> [Linear -> ReLU] per hidden layer -> Linear."""

    def __init__(self, H, W, C, num_hidden_layers, num_classes=10):
        """Initial the model

        Builds an nn.Sequential stored in self.blocks, with sub-modules named
        "fc_<i>" / "relu_<i>" for every hidden layer and "fc_output" for the
        final linear layer.

        Args:
            H (int): Height of input image
            W (int): Width of input image
            C (int): Number of channels of input image
            num_hidden_layers (list of int): number of neurons of every hidden
                layer, in order. May be empty, which gives a single linear layer.
            num_classes (int, optional): number of output logits. Defaults to 10.
        """
        super().__init__()

        self.blocks = nn.Sequential()

        # width of the tensor entering the next layer; starts as the flattened image
        input_dim = int(H * W * C)
        for i, num_neurons in enumerate(num_hidden_layers):
            self.blocks.add_module("fc_%d" % i, nn.Linear(input_dim, num_neurons, bias=True))
            self.blocks.add_module("relu_%d" % i, nn.ReLU())
            input_dim = num_neurons

        self.blocks.add_module("fc_output", nn.Linear(input_dim, num_classes, bias=True))

    def forward(self, x):
        """Forward pass of MLP model

        Args:
            x ([B, C, H, W]): a batch of input image

        Returns:
            output ([B, num_classes]): logits tensor, ready for the softmax
        """
        # keep the batch dimension, flatten everything else
        x = torch.flatten(x, 1)
        return self.blocks(x)

In [None]:
def run_training(cifar10_dataset, num_samples_validation=1000):
    """Run the training

    Reads the module-level settings num_epochs, num_hidden_layers, batch_size,
    reg, learning_rate, use_gpu and one_batch_training.

    Inputs:
        cifar10_dataset : dataset loaded with utility functions; the entries
            'X_train', 'Y_train', 'X_test' and 'Y_test' are used
        num_samples_validation : number of training samples held out for validation

    Outputs:
        model : model after training
        loss_train, loss_val : loss for every epoch
        loss_test : loss on the test set after training
        accu_train, accu_val : accuracy for every epoch
        accu_test : accuracy on the test set after training

        returned in the order
        (model, loss_train, loss_val, loss_test, accu_train, accu_val, accu_test)
    """

    # pick the device first; fall back to the CPU when CUDA is requested but
    # not available, so that `device` is always defined
    if use_gpu and torch.cuda.is_available():
        device = torch.device('cuda')
    else:
        device = torch.device('cpu')

    # pinned host memory only helps host-to-GPU copies
    pin_memory = device.type == 'cuda'

    # add some data transformation
    transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.RandomVerticalFlip(p=0.5)
    ])

    # set up the data loader
    train_set = Cifar10Dataset(cifar10_dataset['X_train'], cifar10_dataset['Y_train'], transform=transform)
    # validation samples come from the training arrays, but must not be
    # randomly flipped: same tensor conversion, no augmentation
    val_set = Cifar10Dataset(cifar10_dataset['X_train'], cifar10_dataset['Y_train'], transform=transforms.ToTensor())
    # do not add data augmentation to test set !
    # NOTE(review): the test set skips ToTensor entirely; this matches the
    # training preprocessing only if the images are already floating point
    # (ToTensor rescales uint8 input) - confirm against load_and_prepare_data.
    test_set = Cifar10Dataset(cifar10_dataset['X_test'], cifar10_dataset['Y_test'], transform=None)

    # shuffle the sample indices, then split them into validation / training
    dataset_indices = list(range(len(train_set)))
    np.random.shuffle(dataset_indices)

    val_idx = dataset_indices[:num_samples_validation]
    if one_batch_training:
        print("Train with only one batch")
        train_idx = dataset_indices[num_samples_validation:num_samples_validation+batch_size]
    else:
        train_idx = dataset_indices[num_samples_validation:]

    loader_for_train = DataLoader(train_set, batch_size=batch_size, sampler=sampler.SubsetRandomSampler(train_idx), pin_memory=pin_memory)
    loader_for_val = DataLoader(val_set, batch_size=batch_size, sampler=sampler.SubsetRandomSampler(val_idx), pin_memory=pin_memory)
    # no need to shuffle the test set
    loader_for_test = DataLoader(test_set, batch_size=batch_size, pin_memory=pin_memory)

    H, W, C, _ = cifar10_dataset['X_train'].shape

    # declare the model
    m = PytorchMLP(H, W, C, num_hidden_layers)
    print(m)

    # declare the loss function
    loss_func = nn.CrossEntropyLoss()

    # declare the optimizer
    optimizer = optim.SGD(m.parameters(), lr=learning_rate, momentum=0.9, weight_decay=reg)

    # set model to device
    m.to(device=device)

    loss_train = []
    loss_val = []
    accu_train = []
    accu_val = []

    # train for num_epochs
    for epoch in range(num_epochs):

        m.train()

        # go through all mini-batches for this epoch
        running_loss_train = 0.0
        running_accu_train = 0.0
        num_batches = 0
        for x, y in loader_for_train:

            x = x.to(device=device, dtype=torch.float32)
            y = y.to(device=device, dtype=torch.long)

            # forward pass, put the model output to y_hat
            y_hat = m(x)

            # compute loss
            loss = loss_func(y_hat, y)

            # zero the parameter gradients
            optimizer.zero_grad()

            # backprop
            loss.backward()

            # perform gradient descent step
            optimizer.step()

            running_loss_train += loss.item()
            # NOTE(review): compute_accuracy is not defined in this file;
            # presumably it comes from `from util import *` - confirm.
            running_accu_train += compute_accuracy(y_hat.detach().cpu(), y.detach().cpu())
            num_batches += 1

        if num_batches == 0:
            raise ValueError("no training samples left after holding out the validation set")

        # after one epoch, compute training loss and accuracy (mean over batches)
        loss_train.append(running_loss_train/num_batches)
        accu_train.append(running_accu_train/num_batches)

        # after one epoch, compute validation loss and accuracy
        lv, av = compute_test_accuracy(loader_for_val, m, loss_func, device=device)
        loss_val.append(lv)
        accu_val.append(av)

        print('epoch %d, train loss %f, accuracy %f - val loss %f, accuracy %f' % (epoch, loss_train[epoch], accu_train[epoch], loss_val[epoch], accu_val[epoch]))

    # compute test accuracy
    loss_test, accu_test = compute_test_accuracy(loader_for_test, m, loss_func, device=device)

    return m, loss_train, loss_val, loss_test, accu_train, accu_val, accu_test

In [None]:

# Hold out 3000 training samples for validation and run the full training;
# the returned tuple is unpacked into the trained model and its loss/accuracy records.
num_samples_validation = 3000
best_model, loss_train, loss_val, loss_test, accu_train, accu_val, accu_test = run_training(cifar10_dataset, num_samples_validation)

In [None]:
# Learning curves: loss on the upper axes, accuracy on the lower axes,
# training in red and validation in blue.
fig, (ax1, ax2) = plt.subplots(2, 1)

epochs = np.arange(num_epochs)
panels = (
    (ax1, loss_train, loss_val, 'loss'),
    (ax2, accu_train, accu_val, 'accuracy'),
)
for ax, train_curve, val_curve, y_label in panels:
    ax.plot(epochs, train_curve, 'r', label='train')
    ax.plot(epochs, val_curve, 'b', label='validation')
    ax.set_xlabel('epochs')
    ax.set_ylabel(y_label)
    ax.legend()