## Hyperparameters

In [1]:
# --- Input geometry ---------------------------------------------------------
image_size = 512        # images are resized to image_size x image_size
vit_patch_size = 16     # side of each square patch fed to the ViT
vit_channels = 1        # number of input channels the ViT is built for

# --- ViT architecture -------------------------------------------------------
vit_num_classes = 3
vit_k = 128             # passed to ViT as `dim`
vit_depth = 6
vit_heads = 8
vit_mlp = 512           # passed to ViT as `mlp_dim`
vit_dropout = 0.2
vit_emb_dropout = 0.1

# --- Optimisation -----------------------------------------------------------
lr = 3e-5
l2_reg = 1e-3           # used as Adam `weight_decay`
batch_size = 16
max_epochs = 50

# --- ReduceLROnPlateau scheduler --------------------------------------------
scheduler_factor = 0.5
scheduler_patience = 3
scheduler_threshold = 1e-2

# --- Hardware ---------------------------------------------------------------
device = "cuda"

In [2]:
from __future__ import print_function, division
import os
import torch
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
from sklearn.utils import shuffle
from PIL import Image

# Ignore warnings
import warnings
warnings.filterwarnings("ignore")

plt.ion()   # interactive mode

## Definição dos diretórios:
 - base_dir: Diretório para a pasta raiz do projeto;
 - formated_dataset_dir: Diretório onde a base está armazenada.

In [3]:
# Project root. The trailing slash matters: other cells build paths with
# plain string concatenation (base_dir + 'Data Split/...').
base_dir = 'D:/CTR Pulmões - Doenças Respiratórias/CovidNet/'
# Folder that stores the formatted dataset, located directly under the root.
formated_dataset_dir = base_dir + 'CovidNet Formatada'

In [4]:
# Human-readable class name -> integer label; the index is the position in the tuple.
class_names = {
    name: index
    for index, name in enumerate(('Normal', 'Pneumonia', 'Covid-19 Pneumonia'))
}

In [5]:
# Training pipeline: random augmentation first, then resize to the model's
# input size and convert to a tensor.
train_transforms = transforms.Compose([
    transforms.RandomRotation(degrees=30),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.1, contrast=0.2),
    transforms.Resize(size=(image_size, image_size)),
    transforms.ToTensor(),
])

# Validation/test pipeline: deterministic, only resize + tensor conversion.
val_test_transforms = transforms.Compose([
    transforms.Resize(size=(image_size, image_size)),
    transforms.ToTensor(),
])


In [6]:
def get_image(image_path, act, total, transform=None):
    """Open one image, fully decode it into memory and report progress.

    Args:
        image_path: Path of the image file to open.
        act: One-element mutable list used as a shared progress counter;
            ``act[0]`` is incremented on every call.
        total: Total number of images expected, used only in the progress
            message.
        transform: Accepted for call-site compatibility; it is not applied
            here.

    Returns:
        The opened ``PIL.Image.Image`` with its pixel data loaded.
    """
    # Print a progress line every 100 images (including the very first one).
    if act[0] % 100 == 0:
        print(f"{act[0]}/{total}")
    act[0] += 1

    image = Image.open(image_path)
    # Image.open is lazy: it only reads the header and keeps the file open.
    # Force the pixel data to be read now so the image really lives in memory
    # and file handles are not accumulated across the whole dataset.
    image.load()
    return image

## Carregar dataset direto pra memória

In [7]:
class CT_ScansDatset_memory(Dataset):
    """Dataset that opens every image once, up front, and keeps it in a DataFrame.

    The index file is read as a space-separated table whose first two columns
    are the image path (relative to ``root_dir``) and the integer label. Rows
    are shuffled once at construction time.

    Args:
        csv_file: Path to the space-separated index file.
        root_dir: Directory the relative image paths are resolved against.
        transform: Optional callable applied to the image on every
            ``__getitem__`` call (not at load time).
    """

    def __init__(self, csv_file, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.ct_scans = shuffle(pd.read_csv(csv_file, sep=' ', names=['X', 'y'], usecols=[0, 1]))

        # One-element list so get_image can update the counter in place.
        progress = [0]
        total = len(self.ct_scans)
        # Replace each relative path in column X by the opened image.
        self.ct_scans['X'] = self.ct_scans['X'].apply(
            lambda rel_path: get_image(
                os.path.join(self.root_dir, rel_path), progress, total, self.transform
            )
        )

    def __len__(self):
        """Return the number of samples."""
        return len(self.ct_scans)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for position ``idx``.

        ``idx`` may be a plain integer or a tensor index, as in the
        disk-backed variant of this dataset. The label is returned as a
        NumPy array.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()
        image = self.ct_scans.iloc[idx, 0]
        label = np.array(self.ct_scans.iloc[idx, 1])
        if self.transform:
            image = self.transform(image)
        return image, label

## Carregar dataset do disco, economizando memória

In [8]:
class CT_ScansDatset_disk(Dataset):
    """Dataset that keeps only the index table and opens images on demand.

    The index file is a space-separated table; its first two columns are read
    as the image path (relative to ``root_dir``) and the label. Rows are
    shuffled once when the dataset is built.
    """

    def __init__(self, csv_file, root_dir, transform=None):
        index_table = pd.read_csv(csv_file, sep=' ', names=['X', 'y'], usecols=[0, 1])
        self.ct_scans = shuffle(index_table)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        """Number of rows in the index table."""
        return len(self.ct_scans)

    def __getitem__(self, idx):
        """Open the image at position ``idx`` and return ``(image, label)``."""
        # Tensor indices are converted to plain Python values before indexing.
        position = idx.tolist() if torch.is_tensor(idx) else idx

        relative_path = self.ct_scans.iloc[position, 0]
        image = Image.open(os.path.join(self.root_dir, relative_path))
        label = np.array(self.ct_scans.iloc[position, 1])

        if not self.transform:
            return image, label
        return self.transform(image), label

In [9]:
# Training split: images are opened up front by the in-memory dataset.
ct_dataset_train = CT_ScansDatset_memory(
    csv_file=f'{base_dir}Data Split/train_COVIDx-CT.txt',
    root_dir=formated_dataset_dir,
    transform=train_transforms,
)
# Validation and test splits: images are opened lazily from disk.
ct_dataset_val = CT_ScansDatset_disk(
    csv_file=f'{base_dir}Data Split/val_COVIDx-CT.txt',
    root_dir=formated_dataset_dir,
    transform=val_test_transforms,
)
ct_dataset_test = CT_ScansDatset_disk(
    csv_file=f'{base_dir}Data Split/test_COVIDx-CT.txt',
    root_dir=formated_dataset_dir,
    transform=val_test_transforms,
)

0/61782
100/61782
200/61782
300/61782
400/61782
500/61782
600/61782
700/61782
800/61782
900/61782
1000/61782
1100/61782
1200/61782


KeyboardInterrupt: 

In [None]:
import os

import torch
from torch import nn
import torch.nn.functional as F
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import DataLoader
from torch.utils.data import random_split
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms
from torchvision.datasets import MNIST

import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
from pytorch_lightning.loggers import TensorBoardLogger
from sklearn.metrics import accuracy_score, roc_auc_score, recall_score, precision_score
from vit_pytorch import ViT

In [None]:
# Constructor arguments for the Vision Transformer, taken from the
# hyperparameter cell.
vit_config = {
    'image_size': image_size,
    'patch_size': vit_patch_size,
    'channels': vit_channels,
    'num_classes': vit_num_classes,
    'dim': vit_k,
    'depth': vit_depth,
    'heads': vit_heads,
    'mlp_dim': vit_mlp,
    'dropout': vit_dropout,
    'emb_dropout': vit_emb_dropout,
}
ViT_Model = ViT(**vit_config)

In [None]:
# Only the training loader reshuffles every epoch. Validation and test loaders
# keep a fixed order so evaluation is repeatable from epoch to epoch
# (Lightning also warns when evaluation loaders are shuffled).
train_dataloader = DataLoader(ct_dataset_train, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(ct_dataset_val, batch_size=batch_size, shuffle=False)
test_dataloader = DataLoader(ct_dataset_test, batch_size=batch_size, shuffle=False)

In [None]:
class LitViT(pl.LightningModule):
    """Lightning wrapper around the module-level ``ViT_Model`` classifier.

    Logged metrics:
        * ``loss``: training loss of every step (monitored by the LR scheduler).
        * ``training/loss``, ``training/accuracy``: per-epoch training means.
        * ``validation/*`` and ``test/*``: loss, accuracy, macro precision,
          macro recall and one-vs-rest ROC AUC, computed once per epoch over
          all samples seen in that epoch.
        * ``val_loss``: mean validation loss, also shown in the progress bar.
    """

    def __init__(self):
        super(LitViT, self).__init__()
        self.ViT = ViT_Model
        self.num_classes = vit_num_classes

    def forward(self, data):
        """Return the raw class logits for a batch of images."""
        return self.ViT(data)

    def training_step(self, batch, batch_nb):
        """Compute cross-entropy loss and batch accuracy for one training batch."""
        data, label = batch
        # Use the module's own device instead of a hard-coded global one.
        data = data.to(self.device)
        label = label.to(self.device).long()

        logits = self(data)
        loss = F.cross_entropy(logits, label)

        preds = torch.argmax(logits, dim=1)
        train_acc = (preds == label).float().mean().detach().cpu()

        self.log('loss', loss)

        return {'loss': loss, 'train_acc': train_acc}

    def training_epoch_end(self, outputs):
        """Log the mean training loss and accuracy of the epoch."""
        avg_loss = torch.stack([x['loss'] for x in outputs]).mean()
        avg_train_acc = torch.stack([x['train_acc'] for x in outputs]).mean()

        metrics = {'loss': avg_loss, 'accuracy': avg_train_acc}
        self.log_parameters('training', metrics)

    def _evaluate_batch(self, batch):
        """Run one evaluation batch; return its loss, class probabilities and targets on CPU."""
        data, label = batch
        data = data.to(self.device)
        label = label.to(self.device).long()

        logits = self(data)
        return {
            'batch_loss': F.cross_entropy(logits, label).detach().cpu(),
            # Probabilities (not hard predictions) are required for ROC AUC.
            'probs': F.softmax(logits.float(), dim=1).detach().cpu(),
            'targets': label.detach().cpu(),
        }

    def _evaluate_epoch(self, group, outputs):
        """Aggregate batch outputs, log the metrics under ``group`` and return the mean loss.

        Metrics are computed over all samples of the epoch at once rather than
        averaged per batch, because a small batch frequently misses a class,
        which makes per-batch precision/recall/AUC unreliable or undefined.
        Returns ``None`` when ``outputs`` is empty.
        """
        if not outputs:
            return None

        avg_loss = torch.stack([x['batch_loss'] for x in outputs]).mean()
        probs = torch.cat([x['probs'] for x in outputs]).numpy()
        targets = torch.cat([x['targets'] for x in outputs]).numpy()
        preds = probs.argmax(axis=1)
        labels = list(range(self.num_classes))

        try:
            auc = roc_auc_score(targets, probs, multi_class='ovr', labels=labels)
        except ValueError:
            # Raised e.g. when a class is absent from the targets; keep the
            # original best-effort behaviour of reporting 0.0.
            auc = 0.0

        # scikit-learn metrics take (y_true, y_pred) in that order.
        metrics = {
            'loss': avg_loss,
            'accuracy': accuracy_score(targets, preds),
            'precision': precision_score(targets, preds, labels=labels,
                                         average='macro', zero_division=0),
            'recall': recall_score(targets, preds, labels=labels,
                                   average='macro', zero_division=0),
            'auc': auc,
        }
        self.log_parameters(group, {k: torch.tensor(float(v)) for k, v in metrics.items()})
        return avg_loss

    def validation_step(self, batch, batch_nb):
        """Evaluate one validation batch."""
        return self._evaluate_batch(batch)

    def validation_epoch_end(self, outputs):
        """Log validation metrics plus ``val_loss``."""
        avg_loss = self._evaluate_epoch('validation', outputs)
        if avg_loss is not None:
            self.log('val_loss', avg_loss, prog_bar=True)

    def test_step(self, batch, batch_nb):
        """Evaluate one test batch."""
        return self._evaluate_batch(batch)

    def test_epoch_end(self, outputs):
        """Log test metrics under the ``test`` group."""
        self._evaluate_epoch('test', outputs)

    def configure_optimizers(self):
        """Adam with L2 weight decay, plus ReduceLROnPlateau driven by the training ``loss``."""
        trainable = [p for p in self.parameters() if p.requires_grad]
        optimizer = torch.optim.Adam(trainable, lr=lr, eps=1e-8, weight_decay=l2_reg)
        scheduler = ReduceLROnPlateau(
            optimizer,
            factor=scheduler_factor,
            patience=scheduler_patience,
            threshold=scheduler_threshold,
        )
        return {
            'optimizer': optimizer,
            'lr_scheduler': scheduler,
            'monitor': 'loss'
        }

    # Inside these methods the bare names resolve to the module-level
    # DataLoader objects, not to the methods themselves.
    def train_dataloader(self):
        return train_dataloader

    def val_dataloader(self):
        return val_dataloader

    def test_dataloader(self):
        return test_dataloader

    def log_parameters(self, group, metrics):
        """Log every entry of ``metrics`` under the key ``'<group>/<name>'``."""
        for (key, value) in metrics.items():
            self.log(f'{group}/{key}', value)
        

In [None]:
# TensorBoard logger: runs are written under tb_logs/ViTNoLinformer.
logger = TensorBoardLogger(save_dir="tb_logs", name="ViTNoLinformer")

In [None]:
# Both callbacks watch 'validation/loss', the key that LitViT.log_parameters
# emits from validation_epoch_end. Monitoring a key that is never logged makes
# EarlyStopping fail, and selecting checkpoints on the training loss rewards
# overfitting.
monitored_metric = 'validation/loss'

# Checkpoint callback: keeps the checkpoint with the lowest validation loss.
# Note: the '{val_loss:.2f}' field in the filename is only filled in when the
# module logs a metric named exactly 'val_loss'.
checkpoint_callback = ModelCheckpoint(
    monitor=monitored_metric,
    mode='min',
    dirpath=os.path.join(base_dir, 'checkpoints/no_linformer'),
    filename='sample-ViT-{epoch:02d}-{val_loss:.2f}'
)

# Early-stopping callback: stops when the validation loss no longer improves
# by at least min_delta.
early_stopping = EarlyStopping(monitor=monitored_metric, mode='min', min_delta=0.005)

In [None]:
# Build the Lightning module and place it on the configured device.
model = LitViT()
model.to(device)

# Single-GPU trainer wired to the checkpoint / early-stopping callbacks and
# the TensorBoard logger defined in earlier cells.
trainer = pl.Trainer(
    gpus=1,
    max_epochs=max_epochs,
    logger=logger,
    callbacks=[checkpoint_callback, early_stopping],
)
trainer.fit(model)