<a href="https://colab.research.google.com/github/El-amin/FairCXRnet-A-Multi-Task-Learning-Model-for-Chest-X-Ray-Classification-for-Low-Resource-Settings-/blob/main/Nigeria_VS_Others_MTL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import zipfile
import tensorflow as tf
import numpy as np
from tensorflow.keras.applications import DenseNet201

In [None]:
# Download the chest X-ray pneumonia dataset with the Kaggle CLI
!kaggle datasets download -d paultimothymooney/chest-xray-pneumonia

# The downloaded archive is unzipped and loaded with PyTorch dataset classes in the cells below


Dataset URL: https://www.kaggle.com/datasets/paultimothymooney/chest-xray-pneumonia
License(s): other
Downloading chest-xray-pneumonia.zip to /content
100% 2.29G/2.29G [01:56<00:00, 22.3MB/s]
100% 2.29G/2.29G [01:56<00:00, 21.2MB/s]


In [None]:
# Download the Nigeria chest X-ray dataset archive with the Kaggle CLI
!kaggle datasets download -d aminumusa/nigeria-chest-x-ray-dataset

Dataset URL: https://www.kaggle.com/datasets/aminumusa/nigeria-chest-x-ray-dataset
License(s): CC-BY-NC-SA-4.0
Downloading nigeria-chest-x-ray-dataset.zip to /content
 99% 239M/241M [00:16<00:00, 15.7MB/s]
100% 241M/241M [00:16<00:00, 15.5MB/s]


In [None]:
# Create ./Nigeria_dataset in the current working directory.
# NOTE(review): the extraction cell writes to 'content/Nigeria_dataset' instead,
# so this directory looks unused - confirm whether it is still needed.
!mkdir 'Nigeria_dataset'

In [None]:
# Create ./China_dataset in the current working directory.
# NOTE(review): the extraction cell writes to 'content/China_dataset' instead,
# so this directory looks unused - confirm whether it is still needed.
!mkdir 'China_dataset'

In [None]:
# Unpack the Kaggle pneumonia archive.
# The destination is a *relative* path; assuming the notebook runs with /content as
# its working directory, the files land under /content/content/China_dataset, which
# is the location the dataset path constants in this notebook point at.
local_zip = '/content/chest-xray-pneumonia.zip'
# The context manager closes the archive even if extraction raises part-way through.
with zipfile.ZipFile(local_zip, 'r') as zip_ref:
    zip_ref.extractall('content/China_dataset')

In [None]:
# Unpack the Nigeria chest X-ray archive.
# The destination is a *relative* path; assuming the notebook runs with /content as
# its working directory, the files land under /content/content/Nigeria_dataset, which
# is the location the dataset path constants in this notebook point at.
local_zip = '/content/nigeria-chest-x-ray-dataset.zip'
# The context manager closes the archive even if extraction raises part-way through.
with zipfile.ZipFile(local_zip, 'r') as zip_ref:
    zip_ref.extractall('content/Nigeria_dataset')

In [None]:
# Dataset 1 (Kaggle pneumonia archive): training and validation roots.
data1_dir, data1_val_dir = (
    '/content/content/China_dataset/chest_xray/train',
    '/content/content/China_dataset/chest_xray/val',
)

# Dataset 2 (Nigeria archive): training root and held-out root.
# NOTE(review): the "val" name points at test_folder - confirm that is intended.
data2_dir, data2_val_dir = (
    '/content/content/Nigeria_dataset/my_dataset/train_folder',
    '/content/content/Nigeria_dataset/my_dataset/test_folder',
)

**PyTorch Section**

In [None]:
#loading 2
import torch
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader, ConcatDataset
from PIL import Image, UnidentifiedImageError
import os

# Preprocessing applied to every image: resize to 224x224, then convert to a tensor
transform = transforms.Compose(
    [transforms.Resize((224, 224)), transforms.ToTensor()]
)

# Define a function to check if a file is a valid image
def is_image_file(file_path):
    """Return True if Pillow can open and verify ``file_path`` as an image.

    Args:
        file_path: Path of the file to check.

    Returns:
        True when ``Image.open`` succeeds and ``verify()`` raises nothing;
        False when opening or verification fails.
    """
    try:
        # Open inside a context manager so the underlying file handle is released
        # right away instead of lingering until garbage collection (this check is
        # run once per file while a dataset is being indexed).
        with Image.open(file_path) as img:
            img.verify()  # Verify if it's a valid image
        return True
    except (UnidentifiedImageError, IOError, SyntaxError):
        # SyntaxError is included because Pillow's verify() can report a corrupt
        # file that way for some formats; such files are treated as non-images.
        return False

# Define custom dataset for Nigerian Chest X-ray Dataset (4 classes)
class NigerianChestXRayDataset(torch.utils.data.Dataset):
    """Folder-per-class dataset for the four-class Nigerian chest X-ray data.

    ``root_dir`` must contain one sub-directory per entry in ``self.classes``;
    a sample's integer label is the position of its class in that list.
    """

    def __init__(self, root_dir, transform=None):
        """Index every file under ``root_dir`` that passes ``is_image_file``.

        Args:
            root_dir: Directory holding the PNEUMONIA/COVID/TB/NORMAL folders.
            transform: Optional callable applied to each PIL image on access.

        Raises:
            OSError: If a class folder cannot be listed (from ``os.listdir``).
        """
        self.root_dir = root_dir
        self.transform = transform
        self.classes = ['PNEUMONIA', 'COVID', 'TB', 'NORMAL']
        self.image_paths = []
        self.labels = []

        for idx, class_name in enumerate(self.classes):
            class_dir = os.path.join(root_dir, class_name)
            # os.listdir returns entries in arbitrary order; sorting keeps the
            # sample order (and therefore any index-based split) stable across
            # runs and machines.
            for img_name in sorted(os.listdir(class_dir)):
                img_path = os.path.join(class_dir, img_name)

                # Filter out non-image files
                if is_image_file(img_path):
                    self.image_paths.append(img_path)
                    self.labels.append(idx)

    def __len__(self):
        """Return the number of indexed images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` as an RGB image and return ``(image, label)``."""
        img_path = self.image_paths[idx]
        with Image.open(img_path) as img:
            image = img.convert('RGB')  # Convert to RGB
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, label

# Define custom dataset for Kaggle Pneumonia Dataset (2 classes)
class PneumoniaDataset(torch.utils.data.Dataset):
    """Folder-per-class dataset for the two-class Kaggle pneumonia data.

    ``root_dir`` must contain a PNEUMONIA and a NORMAL sub-directory; a sample's
    integer label is the position of its class in ``self.classes``
    (PNEUMONIA -> 0, NORMAL -> 1).
    """

    def __init__(self, root_dir, transform=None):
        """Index every file under ``root_dir`` that passes ``is_image_file``.

        Args:
            root_dir: Directory holding the PNEUMONIA and NORMAL folders.
            transform: Optional callable applied to each PIL image on access.

        Raises:
            OSError: If a class folder cannot be listed (from ``os.listdir``).
        """
        self.root_dir = root_dir
        self.transform = transform
        self.classes = ['PNEUMONIA', 'NORMAL']
        self.image_paths = []
        self.labels = []

        for idx, class_name in enumerate(self.classes):
            class_dir = os.path.join(root_dir, class_name)
            # os.listdir returns entries in arbitrary order; sorting keeps the
            # sample order (and therefore any index-based split) stable across
            # runs and machines.
            for img_name in sorted(os.listdir(class_dir)):
                img_path = os.path.join(class_dir, img_name)

                # Filter out non-image files
                if is_image_file(img_path):
                    self.image_paths.append(img_path)
                    self.labels.append(idx)

    def __len__(self):
        """Return the number of indexed images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` as an RGB image and return ``(image, label)``."""
        img_path = self.image_paths[idx]
        with Image.open(img_path) as img:
            image = img.convert('RGB')  # Convert to RGB
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, label

# Load datasets
nigerian_dataset = NigerianChestXRayDataset('/content/content/Nigeria_dataset/my_dataset/train_folder', transform=transform)
pneumonia_dataset = PneumoniaDataset('/content/content/China_dataset/chest_xray/train', transform=transform)

# Combine datasets into one
# NOTE(review): the two datasets use different label encodings - index 1 means COVID
# in NigerianChestXRayDataset but NORMAL in PneumoniaDataset - so labels in the
# combined dataset are ambiguous. Confirm how downstream code is meant to tell them apart.
combined_dataset = ConcatDataset([nigerian_dataset, pneumonia_dataset])

# Split into training and validation datasets (80% / 20%)
val_size = int(0.2 * len(combined_dataset))
train_size = len(combined_dataset) - val_size
# Seed the split so the same samples end up in train/val on every run; without a
# generator the partition changes each time the cell is executed.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = torch.utils.data.random_split(
    combined_dataset, [train_size, val_size], generator=split_generator
)

# DataLoader
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)



In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import os

# Image preprocessing: scale each image to 224x224 and turn it into a tensor
transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ]
)

# Define custom dataset for binary classification (Pneumonia vs Normal)
class NigerianChestXRayDataset(Dataset):
    """Binary (PNEUMONIA vs NORMAL) folder-per-class chest X-ray dataset.

    Only the PNEUMONIA and NORMAL sub-directories of ``root_dir`` are read.
    Labels follow ``self.class_to_idx`` (PNEUMONIA -> 0, NORMAL -> 1).
    """

    def __init__(self, root_dir, transform=None):
        """Index every verifiable image in the two class folders.

        Args:
            root_dir: Directory holding the PNEUMONIA and NORMAL folders.
            transform: Optional callable applied to each PIL image on access.

        Raises:
            OSError: If a class folder cannot be listed (from ``os.listdir``).
        """
        self.root_dir = root_dir
        self.transform = transform
        self.classes = ['PNEUMONIA', 'NORMAL']  # Binary classification
        self.class_to_idx = {class_name: idx for idx, class_name in enumerate(self.classes)}
        self.image_paths = []
        self.labels = []

        for class_name in self.classes:
            class_dir = os.path.join(root_dir, class_name)
            # os.listdir order is arbitrary; sort for a reproducible sample order.
            for img_name in sorted(os.listdir(class_dir)):
                img_path = os.path.join(class_dir, img_name)
                # Skip stray non-image files here so they cannot crash
                # __getitem__ in the middle of an epoch.
                if self._is_valid_image(img_path):
                    self.image_paths.append(img_path)
                    self.labels.append(self.class_to_idx[class_name])

    @staticmethod
    def _is_valid_image(img_path):
        """Return True if Pillow can open and verify ``img_path``."""
        try:
            with Image.open(img_path) as img:
                img.verify()
        except (OSError, SyntaxError):
            # OSError covers unreadable/unidentified files; SyntaxError is how
            # Pillow's verify() can report corruption for some formats.
            return False
        return True

    def __len__(self):
        """Return the number of indexed images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` as an RGB image and return ``(image, label)``."""
        img_path = self.image_paths[idx]
        with Image.open(img_path) as img:
            image = img.convert('RGB')
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, label

# Build the binary (PNEUMONIA vs NORMAL) dataset from the Nigerian training folder
nigerian_binary_dataset = NigerianChestXRayDataset(
    root_dir='/content/content/Nigeria_dataset/my_dataset/train_folder',
    transform=transform,
)

# Batch it for training.
# NOTE(review): this re-assigns `train_loader`, replacing the loader built in the
# earlier combined-dataset cell when cells are run top to bottom - confirm intended.
train_loader = DataLoader(
    dataset=nigerian_binary_dataset,
    batch_size=16,
    shuffle=True,
)

# Report how many samples were indexed
print(f"Number of samples in the dataset: {len(nigerian_binary_dataset)}")



Number of samples in the dataset: 1000


In [None]:
import torch
import torch.nn as nn
import torchvision.models as models

class MultiTaskDenseNet201(nn.Module):
    """DenseNet-201 backbone feeding a shared dense layer and two task heads.

    The backbone's own classifier is replaced by ``nn.Identity`` so the network
    emits pooled features; those pass through ``shared_fc`` and then through one
    linear head per task.
    """

    def __init__(self, num_classes_task1=2, num_classes_task2=2):
        """Build the backbone, shared layer and heads.

        Args:
            num_classes_task1: Output size of ``task1_head`` (default 2, the
                previously hard-coded value).
            num_classes_task2: Output size of ``task2_head`` (default 2, the
                previously hard-coded value).
        """
        super().__init__()
        # NOTE(review): newer torchvision releases prefer a `weights=` argument over
        # `pretrained=True` - check the installed version before changing this call.
        self.base_model = models.densenet201(pretrained=True)
        # Read the feature width from the classifier being removed instead of
        # hard-coding it, so the shared layer always matches the backbone.
        feature_dim = self.base_model.classifier.in_features
        self.base_model.classifier = nn.Identity()  # Remove the final classifier

        self.shared_fc = nn.Sequential(
            nn.Linear(feature_dim, 1024),
            nn.ReLU()
        )

        # Task-specific output heads
        self.task1_head = nn.Linear(1024, num_classes_task1)  # For Nigerian dataset
        self.task2_head = nn.Linear(1024, num_classes_task2)  # For Kaggle Pneumonia dataset

    def forward(self, x):
        """Return ``(task1_logits, task2_logits)`` for the input batch ``x``."""
        features = self.base_model(x)
        shared_features = self.shared_fc(features)
        task1_output = self.task1_head(shared_features)
        task2_output = self.task2_head(shared_features)
        return task1_output, task2_output

# Instantiate the multi-task network; construction requests the pretrained
# DenseNet-201 backbone weights.
model = MultiTaskDenseNet201()


Downloading: "https://download.pytorch.org/models/densenet201-c1103571.pth" to /root/.cache/torch/hub/checkpoints/densenet201-c1103571.pth
100%|██████████| 77.4M/77.4M [00:00<00:00, 197MB/s]


In [None]:
def train_model(model, train_loader, val_loader, num_epochs=10):
    """Train both heads of ``model`` jointly on ``train_loader``.

    Every batch's labels are used as the target for both task heads; the two
    cross-entropy losses are summed and optimised with Adam (lr=0.001). After
    each epoch the sample-weighted mean training loss is printed.

    Args:
        model: Module whose forward pass returns ``(task1_logits, task2_logits)``.
        train_loader: Iterable of ``(inputs, labels)`` batches exposing ``.dataset``.
        val_loader: Accepted for signature compatibility; not read by this function.
        num_epochs: Number of passes over ``train_loader``.
    """
    adam = torch.optim.Adam(model.parameters(), lr=0.001)
    # One criterion per head: Nigerian dataset (task 1), Kaggle Pneumonia (task 2)
    loss_fn_task1 = torch.nn.CrossEntropyLoss()
    loss_fn_task2 = torch.nn.CrossEntropyLoss()

    for epoch in range(num_epochs):
        model.train()
        loss_sum = 0.0

        for batch in train_loader:
            inputs, labels = batch
            logits_task1, logits_task2 = model(inputs)

            # Both heads are supervised with the same label tensor
            targets_task1 = targets_task2 = labels

            # Sanity check: each head produced one row per label
            assert logits_task1.size(0) == targets_task1.size(0), "Mismatch in batch size for task 1"
            assert logits_task2.size(0) == targets_task2.size(0), "Mismatch in batch size for task 2"

            adam.zero_grad()

            # Joint objective: plain sum of the per-task losses
            batch_loss = loss_fn_task1(logits_task1, targets_task1) + loss_fn_task2(logits_task2, targets_task2)
            batch_loss.backward()
            adam.step()

            # Weight by batch size so the epoch figure is a per-sample mean
            loss_sum += batch_loss.item() * inputs.size(0)

        epoch_loss = loss_sum / len(train_loader.dataset)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}")

# Run 10 epochs of joint training. `val_loader` is passed through, but the
# train_model defined in this cell does not read it.
# NOTE(review): `train_loader` is assigned in two earlier cells; with top-to-bottom
# execution the binary Nigerian loader is the one used here - confirm intended.
train_model(model, train_loader, val_loader, num_epochs=10)


Epoch 1/10, Loss: 0.2354
Epoch 2/10, Loss: 0.1020
Epoch 3/10, Loss: 0.0862
Epoch 4/10, Loss: 0.1764
Epoch 5/10, Loss: 0.1480
Epoch 6/10, Loss: 0.0855
Epoch 7/10, Loss: 0.0155
Epoch 8/10, Loss: 0.0451
Epoch 9/10, Loss: 0.0456
Epoch 10/10, Loss: 0.0490


In [None]:
# Save the model's state_dict to 'multi_task_model.pth' (relative path, so it is
# written to the current working directory)
torch.save(model.state_dict(), 'multi_task_model.pth')

In [None]:
import torch
from sklearn.metrics import roc_auc_score, precision_score, recall_score, f1_score

def validate_model(model, val_loader):
    """Evaluate both task heads on ``val_loader`` and print summary metrics.

    Labels are clamped into ``{0, 1}`` and the same clamped labels are scored
    against both heads. For each head the function prints mean cross-entropy
    loss, accuracy, weighted precision/recall/F1 and ROC AUC.

    ROC AUC is computed from the softmax probability of class index 1 rather
    than from the hard argmax predictions, so the score reflects how well the
    head ranks samples instead of collapsing to a single operating point.

    Args:
        model: Module whose forward pass returns ``(task1_logits, task2_logits)``,
            each with two columns.
        val_loader: Iterable of ``(inputs, labels)`` batches exposing ``.dataset``.

    Returns:
        None. Results are printed.
    """
    model.eval()  # Set the model to evaluation mode
    criterion_task1 = torch.nn.CrossEntropyLoss()  # Loss for Nigerian dataset
    criterion_task2 = torch.nn.CrossEntropyLoss()  # Loss for Kaggle Pneumonia dataset

    val_loss_task1 = 0.0
    val_loss_task2 = 0.0
    correct_task1 = 0
    correct_task2 = 0
    total_task1 = 0
    total_task2 = 0

    all_labels_task1 = []
    all_preds_task1 = []
    all_scores_task1 = []
    all_labels_task2 = []
    all_preds_task2 = []
    all_scores_task2 = []

    with torch.no_grad():  # Disable gradient calculation
        for inputs, labels in val_loader:
            # Forward pass
            outputs_task1, outputs_task2 = model(inputs)

            # Make sure labels are binary (0 or 1)
            # NOTE(review): any label above 1 is folded into class 1 here - confirm
            # that is the intended mapping for loaders that carry more than two classes.
            labels = labels.clamp(0, 1)

            # Compute losses
            loss_task1 = criterion_task1(outputs_task1, labels)
            loss_task2 = criterion_task2(outputs_task2, labels)

            val_loss_task1 += loss_task1.item() * inputs.size(0)
            val_loss_task2 += loss_task2.item() * inputs.size(0)

            # Compute predictions
            _, predicted_task1 = torch.max(outputs_task1, 1)
            _, predicted_task2 = torch.max(outputs_task2, 1)

            # Probability assigned to class index 1, used as the ROC AUC score
            scores_task1 = torch.softmax(outputs_task1, dim=1)[:, 1]
            scores_task2 = torch.softmax(outputs_task2, dim=1)[:, 1]

            total_task1 += labels.size(0)
            correct_task1 += (predicted_task1 == labels).sum().item()

            total_task2 += labels.size(0)
            correct_task2 += (predicted_task2 == labels).sum().item()

            # Collect all predictions, scores and labels
            all_labels_task1.extend(labels.cpu().numpy())
            all_preds_task1.extend(predicted_task1.cpu().numpy())
            all_scores_task1.extend(scores_task1.cpu().numpy())
            all_labels_task2.extend(labels.cpu().numpy())
            all_preds_task2.extend(predicted_task2.cpu().numpy())
            all_scores_task2.extend(scores_task2.cpu().numpy())

    # Calculate metrics
    val_loss_task1 /= len(val_loader.dataset)
    val_loss_task2 /= len(val_loader.dataset)
    accuracy_task1 = 100 * correct_task1 / total_task1
    accuracy_task2 = 100 * correct_task2 / total_task2

    # Binary classification metrics
    precision_task1 = precision_score(all_labels_task1, all_preds_task1, average='weighted')
    recall_task1 = recall_score(all_labels_task1, all_preds_task1, average='weighted')
    f1_task1 = f1_score(all_labels_task1, all_preds_task1, average='weighted')

    precision_task2 = precision_score(all_labels_task2, all_preds_task2, average='weighted')
    recall_task2 = recall_score(all_labels_task2, all_preds_task2, average='weighted')
    f1_task2 = f1_score(all_labels_task2, all_preds_task2, average='weighted')

    # AUC needs continuous scores, not thresholded class predictions
    auc_task1 = roc_auc_score(all_labels_task1, all_scores_task1)
    auc_task2 = roc_auc_score(all_labels_task2, all_scores_task2)

    print(f"Validation Loss Task 1: {val_loss_task1:.4f}, Accuracy: {accuracy_task1:.2f}%")
    print(f"Validation Loss Task 2: {val_loss_task2:.4f}, Accuracy: {accuracy_task2:.2f}%")
    print(f"Task 1 - Precision: {precision_task1:.4f}, Recall: {recall_task1:.4f}, F1 Score: {f1_task1:.4f}, AUC: {auc_task1:.4f}")
    print(f"Task 2 - Precision: {precision_task2:.4f}, Recall: {recall_task2:.4f}, F1 Score: {f1_task2:.4f}, AUC: {auc_task2:.4f}")


In [None]:
# Evaluate the trained model on `val_loader`.
# NOTE(review): the only `val_loader` assignment visible in this notebook is the 20%
# split of the combined 4-class + 2-class dataset; validate_model clamps its labels
# to {0, 1} - confirm this is the evaluation set that was intended.
validate_model(model, val_loader)

Validation Loss Task 1: 1.9663, Accuracy: 77.96%
Validation Loss Task 2: 2.0551, Accuracy: 77.96%
Task 1 - Precision: 0.7988, Recall: 0.7796, F1 Score: 0.7647, AUC: 0.7352
Task 2 - Precision: 0.7982, Recall: 0.7796, F1 Score: 0.7650, AUC: 0.7355


In [None]:
#model 2
import torch.nn as nn
import torchvision.models as models

class MultiTaskDenseNet201(nn.Module):
    """DenseNet-201 backbone feeding a shared dense layer and two task heads.

    The backbone's own classifier is replaced by ``nn.Identity`` so the network
    emits pooled features; those pass through ``shared_fc`` and then through one
    linear head per task.
    """

    def __init__(self, num_classes_task1=4, num_classes_task2=2):
        """Build the backbone, shared layer and heads.

        Args:
            num_classes_task1: Output size of ``task1_head`` (default 4, the
                previously hard-coded value).
            num_classes_task2: Output size of ``task2_head`` (default 2, the
                previously hard-coded value).
        """
        super().__init__()
        # NOTE(review): newer torchvision releases prefer a `weights=` argument over
        # `pretrained=True` - check the installed version before changing this call.
        self.base_model = models.densenet201(pretrained=True)
        # Read the feature width from the classifier being removed instead of
        # hard-coding it, so the shared layer always matches the backbone.
        feature_dim = self.base_model.classifier.in_features
        self.base_model.classifier = nn.Identity()  # Remove the final classifier

        self.shared_fc = nn.Sequential(
            nn.Linear(feature_dim, 1024),
            nn.ReLU()
        )

        # Task-specific output heads
        self.task1_head = nn.Linear(1024, num_classes_task1)  # For Nigerian dataset
        self.task2_head = nn.Linear(1024, num_classes_task2)  # For Kaggle Pneumonia dataset

    def forward(self, x):
        """Return ``(task1_logits, task2_logits)`` for the input batch ``x``."""
        features = self.base_model(x)
        shared_features = self.shared_fc(features)
        task1_output = self.task1_head(shared_features)
        task2_output = self.task2_head(shared_features)
        return task1_output, task2_output

# Re-bind `model` to a new instance of the 4-class / 2-class variant defined above;
# this replaces whatever `model` referred to before the cell was run.
model = MultiTaskDenseNet201()




In [None]:
import torch

def train_model(model, train_loader, val_loader, num_epochs=10):
    """Jointly train both heads of ``model`` and report validation loss per epoch.

    Each batch carries one integer label tensor. It is turned into per-task
    targets as ``labels % C1`` (task 1) and ``labels // C1`` (task 2), where
    ``C1`` is the number of task-1 output columns. The two cross-entropy losses
    are summed and optimised with Adam (lr=0.001). After every epoch the
    sample-weighted mean training loss and validation loss are printed.

    NOTE(review): with labels in ``[0, C1)`` the task-2 target is always 0, so the
    second head only ever sees one class. The original comments call this split a
    placeholder - confirm the intended label encoding.

    Args:
        model: Module whose forward pass returns ``(task1_logits, task2_logits)``.
        train_loader: Iterable of ``(inputs, labels)`` batches exposing ``.dataset``.
        val_loader: Iterable of ``(inputs, labels)`` batches exposing ``.dataset``.
        num_epochs: Number of passes over ``train_loader``.
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion_task1 = torch.nn.CrossEntropyLoss()  # Loss for task 1 (4 classes)
    criterion_task2 = torch.nn.CrossEntropyLoss()  # Loss for task 2 (2 classes)

    def split_labels(labels, num_task1_classes):
        """Derive (task1_targets, task2_targets) from one combined label tensor."""
        return labels % num_task1_classes, labels // num_task1_classes

    def joint_loss(inputs, labels):
        """Run a forward pass and return the summed loss of both heads."""
        outputs_task1, outputs_task2 = model(inputs)
        # Take the class count from this batch's own output so the validation loop
        # never depends on a variable that only the training loop would have set.
        labels_task1, labels_task2 = split_labels(labels, outputs_task1.size(1))
        loss_task1 = criterion_task1(outputs_task1, labels_task1)
        loss_task2 = criterion_task2(outputs_task2, labels_task2)
        return loss_task1 + loss_task2

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0

        for inputs, labels in train_loader:
            optimizer.zero_grad()

            loss = joint_loss(inputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)

        epoch_loss = running_loss / len(train_loader.dataset)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}")

        # Validation
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in val_loader:
                loss = joint_loss(inputs, labels)
                val_loss += loss.item() * inputs.size(0)

        val_loss = val_loss / len(val_loader.dataset)
        print(f"Validation Loss: {val_loss:.4f}")





In [None]:
# Train the 4-class / 2-class model for 10 epochs, validating after each epoch.
# NOTE(review): `train_loader` is assigned in two earlier cells; with top-to-bottom
# execution it is the binary Nigerian loader (labels 0/1), while `val_loader` comes
# from the combined-dataset split - confirm these are the intended inputs.
train_model(model, train_loader, val_loader, num_epochs=10)