In [1]:
import os
import pandas as pd
import numpy as np
import pydicom
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
import cv2
import glob

In [2]:
# Root folder of the training DICOM files. Slices are addressed later in this
# notebook as {IMG_DIR}/{study_id}/{series_id}/{instance_number}.dcm.
IMG_DIR = "/kaggle/input/rsna-2024-lumbar-spine-degenerative-classification/train_images"

In [3]:
# Fold indices to export; each one gets its own `data_fold{FOLD}` output folder.
FOLDS = [0,1,2,3,4]
# The normalised YOLO box written for every annotated point has side
# STD_BOX_SIZE / OD_INPUT_SIZE (i.e. STD_BOX_SIZE pixels on an image resized
# to OD_INPUT_SIZE).
OD_INPUT_SIZE = 384
STD_BOX_SIZE = 20
# NOTE(review): not referenced by any visible cell — presumably a leftover
# sub-sampling switch; confirm before relying on it.
SAMPLE = None
# Conditions kept when filtering the label table (compared against the
# title-cased `condition` column).
CONDITIONS = ['Spinal Canal Stenosis']
# Severity names; lower-cased when composing detector class names.
SEVERITIES = ['Normal/Mild', 'Moderate', 'Severe']
# Level tokens in the form used inside detector class names.
LEVELS = ['l1_l2', 'l2_l3', 'l3_l4', 'l4_l5', 'l5_s1']

In [5]:
# Wide label table: the first column is `study_id`, every other column is
# parsed below as a "<condition>_<level>" severity column.
train_val_df = pd.read_csv('/kaggle/input/rsna-2024-lumbar-spine-degenerative-classification/train.csv')
# Point annotations; later cells use its study_id, series_id, instance_number,
# condition, level, x and y columns.
train_xy = pd.read_csv('/kaggle/input/rsna-2024-lumbar-spine-degenerative-classification/train_label_coordinates.csv')
# Per-series metadata, joined onto the coordinates by (study_id, series_id).
train_des = pd.read_csv('/kaggle/input/rsna-2024-lumbar-spine-degenerative-classification/train_series_descriptions.csv')
# Precomputed split: joined on `study_id`, provides the `fold` column used to
# separate train from validation.
fold_df = pd.read_csv('/kaggle/input/lsdc-fold-split/5folds.csv')
# Quick look at the coordinate table.
train_xy.head(3)

In [9]:
def get_level(text):
    """Extract the spinal level from a label column name.

    Args:
        text: A column name containing one of the level tokens
            ``l1_l2`` ... ``l5_s1`` (e.g. ``'spinal_canal_stenosis_l1_l2'``).

    Returns:
        The level with both halves capitalised and joined by a slash,
        e.g. ``'L1/L2'``.

    Raises:
        ValueError: If ``text`` contains none of the known level tokens.
    """
    for lev in ['l1_l2', 'l2_l3', 'l3_l4', 'l4_l5', 'l5_s1']:
        if lev in text:
            upper, lower = lev.split('_')
            return f'{upper.capitalize()}/{lower.capitalize()}'
    # Report the offending input; the loop variable would always be the last
    # candidate here and says nothing about what failed to match.
    raise ValueError(f'Level not found in {text!r}')
    
def get_condition(text):
    """Turn a label column name into its title-cased condition name.

    The name is split on underscores, the final two tokens (the level part,
    e.g. ``l1`` and ``l2``) are discarded, and the remaining words are
    capitalised and joined with single spaces.
    """
    words = text.split('_')[:-2]
    return ' '.join(word.capitalize() for word in words)

In [None]:
# Inspect the condition names used by the coordinate file; they have to match
# what get_condition() produces because the two tables are merged on `condition`.
train_xy['condition'].unique()

In [12]:
# Reshape the wide per-study label table into one row per
# (study, condition, level), then attach each study's fold.
records = []
for _, study_row in train_val_df.iterrows():
    # Every column after the first is a "<condition>_<level>" severity column.
    severities = study_row.iloc[1:].to_dict()
    records.extend(
        {
            'study_id': study_row['study_id'],
            'condition': get_condition(column),
            'level': get_level(column),
            'label': severity,
        }
        for column, severity in severities.items()
    )

label_df = pd.DataFrame(records, columns=['study_id', 'condition', 'level', 'label'])
label_df = label_df.merge(fold_df, on='study_id')

In [13]:
# Attach the series metadata to every coordinate row (joined on study + series).
train_xy = train_xy.merge(train_des, how='inner', on=['study_id', 'series_id'])
# Inner join labels with coordinates on (study_id, condition, level): labels
# without a coordinate row, and coordinates without a label, are dropped.
# NOTE(review): both statements rebind their own input, so re-running this cell
# without re-running the cells above merges already-merged frames.
label_df = label_df.merge(train_xy, how='inner', on=['study_id', 'condition', 'level'])

In [19]:
def read_dcm(src_path):
    """Load a DICOM slice as a min-max normalised 3-channel ``uint8`` image.

    Args:
        src_path: Path of the ``.dcm`` file to read.

    Returns:
        Array of shape ``pixel_array.shape + (3,)`` and dtype ``uint8``; the
        single grey channel is rescaled to the 0-255 range and repeated three
        times along a new last axis.
    """
    dicom_data = pydicom.dcmread(src_path)
    # Promote to float before doing arithmetic: subtracting the minimum in the
    # file's native integer dtype can wrap around for signed pixel data with a
    # wide value range. For unsigned data the result is unchanged.
    image = dicom_data.pixel_array.astype(np.float64)
    lo, hi = image.min(), image.max()
    # The small epsilon keeps a constant image from dividing by zero.
    image = (image - lo) / (hi - lo + 1e-6) * 255
    return np.stack([image] * 3, axis=-1).astype('uint8')

In [25]:
# Keep only the annotation rows whose condition is listed in CONDITIONS.
keep_mask = label_df['condition'].isin(CONDITIONS)
filtered_df = label_df.loc[keep_mask]

In [26]:
# Enumerate every (condition, level, severity) combination as one detector
# class named "<condition>_<level>_<severity>", numbered in nesting order.
class_names = [
    f"{cond.lower().replace(' ', '_')}_{level}_{severity.lower()}"
    for cond in CONDITIONS
    for level in LEVELS
    for severity in SEVERITIES
]
id2label = dict(enumerate(class_names))
label2id = {name: idx for idx, name in id2label.items()}
id2label

In [28]:
def gen_yolo_format(ann_df, phase='train', out_dir=None):
    """Export annotated slices as a YOLO dataset (JPEG images + label files).

    For every distinct (study_id, series_id, instance_number) in ``ann_df`` the
    DICOM slice is loaded with ``read_dcm`` and saved as
    ``<out_dir>/images/<phase>/<study>_<series>_<instance>.jpg``. A matching
    ``<out_dir>/labels/<phase>/<study>_<series>_<instance>.txt`` receives one
    ``class_id x_center y_center width height`` line per annotation row, with
    the centre normalised by the image width/height and a fixed box size.

    Args:
        ann_df: DataFrame with the columns study_id, series_id,
            instance_number, condition, level, label, x and y.
        phase: Name of the split sub-folder (e.g. ``'train'`` or ``'val'``).
        out_dir: Dataset root folder. When ``None`` (the default) the
            module-level ``OUT_DIR`` is looked up at call time, which is how
            the function located its output before this parameter existed.

    Raises:
        KeyError: If a row's class name is not present in ``label2id``.
        IOError: If OpenCV fails to write an image file.
    """
    if out_dir is None:
        out_dir = OUT_DIR

    # Create the output folders once instead of on every slice.
    img_dir = os.path.join(out_dir, 'images', phase)
    ann_dir = os.path.join(out_dir, 'labels', phase)
    os.makedirs(img_dir, exist_ok=True)
    os.makedirs(ann_dir, exist_ok=True)

    # A missing severity cannot be mapped to a class (NaN has no .lower()),
    # so drop those rows up front rather than crashing halfway through.
    ann_df = ann_df.dropna(subset=['label'])

    slices = ann_df.groupby(['study_id', 'series_id', 'instance_number'])
    for (study_id, series_id, instance_num), group in tqdm(slices):
        stem = f'{study_id}_{series_id}_{instance_num}'
        img = read_dcm(f'{IMG_DIR}/{study_id}/{series_id}/{instance_num}.dcm')
        H, W = img.shape[:2]

        img_path = os.path.join(img_dir, f'{stem}.jpg')
        # cv2.imwrite reports failure through its return value, not an exception.
        if not cv2.imwrite(img_path, img):
            raise IOError(f'Failed to write image {img_path}')

        # Fixed-size box; algebraically this is STD_BOX_SIZE / OD_INPUT_SIZE on
        # both axes, i.e. STD_BOX_SIZE pixels at the detector input resolution.
        width = W / OD_INPUT_SIZE * STD_BOX_SIZE / W
        height = H / OD_INPUT_SIZE * STD_BOX_SIZE / H

        with open(os.path.join(ann_dir, f'{stem}.txt'), 'w') as f:
            for _, row in group.iterrows():
                cond = row['condition'].lower().replace(' ', '_')
                level = row['level'].lower().replace('/', '_')
                severity = row['label'].lower()
                class_id = label2id[f"{cond}_{level}_{severity}"]
                x_center = row['x'] / W
                y_center = row['y'] / H
                f.write(f'{class_id} {x_center} {y_center} {width} {height}\n')

In [None]:
for FOLD in FOLDS:
    print('Gen data fold', FOLD)
    # gen_yolo_format reads the module-level OUT_DIR, so set it before calling.
    OUT_DIR = f'data_fold{FOLD}'
    os.makedirs(OUT_DIR, exist_ok=True)

    # Hold out the current fold for validation; all other rows are training data.
    in_val_fold = filtered_df.fold == FOLD
    train_df = filtered_df[~in_val_fold]
    val_df = filtered_df[in_val_fold]

    for phase, phase_df in (('train', train_df), ('val', val_df)):
        gen_yolo_format(phase_df, phase=phase)