In [2]:
from rsna_dataloader import *
import torchio as tio

# Location of the competition data, relative to the notebook's working directory.
DATA_BASEPATH = "../data/rsna-2024-lumbar-spine-degenerative-classification/"
TRAINING_DATA = retrieve_coordinate_training_data(DATA_BASEPATH)

# Deterministic preprocessing: resize every volume to 128^3 (B-spline interpolation)
# and rescale intensities into [0, 1].
transform_3d_val = tio.Compose([
    tio.Resize((128, 128, 128), image_interpolation="bspline"),
    tio.RescaleIntensity(out_min_max=(0, 1)),
])

# The same (validation) transform is deliberately passed for the training split too.
# NOTE(review): `os` is not imported in this cell; presumably the star import
# from rsna_dataloader provides it - confirm.
(
    trainloader, valloader, test_loader,
    trainset, valset, testset,
) = create_subject_level_datasets_and_loaders(
    TRAINING_DATA,
    transform_3d_train=transform_3d_val,
    transform_3d_val=transform_3d_val,
    base_path=os.path.join(DATA_BASEPATH, "train_images"),
    num_workers=0,
    split_factor=0.3,
    batch_size=1,
)

ImportError: cannot import name 'VisibleDeprecationWarning' from 'numpy' (unknown location)

In [None]:
# Keep only the label rows whose study belongs to the held-out test split.
TEST_DATA = TRAINING_DATA.loc[
    TRAINING_DATA["study_id"].isin(testset.subjects["study_id"].values)
]

In [None]:
def convert_train_data_to_solution(train_df):
    """Convert a long-format label table into the one-hot "solution" layout.

    The conversion is vectorised and purely positional, so it stays correct
    when ``train_df`` carries a non-unique index (e.g. after filtering) and
    never adds stray columns for unexpected severity values.

    Args:
        train_df: DataFrame with at least a ``row_id`` column and a
            ``severity`` column holding ``"normal_mild"``, ``"moderate"`` or
            ``"severe"``.

    Returns:
        A new DataFrame (``train_df`` is not modified) with the columns
        ``row_id``, ``normal_mild``, ``moderate``, ``severe`` (0/1 indicators)
        and ``sample_weight`` (1, 2 and 3 for the three severities, in that
        order). Rows with any other severity value are printed, get all-zero
        indicators and keep the default weight of 1.
    """
    # Insertion order doubles as the output column order.
    severity_weights = {"normal_mild": 1, "moderate": 2, "severe": 3}

    severity = train_df["severity"]
    ret = train_df.loc[:, ["row_id"]].copy()

    # Plain arrays are assigned so nothing is re-aligned on index labels.
    for level in severity_weights:
        ret[level] = (severity == level).to_numpy().astype("int64")

    recognised = severity.isin(list(severity_weights))
    # Surface unexpected labels, one per line.
    for unexpected in severity[~recognised]:
        print(unexpected)

    ret["sample_weight"] = (
        severity.map(severity_weights).where(recognised, 1).astype("int64").to_numpy()
    )

    return ret[["row_id", "normal_mild", "moderate", "severe", "sample_weight"]]


# Ground truth for the held-out studies; the last line displays the rows labelled severe.
test_solution = convert_train_data_to_solution(TEST_DATA)
test_solution.loc[test_solution["severe"] == 1]

In [None]:
import os


def retrieve_image_paths(base_path, study_id, series_id):
    """Return the path of every entry in ``<base_path>/<study_id>/<series_id>``.

    Entries come back in whatever order ``os.listdir`` yields them.
    """
    series_dir = os.path.join(base_path, str(study_id), str(series_id))
    return list(map(lambda entry: os.path.join(series_dir, entry), os.listdir(series_dir)))

In [None]:
import numpy as np
from torch.utils.data import Dataset, DataLoader
import torchio as tio
import albumentations
import cv2
import pydicom
import itk

# Maps each MRI series description to the condition name(s) listed for it.
# PatientLevelTestset.__getitem__ iterates over the keys only, loading one
# series per description in this insertion order.
CONDITIONS = {
    "Sagittal T2/STIR": ["Spinal Canal Stenosis"],
    "Axial T2": ["Left Subarticular Stenosis", "Right Subarticular Stenosis"],
    "Sagittal T1": ["Left Neural Foraminal Narrowing", "Right Neural Foraminal Narrowing"],
}


def read_series_as_volume(dirName, verbose=False):
    """Read the DICOM series found in a directory into a 3D array.

    Series are discovered with ``itk.GDCMSeriesFileNames`` (series details
    enabled, with an extra restriction on tag ``0008|0021``). When several
    series UIDs are found, the last one reported is loaded.

    Args:
        dirName: directory holding the DICOM files.
        verbose: when True, print the discovered series UIDs and the UID that
            is read.

    Returns:
        The array returned by ``itk.GetArrayFromImage`` for the loaded volume
        (pixel type ``signed short``).

    Raises:
        FileNotFoundError: if no DICOM series is found in ``dirName``.
    """
    PixelType = itk.ctype("signed short")
    Dimension = 3

    ImageType = itk.Image[PixelType, Dimension]

    namesGenerator = itk.GDCMSeriesFileNames.New()
    namesGenerator.SetUseSeriesDetails(True)
    namesGenerator.AddSeriesRestriction("0008|0021")
    namesGenerator.SetGlobalWarningDisplay(False)
    namesGenerator.SetDirectory(dirName)

    seriesUID = namesGenerator.GetSeriesUIDs()

    if verbose:
        if len(seriesUID) < 1:
            print("No DICOMs in: " + dirName)

        print("The directory: " + dirName)
        print("Contains the following DICOM Series: ")
        for uid in seriesUID:
            print(uid)

    # Fail fast: the directory is scanned once, so retrying without
    # re-scanning could never succeed.
    if len(seriesUID) < 1:
        raise FileNotFoundError(f"Empty path? {os.path.abspath(dirName)}")

    # Only one reader is needed; the last UID is used when several are found.
    seriesIdentifier = list(seriesUID)[-1]
    if verbose:
        print("Reading: " + seriesIdentifier)
    fileNames = namesGenerator.GetFileNames(seriesIdentifier)

    reader = itk.ImageSeriesReader[ImageType].New()
    dicomIO = itk.GDCMImageIO.New()
    reader.SetImageIO(dicomIO)
    reader.SetFileNames(fileNames)
    reader.ForceOrthogonalDirectionOff()

    reader.Update()
    data = itk.GetArrayFromImage(reader.GetOutput())

    # Drop the ITK objects explicitly before returning the array.
    del namesGenerator, dicomIO, reader

    return data


class PatientLevelTestset(Dataset):
    """Dataset yielding, for each study, one stacked tensor with a volume per series description.

    Each item is ``(volumes, study_id)``: ``volumes`` stacks one volume for
    every key of the module-level ``CONDITIONS`` mapping, in its order.
    """

    def __init__(self,
                 base_path: str,
                 dataframe: pd.DataFrame,
                 transform_3d=None):
        self.base_path = base_path

        # One row per (study, series, description) combination.
        series_columns = ['study_id', "series_id", "series_description"]
        self.dataframe = dataframe[series_columns].drop_duplicates()

        # One row per study; the position in this frame is the dataset index.
        unique_studies = self.dataframe[['study_id']].drop_duplicates()
        self.subjects = unique_studies.reset_index(drop=True)

        self.transform_3d = transform_3d

    def __len__(self):
        return len(self.subjects)

    def __getitem__(self, index):
        subject = self.subjects.iloc[index]
        study_id = subject["study_id"]
        study_dir = os.path.join(self.base_path, str(study_id))
        in_study = self.dataframe["study_id"] == study_id

        volumes = []
        for series_desc in CONDITIONS.keys():
            # !TODO: Multiple matching series
            # For now the matching series with the smallest series_id is used.
            has_description = self.dataframe["series_description"] == series_desc
            candidates = self.dataframe.loc[in_study & has_description]
            series = candidates.sort_values("series_id")['series_id'].iloc[0]

            volume = read_series_as_volume(os.path.join(study_dir, str(series)))

            if self.transform_3d is not None:
                # The transform receives the volume with an added leading axis.
                volume = self.transform_3d(np.expand_dims(volume, 0))  #.data

            volumes.append(torch.Tensor(volume).squeeze(0))

        return torch.stack(volumes), study_id

In [None]:
# Preprocessing for PatientLevelTestset volumes: resize to 144^3 with B-spline
# interpolation, then rescale intensities into [0, 1].
# NOTE(review): the first cell's transform_3d_val resizes to 128^3 instead - confirm
# which size the loaded model expects.
transform_3d = tio.Compose([
    tio.Resize((144, 144, 144), image_interpolation="bspline"),
    tio.RescaleIntensity(out_min_max=(0, 1)),
])


In [None]:
def create_subject_level_testset_and_loader(df: pd.DataFrame,
                                            transform_3d,
                                            base_path: str,
                                            batch_size=1,
                                            num_workers=0):
    """Build a ``PatientLevelTestset`` over ``df`` and a non-shuffling ``DataLoader`` for it.

    Returns:
        ``(testset, test_loader)``.
    """
    subject_dataset = PatientLevelTestset(base_path, df, transform_3d=transform_3d)
    loader_options = dict(batch_size=batch_size, shuffle=False, num_workers=num_workers)
    return subject_dataset, DataLoader(subject_dataset, **loader_options)

In [None]:
import os

# Subject-level dataset/loader over the held-out rows, reading volumes from train_images.
dataset, dataloader = create_subject_level_testset_and_loader(
    TEST_DATA,
    transform_3d,
    os.path.join(DATA_BASEPATH, "train_images"),
)

In [None]:
import torch

# Always a torch.device (never a bare "cpu" string), so downstream code sees
# one type whether or not CUDA is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
import torch.nn as nn
import timm_3d


class CNN_Model_3D_Multihead(nn.Module):
    """3D CNN encoder followed by one small head per output class.

    The encoder is created with ``timm_3d.create_model`` and its classifier is
    replaced by ``nn.Identity``, so it emits pooled features. Each of the
    ``out_classes`` heads is ``Linear(head_in_dim, 1)`` followed by a
    ``LogisticCumulativeLink(CONFIG["out_dim"])``.

    NOTE(review): the ``out_dim`` argument is never read; ``CONFIG["out_dim"]``
    is used instead. ``CONFIG``, ``LogisticCumulativeLink`` and
    ``AscensionCallback`` are not defined in this notebook section and must
    already be in scope - confirm where they come from.
    """

    def __init__(self, backbone="efficientnet_lite0", in_chans=1, out_classes=5, out_dim=3, pretrained=True):
        super(CNN_Model_3D_Multihead, self).__init__()
        self.out_classes = out_classes

        self.encoder = timm_3d.create_model(
            backbone,
            num_classes=out_classes * CONFIG["out_dim"],
            features_only=False,
            drop_rate=CONFIG["drop_rate"],
            drop_path_rate=CONFIG["drop_path_rate"],
            # drop_rate_last=CONFIG["drop_rate_last"],
            pretrained=pretrained,
            in_chans=in_chans,
        )
        # Remember the feature width, then strip the built-in classifier so the
        # encoder returns features for the custom heads below.
        head_in_dim = self.encoder.classifier.in_features
        self.encoder.classifier = nn.Identity()
        self.heads = nn.ModuleList(
            [nn.Sequential(
                nn.Linear(head_in_dim, 1),
                LogisticCumulativeLink(CONFIG["out_dim"])
            ) for i in range(out_classes)]
        )

        self.ascension_callback = AscensionCallback()

    def forward(self, x):
        """Run the encoder once, apply every head, and return the results stacked with the head axis second."""
        feat = self.encoder(x)
        # stack() puts the head axis first; swapping axes 0 and 1 moves it to second place.
        return torch.swapaxes(torch.stack([head(feat) for head in self.heads]), 0, 1)

    def _ascension_callback(self):
        """Apply ``self.ascension_callback.clip`` to the link layer (index 1) of every head."""
        for head in self.heads:
            self.ascension_callback.clip(head[1])


In [None]:
# Load the serialized model onto the CPU first, then move it to `device`.
# The path is assembled with os.path.join so it resolves on any OS; the former
# hard-coded backslash separators only worked on Windows.
# NOTE: torch.load unpickles arbitrary Python objects - only load checkpoints
# from a trusted source.
# NOTE(review): recent PyTorch releases default to weights_only=True, which
# rejects fully pickled modules; pass weights_only=False there if this fails.
model = torch.load(
    os.path.join(
        "..",
        "models",
        "tf_efficientnetv2_m_144_3d_spacecutter",
        "tf_efficientnetv2_m_144_3d_spacecutter_35.pt",
    ),
    map_location=torch.device('cpu'),
).to(device)

In [None]:
# Rebinds the global CONDITIONS: same series-description keys as the earlier
# definition, now with snake_case condition names.
CONDITIONS = {
    "Sagittal T2/STIR": ["spinal_canal_stenosis"],
    "Axial T2": ["left_subarticular_stenosis", "right_subarticular_stenosis"],
    "Sagittal T1": ["left_neural_foraminal_narrowing", "right_neural_foraminal_narrowing"],
}

# Alphabetically sorted. The inference cells map model output `index` to
# ALL_CONDITIONS[index // 5] and LEVELS[index % 5].
# NOTE(review): this assumes the model's 25 outputs are condition-major in this
# sorted order - confirm against the training label layout.
ALL_CONDITIONS = sorted(["spinal_canal_stenosis", "left_subarticular_stenosis", "right_subarticular_stenosis",
                         "left_neural_foraminal_narrowing", "right_neural_foraminal_narrowing"])
LEVELS = ["l1_l2", "l2_l3", "l3_l4", "l4_l5", "l5_s1"]

# Empty frame with the submission columns (rebuilt by the pre-population cell).
results_df = pd.DataFrame({"row_id": [], "normal_mild": [], "moderate": [], "severe": []})

# Display the sorted condition order.
ALL_CONDITIONS

In [None]:
# Pre-populate results df
import glob
import os

# TEST_DATA may hold several rows per study, so de-duplicate the ids first
# (pd.unique keeps the order of first appearance).
study_ids = pd.unique(TEST_DATA["study_id"])
# study_ids = glob.glob("/kaggle/input/rsna-2024-lumbar-spine-degenerative-classification/test_images/*")
# study_ids = [os.path.basename(e) for e in study_ids]

# One uniform-prior row (1/3 per severity) for every (study, condition, level)
# triple. The frame is built in one go from a list of records instead of being
# grown row by row through the private DataFrame._append.
results_df = pd.DataFrame(
    [
        {"row_id": f"{study_id}_{condition}_{level}",
         "normal_mild": 1 / 3, "moderate": 1 / 3, "severe": 1 / 3}
        for study_id in study_ids
        for condition in ALL_CONDITIONS
        for level in LEVELS
    ],
    columns=["row_id", "normal_mild", "moderate", "severe"],
)

In [None]:
import numpy as np
import pandas as pd
import pandas.api.types
import sklearn.metrics


class ParticipantVisibleError(Exception):
    """Error whose message reports a failed validity check on the scoring inputs."""


def get_condition(full_location: str) -> str:
    """Return the condition keyword contained in a row id or location string.

    For example ``spinal_canal_stenosis_l1_l2`` gives ``'spinal'``. Keywords
    are tried in the order 'spinal', 'foraminal', 'subarticular' and the first
    one found as a substring wins.

    Raises:
        ValueError: if none of the keywords occurs in ``full_location``.
    """
    keywords = ('spinal', 'foraminal', 'subarticular')
    found = next((keyword for keyword in keywords if keyword in full_location), None)
    if found is None:
        raise ValueError(f'condition not found in {full_location}')
    return found


def score(
        solution: pd.DataFrame,
        submission: pd.DataFrame,
        row_id_column_name: str,
        any_severe_scalar: float
    ) -> float:
    '''
    Weighted multi-condition log loss between a solution and a submission.

    Pseudocode:
    1. Calculate the sample weighted log loss for each medical condition:
    2. Derive a new any_severe label.
    3. Calculate the sample weighted log loss for the new any_severe label.
    4. Return the average of all of the label group log losses as the final score, normalized for the number of columns in each group.
       This mitigates the impact of spinal stenosis having only half as many columns as the other two conditions.

    Predictions are matched to labels by the value of the row id column, not by
    DataFrame index or row position, and neither input frame is modified.
    Submission rows whose row id is absent from the solution are ignored.

    Args:
        solution: ground truth with the row id column, the one-hot columns
            'normal_mild', 'moderate', 'severe' and a 'sample_weight' column.
        submission: predictions with the row id column and the three
            probability columns.
        row_id_column_name: name of the row id column in both frames. Row ids
            look like '<study_id>_<condition>_<level>'.
        any_severe_scalar: weight of the any_severe loss in the final average
            (each per-condition loss has weight 1).

    Returns:
        The weighted average of the three per-condition log losses and the
        any_severe (spinal) log loss.

    Raises:
        ParticipantVisibleError: if submission values are non-numeric,
            non-finite or negative, if labels are negative, if submission row
            ids are duplicated, or if a solution row id has no prediction.
    '''

    target_levels = ['normal_mild', 'moderate', 'severe']

    # Run basic QC checks on the inputs
    if not pandas.api.types.is_numeric_dtype(submission[target_levels].values):
        raise ParticipantVisibleError('All submission values must be numeric')

    if not np.isfinite(submission[target_levels].values).all():
        raise ParticipantVisibleError('All submission values must be finite')

    if solution[target_levels].min().min() < 0:
        raise ParticipantVisibleError('All labels must be at least zero')
    if submission[target_levels].min().min() < 0:
        raise ParticipantVisibleError('All predictions must be at least zero')

    if submission[row_id_column_name].duplicated().any():
        raise ParticipantVisibleError('Submission row ids must be unique')

    # Sorted copy with a fresh 0..n-1 index: the caller's frame stays untouched
    # and every .loc lookup below is positional with respect to this ordering.
    solution = solution.sort_values(by=row_id_column_name).reset_index(drop=True)
    row_ids = solution[row_id_column_name]
    solution['study_id'] = row_ids.apply(lambda x: x.split('_')[0])
    solution['location'] = row_ids.apply(lambda x: '_'.join(x.split('_')[1:]))
    solution['condition'] = row_ids.apply(get_condition)

    # A left merge on the row id lines each prediction up with its label, row
    # for row, and carries the derived columns over to the submission side.
    submission = solution[[row_id_column_name, 'study_id', 'location', 'condition']].merge(
        submission[[row_id_column_name] + target_levels],
        on=row_id_column_name,
        how='left',
    )
    # Inputs were checked to be finite, so a NaN here means a missing row id.
    if submission[target_levels].isna().any().any():
        raise ParticipantVisibleError('Submission is missing predictions for some solution row ids')

    condition_losses = []
    condition_weights = []
    for condition in ['spinal', 'foraminal', 'subarticular']:
        condition_indices = solution.loc[solution['condition'] == condition].index.values
        condition_loss = sklearn.metrics.log_loss(
            y_true=solution.loc[condition_indices, target_levels].values,
            y_pred=submission.loc[condition_indices, target_levels].values,
            sample_weight=solution.loc[condition_indices, 'sample_weight'].values
        )
        condition_losses.append(condition_loss)
        condition_weights.append(1)

    # any_severe: per study, the maximum 'severe' label / weight / prediction
    # over that study's spinal rows.
    any_severe_spinal_labels = pd.Series(solution.loc[solution['condition'] == 'spinal'].groupby('study_id')['severe'].max())
    any_severe_spinal_weights = pd.Series(solution.loc[solution['condition'] == 'spinal'].groupby('study_id')['sample_weight'].max())
    any_severe_spinal_predictions = pd.Series(submission.loc[submission['condition'] == 'spinal'].groupby('study_id')['severe'].max())
    any_severe_spinal_loss = sklearn.metrics.log_loss(
        y_true=any_severe_spinal_labels,
        y_pred=any_severe_spinal_predictions,
        sample_weight=any_severe_spinal_weights
    )
    condition_losses.append(any_severe_spinal_loss)
    condition_weights.append(any_severe_scalar)
    return np.average(condition_losses, weights=condition_weights)

In [None]:
# Keep one row per row_id in both frames and renumber their indices from 0.
results_df = results_df.drop_duplicates(subset=["row_id"]).reset_index(drop=True)
test_solution = test_solution.drop_duplicates(subset=["row_id"]).reset_index(drop=True)

# Score of the current contents of results_df (any_severe_scalar = 1.0).
score(test_solution, results_df, "row_id", 1.0)

In [None]:
# Softmax over axis 2, i.e. the last axis of the (-1, 25, 3) tensors it is applied to below.
softmax = nn.Softmax(dim=2)

In [None]:
# Display the current prediction table.
results_df

In [None]:
from tqdm import tqdm

# Run the model on every item of `testset` and write its 25 (condition, level)
# probability triples into the matching rows of results_df.
# NOTE(review): this iterates `testset` from the first cell, not the
# PatientLevelTestset `dataset` built above - confirm that is intended.
with torch.no_grad():
    model.eval()

    # tqdm wraps the dataset itself (not the enumerate object) so it can read
    # len(testset) and show a total / ETA.
    for item_index, item in enumerate(tqdm(testset)):
        images, _  = item
        output = model(images.unsqueeze(0).to(device))
        output = output.reshape((-1, 25, 3))
        output = softmax(output)
        output = output.detach().cpu().numpy()[0]
        study_id = testset.subjects["study_id"].values[item_index]
        for index, level in enumerate(output):
            # Output slot `index` maps to condition index // 5 and level index % 5.
            row_id = f"{str(study_id)}_{ALL_CONDITIONS[index // 5]}_{LEVELS[index % 5]}"
            # Build the row mask once instead of re-scanning results_df per column.
            row_mask = results_df.row_id == row_id
            for column, probability in zip(('normal_mild', 'moderate', 'severe'), level):
                results_df.loc[row_mask, column] = probability

In [None]:
# Score of results_df as filled by the plain-softmax inference cell above.
score(test_solution, results_df, "row_id", 1.0)

In [None]:
# Same inference pass as the previous loop, but each class output is scaled by
# 1, 1/2 and 1/4 (normal_mild, moderate, severe) before the softmax.
with torch.no_grad():
    model.eval()

    # tqdm wraps the dataset itself (not the enumerate object) so it can read
    # len(testset) and show a total / ETA.
    for item_index, item in enumerate(tqdm(testset)):
        images, _ = item
        output = model(images.unsqueeze(0).to(device))
        output = output.reshape((-1, 25, 3))
        # Scale with a tensor of the output's own dtype and device rather than
        # relying on implicit NumPy-to-tensor conversion.
        class_scale = output.new_tensor([1.0, 0.5, 0.25])
        output = softmax(output * class_scale)
        output = output.detach().cpu().numpy()[0]
        study_id = testset.subjects["study_id"].values[item_index]
        for index, level in enumerate(output):
            # Output slot `index` maps to condition index // 5 and level index % 5.
            row_id = f"{str(study_id)}_{ALL_CONDITIONS[index // 5]}_{LEVELS[index % 5]}"
            # Build the row mask once instead of re-scanning results_df per column.
            row_mask = results_df.row_id == row_id
            for column, probability in zip(('normal_mild', 'moderate', 'severe'), level):
                results_df.loc[row_mask, column] = probability


In [None]:
# Score of results_df as filled by the scaled-softmax inference cell above.
score(test_solution, results_df, "row_id", 1.0)