# Preprocessing

## Imports libraries

In [None]:
import pandas as pd
import os
import pathlib
import ants
from typing import Union, List, Tuple
import multiprocessing
import SimpleITK as sitk
from nipype.interfaces.dcm2nii import Dcm2niix
import numpy as np
from HD_BET.run import run_hd_bet
from nipype.interfaces import fsl
from intensity_normalization.normalize.zscore import ZScoreNormalize

Extract Dicom Tags from Dicom Files

https://medium.com/@ashkanpakzad/reading-editing-dicom-metadata-w-python-8204223a59f6

https://github.com/pydicom/pydicom

In [1]:
# Get metadata for each sequence from the first file in each sequence

# Get all patients
# Get all the sequences
# Get the first file for each sequence
# Extract Metadata
# Save Metadata in the patient folder named {patientID}_{sequence}_meta

import pydicom

path_to_dicom = ""

dicomFile = pydicom.dcmread(path_to_dicom)

print(dicomFile) # returns list of metadata

# extract needed metadata

# save metadata somewhere


## Step 00: Convert Dicom to Nifti Files
using Dcm2niix, for more information: https://github.com/rordenlab/dcm2niix

Li X, Morgan PS, Ashburner J, Smith J, Rorden C (2016) The first step for neuroimaging data analysis: DICOM to NIfTI conversion. J Neurosci Methods. 264:47-56. doi: 10.1016/j.jneumeth.2016.03.001. PMID: 26945974

In [None]:
path_to_all_patients = ""
path_to_n30 = ""

root = path_to_all_patients

patientDirs = [
    folder for folder in os.listdir(root) if os.path.isdir(os.path.join(root, folder))
]

for patient in patientDirs:

    patientID = patient

    sequenceDirs = [
        sequenceFolder for sequenceFolder in os.listdir(os.path.join(root, patient)) if os.path.isdir(os.path.join(root, patient, sequenceFolder))
    ]

    for sequence in sequenceDirs:
        # new sequence name: {patientID}_{sequence}_{preprocessingStep}

        sequenceType = sequence.split("_")[1]

        converter = Dcm2niix()
        converter.inputs.source_dir = os.path.join(root, patient, sequence)
        converter.inputs.compress = "y" # uses compression, "y" = yes
        converter.inputs.merge_imgs = True
        # converter.inputs.compression = 5
        converter.inputs.out_filename = f"{patientID}_{sequenceType}"
        converter.inputs.output_dir = os.path.join(root, patient)
        converter.run()


# # goes through the list of files/folders at path_to_folder and only adds directories to dirlist
# dirlist = [
#     item for item in os.listdir(root) if os.path.isdir(os.path.join(root, item))
# ]

# print(len(dirlist), "subfolders found")

# for dir in dirlist:
#     converter = Dcm2niix()
#     converter.inputs.source_dir = os.path.join(root, dir)
#     converter.inputs.compress = "y"
#     converter.inputs.merge_imgs = True
#     # converter.inputs.compression = 5
#     converter.inputs.out_filename = "%i_%t_%d_%f_%p_%q_%s_%z"
#     converter.inputs.output_dir = os.path.join(root, dir)
#     converter.run()

## Step 01: Get Caseliste
Reads the excel file with all the cases and returns the patient IDs as a list

In [None]:
path_to_file = ""

cases = pd.read_excel(
        os.path.join(path_to_file, "Cases.xlsx"),
        header=0,
        index_col=None,
        dtype={"ID": str},
    )

caseList = cases.ID.to_list()

## Step 02: Extract Brain
applies FSL.Reorient2Std() (requirement for HD-BET) and returns the extracted brain image

Brain Extraction using HD-BET, for more information: https://github.com/MIC-DKFZ/HD-BET

Isensee F, Schell M, Tursunova I, Brugnara G, Bonekamp D, Neuberger U, Wick A, Schlemmer HP, Heiland S, Wick W, Bendszus M, Maier-Hein KH, Kickingereder P. Automated brain extraction of multi-sequence MRI using artificial neural networks. Hum Brain Mapp. 2019; 1â€“13. https://doi.org/10.1002/hbm.24750

In [None]:
path_to_input_image = ""
path_to_output_image = ""

# Alternative: torchio tocanonical
reorient = fsl.Reorient2Std()
reorient.inputs.in_file = path_to_input_image
reorient.inputs.out_file = path_to_output_image
reorient.run()

run_hd_bet(mri_fnames=path_to_output_image, output_fnames=path_to_output_image)

## Step 03: Fill Holes

In [None]:
def fill_holes(
    binary_image: sitk.Image,
    radius: int = 3,
) -> sitk.Image:
    """
    Fills holes in binary segmentation

    Keyword Arguments:
    - binary_image: sitk.Image = binary brain segmentation
    - radius: int = kernel radius

    Returns:
    - closed_image: sitk.Image = binary brain segmentation with holes filled
    """

    closing_filter = sitk.BinaryMorphologicalClosingImageFilter()
    closing_filter.SetKernelRadius(radius)
    closed_image = closing_filter.Execute(binary_image)

    return closed_image

## Step 04: Binary Segment Brain

In [None]:
def binary_segment_brain(
    image: sitk.Image,
) -> sitk.Image:
    """
    Returns binary segmentation of brain from brain-extracted scan via otsu thresholding

    Keyword Arguments:
    - image: sitk.Image = brain-extracted scan

    Returns:
    - sitk.Image = binary segmentation of brain scan with filled holes
    """

    otsu_filter = sitk.OtsuThresholdImageFilter()
    otsu_filter.SetInsideValue(0)
    otsu_filter.SetOutsideValue(1)
    binary_mask = otsu_filter.Execute(image)

    return fill_holes(binary_mask)

## Step 05: Get Bounding Box

In [None]:
def get_bounding_box(
    image: sitk.Image,
) -> Tuple[int]:
    """
    Returns bounding box of brain-extracted scan

    Keyword Arguments:
    - image: sitk.Image = brain-extracted scan

    Returns
    - bounding_box: Tuple(int) = bounding box (startX, startY, startZ, sizeX, sizeY, sizeZ)
    """

    mask_image = binary_segment_brain(image)

    lsif = sitk.LabelShapeStatisticsImageFilter()
    lsif.Execute(mask_image)
    bounding_box = np.array(lsif.GetBoundingBox(1))

    return bounding_box

## Step 06: Apply Bounding Box

In [None]:
def apply_bounding_box(
    image: sitk.Image,
    bounding_box: Tuple[int],
) -> sitk.Image:
    """
    Returns image, cropped to bounding box

    Keyword Arguments:
    - image: sitk.Image = image
    - bounding_box: Tuple(ing) = bounding box of kind (startX, startY, startZ, sizeX, sizeY, sizeZ)

    Returns
    - cropped_image: sitk.Image = cropped image
    """

    cropped_image = image[
        bounding_box[0] : bounding_box[3] + bounding_box[0],
        bounding_box[1] : bounding_box[4] + bounding_box[1],
        bounding_box[2] : bounding_box[5] + bounding_box[2],
    ]

    return cropped_image

## Step 07: Apply Bias Correction

In [None]:
def apply_bias_correction(
    image: sitk.Image,
) -> sitk.Image:
    """applies N4 bias field correction to image but keeps background at zero

    Keyword Arguments:
    image: sitk.Image = image to apply bias correction to

    Returns:
    image_corrected_masked: sitk.Image = N4 bias field corrected image
    """

    mask_image = binary_segment_brain(image)
    corrector = sitk.N4BiasFieldCorrectionImageFilter()
    image_corrected = corrector.Execute(image, mask_image)

    mask_filter = sitk.MaskImageFilter()
    mask_filter.SetOutsideValue(0)
    image_corrected_masked = mask_filter.Execute(image_corrected, mask_image)

    return image_corrected_masked

## Step 08: Coregister Images

In [None]:
def coregister_antspy(
    fixed_path: Union[str, pathlib.Path],
    moving_path: Union[str, pathlib.Path],
    out_path: Union[str, pathlib.Path],
    num_threads=N_PROC,
) -> ants.core.ants_image.ANTsImage:
    """
    Coregister moving image to fixed image. Return warped image and save to disk.

    Keyword Arguments:
    fixed_path: path to fixed image
    moving_path: path to moving image
    out_path: path to save warped image to
    num_threads: number of threads
    """

    os.environ["ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS"] = str(num_threads)

    res = ants.registration(
        fixed=ants.image_read(fixed_path),
        moving=ants.image_read(moving_path),
        type_of_transform="antsRegistrationSyNQuick[s]",  # or "SyNRA"
        initial_transform=None,
        outprefix="",
        mask=None,
        moving_mask=None,
        mask_all_stages=False,
        grad_step=0.2,
        flow_sigma=3,
        total_sigma=0,
        aff_metric="mattes",
        aff_sampling=32,
        aff_random_sampling_rate=0.2,
        syn_metric="mattes",
        syn_sampling=32,
        reg_iterations=(40, 20, 0),
        aff_iterations=(2100, 1200, 1200, 10),
        aff_shrink_factors=(6, 4, 2, 1),
        aff_smoothing_sigmas=(3, 2, 1, 0),
        write_composite_transform=False,
        random_seed=None,
        verbose=False,
        multivariate_extras=None,
        restrict_transformation=None,
        smoothing_in_mm=False,
    )

    warped_moving = res["warpedmovout"]

    ants.image_write(warped_moving, out_path)

    return warped_moving

## Step 09: Resample Images

In [None]:
def resample(
    itk_image: sitk.Image,
    out_spacing: Tuple[float, ...],
    is_mask: bool,
) -> sitk.Image:
    """
    Resamples sitk image to expected output spacing

    Keyword Arguments:
    itk_image: sitk.Image
    out_spacing: Tuple
    is_mask: bool = True if input image is label mask -> NN-interpolation

    Returns
    output_image: sitk.Image = image resampled to out_spacing
    """

    original_spacing = itk_image.GetSpacing()
    original_size = itk_image.GetSize()

    out_size = [
        int(round(osz * osp / nsp))
        for osz, osp, nsp in zip(original_size, original_spacing, out_spacing)
    ]

    resample = sitk.ResampleImageFilter()
    resample.SetOutputSpacing(out_spacing)
    resample.SetSize(out_size)
    resample.SetOutputDirection(itk_image.GetDirection())
    resample.SetOutputOrigin(itk_image.GetOrigin())
    resample.SetTransform(sitk.Transform())
    resample.SetDefaultPixelValue(0)

    if is_mask:
        resample.SetInterpolator(sitk.sitkNearestNeighbor)

    else:
        resample.SetInterpolator(
            sitk.sitkBSpline
        )  # sitk.sitkLinear sitk.sitkNearestNeighbor

    output_image = resample.Execute(itk_image)

    return output_image

## Step 10: Z-Score Normalize Images

In [None]:
def zscore_normalize(image: sitk.Image) -> sitk.Image:
    """
    Applies z score normalization to brain scan using a brain mask

    Keyword Arguments:
    image: sitk.Image = input brain scan

    Returns:
    normalized_brain_image: sitk.Image = normalized brain scan
    """

    brain_mask = binary_segment_brain(image)

    normalizer = ZScoreNormalize()
    normalized_brain_array = normalizer(
        sitk.GetArrayFromImage(image),
        sitk.GetArrayFromImage(brain_mask),
    )

    normalized_brain_image = sitk.GetImageFromArray(normalized_brain_array)
    normalized_brain_image.CopyInformation(image)

    return normalized_brain_image