# IMPORTS

In [None]:
#
!pip install pycm livelossplot
%pylab inline

from sklearn.metrics import accuracy_score
from sklearn.model_selection import StratifiedShuffleSplit
from livelossplot import PlotLosses
from pycm import *
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
import torchvision.datasets
import torchvision.transforms as transforms
from torchvision.datasets import MNIST
from torchsummary import summary
import matplotlib.pyplot as plt
import numpy as np
import random

### Setting the seed and setting up cuda/gpu

In [None]:
def set_seed(seed):
    """
    Seed every random number generator used here and switch cuDNN off.

    Seeds Python's `random`, NumPy and PyTorch (CPU and all CUDA devices) with
    `seed`, then disables both the cuDNN auto-tuner and the cuDNN backend.
    Always returns True.
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
        seeder(seed)

    # benchmark=True would let cuDNN auto-tune and pick the fastest convolution
    # algorithm; it is turned OFF here, and so is cuDNN as a whole.
    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.enabled = False

    return True

# Pick the compute device: 'cuda' only when torch reports CUDA as available and
# at least one device is visible; otherwise stay on the CPU.
if torch.cuda.is_available() and torch.cuda.device_count() > 0:
    device = 'cuda'
    print("Cuda installed! Running on GPU!")
else:
    device = 'cpu'
    print("No GPU available!")

## CUSTOM DATASET

In [None]:
class ChestMNIST(torch.utils.data.Dataset):
    """
    Images from an ``.npz`` archive, each paired with a randomly masked copy.

    Every item is a ``(masked_image, original_image)`` tuple: the first element
    is the model input, the second is the reconstruction target.

    Args:
        data_path: path to the ``.npz`` archive. It must contain the array
            named in ``_SPLIT_KEYS`` for the requested split.
        split: one of ``"train"``, ``"test"`` or ``"val"``.
        p: probability with which each element of a sample is zeroed out.
        transform: optional callable applied to a sample before masking.

    Raises:
        ValueError: if ``split`` is not one of the supported names.
    """

    # archive key that stores the images of each supported split
    _SPLIT_KEYS = {
        "train": "train_images",
        "test": "test_images",
        "val": "val_images",
    }

    def __init__(self, data_path, split="train", p=0.2, transform=None):
        # Fail loudly on a bad split. `__init__` must *raise*: returning the
        # exception object would surface as an unrelated TypeError.
        if split not in self._SPLIT_KEYS:
            raise ValueError("wrong split")

        self.data_path = data_path
        self.split = split
        self.p = p
        self.transform = transform

        # Indexing the archive materialises the array, so the file handle can
        # be closed as soon as the split has been read.
        with np.load(self.data_path) as archive:
            self.data = archive[self._SPLIT_KEYS[split]]

    def _get_mask(self, img_shape):
        """Return a boolean array of shape `img_shape`; False marks erased entries.

        Each entry is drawn independently and is False with probability `self.p`.
        """
        mask = np.random.uniform(0, 1, size=(img_shape)) > self.p
        return mask

    def __getitem__(self, idx):
        """Return ``(masked_sample, sample)`` for index `idx`."""
        sample = self.data[idx]

        if self.transform is not None:
            sample = self.transform(sample)

        # masked image (want to pred), actual image (our target)
        return sample * self._get_mask(sample.shape), sample

    def __len__(self):
        """Number of samples in the loaded split."""
        return self.data.shape[0]

## get a validation set

In [None]:
#

from sklearn.model_selection import StratifiedShuffleSplit
# One stratified shuffle split that holds out 10% of the training data for
# validation; the fixed random_state keeps the split repeatable.
shuffler = StratifiedShuffleSplit(
    n_splits=1,
    test_size=0.1,
    random_state=42,
).split(mnist_train.data, mnist_train.targets)

# split() yields (train indices, validation indices) pairs; with n_splits=1
# there is exactly one pair to take.
train_idxs, valid_idxs = list(shuffler)[0]

X_train, y_train = mnist_train.data[train_idxs], mnist_train.targets[train_idxs]
X_val, y_val = mnist_train.data[valid_idxs], mnist_train.targets[valid_idxs]
X_test, y_test = mnist_test.data, mnist_test.targets

# NOTE torch categorical data = .long()
# NOTE(review): the names mnist_train / mnist_test are rebound to TensorDatasets
# here, so the lines above (which read .data / .targets) cannot be re-run
# without first reloading the original datasets.
mnist_train = TensorDataset(X_train, y_train.long())
mnist_validate = TensorDataset(X_val, y_val.long())
mnist_test = TensorDataset(X_test, y_test.long())

# from sklearn.model_selection import StratifiedShuffleSplit
# shuffler = StratifiedShuffleSplit(n_splits=1, test_size=0.1, random_state=42).split(mnist_train.data, mnist_train.targets)
# # gives you n, n-1 random ids that correspond to test and train subsets
# # n is the train proportion 
# train_idxs, valid_idxs = [(train_idx, valid_idx) for train_idx, valid_idx in shuffler][0]
# X_train, y_train = apply_standardization(mnist_train.data[train_idxs].float()), mnist_train.targets[train_idxs]
# X_val, y_val =     apply_standardization(mnist_train.data[valid_idxs].float()), mnist_train.targets[valid_idxs]
# X_test, y_test =   apply_standardization(mnist_test.data.float()), mnist_test.targets
# mnist_train =    TensorDataset(X_train, y_train.long())
# mnist_validate = TensorDataset(X_val, y_val.long())
# mnist_test =     TensorDataset(X_test, y_test.long())

# Transformations

In [None]:
# The notebook imports `torchvision.transforms as transforms`, so the transform
# classes are reached through that alias (the bare names were never imported).

# Training pipeline: convert the sample to a tensor, then normalise it with the
# single-channel mean / std given below.
train_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.1307], std=[0.3081]),
])

# NOTE(review): there is no ToTensor() step here, so this pipeline presumably
# expects inputs that are already tensors — confirm this is intended.
validation_test_transform = transforms.Compose([
    transforms.Normalize(mean=[0.1307], std=[0.3081])
])

### Conf matrix

In [None]:
from sklearn.metrics import ConfusionMatrixDisplay

# Draw a confusion matrix of predictions against ground truth on a 6x6-inch axis.
# NOTE(review): y_gt and y_pred are not defined in the visible cells — presumably
# the ground-truth and predicted label arrays produced by evaluate(); confirm.
fig, ax = plt.subplots(figsize=(6,6))
ConfusionMatrixDisplay.from_predictions(y_gt, y_pred, ax=ax, colorbar=False, cmap='bone_r')
plt.show()

# DATALOADERS (AND GETTING TORCH DATASETS)

In [None]:
#

# train_dataset = ChestMNIST(
#     data_path="./chestmnist.npz",
#     split="train",
#     p=mask_probability,
#     transform=transformations
# )
#
# val_dataset = ChestMNIST(
#     data_path="./chestmnist.npz",
#     split="val",
#     p=mask_probability,
#     transform=transformations
# )
#
# test_dataset = ChestMNIST(
#     data_path="./chestmnist.npz",
#     split="test",
#     p=mask_probability,
#     transform=transformations
# )

# datasets will be torch tensors eg torch.TensorDataset(X, y) X=SampleInput, y=SampleLabel

# Build the three loaders with identical settings.
# NOTE(review): train_dataset / val_dataset / test_dataset and batch_size are
# not defined in the visible cells (the dataset template above is commented
# out) — define them first.
# NOTE(review): shuffle=True is also applied to the validation and test
# loaders; confirm that is wanted.
train_loader, val_loader, test_loader = (
    DataLoader(split_dataset, batch_size=batch_size, shuffle=True, pin_memory=True)
    for split_dataset in (train_dataset, val_dataset, test_dataset)
)

# AN EXAMPLE NEURAL NETWORK IMPLEMENTATION

In [None]:
# 
# layer widths: input, four hidden layers, output
dimensions = [784, 150, 50, 50, 150, 784]


class neural_net(nn.Module):
    """
    Fully connected network with five bias-free linear layers.

    `dimensions` supplies the layer widths: entries 0-4 are the input sizes of
    fc1..fc5 and the last entry is the output size of fc5. The forward pass
    applies Mish after fc1..fc4 and a sigmoid after fc5, then reshapes the
    result to the shape of the input.
    """

    def __init__(self, dimensions):
        super().__init__()
        # fully connected layers fc1..fc5, created in order
        widths = [dimensions[k] for k in (0, 1, 2, 3, 4, -1)]
        for layer_no, (fan_in, fan_out) in enumerate(zip(widths, widths[1:]), start=1):
            setattr(self, f"fc{layer_no}", nn.Linear(fan_in, fan_out, bias=False))

        # activation / regularisation modules; only mish and sigmoid are used
        # by forward(), the others are kept available on the instance
        self.sigmoid = nn.Sigmoid()
        self.tanh = nn.Tanh()
        self.mish = nn.Mish()
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax()
        self.dropout = nn.Dropout(0.5)

    def forward(self, X):
        """Flatten `X` per sample, run it through the layers, return it in `X`'s shape."""
        hidden = X.flatten(start_dim=1).float()

        # fc1..fc4, each followed by Mish
        for layer in (self.fc1, self.fc2, self.fc3, self.fc4):
            hidden = self.mish(layer(hidden))

        # output layer followed by a sigmoid
        squashed = self.sigmoid(self.fc5(hidden))

        # give the output the same shape as the input
        return squashed.view(X.shape)


# CHECKING A BATCH (+ OPT/LOSS FUNCS)

In [None]:
# 

#~ CHECKING EVERYTHING FITS CELL

# Pull a single batch from the training loader.
train_batch = next(iter(train_loader))

X = torch.Tensor(train_batch[0])
y = torch.Tensor(train_batch[1])

print(f"X.shape = {X.shape}")
print(f"y.shape = {y.shape}")

# ensure prediction works
# (the network class defined in this notebook is `neural_net`, and it takes the
# list of layer widths as its only argument)
model = neural_net(dimensions)
output = model(X)

print(f"  OUTPUT SHAPE: {output.shape} (pre-softmax)")
print(f"EXPECTED SHAPE: {y.shape}")

# ensure loss works
# NOTE(review): this check uses CrossEntropyLoss, but `criterion` is replaced by
# MSELoss a few lines below — confirm which loss the model should be checked with.
criterion = nn.CrossEntropyLoss()
print(criterion(output, y))

# --
# NOTE(review): weight_decay and learning_rate are not defined in the visible
# cells — set them before running.
optimizer = torch.optim.Adam(model.parameters(), weight_decay=weight_decay, lr=learning_rate)   # instantiate the optimizer
criterion = nn.MSELoss()

# # get a batch
# train_batch = next(iter(train_loader))

# # masked images
# X = torch.Tensor(train_batch[0]).reshape(-1,1,28,28)
# # unmasked images
# y = torch.Tensor(train_batch[1].float())

# print(f"X.shape = {X.shape}")
# print(f"y.shape = {y.shape}")
# # ensure prediction works
# model = LeNet5drbn()
# output = model(X)

# print(f"  OUTPUT SHAPE: {output.shape}")
# print(f"EXPECTED SHAPE: {y.shape}")

# # ensure loss works
# criterion = nn.CrossEntropyLoss()
# y = y.long()
# print(criterion(output, y))




# TRAIN, VALIDATE, EVALUATE FUNCTIONS

In [None]:
# 

def train(model, optimizer, criterion, data_loader):
    """
    Run one training epoch over `data_loader`.

    Each batch is reshaped to (-1, 1, 28, 28) before being passed to the model.

    Args:
        model: network to train; it is put in train mode and moved to `device`.
        optimizer: optimizer that updates the parameters of `model`.
        criterion: loss called as ``criterion(outputs, y)``.
        data_loader: iterable of ``(X, y)`` batches with a `.dataset` attribute.

    Returns:
        ``(loss, accuracy)`` averaged over ``len(data_loader.dataset)``. The
        loss is a detached 0-dim tensor; the accuracy is a float-like number.
    """
    model.train()
    model.to(device)
    train_loss, train_accuracy = 0, 0
    for X, y in data_loader:
        X, y = X.to(device), y.to(device)

        optimizer.zero_grad()
        outputs = model(X.view(-1, 1, 28, 28))
        loss = criterion(outputs, y)
        loss.backward()
        optimizer.step()

        # Accumulate a detached copy: adding the graph-attached loss would keep
        # a reference to every batch's autograd graph for the whole epoch.
        train_loss += loss.detach() * y.size(0)

        # predicted class = index of the largest output along dim 1
        y_pred = outputs.argmax(dim=1)
        train_accuracy += accuracy_score(
            y.cpu().numpy(),
            y_pred.detach().cpu().numpy()) * X.size(0)

    n_samples = len(data_loader.dataset)
    # loss, acc
    return train_loss / n_samples, train_accuracy / n_samples


def validate(model, criterion, data_loader):
    """
    Compute loss and accuracy of `model` on `data_loader` without training.

    Each batch is reshaped to (-1, 1, 28, 28) before being passed to the model.

    Args:
        model: network to assess; it is put in eval mode and moved to `device`.
        criterion: loss called as ``criterion(outputs, y)``.
        data_loader: iterable of ``(X, y)`` batches with a `.dataset` attribute.

    Returns:
        ``(loss, accuracy)`` averaged over ``len(data_loader.dataset)``. The
        loss is a 0-dim tensor; the accuracy is a float-like number.
    """
    # Eval mode switches off train-only behaviour (e.g. dropout); without this
    # call the model would stay in whatever mode the last train() left it in.
    model.eval()
    model.to(device)
    validation_loss, validation_accuracy = 0, 0
    with torch.no_grad():
        for X, y in data_loader:
            X, y = X.to(device), y.to(device)
            outputs = model(X.view(-1, 1, 28, 28))
            loss = criterion(outputs, y)
            validation_loss += loss * y.size(0)

            # predicted class = index of the largest output along dim 1
            y_pred = outputs.argmax(dim=1)
            validation_accuracy += accuracy_score(
                y.cpu().numpy(),
                y_pred.detach().cpu().numpy()) * X.size(0)

    n_samples = len(data_loader.dataset)
    return validation_loss / n_samples, validation_accuracy / n_samples


def evaluate(model, data_loader):
    """
    Collect predictions and labels of `model` over all of `data_loader`.

    Each batch is reshaped to (-1, 1, 28, 28) before being passed to the model.

    Args:
        model: network to assess; it is put in eval mode and moved to `device`.
        data_loader: iterable of ``(X, y)`` batches.

    Returns:
        ``(y_preds, ys)`` — two NumPy arrays concatenated over the batches:
        predicted class indices first, then the labels from the loader.
    """
    # Eval mode switches off train-only behaviour (e.g. dropout).
    model.eval()
    model.to(device)
    ys, y_preds = [], []
    with torch.no_grad():
        for X, y in data_loader:
            X, y = X.to(device), y.to(device)
            outputs = model(X.view(-1, 1, 28, 28))
            # predicted class = index of the largest output along dim 1
            y_pred = outputs.argmax(dim=1)
            y_preds.append(y_pred.cpu().numpy())
            ys.append(y.cpu().numpy())

    return np.concatenate(y_preds, 0), np.concatenate(ys, 0)

# TRAINING LOOP (Live plotting)

In [None]:
def train_model(model, model_params=None, n_epochs=30):
    """
    Train a network with Adam and cross-entropy, plotting progress every epoch.

    Args:
        model: either an ``nn.Module`` instance, which is trained as-is, or a
            class / factory that builds the network when called.
        model_params: optional dict of keyword arguments passed to `model` when
            it is a class / factory. Ignored for an ``nn.Module`` instance.
        n_epochs: number of passes over the training data (default 30).

    Returns:
        The trained network, on `device`.

    NOTE(review): reads the notebook globals `seed`, `lr`, `batch_size`,
    `test_batch_size`, `mnist_train`, `mnist_validate` and `device`. The first
    four are not defined in the visible cells — define them before calling.
    """
    # Seed first so that building the network from a factory is reproducible.
    set_seed(seed)
    if isinstance(model, nn.Module):
        net = model
    else:
        net = model(**(model_params or {}))
    net = net.to(device)

    # Adam takes no `momentum` argument (its moment decay is set via `betas`),
    # so only the learning rate is passed.
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    train_loader = DataLoader(
        mnist_train,
        batch_size=batch_size,
        shuffle=True,
        pin_memory=False,
        num_workers=0,)

    # Averages over the validation set do not depend on sample order, so the
    # validation loader is not shuffled.
    validation_loader = DataLoader(
        mnist_validate,
        batch_size=test_batch_size,
        shuffle=False,
        pin_memory=False,
        num_workers=0,)

    liveloss = PlotLosses()
    for epoch in range(n_epochs):
        logs = {}
        train_loss, train_accuracy = train(net, optimizer, criterion, train_loader)

        # float() accepts 0-dim tensors, NumPy scalars and plain Python floats
        # alike, so it does not matter which of these the helpers return.
        logs['' + 'log loss'] = float(train_loss)
        logs['' + 'accuracy'] = float(train_accuracy)

        validation_loss, validation_accuracy = validate(net, criterion, validation_loader)
        logs['val_' + 'log loss'] = float(validation_loss)
        logs['val_' + 'accuracy'] = float(validation_accuracy)

        liveloss.update(logs)
        liveloss.draw()

    return net

# Run the training loop and rebind `model` to whatever train_model returns.
model = train_model(model)

## Transfer learning!!!

In [None]:
# uses 2 helper functions
def set_parameter_requires_grad(model, requires_grad=False):
    """
    Set the `requires_grad` flag of every parameter of `model`.

    With the default ``requires_grad=False`` this freezes all parameters.
    Returns None.

    Source: https://pytorch.org/tutorials/beginner/finetuning_torchvision_models_tutorial.html
    """
    for weight in model.parameters():
        weight.requires_grad = requires_grad
def get_params_to_update(model):
    """Return, in `named_parameters()` order, the parameters of `model` that have requires_grad=True."""
    return [weight for _name, weight in model.named_parameters() if weight.requires_grad]



# Load a pretrained ResNet-18. `torchvision.models` is reached through the
# `torchvision` package the notebook already imports (a bare `models` name was
# never imported).
# NOTE(review): newer torchvision releases prefer a `weights=` argument over
# `pretrained=True` — check against the installed version.
model = torchvision.models.resnet18(pretrained=True).to(device)

# Freeze every existing parameter: requires_grad is set to False on all of them.
set_parameter_requires_grad(model, False)
print(model)

# Look for the name of the output layer in the printout; here it is `fc`.
# Replace that layer with one that fits your use case — the example below gives
# the network 2 outputs (binary classification). The new layer is created after
# the freeze above, so the freeze does not apply to it.
model.fc = nn.Linear(model.fc.in_features, 2).to(device)

## Other stuff :/

In [None]:
#

### Saving a model
# model_save_name = 
# NOTE(review): model_save_name is never assigned in the visible cells (the
# assignment above is commented out) — set it before running.
# Only the state_dict of the model is written to `path`.
path = f"/content/gdrive/My Drive/models/{model_save_name}"
torch.save(model.state_dict(), path)

### Loading a model

### output size for conv layer
def calculate_conv_output_size(input_size, kernel_size, stride, padding, num_filters):
    """
    Spatial output size ``(height, width)`` of a 2-D convolution.

    Per axis: ``(in + 2 * padding - kernel) // stride + 1`` (no dilation term).

    Args:
        input_size: ``(height, width)`` of the input.
        kernel_size: ``(height, width)`` of the kernel.
        stride: an int used for both axes, or a ``(height, width)`` pair.
        padding: an int used for both axes, or a ``(height, width)`` pair.
        num_filters: accepted for interface compatibility; it does not affect
            the spatial size and is not used.

    Returns:
        ``(output_height, output_width)``.

    Source: https://chat.openai.com/share/132dd59a-df37-4c72-adc4-89cae275a6c6
    """
    def _pair(value):
        # Accept either a scalar (same value for both axes) or a 2-item pair.
        try:
            along_height, along_width = value
        except TypeError:
            along_height = along_width = value
        return along_height, along_width

    # Extract input size dimensions
    input_height, input_width = input_size

    # Extract kernel size dimensions
    kernel_height, kernel_width = kernel_size

    stride_height, stride_width = _pair(stride)
    padding_height, padding_width = _pair(padding)

    # Calculate the output height and width
    output_height = ((input_height + 2 * padding_height - kernel_height) // stride_height) + 1
    output_width = ((input_width + 2 * padding_width - kernel_width) // stride_width) + 1

    return (output_height, output_width)

# Example usage:
# A 3x3 kernel with stride 1 and padding 1 on a 32x32 input.
input_size = (32, 32)
kernel_size = (3, 3)
stride = 1
padding = 1
num_filters = 64

# By the formula: (32 + 2*1 - 3) // 1 + 1 = 32 on each axis.
output_size = calculate_conv_output_size(input_size, kernel_size, stride, padding, num_filters)
print("Output size:", output_size)

### imagestandardization

def apply_standardization(X, mean=0.1307, std=0.3081):
    """
    Scale `X` by 1/255, then standardise it as ``(X / 255 - mean) / std``.

    The input is left untouched: a new array / tensor is returned. Integer
    inputs are accepted because the division is not done in place.

    Args:
        X: array or tensor of pixel values — presumably in the 0..255 range,
            given the division by 255; confirm against the caller.
        mean: value subtracted after scaling.
        std: value the centred data is divided by.

    Returns:
        The standardised data, with the same shape as `X`.
    """
    # Out-of-place division: `X /= 255.` would overwrite the caller's data and
    # fails outright on integer tensors / arrays.
    scaled = X / 255.
    return (scaled - mean) / std
    

In [None]:
# when you have a dataset class you can use 
def show_batch(dataset, nr=4, nc=4):
    """
    Show an `nr` x `nc` grid of randomly chosen samples from `dataset`.

    Args:
        dataset: indexable dataset whose items are ``(sample, target)`` pairs.
            If it has a `classes` attribute, each image is titled with the
            class name and target index.
        nr: number of grid rows.
        nc: number of grid columns.
    """
    # this function comes from lecture 5: (VAEs)
    # squeeze=False keeps axarr two-dimensional even when nr or nc is 1
    fig, axarr = plt.subplots(nr, nc, figsize=(10, 10), squeeze=False)
    class_names = getattr(dataset, "classes", None)
    for i in range(nr):
        for j in range(nc):
            # randrange excludes the upper bound, so idx is always a valid index
            idx = random.randrange(len(dataset))
            sample, target = dataset[idx]
            try:
                axarr[i][j].imshow(sample)  # if PIL
            except (TypeError, ValueError):
                axarr[i][j].imshow(sample.permute(1, 2, 0))  # if tensor of shape CHW
            if class_names is not None:
                target_name = class_names[target]
                axarr[i][j].set_title("%s (%i)" % (target_name, target))

    fig.tight_layout(pad=1.5)
    plt.show()
    
# NOTE(review): train_ds is not defined in the visible cells — define the
# dataset to preview before running this line.
show_batch(train_ds, 5, 5)