##### In this dataset, there are 3 label columns, and they are not one-hot encoded:
    -  Label
    -  Label_1_Virus_category 
    -  Label_2_Virus_category

##### Hence, there are 2 possible approaches:
    -  Combine all 3 labels into 1 label => the dataset will have 7 categories => the accuracy may be lower.
    -  Build a separate model for the main label and for each sub-label => the accuracy may be higher.

In [2]:
import pandas as pd
from pathlib import Path
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import os
from PIL import Image
import matplotlib.pyplot as plt

In [3]:
# Every data location is derived from the parent of the current working directory.
PROJECT_ROOT = Path().resolve().parent
RAW_DATA_DIR = PROJECT_ROOT.joinpath("project4368", "data")

# Metadata tables shipped with the dataset.
CSV1_PATH = RAW_DATA_DIR.joinpath("Chest_xray_Corona_Metadata.csv")
CSV2_PATH = RAW_DATA_DIR.joinpath("Chest_xray_Corona_dataset_Summary.csv")

# Image folders, one per split.
IMG_DIR_train, IMG_DIR_test = (
    RAW_DATA_DIR.joinpath("Coronahack-Chest-XRay-Dataset", split_name)
    for split_name in ("train", "test")
)

# Data overview

In [7]:
# Load the per-image metadata (file name, labels, TRAIN/TEST marker) and preview the first rows.
df = pd.read_csv(CSV1_PATH)
df.head()

Unnamed: 0.1,Unnamed: 0,X_ray_image_name,Label,Dataset_type,Label_2_Virus_category,Label_1_Virus_category
0,0,IM-0128-0001.jpeg,Normal,TRAIN,,
1,1,IM-0127-0001.jpeg,Normal,TRAIN,,
2,2,IM-0125-0001.jpeg,Normal,TRAIN,,
3,3,IM-0122-0001.jpeg,Normal,TRAIN,,
4,4,IM-0119-0001.jpeg,Normal,TRAIN,,


In [9]:
# Load the summary table that lists the image count for each label combination.
df_sum = pd.read_csv(CSV2_PATH)
df_sum

Unnamed: 0.1,Unnamed: 0,Label,Label_1_Virus_category,Label_2_Virus_category,Image_Count
0,0,Normal,,,1576
1,1,Pnemonia,Stress-Smoking,ARDS,2
2,2,Pnemonia,Virus,,1493
3,3,Pnemonia,Virus,COVID-19,58
4,4,Pnemonia,Virus,SARS,4
5,5,Pnemonia,bacteria,,2772
6,6,Pnemonia,bacteria,Streptococcus,5


In [11]:
# Class balance of the main label across the whole metadata file.
print(df['Label'].value_counts())

Label
Pnemonia    4334
Normal      1576
Name: count, dtype: int64


In [13]:
# For each main label, show how its rows are distributed over the first sub-label.
sections = (
    ("Normal and label 1 virus:", "Normal"),
    ("\nPnemonia and label 1 virus:", "Pnemonia"),
)
for heading, label_value in sections:
    print(heading)
    print(df[df['Label'] == label_value]['Label_1_Virus_category'].value_counts())

Normal and label 1 virus:
Series([], Name: count, dtype: int64)

Pnemonia and label 1 virus:
Label_1_Virus_category
bacteria          2777
Virus             1555
Stress-Smoking       2
Name: count, dtype: int64


In [15]:
def _label_part(value):
    """Return ``value`` as a stripped string, or ``None`` when it is missing or blank."""
    if value is None or pd.isna(value):
        return None
    text = str(value).strip()
    return text or None


def combine_generate_caption(row):
    """Collapse the three label columns of one metadata row into a single caption.

    Parameters
    ----------
    row : mapping-like
        Anything with a ``.get(key)`` method (a ``pandas.Series`` row or a dict)
        exposing ``Label``, ``Label_1_Virus_category`` and
        ``Label_2_Virus_category``.

    Returns
    -------
    str or None
        ``"Normal"`` for normal rows; ``"Pnemonia"`` followed by every
        sub-label that is present, joined with ``_``, for pneumonia rows;
        ``None`` for any other main label.

    Notes
    -----
    ``"Pnemonia"`` is the spelling used in the dataset and must not be
    corrected here. Missing or blank sub-labels are skipped, so a row without
    a first sub-label yields ``"Pnemonia"`` rather than ``"Pnemonia_nan"``.
    """
    label = row.get("Label")

    if label == "Normal":
        return "Normal"

    if label == "Pnemonia":
        sub_labels = (
            _label_part(row.get("Label_1_Virus_category")),
            _label_part(row.get("Label_2_Virus_category")),
        )
        return "_".join(["Pnemonia", *(part for part in sub_labels if part)])

    # Unknown or missing main label: no caption can be derived.
    return None


In [17]:
# Add the combined caption as a new column (this mutates `df` in place), then display the frame.
df["caption"] = df.apply(combine_generate_caption, axis=1)
df

Unnamed: 0.1,Unnamed: 0,X_ray_image_name,Label,Dataset_type,Label_2_Virus_category,Label_1_Virus_category,caption
0,0,IM-0128-0001.jpeg,Normal,TRAIN,,,Normal
1,1,IM-0127-0001.jpeg,Normal,TRAIN,,,Normal
2,2,IM-0125-0001.jpeg,Normal,TRAIN,,,Normal
3,3,IM-0122-0001.jpeg,Normal,TRAIN,,,Normal
4,4,IM-0119-0001.jpeg,Normal,TRAIN,,,Normal
...,...,...,...,...,...,...,...
5905,5928,person1637_virus_2834.jpeg,Pnemonia,TEST,,Virus,Pnemonia_Virus
5906,5929,person1635_virus_2831.jpeg,Pnemonia,TEST,,Virus,Pnemonia_Virus
5907,5930,person1634_virus_2830.jpeg,Pnemonia,TEST,,Virus,Pnemonia_Virus
5908,5931,person1633_virus_2829.jpeg,Pnemonia,TEST,,Virus,Pnemonia_Virus


In [19]:
# Frequency of each combined caption over the whole metadata file.
df['caption'].value_counts()

caption
Pnemonia_bacteria                  2772
Normal                             1576
Pnemonia_Virus                     1493
Pnemonia_Virus_COVID-19              58
Pnemonia_bacteria_Streptococcus       5
Pnemonia_Virus_SARS                   4
Pnemonia_Stress-Smoking_ARDS          2
Name: count, dtype: int64

In [21]:
# Split on the dataset's own TRAIN/TEST marker instead of a hard-coded row
# offset, so the split stays correct if the CSV is re-ordered or updated.
df_train = df[df['Dataset_type'] == 'TRAIN']
df_test = df[df['Dataset_type'] == 'TEST']

# Every row must land in exactly one split; anything else means the marker
# column holds a value this notebook does not know about.
assert len(df_train) + len(df_test) == len(df), "unexpected Dataset_type values"

# Kept so that code relying on the old boundary name still finds it.
mid_index = len(df_train)

In [23]:
# Preview the first rows of the training split.
df_train.head()

Unnamed: 0.1,Unnamed: 0,X_ray_image_name,Label,Dataset_type,Label_2_Virus_category,Label_1_Virus_category,caption
0,0,IM-0128-0001.jpeg,Normal,TRAIN,,,Normal
1,1,IM-0127-0001.jpeg,Normal,TRAIN,,,Normal
2,2,IM-0125-0001.jpeg,Normal,TRAIN,,,Normal
3,3,IM-0122-0001.jpeg,Normal,TRAIN,,,Normal
4,4,IM-0119-0001.jpeg,Normal,TRAIN,,,Normal


In [25]:
# Preview the first rows of the test split.
df_test.head()

Unnamed: 0.1,Unnamed: 0,X_ray_image_name,Label,Dataset_type,Label_2_Virus_category,Label_1_Virus_category,caption
5286,5309,IM-0021-0001.jpeg,Normal,TEST,,,Normal
5287,5310,IM-0019-0001.jpeg,Normal,TEST,,,Normal
5288,5311,IM-0017-0001.jpeg,Normal,TEST,,,Normal
5289,5312,IM-0016-0001.jpeg,Normal,TEST,,,Normal
5290,5313,IM-0015-0001.jpeg,Normal,TEST,,,Normal


In [27]:
# Frequency of each combined caption within the training split.
df_train['caption'].value_counts()

caption
Pnemonia_bacteria                  2530
Pnemonia_Virus                     1345
Normal                             1342
Pnemonia_Virus_COVID-19              58
Pnemonia_bacteria_Streptococcus       5
Pnemonia_Virus_SARS                   4
Pnemonia_Stress-Smoking_ARDS          2
Name: count, dtype: int64

In [29]:
class XRayDataset(Dataset):
    """Grayscale X-ray dataset backed by a metadata frame and an image directory.

    Each item is ``(image, label)``: ``image`` is the file named in the
    ``X_ray_image_name`` column, opened in mode ``'L'`` and passed through one
    of the two transforms; ``label`` is the integer index of the row's
    ``Label`` value.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Must contain ``X_ray_image_name`` and ``Label`` columns. It is copied
        (via ``reset_index``), so the caller's frame is not modified.
    img_dir : str or path-like
        Directory that contains the image files.
    transform : callable, optional
        Applied to images whose class is not a minority class.
    minority_transform : callable, optional
        Applied instead of ``transform`` to images of minority classes.
    label_map : dict, optional
        Explicit ``{label: index}`` mapping. Pass the training dataset's
        ``label_map`` when building the test dataset so both agree. When
        omitted, the labels present in ``dataframe`` are sorted and numbered,
        which makes the mapping independent of row order.
    minority_threshold : int, optional
        A class with fewer rows than this counts as a minority class.

    Raises
    ------
    ValueError
        If a row's ``Label`` has no entry in the label map.

    Notes
    -----
    Minority classes are derived from the main ``Label`` column. With the
    default threshold of 100, a class with 100 or more rows never receives
    ``minority_transform``.
    """

    def __init__(self, dataframe, img_dir, transform=None, minority_transform=None,
                 label_map=None, minority_threshold=100):
        self.data = dataframe.reset_index(drop=True)
        self.img_dir = img_dir
        self.transform = transform
        self.minority_transform = minority_transform

        # Encode labels to integers. Sorting makes the mapping deterministic,
        # so two datasets built from differently ordered frames agree.
        if label_map is None:
            present_labels = sorted(self.data['Label'].dropna().unique())
            label_map = {label: idx for idx, label in enumerate(present_labels)}
        self.label_map = dict(label_map)

        encoded = self.data['Label'].map(self.label_map)
        unmapped = encoded.isna()
        if unmapped.any():
            unknown = sorted(self.data.loc[unmapped, 'Label'].astype(str).unique())
            raise ValueError(f"Labels missing from label_map: {unknown}")
        self.data['label_idx'] = encoded.astype('int64')

        # Classes with fewer than `minority_threshold` rows get the minority transform.
        class_counts = self.data['label_idx'].value_counts()
        self.minority_classes = class_counts[class_counts < minority_threshold].index.tolist()

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        img_path = os.path.join(self.img_dir, row['X_ray_image_name'])
        label = int(row['label_idx'])

        try:
            # The context manager closes the file handle as soon as the pixel
            # data has been converted.
            with Image.open(img_path) as raw_image:
                image = raw_image.convert('L')  # Grayscale
        except OSError as exc:
            # Missing or unreadable file: keep the best-effort blank
            # placeholder, but report it instead of hiding the problem.
            import warnings

            warnings.warn(
                f"Could not read image {img_path!r} ({exc}); using a blank placeholder.",
                RuntimeWarning,
                stacklevel=2,
            )
            image = Image.new('L', (224, 224))

        # Apply different transforms based on class
        if self.minority_transform is not None and label in self.minority_classes:
            image = self.minority_transform(image)
        elif self.transform is not None:
            image = self.transform(image)

        return image, label


In [31]:
# Shared pipeline pieces: every image is resized to 224x224, converted to a
# tensor and normalised with mean 0.5 / std 0.5.
_resize_steps = [transforms.Resize((224, 224))]
_tensor_steps = [
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5]),
]

# Random geometric augmentation, inserted between resizing and tensor conversion.
_augment_steps = [
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(25),
    transforms.RandomAffine(degrees=10, translate=(0.1, 0.1)),
]

# Deterministic preprocessing.
standard_transform = transforms.Compose(_resize_steps + _tensor_steps)

# Preprocessing plus augmentation, intended for minority-class images.
minority_transform = transforms.Compose(_resize_steps + _augment_steps + _tensor_steps)

In [33]:
# Build one dataset per split; only the training set is given the augmenting transform.
train_dataset = XRayDataset(
    dataframe=df_train,
    img_dir=IMG_DIR_train,
    transform=standard_transform,
    minority_transform=minority_transform,
)
test_dataset = XRayDataset(
    dataframe=df_test,
    img_dir=IMG_DIR_test,
    transform=standard_transform,
)

In [35]:
# Give every training row a weight of 1 / (size of its class), so each
# main-label class carries the same total sampling weight.
train_labels = df_train['Label']
class_counts = train_labels.value_counts()
class_weights = class_counts.rdiv(1.0)  # element-wise 1.0 / count
sample_weights = train_labels.map(class_weights).to_numpy()

# Draw as many rows per epoch as the training set holds, with replacement.
sampler = WeightedRandomSampler(sample_weights, len(sample_weights), replacement=True)

In [37]:
# Batch iterators: training draws rows through the weighted sampler,
# evaluation walks the test set in order.
train_loader = DataLoader(
    train_dataset,
    batch_size=32,
    sampler=sampler,
)
test_loader = DataLoader(
    test_dataset,
    batch_size=64,
    shuffle=False,
)

In [51]:
class ModelCNN(nn.Module):
    """Small convolutional classifier for single-channel images.

    Three conv stages (1 -> 32 -> 64 -> 128 channels) each halve the spatial
    size, followed by a 256-unit hidden layer with dropout and a linear output
    layer. The first fully connected layer expects ``128 * 28 * 28`` features,
    which corresponds to 224x224 inputs after three 2x poolings.

    Parameters
    ----------
    num_classes : int, optional
        Number of output logits (default 2).
    """

    @staticmethod
    def _conv_stage(in_channels, out_channels):
        """Build one stage: 3x3 conv (padding 1), batch norm, ReLU, 2x2 max-pool."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )

    def __init__(self, num_classes=2):
        super().__init__()

        # Feature extractor.
        self.conv_block1 = self._conv_stage(1, 32)
        self.conv_block2 = self._conv_stage(32, 64)
        self.conv_block3 = self._conv_stage(64, 128)

        # Classifier head.
        self.dropout = nn.Dropout(0.5)
        self.fc1 = nn.Linear(128 * 28 * 28, 256)
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, x):
        """Return raw class logits of shape ``[B, num_classes]``."""
        # For 224x224 inputs the stages yield
        # [B, 32, 112, 112] -> [B, 64, 56, 56] -> [B, 128, 28, 28].
        for stage in (self.conv_block1, self.conv_block2, self.conv_block3):
            x = stage(x)

        flat = x.view(x.size(0), -1)
        hidden = self.dropout(F.relu(self.fc1(flat)))
        return self.fc2(hidden)


In [53]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

model = ModelCNN()
model = model.to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=0.001)

In [None]:
# Train the model for a fixed number of passes over `train_loader`,
# printing the mean batch loss after each epoch.
num_epochs = 5  # You can increase this

print("Training...")
for epoch in range(num_epochs):
    # Switch to training mode before each epoch.
    model.train()
    running_loss = 0.0
    for images, labels in train_loader:
        # Move the batch to the same device as the model.
        images, labels = images.to(device), labels.to(device)

        # One optimisation step: clear old gradients, forward, backward, update.
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Accumulate the scalar loss of this batch.
        running_loss += loss.item()

    # Average over the number of batches, not the number of samples.
    avg_loss = running_loss / len(train_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

Training...
Epoch [1/5], Loss: 1.6421


In [None]:
# Measure plain accuracy on the test loader.
model.eval()
correct = 0
total = 0

# No gradients are needed while scoring.
with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)

        outputs = model(images)
        predicted = outputs.argmax(dim=1)  # index of the largest logit per row

        total += labels.shape[0]
        correct += predicted.eq(labels).sum().item()

accuracy = 100 * correct / total
print(f"Test Accuracy: {accuracy:.2f}%")