In [None]:

import kagglehub
tawsifurrahman_covid19_radiography_database_path = kagglehub.dataset_download('tawsifurrahman/covid19-radiography-database')

print('Data source import complete.')


In [None]:
# I installed torchmetrics for using Dice Score in segmentation and segmentation-models-pytorch for using Unet model
!pip install torchmetrics
!pip install segmentation-models-pytorch
# Importing libraries
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torchvision import models
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, classification_report
import matplotlib.pyplot as plt
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision.models import resnet18
import torchmetrics
from torchmetrics.segmentation import DiceScore
from torch.optim.lr_scheduler import StepLR
from segmentation_models_pytorch import Unet
from tqdm import tqdm

In [None]:
# This function is dedicated to loading image paths, mask paths, and labels from the directory.
def load_images_labels_and_masks(image_dir):
    image_paths = []
    mask_paths = []
    labels = []
    class_names = ['COVID', 'Normal', 'Viral Pneumonia', 'Lung_Opacity']


    for class_name in class_names:
        class_folder_images = os.path.join(image_dir, class_name, 'images')
        class_folder_masks = os.path.join(image_dir, class_name, 'masks')
        for filename in os.listdir(class_folder_images):
            if filename.endswith(".png") or filename.endswith(".jpg"):
                image_paths.append(os.path.join(class_folder_images, filename))
                mask_paths.append(os.path.join(class_folder_masks, filename))
                labels.append(class_name)



    return image_paths, mask_paths, labels

In [None]:
# I located and organized the paths to images, masks, and their labels within the downloaded COVID-19 Radiography dataset.
dataset_path = tawsifurrahman_covid19_radiography_database_path
image_dir = os.path.join(dataset_path, 'COVID-19_Radiography_Dataset')
image_paths, mask_paths, labels = load_images_labels_and_masks(image_dir)

In [None]:
# I used label encoder to convert text labels into numerical ones for my model
label_encoder = LabelEncoder()
labels = label_encoder.fit_transform(labels)

In [None]:
# Then, I splitted the dataset into train(%80) and test(%20)
train_images, test_images, train_masks, test_masks, train_labels, test_labels = train_test_split(
    image_paths, mask_paths, labels, test_size=0.2, random_state=42
)

In [None]:
# In this step, I defined required transformations for my CXR photos
my_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),

])

In [None]:
# Here, I defined my costume dataset for loading and preprocessing images, masks and labels
class CovidRadiographyDataset(Dataset):
    def __init__(self, image_paths, mask_paths, labels,transform=None):
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.labels = labels
        self.transform = transform

    def __getitem__(self, index):
        image_path = self.image_paths[index]
        mask_path = self.mask_paths[index]
        label = self.labels[index]


        img = Image.open(image_path).convert('L')
        mask = Image.open(mask_path).convert('L')
        if self.transform:
            img = self.transform(img)
            mask = self.transform(mask)

        return img, mask, label



    def __len__(self):
        return len(self.image_paths)



In [None]:
# Making train and test detasets
train_dataset = CovidRadiographyDataset(train_images, train_masks, train_labels,transform=my_transform)
test_dataset = CovidRadiographyDataset(test_images, test_masks, test_labels,transform=my_transform)

In [None]:
# Making train and test dataloader with batch size of 32
test_dataloader = DataLoader(test_dataset,batch_size=32, shuffle=True)
train_dataloader = DataLoader(train_dataset,batch_size=32, shuffle=True)

In [None]:
# Verifying the right shape of images and masks for my model
print(train_dataset[0][1].shape)
print(train_dataset[0][0].shape)

For the next step I defined a Multi Task Model in order to perform segmentation and classification simultaneously. I used **the encoder of the pretrained Unet model(Resnet18) as my feature extractor.** Then, I put a classification head for predicting the class as well as the built-in segmentor head for segmenting masks. It is worthy to note that using the built-in Resnet18 helped me to only pass each photo once through the model. Therefore, number of calculations and the amount of time needed for training can be minimised to a certain level. So, I can say that, **this architecture allows the model to learn shared representations for both tasks, potentially enhancing its performance.**

In [None]:
class MultiTaskModel(nn.Module):
    def __init__(self, num_classes):
        super(MultiTaskModel, self).__init__()
        self.unet = Unet(
            encoder_name="resnet18", encoder_weights="imagenet", in_channels=1, classes=1
        )

        self.encoder = self.unet.encoder
        self.decoder = self.unet.decoder
        self.segmentation_head = self.unet.segmentation_head
        self.classification_head = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(self.encoder.out_channels[-1], num_classes)
        )

    def forward(self, x):

        encoder_features = self.encoder(x)
        class_logits = self.classification_head(encoder_features[-1])
        decoder_output = self.decoder(*encoder_features)
        segmentation_mask = self.segmentation_head(decoder_output)

        return class_logits, segmentation_mask


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(42)
model = MultiTaskModel(num_classes=4).to(device)
classification_loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
scheduler = StepLR(optimizer, step_size=5, gamma=0.1)
dice_metric = torchmetrics.segmentation.DiceScore(num_classes=1, include_background=True).to(device)


In [None]:
# I trained my model for 5 epochs
EPOCHS = 5
for epoch in range(EPOCHS):
    model.train()
    running_class_loss = 0.0
    running_seg_loss = 0.0

    for images, masks, labels in tqdm(train_dataloader):
        images, labels, masks = images.to(device), labels.to(device), masks.to(device)


        class_logits, segmentation_mask = model(images)
        class_loss = classification_loss_fn(class_logits, labels)
        dice_loss = 1 - dice_metric(torch.sigmoid(segmentation_mask), masks.long())
        total_loss = class_loss + dice_loss


        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()

        running_class_loss += class_loss.item()
        running_seg_loss += dice_loss.item()

    scheduler.step()

    print(f"Epoch [{epoch+1}/10], Classification Loss: {running_class_loss/len(train_dataloader):.4f}, Segmentation Loss (Dice): {running_seg_loss/len(train_dataloader):.4f}")

In [None]:
# function for model evaluation
def evaluate_model(model, dataloader, device):
    model.eval()
    all_predictions = []
    all_labels = []
    dice_scores = []

    with torch.no_grad():
        for images, masks, labels in tqdm(dataloader):
            images = images.to(device)
            labels = labels.to(device)
            masks = masks.to(device)

            class_logits, segmentation_mask = model(images)

            _, predicted_classes = torch.max(class_logits, 1)
            all_predictions.extend(predicted_classes.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            dice = dice_metric(torch.sigmoid(segmentation_mask), masks.long())
            dice_scores.append(dice.item())

    accuracy = accuracy_score(all_labels, all_predictions)
    print(f"Accuracy: {accuracy}")
    print(classification_report(all_labels, all_predictions))
    print(f"Mean Dice Score: {np.mean(dice_scores)}")


In [None]:
# The model achieved 95% accuracy in classifying and 0.977 Mean Dice Score in segmenting lung X-rays for four lung conditions.
evaluate_model(model, test_dataloader, device)

100%|██████████| 133/133 [00:46<00:00,  2.85it/s]


Accuracy: 0.9454287739192062
              precision    recall  f1-score   support

           0       0.92      0.99      0.95       701
           1       0.97      0.89      0.92      1175
           2       0.94      0.96      0.95      2085
           3       0.98      0.96      0.97       272

    accuracy                           0.95      4233
   macro avg       0.95      0.95      0.95      4233
weighted avg       0.95      0.95      0.95      4233

Mean Dice Score: 0.9774674181651352


In [None]:
# For last step, I added some visualization of my multi task model for the ease of comparison
# input images, model's segmnetation and masks are visualized side by side
# besides, I visualized model classification vs the actual class of the photo
for images, masks, labels in tqdm(test_dataloader):
    images, labels, masks = images.to(device), labels.to(device), masks.to(device)


    class_logits, segmentation_mask = model(images)


    plt.figure(1, figsize=(15, 5))
    plt.subplot(1, 3, 1)
    plt.imshow(images[0].cpu().squeeze(), cmap='gray')
    plt.title('Input Image')
    plt.subplot(1, 3, 2)
    seg = torch.round(torch.sigmoid(segmentation_mask[0].cpu().detach().squeeze()))
    plt.imshow(seg, cmap='gray')
    plt.title('Segmentation')
    plt.subplot(1, 3, 3)
    plt.imshow(masks[0].cpu().detach().squeeze(), cmap='gray')
    plt.title('Segmentation Mask')

    plt.suptitle(f'Input Image and Segmentation Mask; label: {labels[0].item()}, prediction: {class_logits[0].argmax().item()}')
    plt.tight_layout()
    plt.show()