# Familiarisation with PyTorch

In [None]:
# pytorch
import torch
from torchvision import transforms,datasets
from torch.utils.data import Dataset,DataLoader
import torch.nn as nn
import torch.optim as optim
import numpy as np
import os
import torch.nn.functional as F

# pyplot
import matplotlib.pyplot as plt
from MNIST_dataloader import *

Data loading

In [None]:
    # define parameters
    data_loc = os.path.abspath(".") #change the datalocation to something that works for you
    batch_size = 64
    
    # get dataloader
    train_loader,val_loader, test_loader = create_dataloaders(data_loc, batch_size)
    
    # get some examples
    examples = enumerate(test_loader)
    _, (x_clean_example, x_noisy_example, labels_example) = next(examples)
    # use these example images througout the assignment as the first 10 correspond to the digits 0-9
    
    # show the examples in a plot
    plt.figure(figsize=(12,3))
    for i in range(10):
        plt.subplot(2,10,i+1)
        plt.imshow(x_clean_example[i,0,:,:],cmap='gray')
        plt.xticks([])
        plt.yticks([])
        
        plt.subplot(2,10,i+11)
        plt.imshow(x_noisy_example[i,0,:,:],cmap='gray')
        plt.xticks([])
        plt.yticks([])
    
    plt.tight_layout()
    plt.savefig(r"figure\data_examples.png",dpi=300,bbox_inches='tight')
    plt.show()

check data size

In [None]:
USE_GPU = True
dtype = torch.float32
if USE_GPU and torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Interval for printing loss during training
print_every = 100

print(f'Using device:{device}')

## b) Build up a fully connected network with linear layers

In [None]:
class ThreeLayerFC_o_act(nn.Module):
    def __init__(self,in_neuron,hidden1_neuron,hidden2_neuron,out_neuron):
        super().__init__()
        self.fc1 = nn.Linear(in_neuron,hidden1_neuron)
        self.fc2 = nn.Linear(hidden1_neuron,hidden2_neuron)
        self.fc3 = nn.Linear(hidden2_neuron,out_neuron)
    def forward(self,x):
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        return x

def test_ThreeLayerFC():
    x = torch.zeros((64,32,32)).view((64,32*32))
    model = ThreeLayerFC_o_act(in_neuron=32*32,hidden1_neuron=64*64,hidden2_neuron=128*128,out_neuron=32*32)
    result = model(x)
    print(result.shape)

test_ThreeLayerFC()

In [None]:
# trainer, loss calculation and plot

def flatten(x):
    return x.view(len(x),-1)

def loss_epoch(loss_ls,model,loader) -> list:
    model.eval() # model in evaluation mode
    with torch.no_grad():
        loss_epoch=0
        for t,(x_clean,x_noisy,_) in enumerate(loader):
            x_noisy = x_noisy.to(device=device, dtype=torch.float32)
            x_clean = x_clean.to(device=device, dtype=torch.float32)
            out_x = model(flatten(x_noisy))
            loss=F.mse_loss(out_x,flatten(x_clean))
            loss_epoch += loss
        loss_ls.append(loss_epoch/t)

def loss_batch(loss_ls,model,loader) -> list:
    model.eval() # model in evaluation mode
    with torch.no_grad():
        for _,(x_clean,x_noisy,_) in enumerate(loader):
            x_noisy = x_noisy.to(device=device, dtype=torch.float32)
            x_clean = x_clean.to(device=device, dtype=torch.float32)
            out_x = model(flatten(x_noisy))
            loss=F.mse_loss(out_x,flatten(x_clean))
        loss_ls.append(loss)   

def trainer(model,optimizer, epochs=1):
    loss_train = []
    loss_val = []
    model = model.to(device=device)
    for e in range(epochs):
        for _,(x_clean,x_noisy,_) in enumerate(train_loader):
            x_noisy = x_noisy.to(device=device, dtype=torch.float32)
            x_clean = x_clean.to(device=device, dtype=torch.float32)
            model.train() # training mode
            out_x = model(flatten(x_noisy))
            loss = F.mse_loss(out_x,flatten(x_clean))
            optimizer.zero_grad() #reset gradients
            loss.backward()
            optimizer.step()
        loss_epoch(loss_train,model,train_loader)
        loss_epoch(loss_val,model,val_loader)
        print(f'epoch {e+1}:train loss={loss_train[e]},val loss={loss_val[e]}')
    return loss_train,loss_val

def loss_plot(loss_train,loss_val,epoch,figname:str):
    x = range(1,epoch+1)
    plt.plot(x,loss_train,'r-',label="training loss")
    plt.plot(x,loss_val,'k:',label="validation loss")
    plt.xlabel("Epochs")
    plt.ylabel("Loss(MSE)")
    plt.title("Train/Val Loss")
    plt.legend()
    plt.savefig(f"figure/{figname}.png")
    plt.show()

def prediction(model,x): #test
    model = model.to(device=device)
    model.eval()
    with torch.no_grad():
        x = x.to(device=device,dtype=torch.float32)
        out_x = torch.Tensor.cpu(model(flatten(x)))
    return out_x


In [None]:
in_neuron=32*32
hidden1_neuron=64*64
hidden2_neuron=64*64
out_neuron=32*32

lr = 1e-3

model_o_act = ThreeLayerFC_o_act(in_neuron,hidden1_neuron,hidden2_neuron,out_neuron)
optimizer = optim.SGD(model_o_act.parameters(),lr) #default momentum=0
epoch=20

train_loss,val_loss = trainer(model_o_act,optimizer,epoch)
loss_plot(train_loss,val_loss,epoch,"loss_o_act")
PATH = os.path.join(os.path.abspath("."), "model")
torch.save(model_o_act,PATH)

In [None]:
model_o_act_untrained = ThreeLayerFC_o_act(in_neuron,hidden1_neuron,hidden2_neuron,out_neuron)

# get some examples
examples = enumerate(test_loader)
_, (x_clean_example, x_noisy_example, labels_example) = next(examples)

# prediction
x_untrained_example = prediction(model_o_act_untrained,x_noisy_example).reshape(64,1,32,32)
x_trained_example = prediction(model_o_act,x_noisy_example).reshape(64,1,32,32)

# comparison of trained/untrained examples with clean and noisy figures
plt.figure(figsize=(40,10))
for i in range(10):
    plt.subplot(4,10,i+1)
    plt.imshow(x_clean_example[i,0,:,:],cmap='gray')
    plt.xticks([])
    plt.yticks([])
    
    plt.subplot(4,10,i+11)
    plt.imshow(x_noisy_example[i,0,:,:],cmap='gray')
    plt.xticks([])
    plt.yticks([])

    plt.subplot(4,10,i+21)
    plt.imshow(x_untrained_example[i,0,:,:],cmap='gray')
    plt.xticks([])
    plt.yticks([])

    plt.subplot(4,10,i+31)
    plt.imshow(x_trained_example[i,0,:,:],cmap='gray')
    plt.xticks([])
    plt.yticks([])
        
plt.tight_layout()
plt.savefig(r"figure/data_examples_o_act.png",dpi=300,bbox_inches='tight')
plt.show()

## Linear layers with activation(ReLu)

In [None]:
def ReLu(x):
    y = torch.zeros_like(x)
    return torch.where(x<=0,x,y)

#test ReLu
test_relu= torch.randn(3,2)
print(test_relu)
print(ReLu(test_relu))

In [None]:
# plot the relu
plot_relu_x = torch.arange(-100,100,1)
plot_relu_y = ReLu(plot_relu_x)
plt.plot(plot_relu_x,plot_relu_y,'r-',label="ReLu")
plt.xlabel("x")
plt.ylabel("ReLU(x)")
plt.title("Output of ReLu function")
plt.legend()
plt.savefig(r'figure/ReLu.png')
plt.show()

In [None]:
# model with ReLU activation
class ThreeLayerFC_act(nn.Module):
    def __init__(self,in_neuron,hidden1_neuron,hidden2_neuron,out_neuron):
        super().__init__()
        self.fc1 = nn.Linear(in_neuron,hidden1_neuron)
        self.fc2 = nn.Linear(hidden1_neuron,hidden2_neuron)
        self.fc3 = nn.Linear(hidden2_neuron,out_neuron)
    def forward(self,x):
        x = ReLu(self.fc1(x))
        x = ReLu(self.fc2(x))
        x = self.fc3(x)
        return x

In [None]:
model_act = ThreeLayerFC_act(in_neuron,hidden1_neuron,hidden2_neuron,out_neuron)
optimizer = optim.SGD(model_o_act.parameters(),lr) #default momentum=0
epoch=100

train_loss,val_loss = trainer(model_o_act,optimizer,epoch)
loss_plot(train_loss,val_loss,epoch,"loss_o_act")
PATH = os.path.join(os.path.abspath("."), "model")
torch.save(model_o_act,PATH)