In [None]:
#  article dependencies
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as Datasets
from torch.utils.data import Dataset, DataLoader
import numpy as np
import matplotlib.pyplot as plt
import cv2
from tqdm.notebook import tqdm
from tqdm import tqdm as tqdm_regular
import seaborn as sns
from torchvision.utils import make_grid
import random

In [None]:
#  selecting the compute device: first CUDA GPU when one is available, CPU otherwise
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print('Running on the GPU' if device.type == 'cuda' else 'Running on the CPU')

## Random Cropping

In [None]:
def random_crop(dataset: list, crop_size=(20, 20)):
  """
  Create one randomly cropped copy of every image in ``dataset``.

  For each image a window of ``crop_size`` is cut out at a random position
  (drawn with the ``random`` module) and the window is then resized to
  32x32 with ``cv2.resize``.

  Parameters
  ----------
  dataset : list
      Items whose first element is an image array indexed as
      (row, column, channel) with at least three channels; only the first
      three channels are used.
  crop_size : tuple
      (rows, columns) of the window to cut out.

  Returns
  -------
  list
      One ``(image, 1)`` tuple per input item; the label ``1`` is hard-coded.

  Raises
  ------
  ValueError
      If ``crop_size`` is larger than an image in either dimension.
  """
  crop_rows, crop_cols = crop_size[0], crop_size[1]
  cropped = []
  images = [x[0] for x in dataset]
  for image in tqdm_regular(images):
    #  deriving image size
    img_rows, img_cols = image.shape[0], image.shape[1]
    if crop_rows > img_rows or crop_cols > img_cols:
      raise ValueError(
          f'crop_size {(crop_rows, crop_cols)} does not fit inside an image '
          f'of size {(img_rows, img_cols)}')

    #  deriving random indices: the row offset is bounded by the image height
    #  and the column offset by the image width, so the window always lies
    #  fully inside the image even when image or crop are not square
    idx_row = random.randint(0, img_rows - crop_rows)
    idx_column = random.randint(0, img_cols - crop_cols)

    #  cropping the same window out of each of the three channels
    channels = [image[idx_row:idx_row + crop_rows,
                      idx_column:idx_column + crop_cols,
                      channel] for channel in range(3)]

    #  stacking channels back into a (rows, columns, 3) image
    image = np.dstack(channels)

    #  resizing image
    image = cv2.resize(image, (32, 32))
    #  labelling and appending to list
    cropped.append((image, 1))
  return cropped

## Image Noising

In [None]:
def noise_image(dataset: list, noise_intensity=0.2):
  """
  Create one noised copy of every image in ``dataset``.

  For every pixel a uniform random number is drawn with ``random.random()``
  and rounded to one decimal place. With ``threshold = 1 - noise_intensity``:

  * value above the threshold   -> pixel becomes white (255 in all channels)
  * value equal to the threshold -> pixel becomes black (0 in all channels)
  * otherwise                    -> pixel keeps its original value

  Parameters
  ----------
  dataset : list
      Items whose first element is an image array indexed as
      (row, column, channel) with at least three channels; any image height
      and width is accepted.
  noise_intensity : float
      Controls the threshold described above.

  Returns
  -------
  list
      One ``(image, 1)`` tuple per input item, where ``image`` is a uint8
      array of shape (rows, columns, 3); the label ``1`` is hard-coded.
  """
  noised = []
  noise_threshold = 1 - noise_intensity
  images = [x[0] for x in dataset]

  for image in tqdm_regular(images):
    n_rows, n_cols = image.shape[0], image.shape[1]
    n_pixels = n_rows * n_cols

    #  flattening the first three channels into one row per pixel
    pixels = np.dstack((image[:,:,0], image[:,:,1], image[:,:,2]))
    pixels = pixels.reshape(n_pixels, 3)

    #  output starts as all zeros, so "black" pixels need no assignment
    noisy = np.zeros((n_pixels, 3), dtype='uint8')

    #  noise probability
    for idx in range(n_pixels):
      regulator = round(random.random(), 1)
      #  1 - noise_intensity is not always the exact one-decimal float
      #  (e.g. 1 - 0.7 == 0.30000000000000004), so the equality branch is
      #  tested with a tolerance and before the greater-than branch
      if abs(regulator - noise_threshold) < 1e-9:
        continue
      if regulator > noise_threshold:
        noisy[idx] = 255
      else:
        noisy[idx] = pixels[idx]

    #  restoring the image layout
    image = noisy.reshape((n_rows, n_cols, 3))
    #  labelling and appending to list
    noised.append((image, 1))
  return noised

## Image Flipping

In [None]:
def flip_image(dataset: list):
  """
  Return a horizontally mirrored copy of every image in ``dataset``.

  Each item's first element is read as an image array indexed as
  (row, column, channel); its first three channels are reversed along the
  column axis and stacked back together. Every result is paired with the
  hard-coded label ``1``.
  """
  def mirror(picture):
    #  reversing the column order of each channel, then re-stacking
    return np.dstack([picture[:, :, channel][:, ::-1] for channel in range(3)])

  pictures = [entry[0] for entry in dataset]
  return [(mirror(picture), 1) for picture in tqdm_regular(pictures)]

## Image Blurring

In [None]:
def blur_image(dataset, kernel_size=5, padding=True):
  """
  Blur every image in ``dataset`` by sliding a Gaussian kernel over it.

  Parameters
  ----------
  dataset : list
      Items whose first element is an image array indexed as
      (row, column, channel) with at least three channels.
  kernel_size : int
      Side length of the square kernel. Size 5 uses the hand-written integer
      kernel scaled by 1/273; any other size uses a Gaussian (sigma = 1)
      sampled on a ``kernel_size`` x ``kernel_size`` grid and normalised so
      its entries sum to 1.
  padding : bool
      When true, each image is zero padded by ``kernel_size // 2`` pixels on
      every side before the kernel is applied, so odd kernel sizes return an
      image of the input size. When false the output shrinks by
      ``kernel_size - 1`` in each dimension.

  Returns
  -------
  list
      One ``(image, 1)`` tuple per input item, where ``image`` is a uint8
      array with three channels; the label ``1`` is hard-coded.

  Raises
  ------
  ValueError
      If ``kernel_size`` is smaller than 1, or (raised by NumPy) if the
      kernel is so much larger than the image that the output would have a
      negative size.
  """
  if kernel_size < 1:
    raise ValueError(f'kernel_size must be a positive integer, got {kernel_size}')

  #  defining internal function for padding
  def pad_image(image, padding=2):
    """
    Return a uint8 copy of the first three channels of ``image`` surrounded
    by ``padding`` layers of zeros on every side.
    """
    rows, cols = image.shape[0], image.shape[1]
    padded = np.zeros((rows + padding*2, cols + padding*2, 3), dtype='uint8')
    #  explicit end indices keep this valid for padding == 0 as well
    for channel in range(3):
      padded[padding:padding + rows,
             padding:padding + cols, channel] = image[:, :, channel]
    return padded

  #  defining internal function for building the kernel
  def gaussian_kernel(size):
    """Return the ``size`` x ``size`` blurring kernel as a float array."""
    if size == 5:
      gauss_5 = np.array([[1, 4, 7, 4, 1],
                          [4, 16, 26, 16, 4],
                          [7, 26, 41, 26, 7],
                          [4, 16, 26, 16, 4],
                          [1, 4, 7, 4, 1]])
      return 1/273 * gauss_5
    #  distances of each tap from the kernel centre
    offsets = np.arange(size) - (size - 1) / 2
    profile = np.exp(-0.5 * offsets ** 2)
    kernel = np.outer(profile, profile)
    return kernel / kernel.sum()

  #  defining internal function for convolving one channel
  def convolve(channel, kernel):
    """
    Slide ``kernel`` over a 2D ``channel`` and return the uint8 result.
    Only positions where the kernel fits entirely inside the channel are
    visited, so no write can fall outside the output array.
    """
    out_rows = channel.shape[0] - kernel_size + 1
    out_cols = channel.shape[1] - kernel_size + 1
    result = np.zeros((out_rows, out_cols), dtype='uint8')
    for i in range(out_rows):
      for j in range(out_cols):
        window = channel[i:(i + kernel_size), j:(j + kernel_size)]
        #  the float sum is truncated when stored in the uint8 array
        result[i, j] = (window * kernel).sum()
    return result

  kernel = gaussian_kernel(kernel_size)

  #  defining list to hold blurred images
  all_blurred = []

  #  extracting images
  images = [x[0] for x in dataset]

  for image in tqdm_regular(images):
    if padding:
      image = pad_image(image, padding=kernel_size // 2)

    #  convolving each channel and stacking the results
    blurred = np.dstack([convolve(image[:, :, channel], kernel)
                         for channel in range(3)])
    #  labelling and appending to list
    all_blurred.append((blurred, 1))

  return all_blurred

## Putting it all together

In [None]:
#  CIFAR-10 training split, stored under the current directory
training_set = Datasets.CIFAR10(
    root='./',
    train=True,
    download=True,
    transform=transforms.ToTensor(),
)

#  CIFAR-10 held-out split, used here for validation
validation_set = Datasets.CIFAR10(
    root='./',
    train=False,
    download=True,
    transform=transforms.ToTensor(),
)

In [None]:
def extract_images(dataset):
  """
  Split out the cat and dog samples of a CIFAR-10 style dataset.

  ``dataset`` must expose ``targets`` and ``data`` indexable by position and
  support ``len()``. Samples whose target is 3 are returned as
  ``(data, 0)`` in the first list, samples whose target is 5 as
  ``(data, 1)`` in the second list; every other sample is skipped.
  """
  cats, dogs = [], []

  for position in tqdm_regular(range(len(dataset))):
    class_id = dataset.targets[position]
    if class_id == 3:
      cats.append((dataset.data[position], 0))
      continue
    if class_id == 5:
      dogs.append((dataset.data[position], 1))
  return cats, dogs
  
#  cat / dog samples of the training split
train_cats, train_dogs = extract_images(dataset=training_set)
#  cat / dog samples of the validation split
val_cats, val_dogs = extract_images(dataset=validation_set)

In [None]:
#  keeping only the first 1,200 dog images as the base set
dog_images = train_dogs[:1200]

#  one augmented copy of the base set per technique; the generator runs the
#  augmentations in the listed order (crop, flip, noise), which matters
#  because cropping and noising both draw from the shared `random` stream
dog_cropped, dog_flipped, dog_noised = (
    augment(dog_images)
    for augment in (random_crop, flip_image, noise_image)
)

## Piecing together a dataset

In [None]:
#  base dog images followed by their cropped, flipped and noised copies
train_dogs = [*dog_images, *dog_cropped, *dog_flipped, *dog_noised]

#  training data: up to 4,800 cat images plus the dog images, shuffled in place
training_images = [*train_cats[:4800], *train_dogs]
random.shuffle(training_images)

#  validation data: all validation cats and dogs, shuffled in place
validation_images = [*val_cats, *val_dogs]
random.shuffle(validation_images)

In [None]:
#  defining dataset class
class CustomCatsvsDogs(Dataset):
  """
  Map-style dataset over a sequence of ``(image, label)`` pairs.

  Parameters
  ----------
  data : sequence
      Items whose first element is the image and second element the label.
  transforms : callable, optional
      Applied to the image of every item that is fetched; ``None`` leaves
      the image untouched.
  """
  def __init__(self, data, transforms=None):
    self.data = data
    self.transforms = transforms

  def __len__(self):
    """Number of ``(image, label)`` pairs held."""
    return len(self.data)

  def __getitem__(self, idx):
    """Return ``(image, label)`` for ``idx``; the label is a tensor."""
    image = self.data[idx][0]
    label = torch.tensor(self.data[idx][1])

    #  identity check: `!= None` would dispatch to the transform object's own
    #  comparison methods and could wrongly skip a valid transform
    if self.transforms is not None:
      image = self.transforms(image)
    return (image, label)
    
    
 #  creating pytorch datasets
#  both splits get their own, identically configured pipeline:
#  tensor conversion followed by per-channel normalisation with mean 0.5 / std 0.5
training_data, validation_data = (
    CustomCatsvsDogs(
        images,
        transforms=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ]),
    )
    for images in (training_images, validation_images)
)

## Convnet classes

In [None]:
class ConvolutionalNeuralNet_2():
  """
  Training and evaluation wrapper around a two-class classification network.

  The wrapped network is moved to the module-level ``device`` and optimised
  with Adam (learning rate 1e-3).
  """
  def __init__(self, network):
    self.network = network.to(device)
    self.optimizer = torch.optim.Adam(self.network.parameters(), lr=1e-3)

  def train(self, loss_function, epochs, batch_size, 
            training_set, validation_set):
    """
    Re-initialise the network weights, then train for ``epochs`` epochs.

    After every epoch accuracy, recall and precision (class 1 is the
    positive class) are computed on both the training and validation sets,
    and the validation loss is recorded.

    Parameters
    ----------
    loss_function : callable
        Called as ``loss_function(predictions, labels)``.
    epochs : int
        Number of passes over ``training_set``.
    batch_size : int
        Batch size of both data loaders.
    training_set, validation_set
        Datasets accepted by ``DataLoader`` that yield ``(image, label)``.

    Returns
    -------
    dict
        Per-batch losses and per-epoch accuracy / recall / precision for
        training and validation.
    """
    #  creating log
    log_dict = {
        'training_loss_per_batch': [],
        'validation_loss_per_batch': [],
        'training_accuracy_per_epoch': [],
        'training_recall_per_epoch': [],
        'training_precision_per_epoch': [],
        'validation_accuracy_per_epoch': [],
        'validation_recall_per_epoch': [],
        'validation_precision_per_epoch': []
    } 

    #  defining weight initialization function
    def init_weights(module):
      if isinstance(module, (nn.Conv2d, nn.Linear)):
        torch.nn.init.xavier_uniform_(module.weight)
        #  layers created with bias=False have no bias tensor to fill
        if module.bias is not None:
          module.bias.data.fill_(0.01)

    #  defining accuracy function
    def accuracy(network, dataloader):
      """
      Return (accuracy, recall, precision), each rounded to 3 decimals.
      Switches ``network`` to evaluation mode and leaves it there.
      """
      network.eval()

      total_correct = 0
      total_instances = 0
      true_positives = 0
      false_negatives = 0
      false_positives = 0
      for images, labels in tqdm(dataloader):
        images, labels = images.to(device), labels.to(device)
        predictions = torch.argmax(network(images), dim=1)
        #  counting on whole batches instead of element by element
        total_correct += (predictions == labels).sum().item()
        total_instances += len(images)
        true_positives += ((predictions == 1) & (labels == 1)).sum().item()
        false_negatives += ((predictions == 0) & (labels == 1)).sum().item()
        false_positives += ((predictions == 1) & (labels == 0)).sum().item()

      #  an empty dataloader yields 0.0 instead of a division error
      if total_instances:
        accuracy = round(total_correct/total_instances, 3)
      else:
        accuracy = 0.0
      try:
        recall = round(true_positives/(true_positives + false_negatives), 3)
      except ZeroDivisionError:
        recall = 0.0
      try:
        precision = round(true_positives/(true_positives + false_positives), 3)
      except ZeroDivisionError:
        precision = 0.0
      return accuracy, recall, precision

    #  initializing network weights
    self.network.apply(init_weights)

    #  creating dataloaders
    train_loader = DataLoader(training_set, batch_size)
    val_loader = DataLoader(validation_set, batch_size)

    for epoch in range(epochs):
      print(f'Epoch {epoch+1}/{epochs}')
      train_losses = []

      #  setting convnet to training mode at the start of EVERY epoch: the
      #  accuracy and validation steps below switch it to evaluation mode,
      #  so without this only the first epoch would train in training mode
      self.network.train()

      #  training
      print('training...')
      for images, labels in tqdm(train_loader):
        #  sending data to device
        images, labels = images.to(device), labels.to(device)
        #  resetting gradients
        self.optimizer.zero_grad()
        #  making predictions
        predictions = self.network(images)
        #  computing loss
        loss = loss_function(predictions, labels)
        log_dict['training_loss_per_batch'].append(loss.item())
        train_losses.append(loss.item())
        #  computing gradients
        loss.backward()
        #  updating weights
        self.optimizer.step()
      with torch.no_grad():
        print('deriving training accuracy...')
        #  computing training accuracy
        train_accuracy, train_recall, train_precision = accuracy(self.network, train_loader)
        log_dict['training_accuracy_per_epoch'].append(train_accuracy)
        log_dict['training_recall_per_epoch'].append(train_recall)
        log_dict['training_precision_per_epoch'].append(train_precision)

      #  validation
      print('validating...')
      val_losses = []

      #  setting convnet to evaluation mode
      self.network.eval()

      with torch.no_grad():
        for images, labels in tqdm(val_loader):
          #  sending data to device
          images, labels = images.to(device), labels.to(device)
          #  making predictions
          predictions = self.network(images)
          #  computing loss
          val_loss = loss_function(predictions, labels)
          log_dict['validation_loss_per_batch'].append(val_loss.item())
          val_losses.append(val_loss.item())
        #  computing accuracy
        print('deriving validation accuracy...')
        val_accuracy, val_recall, val_precision = accuracy(self.network, val_loader)
        log_dict['validation_accuracy_per_epoch'].append(val_accuracy)
        log_dict['validation_recall_per_epoch'].append(val_recall)
        log_dict['validation_precision_per_epoch'].append(val_precision)

      train_losses = np.array(train_losses).mean()
      val_losses = np.array(val_losses).mean()

      print(f'training_loss: {round(train_losses, 4)}  training_accuracy: '+
      f'{train_accuracy}  training_recall: {train_recall}  training_precision: {train_precision} *~* validation_loss: {round(val_losses, 4)} '+  
      f'validation_accuracy: {val_accuracy}  validation_recall: {val_recall}  validation_precision: {val_precision}\n')
      
    return log_dict

  def predict(self, x):
    """
    Return the raw output of the wrapped network for ``x``.
    The network mode is not changed and ``x`` is not moved to ``device``.
    """
    return self.network(x)

In [None]:
class ConvNet(nn.Module):
  """
  Six-layer convolutional network with a two-class convolutional head.

  ``forward`` reshapes its input to (-1, 3, 32, 32) and returns a
  (batch, 2) tensor of raw class scores (logits). No softmax is applied:
  ``nn.CrossEntropyLoss`` expects unnormalised scores and applies
  log-softmax itself, so feeding it probabilities would squash the loss and
  its gradients. Apply ``F.softmax(output, dim=1)`` to the result when
  probabilities are needed; ``argmax`` predictions are the same either way.
  """
  def __init__(self):
    super().__init__()
    self.conv1 = nn.Conv2d(3, 8, 3, padding=1)
    self.batchnorm1 = nn.BatchNorm2d(8)
    self.conv2 = nn.Conv2d(8, 8, 3, padding=1)
    self.batchnorm2 = nn.BatchNorm2d(8)
    self.pool2 = nn.MaxPool2d(2)
    self.conv3 = nn.Conv2d(8, 32, 3, padding=1)
    self.batchnorm3 = nn.BatchNorm2d(32)
    self.conv4 = nn.Conv2d(32, 32, 3, padding=1)
    self.batchnorm4 = nn.BatchNorm2d(32)
    self.pool4 = nn.MaxPool2d(2)
    self.conv5 = nn.Conv2d(32, 128, 3, padding=1)
    self.batchnorm5 = nn.BatchNorm2d(128)
    self.conv6 = nn.Conv2d(128, 128, 3, padding=1)
    self.batchnorm6 = nn.BatchNorm2d(128)
    self.pool6 = nn.MaxPool2d(2)
    self.conv7 = nn.Conv2d(128, 2, 1)
    #  NOTE(review): with 32x32 inputs the map reaching this layer is 4x4, so
    #  a 3x3 window averages only part of it - confirm this is intended
    self.pool7 = nn.AvgPool2d(3)

  def forward(self, x):
    #-------------
    # INPUT
    #-------------
    x = x.view(-1, 3, 32, 32)
    
    #-------------
    # LAYER 1
    #-------------
    output_1 = self.conv1(x)
    output_1 = F.relu(output_1)
    output_1 = self.batchnorm1(output_1)

    #-------------
    # LAYER 2
    #-------------
    output_2 = self.conv2(output_1)
    output_2 = F.relu(output_2)
    output_2 = self.pool2(output_2)
    output_2 = self.batchnorm2(output_2)

    #-------------
    # LAYER 3
    #-------------
    output_3 = self.conv3(output_2)
    output_3 = F.relu(output_3)
    output_3 = self.batchnorm3(output_3)

    #-------------
    # LAYER 4
    #-------------
    output_4 = self.conv4(output_3)
    output_4 = F.relu(output_4)
    output_4 = self.pool4(output_4)
    output_4 = self.batchnorm4(output_4)

    #-------------
    # LAYER 5
    #-------------
    output_5 = self.conv5(output_4)
    output_5 = F.relu(output_5)
    output_5 = self.batchnorm5(output_5)

    #-------------
    # LAYER 6
    #-------------
    output_6 = self.conv6(output_5)
    output_6 = F.relu(output_6)
    output_6 = self.pool6(output_6)
    output_6 = self.batchnorm6(output_6)

    #--------------
    # OUTPUT LAYER
    #--------------
    output_7 = self.conv7(output_6)
    output_7 = self.pool7(output_7)
    output_7 = output_7.view(-1, 2)

    #  raw logits: the loss function is responsible for the softmax
    return output_7

## Training a Convolutional Neural Network

In [None]:
#  wrapping a fresh ConvNet and training it for 10 epochs in batches of 64
model = ConvolutionalNeuralNet_2(network=ConvNet())

log_dict = model.train(
    loss_function=nn.CrossEntropyLoss(),
    epochs=10,
    batch_size=64,
    training_set=training_data,
    validation_set=validation_data,
)