# Dot Dataset

In this notebook we add the dot artifact to the CheXpert dataset.

First you should download the dataset from <a href="https://www.kaggle.com/datasets/willarevalo/chexpert-v10-small">here</a>.

In [2]:
import os
import sys
# Make the repository root importable from this notebook's directory.
project_root = '../'
# Guard the append so that re-running this cell does not keep adding
# duplicate entries to sys.path.
if project_root not in sys.path:
    sys.path.append(project_root)

# Load Required Libraries

In [8]:
import pandas as pd
import random
import os
import shutil
import csv
from sklearn.model_selection import train_test_split
from PIL import Image
import matplotlib.pyplot as plt

Here we filter the images, keeping only the frontal views (lateral views are excluded).

In [9]:
# Read the CheXpert training labels, keep only the rows whose
# 'Frontal/Lateral' column equals 'Frontal', and renumber the index so it is
# contiguous again after the filter.
train_df = (
    pd.read_csv('/usr/local/faststorage/datasets/chexpert/train.csv')
    .loc[lambda frame: frame['Frontal/Lateral'] == 'Frontal']
    .reset_index(drop=True)
)

In the next part, we keep only the images labelled No Finding or Pleural Effusion, and only those for which Support Devices is explicitly marked as absent (0), so that no support devices are visible in the resulting dataset.

In [13]:
import pandas as pd

# Location of the metadata CSV that every later cell reads and rewrites.
info_path = '../datasets/dot_90/info.csv'

# Create parent directories for the file if they do not exist
os.makedirs(os.path.dirname(info_path), exist_ok=True)

# Work on an independent copy of the frontal-only frame: a plain assignment
# would only alias `train_df`, so the label column added below would silently
# be written into `train_df` as well.
chexpert_data = train_df.copy()

# Define a function to determine the health status with the updated criteria
def determine_health_status_final(row):
    """Map one metadata row to a health label.

    Parameters
    ----------
    row : mapping-like (e.g. a pandas Series or a dict)
        Must provide the keys 'No Finding', 'Pleural Effusion',
        'Frontal/Lateral' and 'Support Devices'.

    Returns
    -------
    int or None
        1 (healthy) when 'No Finding' is 1.0, -1 (unhealthy) when
        'Pleural Effusion' is 1.0 -- in both cases only if the view is
        'Frontal' and 'Support Devices' equals 0.0. None otherwise.
        'No Finding' takes precedence when both labels are set.
    """
    def frontal_without_devices():
        # Shared precondition of both labels; evaluated lazily so the fields
        # are only read when the corresponding finding matched.
        return row['Frontal/Lateral'] == 'Frontal' and row['Support Devices'] == 0.0

    if row['No Finding'] == 1.0 and frontal_without_devices():
        return 1  # Healthy
    if row['Pleural Effusion'] == 1.0 and frontal_without_devices():
        return -1  # Unhealthy
    return None  # Row does not belong to either class


# Label every row as healthy (1), unhealthy (-1) or missing (None -> NaN).
chexpert_data['Healthy/Unhealthy'] = chexpert_data.apply(determine_health_status_final, axis=1)

# Keep only the labelled rows and the metadata columns used downstream.
kept_columns = ['Path', 'Sex', 'Age', 'Frontal/Lateral', 'AP/PA', 'Support Devices', 'Healthy/Unhealthy']
filtered_data_final = chexpert_data.dropna(subset=['Healthy/Unhealthy'])[kept_columns]

# Write the filtered metadata to the info CSV (without the index column).
output_file_path_final = info_path  # Replace with your desired output file path
filtered_data_final.to_csv(output_file_path_final, index=False)

# Report where the file was written.
print("CSV file has been created at:", output_file_path_final)

CSV file has been created at: ../datasets/dot_90/info.csv


# Grouping

Here we define our groupings:

- group 0: This group denotes the subjects with Pleural Effusion and the artifact. (Contains 90% of the unhealthy subjects)
- group 1: This group denotes the subjects with No Finding and the artifact. (Contains 10% of the healthy subjects)
- group 2: This group denotes the subjects with Pleural Effusion and No artifact. (Contains 10% of the unhealthy subjects)
- group 3: This group denotes the subjects with No Finding and No artifact. (Contains 90% of the healthy subjects)

In [14]:
import pandas as pd
import numpy as np
# Load your CSV file
file_path = info_path  # Replace with the path to your filtered CSV file
data = pd.read_csv(file_path)

# Set a random seed for reproducibility
np.random.seed(0)

# Boolean masks for the two health labels written by the filtering step.
status = data['Healthy/Unhealthy']
is_unhealthy = (status == -1).to_numpy()
is_healthy = (status == 1).to_numpy()
is_labelled = is_unhealthy | is_healthy

# Draw one uniform number per labelled row, in row order. Rows with any other
# status consume no random number, so the random stream is used exactly as a
# one-draw-per-row loop over the labelled rows would use it.
draws = np.zeros(len(data))
draws[is_labelled] = np.random.rand(int(is_labelled.sum()))
in_majority = draws < 0.9

# Assign groups based on health status and random chance:
#   unhealthy -> group 0 with probability 0.9, otherwise group 2
#   healthy   -> group 3 with probability 0.9, otherwise group 1
# Float choices keep the column dtype float, with NaN for unlabelled rows.
data['group'] = np.select(
    [
        is_unhealthy & in_majority,
        is_unhealthy & ~in_majority,
        is_healthy & in_majority,
        is_healthy & ~in_majority,
    ],
    [0.0, 2.0, 3.0, 1.0],
    default=np.nan,
)

# Save the updated DataFrame to a new CSV file
updated_csv_path = info_path  # Replace with your desired output file path
data.to_csv(updated_csv_path, index=False)


Here we modify the `Path` column by flattening each path into a single file name. There is no specific reason for this; it is just a personal choice of the author.

In [15]:
# Function to modify the 'Path' values
def modify_path(path):
    """Flatten a '/'-separated path into a single underscore-joined name.

    The first component (usually the dataset name) is dropped and the
    remaining components are joined with '_', e.g.
    'root/train/p1/view.jpg' -> 'train_p1_view.jpg'.

    A path that contains no '/' is returned unchanged. Without this guard an
    already flattened name would be turned into an empty string, so re-running
    the cell that applies this function would wipe the whole 'Path' column.
    """
    if '/' not in path:
        return path
    parts = path.split('/')
    return '_'.join(parts[1:])  # Join parts excluding the first element

# Rewrite every entry of the 'Path' column with modify_path
data['Path'] = data['Path'].map(modify_path)

# Persist the frame with the rewritten paths back to the info CSV
updated_csv_path = info_path  # Replace with your desired output file path
data.to_csv(updated_csv_path, index=False)

print("Updated CSV file saved to:", updated_csv_path)

Updated CSV file saved to: ../datasets/dot_90/info.csv


# Adding Artifact
Here we add the artifact to the designated images and save the new dataset (images) for reproducibility.

Note: If you don't have enough space to do so you can always save the info.csv file and add the circle on-the-fly in your dataloader.

In [16]:
from PIL import Image, ImageDraw
import os
import shutil

def add_black_circle_to_image(input_image_path, circle_radius):
    """Open an image as greyscale and draw a filled black circle at its centre.

    Parameters
    ----------
    input_image_path : path of the image file to open.
    circle_radius : radius of the circle, in pixels.

    Returns
    -------
    The greyscale (mode 'L') image with the circle drawn on it. The file on
    disk is not modified; saving the result is left to the caller.
    """
    with Image.open(input_image_path) as source:
        # Convert only when the image is not already single-channel greyscale.
        canvas = source if source.mode == 'L' else source.convert('L')

        img_width, img_height = canvas.size
        mid_x = img_width // 2
        mid_y = img_height // 2

        # Bounding box of the circle: [left, top, right, bottom].
        bounding_box = [
            mid_x - circle_radius,
            mid_y - circle_radius,
            mid_x + circle_radius,
            mid_y + circle_radius,
        ]
        ImageDraw.Draw(canvas).ellipse(bounding_box, fill='black')
        return canvas

# Load the updated CSV file
csv_file_path = info_path  # Replace with your updated CSV file path
data = pd.read_csv(csv_file_path)

# Set the paths
original_images_folder = '/usr/local/faststorage/datasets/chexpert/img_chexpert'  # Replace with your original dataset path
new_images_folder = '../datasets/dot_90/imgs'
os.makedirs(new_images_folder, exist_ok=True)

# Process the images
circle_radius = 9  # Set the circle radius

# Indices of rows whose image could not be produced; they are removed from the
# metadata in one go after the loop instead of re-building the frame per row.
failed_indices = []

for index, row in data.iterrows():
    try:
        original_image_path = os.path.join(original_images_folder, row['Path'])
        new_image_path = os.path.join(new_images_folder, row['Path'])
        print(new_image_path)
        if row['group'] in [0, 1]:  # Add a circle for groups 0 and 1
            img_with_circle = add_black_circle_to_image(original_image_path, circle_radius)
            img_with_circle.save(new_image_path)
        else:  # Copy the image as is for groups 2 and 3
            shutil.copyfile(original_image_path, new_image_path)
    except Exception as exc:
        # Best effort: skip images that cannot be read or written, but say
        # which one failed and why. `Exception` (rather than a bare except)
        # lets KeyboardInterrupt / SystemExit still stop the run.
        failed_indices.append(index)
        print(f"error: skipping row {index} ({row['Path']!r}): {exc!r}")

# Drop the rows whose image is missing from the new dataset
data = data.drop(index=failed_indices)

# Save the updated DataFrame to a new CSV file
updated_csv_path = info_path  # Replace with your desired output file path
data.to_csv(updated_csv_path, index=False)
print("Images processed and saved in", new_images_folder)


../datasets/dot_90/imgs/train_patient00019_study4_view1_frontal.jpg
../datasets/dot_90/imgs/train_patient00062_study2_view1_frontal.jpg
../datasets/dot_90/imgs/train_patient00084_study2_view1_frontal.jpg
../datasets/dot_90/imgs/train_patient00128_study12_view1_frontal.jpg
../datasets/dot_90/imgs/train_patient00134_study7_view1_frontal.jpg
../datasets/dot_90/imgs/train_patient00134_study4_view1_frontal.jpg
../datasets/dot_90/imgs/train_patient00165_study1_view1_frontal.jpg
../datasets/dot_90/imgs/train_patient00169_study2_view1_frontal.jpg
../datasets/dot_90/imgs/train_patient00175_study2_view1_frontal.jpg
../datasets/dot_90/imgs/train_patient00177_study5_view1_frontal.jpg
../datasets/dot_90/imgs/train_patient00204_study16_view1_frontal.jpg
../datasets/dot_90/imgs/train_patient00216_study1_view1_frontal.jpg
../datasets/dot_90/imgs/train_patient00255_study13_view1_frontal.jpg
../datasets/dot_90/imgs/train_patient00256_study5_view1_frontal.jpg
../datasets/dot_90/imgs/train_patient00277_st

# Train / Validation / Test

Here we create our train, validation, and test sets.
We chose to use 70% of the data for training, 10% for validation, and 20% for testing.


We add another column to info.csv called `partition`, which shows the set that the subject is a part of (0 = train, 1 = validation, 2 = test).

In [17]:
import pandas as pd
import numpy as np

# Read the metadata produced by the previous steps
csv_file_path = info_path  # Replace with your CSV file path
data = pd.read_csv(csv_file_path)

# Seed NumPy's global generator so the shuffle below is repeatable
np.random.seed(0)

# Fractions of the data used for training and validation;
# whatever remains becomes the test set
train_size = 0.7
val_size = 0.1

# Shuffle all rows and renumber them 0..n-1
data = data.sample(frac=1).reset_index(drop=True)

# Number of rows in the train and validation partitions
num_samples = len(data)
num_train = int(train_size * num_samples)
num_val = int(val_size * num_samples)

# Partition code by position in the shuffled frame:
# first num_train rows -> 0 (train), next num_val rows -> 1 (validation),
# everything after that -> 2 (test)
row_position = np.arange(num_samples)
data['partition'] = np.select(
    [row_position < num_train, row_position < num_train + num_val],
    [0, 1],
    default=2,
)

# Write the frame, now including the 'partition' column, back to the info CSV
updated_csv_path = info_path   # Replace with your desired output file path
data.to_csv(updated_csv_path, index=False)

print("Updated CSV file with partitions saved to:", updated_csv_path)


Updated CSV file with partitions saved to: ../datasets/dot_90/info.csv


# Statistics

We run some statistics here to check the number of images in each subgroup

In [18]:
# calculate number of images in each group
import pandas as pd
import numpy as np

# Read the final metadata file
csv_file_path = info_path  # Replace with your CSV file path
data = pd.read_csv(csv_file_path)

# Overall counts: all rows, healthy rows (1) and unhealthy rows (-1)
health_status = data['Healthy/Unhealthy']
num_samples = len(data)
num_healthy = int((health_status == 1).sum())
num_unhealthy = int((health_status == -1).sum())

print(f"Total number of samples: {num_samples}")
print(f"Number of healthy samples: {num_healthy}")
print(f"Number of unhealthy samples: {num_unhealthy}")


# Per-group counts, split by partition (0 = train, 1 = validation, 2 = test)
for group_id in (0, 1, 2, 3):
    in_group = data[data['group'] == group_id]
    n_train, n_val, n_test = (
        len(in_group[in_group['partition'] == partition_id])
        for partition_id in (0, 1, 2)
    )
    print(f"Group {group_id}: Train: {n_train}, Validation: {n_val}, Test: {n_test}")

Total number of samples: 2757
Number of healthy samples: 553
Number of unhealthy samples: 2204
Group 0: Train: 1364, Validation: 191, Test: 407
Group 1: Train: 57, Validation: 5, Test: 6
Group 2: Train: 166, Validation: 28, Test: 48
Group 3: Train: 342, Validation: 51, Test: 92


# Final Note

You can first create the partitions and then split them into subgroups if you want to have balanced subgroups at test time. This is what we originally did in the paper to obtain fairer results.