# IF5200 - Modeling Notebook
___
Group: 8<br>
Project: Automated Chest X-Ray Report Generator in Bahasa Indonesia with the Use of Deep Learning<br>
Team members: Arief Purnama Muharram, Hollyana Puteri Haryono, Abassi Haji Juma

## A. Print library versions

In [None]:
# Report the installed version of each third-party package used in this notebook.
# Each pair is (display name, importable module name).
for package_label, module_name in (
    ('tqdm', 'tqdm'),
    ('matplotlib', 'matplotlib'),
    ('seaborn', 'seaborn'),
    ('pandas', 'pandas'),
    ('scikit-learn', 'sklearn'),
    ('pillow', 'PIL'),
    ('torch', 'torch'),
    ('torchvision', 'torchvision'),
):
    print(f'{package_label} version:', __import__(module_name).__version__)

## B. Helpers

### 1. TrainUtils class

In [None]:
import torch
from torch import nn, optim
from tqdm import tqdm


class TrainUtils:
    """Tie a model to a loss function, an optimizer and a device, and run train/eval passes.

    Args:
        model: Network to optimize; its ``parameters()`` are handed to the optimizer.
        loss_fn: Name of the loss function. Only ``'CrossEntropyLoss'`` is supported.
        optimizer: Name of the optimizer, either ``'Adam'`` or ``'SGD'``.
        learning_rate: Learning rate passed to the optimizer.
        device: Anything accepted by ``torch.device`` (e.g. ``'cuda:0'``).
            ``None`` selects the CPU.

    Raises:
        ValueError: If ``loss_fn`` or ``optimizer`` is not a supported name.
    """

    def __init__(self, model,
                 loss_fn: str,
                 optimizer: str,
                 learning_rate: float = 1e-3,
                 device: str = None):

        super().__init__()

        # Set model
        self.model = model

        # Set loss function
        if loss_fn == 'CrossEntropyLoss':
            self.loss_fn = nn.CrossEntropyLoss()
        else:
            raise ValueError('Loss function is not supported!')

        # Set optimizer
        if optimizer == 'Adam':
            self.optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        elif optimizer == 'SGD':
            self.optimizer = optim.SGD(model.parameters(), lr=learning_rate)
        else:
            raise ValueError('Optimizer is not supported!')

        # Set device. The message is derived from the resolved device type so
        # that an explicit device='cpu' is not reported as a GPU.
        self.device = torch.device(device if device is not None else 'cpu')
        if self.device.type == 'cpu':
            print('Using CPU!\n')
        else:
            print('Using GPU!\n')

    def train(self, dataloader,
              print_log: bool = False):
        """Run one optimization pass over ``dataloader``.

        Args:
            dataloader: Iterable yielding ``(X, y)`` batches.
            print_log: When truthy, print the sampled loss history afterwards.

        Returns:
            A list of ``[batch_index, loss]`` pairs sampled at every 100th
            batch (starting with batch 0). ``loss`` is a plain Python float.
        """
        loss_fn = self.loss_fn
        optimizer = self.optimizer
        device = self.device

        # Moving the model and switching to train mode are loop-invariant,
        # so do them once instead of once per batch.
        model = self.model.to(device)
        model.train()

        loss_history = []

        for batch, (X, y) in enumerate(tqdm(dataloader)):
            # Send tensors to the device
            X, y = X.to(device), y.to(device)

            # Compute loss (error)
            pred = model(X)
            loss = loss_fn(pred, y)

            # Backpropagation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Append batch loss history. Store a float rather than the loss
            # tensor so the history does not keep device memory alive.
            if batch % 100 == 0:
                loss_history.append([batch, loss.item()])

        # Print loss history
        if print_log:
            print('Loss over batches:')
            print(' Batch\tLoss')
            for item in loss_history:
                print(f' {item[0]}\t{item[1]:>7f}')

        # Return loss history
        return loss_history

    def test(self, dataloader,
             print_log: bool = False):
        """Evaluate the model on ``dataloader`` without updating weights.

        Args:
            dataloader: Iterable yielding ``(X, y)`` batches; must support
                ``len()`` and expose a ``dataset`` attribute that supports ``len()``.
            print_log: When truthy, print accuracy and average loss.

        Returns:
            Accuracy as a fraction in ``[0, 1]``: correct predictions divided
            by ``len(dataloader.dataset)``. Returns ``0.0`` for an empty dataset.
        """
        loss_fn = self.loss_fn
        device = self.device

        size = len(dataloader.dataset)
        num_batches = len(dataloader)

        # Move once and switch to eval mode
        model = self.model.to(device)
        model.eval()

        test_loss, correct = 0, 0

        with torch.no_grad():
            for X, y in tqdm(dataloader):
                # Send tensors to the device
                X, y = X.to(device), y.to(device)

                # Make prediction
                pred = model(X)

                test_loss += loss_fn(pred, y).item()
                correct += (pred.argmax(1) == y).type(torch.float).sum().item()

        # Guard the averages so an empty dataloader does not divide by zero.
        test_loss = test_loss / num_batches if num_batches else 0.0
        correct = correct / size if size else 0.0

        # Print test accuracy and test loss
        if print_log:
            print(f'Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f}')

        # Return test accuracy
        return correct

### 2. build_model function

In [None]:
from torch import nn


def build_model(pretrained,
                d_class: int = 2):
    """Swap the ``fc`` head of ``pretrained`` for a fresh ``d_class``-way classifier.

    The network is modified in place and the same object is returned.

    Args:
        pretrained: Network whose ``fc`` attribute is replaced.
        d_class: Number of output classes; must be greater than 1.

    Returns:
        ``pretrained`` with ``fc`` set to ``nn.LazyLinear(d_class)``.

    Raises:
        ValueError: If ``d_class`` is 1 or less. The network is left untouched.
    """
    # Reject degenerate class counts before touching the network.
    if d_class <= 1:
        raise ValueError('Can not less than 2 classes!')

    # LazyLinear infers its input width on the first forward pass.
    pretrained.fc = nn.LazyLinear(d_class)
    return pretrained

### 3. PreprocessDataLoader class

In [None]:
from torch.utils.data import Dataset
from torchvision import transforms as T
from PIL import Image


class PreprocessDataLoader(Dataset):
    """Dataset that loads an image from disk and pairs it with its label.

    Args:
        data: pandas DataFrame with an ``image`` column (file name relative to
            ``image_path``) and a ``label`` column.
        image_path: Directory that contains the image files.
    """

    def __init__(self, data, image_path):

        super().__init__()

        self.data = data
        self.image_path = image_path
        # Convert to a tensor, then normalize each RGB channel with the given
        # mean/std (presumably the ImageNet statistics expected by torchvision
        # pretrained weights -- confirm against the backbone in use).
        self.transform = T.Compose([
            T.ToTensor(),
            T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

    def __len__(self):
        """Return the number of rows (samples) in ``data``."""
        return len(self.data)

    def _get_row(self, idx):
        """Return row ``idx`` of ``data`` by position, with all columns intact."""
        # Select by row position only. Slicing the columns by the row count
        # dropped 'label' whenever the frame had fewer rows than columns.
        return self.data.iloc[idx]

    def __getitem__(self, idx):
        """Return ``(image_tensor, label)`` for the sample at position ``idx``."""
        row = self._get_row(idx)

        # Image preprocessing
        filename = row['image']
        # The context manager closes the underlying file once the RGB copy exists.
        with Image.open(f'{self.image_path}/{filename}') as raw_image:
            image = raw_image.convert('RGB')
        image = self.transform(image)

        label = row['label']

        return (image, label)

### 4. train_wrapper function

In [None]:
# Directory where train_wrapper stores the saved model state dict.
MODEL_DIR = 'models/'
# Directory where train_wrapper writes the per-epoch accuracy log and plot.
LOG_DIR = 'logs/'

In [None]:
import os
import time as timer
from tqdm import tqdm
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns


def _run_timed(label, func, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, print the elapsed time under ``label``, return its result."""
    start = timer.perf_counter()
    result = func(*args, **kwargs)
    elapsed_time = timer.perf_counter() - start
    print(f"{label} time: {elapsed_time:>.2f} seconds")
    return result


def train_wrapper(model, trainer,
                  train_dataloader, val_dataloader, test_dataloader,
                  epochs=10, saved_model_name='model.pth', log_name='log.txt'):
    """Train for ``epochs`` epochs, logging accuracy and saving the model after each one.

    Each epoch runs ``trainer.train`` on ``train_dataloader``, then
    ``trainer.test`` on ``val_dataloader`` and on ``test_dataloader``. The two
    accuracies are appended to a tab-separated log in ``LOG_DIR``, and the
    model's state dict is written to ``MODEL_DIR``, overwriting the previous
    epoch's file. After the last epoch an accuracy plot is saved next to the
    log and displayed.

    Args:
        model: Network whose ``state_dict()`` is saved after each epoch.
        trainer: Object exposing ``train(dataloader, print_log=...)`` and
            ``test(dataloader, print_log=...)``; ``test`` returns an accuracy.
        train_dataloader: Batches used for optimization.
        val_dataloader: Batches used for the first accuracy column.
        test_dataloader: Batches used for the second accuracy column.
        epochs: Number of epochs to run.
        saved_model_name: File name of the saved state dict inside ``MODEL_DIR``.
        log_name: File name of the log inside ``LOG_DIR``. The plot reuses it
            with the extension replaced by ``.png``.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(MODEL_DIR, exist_ok=True)
    os.makedirs(LOG_DIR, exist_ok=True)

    model_path = os.path.join(MODEL_DIR, saved_model_name)
    log_path = os.path.join(LOG_DIR, log_name)
    # splitext removes only the final extension, so a name such as
    # 'lr-0.001.txt' keeps its full stem instead of being cut at the first dot.
    fig_path = os.path.join(LOG_DIR, f'{os.path.splitext(log_name)[0]}.png')

    train_history = []

    with open(log_path, 'w') as fh:

        # Write log header
        fh.write('epoch\ttrain_acc\ttest_acc\n')

        for epoch in range(epochs):

            # Print epoch status
            print(f"Epoch {epoch+1} out of {epochs}\n ------------")

            # Train model
            _run_timed('Training', trainer.train, train_dataloader, print_log=False)

            # NOTE(review): this accuracy is measured on val_dataloader, but it
            # is logged and plotted under the 'train_acc' / 'Train Accuracy'
            # names. The names are kept so existing logs stay comparable.
            val_accuracy = _run_timed('Validation', trainer.test,
                                      val_dataloader, print_log=False)

            # Evaluate model: get testing accuracy
            test_accuracy = _run_timed('Testing', trainer.test,
                                       test_dataloader, print_log=False)

            # Append epoch train history
            train_history.append([epoch, val_accuracy, test_accuracy])

            # Write training log
            fh.write(f'{epoch}\t{val_accuracy}\t{test_accuracy}\n')

            # Save model (overwrites the file written by the previous epoch)
            torch.save(model.state_dict(), model_path)
            print(f"Model {model_path} stored!\n")

    train_history = pd.DataFrame(train_history, columns=['epoch', 'train_acc', 'test_acc'])
    # Epochs become strings so the x-axis is treated as categorical.
    train_history['epoch'] = train_history['epoch'].astype(str)

    # Plot accuracy
    plt.figure()
    sns.lineplot(data=train_history, x='epoch', y='train_acc', label='Train Accuracy', color='#5f0f40')
    sns.lineplot(data=train_history, x='epoch', y='test_acc', label='Test Accuracy', color='#fb8b24')
    plt.title(f'{saved_model_name} Accuracy History over Epochs\n', fontdict={
        'fontsize': 15, 'fontweight': 'bold'
    })
    plt.xlabel('Epoch', fontdict={
        'fontsize': 10
    })
    plt.ylabel('Accuracy', fontdict={
        'fontsize': 10
    })
    plt.savefig(fig_path)
    plt.show()

    print("Done!")

## C. Modeling

In [None]:
# Experiment global variables
LEARNING_RATE = 1e-3  # learning rate passed to TrainUtils for the optimizer
EPOCHS = 5  # number of epochs passed to train_wrapper
BATCH_SIZE = 64  # mini-batch size used by every DataLoader

### 1. Load labels

In [None]:
import pandas as pd


# Load the comma-separated label table for the chest X-ray images.
df_labels = pd.read_csv('datasets/labels_cxr-images.csv', sep=',')
# Print the column names, dtypes and non-null counts as a quick sanity check.
df_labels.info()

In [None]:
# Preview the first three rows of the label table.
df_labels.head(3)

### 2. Experiment #1: Cardiomegaly

In [None]:
from sklearn.model_selection import train_test_split


# Hold out 30% of the labelled rows as the test set; the fixed random_state
# makes the split reproducible.
train_dataset, test_dataset = train_test_split(
    df_labels, test_size=0.3, random_state=0
)

# Split the remaining 70% again, keeping 30% of it for validation
# (overall roughly 49% train / 21% validation / 30% test).
train_dataset, val_dataset = train_test_split(
    train_dataset, test_size=0.3, random_state=0
)

def prepare_dataset(dataset, image_path: str = 'images'):
    """Wrap the file-name and Cardiomegaly columns of ``dataset`` in a PreprocessDataLoader.

    Args:
        dataset: DataFrame containing ``Filename_Segment2`` and ``Cardiomegaly`` columns.
        image_path: Directory holding the image files named in ``Filename_Segment2``.

    Returns:
        A ``PreprocessDataLoader`` over a two-column frame whose columns are
        renamed to ``image`` and ``label``.
    """
    # Source column -> name expected by PreprocessDataLoader.
    column_map = {'Filename_Segment2': 'image', 'Cardiomegaly': 'label'}
    selected = dataset[list(column_map)]
    renamed = selected.rename(columns=column_map, errors='ignore')
    return PreprocessDataLoader(renamed, image_path)

# Replace each DataFrame split with a dataset object that reads its images
# from the 128x128 image directory.
train_dataset = prepare_dataset(train_dataset, 'datasets/data_cxr-images_128x128')
val_dataset = prepare_dataset(val_dataset, 'datasets/data_cxr-images_128x128')
test_dataset = prepare_dataset(test_dataset, 'datasets/data_cxr-images_128x128')

In [None]:
from torch.utils.data import DataLoader


# Reshuffle the training samples every epoch so the model does not see the
# same batches in the same order each time. Evaluation loaders keep a fixed
# order because ordering does not affect accuracy.
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

In [None]:
from torch import device
from torchvision.models import resnet50, ResNet50_Weights

# ResNet-50 with its default pretrained weights; build_model swaps the final
# fc layer for a 2-class head (d_class defaults to 2).
model = build_model(resnet50(weights=ResNet50_Weights.DEFAULT))
# NOTE(review): 'cuda:3' is hard-coded and presumably matches the authors'
# multi-GPU machine -- confirm the index before running elsewhere.
trainer = TrainUtils(model, 'CrossEntropyLoss', 'Adam', learning_rate=LEARNING_RATE, device=device('cuda:3'))

# Train for EPOCHS epochs; the model and log file names encode the image size,
# loss, optimizer, learning rate and batch size of this run.
train_wrapper(model, trainer,
             train_dataloader=train_dataloader,
             val_dataloader=val_dataloader,
             test_dataloader=test_dataloader,
             epochs=EPOCHS,
             saved_model_name='model_dim-128x128_loss-crossentropy_optim-adam_lr-1e-3_batch-64.pth',
             log_name='model_dim-128x128_loss-crossentropy_optim-adam_lr-1e-3_batch-64.txt')