In [None]:
import os
import cv2
import sys
import torch
import numpy as np
import scipy.io
import argparse 

import torchvision
import torchvision.transforms as transforms
from PIL import Image

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import pandas as pd
import seaborn as sns

In [None]:
from torch.utils.data import DataLoader, ConcatDataset
from sklearn.model_selection import KFold, StratifiedKFold

In [None]:
def reset_weights(m):
  '''
    Resetting model weights to avoid weight leakage.
  '''
  for layer in m.children():
   if hasattr(layer, 'reset_parameters'):
    print(f'Reset trainable parameters of layer = {layer}')
    layer.reset_parameters()

In [None]:
 def load_dataset(data_path):
    transformation = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])

    full_dataset = torchvision.datasets.ImageFolder(
        root=data_path,
        transform=transformation
    )
    
    
    train_size = int(0.7 * len(full_dataset))
    test_size = len(full_dataset) - train_size
    
    train_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [train_size, test_size])
    
    return ConcatDataset([train_dataset, test_dataset])

In [None]:
class Net(nn.Module):
    def __init__(self, num_classes=5):
        super(Net, self).__init__()

        self.conv1 = nn.Conv2d(in_channels=3, out_channels=12, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(in_channels=12, out_channels=24, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2)
        
        self.conv3 = nn.Conv2d(in_channels=24, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.conv4 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.pool2 = nn.MaxPool2d(kernel_size=2)
        
        self.drop = nn.Dropout2d(p=0.2)
        self.fc = nn.Linear(in_features=8 * 8 * 128, out_features=5)

    def forward(self, x):
        x = F.relu(self.pool(self.conv1(x)))  
        x = F.relu(self.pool(self.conv2(x)))          
        x = F.relu(self.pool(self.conv3(x)))  
        x = F.relu(self.pool(self.conv4(x)))          
        x = F.dropout(self.drop(x), training=self.training)
        x = x.view(-1, 8 * 8 * 128)
        x = self.fc(x)
        return torch.log_softmax(x, dim=1)

In [None]:
device = "cpu"
if (torch.cuda.is_available()):
    device = "cuda"

print('Training on', device)

def train(model, device, train_loader, optimizer, epoch):
    model.train()
    train_loss = 0
    print("Epoch:", epoch)
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        
        optimizer.zero_grad()
        output = model(data)
        loss = loss_criteria(output, target)
        train_loss += loss.item()
        
        loss.backward()
        optimizer.step()
        
        print('\tTraining batch {} Loss: {:.6f}'.format(batch_idx + 1, loss.item()))
    avg_loss = train_loss / (batch_idx+1)
    print('Training set: Average loss: {:.6f}'.format(avg_loss))
    return avg_loss

def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        batch_count = 0
        samples_count = 0
        for data, target in test_loader:
            batch_count += 1
            samples_count += len(target)
            data, target = data.to(device), target.to(device)
            output = model(data)
            
            test_loss += loss_criteria(output, target).item()
            _, predicted = torch.max(output.data, 1)
            correct += torch.sum(target==predicted).item()

    avg_loss = test_loss / batch_count
    print('Validation set: Average loss: {:.6f}, Accuracy: {}/{} ({:.0f}%)\n'.format(avg_loss, correct, samples_count, 100. * correct / samples_count))
    
    return avg_loss, 100. * correct / samples_count

loss_criteria = nn.CrossEntropyLoss()

k_folds = 5
num_epochs = 5
results = {}

epoch_nums = []
training_loss = []
validation_loss = []
complete_dataset = load_dataset('/content/data/')

kfold = KFold(n_splits=k_folds, shuffle=True)

print('--------------------------------')

for fold, (train_ids, test_ids) in enumerate(kfold.split(complete_dataset)):

  model = Net(num_classes=5).to(device)

  optimizer = optim.Adam(model.parameters(), lr=0.001)

  print(f'FOLD {fold}')
  print('--------------------------------')
  
  train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
  test_subsampler = torch.utils.data.SubsetRandomSampler(test_ids)
  
  train_loader = torch.utils.data.DataLoader(
                    complete_dataset, 
                    batch_size=25, sampler=train_subsampler)
  test_loader = torch.utils.data.DataLoader(
                    complete_dataset,
                    batch_size=25, sampler=test_subsampler)

  model.apply(reset_weights)
  for epoch in range(0, num_epochs):

    print(f'Starting epoch {epoch+1}')

    train_loss = train(model, device, train_loader, optimizer, epoch+1)
    epoch_nums.append(epoch)
    training_loss.append(train_loss)

  print('Training process has finished. Saving trained model.')
  print('Starting testing')

  test_loss, testaccuracy = test(model, device, test_loader)
  validation_loss.append(test_loss)
  results[fold] = testaccuracy


print(f'K-FOLD CROSS VALIDATION RESULTS FOR {k_folds} FOLDS')
print('--------------------------------')
sum = 0.0
for key, value in results.items():
  print(f'Fold {key}: {value} %')
  sum += value
print(f'Average: {sum/len(results.items())} %')

In [None]:
plt.figure(figsize=(5,5))

for fold in range(k_folds):
  plt.plot(epoch_nums[k_folds * fold : k_folds * fold + num_epochs], training_loss[k_folds * fold : k_folds * fold + num_epochs])

plt.plot(epoch_nums[:num_epochs], validation_loss)
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend(['training', 'validation'], loc='upper right')
plt.show()


classes = ["L76","N78","Others","R82","Slash47"]
truelabels = []
predictions = []
model.eval()
print("Getting predictions from test set...")
for data, target in test_loader:
  for label in target.data.numpy():
    truelabels.append(label)
  for prediction in (model(data.cuda()).data).cpu().numpy().argmax(1):
    predictions.append(prediction) 

# Plot the confusion matrix
cm = confusion_matrix(truelabels, predictions)
tick_marks = np.arange(len(classes))

df_cm = pd.DataFrame(cm, index = classes, columns = classes)
plt.figure(figsize = (10,10))
sns.heatmap(df_cm, annot=True, cmap=plt.cm.Blues, fmt='g')
plt.xlabel("Predicted Shape", fontsize = 20)
plt.ylabel("True Shape", fontsize = 20)
plt.show()