In [157]:
import math
import os
import random
import time

from tqdm import tqdm
import cv2; print(f"OpenCV version: {cv2.__version__}")
import matplotlib.pyplot as plt
import numpy as np; print(f"Numpy version: {np.__version__}")
import pandas as pd; print(f"Pandas version: {pd.__version__}")
import torch; print(f"PyTorch version: {torch.__version__}")
import torchvision; print(f"Torchvision version: {torchvision.__version__}")

OpenCV version: 4.11.0
Numpy version: 1.26.4
Pandas version: 2.2.2
PyTorch version: 2.5.1+cu124
Torchvision version: 0.20.1+cu124


# Data preparation

## Download CheXpert dataset

There are two options: one for one-time use and another for recurring use via Google Drive. Execute only the cells you need.

Ctrl + M M to convert code cell to text

Ctrl + M Y to convert text cell to code

### Option 1: Download directly to Colab session storage
It takes around 5 minutes to download and unzip the dataset.


1. Retrieve a Kaggle API key as a kaggle.json file.
2. Upload it into the content folder (the default one in Files) in the Colab session storage.
3. Execute the following code block.

# Option 1: download the dataset into the Colab session storage.
# Expects kaggle.json to be present in the current working directory.
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json # Move API key and set permissions

!kaggle datasets download ashery/chexpert -p /content # Download zip dataset from Kaggle
dataset_zip = "/content/chexpert.zip"

!mkdir -p /content/chexpert/CheXpert-v1.0-small
!unzip -q {dataset_zip} -d /content/chexpert/CheXpert-v1.0-small  # Unzip the dataset into the appropriate directory

os.remove(dataset_zip) # Remove ZIP file to save space

CHEXPERT_PATH = "/content/chexpert" # Base path of the dataset

### Option 2: Access using Google Drive


In [158]:
from google.colab import drive
# Mount the user's Google Drive under /content/drive.
drive.mount('/content/drive')

# Base path of the dataset; it is expected to contain the CheXpert-v1.0-small folder.
CHEXPERT_PATH = '/content/drive/MyDrive/chexpert'
print("Content of CheXpert-v1.0-small inside selected directory:")
# Sanity check: list the dataset folder.
!ls {CHEXPERT_PATH}/CheXpert-v1.0-small

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Content of CheXpert-v1.0-small inside selected directory:
train  train.csv  valid  valid.csv


Inside the selected directory in Drive, you should have a folder called CheXpert-v1.0-small containing the Kaggle dataset.

#### Download the dataset directly to Google Drive

It is also possible to download the dataset directly into the selected CHEXPERT_PATH directory in your Google Drive.

1. Get the Kaggle API key
2. Upload the key into Colab files

from google.colab import files
# Prompt for a file upload; choose kaggle.json (the next step copies it from the working directory).
files.upload()

3. Download the dataset into the Google Drive mount. This takes around 1 hour.

!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

!mkdir -p {CHEXPERT_PATH}
!kaggle datasets download ashery/chexpert -p {CHEXPERT_PATH}

!unzip {CHEXPERT_PATH}/chexpert.zip -d {CHEXPERT_PATH}/CheXpert-v1.0-small
!rm {CHEXPERT_PATH}/chexpert.zip

## Dataset exploration

Read the CSV files into pandas dataframes and keep only the frontal images.

In [159]:
def load_frontal_views(csv_path):
  """Read a CheXpert CSV and keep only the frontal radiographs.

  Parameters:
      csv_path (str): Path to a CheXpert CSV with 'Frontal/Lateral' and 'Path' columns.

  Returns:
      pandas.DataFrame: The frontal rows plus a 'Patient number' column holding the
      digits that follow "patient" in each image path (kept as strings).
  """
  frame = pd.read_csv(csv_path)
  # .copy() detaches the filtered rows from `frame`, so adding a column below
  # does not raise pandas' SettingWithCopyWarning.
  frontal = frame[frame['Frontal/Lateral'] == 'Frontal'].copy()
  frontal["Patient number"] = frontal["Path"].str.extract(r"patient(\d+)", expand=False)
  return frontal


train_path = CHEXPERT_PATH + "/CheXpert-v1.0-small/train.csv"
train_df = load_frontal_views(train_path)

# The Kaggle "valid" split is used as the test set in this notebook.
test_path = CHEXPERT_PATH + "/CheXpert-v1.0-small/valid.csv"
test_df = load_frontal_views(test_path)

In [160]:
# Preview the first rows of the frontal-only training dataframe.
train_df.head()

Unnamed: 0,Path,Sex,Age,Frontal/Lateral,AP/PA,No Finding,Enlarged Cardiomediastinum,Cardiomegaly,Lung Opacity,Lung Lesion,Edema,Consolidation,Pneumonia,Atelectasis,Pneumothorax,Pleural Effusion,Pleural Other,Fracture,Support Devices,Patient number
0,CheXpert-v1.0-small/train/patient00001/study1/...,Female,68,Frontal,AP,1.0,,,,,,,,,0.0,,,,1.0,1
1,CheXpert-v1.0-small/train/patient00002/study2/...,Female,87,Frontal,AP,,,-1.0,1.0,,-1.0,-1.0,,-1.0,,-1.0,,1.0,,2
2,CheXpert-v1.0-small/train/patient00002/study1/...,Female,83,Frontal,AP,,,,1.0,,,-1.0,,,,,,1.0,,2
4,CheXpert-v1.0-small/train/patient00003/study1/...,Male,41,Frontal,AP,,,,,,1.0,,,,0.0,,,,,3
5,CheXpert-v1.0-small/train/patient00004/study1/...,Female,20,Frontal,PA,1.0,0.0,,,,,0.0,,,,0.0,,,,4


The radiographs are labeled for the presence of 14 observations as positive, negative, or uncertain.

In [161]:
# The 14 observations each radiograph is labeled for; the order defines the
# order of the target vector used throughout the notebook.
LABELS = [
    'No Finding',
    'Enlarged Cardiomediastinum',
    'Cardiomegaly',
    'Lung Opacity',
    'Lung Lesion',
    'Edema',
    'Consolidation',
    'Pneumonia',
    'Atelectasis',
    'Pneumothorax',
    'Pleural Effusion',
    'Pleural Other',
    'Fracture',
    'Support Devices',
]

# Number of labels, i.e. the size of the model output.
NR_LABELS = len(LABELS)

### Image visualization tool by labels

Use Ctrl + Click to select multiple labels in the output, and adjust the slider to set the number of images to display.

In [162]:
import ipywidgets as widgets
from IPython.display import display

# Multi-select list offering every CheXpert label.
label_selector = widgets.SelectMultiple(
    description='Select Labels:',
    options=LABELS,
    style={'description_width': 'initial'},
)

# Dropdown that switches between the training and the test dataframe.
df_dropdown = widgets.Dropdown(
    description='Select training or test data:',
    options=["Training", "Test"],
    style={'description_width': 'initial'},
)

def display_images(selected_data, selected_labels, num_images=9):
  """
  Filters the chosen DataFrame to images that have all of the selected labels marked
  as positive (1) and displays a sample of them in a square grid.

  Parameters:
      selected_data (str): "Training" or "Test" (case-insensitive); selects train_df or test_df.
      selected_labels (tuple): A tuple of labels selected from the widget.
      num_images (int): Maximum number of images to display (default is 9).
  """

  # Choose the dataset depending on the selection
  frames = {"training": train_df, "test": test_df}
  df = frames.get(selected_data.lower())
  if df is None:
    print("No data selected.")
    return

  # If no labels are selected, prompt the user to select at least one.
  if not selected_labels:
    print("No labels selected. Please select one or more labels.")
    return

  # Keep only the rows where every selected label column equals 1.
  df_selected = df[df[list(selected_labels)].eq(1).all(axis=1)]

  # If there are no matching images, display a message.
  if df_selected.empty:
    print(f"No images found with the labels: {', '.join(selected_labels)}")
    return

  # Sample up to num_images rows; the fixed random_state makes the sample repeatable.
  sample_count = min(num_images, len(df_selected))
  sampled_df = df_selected.sample(n=sample_count, random_state=42)

  # Smallest square grid that fits all sampled images.
  grid_size = math.ceil(math.sqrt(sample_count))
  fig, axes = plt.subplots(grid_size, grid_size, figsize=(10, 10))

  # plt.subplots returns a bare Axes for a 1x1 grid; normalise to a flat array.
  axes = np.atleast_1d(axes).flatten()

  for ax, (_, row) in zip(axes, sampled_df.iterrows()):
    img_path = os.path.join(CHEXPERT_PATH, row['Path'])
    try:
      img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
      # cv2.imread reports an unreadable/missing file by returning None, not by raising.
      if img is None:
        raise FileNotFoundError(f"Image not found: {img_path}")
      ax.imshow(img, cmap='gray')
      ax.set_title(f"Patient number: {row['Patient number']}", fontsize=8)
    except Exception as e:
      # Best effort: show the error in the cell of the grid instead of aborting the whole figure.
      ax.text(0.5, 0.5, f"Error loading image:\n{e}", ha='center', va='center')
    ax.axis('off')

  # Hide any unused subplots
  for ax in axes[len(sampled_df):]:
    ax.axis('off')

  fig.suptitle(f'Selected labels: {selected_labels}', fontsize=14)

  plt.tight_layout()
  plt.show()

# Hook the dataset dropdown, the label selector and a 1-16 image-count slider
# up to display_images so the grid refreshes whenever a control changes.
widgets.interact(
    display_images,
    selected_data=df_dropdown,
    selected_labels=label_selector,
    num_images=(1, 16, 1),
)

interactive(children=(Dropdown(description='Select training or test data:', options=('Training', 'Test'), styl…

## Pytorch datasets

In [163]:
class CheXpertDataset(torch.utils.data.Dataset):
  """Frontal CheXpert radiographs paired with binary multi-label targets.

  Parameters:
      csv_file (str): CheXpert CSV with 'Path', 'Frontal/Lateral' and the label columns.
      root_dir (str): Directory that the 'Path' column is relative to.
      label_names (sequence of str): Label columns to return as the target vector.
      image_size (tuple): (width, height) passed to cv2.resize (default (224, 224)).

  Each item is a tuple (img, labels): img is a float32 tensor of shape
  (1, height, width) scaled to [0, 1]; labels is a float32 tensor with one
  entry per name in label_names.
  """

  def __init__(self, csv_file, root_dir, label_names, image_size=(224, 224)):

    all_data = pd.read_csv(csv_file)
    # .copy() gives an independent frame, so the label clean-up below does not
    # write into a slice of all_data (which raises SettingWithCopyWarning).
    frontal = all_data[all_data['Frontal/Lateral'] == 'Frontal'].copy() # Only frontal images

    self.label_names = list(label_names)

    # Uncertainty approaches
    # 1) 2-class (1: only positives, 0: rest): uncertain (-1) and missing (NaN) become 0
    frontal.loc[:, self.label_names] = frontal.loc[:, self.label_names].replace({-1: 0, np.nan: 0})

    self.data = frontal
    self.root_dir = root_dir
    self.image_size = image_size

  def __len__(self):
    """Number of frontal images in the CSV."""
    return len(self.data)

  def __getitem__(self, idx):
    """Load the idx-th frontal image (positional index) and its label vector.

    Raises:
        FileNotFoundError: If OpenCV cannot read the image file.
    """
    row = self.data.iloc[idx]
    img_path = os.path.join(self.root_dir, row['Path'])

    # Use OpenCV to read the image; imread returns None instead of raising on failure
    img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
      raise FileNotFoundError(f"Image not found: {img_path}")

    img = cv2.resize(img, self.image_size) # Resize to image_size (224x224 by default)
    img = img.astype('float32') / 255.0 # Normalize pixel values
    img = torch.tensor(img).unsqueeze(0) # Add the channel axis: (1, height, width)

    labels = torch.tensor(row[self.label_names].values.astype('float32'))

    return img, labels

Q: How should the train/validation split be done?
- Randomly? With which split sizes?
- Should the sets of training and validation patients be non-overlapping?

In [164]:
all_train_dataset = CheXpertDataset(csv_file=train_path, root_dir=CHEXPERT_PATH, label_names=LABELS)

# 95% / 5% train/validation split of the Kaggle training set.
train_length = int(0.95 * len(all_train_dataset))
valid_length = len(all_train_dataset) - train_length

# Seed the split so the same images land in each subset on every run.
# NOTE: the split is per image, so images of one patient can end up in both subsets.
SPLIT_SEED = 42
split_generator = torch.Generator().manual_seed(SPLIT_SEED)
train_dataset, valid_dataset = torch.utils.data.random_split(
    all_train_dataset, (train_length, valid_length), generator=split_generator)

# The Kaggle "valid" CSV serves as the held-out test set.
test_dataset = CheXpertDataset(csv_file=test_path, root_dir=CHEXPERT_PATH, label_names=LABELS)

print(f"Number training datapoints: {len(train_dataset)}")
print(f"Number validation datapoints: {len(valid_dataset)}")
print(f"Number test datapoints: {len(test_dataset)}")

Number training datapoints: 181475
Number validation datapoints: 9552
Number test datapoints: 202


# Model training

## Models

In [165]:
class DenseNet121(torch.nn.Module):
  """Randomly initialised torchvision DenseNet-121 adapted to 1-channel input.

  The first convolution is replaced by one that takes a single input channel,
  and the classifier by a linear layer with out_size outputs (raw logits).
  """
  def __init__(self, out_size):
    super().__init__()
    backbone = torchvision.models.densenet121(weights=None)
    # Same geometry as the stock stem convolution, but 1 input channel instead of 3.
    backbone.features.conv0 = torch.nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
    backbone.classifier = torch.nn.Linear(backbone.classifier.in_features, out_size)
    self.densenet121 = backbone
  def forward(self, x):
    """Return the classifier logits for a batch of images."""
    return self.densenet121(x)

In [166]:
# Sanity check: the first convolution should accept a single (grayscale) channel.
# NR_LABELS replaces the hard-coded 14 so the check follows the LABELS list.
d121 = DenseNet121(out_size=NR_LABELS)
print(d121.densenet121.features.conv0)

Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)


## Train and test functions

In [167]:
def train(model, dataloader_train, dataloader_val, max_nr_epochs):
  """Train model for max_nr_epochs epochs with SGD and BCE-with-logits loss.

  After each epoch the validation loss is computed; whenever it improves on the
  best value so far, a checkpoint 'm-epoch_FL<epoch>.pth.tar' is written to the
  current working directory.

  Parameters:
      model (torch.nn.Module): Model to optimise (updated in place).
      dataloader_train: Iterable of (img, target) training batches.
      dataloader_val: Iterable of (img, target) validation batches.
      max_nr_epochs (int): Number of epochs to run.

  Returns:
      dict: model.state_dict() after the last epoch.
  """
  optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=1e-4)

  loss = torch.nn.BCEWithLogitsLoss()

  min_loss = float('inf')
  epoch_seconds = []

  for epoch in range(max_nr_epochs):
    epoch_start = time.time() # training starts
    loss_train = train_epoch(model, dataloader_train, optimizer, max_nr_epochs, criterion=loss)
    epoch_seconds.append(time.time() - epoch_start) # training ends
    # float() also accepts a 0-dim tensor, so the comparison, the printed value
    # and the saved 'best_loss' are always plain Python numbers.
    loss_val = float(validate_epoch(model, dataloader_val, optimizer, max_nr_epochs, criterion=loss))
    print("Training loss: {:.3f},".format(loss_train), "Valid loss: {:.3f}".format(loss_val))

    if loss_val < min_loss:
      min_loss = loss_val
      torch.save({'epoch': epoch + 1, 'state_dict': model.state_dict(),
                  'best_loss': min_loss, 'optimizer' : optimizer.state_dict()},
                  'm-epoch_FL' + str(epoch + 1) + '.pth.tar')
      print('Epoch ' + str(epoch + 1) + ' [save] loss = ' + str(loss_val))
    else:
      print('Epoch ' + str(epoch + 1) + ' [----] loss = ' + str(loss_val))

  # Report and return only after ALL epochs have run (previously this sat inside
  # the loop, which ended training after the first epoch).
  train_time = np.array(epoch_seconds)
  print("Training time for each epoch: {} seconds".format(train_time.round(0)))
  return model.state_dict()

def train_epoch(model, dataloader_train, optimizer, max_nr_epochs, criterion):
  """Run one optimisation pass over dataloader_train.

  Parameters:
      model (torch.nn.Module): Model to train; put into train mode.
      dataloader_train: Sized iterable of (img, target) batches.
      optimizer (torch.optim.Optimizer): Optimizer updating the model parameters.
      max_nr_epochs (int): Unused; kept for signature compatibility.
      criterion (callable): Loss called as criterion(model(img), target).

  Returns:
      float: Mean of the per-batch loss values.
  """
  running_loss = 0.0
  model.train()

  # Batches are moved to the device holding the model parameters; without this
  # a model on the GPU fails on the first batch with a device mismatch.
  first_param = next(model.parameters(), None)
  device = first_param.device if first_param is not None else torch.device('cpu')

  for img, target in tqdm(dataloader_train):
    img = img.to(device, non_blocking=True)
    target = target.to(device, non_blocking=True)

    lossvalue = criterion(model(img), target)

    optimizer.zero_grad()
    lossvalue.backward()
    optimizer.step()

    running_loss += lossvalue.item()

  return running_loss / len(dataloader_train)

def validate_epoch(model, dataloader_val, optimizer, max_nr_epochs, criterion):
  """Compute the mean loss over dataloader_val without updating the model.

  Parameters:
      model (torch.nn.Module): Model to evaluate; put into eval mode.
      dataloader_val: Sized iterable of (img, target) batches.
      optimizer: Unused; kept for signature compatibility.
      max_nr_epochs (int): Unused; kept for signature compatibility.
      criterion (callable): Loss called as criterion(model(img), target).

  Returns:
      float: Mean of the per-batch loss values (a plain float, like train_epoch).
  """
  model.eval()
  running_loss = 0.0

  # Follow the model's device instead of calling .cuda() unconditionally, so the
  # function works on CPU-only machines and with models placed on the GPU.
  first_param = next(model.parameters(), None)
  device = first_param.device if first_param is not None else torch.device('cpu')

  with torch.no_grad():
    for img, target in dataloader_val:
      img = img.to(device)
      target = target.to(device)

      # .item() keeps the accumulator a Python float rather than a device tensor.
      running_loss += criterion(model(img), target).item()

  return running_loss / len(dataloader_val)


def test(model, dataLoaderTest, nnClassCount, checkpoint, class_names):
  #torch.backends.cudnn.benchmark = True

  if checkpoint != None and torch.cuda.is_available():
      modelCheckpoint = torch.load(checkpoint)
      model.load_state_dict(modelCheckpoint['state_dict'])

  if torch.cuda.is_available():
      outGT = torch.FloatTensor().cuda()
      outPRED = torch.FloatTensor().cuda()
  else:
      outGT = torch.FloatTensor()
      outPRED = torch.FloatTensor()

  model.eval()

  with torch.no_grad():
    for i, (input, target) in enumerate(dataLoaderTest):

      target = target.cuda()
      outGT = torch.cat((outGT, target), 0).cuda()

      bs, c, h, w = input.size()
      varInput = input.view(-1, c, h, w)

      out = model(varInput)
      outPRED = torch.cat((outPRED, out), 0)

  return outGT, outPRED

## Training experiments

In [168]:
BATCH_SIZE = 16
MAX_NR_EPOCHS = 4

train_dataloader = torch.utils.data.DataLoader(dataset = train_dataset, batch_size = BATCH_SIZE, shuffle = True)
# The validation loss is an average over all batches, so shuffling adds nothing.
valid_dataloader = torch.utils.data.DataLoader(dataset = valid_dataset, batch_size = BATCH_SIZE, shuffle = False)
test_dataloader = torch.utils.data.DataLoader(dataset = test_dataset)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = DenseNet121(NR_LABELS).to(device)

# Keep the returned state_dict in a variable so the cell does not display it.
trained_params = train(model, train_dataloader, valid_dataloader, MAX_NR_EPOCHS)

  0%|          | 12/11343 [04:02<63:37:58, 20.22s/it]


KeyboardInterrupt: 