## New Version for Preprocessing

In [2]:
import os
import numpy as np
import nibabel as nib
from scipy.ndimage import zoom
from sklearn.model_selection import train_test_split

# Step 1: Function to load NIfTI files
def load_nii_file(file_path):
    """Load a NIfTI file and return its voxel data.

    Args:
        file_path: Path to the NIfTI file, passed straight to ``nib.load``.

    Returns:
        Whatever ``get_fdata()`` yields for the loaded image (the voxel
        array of the volume).
    """
    return nib.load(file_path).get_fdata()

# Step 2: Function to resize 2D images to target shape
def resize_image(image, target_shape, *, order=1):
    """Resize a 2D image to ``target_shape`` with spline interpolation.

    Args:
        image: 2D array to resize; only its first two axes are used to
            compute the scale factors.
        target_shape: ``(rows, cols)`` of the desired output.
        order: Spline order forwarded to ``scipy.ndimage.zoom``. The default
            of 1 (linear) matches the previous hard-coded behaviour; pass 0
            for nearest-neighbour, which keeps label/mask values intact.

    Returns:
        The resized array produced by ``scipy.ndimage.zoom``.
    """
    factors = (
        target_shape[0] / image.shape[0],
        target_shape[1] / image.shape[1],
    )
    return zoom(image, factors, order=order)

# Step 3: Save data as .npy files
def save_as_npy(data, save_path):
    """Write ``data`` to ``save_path`` using ``np.save``.

    Args:
        data: Array (or array-like) to store.
        save_path: Destination path handed to ``np.save``.
    """
    np.save(file=save_path, arr=data)

# Step 4: Apply mask to MRI and CT slices
def apply_mask(mr_slice, ct_slice, mask_slice):
    """Zero out every MR and CT pixel that lies outside the mask.

    Args:
        mr_slice: MR slice array.
        ct_slice: CT slice array.
        mask_slice: Mask array; a pixel is kept where the mask is > 0.

    Returns:
        Tuple ``(masked_mr, masked_ct)`` with pixels outside the mask set to 0.
    """
    inside = mask_slice > 0  # Same keep-region is applied to both modalities
    return np.where(inside, mr_slice, 0), np.where(inside, ct_slice, 0)

# Step 5: Process dataset
def _collect_masked_slices(folder, target_shape):
    """Build the masked MR/CT slice pairs for one patient folder.

    Args:
        folder: Patient folder expected to hold ``mr.nii.gz``, ``ct.nii.gz``
            and ``union_mask.nii.gz``.
        target_shape: ``(rows, cols)`` every slice is resized to.

    Returns:
        ``(mr_slices, ct_slices)`` as two equal-length lists of 2D arrays, or
        ``None`` when any of the three files is missing.
    """
    paths = [
        os.path.join(folder, name)
        for name in ("mr.nii.gz", "ct.nii.gz", "union_mask.nii.gz")
    ]
    if not all(os.path.exists(path) for path in paths):
        return None

    mr_img, ct_img, mask_img = (load_nii_file(path) for path in paths)

    mr_slices, ct_slices = [], []
    # Slices are taken along the third axis; the MR volume sets the count.
    for slice_idx in range(mr_img.shape[2]):
        mr_slice = resize_image(mr_img[:, :, slice_idx], target_shape)
        ct_slice = resize_image(ct_img[:, :, slice_idx], target_shape)
        # NOTE(review): the mask goes through the same resize as the images,
        # and apply_mask keeps every pixel whose resized mask value is > 0.
        mask_slice = resize_image(mask_img[:, :, slice_idx], target_shape)

        masked_mr, masked_ct = apply_mask(mr_slice, ct_slice, mask_slice)
        mr_slices.append(masked_mr)
        ct_slices.append(masked_ct)
    return mr_slices, ct_slices


def process_dataset(base_dir, output_dir):
    """Convert the ``Task1/brain`` patient folders into masked .npy datasets.

    About 10% of the patient folders (at least one) are held out as test
    patients and saved one file pair per patient under
    ``Test_Patients_InputData`` / ``Test_Patients_OutputData``. The slices of
    the remaining patients are pooled, split 90/10 into train and validation
    sets, and saved as ``train_*.npy`` / ``val_*.npy`` in ``output_dir``.

    Args:
        base_dir: Directory that contains ``Task1/brain``.
        output_dir: Directory the .npy files are written to; the two test
            sub-directories are created here if needed.
    """
    target_shape = (256, 256)

    # Collect folders for processing. os.listdir returns entries in arbitrary
    # order, so sort them to make the seeded patient split reproducible.
    anatomy_dir = os.path.join(base_dir, "./Task1/brain")
    patient_folders = sorted(
        os.path.join(anatomy_dir, name)
        for name in os.listdir(anatomy_dir)
        if os.path.isdir(os.path.join(anatomy_dir, name))
    )

    # Split folders into learning (train+val) and test sets
    num_test_folders = max(1, round(0.1 * len(patient_folders)))  # Approx. 10% for testing
    learning_folders, test_folders = train_test_split(
        patient_folders, test_size=num_test_folders, random_state=42)

    # Process test folders: one input/output file pair per patient
    test_input_dir = os.path.join(output_dir, "Test_Patients_InputData")
    test_output_dir = os.path.join(output_dir, "Test_Patients_OutputData")
    os.makedirs(test_input_dir, exist_ok=True)
    os.makedirs(test_output_dir, exist_ok=True)

    for folder in test_folders:
        print("|", end="")
        pairs = _collect_masked_slices(folder, target_shape)
        if pairs is None:
            continue  # Folder is missing one of the required volumes
        test_input, test_output = pairs
        patient_id = os.path.basename(folder)
        save_as_npy(np.array(test_input), os.path.join(test_input_dir, f"{patient_id}_input.npy"))
        save_as_npy(np.array(test_output), os.path.join(test_output_dir, f"{patient_id}_output.npy"))

    # Process learning folders (train + validation), pooling all slices
    input_data, output_data = [], []
    for folder in learning_folders:
        print(".", end="")
        pairs = _collect_masked_slices(folder, target_shape)
        if pairs is None:
            continue
        input_data.extend(pairs[0])
        output_data.extend(pairs[1])

    # Split into train and validation sets.
    # NOTE(review): this split is per slice, so slices of one patient can end
    # up in both the train and the validation set.
    train_input, val_input, train_output, val_output = train_test_split(
        input_data, output_data, test_size=0.1, random_state=42)

    # Save train and validation data
    save_as_npy(np.array(train_input), os.path.join(output_dir, "train_input.npy"))
    save_as_npy(np.array(train_output), os.path.join(output_dir, "train_output.npy"))
    save_as_npy(np.array(val_input), os.path.join(output_dir, "val_input.npy"))
    save_as_npy(np.array(val_output), os.path.join(output_dir, "val_output.npy"))

    print("\nProcessing completed.")
    print(f"Train set: {len(train_input)} pairs")
    print(f"Validation set: {len(val_input)} pairs")
    print(f"Test folders: {len(test_folders)} folders")

# Step 6: Execute processing
if __name__ == "__main__":
    # Dataset root; replace with your dataset directory.
    base_dir = "./"

    # Processed arrays go into a "data(brain)" folder under the dataset root,
    # which is created up front if it does not exist yet.
    output_dir = os.path.join(base_dir, "data(brain)")
    os.makedirs(name=output_dir, exist_ok=True)

    process_dataset(base_dir=base_dir, output_dir=output_dir)


||||................................
Processing completed.
Train set: 5849 pairs
Validation set: 650 pairs
Test folders: 4 folders


## Old Version for Preprocessing data

In [None]:
import os
import numpy as np
import nibabel as nib
from scipy.ndimage import zoom
from sklearn.model_selection import train_test_split

def load_nii_file(file_path):
    """Open a NIfTI file and return the image object from ``nib.load``.

    Unlike a loader that returns the voxel array, this hands back the image
    itself so callers can read individual slices through ``img.dataobj``.
    """
    return nib.load(file_path)

def resize_image(image, target_shape):
    """Resize a 2D image to ``target_shape`` using linear interpolation.

    Args:
        image: 2D array to resize.
        target_shape: ``(rows, cols)`` of the desired output.

    Returns:
        The resized array produced by ``scipy.ndimage.zoom`` with ``order=1``.
    """
    # Per-axis scale factor = requested size / current size
    scale = tuple(target_shape[axis] / image.shape[axis] for axis in (0, 1))
    return zoom(image, zoom=scale, order=1)

def save_as_npy(data, save_path):
    """Persist ``data`` at ``save_path`` via ``np.save``."""
    destination = save_path
    np.save(destination, data)

def process_slice(img, slice_idx, target_shape):
    """Read one slice from ``img`` and resize it to ``target_shape``.

    Args:
        img: Image object exposing ``dataobj``; the slice is taken along the
            last axis.
        slice_idx: Index of the slice along the last axis.
        target_shape: ``(rows, cols)`` passed on to ``resize_image``.

    Returns:
        The resized slice.
    """
    # Index through dataobj so only the requested slice is read here
    return resize_image(img.dataobj[..., slice_idx], target_shape)

def process_dataset(base_dir, task, anatomy):
    """Turn every MR/CT volume pair under ``base_dir/task/anatomy`` into .npy splits.

    Each ``*mr.nii.gz`` file found in the tree is paired with the
    ``ct.nii.gz`` file of the same prefix in the same directory. Both volumes
    are cut into slices along the third axis, every slice is resized to
    256x256, and the pooled slice pairs are split 80% / 15% / 5% into train,
    validation and test sets. The six arrays are written to
    ``<base_dir>/data(brain)``.

    Args:
        base_dir: Root directory of the extracted dataset; also the parent of
            the ``data(brain)`` output folder.
        task: Task sub-directory name (e.g. ``'Task1'``).
        anatomy: Anatomy sub-directory name (e.g. ``'brain'``).
    """
    input_data = []
    output_data = []

    target_shape = (256, 256)

    for root, dirs, files in os.walk(os.path.join(base_dir, task, anatomy)):
        # os.walk yields entries in filesystem order; sort both levels so the
        # seeded split below sees the slices in a reproducible order.
        dirs.sort()
        for file in sorted(files):
            if file.endswith('mr.nii.gz'):
                mr_path = os.path.join(root, file)
                ct_path = os.path.join(root, file.replace('mr.nii.gz', 'ct.nii.gz'))

                mr_img = load_nii_file(mr_path)
                ct_img = load_nii_file(ct_path)

                # The MR volume determines how many slices are read
                num_slices = mr_img.shape[2]

                # Process each slice individually
                for slice_idx in range(num_slices):
                    mr_slice_resized = process_slice(mr_img, slice_idx, target_shape)
                    ct_slice_resized = process_slice(ct_img, slice_idx, target_shape)

                    input_data.append(mr_slice_resized)
                    output_data.append(ct_slice_resized)

    input_data = np.array(input_data)
    output_data = np.array(output_data)

    # Split the data into train, validation, and test sets:
    # 80% train, then the remaining 20% is split 75/25 into val/test.
    # NOTE(review): the split is per slice, so slices of one volume can land
    # in different sets.
    train_input, temp_input, train_output, temp_output = train_test_split(
        input_data, output_data, test_size=0.2, random_state=42)
    val_input, test_input, val_output, test_output = train_test_split(
        temp_input, temp_output, test_size=0.25, random_state=42)

    # Save the split data as .npy files under base_dir rather than the current
    # working directory, and make sure the folder exists before writing.
    output_dir = os.path.join(base_dir, 'data(brain)')
    os.makedirs(output_dir, exist_ok=True)
    save_as_npy(train_input, os.path.join(output_dir, 'train_input.npy'))
    save_as_npy(train_output, os.path.join(output_dir, 'train_output.npy'))
    save_as_npy(val_input, os.path.join(output_dir, 'val_input.npy'))
    save_as_npy(val_output, os.path.join(output_dir, 'val_output.npy'))
    save_as_npy(test_input, os.path.join(output_dir, 'test_input.npy'))
    save_as_npy(test_output, os.path.join(output_dir, 'test_output.npy'))

    # Count the number of images in each split
    num_train = len(train_input)
    num_val = len(val_input)
    num_test = len(test_input)

    # Print the counts
    print(f"Number of MRI-CT pairs in training set: {num_train}")
    print(f"Number of MRI-CT pairs in validation set: {num_val}")
    print(f"Number of MRI-CT pairs in testing set: {num_test}")

# Define the base directory where the dataset is extracted
# Root directory of the extracted dataset
base_dir = './'

# Make sure the folder for the processed arrays exists
output_dir = os.path.join(base_dir, 'data(brain)')
os.makedirs(name=output_dir, exist_ok=True)

# Run the preprocessing for the brain volumes of Task1
process_dataset(base_dir, task='Task1', anatomy='brain')