This script registers every aseg volume of 02_MSLesSeg_FreeSurfer_output to the image space (dimensions) of 01_MSLesSeg_Dataset and stores the output in 06_transformix_results.

In [6]:
from pathlib import Path
from copy import deepcopy
import SimpleITK as sitk
import subprocess
import shutil

# Parent of the current working directory.
code_path = Path.cwd().parent
# "data" folder located one level above code_path.
data_path = code_path.parent / "data"

In [None]:
"""
02 -> 03 and 04
Convert all brain and aseg mgz to nii.gz and store them respectively into 03_brain_nii_results and 04_aseg_nii_results
"""

# Output folders for the converted brain and aseg volumes (reused if they already exist)
brain_nii_results_path = data_path / "03_brain_nii_results"
aseg_nii_results_path = data_path / "04_aseg_nii_results"
for out_dir in (brain_nii_results_path, aseg_nii_results_path):
    out_dir.mkdir(parents=True, exist_ok=True)

# FreeSurfer output root; every sub-directory is inspected for mri/brain.mgz and mri/aseg.mgz
freesurfer_path = data_path / "02_MSLesSeg_FreeSurfer_output"

# Text logs listing the sub-directories whose brain.mgz / aseg.mgz is absent
missing_brain_file = brain_nii_results_path / "missing_brain_file.txt"
missing_aseg_file = aseg_nii_results_path / "missing_aseg_file.txt"

# Entries are appended below, so logs left over from a previous run are removed first
for stale_log in (missing_brain_file, missing_aseg_file):
    if stale_log.exists():
        stale_log.unlink()

# (volume name, output folder, log file) for each kind of volume to convert
conversion_jobs = (
    ("brain", brain_nii_results_path, missing_brain_file),
    ("aseg", aseg_nii_results_path, missing_aseg_file),
)

for subject_dir in freesurfer_path.iterdir():
    if not subject_dir.is_dir():
        continue

    for volume, out_dir, log_path in conversion_jobs:
        mgz_path = subject_dir / "mri" / f"{volume}.mgz"

        if not mgz_path.exists():
            # Record the missing volume and move on to the next one
            with open(log_path, "a", encoding="utf-8") as log:
                log.write(f"{subject_dir.name} has no {volume}.mgz in FreeSurfer data\n")
            continue

        # Read the mgz volume and write it back as <sub-directory name>_<volume>.nii.gz
        nii_path = out_dir / f"{subject_dir.name}_{volume}.nii.gz"
        sitk.WriteImage(sitk.ReadImage(str(mgz_path)), str(nii_path))

In [None]:
"""
01 and 03 and Parameters.T1Brain.affine.txt -> 05
Register brain to dimension of Dataset and store results into 05_elastix_results
This process could take more or less 25 minutes according to the PC performance
"""

def _key_from_pieces(pieces):
    """Build a dictionary key from the "_"-separated pieces of a file name.

    The last piece is split again on ".", e.g.
    ["P1", "T1", "brain.nii.gz"] -> ("P1", "T1", "brain", "nii", "gz").
    """
    return tuple(pieces[:-1] + pieces[-1].split("."))


# Converted brain nii.gz files: split-name tuple -> file path
brain_split_path_dict = {
    _key_from_pieces(brain_file.name.split("_")): brain_file
    for brain_file in brain_nii_results_path.rglob("*.nii.gz")
}

# Converted aseg nii.gz files: split-name tuple -> file path
aseg_split_path_dict = {
    _key_from_pieces(aseg_file.name.split("_")): aseg_file
    for aseg_file in aseg_nii_results_path.rglob("*.nii.gz")
}

# T1 images of the dataset: split-name tuple -> file path
mslesseg_path = data_path / "01_MSLesSeg_Dataset"
t1_split_path_dict = {}

for split_name in ("train", "test"):
    split_root = mslesseg_path / split_name
    if not split_root.exists():
        continue

    for t1_file in split_root.rglob("*.nii.gz"):
        if not t1_file.name.endswith("_T1.nii.gz"):
            continue

        pieces = t1_file.name.split("_")
        if len(pieces) == 2:
            # Two-piece names get "T1" inserted as second field so that every key
            # starts with two identifier fields (presumably patient and timepoint)
            pieces.insert(1, "T1")

        t1_split_path_dict[_key_from_pieces(pieces)] = t1_file


# Create a directory to store elastix results
elastix_results_path = data_path / "05_elastix_results"
elastix_results_path.mkdir(parents=True, exist_ok=True)

# Log of the brains that have no corresponding T1 image in the dataset
missing_t1_file = elastix_results_path / "missing_t1_file.txt"

# Remove the log of a previous run so that it only reflects the current one
if missing_t1_file.exists():
    missing_t1_file.unlink()

# split[0:2] is (patient_id, timepoint_id); a set gives constant-time membership tests
extracted_t1_split = {split[0:2] for split in t1_split_path_dict}

# Keep only the brains whose (patient_id, timepoint_id) also exists among the T1 images.
# The selection uses the actual dictionary key, so it does not depend on how the rest
# of the file name is laid out.
valid_brain_split_path_dict = {}
missing_t1_lines = []

for split, nii_path in brain_split_path_dict.items():
    if split[0:2] in extracted_t1_split:
        valid_brain_split_path_dict[split] = nii_path
    else:
        missing_t1_lines.append(f"{('_'.join(split[0:3]))} doesn't exist in the dataset\n")

# The log file is only created when at least one brain has no T1 counterpart
if missing_t1_lines:
    with open(missing_t1_file, "a", encoding="utf-8") as f:
        f.writelines(missing_t1_lines)


# Elastix registration
# Look-up table: (patient_id, timepoint_id) -> T1 image of the dataset
t1_map = {split[0:2]: path for split, path in t1_split_path_dict.items()}

for split, brain_path in valid_brain_split_path_dict.items():
    extracted_split = split[0:2]
    t1_path = t1_map.get(extracted_split)

    if t1_path is None:
        continue  # no T1 image for this brain, nothing to register against

    # One output directory per registration, named <patient_id>_<timepoint_id>_T1
    dir_path = elastix_results_path / "_".join(extracted_split + ("T1",))
    dir_path.mkdir(parents=True, exist_ok=True)

    cmd = [
        "elastix",
        "-m", str(brain_path),   # moving image: FreeSurfer brain
        "-f", str(t1_path),      # fixed image: T1 of the dataset
        "-p", "Parameters.T1Brain.affine.txt",  # resolved relative to the working directory
        "-out", str(dir_path),
    ]

    # The command is an argument list, so it must be run WITHOUT a shell: with
    # shell=True on POSIX only "elastix" would reach the shell and all the arguments
    # would be dropped. Without a shell, a missing elastix executable raises
    # FileNotFoundError instead of failing silently.
    result = subprocess.run(cmd, capture_output=True, text=True)

    # Report failures instead of ignoring them; the loop still goes on with the next brain
    if result.returncode != 0:
        print(f"elastix failed for {dir_path.name} (exit code {result.returncode})")
        print((result.stderr or result.stdout).strip()[-2000:])

In [None]:
"""
04 and 05 -> 06
Register aseg to dimension of Dataset and store results into 06_transformix_results
"""

# Rewrite TransformParameters.0.txt files to use NearestNeighbor interpolation
# (presumably because the aseg volumes hold labels that must not be blended -- confirm).
# The original file is left untouched; the edited copy is New_TransformParameters.0.txt.
for sub_dir in elastix_results_path.iterdir():
    if not sub_dir.is_dir():
        continue

    param_path = sub_dir / "TransformParameters.0.txt"
    if not param_path.exists():
        # No transform file in this directory (e.g. the registration did not succeed):
        # skip it instead of aborting the whole loop with FileNotFoundError
        print(f"{sub_dir.name}: {param_path.name} not found, skipped")
        continue

    with param_path.open("r", encoding="utf-8") as f:
        lines = f.readlines()

    new_lines = []
    for line in lines:
        # The B-spline order setting is dropped together with the B-spline interpolator
        if line.strip() == "(FinalBSplineInterpolationOrder 3)":
            continue
        if '(ResampleInterpolator "FinalBSplineInterpolator")' in line:
            line = '(ResampleInterpolator "FinalNearestNeighborInterpolator")\n'
        new_lines.append(line)

    # Mode "w" truncates an existing file, so no explicit removal is needed beforehand
    new_param_path = sub_dir / "New_TransformParameters.0.txt"
    with new_param_path.open("w", encoding="utf-8") as f:
        f.writelines(new_lines)


# Create a directory to store transformix results
transformix_results_path = data_path / "06_transformix_results"
transformix_results_path.mkdir(parents=True, exist_ok=True)


# Transformix registration
# Look-up table: (patient_id, timepoint_id) -> converted aseg file
aseg_map = {split[0:2]: value for (split, value) in aseg_split_path_dict.items()}

for aseg_file in aseg_nii_results_path.rglob("*.nii.gz"):
    # Same name parsing as for the dictionaries: split on "_", then the last piece on "."
    arg = aseg_file.name.split("_")
    split = tuple(arg[:-1] + arg[-1].split("."))

    extracted_split = split[0:2]
    aseg_path = aseg_map.get(extracted_split)

    if aseg_path is None:
        continue

    # Name shared by the elastix and transformix directories: <patient_id>_<timepoint_id>_T1
    subject_name = "_".join(extracted_split + ("T1",))

    transform_path = elastix_results_path / subject_name / "New_TransformParameters.0.txt"
    if not transform_path.exists():
        # Without the transform file transformix cannot run; report and go on
        print(f"{subject_name}: {transform_path.name} not found, transformix skipped")
        continue

    transformix_dir_path = transformix_results_path / subject_name
    transformix_dir_path.mkdir(parents=True, exist_ok=True)

    cmd = [
        "transformix",
        "-in", str(aseg_path),
        "-tp", str(transform_path),
        "-out", str(transformix_dir_path),
    ]

    # The command is an argument list, so it must be run WITHOUT a shell: with
    # shell=True on POSIX only "transformix" would reach the shell and all the
    # arguments would be dropped. Without a shell, a missing transformix executable
    # raises FileNotFoundError instead of failing silently.
    result = subprocess.run(cmd, capture_output=True, text=True)

    # Report failures instead of ignoring them; the loop still goes on with the next file
    if result.returncode != 0:
        print(f"transformix failed for {subject_name} (exit code {result.returncode})")
        print((result.stderr or result.stdout).strip()[-2000:])

In [None]:
"""
06 -> 07
Add registered aseg and all missing file logs into 07_registered_aseg_results
"""

registered_aseg_path = data_path / "07_registered_aseg_results"
registered_aseg_path.mkdir(parents=True, exist_ok=True)

for sub_dir in transformix_results_path.iterdir():
    if not sub_dir.is_dir():
        continue

    src = sub_dir / "result.nii.gz"
    if not src.exists():
        continue  # transformix produced no result in this directory

    # Patient information extraction (directory names are <patient_id>_<timepoint_id>_<rest>)
    try:
        patient_id, timepoint_id, _ = sub_dir.name.split("_", 2)
    except ValueError:
        continue  # additional safety check: name has fewer than three fields

    # Destination path determination: look for the patient in "train" first
    # (train/<patient_id>/<timepoint_id>), then in "test" (test/<patient_id>)
    dataset_sub_dir = mslesseg_path / "train" / patient_id / timepoint_id
    dst = registered_aseg_path / f"{patient_id}_{timepoint_id}_aseg.nii.gz"
    if not dataset_sub_dir.exists():
        dataset_sub_dir = mslesseg_path / "test" / patient_id
        dst = registered_aseg_path / f"{patient_id}_aseg.nii.gz"
    if not dataset_sub_dir.exists():
        continue  # additional safety check: patient found in neither split

    # Copy and rename the aseg file
    shutil.copy2(src, dst)

# Copy all missing file logs, once and independently of the number of results.
# A log only exists when something was actually missing, so absent logs are skipped
# (shutil.copy2 would raise FileNotFoundError otherwise).
for log_file in (missing_brain_file, missing_aseg_file, missing_t1_file):
    copied_log = registered_aseg_path / log_file.name
    if log_file.exists():
        shutil.copy2(log_file, registered_aseg_path)
    elif copied_log.exists():
        # Drop the copy left by a previous run so that it does not report outdated entries
        copied_log.unlink()

CAREFUL: the code below is optional; you do not have to execute it.

In [None]:
"""
!!!ALERT!!! : This sub-script deletes all intermediate directories created during the process (03, 04, 05 and 06).
Set 'remove' to True if you want to delete them.
"""

# Safety switch: nothing is deleted unless this is set to True
remove = False

if remove:
    # Intermediate directories 03, 04, 05 and 06 created by the previous cells
    for intermediate_dir in (
        brain_nii_results_path,
        aseg_nii_results_path,
        elastix_results_path,
        transformix_results_path,
    ):
        # A directory already removed (e.g. by an earlier run of this cell) is skipped,
        # so that the remaining ones are still deleted
        if intermediate_dir.exists():
            shutil.rmtree(intermediate_dir)