# Libraries

In [11]:
# Import Libraries
import shutil
import subprocess
import numpy as np
import pandas as pd
import seaborn as sns
import SimpleITK as sitk
import matplotlib.pyplot as plt
from tqdm import tqdm
from typing import List
from pathlib import Path

import utils_registration  

# Import Dataset

In [14]:
# Define the path of the main directory
MAIN_PATH = Path('../Lab_03/').resolve()
DATA_PATH = MAIN_PATH/ 'data'

# Define the path of train dataset directory
test_images_dir_path = DATA_PATH / 'train-set' / 'training-images'
test_labels_dir_path = DATA_PATH / 'train-set' / 'training-labels'
test_masks_dir_path  = DATA_PATH / 'train-set' / 'training-masks'
output_dir_path      = DATA_PATH / 'train-set' / 'training-outputs'

# Define the path of the registration parameter directory
parameter_dir_path = MAIN_PATH / 'parameter'

# Define the name of the registration parameter
reg_parameter = 'Par0009.affine.txt'

# Get Mutual Information

In [15]:
# Iterate over fixed images in the training directory
for i, fixed_image_path in tqdm(enumerate(train_images_dir_path.iterdir()), total=len(list(train_images_dir_path.iterdir())), desc='Registration Progressing'):
    # Get the id of the fixed image
    fixed_image_id = fixed_image_path.stem
    
    # Define the path of the output directory to store registered images
    output_path = output_dir_path / 'Par0009.affine' / fixed_image_id 
    
    # Check if the output directory is already available, if yes, skip registration
    if output_path.exists():
        continue
    
    # Create the output directory if it doesn't exist
    output_path.mkdir(exist_ok=True, parents=True)
    
    # Iterate over moving images in the training directory
    for j, moving_image_path in enumerate(train_images_dir_path.iterdir()):
        # Get the id of the moving image
        moving_image_id = moving_image_path.stem
        
        # Define the path of the registered image
        reg_image_path = output_path / f'reg_{moving_image_id}.nii.gz'
        
        # Check if fixed image and moving image are the same; if true, copy the image without registration
        if fixed_image_id == moving_image_id:
            shutil.copy(moving_image_path, reg_image_path)
            continue
        
        # Read and modify parameters from the registration parameter file
        reg_parameter_file_path = parameter_dir_path / reg_parameter
        field_value_pairs = [('ResultImageFormat', 'nii.gz')]
        utils_registration.modify_parameter(field_value_pairs, reg_parameter_file_path)
        
        # Apply registration using elastix wrapper
        utils_registration.elastix(fixed_image_path, moving_image_path, reg_image_path, reg_parameter_file_path)

Registration Progressing: 100%|█████████████████████████████████████████████████████| 20/20 [1:22:10<00:00, 246.51s/it]


In [16]:
# Define the path to the output directory containing registered images
output_image_path = output_dir_path / 'Par0009.affine'

# Initialize lists to store mutual information values and corresponding image names
mutual_information_df = []
index_list = []

# Iterate over registered image directories in the output directory
for reg_image_dir_path in tqdm(output_image_path.iterdir(), total=len(list(output_image_path.iterdir())), desc='Calculating Mutual Information'):
    reg_image_name = reg_image_dir_path.name
    index_list.append(reg_image_name)
    mutual_information_row = {}
    
    # Read the registered image
    reg_image = sitk.GetArrayFromImage(sitk.ReadImage(str(reg_image_dir_path / f'r_{reg_image_name}.nii.gz')))
    
    # Iterate over moving images in the registered image directory
    for moving_image_path in reg_image_dir_path.iterdir():
        # Skip files with .txt extension (assuming these are not images)
        if moving_image_path.name.endswith('txt'):
            continue
        
        # Read the moving image and calculate mutual information
        moving_image = sitk.GetArrayFromImage(sitk.ReadImage(str(moving_image_path)))
        # Store mutual information value in the dictionary
        mutual_information_row[(moving_image_path.stem).lstrip('reg_')] = utils_registration.mutual_information(reg_image, moving_image)
    
    # Append mutual information values for each image to the list
    mutual_information_df.append(mutual_information_row)

# Create a DataFrame using mutual information values and corresponding image names as index
mutual_information_df = pd.DataFrame(mutual_information_df, index=index_list)

Calculating Mutual Information:   0%|                                                           | 0/20 [00:00<?, ?it/s]


RuntimeError: Exception thrown in SimpleITK ReadImage: C:\dafne\SimpleElastix\Code\IO\src\sitkImageReaderBase.cxx:99:
sitk::ERROR: The file "D:\Lab_03\data\test-set\testing-output\Par0009.affine\1003.nii\r_1003.nii.nii.gz" does not exist.

In [None]:
mutual_information_df['fixed_image'] = mutual_information_df.index.values
plot_df = pd.melt(
    mutual_information_df,
    id_vars=['fixed_image'],
    value_vars=[i for i in mutual_information_df.columns if i!='fixed_image'], ignore_index=False
)
plot_df.columns = ['Fixed Image', 'Moving Image', 'Mutual Information']
plot_df = plot_df.loc[plot_df['Mutual Information'] < 1, :]

# Plot
plt.figure(figsize=(10, 4))
plt.title('Mutual Information Between Registered Images and Fixed Images [Test Dataset]')
sns.boxplot(plot_df, x='Fixed Image', y='Mutual Information')
plt.xticks(rotation=45)
sns.despine()
plt.show()

# Train Data Registration

In [10]:
# Define fixed image to use
fixed_image_path = train_images_dir_path / '1003.nii.gz'
fixed_image_id = fixed_image_path.name.rstrip('.nii.gz')

# Loop through each moving image for registration
for i, moving_image_path in tqdm(enumerate(train_images_dir_path.iterdir()), total=len(list(train_images_dir_path.iterdir())), desc='Registration Progressing'):
    
    # Get the moving image id
    moving_image_id = moving_image_path.name.rstrip('.nii.gz')
    
    # Get the path of the moving image
    moving_label_path = test_labels_dir_path / f'{moving_image_id}_3C.nii.gz'
    
    # Get the path of the moving mask
    moving_mask_path = test_masks_dir_path / f'{moving_image_id}_1C.nii.gz'

    # Define output directories for registration results
    output_images_path = output_dir_path / 'Par0009.affine' / 'training-images'
    output_labels_path = output_dir_path / 'Par0009.affine' / 'training-labels'
    output_masks_path  = output_dir_path / 'Par0009.affine' / 'training-mask'

    # Create output directories if they don't exist
    output_images_path.mkdir(exist_ok=True, parents=True)
    output_labels_path.mkdir(exist_ok=True, parents=True)
    output_masks_path.mkdir(exist_ok=True, parents=True)

    # Define paths for registered images, labels, and masks
    reg_image_path = output_images_path / f'r_{moving_image_id}.nii.gz'
    reg_label_path = output_labels_path / f'r_{moving_image_id}_3C.nii.gz'
    reg_mask_path  = output_masks_path  / f'r_{moving_image_id}_1C.nii.gz'

    # Don't register if the fixed image and moving image are same, just copy it
    if fixed_image_id == moving_image_id:
        shutil.copy(moving_image_path, reg_image_path)
        shutil.copy(moving_label_path, reg_label_path)
        shutil.copy(moving_mask_path,  reg_mask_path)
        continue

    # Read and modify parameters file
    reg_parameter_file_path = parameter_dir_path / reg_parameter
    field_value_pairs = [('ResultImageFormat', 'nii.gz'), ('FinalBSplineInterpolationOrder', '0.0')]
    utils_registration.modify_parameter(field_value_pairs, reg_parameter_file_path)

    # Register the moving image
    transform_map_path = utils_registration.elastix(fixed_image_path, moving_image_path, reg_image_path, reg_parameter_file_path)

    # Correct transformation parameters file
    utils_registration.modify_parameter(field_value_pairs, reg_parameter_file_path)

    # Transform labels using the obtained transformation
    utils_registration.transformix(moving_label_path, reg_label_path, transform_map_path)

    # Transform masks using the obtained transformation
    utils_registration.transformix(moving_mask_path, reg_mask_path, transform_map_path)


Registration Progressing: 100%|████████████████████████████████████████████████████████| 20/20 [06:41<00:00, 20.08s/it]


# Generate Atlas From Training Data

In [None]:
# Define the path of registarted image and labels
output_dir_path     = DATA_PATH / 'training-set' / 'training-output'/ 'Par0009.affine'
reg_image_dir_path  = output_dir_path / 'training-images'
reg_labels_dir_path = output_dir_path / 'training-labels'

# Define the path of the reference image
ref_image_path = reg_image_dir_path / 'r_1001.nii.gz'

# Read the reference image and convert into an array
ref_image = sitk.ReadImage(str(ref_image_path))  
ref_image_array = sitk.GetArrayFromImage(ref_image)

# Define the configuration for atlas
num_classes = 4
num_volumes = len(list(reg_labels_dir_path.iterdir()))
ref_image_shape = ref_image_array.shape
labels_keys = {0: 'background', 1: 'csf', 2: 'wm', 3: 'gm'}

# Create atlas accumulators
atlases     = np.zeros((num_classes, ref_image_shape[0], ref_image_shape[1], ref_image_shape[2])).astype('float32')
mean_volume = np.zeros_like(ref_image_array).astype('float32')

# Loop through each registered image for atlas creation
for i, image_path in tqdm(enumerate(reg_image_dir_path.iterdir()), total=len(list(reg_image_dir_path.iterdir()))):
    
    # If image has .txt extension, continue
    if image_path.name.endswith('txt'):
        continue
    
    # Get the id of the registered image and label
    image_id = image_path.name.rstrip('.nii.gz')
    label_id = f'{image_id}_3C.nii.gz'
    
    # Accumulate the mean of an registered image
    image = sitk.GetArrayFromImage(sitk.ReadImage(str(image_path)))
    image = utils.min_max_norm(image, 255).astype('uint8')
    mean_volume += image / num_volumes
    
    # Accumulate probability atlas image
    label = sitk.GetArrayFromImage(sitk.ReadImage(str(reg_labels_dir_path / label_id))) 
    for j in range(atlases.shape[0]):
        atlases[j, :, :, :] += np.where(label == j, 1, 0) / num_volumes

# Get topological atlas by voting
topological_atlas = np.argmax(atlases, axis=0)

# Save atlas results
atlas_output_path = DATA_PATH /"test-set"/ "atlases" / "Par0009.affine"
atlas_output_path.mkdir(exist_ok=True, parents=True)

for k in range(atlases.shape[0]):
    utils_registration.save_segementations(atlases[k, :, :, :], ref_image, atlas_output_path / f'p_atlas_{labels_keys[k]}.nii.gz')
    utils_registration.save_segementations(np.where(topological_atlas == k, 255, 0), ref_image, atlas_output_path / f't_atlas_{labels_keys[k]}.nii.gz')

utils_registration.save_segementations(topological_atlas, ref_image, atlas_output_path / 't_atlas.nii.gz')
utils_registration.save_segementations(mean_volume, ref_image, atlas_output_path / 'mean_volume.nii.gz')

In [None]:
MAIN_PATH = Path('../Lab_03/').resolve()
data_path = DATA_PATH / 'test-set' 

image_names = [
    'training-images/1001.nii.gz', 
    'Atlases/Par0009.affine/mean_volume.nii.gz', 
    'Atlases/Par0009.affine/p_atlas_csf.nii.gz',
    'Atlases/Par0009.affine/p_atlas_wm.nii.gz', 
    'Atlases/Par0009.affine/p_atlas_gm.nii.gz', 
    'Atlases/Par0009.affine/t_atlas.nii.gz']

titles = [
    'Reference', 
    'Mean Image', 
    'CSF Prob. Atlas',
    'WM Prob. Atlas', 
    'GM Prob. Atlas', 
    'Maj. Vot. Atlas']

utils_registration.complete_figure(data_path, image_names, titles)