In [2]:
import os
import numpy as np
import pandas as pd
from glob import glob
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score, roc_curve, auc
import matplotlib.pyplot as plt
from itertools import chain

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torchvision import transforms, models
from torchvision.io import read_image
from torchvision.datasets.folder import default_loader
from tqdm import tqdm

ModuleNotFoundError: No module named 'torchvision'

In [1]:
cd "D:\Depaul\DATA_SCIENCE\prog_ml_apps\DATASET\archive (5)"

D:\Depaul\DATA_SCIENCE\prog_ml_apps\DATASET\archive (5)


In [None]:
# Read the metadata table and index every PNG found on disk by its bare
# file name, so rows of the table can later be matched to image files.
data = pd.read_csv('Data_Entry_2017.csv')
my_glob = glob('images*/images/*.png')
all_image_paths = dict(zip(map(os.path.basename, my_glob), my_glob))


In [None]:
def _age_to_years(raw_age):
    """Drop the one-character suffix from string ages and return an int; pass other values through."""
    if isinstance(raw_age, str):
        return int(raw_age[:-1])
    return raw_age


print('Scans found:', len(all_image_paths), ', Total Headers', len(data))
# Rows whose image file was not found on disk receive None as their path.
data['path'] = data['Image Index'].map(all_image_paths.get)
data['Patient Age'] = data['Patient Age'].map(_age_to_years)
data.sample(3)

In [None]:
# Process labels
# 'No Finding' is blanked so that only real findings become label columns.
data['Finding Labels'] = data['Finding Labels'].map(
    lambda x: x.replace('No Finding', '') if pd.notnull(x) else '')

# Split every '|'-separated label string once; empty tokens are discarded.
finding_sets = data['Finding Labels'].map(
    lambda x: {token for token in x.split('|') if token})
all_labels = sorted(set(chain.from_iterable(finding_sets)))
print('All Labels ({}): {}'.format(len(all_labels), all_labels))

for c_label in all_labels:
    if len(c_label) > 1:
        # Exact token membership: a plain substring test on the raw string
        # would also fire for any label whose name is contained in another
        # label's name.
        data[c_label] = finding_sets.map(
            lambda found, label=c_label: 1.0 if label in found else 0.0)
data.sample(3)

In [None]:
# Filter labels
# NOTE(review): MIN_CASES is defined but not applied anywhere in this cell;
# the label list below is fixed by hand.
MIN_CASES = 1000
all_labels = ['Effusion', 'Infiltration', 'Mass', 'Nodule', 'Atelectasis', 'Pneumothorax']
# Report how many rows carry each retained label.
print('Clean Labels ({})'.format(len(all_labels)),
      [(c_label, int(data[c_label].sum())) for c_label in all_labels])

In [None]:
# Remove metadata columns that are not used further on; names that are not
# present in the frame are skipped rather than raising.
columns_to_drop = [
    'Follow-up #',
    'Patient ID',
    'Patient Age',
    'Patient Gender',
    'View Position',
    'OriginalImage[Width',
    'Height]',
    'OriginalImagePixelSpacing[x',
    'y]',
    'Unnamed: 11',
]
data = data.drop(columns=columns_to_drop, errors='ignore')

In [None]:
# Create disease vector
# One array per row holding that row's values for the columns in all_labels.
data['disease_vec'] = data.apply(lambda x: [x[all_labels].values], 1).map(lambda x: x[0])

# Keep only rows whose exact 'Finding Labels' string occurs at least
# MIN_COMBINATION_COUNT times in the table.
MIN_COMBINATION_COUNT = 251
counts = data['Finding Labels'].value_counts()
mask = data['Finding Labels'].isin(counts[counts >= MIN_COMBINATION_COUNT].index)
# .copy() detaches the filtered frame so that adding columns to it later does
# not raise SettingWithCopyWarning.
data = data[mask].copy()

In [None]:
# Count the positive labels on every row and keep the rows with exactly one.
diseases_per_row = data[all_labels].sum(axis=1)
data['Total Diseases'] = diseases_per_row
data_one_disease = data.loc[diseases_per_row == 1]

In [None]:
# Sampling 5 images per class
def sample_images_per_class(data, class_labels, num_samples=5, random_state=123):
    """Draw a fixed-size random sample of rows for every class label.

    Args:
        data: DataFrame with one indicator column per entry of ``class_labels``.
        class_labels: Iterable of column names; rows where the column equals 1
            are the candidates for that class.
        num_samples: Number of rows to draw per class.
        random_state: Seed passed to ``DataFrame.sample`` for every class.

    Returns:
        DataFrame with the sampled rows of all classes stacked in the order of
        ``class_labels`` (original index values are kept). An empty DataFrame
        is returned when ``class_labels`` is empty.

    Raises:
        ValueError: Propagated from ``DataFrame.sample`` when a class has fewer
            than ``num_samples`` rows.
    """
    # Collect the per-class samples and concatenate once; DataFrame.append was
    # removed in pandas 2.0.
    per_class = [
        data[data[label] == 1].sample(num_samples, random_state=random_state)
        for label in class_labels
    ]
    if not per_class:
        return pd.DataFrame()
    return pd.concat(per_class)

# Draw 5 rows for each label from the single-disease subset.
sampled_data = sample_images_per_class(
    data_one_disease,
    class_labels=all_labels,
    num_samples=5,
)


In [None]:
# Hold out 20% of the sampled rows for validation, using a fixed seed.
split_frames = train_test_split(sampled_data, random_state=123, test_size=0.2)
train_df, valid_df = split_frames
print('train', len(train_df), 'validation', len(valid_df))

In [None]:
# Image transformations
IMG_SIZE = (128, 128)


def _build_transforms(augment):
    """Compose resize -> (optional random horizontal flip) -> tensor -> normalize."""
    steps = [transforms.Resize(IMG_SIZE)]
    if augment:
        steps.append(transforms.RandomHorizontalFlip())
    steps.append(transforms.ToTensor())
    # Per-channel mean / std; these values match the commonly used ImageNet statistics.
    steps.append(transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]))
    return transforms.Compose(steps)


# Only the training pipeline applies the random flip.
train_transforms = _build_transforms(augment=True)
valid_transforms = _build_transforms(augment=False)


In [None]:
# Custom Dataset
class ChestXrayDataset(Dataset):
    """Dataset yielding ``(image, label_vector)`` pairs from a DataFrame.

    Args:
        dataframe: Frame with a ``'path'`` column and one column per label.
        transform: Optional callable applied to the loaded image.
        label_columns: Names of the label columns. When omitted, the
            module-level ``all_labels`` is used as it is at construction time.
    """

    def __init__(self, dataframe, transform=None, label_columns=None):
        self.dataframe = dataframe
        self.transform = transform
        # Snapshot the label columns now: looking up the global on every
        # __getitem__ call breaks as soon as a later cell rebinds `all_labels`.
        self.label_columns = list(all_labels if label_columns is None else label_columns)

    def __len__(self):
        """Return the number of rows in the wrapped frame."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Load the image of row ``idx`` and return it with its float32 label vector.

        Raises:
            FileNotFoundError: If the row has no image path.
        """
        row = self.dataframe.iloc[idx]
        img_path = row['path']
        if pd.isnull(img_path):
            raise FileNotFoundError('No image path recorded for row {}'.format(idx))
        image = default_loader(img_path)
        label = row[self.label_columns].values.astype(np.float32)

        if self.transform is not None:
            image = self.transform(image)

        return image, label

In [None]:
# Wrap each split in a dataset together with its matching transform pipeline.
train_dataset = ChestXrayDataset(dataframe=train_df, transform=train_transforms)
valid_dataset = ChestXrayDataset(dataframe=valid_df, transform=valid_transforms)

In [None]:
# Create data loaders
# On Windows, DataLoader workers are separate spawned processes that must
# unpickle the dataset; a Dataset class defined inside a notebook cannot be
# resolved there, so loading is kept in the main process on that platform.
NUM_WORKERS = 0 if os.name == 'nt' else 4
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=NUM_WORKERS)
valid_loader = DataLoader(valid_dataset, batch_size=256, shuffle=False, num_workers=NUM_WORKERS)

In [None]:
# Define model
class _DenseLayer(nn.Module):
    """BatchNorm -> ReLU -> 3x3 conv; the conv output is concatenated onto the input."""

    def __init__(self, in_channels, growth_rate):
        super().__init__()
        self.norm = nn.BatchNorm2d(in_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv = nn.Conv2d(in_channels, growth_rate, kernel_size=3, padding=1)

    def forward(self, x):
        new_features = self.conv(self.relu(self.norm(x)))
        # Channel-wise concatenation: output has in_channels + growth_rate channels.
        return torch.cat([x, new_features], dim=1)


class CustomDenseNet121(nn.Module):
    """DenseNet-121 feature extractor followed by an extra dense block, a
    transition block and a linear classification head.

    Args:
        num_classes: Size of the output logit vector.
        pretrained: Whether to request pretrained weights for the backbone.
    """

    def __init__(self, num_classes, pretrained=True):
        super(CustomDenseNet121, self).__init__()
        # Prefer the `weights=` API when the installed torchvision exposes it;
        # fall back to the older `pretrained=` flag otherwise.
        weights_enum = getattr(models, 'DenseNet121_Weights', None)
        if weights_enum is not None:
            self.base_model = models.densenet121(
                weights=weights_enum.DEFAULT if pretrained else None)
        else:
            self.base_model = models.densenet121(pretrained=pretrained)
        self.base_model.classifier = nn.Identity()  # Remove the original classifier

        # Each layer appends 32 channels to its input. A plain Sequential of
        # BN/ReLU/Conv would hand a 32-channel tensor to BatchNorm2d(1056)
        # and fail, so the concatenation is done inside _DenseLayer.
        self.dense_block = nn.Sequential(
            _DenseLayer(1024, 32),  # 1024 -> 1056 channels
            _DenseLayer(1056, 32),  # 1056 -> 1088 channels
        )
        self.transition_block = nn.Sequential(
            nn.BatchNorm2d(1088),
            nn.ReLU(inplace=True),
            nn.Conv2d(1088, 128, kernel_size=1),
            nn.AvgPool2d(kernel_size=2, stride=2, padding=0)
        )
        self.classifier = nn.Sequential(
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        """Return logits of shape (batch, num_classes) for an image batch."""
        x = self.base_model.features(x)
        x = self.dense_block(x)
        x = self.transition_block(x)
        x = self.classifier(x)
        return x

In [None]:
# Define the model, loss, and optimizer
# Use the GPU when one is available and fall back to the CPU otherwise.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = CustomDenseNet121(num_classes=len(all_labels)).to(device)
# The dataset yields float label vectors; CrossEntropyLoss accepts such
# targets as per-class probabilities.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.003)

In [None]:
# Training loop
num_epochs = 50
best_loss = float('inf')
early_stopping_patience = 3
early_stopping_counter = 0
checkpoint_path = 'best_model.pth'
# Move batches to whatever device the model lives on instead of assuming CUDA.
train_device = next(model.parameters()).device

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    for images, labels in tqdm(train_loader):
        images, labels = images.to(train_device), labels.to(train_device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        # Weight the batch-mean loss by batch size so the epoch value is a per-sample mean.
        train_loss += loss.item() * images.size(0)

    train_loss /= len(train_loader.dataset)

    # Validation
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images, labels in valid_loader:
            images, labels = images.to(train_device), labels.to(train_device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item() * images.size(0)

    val_loss /= len(valid_loader.dataset)

    print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

    # Checkpoint on improvement; otherwise count towards early stopping.
    if val_loss < best_loss:
        best_loss = val_loss
        torch.save(model.state_dict(), checkpoint_path)
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1

    if early_stopping_counter >= early_stopping_patience:
        print("Early stopping")
        break

# Load the best model
# Skipped when no epoch ever improved on the initial loss (e.g. NaN losses),
# because no checkpoint was written by this run in that case.
if best_loss < float('inf'):
    model.load_state_dict(torch.load(checkpoint_path, map_location=train_device))

In [None]:
# Evaluation
model.eval()
all_preds = []
all_labels = []
with torch.no_grad():
    for images, labels in valid_loader:
        images, labels = images.to('cuda'), labels.to('cuda')
        outputs = model(images)
        preds = torch.argmax(outputs, dim=1)
        all_preds.append(preds.cpu().numpy())
        all_labels.append(labels.cpu().numpy())

all_preds = np.concatenate(all_preds)
all_labels = np.concatenate(all_labels)

In [None]:
precision = precision_score(all_labels, all_preds, average='weighted')
recall = recall_score(all_labels, all_preds, average='weighted')
f1 = f1_score(all_labels, all_preds, average='weighted')

print('Precision:', precision)
print('Recall:', recall)
print('F1 Score:', f1)

In [None]:
# Plotting ROC Curves
fig, c_ax = plt.subplots(1, 1, figsize=(6, 6))
for (idx, c_label) in enumerate(all_labels):
    fpr, tpr, _ = roc_curve(all_labels[:, idx].astype(int), all_preds[:, idx])
    c_ax.plot(fpr, tpr, label='%s (AUC:%0.2f)' % (c_label, auc(fpr, tpr)))
c_ax.legend()
c_ax.set_xlabel('False Positive Rate')
c_ax.set_ylabel('True Positive Rate')
plt.show()