## Utils

In [None]:
import pandas as pd
import numpy as np
import SimpleITK as sitk
from typing import Tuple, Dict

def standardize_nodule_df(dataset: pd.DataFrame) -> pd.DataFrame:
    """
    Function to standardize CSV files exported from CIRRUS Lung Screening for temporal analysis
    Args:
        dataset (pandas.DataFrame): input dataset
    Returns:
        pandas.DataFrame: output dataset
    """
    dataset.PatientID = dataset.PatientID.astype(int)
    dataset.LesionID = dataset.LesionID.astype(int)
    dataset.StudyDate = dataset.StudyDate.astype(int)
    dataset["NoduleID"] = [
        f"{p}_{l}" for p, l in zip(dataset.PatientID, dataset.LesionID)
    ]
    dataset["AnnotationID"] = [
        f"{n}_{s}" for n, s in zip(dataset.NoduleID, dataset.StudyDate)
    ]
    return dataset

def transform(input_image, point):
    """
    Parameters
    ----------
    input_image: SimpleITK Image
    point: array of points
    Returns
    -------
    tNumpyOrigin
    """
    return np.array(
        list(
            reversed(
                input_image.TransformContinuousIndexToPhysicalPoint(
                    list(reversed(point))
                )
            )
        )
    )


def itk_image_to_numpy_image(input_image: sitk.Image) -> Tuple[np.array, Dict]:
    """
    Parameters
    ----------
    input_image: SimpleITK image
    Returns
    -------
    numpyImage: SimpleITK image to numpy image
    header: dict containing origin, spacing and transform in numpy format
    """

    numpyImage = sitk.GetArrayFromImage(input_image)
    numpyOrigin = np.array(list(reversed(input_image.GetOrigin())))
    numpySpacing = np.array(list(reversed(input_image.GetSpacing())))

    # get numpyTransform
    tNumpyOrigin = transform(input_image, np.zeros((numpyImage.ndim,)))
    tNumpyMatrixComponents = [None] * numpyImage.ndim
    for i in range(numpyImage.ndim):
        v = [0] * numpyImage.ndim
        v[i] = 1
        tNumpyMatrixComponents[i] = transform(input_image, v) - tNumpyOrigin
    numpyTransform = np.vstack(tNumpyMatrixComponents).dot(np.diag(1 / numpySpacing))

    # define necessary image metadata in header
    header = {
        "origin": numpyOrigin,
        "spacing": numpySpacing,
        "transform": numpyTransform,
    }

    return numpyImage, header

## Extract nodules (convert from CT to .nii.gz files)

In [None]:
import pandas
import numpy as np
from pathlib import Path
import SimpleITK as sitk
import multiprocessing
import logging

logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)s][%(asctime)s] %(message)s",
    datefmt="%I:%M:%S",
)

MUST_CONTAIN = [
    "SeriesInstanceUID",
    "StudyDate",
    "CoordX",
    "CoordY",
    "CoordZ",
    "PatientID",
]

SAVE_FORMATS = [
    ".nii.gz",
    ".nii",
    ".mha",
    ".mhd",
]


class NoduleExtractor:
    """
    Class to load a dataset and extract nodule patches
    and save them to disk in a format compatible with SimpleITK
    """

    def __init__(
        self,
        csv_path: Path,
        image_root: Path,
        output_path: Path,
        postfix: str = "",
        patch_size: np.array = np.array([128, 128, 64]),
        save_format: str = ".nii.gz",
    ) -> None:

        self.dataset: pandas.DataFrame = pandas.read_csv(csv_path)
        self.output_path = output_path
        self.image_root = image_root
        self.postfix = postfix
        self.patch_size = np.array(patch_size)
        self.save_format = save_format

        self.output_path.mkdir(exist_ok=True, parents=True)

        assert save_format in SAVE_FORMATS, "save format not supported"

        assert np.all(
            [k in self.dataset.keys() for k in MUST_CONTAIN]
        ), f"keys missing: the CSV must contain {MUST_CONTAIN}"

        self.dataset = standardize_nodule_df(self.dataset)

    def process_seriesuid(
        self,
        seriesuid: str,
    ) -> None:
        """Function to generate 3D patches around nodules for a given SeriesInstanceUID
        Expects the dataframe `dataset` to be globally present
        Args:
            seriesuid (str)
        """
        logging.info(f"Extracting nodule blocks from {seriesuid}")

        subset = self.dataset[self.dataset.SeriesInstanceUID == seriesuid]
        extent = self.patch_size // 2

        image_path = str(self.image_root / f"{seriesuid}.mha")

        # Check if nodule is already extracted
        coords = np.array(
            [
                subset.CoordX,
                subset.CoordY,
                subset.CoordZ,
            ]
        ).transpose()

        for i, coord in enumerate(coords):

            pd = subset.iloc[i]
            output_path = (
                    self.output_path
                    / f"{pd.NoduleID}_{int(pd.StudyDate)}{self.postfix}{self.save_format}"
            )

            if Path(output_path).is_file():
                logging.info(f"{pd.NoduleID} of {seriesuid} is already extracted")

            else:
                if Path(image_path).is_file():

                    image = sitk.ReadImage(image_path)

                    pad = False

                    coords = np.array(
                        [
                            subset.CoordX,
                            subset.CoordY,
                            subset.CoordZ,
                        ]
                    ).transpose()

                    for coord in coords:

                        coord = np.array(image.TransformPhysicalPointToIndex(coord))
                        upper_limit_breach = np.any(coord - extent < 0)
                        lower_limit_breach = np.any(coord + extent > np.array(image.GetSize()))

                        if upper_limit_breach or lower_limit_breach:
                            pad = True

                    if pad:

                        image = sitk.ConstantPad(
                            image,
                            [int(e) for e in extent],
                            [int(e) for e in extent],
                            constant=-1024,
                        )

                    for i, coord in enumerate(coords):

                        coord = image.TransformPhysicalPointToIndex(coord)

                        pd = subset.iloc[i]

                        output_path = (
                            self.output_path
                            / f"{pd.NoduleID}_{int(pd.StudyDate)}{self.postfix}{self.save_format}"
                        )

                        image_patch = image[
                            int(coord[0] - extent[0]) : int(coord[0] + extent[0]),
                            int(coord[1] - extent[1]) : int(coord[1] + extent[1]),
                            int(coord[2] - extent[2]) : int(coord[2] + extent[2]),
                        ]

                        if image_patch.GetSize() == tuple(self.patch_size):
                            sitk.WriteImage(image_patch, str(output_path), True)
                        else:
                            logging.info(f"Incorrect patch size in: {seriesuid}")

                else:
                    logging.info(f"Missing mha file: {str(image_path)}")

In [None]:
extractor = NoduleExtractor(
    csv_path=Path(
        'data/LUNA25_Public_Training_Development_Data.csv'
    ),
    image_root=Path('data/raw/'),
    output_path=Path(
        'data/processed/'
    ),
    postfix="_0000",
    save_format=".nii.gz",
)

pool = multiprocessing.Pool(16)
pool.map(
    extractor.process_seriesuid,
    extractor.dataset.SeriesInstanceUID.unique(),
)
pool.close()

## Convert 2 nodule (.npy files)

In [None]:
import SimpleITK as sitk
import numpy as np
import pandas
import multiprocessing
from pathlib import Path
import logging
import argparse

logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)s][%(asctime)s] %(message)s",
    datefmt="%I:%M:%S",
)

class NodulePreProcessor:
    """
    Class to preprocess nodule blocks and store as numpy files
    """

    def __init__(
        self,
        data_path: Path,
        csv_path: Path,
        save_path: Path,
    ) -> None:

        self.data_path = data_path
        self.save_path = save_path

        self.dataset = pandas.read_csv(csv_path)
        self.dataset = standardize_nodule_df(self.dataset)

        self.dst_image_path = self.save_path / "image"
        self.dst_metadata_path = self.save_path / "metadata"

        self.dst_image_path.mkdir(exist_ok=True, parents=True)
        self.dst_metadata_path.mkdir(exist_ok=True, parents=True)


    def prepare_numpy_files(self, annotation_id):
        """Function to load a nifty file (prepared for nnU-Net)
        and convert that into numpy files for fast loading during training
        Args:
            annotation_id (str): The unique annotation ID for a nodule,
            f"{PatientID}_{LesionID}_{StudyDate}"
        """

        logging.info(f"processing {annotation_id}")

        image_path = self.data_path / f"{annotation_id}_0000.nii.gz"

        if image_path.is_file():

            image = sitk.ReadImage(str(image_path))
            image, header = utils.itk_image_to_numpy_image(image)

            np.save(self.dst_image_path / f"{annotation_id}.npy", image)
            np.save(self.dst_metadata_path / f"{annotation_id}.npy", header)

        else:
            logging.info(f"{annotation_id} missing")

In [None]:
preprocessor = NodulePreProcessor(
    data_path=Path(args.data_path),
    csv_path=Path(args.csv_path),
    save_path=Path(args.save_path),
)

tasks = preprocessor.dataset.AnnotationID.unique()

pool = multiprocessing.Pool(args.num_workers)
pool.map(preprocessor.prepare_numpy_files, tasks)
pool.close()