In [9]:
# Standard Libraries
import os
import pydicom
from PIL import Image
import yaml

# Data Manipulation Libraries
import pandas as pd
import numpy as np

# Machine Learning Libraries
from sklearn.model_selection import train_test_split

**Define Parameters**

In [10]:
# File path
EXTRACTED_PATH = '/cluster/home/bjorneme/projects/Data/vinbigdata-chest-xray-abnormalities-detection-extracted'
train_csv = os.path.join(EXTRACTED_PATH, 'train.csv')

# Disease labels
disease_labels = [
    "Aortic enlargement", "Atelectasis", "Calcification", "Cardiomegaly",
    "Consolidation", "ILD", "Infiltration", "Lung Opacity",
    "Nodule/Mass", "Other lesion", "Pleural effusion", "Pleural thickening",
    "Pneumothorax", "Pulmonary fibrosis"
]

In [11]:
def open_dicom_image(path):
    ds = pydicom.dcmread(path)
    img = ds.pixel_array.astype(np.float32)
    img = ((img - img.min()) / (img.max() - img.min()) * 255).astype(np.uint8)
    return Image.fromarray(img)

In [12]:
def convert_annotation_to_yolo(group, img_size_override=None):
    # Size image
    img_path = group.iloc[0]['Path']
    img_width, img_height = open_dicom_image(img_path).size
    
    lines = []
    for _, row in group.iterrows():
        x_min, y_min, x_max, y_max = row[['x_min','y_min','x_max','y_max']]
        x_center = ((x_min + x_max) / 2) / img_width
        y_center = ((y_min + y_max) / 2) / img_height
        box_width = (x_max - x_min) / img_width
        box_height = (y_max - y_min) / img_height
        lines.append(f"{int(row['class_id'])} {x_center:.6f} {y_center:.6f} {box_width:.6f} {box_height:.6f}")
    return img_path, lines

In [13]:
def load_labels(csv_path, image_path, disease_labels):
    df = pd.read_csv(csv_path)
    for d in disease_labels:
        df[d] = df['class_name'].str.contains(d).astype(int)
    df['No finding'] = df['class_name'].str.contains('No finding').astype(int)
    df['Path'] = df['image_id'].apply(lambda x: os.path.join(image_path, 'train', f"{x}.dicom"))
    return df

In [14]:
def process_split(df, split_name, base_dir):
    for image_id, group in df.groupby('image_id'):
        img_path, yolo_lines = convert_annotation_to_yolo(group)
        img_filename = os.path.basename(img_path)
        ext = os.path.splitext(img_filename)[1].lower()
        img_dir = os.path.join(base_dir, "images", split_name)
        label_dir = os.path.join(base_dir, "labels", split_name)
        os.makedirs(img_dir, exist_ok=True)
        os.makedirs(label_dir, exist_ok=True)
        
        pil_img = open_dicom_image(img_path)
        img_filename = os.path.splitext(img_filename)[0] + ".png"
        pil_img.save(os.path.join(img_dir, img_filename))
        
        label_filename = os.path.splitext(img_filename)[0] + ".txt"
        with open(os.path.join(label_dir, label_filename), "w") as f:
            f.write("\n".join(yolo_lines))

In [15]:
df = load_labels(train_csv, EXTRACTED_PATH, disease_labels)
df = df[df['class_id'] != 14]
print(f"Train size: {df.shape[0]}")

unique_ids = df['image_id'].unique()
train_ids, val_ids = train_test_split(unique_ids, test_size=0.2, random_state=42)
train_df, val_df = df[df['image_id'].isin(train_ids)], df[df['image_id'].isin(val_ids)]
print(f"Train images: {len(train_ids)}, Val images: {len(val_ids)}")

for split in ['train', 'val']:
    os.makedirs(os.path.join(EXTRACTED_PATH, "images", split), exist_ok=True)
    os.makedirs(os.path.join(EXTRACTED_PATH, "labels", split), exist_ok=True)

print("Processing training set...")
# process_split(train_df, "train", EXTRACTED_PATH)
print("Processing validation set...")
# process_split(val_df, "val", EXTRACTED_PATH)

Train size: 36096
Train images: 3515, Val images: 879
Processing training set...
Processing validation set...


In [16]:
data_yaml = {
    "train": os.path.join(os.getcwd(), EXTRACTED_PATH, "images", "train"),
    "val": os.path.join(os.getcwd(), EXTRACTED_PATH, "images", "val"),
    "nc": 14,
    "names": disease_labels
}

with open("data.yaml", "w") as f:
    yaml.dump(data_yaml, f, default_flow_style=False)