# RBE/CS549 Fall 2022: Computer Vision
## Homework 0: Alohomora

Author(s): 
Prof. Nitin J. Sanket (nsanket@wpi.edu), Lening Li (lli4@wpi.edu), Gejji, Vaishnavi Vivek (vgejji@wpi.edu)

Robotics Engineering Department,

Worcester Polytechnic Institute

Code adapted from CMSC733 at the University of Maryland, College Park.


## Phase 1




### Get the BSDS500 dataset

In [None]:
!wget https://raw.githubusercontent.com/leelening/rbe549/main/hw0/BSDS500.tar.xz
!tar -xvf BSDS500.tar.xz
!mv BSDS500/ /content/data/

In [None]:
!wget https://raw.githubusercontent.com/leelening/rbe549/main/hw0/TxtFiles.tar.xz
!tar -xvf TxtFiles.tar.xz
!mv TxtFiles/ /content/data/

In [None]:
import os.path
from os import path
import numpy as np
import cv2
import matplotlib.pyplot as plt
from scipy.signal import convolve2d
import imutils
from sklearn.cluster import KMeans

In [None]:

# Directory that receives every intermediate Phase 1 output image.
Filter_Out = '/content/data/Filter_Output/'
# makedirs(..., exist_ok=True) creates missing parent directories and is a
# no-op when the directory already exists, so no separate existence check is
# needed (and there is no window between the check and the creation).
os.makedirs(Filter_Out, exist_ok=True)

In [None]:
# Load the sample image used by every Phase 1 cell below.
img = cv2.imread("/content/data/Images/2.jpg")
plt.axis("off")
# The image is converted from BGR to RGB before being handed to matplotlib.
plt.imshow(cv2.cvtColor(img,cv2.COLOR_BGR2RGB))

1. Generate Difference of Gaussian Filter Bank: (DoG)
2. Display all the filters in this filter bank and save image as DoG.png,
3. use command "cv2.imwrite(...)"

In [None]:
def gaussian2d(kernel_size,sd):
  """Return a square, isotropic 2D Gaussian sampled on an integer grid.

  Args:
    kernel_size: requested side length. The grid spans
      [-(kernel_size-1)//2, (kernel_size-1)//2], so an even size yields a
      kernel that is one sample smaller than requested.
    sd: standard deviation of the Gaussian.

  Returns:
    2D ndarray holding 1/(2*pi*sd^2) * exp(-(x^2 + y^2) / (2*sd^2)).
  """
  half = int((kernel_size-1)/2)           # keeps the kernel no larger than kernel_size
  coords = np.arange(-half, half+1)       # symmetric sample positions around zero
  x, y = np.meshgrid(coords, coords)
  squared_radius = np.power(x,2) + np.power(y,2)
  variance = np.power(sd,2)

  # The whole denominator must be parenthesised: `1/2*np.pi*variance` would
  # evaluate left to right as (pi * variance) / 2 instead of 1 / (2*pi*variance).
  normalisation = 1/(2*np.pi*variance)
  return normalisation*np.exp(-squared_radius/(2*variance))

def DoG_Filter_Bank(sd,orientation,kernel_size=5):
    """Build and display an oriented derivative-of-Gaussian filter bank.

    For every sigma a Gaussian kernel is convolved with a single Sobel kernel
    and the result is rotated to each requested orientation. Only one Sobel
    kernel is needed because the rotations cover the other direction.

    Args:
        sd: sequence of Gaussian standard deviations (one plot row each).
        orientation: sequence of rotation angles handed to imutils.rotate
            (the caller passes values spread over 0..360).
        kernel_size: side length forwarded to gaussian2d.

    Returns:
        List of 2D filters in sigma-major order, i.e. the filter for
        sd[i] and orientation[j] sits at index i*len(orientation) + j.
    """
    # Plain ndarray rather than np.mat: the np.mat alias no longer exists in
    # NumPy 2.x, and convolve2d returns an ndarray either way.
    sobel_kernel = np.array([[-1, -2, -1], [0, 0, 0], [1, 2, 1]])

    bank = []
    for sigma in sd:
        base_filter = convolve2d(gaussian2d(kernel_size, sigma), sobel_kernel, mode='same')
        for angle in orientation:
            bank.append(imutils.rotate(base_filter, angle))

    n_rows, n_cols = len(sd), len(orientation)
    # squeeze=False keeps `axs` two-dimensional even with a single sigma or a
    # single orientation, so axs[row, col] indexing always works.
    fig, axs = plt.subplots(n_rows, n_cols, figsize=(n_cols, n_rows), squeeze=False)
    for row in range(n_rows):
        for col in range(n_cols):
            axs[row, col].imshow(bank[row*n_cols + col], cmap='gray')
            axs[row, col].axis('off')

    plt.show()
    return bank

# DoG bank configuration: two scales, sixteen orientations.
kernel_size = 9
sigma = [1,2];
# 17 evenly spaced samples over [0, 360] with the last one dropped, so the
# final angle (360) does not duplicate the first (0).
orientations = np.linspace(0,360,17)[:-1]
DoG_Filters = DoG_Filter_Bank(sigma,orientations,kernel_size)


1. Generate Leung-Malik Filter Bank: (LM)
2. Display all the filters in this filter bank and save image as LM.png,
3. use command "cv2.imwrite(...)"


In [None]:
def gaussian(kernel_size,sd):
  """Return an elongated 2D Gaussian whose y-sigma is three times its x-sigma.

  The kernel is sampled on the integer grid
  [-(kernel_size-1)//2, (kernel_size-1)//2] in both directions and scaled by
  1/(2*pi*sigma_x*sigma_y).
  """
  sigma_x, sigma_y = sd, sd*3
  half = int((kernel_size-1)/2)
  axis = list(range(-half, half+1))
  xx, yy = np.meshgrid(axis, axis)
  exponent = -0.5*(xx**2/sigma_x**2 + yy**2/sigma_y**2)
  scale = 1/(2*np.pi*sigma_x*sigma_y)
  return scale*np.exp(exponent)

def LoG(kernel_size,sd):
  """Return a Laplacian-of-Gaussian kernel with standard deviation `sd`.

  Sampled on the integer grid [-(kernel_size-1)//2, (kernel_size-1)//2] as
  (-1/(pi*sd^4)) * (1 - r^2/(2*sd^2)) * exp(-r^2/(2*sd^2)), r^2 = x^2 + y^2.
  """
  half = int((kernel_size-1)/2)
  axis = list(range(-half, half+1))
  xx, yy = np.meshgrid(axis, axis)
  r2 = xx**2 + yy**2
  scale = -1/(np.pi * sd**4)
  return scale*(1-r2/(2 * sd**2))*np.exp(-0.5*r2/sd**2)


def Leung_Malik_Bank(sd,orientation,kernel_size=5):
  '''
  Build and display a Leung-Malik style filter bank.

  Contents, in order:
    * for each of the first three scales (taken in reverse order): the first
      derivative filter at every orientation, then the second derivative
      filter at every orientation (derivatives are obtained by convolving an
      elongated Gaussian with a Sobel kernel once / twice);
    * one Laplacian-of-Gaussian per scale at sigma;
    * one Laplacian-of-Gaussian per scale at 3*sigma;
    * one isotropic Gaussian per scale.

  sd          - sequence of scales
  orientation - sequence of rotation angles handed to imutils.rotate
  kernel_size - side length forwarded to the kernel generators

  Returns the list of 2D filters.

  https://academic.mu.edu/phys/matthysd/web226/Lab02.htm
  '''
  # Plain ndarray rather than np.mat: the np.mat alias no longer exists in
  # NumPy 2.x, and convolve2d returns an ndarray either way.
  sobel_kernel = np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]])

  bank = []
  for sigma in reversed(sd[:3]):
      first_derivative = convolve2d(gaussian(kernel_size, sigma), sobel_kernel, mode='same')
      second_derivative = convolve2d(first_derivative, sobel_kernel, mode='same')
      bank.extend(imutils.rotate(first_derivative, angle) for angle in orientation)
      bank.extend(imutils.rotate(second_derivative, angle) for angle in orientation)

  bank.extend(LoG(kernel_size, sigma) for sigma in sd)             # LoG at sigma
  bank.extend(LoG(kernel_size, float(3*sigma)) for sigma in sd)    # LoG at 3*sigma
  bank.extend(gaussian2d(kernel_size, sigma) for sigma in sd)      # plain Gaussians

  print(len(bank))

  # Lay the filters out 12 per row. The row count follows the bank size so a
  # bank that is not exactly 48 filters long is still displayed in full
  # (48 filters give the usual 4x12 grid).
  n_cols = 12
  n_rows = max(1, -(-len(bank)//n_cols))   # ceiling division
  fig, axs = plt.subplots(n_rows, n_cols, figsize=(n_cols, n_rows), squeeze=False)
  for ax in axs.flat:
      ax.axis('off')
  for idx, fil in enumerate(bank):
      axs[idx//n_cols, idx % n_cols].imshow(fil, cmap='gray')

  plt.show()
  return bank

def Leung_Malik_Small(kernel_size):
  """Build the small LM bank: scales 1, sqrt(2), 2, 2*sqrt(2); six orientations."""
  scales = [1, np.sqrt(2), 2, 2*np.sqrt(2)]
  # Seven samples from 90 down to -90, with the last (-90) dropped.
  angles = np.linspace(90,-90,7)[:-1]
  return Leung_Malik_Bank(scales, angles, kernel_size)

def Leung_Malik_Large(kernel_size):
  """Build the large LM bank: scales sqrt(2), 2, 2*sqrt(2), 4; six orientations."""
  scales = [np.sqrt(2), 2, 2*np.sqrt(2), 4]
  # Seven samples from 90 down to -90, with the last (-90) dropped.
  angles = np.linspace(90,-90,7)[:-1]
  return Leung_Malik_Bank(scales, angles, kernel_size)


##### Function Call #####
# Interactive selection of the LM bank variant and the kernel size.
# int(...) raises ValueError if the typed text is not an integer.
val = int(input("Please enter the size of the Leung_Malik Filter : \n1.Large \n2.Small \n"))
kernel_size = int(input("\nPlease enter the kernel size (Only odd Number) : \n"))

if val == 1:
  LM_Bank = Leung_Malik_Large(kernel_size)
elif val == 2:
  LM_Bank = Leung_Malik_Small(kernel_size)
else:
  # NOTE: LM_Bank is not assigned on this path.
  print("Choice doesn't exist")

1. Generate Gabor Filter Bank: (Gabor)
2. Display all the filters in this filter bank and save image as Gabor.png,
3. use command "cv2.imwrite(...)"


In [None]:
def gabor(side_x, sigma, theta, Lambda, gamma, psi):
  '''
  Build and display a bank of Gabor filters.

    side_x  - Number of samples along x (kernel width)
    sigma   - Sequence of envelope widths; paired element-wise with Lambda
    theta   - Sequence of rotation angles in radians (fed to np.cos/np.sin)
    Lambda  - Sequence of wavelengths of the sinusoidal carrier
    gamma   - Only sets the kernel height, int(side_x / gamma); the envelope
              itself is isotropic
    psi     - Phase offset of the sinusoidal carrier

  Each filter is exp(-(x'^2 + y'^2) / sigma^2) * cos(2*pi/Lambda * x' + psi),
  where (x', y') are the grid coordinates rotated by theta.

  Returns the list of filters ordered (sigma, Lambda)-pair major, theta minor.
  '''
  width = side_x
  height = int(side_x / gamma)
  half_w = int((width-1)/2)
  half_h = int((height-1)/2)
  X, Y = np.meshgrid(np.linspace(-half_w, half_w, width),
                     np.linspace(-half_h, half_h, height))

  gabor_bank = []
  for envelope_sd, wavelength in zip(sigma, Lambda):
      for angle in theta:
          x_prime = X*np.cos(angle) + Y*np.sin(angle)
          y_prime = -X*np.sin(angle) + Y*np.cos(angle)
          envelope = np.exp(-((x_prime**2 + y_prime**2)/envelope_sd**2))
          carrier = np.cos(2*np.pi/wavelength * x_prime + psi)
          gabor_bank.append(envelope*carrier)

  # zip() stops at the shorter of sigma/Lambda, so size the grid from the
  # number of pairs actually generated; squeeze=False keeps `axs` 2D even for
  # a single row or column.
  n_rows = min(len(sigma), len(Lambda))
  n_cols = len(theta)
  fig, axs = plt.subplots(n_rows, n_cols, figsize=(n_cols, n_rows), squeeze=False)
  for row in range(n_rows):
    for col in range(n_cols):
          axs[row, col].imshow(gabor_bank[row*n_cols + col], cmap='gray')
          axs[row, col].axis('off')
  plt.show()

  return gabor_bank

# Gabor bank configuration: five (sigma, Lambda) pairs, eight orientations.
side_x = 100
sigma = [20, 30, 40, 50, 60]
# Nine samples over [-90, 90] degrees converted to radians, last one dropped.
theta = np.deg2rad(np.linspace(-90,90,9))[:-1]
Lambda = [10, 15, 20, 25, 30]
gamma = 1
psi   = 0

gb = gabor(side_x, sigma, theta, Lambda, gamma, psi)

1. Generate Half-disk masks
2. Display all the Half-disk masks and save image as HDMasks.png,
3. use command "cv2.imwrite(...)"

In [None]:
def half_disk(diameter):
  """Return a (diameter x diameter) float mask of the left half of a disk.

  A cell (row, col) is 1.0 when it lies within diameter/2 of the centre cell
  int((diameter-1)/2) and its column is strictly left of the centre column;
  every other cell is 0.0.
  """
  radius = diameter/2
  centre = int((diameter-1)/2)
  idx = np.arange(diameter)
  rows = idx[:, None]
  cols = idx[None, :]
  inside_disk = (rows-centre)**2 + (cols-centre)**2 <= radius**2
  left_of_centre = cols < centre
  return np.where(inside_disk & left_of_centre, 1.0, 0.0)


def half_disk_filter(kernel_size,orientations):
  """Build and display half-disk masks for every size and orientation.

  For each diameter in `kernel_size` the base half disk is rotated to every
  angle in `orientations`; rotated values are rounded up with np.ceil.
  Returns the masks as a list in size-major, orientation-minor order.
  """
  halfDisk_bank = []
  for diameter in kernel_size:
    base_mask = half_disk(diameter)
    for angle in orientations:
      rotated = imutils.rotate(base_mask, angle)
      halfDisk_bank.append(np.ceil(rotated))

  n_rows = len(kernel_size)
  n_cols = len(orientations)
  fig, axs = plt.subplots(n_rows, n_cols, figsize=(n_cols, n_rows))
  for pos, mask in enumerate(halfDisk_bank):
    row, col = divmod(pos, n_cols)
    axs[row, col].imshow(mask, cmap='gray')
    axs[row, col].axis('off')

  plt.show()
  return halfDisk_bank

# Half-disk configuration: three diameters, sixteen orientations.
kernel_size = [7,17,27]
# With 16 evenly spaced angles over 360 degrees, entries j and j+8 of each
# size group are 180 degrees apart.
orientations = np.linspace(0,360,17)[:-1]
halfDisk_bank = half_disk_filter(kernel_size,orientations)
print(len(halfDisk_bank))

1. Generate the Texton Map
2. Filter the image using the oriented Gaussian filter bank


In [None]:
def TextonMap(Img,filter_bank):
  """Filter the grayscale version of `Img` with every kernel in `filter_bank`.

  `Img` is converted with cv2.COLOR_BGR2GRAY. Returns one array stacking the
  'same'-sized response of each filter along the first axis.
  """
  gray = cv2.cvtColor(Img, cv2.COLOR_BGR2GRAY)
  responses = [
      convolve2d(np.array(gray), np.array(kernel), mode='same')
      for kernel in filter_bank
  ]
  return np.array(responses)

# Filter responses of the sample image under the DoG bank (one slice per filter).
DoG_Texton_Map = TextonMap(img,DoG_Filters)

1. Generate texture IDs using K-means clustering
2. Display the texton map and save the image as TextonMap_ImageName.png,
3. use command "cv2.imwrite(...)"


In [None]:
def TextonID(TextonMaps):
  """Cluster per-pixel filter responses into 16 texton IDs with K-means.

  `TextonMaps` is indexed as (filter, row, col). Each pixel becomes one
  sample whose features are its responses across all filters.
  Returns (label map of shape (rows, cols), number of clusters).
  """
  K = 16
  n_filters, height, width = TextonMaps.shape[:3]
  responses = np.reshape(TextonMaps, (n_filters, height*width))
  clustering = KMeans(n_clusters=K, tol=0.0001)
  clustering.fit(responses.T)      # transpose: one row per pixel
  label_map = clustering.labels_.reshape(height, width)
  return label_map, K

# Cluster the DoG responses into texton IDs, save the label map and show it.
textonIdMap,K = TextonID(DoG_Texton_Map)
# NOTE(review): the map holds cluster labels in 0..K-1, so the saved PNG is
# presumably very dark unless it is rescaled first -- confirm if it matters.
cv2.imwrite(Filter_Out + "textonIdMap.png", textonIdMap)
plt.axis("off")
plt.imshow(textonIdMap)

1. Generate Texton Gradient (Tg)
2. Perform Chi-square calculation on Texton Map
3. Display Tg and save image as Tg_ImageName.png,
4. use command "cv2.imwrite(...)"


In [None]:
def gradient_fn(InputMap,K,halfDisk_bank,num_orientations=16):
  """Compute chi-square half-disk gradients of a label map.

  The bank is read as consecutive groups of `num_orientations` masks (one
  group per scale). Inside a group, mask j is paired with mask
  j + num_orientations/2, its counterpart in the second half of the group.
  For each pair and each bin k the binary image (InputMap == k) is convolved
  with both masks, giving g and h, and the pair's gradient is
  0.5 * sum_k (g - h)^2 / (g + h).

  Args:
    InputMap: 2D array of integer bin labels in 0..K-1.
    K: number of bins.
    halfDisk_bank: sequence of 2D masks, scale-major, orientation-minor.
    num_orientations: number of masks per scale (must be even).

  Returns:
    Array of shape (len(halfDisk_bank)/2, H, W), one slice per mask pair.

  Raises:
    ValueError: if num_orientations is odd or does not divide the bank size.
  """
  NoOfFil = len(halfDisk_bank)
  if num_orientations <= 0 or num_orientations % 2 != 0 or NoOfFil % num_orientations != 0:
    raise ValueError(
        "halfDisk_bank length must be a multiple of an even num_orientations")
  half = num_orientations // 2

  H,W = InputMap.shape
  eps = 1e-9   # avoids division by zero where both half-disk counts are zero

  # The per-bin membership images do not depend on the mask pair: build once.
  bin_images = [(InputMap == k).astype(np.float64) for k in range(K)]

  grad = np.zeros((NoOfFil // 2, H, W))
  index = 0
  for scale_start in range(0, NoOfFil, num_orientations):
    for offset in range(half):
      left_mask = halfDisk_bank[scale_start + offset]
      right_mask = halfDisk_bank[scale_start + offset + half]
      chi_square = np.zeros((H,W))
      for bin_image in bin_images:
        gi = convolve2d(bin_image, left_mask, mode='same')
        hi = convolve2d(bin_image, right_mask, mode='same')
        # chi-square distance between the two half-disk histograms g and h
        chi_square = chi_square + np.divide((gi - hi)**2, gi + hi + eps)
      grad[index,:,:] = 0.5*chi_square
      index = index + 1
  # Return only after every pair has been processed.
  return grad

In [None]:
# Texton gradient: chi-square half-disk gradients of the texton ID map,
# averaged over the first axis of the returned stack.
textronGrad = gradient_fn(textonIdMap,K,halfDisk_bank)
textronGrad_Avg = textronGrad.mean(axis=0,dtype=np.float32)
cv2.imwrite(Filter_Out + "textronGrad_Avg.png", textronGrad_Avg)
plt.imshow(textronGrad_Avg)

1. Generate Brightness Map
2. Perform brightness binning


In [None]:
def BrightnessMaps(Img_gray):
  """Cluster pixel intensities of a grayscale image into 16 brightness bins.

  Every pixel is a one-feature K-means sample. Returns
  (label map with the image's first two dimensions, number of clusters).
  """
  K = 16
  height, width = Img_gray.shape[0], Img_gray.shape[1]
  samples = Img_gray.reshape(-1,1)
  clustering = KMeans(n_clusters=K, tol=0.0001)
  clustering.fit(samples)
  return clustering.labels_.reshape(height, width), K

# Brightness map: K-means labels of the grayscale image; saved and displayed.
BrightnessIDMaps,K = BrightnessMaps(cv2.cvtColor(img,cv2.COLOR_BGR2GRAY))
cv2.imwrite(Filter_Out + "BrightnessIDMaps.png", BrightnessIDMaps)
plt.imshow(BrightnessIDMaps)

1. Generate Brightness Gradient (Bg)
2. Perform Chi-square calculation on Brightness Map
3. Display Bg and save image as Bg_ImageName.png,
4. use command "cv2.imwrite(...)"



In [None]:
# Brightness gradient: chi-square half-disk gradients of the brightness map,
# averaged over the first axis of the returned stack.
BrightnessGrad = gradient_fn(BrightnessIDMaps,K,halfDisk_bank)
BrightnessGrad_Avg = BrightnessGrad.mean(axis=0,dtype=np.float32)
cv2.imwrite(Filter_Out + "BrightnessGrad_Avg.png", BrightnessGrad_Avg)
plt.imshow(BrightnessGrad_Avg)

1. Generate Color Map
2. Perform color binning or clustering


In [None]:
def ColorMap(Img):
  """Cluster pixel colours of a multi-channel image into 16 colour bins.

  Every pixel is a K-means sample whose features are its channel values.
  Returns (label map of shape (rows, cols), number of clusters).
  """
  K = 16
  height, width, channels = Img.shape[0], Img.shape[1], Img.shape[2]
  samples = np.reshape(Img, (height*width, channels))
  clustering = KMeans(n_clusters=K, tol=0.0001)
  clustering.fit(samples)
  return clustering.labels_.reshape(height, width), K

# Colour map: K-means labels of the RGB-converted image; saved and displayed.
ColorIDMaps,K = ColorMap(cv2.cvtColor(img,cv2.COLOR_BGR2RGB))
cv2.imwrite(Filter_Out + "ColorIDMaps.png", ColorIDMaps)
plt.imshow(ColorIDMaps)




1. Generate Color Gradient (Cg)
2. Perform Chi-square calculation on Color Map
3. Display Cg and save image as Cg_ImageName.png,
4. use command "cv2.imwrite(...)"



In [None]:
# Colour gradient: chi-square half-disk gradients of the colour map,
# averaged over the first axis of the returned stack.
ColorGrad = gradient_fn(ColorIDMaps,K,halfDisk_bank)
ColorGrad_Avg = ColorGrad.mean(axis=0,dtype=np.float32)
cv2.imwrite(Filter_Out + "ColorGrad_Avg.png", ColorGrad_Avg)
plt.imshow(ColorGrad_Avg)


1. Read Sobel Baseline
2. use command "cv2.imread(...)"


In [None]:
# Sobel baseline for the same sample image.
sobelBaselinePath = "/content/data/SobelBaseline/2.png"
sobelOut = cv2.imread(sobelBaselinePath,0) # flag 0: read the Sobel baseline as grayscale
plt.imshow(sobelOut,cmap="gray")




1. Read Canny Baseline
2. use command "cv2.imread(...)"



In [None]:
# Canny baseline for the same sample image.
CannyBaselinePath = "/content/data/CannyBaseline/2.png"
CannyOut = cv2.imread(CannyBaselinePath,0) # flag 0: read the Canny baseline as grayscale
plt.imshow(CannyOut,cmap="gray")



1. Combine responses to get pb-lite output
2. Display PbLite and save image as PbLite_ImageName.png
3. use command "cv2.imwrite(...)"



In [None]:
# Pb-lite: mean of the three gradient maps, multiplied element-wise by the
# mean of the Canny and Sobel baselines.
# The baselines are promoted to float before they are added: both come from
# cv2.imread(..., 0) as 8-bit images, and adding two uint8 arrays wraps
# around modulo 256, which corrupts strong edges.
PbLite = np.multiply((textronGrad_Avg + BrightnessGrad_Avg + ColorGrad_Avg)/3 , 0.5*(CannyOut.astype(np.float64) + sobelOut.astype(np.float64)))
cv2.imwrite(Filter_Out + "PbLite.png", PbLite)
plt.imshow(PbLite,cmap="gray")

## Phase 2

In [None]:
import torch
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = torch.device("cuda")
  # Querying the device name is only valid when CUDA is available; on a
  # CPU-only runtime the call raises.
  print(torch.cuda.get_device_name())
else:
  device = torch.device("cpu")
  print("CUDA is not available, running on the CPU")

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

def accuracy(outputs, labels):
    """Return, as a 0-d tensor, the fraction of rows whose arg-max along dim 1 equals the label."""
    predicted = torch.max(outputs, dim=1)[1]
    num_correct = torch.sum(predicted == labels).item()
    return torch.tensor(num_correct / len(predicted))

def loss_fn(out, labels):
    """Return the cross-entropy loss between network outputs and class-index labels."""
    ###############################################
    # Swap in a different criterion here if desired.
    ###############################################
    criterion = nn.CrossEntropyLoss()
    return criterion(out, labels)

class ImageClassificationBase(nn.Module):
    """Mixin-style base class providing train/validation step helpers for classifiers."""

    def training_step(self, batch):
        """Run a forward pass on (images, labels) and return the loss."""
        images, labels = batch
        return loss_fn(self(images), labels)

    def validation_step(self, batch):
        """Return the detached loss and the accuracy for one (images, labels) batch."""
        images, labels = batch
        predictions = self(images)
        return {'loss': loss_fn(predictions, labels).detach(),
                'acc': accuracy(predictions, labels)}

    def validation_epoch_end(self, outputs):
        """Average the per-batch 'loss' and 'acc' entries into plain Python floats."""
        mean_loss = torch.stack([entry['loss'] for entry in outputs]).mean()
        mean_acc = torch.stack([entry['acc'] for entry in outputs]).mean()
        return {'loss': mean_loss.item(), 'acc': mean_acc.item()}

    def epoch_end(self, epoch, result):
        """Print the loss and accuracy stored in `result` for step `epoch`."""
        print("Epoch [{}], loss: {:.4f}, acc: {:.4f}".format(epoch, result['loss'], result['acc']))


'''
###############################
****   CNN Architecture  *****
###############################
'''
class CIFAR10Model(ImageClassificationBase):
    """Four-convolution CNN classifier for 3x32x32 images.

    Two conv blocks (each conv -> batch norm -> ReLU, twice, followed by a
    2x2 max pool) reduce the 32x32 input to 128 feature maps of 8x8, which
    feed a 4096-unit hidden layer and a final linear layer with OutputSize
    outputs.
    """

    def __init__(self, InputSize, OutputSize):
        """
        Inputs:
        InputSize - Size of the Input (accepted for interface compatibility;
                    the layer sizes below assume 3x32x32 images)
        OutputSize - Size of the Output (number of classes)
        """
        super().__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU())
        self.layer2 = nn.Sequential(
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU())
        self.layer4 = nn.Sequential(
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        # Two 2x2 poolings turn 32x32 into 8x8, with 128 channels.
        self.fc = nn.Sequential(
            nn.Linear(8*8*128, 4096),
            nn.ReLU())
        self.fc1 = nn.Sequential(
            nn.Linear(4096, OutputSize))

    def forward(self, xb):
        """
        Input:
        xb is a MiniBatch of images
        Outputs:
        out - raw class scores (logits), one row per image
        """
        out = self.layer1(xb)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = torch.flatten(out, 1)
        out = self.fc(out)
        out = self.fc1(out)
        # Return logits: the loss used for training (nn.CrossEntropyLoss)
        # applies log-softmax itself, so a softmax here would be applied
        # twice and flatten the gradients. The arg-max used for predictions
        # is the same for logits and for softmax probabilities.
        return out

In [None]:
import time
import glob
import os
import sys
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

def tic():
    """
    Start a timer (MATLAB-style tic): return the current time.time() value.
    """
    return time.time()

def toc(StartTime):
    """
    Stop a timer (MATLAB-style toc): return the seconds elapsed since StartTime.
    """
    now = time.time()
    return now - StartTime

def FindLatestModel(CheckPointPath):
    """
    Finds the most recently created checkpoint in CheckPointPath
    Inputs:
    CheckPointPath - Path prefix (with trailing separator) where checkpoints are stored
    Outputs:
    LatestFile - File name of the latest checkpoint without directory and
                 without the '.ckpt' extension, or None when no checkpoint exists
    """
    # Checkpoints in this notebook are written by torch.save as '<name>.ckpt'
    # and re-loaded as CheckPointPath + LatestFile + '.ckpt', so that is the
    # pattern to look for.
    Extension = '.ckpt'
    FileList = glob.glob(CheckPointPath + '*' + Extension)
    if not FileList:
        # Nothing to resume from; callers treat None as "start a new model".
        return None
    LatestFile = max(FileList, key=os.path.getctime)
    # Strip everything except the bare checkpoint name
    LatestFile = os.path.basename(LatestFile)
    return LatestFile[:-len(Extension)]


def convertToOneHot(vector, NumClasses):
    """
    Inputs:
    vector - vector of class indexes
    NumClasses - Number of classes
    Outputs:
    float64 array with one row per entry of vector; the column equal to the
    class index holds 1.0 and all other columns hold 0.0
    """
    # np.float64 instead of the np.float alias, which newer NumPy releases
    # no longer provide (np.float was just the builtin float, i.e. float64).
    return np.equal.outer(vector, np.arange(NumClasses)).astype(np.float64)

### Train your neural network

In [None]:
import torch
import torchvision
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms
from torch.optim import AdamW
from torchvision.datasets import CIFAR10
import cv2
import sys
import os
import numpy as np
import random
import skimage
import PIL
import os
import glob
import random
from skimage import data, exposure, img_as_float
import matplotlib.pyplot as plt
import numpy as np
import time
from torchvision.transforms import ToTensor
import argparse
import shutil
import string
from termcolor import colored, cprint
import math as m
from tqdm.notebook import tqdm

def SetupAll(CheckPointPath):
    """
    Prepare labels, the checkpoint directory and the fixed training constants.
    Inputs:
    CheckPointPath - Path to save checkpoints/model (created if missing)
    Outputs:
    SaveCheckPoint - Save a checkpoint every SaveCheckPoint iterations within an epoch
    ImageSize - Size of the image
    NumTrainSamples - len(TrainSet), read from the module-level TrainSet
    TrainLabels - Labels read from the training label file
    NumClasses - Number of classes
    """
    # Labels first: ReadLabels exits the program when the file is missing.
    TrainLabels = ReadLabels('/content/data/TxtFiles/LabelsTrain.txt')

    if not os.path.isdir(CheckPointPath):
        os.makedirs(CheckPointPath)

    SaveCheckPoint = 100
    ImageSize = [32, 32, 3]
    NumTrainSamples = len(TrainSet)
    NumClasses = 10

    return SaveCheckPoint, ImageSize, NumTrainSamples, TrainLabels, NumClasses


def ReadLabels(LabelsPathTrain):
    """
    Inputs:
    LabelsPathTrain - Path of a text file with whitespace-separated numeric labels
    Outputs:
    TrainLabels - list of the labels as floats
    Exits the program (sys.exit) when the file does not exist.
    """
    if(not (os.path.isfile(LabelsPathTrain))):
        print('ERROR: Train Labels do not exist in '+LabelsPathTrain)
        sys.exit()

    # The context manager closes the file even if parsing fails.
    with open(LabelsPathTrain, 'r') as LabelFile:
        Tokens = LabelFile.read().split()
    # A list (rather than a lazy map object) can be iterated more than once.
    TrainLabels = [float(Token) for Token in Tokens]

    return TrainLabels
    

def ReadDirNames(ReadPath):
    """
    Inputs:
    ReadPath is the path of the text file you want to read
    Outputs:
    DirNames is the list of whitespace-separated entries found in the file
    """
    # The context manager closes the file as soon as it has been read.
    with open(ReadPath, 'r') as NamesFile:
        DirNames = NamesFile.read().split()
    return DirNames

    
def GenerateBatch(TrainSet, TrainLabels, ImageSize, MiniBatchSize):
    """
    Draw a random, rotation-augmented mini-batch and move it to `device`.

    Inputs:
    TrainSet - indexable dataset yielding (image tensor, label) pairs
    TrainLabels - not used by this function
    ImageSize - not used by this function
    MiniBatchSize - number of random draws from TrainSet

    Outputs:
    I1Batch - stacked images: every draw contributes the image itself plus
              its rotations by 90, -90 and 180 degrees over dims [1, 2],
              so the batch holds 4 * MiniBatchSize images
    LabelBatch - stacked class-index labels, one per image in I1Batch
    """
    I1Batch = []
    LabelBatch = []

    drawn = 0
    while drawn < MiniBatchSize:
        # Pick a random sample (indices may repeat across draws)
        image, label = TrainSet[random.randint(0, len(TrainSet)-1)]
        drawn += 1
        ##########################################################
        # Add any standardization or data augmentation here!
        ##########################################################
        # Original image first ...
        I1Batch.append(image)
        LabelBatch.append(torch.tensor(label))
        # ... then its quarter-turn rotations: +90, -90 and 180 degrees
        for quarter_turns in (1, -1, 2):
            I1Batch.append(torch.rot90(image, quarter_turns, [1,2]))
            LabelBatch.append(torch.tensor(label))

    images = torch.stack(I1Batch).to(device)
    labels = torch.stack(LabelBatch).to(device)
    return images, labels


def PrettyPrint(NumEpochs, DivTrain, MiniBatchSize, NumTrainSamples, LatestFile):
    """
    Print the training configuration, plus the checkpoint name when resuming.
    """
    stats = (
        ('Number of Epochs Training will run for ', NumEpochs),
        ('Factor of reduction in training data is ', DivTrain),
        ('Mini Batch Size ', MiniBatchSize),
        ('Number of Training Images ', NumTrainSamples),
    )
    for label, value in stats:
        print(label + str(value))
    if LatestFile is None:
        return
    print('Loading latest checkpoint with the name ' + LatestFile)

def TrainOperation(TrainLabels, NumTrainSamples, ImageSize,
                   NumEpochs, MiniBatchSize, SaveCheckPoint, CheckPointPath,
                   DivTrain, LatestFile, TrainSet, LogsPath):
    """
    Inputs: 
    TrainLabels - Labels corresponding to Train/Test
    NumTrainSamples - length(Train)
    ImageSize - Size of the image
    NumEpochs - Number of passes through the Train data
    MiniBatchSize is the size of the MiniBatch
    SaveCheckPoint - Save checkpoint every SaveCheckPoint iteration in every epoch, checkpoint saved automatically after every epoch
    CheckPointPath - Path to save checkpoints/model
    DivTrain - Divide the data by this number for Epoch calculation, use if you have a lot of data or for debugging code
    LatestFile - Latest checkpoint file to continue training from (None starts a new model)
    TrainSet - The training dataset
    LogsPath - Path to save Tensorboard Logs
    Outputs:
    Saves Trained network in CheckPointPath and Logs to LogsPath
    """
    # Initialize the model
    model = CIFAR10Model(InputSize=3*32*32,OutputSize=10)
    model = model.to(device)
    
    ###############################################
    # Fill your optimizer of choice here!
    ###############################################
    Optimizer = torch.optim.SGD(model.parameters(), lr=1e-2) # Optimizer used for base model as well as the Data Augmented model
    # state_dict must be called; the bare attribute only prints the bound method.
    print("Optimizer Information: \n", Optimizer.state_dict())
    
    # Print a layer-by-layer summary of the network
    from torchsummary import summary
    summary(model,input_size=(3,32,32))
    # Create a summary writer to monitor loss and accuracy
    Writer = SummaryWriter(LogsPath)
    if LatestFile is not None:
        # map_location lets a checkpoint saved on the GPU be resumed on a CPU-only runtime
        CheckPoint = torch.load(CheckPointPath + LatestFile + '.ckpt', map_location=device)
        # Extract only numbers from the part of the name before the 'a' separator (the epoch)
        StartEpoch = int(''.join(c for c in LatestFile.split('a')[0] if c.isdigit()))
        model.load_state_dict(CheckPoint['model_state_dict'])
        # Resume the optimizer as well when the checkpoint carries its state
        if 'optimizer_state_dict' in CheckPoint:
            Optimizer.load_state_dict(CheckPoint['optimizer_state_dict'])
        print('Loaded latest checkpoint with the name ' + LatestFile + '....')
    else:
        StartEpoch = 0
        print('New model initialized....')

    # Does not change between epochs, so compute it once
    NumIterationsPerEpoch = int(NumTrainSamples/MiniBatchSize/DivTrain)
    # Defined up front so the per-epoch save works even with zero iterations
    LossThisBatch = None

    for Epochs in tqdm(range(StartEpoch, NumEpochs)):
        for PerEpochCounter in tqdm(range(NumIterationsPerEpoch)):
            Batch = GenerateBatch(TrainSet, TrainLabels, ImageSize, MiniBatchSize)

            # Predict output with forward pass
            LossThisBatch = model.training_step(Batch)

            Optimizer.zero_grad()
            LossThisBatch.backward()
            Optimizer.step()
            
            # Save checkpoint every some SaveCheckPoint's iterations
            if PerEpochCounter % SaveCheckPoint == 0:
                # Save the Model learnt in this epoch
                SaveName =  CheckPointPath + str(Epochs) + 'a' + str(PerEpochCounter) + 'model.ckpt'
                
                torch.save({'epoch': Epochs,'model_state_dict': model.state_dict(),'optimizer_state_dict': Optimizer.state_dict(),'loss': LossThisBatch}, SaveName)
                print('\n' + SaveName + ' Model Saved...')

            # Metrics on the batch just trained on; no gradients are needed here
            with torch.no_grad():
                result = model.validation_step(Batch)
            GlobalStep = Epochs*NumIterationsPerEpoch + PerEpochCounter
            model.epoch_end(GlobalStep, result)
            # Tensorboard
            Writer.add_scalar('LossEveryIter', result["loss"], GlobalStep)
            Writer.add_scalar('Accuracy', result["acc"], GlobalStep)
            # If you don't flush the tensorboard doesn't update until a lot of iterations!
            Writer.flush()

        # Save model every epoch
        SaveName = CheckPointPath + str(Epochs) + 'model.ckpt'
        torch.save({'epoch': Epochs,'model_state_dict': model.state_dict(),'optimizer_state_dict': Optimizer.state_dict(),'loss': LossThisBatch}, SaveName)
        print('\n' + SaveName + ' Model Saved...')
        


# Default Hyperparameters
NumEpochs = 50
# CIFAR-10 training split, downloaded into ./data if needed; samples are
# converted to tensors by ToTensor().
TrainSet = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=ToTensor())

DivTrain = 1.0
MiniBatchSize = 100
# Set to 1 to resume from the latest checkpoint found in CheckPointPath.
LoadCheckPoint = 0
CheckPointPath = "/content/Checkpoints/"
LogsPath = "/content/Logs"

# Setup all needed parameters including file reading
SaveCheckPoint, ImageSize, NumTrainSamples, TrainLabels, NumClasses = SetupAll(CheckPointPath)

# Find Latest Checkpoint File
if LoadCheckPoint==1:
    LatestFile = FindLatestModel(CheckPointPath)
else:
    LatestFile = None

# Pretty print stats
PrettyPrint(NumEpochs, DivTrain, MiniBatchSize, NumTrainSamples, LatestFile)

# Run the training loop (writes checkpoints and Tensorboard logs)
TrainOperation(TrainLabels, NumTrainSamples, ImageSize,
                NumEpochs, MiniBatchSize, SaveCheckPoint, CheckPointPath,
                DivTrain, LatestFile, TrainSet, LogsPath)

In [None]:
#!kill $(ps -e | grep 'tensorboard' | awk '{print $1}')
%load_ext tensorboard
%tensorboard --logdir {LogsPath} --host "0.0.0.0" --port 6006

### Test your neural network

In [None]:
from torchvision.transforms.functional import to_tensor
import cv2
import os
import sys
import glob
import random
from skimage import data, exposure, img_as_float
import matplotlib.pyplot as plt
import numpy as np
import time
from torchvision.transforms import ToTensor
import argparse
import shutil
import string
import math as m
from sklearn.metrics import confusion_matrix
from tqdm.notebook import tqdm
import torch

def SetupAll():
    """
    Outputs:
    ImageSize - input image shape as [height, width, channels] = [32, 32, 3]
    """
    return [32, 32, 3]

def StandardizeInputs(Img):
    """Return Img unchanged; hook for test-time preprocessing.

    Any standardization, cropping or resizing applied during training
    should be mirrored here.
    """
    return Img


def ReadImages(Img):
    """
    Turn one image into a batch of size one on `device`.
    Outputs:
    I1Combined - float32 image after StandardizeInputs, with a leading batch axis, moved to `device`
    I1 - the image exactly as passed in, for visualization purposes only
    """
    I1 = Img

    if I1 is None:
        # Guard against a missing image
        print('ERROR: Image I1 cannot be read')
        sys.exit()

    standardized = StandardizeInputs(np.float32(I1))
    batched = torch.from_numpy(np.expand_dims(standardized, axis=0))

    return batched.to(device), I1
                

def TestOperation(ImageSize, ModelPath, TestSet, LabelsPathPred):
    """
    Inputs: 
    ImageSize is the size of the image (not used by this function)
    ModelPath - Path of the checkpoint file to load the trained model from
    TestSet - The test dataset, yielding (image, label) pairs
    LabelsPathPred - Path of the text file to write predictions to
    Outputs:
    One predicted class index per line written to LabelsPathPred
    """
    # Predict output with forward pass, MiniBatchSize for Test is 1
    model = CIFAR10Model(InputSize=3*32*32,OutputSize=10) 
    model = model.to(device)

    # map_location lets a checkpoint saved on the GPU be evaluated on a CPU-only runtime
    CheckPoint = torch.load(ModelPath, map_location=device)
    model.load_state_dict(CheckPoint['model_state_dict'])
    # NOTE: this counts the entries of the state dict, not individual weights.
    print('Number of parameters in this model are %d ' % len(model.state_dict().items()))

    # Inference mode: batch norm must use its stored running statistics
    # instead of the statistics of the single image being classified.
    model.eval()

    # The context managers close the file even if inference fails and avoid
    # building autograd graphs during evaluation.
    with open(LabelsPathPred, 'w') as OutSaveT, torch.no_grad():
        for count in tqdm(range(len(TestSet))): 
            Img, Label = TestSet[count]
            Img, ImgOrg = ReadImages(Img)
            PredT = torch.argmax(model(Img)).item()

            OutSaveT.write(str(PredT)+'\n')

def Accuracy(Pred, GT):
    """
    Inputs: 
    Pred are the predicted labels
    GT are the ground truth labels
    Outputs:
    Percentage of positions where Pred equals GT
    """
    predicted = np.array(Pred)
    expected = np.array(GT)
    num_matches = np.sum(predicted == expected)
    return (num_matches*100.0/len(Pred))

def ReadLabels(LabelsPathTest, LabelsPathPred):
    """
    Inputs:
    LabelsPathTest - Path of the text file with the ground-truth labels
    LabelsPathPred - Path of the text file with the predicted labels
    Outputs:
    LabelTest, LabelPred - lists of the labels as floats
    Exits the program (sys.exit) when either file does not exist.
    """
    if(not (os.path.isfile(LabelsPathTest))):
        print('ERROR: Test Labels do not exist in '+LabelsPathTest)
        sys.exit()
    # Context managers close each file as soon as it has been read.
    with open(LabelsPathTest, 'r') as TestFile:
        LabelTest = [float(Token) for Token in TestFile.read().split()]

    if(not (os.path.isfile(LabelsPathPred))):
        print('ERROR: Pred Labels do not exist in '+LabelsPathPred)
        sys.exit()
    with open(LabelsPathPred, 'r') as PredFile:
        LabelPred = [float(Token) for Token in PredFile.read().split()]
        
    return LabelTest, LabelPred

def ConfusionMatrix(LabelsTrue, LabelsPred):
    """
    Compute and report accuracy together with the confusion matrix.
    LabelsTrue - True labels (any iterable)
    LabelsPred - Predicted labels (any iterable)
    Returns (accuracy in percent, confusion matrix from sklearn).
    """
    # Materialise both iterables so they can be used more than once.
    true_list = list(LabelsTrue)
    pred_list = list(LabelsPred)

    cm = confusion_matrix(y_true=true_list, y_pred=pred_list)
    acc = Accuracy(pred_list, true_list)

    print('Accuracy: '+ str(acc), '%')
    return acc, cm

In [None]:
'''
################### Testing ###################
'''
# Locations used by the evaluation run below.
ModelPath       = "/content/Checkpoints/"
LabelsPath      = "/content/data/TxtFiles/LabelsTest.txt"
LogsPath        = "/content/TestLogs"
PredLabelsPath  = '/content/data/TxtFiles/' # Path to save predicted labels
# CIFAR-10 test split; no download is requested here.
# NOTE(review): presumably relies on the training cell having downloaded the
# dataset into the same folder -- confirm.
TestSet = CIFAR10(root='data/', train=False, transform=ToTensor())

def _checkpoint_sort_key(file_path):
  """Sort key that orders checkpoint files chronologically (epoch, then iteration)."""
  import re
  stem = os.path.basename(file_path).split('.')[0]
  # In-epoch checkpoints are named '<epoch>a<iteration>model', end-of-epoch
  # checkpoints '<epoch>model'.
  match = re.match(r'(\d+)(?:a(\d+))?model$', stem)
  if match is None:
    # Unknown naming scheme: keep such files at the end, in name order.
    return (float('inf'), float('inf'), stem)
  epoch = int(match.group(1))
  # The end-of-epoch checkpoint is written after every in-epoch checkpoint.
  iteration = float(match.group(2)) if match.group(2) is not None else float('inf')
  return (epoch, iteration, stem)


def Train_plot(ModelPath,LabelsPath,TestSet,PredLabelsPath):
  """
  Evaluate every checkpoint on the test set and plot the last confusion matrix.
  Inputs: 
  ModelPath - Path prefix of the folder holding the '*.ckpt' checkpoints
  LabelsPath - Path of the ground-truth test labels file
  TestSet - The test dataset
  PredLabelsPath - Path prefix under which '<checkpoint name>.txt' predictions are saved
  Outputs:
  Predictions written under PredLabelsPath, test accuracy logged to
  Tensorboard (module-level LogsPath), confusion matrix of the last
  checkpoint shown as a heat map
  """
  # A plain string sort would put '10model' before '1model'; sort by the
  # numbers in the file name so checkpoints are visited in training order.
  FileList = sorted(glob.glob(ModelPath + '*.ckpt'), key=_checkpoint_sort_key)
  if not FileList:
    print('ERROR: No checkpoints found in ' + ModelPath)
    return

  Writer = SummaryWriter(LogsPath)
  ImageSize = SetupAll()
  for i, filePath in enumerate(FileList):
    CheckPointName = os.path.basename(filePath).split('.')[0]
    PredPath = PredLabelsPath + CheckPointName + ".txt"
    TestOperation(ImageSize, filePath, TestSet, PredPath)
    # Accuracy and confusion matrix for this checkpoint
    LabelsTrue, LabelsPred = ReadLabels(LabelsPath, PredPath)
    Acc, CM = ConfusionMatrix(LabelsTrue, LabelsPred)
    Writer.add_scalar('Test_Accuracy', Acc,i*100)
    Writer.flush()

  # Acc and CM now belong to the last (most trained) checkpoint
  plt.figure(figsize = (10,7))
  sns.heatmap(CM, annot=True, cmap="YlGnBu", fmt="d")
  print("Test Accuracy = ", Acc, "%")
  plt.show()

Train_plot(ModelPath, LabelsPath, TestSet, PredLabelsPath)

In [None]:
!kill $(ps -e | grep 'tensorboard' | awk '{print $1}')
%reload_ext tensorboard
%tensorboard --logdir {LogsPath} --host "0.0.0.0" --port 6006