In [None]:
#imports
import numpy as np
from tqdm import tqdm, trange
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.nn import CrossEntropyLoss
from torch.utils.data import DataLoader
from torchvision.transforms import ToTensor
import os
from posix import listdir
from random import shuffle
from math import floor
import re
import tensorflow as tf
import cv2
import matplotlib.pyplot as plt
from numpy.ma.core import argmin
from matplotlib.colors import LinearSegmentedColormap
from PIL import Image
import random
from keras.preprocessing.image import ImageDataGenerator
import PIL

%matplotlib inline

In [None]:
#this makes the model deterministic, and thus more repeatable
np.random.seed(0)
torch.manual_seed(0)

<torch._C.Generator at 0x78c9143e8310>

In [None]:
path = '/content/drive/MyDrive/ALL_IDB2/img/'         #main directory for training data
dirs = listdir(path)                                  # lists items in training directory
n_p = 8   #number of patches, used both in patchifying and explaining: defined here because the value needs to be same for both

In [None]:
'''
This function augments the data
The filename of each file is extracted
The image iteslf is turned into an array, and 8 augmented images are produced from it:
1) the image is rotated in a random dircetion by up to 40 degrees
2) & 3) flips are permormed about both the vertical and horizontal axes
4) & 5) translations are carried out in a random direction by 0-40% of the image width in either x or y directions
6) the image is zoomed into at 30% zoom
7) the image is sheared by 20%
8) an unmodified 'normal' image is outputted
'''

def augment():
 for item in dirs:
        if os.path.isfile(path+item):
            f, e = os.path.splitext(path+item)
            img1 = cv2.imread(path+item)
            img2 = cv2.resize(img1, (512,512), interpolation = cv2.INTER_AREA)
            img3 = tf.keras.utils.img_to_array(img2)

            #rotation
            r1 = np.expand_dims(img3, 0)
            r2 = ImageDataGenerator(rotation_range=40).flow(r1).next()
            r3 = r2[0].astype('uint8')
            cv2.imwrite(f + 'r40n.jpg', r3)

            #flip, vertical
            flippedv = cv2.flip(img3, 1)
            cv2.imwrite(f + 'fvn.jpg', flippedv)

            #flip, horizontal
            flippedh = cv2.flip(img3, 0)
            cv2.imwrite(f + 'fhn.jpg', flippedh)

            #x-translate
            tx1 = np.expand_dims(img3, 0)
            tx2 = ImageDataGenerator(height_shift_range=0.4).flow(tx1).next()
            tx3 = tx2[0].astype('uint8')
            cv2.imwrite(f + 'tx40n.jpg', tx3)

            #y-translate
            ty1 = np.expand_dims(img3, 0)
            ty2 = ImageDataGenerator(width_shift_range=0.4).flow(ty1).next()
            ty3 = ty2[0].astype('uint8')
            cv2.imwrite(f + 'ty40n.jpg', ty3)

            #zoom
            z1 = np.expand_dims(img3, 0)
            z2 = ImageDataGenerator(zoom_range=[1/1.3,1/1.3]).flow(z1).next()
            z3 = z2[0].astype('uint8')
            cv2.imwrite(f + 'z30n.jpg', z3)

            #shear
            s1 = np.expand_dims(img3, 0)
            s2 = ImageDataGenerator(shear_range=20).flow(s1).next()
            s3 = s2[0].astype('uint8')
            cv2.imwrite(f + 's20n.jpg', s3)

            #normal image
            cv2.imwrite(f + 'n.jpg', img3)

In [None]:
'''
This function generates five datasets from the images files in the training folder
It starts by creating a list of all the files of the correct dimensions (with correct extensions)
The label for each object is then found using regular expressions
the label is added to the name in a 2-element list
the list is shuffled and split into four parts
Four different training/testing sets are created by removing one of the four parts at a time for use as a validation set
For each of the two datasets in turn, the filenames are converted into arrays and replaced by these arrays
'''
def data_fold():

 folds = 4
 global datasets_test                        #initiate data arrays
 datasets_test = [[0]*(27*8)]*folds
 global datasets_train
 datasets_train = [[0]*(81*8)]*folds

 i = 0                                      # initiate data-writing loop
 while i < folds:

  all_files = os.listdir(os.path.abspath(path))    #get list of files to process
  data_files = list(filter(lambda file: file.endswith('n.jpg'), all_files))
  shuffle(data_files)

  data_files = np.array(data_files)                         # split into testing and training data
  split_set = np.split(data_files,folds)
  global train_set
  train_set = list(np.concatenate((split_set[((i+0)%folds)], split_set[((i+1)%folds)], split_set[((i+2)%folds)])))
  test_set = list(split_set[((i+3)%folds)])

  j = 0
  p = re.compile(r'_1')
  while j<len(train_set):
   c = cv2.imread(path+str(train_set[j]))
   d = tf.keras.utils.img_to_array(c)
   if (p.findall(train_set[j]) == [str('_1')]):
    h = (1)
   elif (p.findall(train_set[j]) != [str('_1')]):
    h = (0)
   datasets_train[i][j] = [d,h]
   j+=1

  j = 0
  p = re.compile(r'_1')
  while j<len(test_set):
   c = cv2.imread(path+str(test_set[j]))
   d = tf.keras.utils.img_to_array(c)
   if (p.findall(test_set[j]) == [str('_1')]):
    h = (1)
   elif (p.findall(test_set[j]) != [str('_1')]):
    h = (0)
   datasets_test[i][j] = [d,h]
   j+=1

  i+=1

In [None]:
def main():
    # Loading data
    train_loader = DataLoader(train, shuffle=True)     #loading data sets
    test_loader = DataLoader(test, shuffle=False)

    # Defining model and training options
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")    #choosing processing device
    print("Using device: ", device, f"({torch.cuda.get_device_name(device)})" if torch.cuda.is_available() else "")  # printing name of device being used
    global model
    model = MyViT((3, 512, 512), n_patches=8, n_blocks=4, hidden_d=768, n_heads=2, out_d=2).to(device)     # params
    N_EPOCHS = 15
    LR = 0.0000002

    # Training loop
    optimizer = Adam(model.parameters(), lr=LR)    #optimizer, lr = learning rate
    #optimizer = torch.optim.SGD(model.parameters(), lr=LR)       # use this line instead of above line if you want SGD insetad of ADAM
    criterion = CrossEntropyLoss()                                  #loss criterion
    for epoch in trange(N_EPOCHS, desc="Training"):                 #loop for training epoch
        train_loss = 0.0
        for batch in tqdm(train_loader, desc=f"Epoch {epoch + 1} in training", leave=False):        #loop for training within epoch
            x, y = batch              #get data, and its label
            y = y.to(device)          #send data to GPU/CPU
            x = x.to(device)
            y_hat = model(x)                           #predicted value of data
            loss = criterion(y_hat, y)                 #loss due to predicted value and exapt value being different

            train_loss += loss.detach().cpu().item() / len(train_loader)  #calculating total loss

            optimizer.zero_grad()                 #optimization of params
            loss.backward()
            optimizer.step()

        print(f"Epoch {epoch + 1}/{N_EPOCHS} loss: {train_loss:.4f}")    #reporting training accuracy (loss)

    # Test loop
    with torch.no_grad():
        correct, total = 0, 0
        test_loss = 0.0
        for batch in tqdm(test_loader, desc="Testing"):     # training loop over all data
            x, y = batch                                    # extract data and label
            x, y = x.to(device), y.to(device)               # sending data to be processed
            y_hat = model(x)                                # prediction of model
            loss = criterion(y_hat, y)
            test_loss += loss.detach().cpu().item() / len(test_loader)

            correct += torch.sum(torch.argmax(y_hat, dim=1) == y).detach().cpu().item()      #increses number of correct if value of y is same as column in output matrix with max probability
            total += len(x)

        print(f"Test loss: {test_loss:.2f}")      #reporting results
        print(f"Test accuracy: {correct / total * 100:.2f}%")

In [None]:
'''
This cell contains all of the code for the functions and objects which the ViT model uses, and defines them
'''

'''
Patchify cuts the image into squares for feeding into the transformer
First it makes sure that the image is square
Then it creates a tensor to store the patches
Finally it selects squares from the image, flattens them, and adds them to the tensor it has created
'''
def patchify(images, n_patches):
    n, h, w, c = images.shape

    assert h == w, "Patchify method is implemented for square images only"    #making sure image is square

    patches = torch.zeros(n, n_patches ** 2, h * w * c // n_patches ** 2)      #creates a tensor to store patches
    patch_size = h // n_patches

    for idx, image in enumerate(images):                  #patchify image
        for i in range(n_patches):                      #horizontal and vertical cuts
            for j in range(n_patches):
                patch = image[i * patch_size: (i + 1) * patch_size, j * patch_size: (j + 1) * patch_size,:]   #patchifies by taking pixels in relavent range of h,w
                patches[idx, i * n_patches + j] = patch.flatten()            #turns patch into vector
    return patches



'''
This function adds positional encodings to a new tensor
This means that the model can now know where each patch in original image came from
This is because the value is uniquely dependant on the position of each patch
'''
def get_positional_embeddings(sequence_length, d):
    result = torch.ones(sequence_length, d)          #creates a tensor with dimensions ,sequence_length, d
    for i in range(sequence_length):
        for j in range(d):
            result[i][j] = np.sin(i / (10000 ** (j / d))) if j % 2 == 0 else np.cos(i / (10000 ** ((j - 1) / d)))   #loop writes positional encodings into tensor
    return result

'''
This class defines the operation of a self-attention head
'''
class MyMSA(nn.Module):
    def __init__(self, d, n_heads=4):
        super(MyMSA, self).__init__()
        self.d = d
        self.n_heads = n_heads

        #Dimensions of vector must be divisible between heads so that all heads process equivalent ammounts of data for each image
        assert d % n_heads == 0, f"Can't divide dimension {d} into {n_heads} heads"

        d_head = int(d / n_heads)          #finding the dimension of the input of a head
        self.q_mappings = nn.ModuleList([nn.Linear(d_head, d_head) for _ in range(self.n_heads)]) #query vector
        self.k_mappings = nn.ModuleList([nn.Linear(d_head, d_head) for _ in range(self.n_heads)]) #key vector
        self.v_mappings = nn.ModuleList([nn.Linear(d_head, d_head) for _ in range(self.n_heads)]) #learned vector
        self.d_head = d_head
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, sequences):
        # Sequences has shape (N, seq_length, token_dim)
        # We go into shape    (N, seq_length, n_heads, token_dim / n_heads)
        # And come back to    (N, seq_length, item_dim)  (through concatenation)
        result = []
        for sequence in sequences:
            seq_result = []
            for head in range(self.n_heads):
                q_mapping = self.q_mappings[head]
                k_mapping = self.k_mappings[head]
                v_mapping = self.v_mappings[head]

                seq = sequence[:, head * self.d_head: (head + 1) * self.d_head]
                q, k, v = q_mapping(seq), k_mapping(seq), v_mapping(seq)

                attention = self.softmax(q @ k.T / (self.d_head ** 0.5))      #application of self-attention
                seq_result.append(attention @ v)
            result.append(torch.hstack(seq_result))
        return torch.cat([torch.unsqueeze(r, dim=0) for r in result])        #combine the outputs of the different heads

'''
This class defines a transformer block
'''
class MyViTBlock(nn.Module):
    def __init__(self, hidden_d, n_heads, mlp_ratio=4):
        super(MyViTBlock, self).__init__()
        self.hidden_d = hidden_d
        self.n_heads = n_heads
        #These are defenitions of various attributes and metods of the block
        self.norm1 = nn.LayerNorm(hidden_d)
        self.mhsa = MyMSA(hidden_d, n_heads)
        self.norm2 = nn.LayerNorm(hidden_d)
        self.mlp = nn.Sequential(
            nn.Linear(hidden_d, mlp_ratio * hidden_d),
            nn.GELU(),
            nn.Linear(mlp_ratio * hidden_d, hidden_d)
        )
    #THis function is what produces the output of the transformer's block, and feeds it forwrd
    def forward(self, x):
        out = x + self.mhsa(self.norm1(x))
        out = out + self.mlp(self.norm2(out))
        return out

'''
This class defines the overall operation of the whole model
The various functions which it uses are described where they are defined
'''
class MyViT(nn.Module):
    def __init__(self, hwc, n_patches=n_p, n_blocks=4, hidden_d=256, n_heads=4, out_d=2):
        # Super constructor
        super(MyViT, self).__init__()

        # Attributes
        self.hwc = hwc # (Height , Width , Colour)
        self.n_patches = n_patches
        self.n_blocks = n_blocks
        self.n_heads = n_heads
        self.hidden_d = hidden_d

        # Input and patches sizes
        assert hwc[1] % n_patches == 0, "Input shape not entirely divisible by number of patches"
        assert hwc[2] % n_patches == 0, "Input shape not entirely divisible by number of patches"
        self.patch_size = (hwc[1] / n_patches, hwc[2] / n_patches)

        # 1) Linear mapper for turning matrices into smaller vectors
        self.input_d = int(hwc[0] * self.patch_size[0] * self.patch_size[1])
        self.linear_mapper = nn.Linear(self.input_d, self.hidden_d)

        # 2) Learnable classification token
        self.class_token = nn.Parameter(torch.rand(1, self.hidden_d))

        # 3) Positional embedding
        self.register_buffer('positional_embeddings', get_positional_embeddings(n_patches ** 2 + 1, hidden_d), persistent=False)

        # 4) Transformer encoder blocks (makes them part of the model)
        self.blocks = nn.ModuleList([MyViTBlock(hidden_d, n_heads) for _ in range(n_blocks)])

        # 5) Classification MLPk
        self.mlp = nn.Sequential(
            nn.Linear(self.hidden_d, out_d),
            nn.Softmax(dim=-1)
        )

    def forward(self, images):
        # Dividing images into patches
        n, h, w, c = images.shape   #extracts shapes from image data
        patches = patchify(images, self.n_patches).to(self.positional_embeddings.device) #creates patches

        # Running linear layer tokenization
        # Map the vector corresponding to each patch to the hidden size dimension (hidden_d)
        tokens = self.linear_mapper(patches)

        # Adding classification token to the tokens (patches)
        tokens = torch.cat((self.class_token.expand(n, 1, -1), tokens), dim=1)

        # Adding positional embedding so that model know where patch came from
        out = tokens + self.positional_embeddings.repeat(n, 1, 1)

        # Transformer Blocks
        #The output of each block is calculated
        for block in self.blocks:
            out = block(out)

        # Getting the classification token only
        out = out[:, 0]

        return self.mlp(out) # Map to output dimension, output category distribution

In [None]:
'''
This trains the model
It augments the data set
It then creates a four-fold validation set
Lastly it trains and validates the model on each of the folds
'''
def prepare():
  #augment()
  data_fold()

  k = 0
  while k<4:
    global train
    train = datasets_train[k]
    global test
    test = datasets_test[k]
    main()
    k+=5

In [None]:
prepare()

In [None]:
def use():
    # Loading data
    use_path = '/content/drive/MyDrive/ALL_IDB1/im/'
    all_files = os.listdir(os.path.abspath(use_path))    #get list of files to process
    data_files = list(filter(lambda file: file.endswith('.jpg'), all_files))
    size_use = int(len(data_files))

    datasets_use = [0]*size_use
    j = 0
    while j < size_use:
     a = str(data_files[j])
     b = cv2.imread(use_path+str(data_files[j]))
     c = cv2.resize(b, (512,512), interpolation = cv2.INTER_CUBIC)
     datasets_use[j] = [tf.keras.utils.img_to_array(c), a]
     j+=1

    test_loader = DataLoader(datasets_use)

    # Use loop
    Correct = 0
    FP = 0
    FN = 0
    p = re.compile(r'_1')
    with torch.no_grad():
        for batch in test_loader:
            x,y_1 = batch
            v = model(x)
            pred = int(torch.argmax(v))
            print(y_1, pred)

            if (p.findall(str(y_1)) == [str('_1')]):
               h = 1
            elif (p.findall(str(y_1)) != [str('_1')]):
              h = 0
            if h == pred:
              Correct+=1
            elif h == 1 and pred == 0:
              FN += 1
            elif h == 0 and pred == 1:
              FP += 1

    print(f'correct = {Correct}')
    print(f'False Negative = {FN}')
    print(f'False Positive = {FP}')

In [None]:
use()

In [None]:
'''
This function tries to explain the model's output using oclusion.
This is a peturbation based aproach where square patches of the image are replaced by zeroes
and the difference between the output for this image and the old image is computed and plotted
The 'explain_path' is the source for the data, and all the files in this folder are listed
A data set where each image is represented as a tensor and labelled by its name is then generated
For each element in the data set, the model is used to predict whether cancer is present
Occulsion is implemented and patches of the image are zeroed; the whole image is covered
The diiferences are put into an array with the same dimensions as the input image
The array is normalised so that its values are between 0. and 1.
The array is plotted as a cmap
'''
def explain():
    #get list of files
    explain_path = '/content/drive/MyDrive/ALL_IDB1/im/'
    all_files = os.listdir(os.path.abspath(explain_path))    #get list of files to process

    data_files = list(filter(lambda file: file.endswith('n.jpg'), all_files))
    size_explain = int(len(data_files))

    #get dataset
    datasets_use = [0]*size_explain
    j = 0
    while j < size_explain:
     a = str(data_files[j])
     b = cv2.imread(explain_path+str(data_files[j]))
     c = cv2.resize(b, (512,512), interpolation = cv2.INTER_AREA)
     datasets_use[j] = [tf.keras.utils.img_to_array(c), a]
     j+=1

    explain_loader = DataLoader(datasets_use, shuffle = True)


    with torch.no_grad():
        for batch in explain_loader:
            x,y_1 = batch                          #get model prediction
            v = model(x)                           #produces array where elemnt i is the probability that image is in ith class
            pred = int(torch.argmax(v))
            print(y_1[0],';', v)


            i = 0
            global attr
            attr = np.array([[0]*256]*256)
            while i < n_p:                      #horizontal and vertical cuts
             j = 0
             while j < n_p:
              q = np.array(x)

              for k in range(int(256/n_p)):                                #turning the relevant patch into a set of zeroes
                for l in range(int(256/n_p)):
                 q[0][int(i*256/n_p) + k][int(j*256/n_p) + l][:] = 0

              r = torch.from_numpy(q)
              s = model(r)
              diff = float(v[0][1])-float(s[0][1])                      #calculating the resulting change in the model

              for k in range(int(256/n_p)):
                for l in range(int(256/n_p)):
                 attr[int(i*256/n_p) + k][int(j*256/n_p) + l] = diff*10**15      #putting these attributions of regions into an array
              j+=1
             i+=1

            Max = float(np.max(np.array(attr)))
            Min = float(np.min(np.array(attr)))
            attr_1 = (np.array(attr) - Min)/(Max - Min)                   #normalising the array

            plt.imshow(attr_1, cmap='Greys')
            plt.colorbar()
            plt.show()

In [None]:
explain()

Output hidden; open in https://colab.research.google.com to view.