# Chest X-ray Classification and Localization using DenseNet

This notebook implements a transfer learning approach using DenseNet for chest X-ray classification and localization on the NIH Chest X-ray dataset.

In [140]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torch.optim as optim
from torchvision import models
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import os
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from tqdm import tqdm

## Data Loading and Preprocessing

In [152]:
# Define paths
BASE_PATH = 'DL_for_HIN_Chest_X_Ray'
IMAGES_PATH = os.path.join(BASE_PATH, 'archive','images', 'images')
CSV_PATH = os.path.join(BASE_PATH, 'Data_Entry_2017_filtered_2.csv')
BBOX_PATH = os.path.join(BASE_PATH,'archive', 'BBox_List_2017.csv')

# Load the filtered dataset
df = pd.read_csv(CSV_PATH)
df.rename(columns={'Finding Labels': 'Finding Label'}, inplace=True)
df.drop(columns=['Follow-up #', 'Patient ID', 'Patient Age', 'Patient Gender', 'View Position', 'OriginalImage[Width','Height]', 'OriginalImagePixelSpacing[x', 'y]'], inplace=True)
no_finding_images = df[df['Finding Label'] == 'No Finding'].head(2016)


bbox_df = pd.read_csv(BBOX_PATH)
bbox_df.drop(columns=['Unnamed: 6', 'Unnamed: 7', 'Unnamed: 8'], inplace=True)
bbox_df.columns = ['Image Index','Finding Label', 'x', 'y', 'width', 'height']

combined_df = pd.concat([no_finding_images, bbox_df])

combined_df.shape
combined_df['Image Index'].value_counts()

combined_df[combined_df['Image Index'] == '00010277_000.png']

Unnamed: 0,Image Index,Finding Label,x,y,width,height
326,00010277_000.png,Effusion,863.004444,693.229045,72.817778,112.64
474,00010277_000.png,Infiltrate,633.173333,416.749045,271.928889,221.866667
590,00010277_000.png,Mass,297.528889,310.935712,540.444444,277.617778
752,00010277_000.png,Pneumonia,642.275556,423.575712,265.102222,223.004444


In [144]:
import os
from PIL import Image

# Define the new directory to save resized images
resized_images_dir = 'resized_images'
os.makedirs(resized_images_dir, exist_ok=True)  # Create the directory if it doesn't exist

# Strip whitespace from image filenames
combined_df['Image Index'] = combined_df['Image Index'].str.strip()

# Iterate over all images_[XXX] folders in the archive directory
archive_path = os.path.join('DL_for_HIN_Chest_X_Ray', 'archive')  # Adjust this path as needed

# Resize and save images
for index, row in combined_df.iterrows():
    image_name = row['Image Index']
    found = False  # Flag to check if the image was found

    # Walk through the archive directory
    for root, dirs, files in os.walk(archive_path):
        if image_name in files:
            image_path = os.path.join(root, image_name)  # Construct the full image path
            print(f"Found image: {image_path}")  # Debugging line
            found = True
            try:
                # Open the image
                with Image.open(image_path) as img:
                    # Resize the image
                    img_resized = img.resize((224, 224))
                    # Save the resized image to the new directory
                    img_resized.save(os.path.join(resized_images_dir, image_name))
                    print(f"Saved resized image: {image_name}")  # Debugging line
            except OSError as e:
                print(f"Error processing {image_name}: {e}")
            except Exception as e:
                print(f"Unexpected error with {image_name}: {e}")
            break  # Exit the loop once the image is found

    if not found:
        print(f"File does not exist in any subdirectory: {image_name}")
    print(index)

Found image: DL_for_HIN_Chest_X_Ray/archive/images_001/images/00000002_000.png
Saved resized image: 00000002_000.png
1
Found image: DL_for_HIN_Chest_X_Ray/archive/images_001/images/00000005_000.png
Saved resized image: 00000005_000.png
3
Found image: DL_for_HIN_Chest_X_Ray/archive/images_001/images/00000005_001.png
Saved resized image: 00000005_001.png
4
Found image: DL_for_HIN_Chest_X_Ray/archive/images_001/images/00000005_002.png
Saved resized image: 00000005_002.png
5
Found image: DL_for_HIN_Chest_X_Ray/archive/images_001/images/00000005_003.png
Saved resized image: 00000005_003.png
6
Found image: DL_for_HIN_Chest_X_Ray/archive/images_001/images/00000005_004.png
Saved resized image: 00000005_004.png
7
Found image: DL_for_HIN_Chest_X_Ray/archive/images_001/images/00000005_005.png
Saved resized image: 00000005_005.png
8
Found image: DL_for_HIN_Chest_X_Ray/archive/images_001/images/00000006_000.png
Saved resized image: 00000006_000.png
10
Found image: DL_for_HIN_Chest_X_Ray/archive/ima

KeyboardInterrupt: 

In [153]:
combined_df.to_csv('bbox_resized_filtered_images.csv', index=False)

In [154]:
IMAGES_PATH_RESIZED = os.path.join('resized_images')
PREPROCESSED_IMAGES_PATH = os.path.join('bbox_resized_filtered_images.csv')
df_preprocessed = pd.read_csv(PREPROCESSED_IMAGES_PATH)

In [155]:
# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

class ChestXrayDataset(Dataset):
    def __init__(self, image_dir, df, transform=None):
        self.image_dir = image_dir
        self.df = df
        self.transform = transform
        
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        img_name = self.df.iloc[idx]['Image Index']
        img_path = os.path.join(self.image_dir, img_name)
        image = Image.open(img_path).convert('RGB')
        
        if self.transform:
            image = self.transform(image)
            
        # Get labels (binary classification: normal vs abnormal)
        label = 1 if self.df.iloc[idx]['Finding Label'] != 'No Finding' else 0
        return image, torch.tensor(label, dtype=torch.float32)

Using device: cpu


In [139]:
def get_transforms():
    """Define image transformations."""
    return {
        'train': transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.RandomHorizontalFlip(),
            
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ]),
        'val': transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
    }

# Split data and create data loaders
train_df, temp_df = train_test_split(df_preprocessed, test_size=0.2, random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42)

# Get transforms
transforms_dict = get_transforms()

# Create datasets
train_dataset = ChestXrayDataset(IMAGES_PATH_RESIZED, train_df, transform=transforms_dict['train'])
val_dataset = ChestXrayDataset(IMAGES_PATH_RESIZED, val_df, transform=transforms_dict['val'])
test_dataset = ChestXrayDataset(IMAGES_PATH_RESIZED, test_df, transform=transforms_dict['val'])

# Create data loaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)

print(f"Training samples: {len(train_dataset)}")
print(f"Validation samples: {len(val_dataset)}")
print(f"Test samples: {len(test_dataset)}")

Training samples: 2400
Validation samples: 300
Test samples: 300


In [124]:
from torchvision import models

def create_model():
    """Create a DenseNet model with transfer learning."""
    # Load pre-trained DenseNet
    model = models.densenet121(pretrained=True)
    
    # Freeze all layers
    for param in model.parameters():
        param.requires_grad = False
    
    # Replace the classifier
    num_features = model.classifier.in_features
    model.classifier = nn.Sequential(
        nn.Linear(num_features, 512),
        nn.ReLU(),
        nn.Dropout(0.3),
        nn.Linear(512, 1),
        nn.Sigmoid()
    )
    
    return model.to(device)

# Create model
model = create_model()
#print(model)

In [125]:
def train_model(model, train_loader, val_loader, num_epochs=10):
    """Train the model."""
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.classifier.parameters(), lr=0.001)
    
    best_val_loss = float('inf')
    train_losses = []
    val_losses = []
    
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0
        
        for images, labels in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}'):
            images = images.to(device)
            labels = labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs.squeeze(), labels)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
            predicted = (outputs.squeeze() > 0.5).float()
            train_total += labels.size(0)
            train_correct += (predicted == labels).sum().item()
        
        avg_train_loss = train_loss / len(train_loader)
        train_accuracy = 100 * train_correct / train_total
        train_losses.append(avg_train_loss)
        
        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        
        with torch.no_grad():
            for images, labels in val_loader:
                images = images.to(device)
                labels = labels.to(device)
                
                outputs = model(images)
                loss = criterion(outputs.squeeze(), labels)
                
                val_loss += loss.item()
                predicted = (outputs.squeeze() > 0.5).float()
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()
        
        avg_val_loss = val_loss / len(val_loader)
        val_accuracy = 100 * val_correct / val_total
        val_losses.append(avg_val_loss)
        
        print(f'Epoch {epoch+1}/{num_epochs}:')
        print(f'Training Loss: {avg_train_loss:.4f}, Training Accuracy: {train_accuracy:.2f}%')
        print(f'Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%')
        
        # Save best model
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), 'best_densenet_model.pth')
            print('Model saved!')
        
        print('-' * 60)
    
    return train_losses, val_losses

def plot_training_history(train_losses, val_losses):
    """Plot training history."""
    plt.figure(figsize=(10, 6))
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

In [126]:
# Train the model
train_losses, val_losses = train_model(model, train_loader, val_loader, num_epochs=10)

# Plot training history
plot_training_history(train_losses, val_losses)

Epoch 1/10:   4%|▍         | 3/75 [00:13<05:32,  4.61s/it]


KeyboardInterrupt: 