# Preprocessing Notebook

**Purpose:** Prepare the dataset for training and evaluation.  
**Pipeline order:** This notebook must be run first, before `training.ipynb` and `errordatasetcreation.ipynb`.

**Inputs required:**
- VinDr-CXR dataset (unzipped in `vinbigdata-chest-xray-abnormalities-detection/`)

**Outputs:**
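- Preprocessed CSVs (`preprocessed_1024.csv`, `training.csv`, `validation.csv`, `testing.csv`).
- 1024×1024 PNG images, split into `YOLODataset/train/images`, `YOLODataset/val/images`, and `Error Dataset/images`.
- YOLO-format label files and the `YOLOExperiment.yaml` dataset description.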

# Imports

In [2]:
import os
import pydicom
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
from PIL import Image
from pathlib import Path
import shutil
import random
import yaml

# Preprocessing Annotations

## Rescale Bounding Boxes to 1024×1024

In [None]:
# Rescale every annotated bounding box from the resolution of its source DICOM
# to the TARGET_SIZE x TARGET_SIZE grid used for the exported images, then save
# the result as a new CSV.
TARGET_SIZE = 1024

csv_path = Path("vinbigdata-chest-xray-abnormalities-detection/train.csv")
dicom_folder = Path("vinbigdata-chest-xray-abnormalities-detection/train")
output_csv = Path("preprocessed_1024.csv")

# Unused by this cell; kept so the name stays defined. Derived from
# dicom_folder so the separator matches the current operating system.
folder_path = str(dicom_folder)

# Load data
data = pd.read_csv(csv_path)

# Rows with class_id 14 are left untouched (presumably the dataset's
# "No finding" label, which has no box to rescale -- confirm against the
# dataset description).
box_rows = data[data["class_id"] != 14]

# Work image by image: all boxes of one image share the same scale factors,
# so each DICOM only has to be opened once instead of once per annotation.
for image_id, rows in box_rows.groupby("image_id", sort=False):
    dicom_file = dicom_folder / f"{image_id}.dicom"
    if not dicom_file.exists():
        # The boxes of this image stay at their original resolution.
        print(f"Warning: {dicom_file} not found, skipping.")
        continue

    # Only the header is needed for the image size; skipping the pixel data
    # avoids decoding the whole image just to read its shape.
    ds = pydicom.dcmread(str(dicom_file), stop_before_pixels=True)
    height, width = int(ds.Rows), int(ds.Columns)
    scale_x = TARGET_SIZE / width
    scale_y = TARGET_SIZE / height

    # np.trunc mirrors the int() truncation applied to each scaled coordinate.
    x_cols = ["x_min", "x_max"]
    y_cols = ["y_min", "y_max"]
    data.loc[rows.index, x_cols] = np.trunc(rows[x_cols].astype(float) * scale_x)
    data.loc[rows.index, y_cols] = np.trunc(rows[y_cols].astype(float) * scale_y)

# Save output
data.to_csv(output_csv, index=False)
print(f"Saved {output_csv}")

## Split into training, validation, and testing sets

In [None]:
# Split the rescaled annotations into training, validation, and testing CSVs.
# The split is made on unique image ids, so every row of one image lands in
# the same subset.
data = pd.read_csv("preprocessed_1024.csv")
image_ids = data["image_id"].unique()

# First hold out 20% of the images for testing ...
ids_training_validation, ids_testing = train_test_split(
    image_ids,
    test_size=0.2,
    random_state=42,
)

# ... then hold out 20% of the remainder for validation.
image_ids_train, image_ids_val = train_test_split(
    ids_training_validation,
    test_size=0.2,
    random_state=42,
)


def _rows_for(ids):
    """Return every annotation row whose image_id is in *ids*."""
    return data[data["image_id"].isin(ids)]


group_test = _rows_for(ids_testing)
group_train = _rows_for(image_ids_train)
group_val = _rows_for(image_ids_val)

for subset, file_name in (
    (group_test, "testing.csv"),
    (group_train, "training.csv"),
    (group_val, "validation.csv"),
):
    subset.to_csv(file_name, index=False)

# Preprocessing Images

## Convert images from DICOM to PNG and resize to 1024×1024

In [None]:
# Convert every DICOM in DICOM_DIR to an 8-bit grayscale PNG of
# TARGET_SIZE x TARGET_SIZE pixels, written to OUTPUT_DIR under the same stem.
# Path components are joined with "/" so the cell also works outside Windows.
DICOM_DIR   = Path("vinbigdata-chest-xray-abnormalities-detection") / "train"
OUTPUT_DIR  = Path("images")
TARGET_SIZE = 1024

OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# The annotation-rescaling step looks the files up as "<image_id>.dicom", so
# that extension must be matched here; "*.dcm" is still accepted as well.
dicom_paths = sorted([*DICOM_DIR.glob("*.dicom"), *DICOM_DIR.glob("*.dcm")])
if not dicom_paths:
    print(f"Warning: no DICOM files found in {DICOM_DIR}")

for dcm_path in dicom_paths:
    ds  = pydicom.dcmread(dcm_path)
    arr = ds.pixel_array.astype(np.float32)

    # Min-max normalise to 0..255; a constant image is left at all zeros.
    # NOTE(review): PhotometricInterpretation is not checked, so images stored
    # as MONOCHROME1 would come out inverted -- confirm whether that matters.
    arr -= arr.min()
    peak = arr.max()
    if peak != 0:
        arr = (arr / peak) * 255.0
    arr = arr.astype(np.uint8)

    img = Image.fromarray(arr, mode="L")

    img = img.resize((TARGET_SIZE, TARGET_SIZE), resample=Image.LANCZOS)

    out_path = OUTPUT_DIR / f"{dcm_path.stem}.png"
    img.save(out_path)

    print(f"Saved {out_path}")

## Split images into training, validation, and testing sets

In [None]:
# Move each annotated image from the shared image folder into the folder of
# the split (training / validation / testing) that its image_id belongs to.

# Folder containing images
input_folder = "images"

# CSVs listing the image ids of each split
train_csv = "training.csv"
validation_csv = "validation.csv"
test_csv = "testing.csv"

# Where to put each split
train_folder = "YOLODataset/train/images"
validation_folder = "YOLODataset/val/images"
test_folder = "Error Dataset/images"

for folder in (train_folder, validation_folder, test_folder):
    os.makedirs(folder, exist_ok=True)


def _annotated_image_ids(csv_file):
    """Return the set of image ids in *csv_file* that have a box annotation.

    Rows with class_id 14 carry no bounding box (the rescaling step skips
    them), so images that only have such rows are left in the input folder.
    Otherwise every image would be moved here and nothing would remain to be
    sampled as background images afterwards.
    """
    frame = pd.read_csv(csv_file)
    if "class_id" in frame.columns:
        frame = frame[frame["class_id"] != 14]
    # A set removes the per-annotation duplicates and gives O(1) lookups.
    return set(frame["image_id"].astype(str))


# Create the id sets for each split
train_ids = _annotated_image_ids(train_csv)
val_ids = _annotated_image_ids(validation_csv)
test_ids = _annotated_image_ids(test_csv)

# Split images
for fname in os.listdir(input_folder):

    image_id = os.path.splitext(fname)[0]
    image_path = os.path.join(input_folder, fname)

    if image_id in train_ids:
        dst_folder = train_folder
    elif image_id in val_ids:
        dst_folder = validation_folder
    elif image_id in test_ids:
        dst_folder = test_folder
    else:
        # Not an annotated image of any split: leave it where it is.
        continue

    shutil.move(image_path, os.path.join(dst_folder, fname))

print("Images have been split")

## Add background images to training, validation, and testing sets

In [None]:
def add_bg(source, destination):
    """Move randomly chosen files from *source* into *destination*.

    The number of files requested equals the number of entries already in
    *destination*, which doubles its size. If *source* holds fewer files than
    that, all of them are moved and a warning is printed instead of failing.

    Args:
        source: Directory to take the files from.
        destination: Directory that receives the files.
    """
    destination_images = os.listdir(destination)
    source_images = os.listdir(source)

    num_requested = len(destination_images)
    # random.sample raises ValueError when asked for more items than exist,
    # so never request more than the source can provide.
    num_to_move = min(num_requested, len(source_images))
    if num_to_move < num_requested:
        print(
            f"Warning: requested {num_requested} images but only "
            f"{len(source_images)} are available in source."
        )

    selected_images = random.sample(source_images, num_to_move)

    for img in selected_images:
        src_path = os.path.join(source, img)
        dst_path = os.path.join(destination, img)
        shutil.move(src_path, dst_path)

    print(f"Moved {num_to_move} images from source to destination.")

# Top up each split's image folder with files sampled from the shared
# "images" folder, in the order train, validation, test.
for bg_destination in (
    "YOLODataset/train/images",
    "YOLODataset/val/images",
    "Error Dataset/images",
):
    add_bg("images", bg_destination)

# Convert to YOLO format

## Convert annotations to text file

In [None]:
def create_yolo_txt_files(csv_path, label_dir, img_size=1024):
    """Write one YOLO-format label file per image listed in a CSV.

    Each label line is ``class x_center y_center width height`` with the box
    values divided by *img_size*. Rows without complete box coordinates are
    skipped, so an image that only has such rows gets an empty label file
    instead of a line of ``nan`` values.

    Args:
        csv_path: CSV with the columns image_id, class_id, x_min, y_min,
            x_max and y_max.
        label_dir: Directory for the ``<image_id>.txt`` files; created if it
            does not exist.
        img_size: Side length in pixels that the box coordinates refer to.
    """
    df = pd.read_csv(csv_path)

    os.makedirs(label_dir, exist_ok=True)

    box_columns = ["x_min", "y_min", "x_max", "y_max"]

    for image_id, group in df.groupby("image_id"):
        # Group first and filter second, so that images without any box still
        # produce a (empty) label file.
        boxes = group.dropna(subset=box_columns)

        lines = []
        for row in boxes[["class_id", *box_columns]].itertuples(index=False):
            x_center = ((row.x_min + row.x_max) / 2) / img_size
            y_center = ((row.y_min + row.y_max) / 2) / img_size
            width = (row.x_max - row.x_min) / img_size
            height = (row.y_max - row.y_min) / img_size

            # int() keeps the class index integral even if pandas parsed the
            # column as float.
            lines.append(
                f"{int(row.class_id)} {x_center:.6f} {y_center:.6f} "
                f"{width:.6f} {height:.6f}\n"
            )

        txt_file = os.path.join(label_dir, f"{image_id}.txt")
        with open(txt_file, "w") as f:
            f.writelines(lines)

    print(f"YOLO .txt files created in: {label_dir}")

# Generate the YOLO label files for the training and validation splits.
img_size = 1024

# (annotation CSV, label output directory) per split
for csv_path, label_dir in (
    ("training.csv", "YOLODataset/train/labels"),
    ("validation.csv", "YOLODataset/val/labels"),
):
    create_yolo_txt_files(csv_path, label_dir, img_size)

## Create a single annotation class

In [None]:
# Collapse every annotation to a single class by rewriting the class index at
# the start of each label line to 0. The label files are edited in place.
# The directories are the ones the label files are written to by this
# notebook (train and val under YOLODataset/).
for split in ["train", "val"]:
    annotation_dir = f"YOLODataset/{split}/labels"

    for file_name in os.listdir(annotation_dir):
        if not file_name.endswith(".txt"):
            # Only label files are rewritten; leave anything else alone.
            continue

        file_path = os.path.join(annotation_dir, file_name)
        with open(file_path, "r") as file:
            lines = file.readlines()

        relabeled = []
        for line in lines:
            parts = line.split()
            if not parts:
                # A blank line has no class index to replace.
                continue
            parts[0] = "0"
            relabeled.append(" ".join(parts) + "\n")

        with open(file_path, "w") as file:
            file.writelines(relabeled)

## Create the dataset YAML file required by Ultralytics

In [None]:
def create_yaml_file(output_path, dataset_path, num_classes, class_names):
    """Write a dataset description YAML file to *output_path*.

    The file holds the dataset root (*dataset_path*), the fixed relative
    image folders "train/images" and "val/images", the class count
    (*num_classes*), and the class names (*class_names*).
    """
    config = dict(
        path=dataset_path,
        train="train/images",
        val="val/images",
        nc=num_classes,
        names=class_names,
    )

    with open(output_path, "w") as handle:
        yaml.dump(config, handle, default_flow_style=False)

    print(f"YAML file created at: {output_path}")

# Describe the single-class dataset for training.
# The dataset root must be the directory this notebook actually writes the
# images and labels to ("YOLODataset"); the YAML file name is unchanged.
# NOTE(review): confirm the training notebook does not expect the data to be
# copied to a separate "YOLOExperiment" directory.
dataset_base_path = "YOLODataset"
output_yaml_path = "YOLOExperiment.yaml"
number_of_classes = 1
class_names_list = [
    "Abnormality",
]

create_yaml_file(output_yaml_path, dataset_base_path, number_of_classes, class_names_list)