# Packages Import

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import torchvision
import torchvision.transforms as transforms

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

import os
import cv2
import random

from sklearn.model_selection import train_test_split
from tqdm import tqdm

from tqdm import tqdm

import config
import metrics
import utils

# Project Configs

In [1]:
from dataclasses import dataclass

@dataclass
class Config:
    # Dataset path
    DATASET_DIR: str = 'dataset/'

    # Images paths
    NORMAL_IMAGES_FOLDER: str = 'dataset/Normal/'
    TUBERCULOSIS_IMAGES_FOLDER: str = 'dataset/Tuberculosis'

    # Dataframes paths
    NORMAL_XLSX_PATH: str = 'dataset/Normal.metadata.xlsx'
    TUBERCULOSIS_XLSX_PATH: str = 'dataset/Tuberculosis.metadata.xlsx'

    # Hyperparameters
    BATCH_SIZE: int = 64
    LEARNING_RATE: float = 0.001
    NUM_EPOCHS: int = 10
    MOMENTUM: float = 0.9
    WEIGHT_DECAY: float = 1e-5


# Utils

In [10]:
class Utils:
    @staticmethod
    def trim(im):
        """
        Converts image to grayscale using cv2, then computes binary matrix
        of the pixels that are above a certain threshold, then takes out
        the first row where a certain percentage of the pixels are above the
        threshold will be the first clip point. Same idea for col, max row, max col.
        """
        percentage = 0.02

        img = np.array(im)
        img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        im = img_gray > 0.1 * np.mean(img_gray[img_gray != 0])
        row_sums = np.sum(im, axis=1)
        col_sums = np.sum(im, axis=0)
        rows = np.where(row_sums > img.shape[1] * percentage)[0]
        cols = np.where(col_sums > img.shape[0] * percentage)[0]
        min_row, min_col = np.min(rows), np.min(cols)
        max_row, max_col = np.max(rows), np.max(cols)
        im_crop = img[min_row: max_row + 1, min_col: max_col + 1]
        return Image.fromarray(im_crop)

    @staticmethod
    def plot_images_with_labels(data_df, num_images=5, random_seed=42, show_image_mode=True):
        """
        Plot some images with their corresponding labels from the given DataFrame.

        Args:
            data_df (pd.DataFrame): The DataFrame containing 'filepaths' and 'labels' columns.
            num_images (int, optional): Number of images to plot. Defaults to 5.
            random_seed (int, optional): Random seed for reproducibility. Defaults to 42.
            show_image_mode (bool, optional): Whether to show image mode (RGB or not) alongside the labels. 
                                              Defaults to True.
        """
        random.seed(random_seed)
        sampled_data = data_df.sample(n=num_images)

        num_rows = (num_images - 1) // 5 + 1
        num_cols = min(num_images, 5)

        fig, axes = plt.subplots(num_rows, num_cols, figsize=(15, 5 * num_rows))
        for idx, (filepath, label) in enumerate(zip(sampled_data['filepaths'], sampled_data['labels'])):
            image = Image.open(filepath)

            row_idx = idx // 5
            col_idx = idx % 5

            axes[row_idx, col_idx].imshow(image)
            axes[row_idx, col_idx].axis('off')

            if show_image_mode:
                is_rgb = image.mode == 'RGB'
                axes[row_idx, col_idx].set_title(f'Label: {label} | RGB: {is_rgb}')
            else:
                axes[row_idx, col_idx].set_title(f'Label: {label}')

        for idx in range(num_images, num_rows * 5):
            row_idx = idx // 5
            col_idx = idx % 5
            fig.delaxes(axes[row_idx, col_idx])

        plt.tight_layout()
        plt.show()

    @staticmethod
    def resize_and_save_images(data_df, save_dir, new_size=(224, 224)):
        """
        Resize images from the DataFrame and save them to the specified directory.

        Args:
            data_df (pd.DataFrame): The DataFrame containing 'filepaths' and 'labels' columns.
            save_dir (str): The directory path where the resized images will be saved.
            new_size (tuple, optional): The new size to which the images will be resized. Defaults to (224, 224).
        """
        # Create the save directories if they don't exist
        normal_save_dir = os.path.join(save_dir, 'Normal')
        tuberculosis_save_dir = os.path.join(save_dir, 'Tuberculosis')
        os.makedirs(normal_save_dir, exist_ok=True)
        os.makedirs(tuberculosis_save_dir, exist_ok=True)

        for filepath, label in tqdm(zip(data_df['filepaths'], data_df['labels'])):
            image = cv2.imread(filepath)
            trimmed_image = np.array(Utils.trim(image))
            resized_image = cv2.resize(trimmed_image, new_size, interpolation=cv2.INTER_CUBIC)

            if label == 'Normal':
                label_save_dir = normal_save_dir
            else:
                label_save_dir = tuberculosis_save_dir

            filename_without_ext = os.path.splitext(os.path.basename(filepath))[0]

            save_filename = f"{filename_without_ext}.png"
            save_path = os.path.join(label_save_dir, save_filename)
            cv2.imwrite(save_path, resized_image)


# Preprocessing

In [11]:
def get_filepaths_and_labels(sdir):
    filepaths = []
    labels = []
    classlist = sorted(os.listdir(sdir))
    for _class in classlist:
        classpath = os.path.join(sdir, _class)
        if os.path.isdir(classpath):
            flist = sorted(os.listdir(classpath))
            for f in tqdm(flist, ncols=130, desc=f'{_class:25s}', unit='files', colour='blue'):
                fpath = os.path.join(classpath, f)
                filepaths.append(fpath)

                if _class == 'Augmented_normal':
                    _class = 'Normal'
                elif _class == 'Augmented_tuberculosis':
                    _class = 'Tuberculosis'
                    
                labels.append(_class)
    return filepaths, labels

def create_dataframes(filepaths, labels):
    Fseries = pd.Series(filepaths, name='filepaths')
    Lseries = pd.Series(labels, name='labels')
    df = pd.concat([Fseries, Lseries], axis=1)
    return df

def split_data(df):
    train_df, dummy_df = train_test_split(df, train_size=.8, shuffle=True, random_state=123, stratify=df['labels'])
    valid_df, test_df = train_test_split(dummy_df, train_size=.5, shuffle=True, random_state=123, stratify=dummy_df['labels'])
    return train_df, test_df, valid_df

def calculate_average_image_size(df, num_samples=50):
    sample_df = df.sample(n=num_samples, replace=False)
    ht = 0
    wt = 0
    count = 0
    for i in range(len(sample_df)):
        fpath = sample_df['filepaths'].iloc[i]
        try:
            img = cv2.imread(fpath)
            h, w, _ = img.shape
            wt += w
            ht += h
            count += 1
        except:
            pass
    average_height = int(ht / count)
    average_weight = int(wt / count)
    aspect_ratio = average_height / average_weight
    return average_height,average_weight,aspect_ratio


def make_dataframes(sdir):
    filepaths, labels = get_filepaths_and_labels(sdir)
    df = create_dataframes(filepaths, labels)
    train_df, test_df, valid_df = split_data(df)
    average_height, average_weight, aspect_ratio = calculate_average_image_size(train_df)
    
    # Other statistics and information can be printed here if needed.
    class_count = len(train_df['labels'].unique())
    counts = list(train_df['labels'].value_counts())
    
    return train_df, test_df, valid_df, class_count, average_height, average_weight, aspect_ratio



# Data Augmentation

In [1]:
import cv2
import os
import random
import numpy as np

from tqdm import tqdm
from utils import trim

class Augmentation:
    def __init__(self, output_path, probability=0.4):
        self.output_path = output_path
        self.probability = probability

    def random_rotation(self, image, angle_range=(-30, 30)):
        if random.random() < self.probability:
            angle = random.uniform(angle_range[0], angle_range[1])
            h, w = image.shape[:2]
            center = (w / 2, h / 2)
            rotation_matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
            image = cv2.warpAffine(image, rotation_matrix, (w, h), flags=cv2.INTER_LINEAR)
        return image

    def random_flip(self, image):
        if random.random() < self.probability:
            image = cv2.flip(image, 1)  # Horizontal flip
        return image
    
    def random_zoom(self, image, zoom_range=(0.8, 1.2)):
        if random.random() < self.probability:
            zoom_factor = random.uniform(zoom_range[0], zoom_range[1])
            h, w = image.shape[:2]

            # Calculate new zoomed image size
            new_h, new_w = int(h * zoom_factor), int(w * zoom_factor)

            # Ensure the new size does not exceed the original size
            if new_h >= h:
                new_h = h - 1
            if new_w >= w:
                new_w = w - 1

            # Resize the image
            zoomed_image = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_LINEAR)

            # Compute border size for padding
            top = (h - new_h) // 2
            bottom = h - new_h - top
            left = (w - new_w) // 2
            right = w - new_w - left

            # Pad the zoomed image to the original size
            image = cv2.copyMakeBorder(zoomed_image, top, bottom, left, right, cv2.BORDER_CONSTANT, value=0)

        return image

    def color_jittering(self, image, jitter_range=20):
        if random.random() < self.probability:
            b, g, r = cv2.split(image)
            b = np.clip(b.astype(np.int32) + random.randint(-jitter_range, jitter_range), 0, 255).astype(np.uint8)
            g = np.clip(g.astype(np.int32) + random.randint(-jitter_range, jitter_range), 0, 255).astype(np.uint8)
            r = np.clip(r.astype(np.int32) + random.randint(-jitter_range, jitter_range), 0, 255).astype(np.uint8)
            image = cv2.merge((b, g, r))
        return image


    def __call__(self, image_path,index,class_name):
        image = cv2.imread(image_path, cv2.IMREAD_COLOR)  # Load as a color image

        # Apply augmentations
        image = self.random_rotation(image)
        image = self.random_flip(image)
        image = self.random_zoom(image)
        image = self.color_jittering(image)
        image = np.array(trim(image))
        # Save the augmented image
        filename = os.path.basename(image_path)
        output_image_path = os.path.join(self.output_path, f"Augment-{class_name}-{index}.png")
        cv2.imwrite(output_image_path, np.array(trim(image)))
        return image


def augmentation_main():
    # Define input and output directories
    input_dir = 'resized_dataset'
    output_dir_normal = os.path.join(input_dir, 'Augmented_normal')
    output_dir_tuberculous = os.path.join(input_dir, 'Augmented_tuberculosis')

    # Create output directories if they don't exist
    os.makedirs(output_dir_normal, exist_ok=True)
    os.makedirs(output_dir_tuberculous, exist_ok=True)

    # Create the Augmentation objects
    augmentor_normal = Augmentation(output_path=output_dir_normal, probability=0.6)  # Set lower probability for Normal images
    augmentor_tb = Augmentation(output_path=output_dir_tuberculous, probability=0.8)  # Set higher probability for TB images

    # Augment Normal images (randomly choose 500 samples)
    normal_images_dir = os.path.join(input_dir, 'Normal')
    normal_images = os.listdir(normal_images_dir)
    selected_normal_images = random.sample(normal_images, 500)
    num = 1
    for img_file in tqdm(selected_normal_images):
        img_path = os.path.join(normal_images_dir, img_file)
        augmented_image = augmentor_normal(img_path,index=num,class_name='Normal')
        num+=1

    # Augment Tuberculous images to have a total of 2000 augmented images
    tb_images_dir = os.path.join(input_dir, 'Tuberculosis')
    tb_images = os.listdir(tb_images_dir)
    num_tb_images = 700
    num_augmentations_needed = 2000 - num_tb_images

    for i in tqdm(range(num_augmentations_needed)):
        img_file = random.choice(tb_images)
        img_path = os.path.join(tb_images_dir, img_file)
        augmented_image = augmentor_tb(img_path,index=i+1,class_name='Tuberculosis')

  from .autonotebook import tqdm as notebook_tqdm


# Metrics

In [4]:
def true_positive(y_true,y_pred):
    return ((y_true == 1) & (y_pred == 1)).sum().item()

def true_negative(y_true,y_pred):
    return ((y_true == 0) & (y_pred == 0)).sum().item()

def false_positive(y_true,y_pred):
    return ((y_true == 0) & (y_pred == 1)).sum().item()

def false_negative(y_true,y_pred):
    return ((y_true == 1) & (y_pred == 0)).sum().item()


def precision(y_true, y_pred):
    tp = true_positive(y_true, y_pred)
    fp = false_positive(y_true, y_pred)
    return tp / (tp + fp + 1e-7)

def recall(y_true, y_pred):
    tp = true_positive(y_true, y_pred)
    fn = false_negative(y_true, y_pred)
    return tp / (tp + fn + 1e-7) 

def f1_score(y_true, y_pred):
    prec = precision(y_true, y_pred)
    rec = recall(y_true, y_pred)
    return 2 * (prec * rec) / (prec + rec + 1e-7)

def accuracy(y_true, y_pred):
    tp = true_positive(y_true, y_pred)
    tn = true_negative(y_true, y_pred)
    total_samples = y_true.shape[0]
    return (tp + tn) / total_samples



class F1ScoreCrossEntropyLoss(nn.Module):
    def __init__(self,weight=None,alpha=1.0, beta=0.5, epsilon=1e-7):
        super(F1ScoreCrossEntropyLoss, self).__init__()
        self.alpha = alpha
        self.beta = beta
        self.epsilon = epsilon
        self.cross_entropy = nn.CrossEntropyLoss(weight=weight)

    def forward(self, y_pred, y_true):
        _, y_pred_final = torch.max(y_pred, 1)
        y_true_np = y_true.cpu().numpy()
        y_pred_np = y_pred_final.cpu().numpy()
        f1 = f1_score(y_true_np, y_pred_np)
        ce_loss = self.cross_entropy(y_pred, y_true)
        
        loss = self.alpha * ce_loss - self.beta * torch.log(torch.tensor(f1) + self.epsilon) + 1
        return loss.mean()
    
# this loss caculate new weight base on f1score each mini batch 
class F1_Loss(nn.Module):
    def __init__(self, epsilon=1e-7):
        super().__init__()
        self.epsilon = epsilon
        
    def forward(self, y_pred, y_true,):
        y_true_one_hot = F.one_hot(y_true.to(torch.int64), 2).to(torch.float32)
        
        tp = (y_true_one_hot * y_pred).sum(dim=0).to(torch.float32)
        tn = ((1 - y_true_one_hot) * (1 - y_pred)).sum(dim=0).to(torch.float32)
        fp = ((1 - y_true_one_hot) * y_pred).sum(dim=0).to(torch.float32)
        fn = (y_true_one_hot * (1 - y_pred)).sum(dim=0).to(torch.float32)

        precision = tp / (tp + fp + self.epsilon)
        recall = tp / (tp + fn + self.epsilon)

        f1 = 2* (precision*recall) / (precision + recall + self.epsilon)
        f1 = f1.clamp(min=self.epsilon, max=1-self.epsilon)
        f1=f1.detach()
        CE =torch.nn.CrossEntropyLoss(weight=( 1 - f1))(y_pred, y_true_one_hot)
        return  CE.mean()

# Dataset

In [12]:
class TBDataset(torch.utils.data.Dataset):
    def __init__(self, data_df, transform=None):
        self.data_df = data_df
        self.transform = transform if transform is not None else self._default_transform()
        self.class_labels = {'Normal': 0, 'Tuberculosis': 1}

    def __repr__(self):
        return f"TBDataset: Number of samples: {len(self)}"

    def __len__(self):
        return len(self.data_df)

    def __getitem__(self, index):
        filepath = self.data_df.iloc[index, 0]
        label = self.data_df.iloc[index, 1]

        # Load image using PIL
        image = Image.open(filepath)

        if self.transform:
            image = self.transform(image)

        return image, label
    
    def _default_transform(self):
        return transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])


# DataLoader

In [13]:
def create_dataloader(train_transform,valid_transform,test_transform,config):
    train_df, test_df, valid_df, class_count, average_height, average_weight, aspect_ratio = make_dataframes(config.DATASET_DIR)
    train_dataset = TBDataset(train_df, transform=train_transform)
    valid_dataset = TBDataset(valid_df, transform=valid_transform)
    test_dataset = TBDataset(test_df, transform=test_transform)

    train_dataloader = torch.utils.data.DataLoader(
        dataset=train_dataset,
        batch_size=config.BATCH_SIZE,
        shuffle=True
    )

    valid_dataloader = torch.utils.data.DataLoader(
        dataset=valid_dataset,
        batch_size=config.BATCH_SIZE,
        shuffle=False
    )

    test_dataloader = torch.utils.data.DataLoader(
        dataset=test_dataset,
        batch_size=config.BATCH_SIZE,
        shuffle=False
    )

    return train_dataloader,valid_dataloader,test_dataloader

# Training

In [3]:
# Training loop
def train(model, criterion, optimizer, train_loader):
    model.train()
    running_loss = 0.0
    for images, labels in tqdm(train_loader):
        images, labels = images.to(config.DEVICE), labels.to(config.DEVICE)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        # print(loss)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * images.size(0)
    return running_loss / len(train_loader.dataset)

# Testing loop
def test(model, criterion, test_loader):
    model.eval()
    running_loss = 0.0
    true_labels = []
    pred_labels = []
    with torch.no_grad():
        for images, labels in tqdm(test_loader):
            images, labels = images.to(config.DEVICE), labels.to(config.DEVICE)
            outputs = model(images)
            loss = criterion(outputs, labels)
            running_loss += loss.item() * images.size(0)
            _, predicted = torch.max(outputs, 1)
            true_labels.extend(labels.cpu().numpy())
            # print("labels shape",labels.shape,"pred shape",predicted.shape)
            pred_labels.extend(predicted.cpu().numpy())
    
    avg_loss = running_loss / len(test_loader.dataset)
    true_labels = torch.tensor(true_labels)
    pred_labels = torch.tensor(pred_labels)
    acc = metrics.accuracy(true_labels, pred_labels)
    macro_f1 = metrics.f1_score(true_labels, pred_labels)

    return avg_loss, acc, macro_f1



def fit(model, criterion, optimizer, train_loader, val_loader,test_loader,num_epochs, log_file, save_dir):
    train_losses = np.zeros(num_epochs)
    val_losses = np.zeros(num_epochs)
    val_accuracies = np.zeros(num_epochs)
    val_f1_scores = np.zeros(num_epochs)

    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    best_val_loss = float('inf')
    best_model_state = None

    for epoch in range(num_epochs):
        train_loss = train(model, criterion, optimizer, train_loader)
        val_loss, val_acc, macro_f1 = test(model, criterion, val_loader)
        train_losses[epoch] = train_loss
        val_losses[epoch] = val_loss
        val_accuracies[epoch] = val_acc
        val_f1_scores[epoch] = macro_f1
        utils.log_training_process(epoch, train_loss, val_loss, val_acc, macro_f1)
        print(f"Epoch {epoch+1} | train_loss: {train_loss:.3f} | val_loss: {val_loss:.3f} | val_acc: {val_acc:.3f} | f1_score: {macro_f1:.3f}")
        # Save the model with the best validation loss
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_state = model.state_dict()
            torch.save({
                "epoch":epoch,
                "model_state_dict":model.state_dict(),
                "optimizer_state_dict":optimizer.state_dict(),
                "train_loss":train_loss,
                "val_loss":val_loss,
                "val_accuracy":val_acc,
                "val_f1_score":macro_f1
            },config.LOGFILE)

    test_loss, test_acc, test_macro_f1 = test(model,criterion,test_loader)
    print(f"test_loss: {test_loss:.3f} | test_acc: {test_acc:.3f} | f1_score: {test_macro_f1:.3f}")

    print("Training completed!")
    return train_losses,val_losses,val_accuracies,val_f1_scores
