In [1]:
#!python -m pip install -r requirements.txt

In [2]:
import pydicom
import numpy as np
import torch
from torch.utils.data import Dataset
import pandas as pd
import os
from torchvision import datasets, transforms, models
import gc
import random
import optuna
import mlflow

import torch.nn as nn
import torch.nn.functional as F

In [3]:
BATCH_SIZE = 8
NUM_EPOCHS = 200
OPT_EPOCHS = 20
OPT_STEPS = 50
NUM_CLASSES = 3
BITS = 12
IMAGE_PER_SUBJECT = 500

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
subject_file = 'subjects.csv'

<font size="5">Custom dicom dataset</font>

In [4]:
class DicomData(Dataset):
    def __init__(self, path_list, transform=None):
        self.path_list = path_list
        self.transform = transform

    def __len__(self):
        return len(self.path_list)

    def __getitem__(self, index):
        image = self.load_dicom_image(self.path_list[index][0])
        label = self.path_list[index][1]

        if self.transform:
            image = self.transform(image)

        return image, label

    def load_dicom_image(self, path):
        return np.float32(pydicom.dcmread(path).pixel_array)

def get_image_list(root, path, label):
    paths = pydicom.data.data_manager.get_files(os.path.join(root, path), '*.dcm')
    random.shuffle(paths)
    valid = []
    for path in paths:
        img = pydicom.dcmread(path)
        # Discard corrupted images when listing them.
        # It could also be done by defining custom collate_fn function for dataloader.
        # It's better to discard them before starting the training for better training performance
        if "ImageType" in img and not np.all(img.pixel_array == 0) and len(img.pixel_array.shape) == 2:
            valid.append((path, label))
            if len(valid) == IMAGE_PER_SUBJECT:
                break
    print('total: ' + str(len(paths)) + ' chosen: ' +  str(len(valid)))
    return valid


<font size="5">Data Preparation</font>

Name of the subjects, path to their data folder and their labels should be in subjects.csv.
<br>
Images are resized to (512, 512) and normalized based on their depth.

In [5]:
df = pd.read_csv(subject_file)
root_path = os.path.abspath(os.getcwd())
subjects = []
for _, subject in df.iterrows():
    paths = get_image_list(root_path, subject[1], subject[2])
    subjects.append(paths)


# Right now we only have 12 subjects! So we split them manually. 

split_index = 9
train_all, test_all = subjects[:split_index], subjects[split_index:]
train = [element for nestedlist in train_all for element in nestedlist]
valid = [element for nestedlist in test_all for element in nestedlist[:150]]
test = [element for nestedlist in test_all for element in nestedlist[150:]]

data_transform = transforms.Compose([transforms.ToTensor(), transforms.Resize(size=(512, 512)),
                                     transforms.Normalize(mean=(2**(BITS-1) - 0.5), std=(2**(BITS-1)))])

train_dataset = DicomData(train, transform=data_transform)
valid_dataset = DicomData(valid, transform=data_transform)
test_dataset = DicomData(test, transform=data_transform)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=True)

total: 771 chosen: 500
total: 3037 chosen: 500
total: 3044 chosen: 500
total: 3048 chosen: 500
total: 3039 chosen: 500
total: 3746 chosen: 500
total: 2359 chosen: 500
total: 3058 chosen: 500
total: 1750 chosen: 500
total: 3032 chosen: 500
total: 804 chosen: 500
total: 3042 chosen: 500


<font size="5">Models modification and custom loss function</font>

The first layer of models needs to change because medical images have 1 channel, compared to RGB which have 3.
<br>
The classifier layer needs to be changed to match the number of classes.
<br>
Since classes are imbalanced, we test focal loss alongside cross-entropy

In [6]:
# The first layer of models needs to change because medical images have 1 channel, compared to RGB which have 3.
# The classifier layer needs to be changed to match the number of classes.

def buildResNet18Model(numClasses, dropout=0):
    
    model = models.resnet18()
    model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    numFcInputs = model.fc.in_features
    model.fc = nn.Sequential(nn.Dropout(dropout),
                             nn.Linear(numFcInputs, numClasses))
    return model


def buildEfficientNetModel(numClasses, dropout=0):
    model = models.efficientnet_b0()
    model.features[0] = nn.Sequential(nn.Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False),
                 nn.BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
                           nn.SiLU(inplace=True))
    numFcInputs = model.classifier[1].in_features
    model.classifier = nn.Sequential(nn.Dropout(dropout),
                             nn.Linear(numFcInputs, numClasses))
    return model


# Since classes are imbalanced, we test focal loss alongside cross-entropy

class FocalLoss(nn.Module):
    def __init__(self, alpha=1, gamma=1, logits=False, reduce=True):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.logits = logits
        self.reduce = reduce

    def forward(self, inputs, targets):
    
        CE_loss = nn.CrossEntropyLoss()(inputs, targets)

        pt = torch.exp(-CE_loss)
        F_loss = self.alpha * (1-pt)**self.gamma * CE_loss

        if self.reduce:
            return torch.mean(F_loss)
        else:
            return F_loss

<font size="5">Training loop and evaluation</font>


In [7]:
def train(model, train_loader, optimizer, criterion, epochs):
    total_step = len(train_loader)

    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        for i, (images, labels) in enumerate(train_loader):  
            # Move tensors to the configured device
            images = images.to(device)
            labels = labels.to(device)
            
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)
            
            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            del images, labels, outputs
            torch.cuda.empty_cache()
            gc.collect()
            
        return total_loss/len(train_loader)
    
        # print ('Epoch [{}/{}], Loss: {:.4f}' 
        #                .format(epoch+1, epochs, running_loss/len(train_loader)))

def evaluate(model, valid_loader, criterion):
    model.eval()
    correct = 0
    total = 0
    valid_loss = 0.0
    for images, labels in valid_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        loss = criterion(outputs, labels)
        valid_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        del images, labels, outputs

    validation_accuracy = 100 * correct / total
    validation_loss = valid_loss / total
    
    # print('validation accuracy: {:.4f} %'.format(validation_accuracy))
    # print('validation loss : {:.4f}'.format(validation_loss))
    
    return validation_accuracy, validation_loss

<font size="5">Optimizing hyperparameters using optuna</font>

Mlflow is used for experiment tracking.

In [8]:
def get_hyperparameters(trial):
    hyperparameters = {
        'model_type': trial.suggest_categorical('model_type', ['ResNet', 'EfficientNet']),
        'learning_rate': trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True),
        'loss_function': trial.suggest_categorical('loss_function', ['CE', 'FL']),
        'dropout': trial.suggest_float('dropout', 0, 0.8, step=0.1),
        'optimizer': trial.suggest_categorical('optimizer', ['Adam', 'SGD']),
    }
    return hyperparameters

def objective(trial, optimization=True):
    run_name = 'opt'+ str(trial.number) if optimization else 'train'
    
    with mlflow.start_run(run_name=run_name):
        hyperparameters = get_hyperparameters(trial)
        mlflow.log_params(hyperparameters)
        
        if hyperparameters['model_type'] == 'ResNet':
            model = buildResNet18Model(NUM_CLASSES, dropout=hyperparameters['dropout'])
        else:
            model = buildEfficientNetModel(NUM_CLASSES, dropout=hyperparameters['dropout'])
    
        model = model.to(device)
        if hyperparameters['loss_function'] == 'CE':
            criterion = nn.CrossEntropyLoss()
        else:
            criterion = FocalLoss()
        
        if hyperparameters['optimizer'] == 'Adam':
            optimizer = torch.optim.Adam(model.parameters(), lr=hyperparameters['learning_rate'])
        else:
            optimizer = torch.optim.SGD(model.parameters(), lr=hyperparameters['learning_rate'])

        best_accuracy = -float('inf')
        best_loss = float('inf')

        epochs = OPT_EPOCHS if optimization else NUM_EPOCHS
        for i in range(epochs):
            train_loss = train(model, train_loader, optimizer, criterion, 1)

            
            val_accuracy, val_loss = evaluate(model, valid_loader, criterion)
            if optimization is False:
                print('Epoch [{}/{}]'.format(i+1, epochs))
                print('Train loss: {:.4f}'.format(train_loss))
                print('validation accuracy: {:.4f} %'.format(val_accuracy))
                print('validation loss : {:.4f}'.format(val_loss))
            
            mlflow.log_metric('val_accuracy', val_accuracy, step=i)
            mlflow.log_metric('val_loss', val_loss, step=i)
            
            if val_accuracy > best_accuracy:
                best_accuracy = val_accuracy
                best_loss = val_loss # This is the loss of the best accuracy, not the best loss!

                # We don't need to save model in optimization runs!
                if optimization is False:
                    mlflow.pytorch.log_model(model, 'models/best')
        
        # Since we use two different losses, we can't optimize loss.
        return best_accuracy

In [9]:
experiment_name = 'CAD-classification'
try:
    mlflow.create_experiment(experiment_name)
except:
    print('Experiment already exists')
mlflow.set_experiment(experiment_name)

study = optuna.create_study(study_name='CAD-classification', direction='maximize')
study.optimize(objective, n_trials=OPT_STEPS)

<font size="5">Training with best hyperparameters</font>

After finding the best hyperparameters, a model is trained and saved using Mlflow.

In [10]:
objective(study.best_trial, optimization=False)