In [0]:
#importing libraries
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from statistics import mean 
from sklearn.model_selection import train_test_split

In [0]:
# Mount Google Drive at /content/drive/ so the data and checkpoint files used below are reachable.
# NOTE(review): google.colab is presumably only available on a Colab runtime - confirm before running elsewhere.
from google.colab import drive
drive.mount('/content/drive/')

In [0]:
# Build the shuffled training/validation split from the raw arrays (SPLIT = True),
# or reload the split cached at PATH by an earlier run (SPLIT = False).
SPLIT = False

PATH = F"/content/drive/My Drive/data_split_cnn_iss.pt"


if SPLIT:
  # Load the raw data as numpy arrays
  X = np.load('/content/drive/My Drive/diff_intf/X_img_size_50.npy')
  y = np.load('/content/drive/My Drive/diff_intf/y_img_size_50.npy')
  # Axis 1 of X is used as the channel count and axis 2 as the (square) image size
  INPUT_CHANNELS = X.shape[1]
  INPUT_IMG_SIZE = X.shape[2]
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)
  # Convert each array in a single call instead of rebuilding it row by row from a Python list
  X_train = torch.as_tensor(X_train, dtype=torch.float32).view(-1, INPUT_CHANNELS, INPUT_IMG_SIZE, INPUT_IMG_SIZE)
  X_test = torch.as_tensor(X_test, dtype=torch.float32).view(-1, INPUT_CHANNELS, INPUT_IMG_SIZE, INPUT_IMG_SIZE)
  y_train = torch.as_tensor(y_train, dtype=torch.float32)
  y_test = torch.as_tensor(y_test, dtype=torch.float32)
  # Scale X by 1/255 (presumably 8-bit intensity values - TODO confirm)
  X_train = X_train / 255.0
  X_test = X_test / 255.0

  # Cache the split so later sessions reuse exactly the same partition
  torch.save({
            'X_train': X_train,
            'X_test': X_test,
            'y_train': y_train,
            'y_test': y_test
        }, PATH)

else:

  checkpoint = torch.load(PATH)

  X_train = checkpoint['X_train']
  X_test = checkpoint['X_test']
  y_train = checkpoint['y_train']
  y_test = checkpoint['y_test']
  # The network and batching code below read these two constants, so they must
  # also exist when the split is reloaded; recover them from the cached tensor,
  # which was saved in (N, channels, size, size) layout.
  INPUT_CHANNELS = X_train.shape[1]
  INPUT_IMG_SIZE = X_train.shape[2]

In [0]:
# Convolutional network: four conv/batch-norm/ReLU blocks followed by dropout and one linear layer
class Net(nn.Module):
    """CNN producing ``img_size * img_size`` sigmoid outputs per input sample.

    Args:
        input_channels: number of channels of the input images. Defaults to the
            notebook-level ``INPUT_CHANNELS`` so the original ``Net()`` call
            keeps working.
        img_size: side length of the (square) input images. Defaults to the
            notebook-level ``INPUT_IMG_SIZE``.
    """

    def __init__(self, input_channels=None, img_size=None):
        super().__init__()

        if input_channels is None:
            input_channels = INPUT_CHANNELS
        if img_size is None:
            img_size = INPUT_IMG_SIZE

        # Conv2d positional args: (in_channels, out_channels, kernel_size, stride, padding).
        # NOTE(review): kernel_size=1 combined with padding=1 makes every block
        # enlarge the feature map by 2 pixels in each spatial dimension. If 3x3
        # kernels were intended, changing this alters the layer shapes and
        # invalidates saved checkpoints - confirm before touching.
        self.features1 = nn.Sequential(
            nn.Conv2d(input_channels, 16, 1, 1, 1),
            nn.BatchNorm2d(16),
            nn.ReLU()
        )

        self.features2 = nn.Sequential(
            nn.Conv2d(16, 16, 1, 1, 1),
            nn.BatchNorm2d(16),
            nn.ReLU()
        )

        self.features3 = nn.Sequential(
            nn.Conv2d(16, 32, 1, 1, 1),
            nn.BatchNorm2d(32),
            nn.ReLU()
        )

        self.features4 = nn.Sequential(
            nn.Conv2d(32, 64, 1, 1, 1),
            nn.BatchNorm2d(64),
            nn.ReLU()
        )

        # Dry run with a dummy sample to discover how many features reach the
        # fully connected layer. It runs in eval mode and without autograd so
        # the batch-norm running statistics are not updated by the dummy input.
        self._to_linear = None
        was_training = self.training
        self.eval()
        with torch.no_grad():
            self.conv(torch.zeros(1, input_channels, img_size, img_size))
        self.train(was_training)

        self.drop_layer = nn.Dropout(p=0.25)
        self.fc1 = nn.Linear(self._to_linear, img_size * img_size)

    def conv(self, X):
        """Run X through the four convolutional blocks only.

        On the first call this also records the flattened per-sample feature
        count in ``self._to_linear``.
        """
        X = self.features1(X)
        X = self.features2(X)
        X = self.features3(X)
        X = self.features4(X)
        if self._to_linear is None:
            # channels * height * width of a single sample
            self._to_linear = X[0].numel()

        return X

    def forward(self, X):
        """Full forward pass: conv blocks -> flatten -> dropout -> linear -> sigmoid."""
        X = self.conv(X)
        X = X.view(-1, self._to_linear)
        X = self.drop_layer(X)
        X = self.fc1(X)

        return torch.sigmoid(X)

In [0]:
# Instantiate the network. Net.__init__ reads the module-level INPUT_CHANNELS and
# INPUT_IMG_SIZE, so both must be defined before this cell runs.
net = Net()

In [0]:
# Hard-threshold F1 / precision / recall for the current batch
def calc_metrics(y_pred, y_true):
    """Return ``(f1, precision, recall)`` for one batch.

    ``y_pred`` is first rounded with ``torch.round`` so each prediction becomes
    0 or 1; the confusion counts are then summed over every element. A small
    constant in each denominator keeps the ratios finite when a count is zero.
    """
    eps = 1e-7
    hard_pred = torch.round(y_pred)

    true_pos = (y_true * hard_pred).sum()
    false_pos = ((1 - y_true) * hard_pred).sum()
    false_neg = (y_true * (1 - hard_pred)).sum()

    precision = true_pos / (true_pos + false_pos + eps)
    recall = true_pos / (true_pos + false_neg + eps)
    f1 = 2 * true_pos / (2 * true_pos + false_pos + false_neg + eps)

    return f1, precision, recall


In [0]:
# Per-element class-weight column for the current batch
def calc_class_weights_tensor(batch_y):
    """Flatten ``batch_y`` and return its class-weight tensor.

    The flattened labels are passed to ``calc_ratio`` and the resulting ratio,
    together with the labels, to ``calc_class_weight_tensor``.

    NOTE(review): ``calc_ratio`` is defined twice in this notebook with inverse
    formulas (class2/class1 and class1/class2). The name is resolved when this
    function is called, so whichever definition was executed last is the one
    used - confirm which ratio is intended.
    """
    flat_y = batch_y.view(-1)
    return calc_class_weight_tensor(flat_y, calc_ratio(flat_y))


def calc_ratio(y):
  """Return class2 / class1 for a flat label tensor, as a 0-d tensor.

  class1 is the number of entries that round to 1 and class2 is every other
  entry. The count is done with one vectorized comparison instead of a Python
  loop that creates a 0-d tensor per element.

  NOTE(review): a later cell defines calc_ratio again as class1 / class2; once
  that cell has run, it replaces this definition.

  Raises:
    ZeroDivisionError: if no entry of ``y`` rounds to 1.
  """
  class1 = int((torch.round(y) == 1).sum())
  class2 = y.shape[0] - class1
  return torch.tensor(class2 / class1)
  

def calc_class_weight_tensor(y, ratio):
  """Return an (n, 1) column of per-element weights for the flat labels ``y``.

  An element equal to 0 gets weight 1; every other element gets 1 / ratio.

  The labels are compared as an (n, 1) column against the row [0, 1]. Comparing
  the flat (n,) labels against an (n, 2) table, as before, only broadcasts when
  n is 1 or 2, and for n == 2 it pairs each label with the wrong column.

  Args:
    y: flat tensor of n labels.
    ratio: scalar or 0-d tensor used to scale the weights.
  """
  classes = torch.tensor([0, 1], device=y.device)
  one_hot = (classes == y.reshape(-1, 1)) + 0  # (n, 2): [y == 0, y == 1]
  weights = one_hot * (ratio - 1) + 1           # ratio where the column matches, else 1
  return (weights / ratio)[:, :1]               # keep only the "y == 0" column

In [0]:
# Class-count ratio for a batch of labels
def calc_class_weights(batch_y):
    """Flatten ``batch_y`` and return ``calc_ratio`` of the flattened labels."""
    return calc_ratio(batch_y.view(-1))


def calc_ratio(y):
  """Return class1 / class2 for a flat label tensor, as a 0-d tensor.

  class1 is the number of entries that round to 1 and class2 is every other
  entry. The count is done with one vectorized comparison; the previous
  per-element Python loop created a 0-d tensor for every label, which is very
  slow when this is called on the whole of y_train.

  Raises:
    ZeroDivisionError: if every entry of ``y`` rounds to 1 (or ``y`` is empty).
  """
  class1 = int((torch.round(y) == 1).sum())
  class2 = y.shape[0] - class1
  return torch.tensor(class1 / class2)
  

In [0]:
# Class-weighted "double" soft-F1 loss (soft F1 of both the positive and the negative class)
class Macro_Double_Soft_F1_Loss(nn.Module):
    """Weighted average of (1 - soft F1) for class 1 and class 0.

    The confusion counts are computed on the raw predictions (no rounding), so
    the loss stays differentiable.

    Args:
        epsilon: added to both F1 denominators and used as the clamp margin.
        class_weight: weight applied to the class-0 cost; the class-1 cost has
            weight 1 and the sum is divided by ``1 + class_weight``. When
            omitted it is computed as ``calc_class_weights(y_train)`` from the
            notebook-level training labels, which is the original behaviour.
    """

    def __init__(self, epsilon=1e-16, class_weight=None):
        super().__init__()
        self.epsilon = epsilon
        if class_weight is None:
            # Original default: derive the weight from the global training labels
            class_weight = calc_class_weights(y_train)
        self.class_weight = class_weight

    def forward(self, y_pred, y_true):
        """Return the scalar loss for predictions ``y_pred`` against labels ``y_true``."""
        tp = (y_true * y_pred).sum()
        tn = ((1 - y_true) * (1 - y_pred)).sum()
        fp = ((1 - y_true) * y_pred).sum()
        fn = (y_true * (1 - y_pred)).sum()

        # Soft F1 with class 1 as the positive class
        f1_class1 = 2 * tp / (2 * tp + fp + fn + self.epsilon)
        f1_class1 = f1_class1.clamp(min=self.epsilon, max=1 - self.epsilon)

        # Soft F1 with class 0 as the positive class (tn takes the role of tp)
        f1_class0 = 2 * tn / (2 * tn + fp + fn + self.epsilon)
        f1_class0 = f1_class0.clamp(min=self.epsilon, max=1 - self.epsilon)

        cost_class0 = 1 - f1_class0
        cost_class1 = 1 - f1_class1

        return (self.class_weight * cost_class0 + cost_class1) / (1 + self.class_weight)

# Module-level loss instance; the constructor computes its class weight from the global y_train.
macro_double_soft_f1_loss = Macro_Double_Soft_F1_Loss()

In [0]:
# Soft-F1 loss: 1 - F1 computed on the raw (unrounded) predictions
class F1_Loss(nn.Module):
    """Differentiable F1 loss.

    Args:
        epsilon: added to the F1 denominator so it is never zero.
    """

    def __init__(self, epsilon=1e-7):
        super().__init__()
        self.epsilon = epsilon

    def forward(self, y_pred, y_true,):
        """Return ``1 - F1`` summed over every element of the batch."""
        hits = (y_true * y_pred).sum()
        false_alarms = ((1 - y_true) * y_pred).sum()
        misses = (y_true * (1 - y_pred)).sum()

        numerator = 2 * hits
        denominator = numerator + false_alarms + misses + self.epsilon
        return 1 - numerator / denominator

f1_loss = F1_Loss()

In [0]:
# Element-wise binary cross-entropy. reduction='none' keeps one loss value per element
# so calc_loss_n_acc can multiply by its weights before averaging.
bceloss = nn.BCELoss(reduction = 'none')

In [0]:
# Class-weighted binary cross-entropy for one batch
def calc_loss_n_acc(outputs, batch_y, class_weights):
  """Return the mean of the element-wise BCE loss multiplied by ``class_weights``.

  Despite the name, only the loss is returned.

  Args:
    outputs: network outputs; flattened before use.
    batch_y: labels; flattened before use.
    class_weights: scalar weight, or a tensor with one weight per element.
      Tensors with more than one dimension (for example an (n, 1) column) are
      flattened first. Multiplying the flat (n,) losses by an (n, 1) column
      would otherwise broadcast to an (n, n) matrix and average over that.
  """
  outputs = outputs.view(-1)
  batch_y = batch_y.view(-1)
  per_element_loss = bceloss(outputs, batch_y)
  if torch.is_tensor(class_weights) and class_weights.dim() > 1:
    class_weights = class_weights.reshape(-1)
  return (per_element_loss * class_weights).mean()

In [0]:
# Select the loss used by calc_loss_n_metrics: the module-level soft-F1 loss instance
loss_function = f1_loss

In [0]:
# Loss and hard-threshold metrics for one batch
def calc_loss_n_metrics(outputs, batch_y):
    """Return ``(loss, f1, precision, recall)`` for one batch.

    ``outputs`` is flattened first; the loss comes from the module-level
    ``loss_function`` and the three metrics from ``calc_metrics``.
    """
    flat_outputs = outputs.view(-1)
    batch_loss = loss_function(flat_outputs, batch_y)
    return (batch_loss, *calc_metrics(flat_outputs, batch_y))

In [0]:
# Run one batch through the network, optionally taking an optimizer step
def fwd_pass(batch_X, batch_y, train):
    """Forward one batch through the module-level ``net``.

    Args:
        batch_X: input batch passed to ``net``.
        batch_y: labels passed to ``calc_loss_n_metrics``.
        train: when truthy, put the network in training mode, back-propagate
            the loss and step the module-level ``optimizer``; otherwise
            evaluate in eval mode with autograd disabled.

    Returns:
        ``(loss, f1, precision, recall)`` as tensors detached from the autograd
        graph.
    """
    if train:
        net.train()
        net.zero_grad()
        outputs = net(batch_X)
        loss, f1, precision, recall = calc_loss_n_metrics(outputs, batch_y)
        loss.backward()
        optimizer.step()
    else:
        net.eval()
        with torch.no_grad():
            outputs = net(batch_X)
        loss, f1, precision, recall = calc_loss_n_metrics(outputs, batch_y)

    # Callers collect these values for a whole epoch; detaching keeps them from
    # holding on to each batch's autograd history.
    return loss.detach(), f1.detach(), precision.detach(), recall.detach()

In [0]:
# Run every batch of a dataset once and average the per-batch results
def one_epoch(X, y, BATCH_SIZE, train=False):
  """Pass ``X`` / ``y`` through ``fwd_pass`` in consecutive batches.

  Each input slice is viewed as (-1, INPUT_CHANNELS, INPUT_IMG_SIZE,
  INPUT_IMG_SIZE) and each label slice is flattened. ``train`` is forwarded to
  ``fwd_pass`` unchanged.

  Returns:
    The unweighted mean over batches of loss, F1, precision and recall, in
    that order.
  """
  # One bucket per value returned by fwd_pass: loss, f1, precision, recall
  history = ([], [], [], [])
  for start in tqdm(range(0, len(X), BATCH_SIZE)):
    stop = start + BATCH_SIZE
    batch_X = X[start:stop].view(-1, INPUT_CHANNELS, INPUT_IMG_SIZE, INPUT_IMG_SIZE)
    batch_y = y[start:stop].view(-1)

    for bucket, value in zip(history, fwd_pass(batch_X, batch_y, train)):
      bucket.append(value)

  LOSS, F1, PRECISION, RECALL = (torch.mean(torch.stack(bucket)) for bucket in history)
  return LOSS, F1, PRECISION, RECALL


In [0]:
# Adam optimizer over all parameters of net, with the library's default hyper-parameters
optimizer = optim.Adam(net.parameters())

In [0]:
# Per-epoch metric histories; train() appends one [loss, f1, precision, recall] row per epoch to each
TRAINING=[]
VALIDATION=[]

In [0]:
# Train the network, checkpointing whenever the validation loss improves
def train(net, best_val_loss, batch_size=8, epochs=20):
    """Run the training loop and save the best checkpoint to Drive.

    Each epoch runs ``one_epoch`` over (X_train, y_train) with training enabled
    and over (X_test, y_test) without, appends [loss, f1, precision, recall] to
    the module-level TRAINING / VALIDATION lists and prints the epoch averages.

    Args:
        net: module whose ``state_dict`` is written to the checkpoint. The
            forward/backward passes themselves go through ``one_epoch``, which
            is not given this argument.
        best_val_loss: validation loss that must be beaten before a checkpoint
            is written, or None to always save after the first epoch.
        batch_size: samples per batch (default 8, the previously hard-coded value).
        epochs: number of passes over the training split (default 20, the
            previously hard-coded value).
    """
    for epoch in range(epochs):

        train_loss, train_f1, train_precision, train_recall = one_epoch(X_train, y_train, batch_size, True)

        TRAINING.append([train_loss.item(), train_f1.item(), train_precision.item(), train_recall.item()])
        print('training_loss: ', train_loss)
        print('training_f1: ', train_f1)
        print('training_precision: ', train_precision)
        print('training_recall: ', train_recall)

        epoch_val_loss, val_f1, val_precision, val_recall = one_epoch(X_test, y_test, batch_size, False)

        VALIDATION.append([epoch_val_loss.item(), val_f1.item(), val_precision.item(), val_recall.item()])
        print('val_loss: ', epoch_val_loss)
        print('val_f1: ', val_f1)
        print('val_precision: ', val_precision)
        print('val_recall: ', val_recall)

        # Explicit None check: a truthiness test would also treat a best loss
        # of exactly 0 as "no best loss yet".
        is_best = best_val_loss is None or epoch_val_loss < best_val_loss

        if is_best:
            best_val_loss = epoch_val_loss
            print('Saving latest iteration')
            torch.save({
                'model_state_dict': net.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'best_val_loss': best_val_loss
            }, F"/content/drive/My Drive/cnn_iss.pt")


In [0]:
# Resume from the saved model checkpoint, if one exists.
# This cell runs before the first training call, so on a fresh environment the
# file may not exist yet; in that case start from scratch instead of stopping
# the notebook.
PATH = F"/content/drive/My Drive/cnn_iss.pt"
try:
  checkpoint = torch.load(PATH)
except FileNotFoundError:
  best_val_loss = None
  print('No checkpoint found at', PATH, '- starting from scratch')
else:
  optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
  net.load_state_dict(checkpoint['model_state_dict'])
  best_val_loss = checkpoint['best_val_loss']

In [0]:
# Train the network.
# NOTE(review): None is passed as best_val_loss, so the value read from the checkpoint in the
# previous cell is not used and the first epoch always overwrites the saved model - confirm this is intended.
train(net, None)

In [0]:
# Save the training and validation metric histories.
# They go to cnn_iss.pt.pt, the path the metrics-loading cell reads. Writing them
# to cnn_iss.pt would overwrite the model checkpoint that train() saves there.
torch.save({
            'training_metric': TRAINING,
            'validation_metric': VALIDATION,
        }, F"/content/drive/My Drive/cnn_iss.pt.pt" )

In [0]:
# Reload the training and validation metric histories from PATH
PATH = F"/content/drive/My Drive/cnn_iss.pt.pt"
checkpoint = torch.load(PATH)
TRAINING, VALIDATION = checkpoint['training_metric'], checkpoint['validation_metric']

In [0]:
# Element-wise accuracy of the network on a dataset
def calc_acc(X, y, batch_size=100):
  """Return the mean per-batch accuracy of the module-level ``net``.

  Outputs are rounded with ``torch.round`` and compared element by element
  with the labels. Each batch's accuracy is its fraction of matching elements,
  and the result is the unweighted mean over batches.

  Args:
    X: inputs; each slice is viewed as (-1, INPUT_CHANNELS, INPUT_IMG_SIZE,
      INPUT_IMG_SIZE), the same constants one_epoch uses, instead of the
      previously hard-coded 37 / 50 / 50.
    y: labels; each slice is flattened before the comparison.
    batch_size: samples per evaluation batch (default 100, the previously
      hard-coded value).
  """
  net.eval()
  accs = []
  for start in tqdm(range(0, len(X), batch_size)):
    stop = start + batch_size
    batch_X = X[start:stop].view(-1, INPUT_CHANNELS, INPUT_IMG_SIZE, INPUT_IMG_SIZE)
    batch_y = y[start:stop].view(-1)
    with torch.no_grad():
      preds = torch.round(net(batch_X)).view(-1)
    # One vectorized comparison instead of a Python loop over every element
    correct = (preds == batch_y).sum().item()
    accs.append(correct / preds.numel())
  return mean(accs)

In [0]:
# Compute and print the element-wise accuracy on the training and validation splits
training_acc = calc_acc(X_train, y_train)
validation_acc = calc_acc(X_test, y_test)
print(training_acc)
print(validation_acc)