# Imports

In [17]:
import numpy as np
from os import listdir
from os.path import isfile
from os.path import isdir
from os.path import join
import cv2
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data
import shutil
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as datasets

import copy
import random
import time

from sklearn import metrics
from sklearn import decomposition
from sklearn import manifold

from torch.utils.data import TensorDataset, DataLoader, WeightedRandomSampler

import wandb

Misc functions

In [37]:
def load_data(path):
  X = []
  Y = []

  onlyDirs = [f for f in listdir(path) if isdir(join(path, f))]

  for category in onlyDirs:
    pos_y = onlyDirs.index(category)
    print(category, pos_y)
    tempDirInput = join(path, category)
    for f in listdir(tempDirInput):
      inputPath = (join(tempDirInput, f))
      if isfile(inputPath):
        X.append(np.load(inputPath))
        one_hot = np.zeros((len(onlyDirs)))
        one_hot[pos_y] = 1
        Y.append(one_hot)

  return X, Y

In [38]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

In [39]:
def calculate_accuracy(y_pred, y):
    
    top_pred = y_pred.argmax(1, keepdim = True)
    
    correct = top_pred.eq(y.view_as(top_pred)).sum()
    acc = correct.float() / y.shape[0]
    return acc

In [40]:
# About metrics.
# Metric dictionary keys 
_loss            = 'Loss'
_accuracy        = 'Accuracy'
_accuracyClass   = 'Accuracy class'
_confusionMatrix = 'Confusion matrix'
_groundtruth     = 'Groundtruth'
_predictions     = 'Predictions'
# Get a clean dictionary for the metrics.
def getMetricsDict(config):
    return {
        _loss            : torch.tensor(0.),
        _accuracy        : torch.tensor(0.),
        _accuracyClass   : torch.zeros(config[classesLen]),
        _confusionMatrix : torch.zeros((config[classesLen], config[classesLen]), dtype=torch.int),
        _groundtruth     : torch.tensor([]),
        _predictions     : torch.tensor([])
    }

In [41]:
# Pretty print the metrics dictionaries.
def printMetricsDict(metricsResults, config):
    # Build accuracy by class.
    accuracyClassStr = ''
    for i, _class in enumerate(config[classes]):
        accuracyClassStr += '{}: {:2.2f}%'.format(_class, metricsResults[_accuracyClass][i] * 100)
        accuracyClassStr += ', '
    accuracyClassStr = accuracyClassStr[:-2]

    print('Loss: {:.4f}, Accuracy: {:2.2f}% ({})'.format(metricsResults[_loss], metricsResults[_accuracy] * 100, accuracyClassStr))

In [42]:
# Function used to update the dictionary of resulting metrics.
def updateRunningMetrics(outputs, groundtruth, loss, batchAmount, metricsResults, config):
    # Accumulate the loss.
    metricsResults[_loss] += loss.cpu() / batchAmount
    # Accumulate the confusion matrix.
    confusionMatrix = getConfusionMatrix(outputs, groundtruth, config)
    metricsResults[_confusionMatrix] += confusionMatrix
    metricsResults[_groundtruth] = torch.cat((metricsResults[_groundtruth], groundtruth)) 
    metricsResults[_predictions] = torch.cat((metricsResults[_predictions], outputs))

# Function used to process the dictionary of resulting metrics (make final calculations).
def processRunningMetrics(metricsResults):
    # Get the total of samples processed by class.
    classTotal = torch.sum(metricsResults[_confusionMatrix], 1)
    # Get the total of samples correctly classified by class.
    classCorrect = torch.diagonal(metricsResults[_confusionMatrix])

    # Get the total accuracy, correct total samples / total samples.
    metricsResults[_accuracy] = torch.sum(classCorrect) / torch.sum(classTotal)
    # Get the total accuracy, by class.
    metricsResults[_accuracyClass] = classCorrect / classTotal

In [43]:
# This functions process an metrics result dictionary for wandb. Is necessary to indicte
#   the metrics origin, training or testing.
def processMetricsWandb(metricsResults, config, training=False):
    # Get the prefix to log on wandb, the keys must be different.
    resultsType = 'training' if training else 'testing'

    # All the wandb keys are based in the original metrics results keys.
    lossKey = '{} ({})'.format(_loss, resultsType)
    accuracyKey = '{} ({})'.format(_accuracy, resultsType)
    accuracyClassKeys = ['{} accuracy ({})'.format(_class, resultsType) for _class in config[classes]]

    confusionMatrixKey = '{} ({})'.format(_confusionMatrix, resultsType)
    heatMap = wandb.sklearn.plot_confusion_matrix(torch.argmax(metricsResults[_groundtruth], axis=1), torch.argmax(metricsResults[_predictions], axis=1), config[classes])

    # Make the dictionary for wandb and store the values.
    wandbDict = {
        lossKey             : metricsResults[_loss].item(),
        accuracyKey         : metricsResults[_accuracy].item(),
        confusionMatrixKey  : heatMap
    }
    for i in range(config[classesLen]):
        wandbDict[accuracyClassKeys[i]] = metricsResults[_accuracyClass][i].item()

    # Return, to log later.
    return wandbDict
    
# Get the metrics dictionaries for wandb and log them.
def logMetricsWandb(trainMetricsResults, testMetricsResults):
    # Get both dictionaries for wandb.
    wandbTrainDict = processMetricsWandb(trainMetricsResults, training=True)
    wandbTestDict  = processMetricsWandb(testMetricsResults, training=False)

    # Merge the dictionaries.
    wandbDict = {**wandbTrainDict, **wandbTestDict}

    # Log on wandb
    wandb.log(wandbDict)

# Calculate the confusion matrix.


In [44]:
# Function to get the confusion matrix values.
def getConfusionMatrix(outputs, groundtruth, config):
    # Init the confusion matrix.
    confusionMatrix = torch.zeros((config[classesLen], config[classesLen]), dtype=torch.int)

    # Obtain the predictions (the greater number, because we use a one hot vector).
    _, predicted = torch.max(outputs, 1)

    # Iterate the predictions.
    for i in range(predicted.shape[0]):
        # Add 1 based on the prediction done for a specific label.
        
        confusionMatrix[torch.argmax(groundtruth[i])][predicted[i]] += 1

    return confusionMatrix

Paths dir

In [28]:
# Folder paths
histOutputDir = 'data/pp/hist'

run_ID = 0

# Load training and testing set

In [29]:
X_test, y_test = load_data(os.path.join(histOutputDir,'test'))
X_train, y_train = load_data(os.path.join(histOutputDir,'train'))

COVID 0
Lung_Opacity 1
Normal 2
Viral_Pneumonia 3
COVID 0
Lung_Opacity 1
Normal 2
Viral_Pneumonia 3


Global parameters

In [45]:
# Input size of the model.
inputSize = 'inputSize'
# Output size of the model.
outputSize = 'outputSize'
# Batch size.
batchSize = 'batchSize'
# Epochs amount.
epochs = 'epochs'
# Learning rate.
learningRate = 'learningRate'
#FC_Layer
fc_size_1 = 'fc_size_1'
fc_size_2 = 'fc_size_2'
fc_size_3 = 'fc_size_3'
fc_size_4 = 'fc_size_4'
fc_size_5 = 'fc_size_5'
fc_size_6 = 'fc_size_6'
# Class names.
classes = 'classes'
# Number of classes to classify.
classesLen = 'classesLen'
name = 'name'

upSampling = 'upSampling'

exp_1 = {
    epochs        :   21,
    learningRate :   0.0001,
    batchSize    :   32,
    inputSize     :   256,
    fc_size_1     :   256,
    fc_size_2     :   128,
    fc_size_3     :   64,
    fc_size_4     :   32,
    fc_size_5     :   16,
    fc_size_6     :   8,
    outputSize      :   4,
    classes      : ['COVID', 'Lung Opacity', 'Normal', 'Viral Pneumonia'],
    classesLen     : 4,
    upSampling     : False,
    name           : 'WithoutUpsampling_32Batch'
}
exp_2 = {
    epochs        :   21,
    learningRate :   0.0001,
    batchSize    :   32,
    inputSize     :   256,
    fc_size_1     :   256,
    fc_size_2     :   128,
    fc_size_3     :   64,
    fc_size_4     :   32,
    fc_size_5     :   16,
    fc_size_6     :   8,
    outputSize      :   4,
    classes      : ['COVID', 'Lung Opacity', 'Normal', 'Viral Pneumonia'],
    classesLen     : 4,
    upSampling     : True,
    name           : 'WithUpsampling_32Batch'
}
exp_3 = {
    epochs        :   21,
    learningRate :   0.0001,
    batchSize    :   16,
    inputSize     :   256,
    fc_size_1     :   256,
    fc_size_2     :   128,
    fc_size_3     :   64,
    fc_size_4     :   32,
    fc_size_5     :   16,
    fc_size_6     :   8,
    outputSize      :   4,
    classes      : ['COVID', 'Lung Opacity', 'Normal', 'Viral Pneumonia'],
    classesLen     : 4,
    upSampling     : False,
    name           : 'WithoutUpsampling_16Batch'
}
exp_4 = {
    epochs        :   21,
    learningRate :   0.0001,
    batchSize    :   16,
    inputSize     :   256,
    fc_size_1     :   256,
    fc_size_2     :   128,
    fc_size_3     :   64,
    fc_size_4     :   32,
    fc_size_5     :   16,
    fc_size_6     :   8,
    outputSize      :   4,
    classes      : ['COVID', 'Lung Opacity', 'Normal', 'Viral Pneumonia'],
    classesLen     : 4,
    upSampling     : True,
    name           : 'WithUpsampling_16Batch'
}

configs = [exp_1, exp_2, exp_3, exp_4]

## Create a Data Loader

### UpSampling

In [46]:
def upSamplingData(config):
    if config[upSampling]:
        dic_values = {}
        weight = {}
        for class_type in np.unique(y_train, axis=0):
            dic_values[int(np.argmax(class_type))] = len(np.where((y_train == class_type).all(axis=1))[0])
            weight[int(np.argmax(class_type))] = float(1.0/len(np.unique(y_train, axis=0)))/dic_values[int (np.argmax(class_type))]

        print("dic_values", dic_values)
        print("weight", weight)

        samples_weight = np.array([weight[int(np.argmax(t))] for t in y_train])
        samples_weight = torch.from_numpy(samples_weight)
        sampler = WeightedRandomSampler(samples_weight.type('torch.DoubleTensor'), len(samples_weight))

        return sampler
    return None

In [47]:
#Sampler option is mutually exclusive with shuffle option.
def setShuffle(config):
    if config[upSampling]:
        return False
    return True
    

In [57]:
def createDataLoaders(config):
        tensor_x = torch.Tensor(X_train) 
        tensor_y = torch.Tensor(y_train)
        my_dataset = TensorDataset(tensor_x,tensor_y) 
        trainingSetLoader = DataLoader(my_dataset,
                shuffle=setShuffle(config),
                sampler= upSamplingData(config),
                batch_size=config[batchSize]) 

        tensor_x_test = torch.Tensor(X_test) 
        tensor_y_test = torch.Tensor(y_test)
        my_dataset_test = TensorDataset(tensor_x_test,tensor_y_test) 
        testSetLoader = DataLoader(my_dataset_test,
                shuffle=True,
                batch_size=config[batchSize]) 
        return (trainingSetLoader, testSetLoader)


Check CUDA is available

In [49]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)

Using device: cpu


## Model

In [50]:
class MLP(nn.Module):
    def __init__(self, input_dim, output_dim, config):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_dim, config[fc_size_1]),
            nn.ReLU(),
            nn.Linear(config[fc_size_1], config[fc_size_2]),
            nn.ReLU(),
            nn.Linear(config[fc_size_2], config[fc_size_3]),
            nn.ReLU(),
            nn.Linear(config[fc_size_3], config[fc_size_4]),
            nn.ReLU(),
            nn.Dropout(),
            nn.Linear(config[fc_size_4], config[fc_size_5]),
            nn.ReLU(),
            nn.Linear(config[fc_size_5], config[fc_size_6]),
            nn.ReLU(),
            nn.Linear(config[fc_size_6], output_dim),
            nn.Softmax()
            )
    
    def forward(self, x):
        return self.layers(x)
    

### Training method.
This method takes care of a single training pass. Another function call this one multiple times.

In [51]:
def trainEpoch(dataloader, model, criterion, optimizer, config):
    # Metrics for training.
    metricsResults = getMetricsDict(config)

    # Enable the grad, for training.
    with torch.set_grad_enabled(True):

        # Indicate that the model is going to be trained.
        model.train()

        # Loader len, for metrics calculation.
        loaderLen = len(dataloader)

        # Iterate the batches for training.
        for batch in dataloader:
            # Train the model.
            # Get the inputs and labels, and move them to the selected device.
            inputs, labels = batch[0].to(device), batch[1].to(device)
            # Zero the gradient parameters.
            optimizer.zero_grad()
            # Get the predictions.
            outputs = model(inputs)
            # Calculate the error.
            labels_argmax = torch.empty(labels.shape[0], dtype=torch.long)
            labels_argmax = torch.argmax(labels, dim=1)
            loss = criterion(outputs, labels_argmax)
            # Calculates the derivatives of the parameters that have a gradient.
            loss.backward()
            # Update the parameters based on the computer gradient.
            optimizer.step()
            # Metrics for the training set.
            updateRunningMetrics(outputs, labels, loss, loaderLen, metricsResults, config)

    return metricsResults

### Evaluation method.
This method evaluates the model for a specified dataset.

In [52]:
def evaluate(dataloader, model, criterion, config):
    # Metrics for testing.
    metricsResults = getMetricsDict(config)

    # Enable the grad, for training.
    with torch.set_grad_enabled(False):

        # Indicate that the model is going to be evaluated.
        model.eval()

        # Loader len, for metrics calculation.
        loaderLen = len(dataloader)

        # Iterate the batches for testing.
        for batch in dataloader:
            # Test the model.
            # Get the inputs and labels, and move them to the selected device.
            inputs, labels = batch[0].to(device), batch[1].to(device)
            # Get the predictions.
            outputs = model(inputs)
            # Calculate the error.
            labels_argmax = torch.empty(labels.shape[0], dtype=torch.long)
            labels_argmax = torch.argmax(labels, dim=1)
            loss = criterion(outputs, labels_argmax)
            # Metrics for the testing set.
            updateRunningMetrics(outputs, labels, loss, loaderLen, metricsResults, config)

    return metricsResults

### Trainining and evaluate method.
For the specific purpose of this project, in each epoch we evaluate metrics for each data set (training and testing) in each epoch, this method simplifies the process. 

In [53]:
def trainAndEvaluate(trainloader, testloader, model, criterion, optimizer, config):
    startTimeTotal = time.time()
    print('Running training')
    for epoch in range(1, config[epochs] + 1):
        
        # Train the model.
        trainMetricsResults = trainEpoch(trainloader, model, criterion, optimizer, config)
        processRunningMetrics(trainMetricsResults)

        # Evaluate the model.
        testMetricsResults = evaluate(testloader, model, criterion, config)
        processRunningMetrics(testMetricsResults)

        # Log on wandb
        logMetricsWandb(trainMetricsResults, testMetricsResults)

        # Print the results.
        if epoch %20 == 0:
            print('**', '[', 'Epoch ', epoch, ']', '*' * 48, sep='')
            print('\tTraining results:', end=' ')
            printMetricsDict(trainMetricsResults)
            print('\t Testing results:', end=' ')
            printMetricsDict(testMetricsResults)
        
    # Print time
    print('Epochs terminados')
    print("--- %s seconds ---" % (time.time() - startTimeTotal))

# Run

In [60]:
#Create the model, optimizer and criterion
run_ID += 1
for runConfig in configs:

    trainloader, testloader = createDataLoaders(runConfig)
    
    model = MLP(runConfig[inputSize], runConfig[outputSize], runConfig)
    optimizer = optim.Adam(model.parameters(),lr=runConfig[learningRate])
    criterion = nn.CrossEntropyLoss()

    #Init WandB
    name = runConfig[name] + str(run_ID)
    run = wandb.init(project='MLP', entity='tecai', config=runConfig,
                            name=name)
    wandb.watch(model)
    #Train and evaluate the model
    trainAndEvaluate(trainloader, testloader, model, criterion, optimizer, runConfig)
    #Finish WandB
    run.finish()

Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
wandb: You can find your API key in your browser here: https://wandb.ai/authorize
wandb: Appending key for api.wandb.ai to your netrc file: C:\Users\TEC/.netrc
  warn("The `IPython.html` package has been deprecated since IPython 4.0. "
wandb: wandb version 0.10.30 is available!  To upgrade, please run:
wandb:  $ pip install wandb --upgrade


Running training
  input = module(input)


TypeError: processRunningMetrics() takes 1 positional argument but 2 were given