In [None]:
# Mount Google Drive so the project folder is reachable from the Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

# Project folder, relative to "My Drive".
FOLDERNAME = "Computer Vision Project/kits23"
assert FOLDERNAME is not None, "[!] Enter the foldername."

# Make modules stored in the project folder importable.
import sys
sys.path.append('/content/drive/My Drive/{}'.format(FOLDERNAME))

# Change the working directory to the project folder; later cells use
# relative paths such as '../dataset.csv'.
%cd /content/drive/My\ Drive/$FOLDERNAME

Mounted at /content/drive
/content/drive/.shortcut-targets-by-id/1tnt3cKRdWn9lFKrM82KlrZH6CO5EgchD/Computer Vision Project/kits23


In [None]:
!pip install nibabel

import numpy as np
from scipy.ndimage import zoom
from tqdm import tqdm
import os
from os.path import join, dirname, basename, exists
import pandas as pd
from sklearn.model_selection import train_test_split
import nibabel as nib
import matplotlib.pyplot as plt

%matplotlib inline



In [None]:
def calculate_intensity_stats(imaging_paths, segmentation_paths):
    """Compute intensity statistics over the labelled voxels of a set of cases.

    For every (image, segmentation) pair, the image voxels at positions where
    the segmentation is greater than zero are collected. The statistics are
    computed over the values pooled from all pairs.

    Args:
        imaging_paths: Iterable of paths to image volumes readable by nibabel.
        segmentation_paths: Iterable of paths to the matching segmentation
            volumes, in the same order as ``imaging_paths``.

    Returns:
        Tuple ``(p_min, p_max, mean, std)``: the 0.5th percentile, the 99.5th
        percentile, the mean and the standard deviation of the pooled values.

    Raises:
        ValueError: If no foreground voxel was found (including the case where
            no paths were given), since the statistics would be undefined.
    """
    foreground_chunks = []

    for img_path, seg_path in zip(imaging_paths, segmentation_paths):
        img = nib.load(img_path).get_fdata()
        seg = nib.load(seg_path).get_fdata()

        # Keep each case's foreground as one NumPy array. Extending a Python
        # list with the array would turn every voxel into a separate Python
        # float object, which costs far more memory than the packed array.
        foreground_chunks.append(img[seg > 0])

    if foreground_chunks:
        intensities = np.concatenate(foreground_chunks)
    else:
        intensities = np.empty(0, dtype=np.float64)

    if intensities.size == 0:
        raise ValueError("No foreground voxels found; cannot compute intensity statistics.")

    p_min = np.percentile(intensities, 0.5)
    p_max = np.percentile(intensities, 99.5)
    mean = np.mean(intensities)
    std = np.std(intensities)

    return p_min, p_max, mean, std

In [None]:
# Compute the pooled foreground statistics and report them, one per line.
# NOTE(review): `imaging_paths` / `segmentation_paths` are not assigned in any
# visible cell of this notebook — confirm where they are expected to come from.
p_min, p_max, mean, std = calculate_intensity_stats(
    imaging_paths,
    segmentation_paths,
)

print(
    f"p_min: {p_min}",
    f"p_max: {p_max}",
    f"Mean: {mean}",
    f"Standard Deviation: {std}",
    sep="\n",
)

In [None]:
# Keep only the rows marked 'train' and split them 80/20 with a fixed seed.
# Stratification is applied only when a "label_column" column is present.
# NOTE(review): a later cell rebinds df, train_df and val_df from the CSV's
# own 'split' column — confirm which of the two splits is meant to be used.
df = pd.read_csv('../dataset.csv').loc[lambda frame: frame['split'] == 'train']
train_df, val_df = train_test_split(
    df,
    test_size=0.2,
    random_state=42,
    stratify=df["label_column"] if "label_column" in df.columns else None,
)


In [None]:
csv_path = '../dataset.csv'
df = pd.read_csv(csv_path)

# One frame per value of the CSV's 'split' column.
train_df, val_df, test_df = (
    df[df['split'] == split_name] for split_name in ('train', 'val', 'test')
)

# Disabled variant: restrict each split to the cases that are not yet listed
# in an existing preprocessed manifest.
# preprocessed_train_df = pd.read_csv('../preprocessed_train.csv')
# preprocessed_val_df = pd.read_csv('../preprocessed_val.csv')
# preprocessed_test_df = pd.read_csv('../preprocessed_test.csv')

# imaging_paths_train = train_df[~train_df['segment_path'].apply(lambda x: x.split('/')[1]).isin(preprocessed_train_df['case'])]['image'].tolist()
# segmentation_paths_train = train_df[~train_df['segment_path'].apply(lambda x: x.split('/')[1]).isin(preprocessed_train_df['case'])]['label'].tolist()
# Only the first 60 training cases are kept.
imaging_paths_train = train_df['image_path'].tolist()[:60]
segmentation_paths_train = train_df['segment_path'].tolist()[:60]

# imaging_paths_val = val_df[~val_df['segment_path'].apply(lambda x: x.split('/')[1]).isin(preprocessed_val_df['case'])]['image'].tolist()
# segmentation_paths_val = val_df[~val_df['segment_path'].apply(lambda x: x.split('/')[1]).isin(preprocessed_val_df['case'])]['label'].tolist()
imaging_paths_val, segmentation_paths_val = (
    val_df['image_path'].tolist(),
    val_df['segment_path'].tolist(),
)

# imaging_paths_test = test_df[~test_df['segment_path'].apply(lambda x: x.split('/')[1]).isin(preprocessed_test_df['case'])]['image'].tolist()
# segmentation_paths_test = test_df[~test_df['segment_path'].apply(lambda x: x.split('/')[1]).isin(preprocessed_test_df['case'])]['label'].tolist()
imaging_paths_test, segmentation_paths_test = (
    test_df['image_path'].tolist(),
    test_df['segment_path'].tolist(),
)


In [None]:
# Sanity check: list lengths, then the first two entries of each list.
print(len(imaging_paths_train), len(segmentation_paths_train), sep='\n')
print(imaging_paths_train[:2], segmentation_paths_train[:2], sep='\n')

60
60
['dataset/case_00108/imaging.nii.gz', 'dataset/case_00581/imaging.nii.gz']
['dataset/case_00108/segmentation.nii.gz', 'dataset/case_00581/segmentation.nii.gz']


In [None]:
# Sanity check: list lengths, then the first two entries of each list.
print(len(imaging_paths_val), len(segmentation_paths_val), sep='\n')
print(imaging_paths_val[:2], segmentation_paths_val[:2], sep='\n')


79
79
['dataset/case_00449/imaging.nii.gz', 'dataset/case_00446/imaging.nii.gz']
['dataset/case_00449/segmentation.nii.gz', 'dataset/case_00446/segmentation.nii.gz']


In [None]:
# Sanity check: list lengths, then the first five entries of each list.
print(len(imaging_paths_test), len(segmentation_paths_test), sep='\n')
print(imaging_paths_test[:5], segmentation_paths_test[:5], sep='\n')

98
98
['dataset/case_00551/imaging.nii.gz', 'dataset/case_00084/imaging.nii.gz', 'dataset/case_00534/imaging.nii.gz', 'dataset/case_00572/imaging.nii.gz', 'dataset/case_00528/imaging.nii.gz']
['dataset/case_00551/segmentation.nii.gz', 'dataset/case_00084/segmentation.nii.gz', 'dataset/case_00534/segmentation.nii.gz', 'dataset/case_00572/segmentation.nii.gz', 'dataset/case_00528/segmentation.nii.gz']


In [None]:
def load_ct_image(file_path):
    """Load a volume with nibabel and return its data and voxel spacing.

    Args:
        file_path: Path to a file that ``nib.load`` can read.

    Returns:
        Tuple ``(ct_array, spacing)``: the array from ``get_fdata()`` and the
        first three entries of the header's zooms.
    """
    volume = nib.load(file_path)
    voxel_spacing = volume.header.get_zooms()[:3]
    return volume.get_fdata(), voxel_spacing

def resample_image(image, original_spacing, target_spacing):
    """Resample ``image`` from ``original_spacing`` to ``target_spacing``.

    Each axis is scaled by ``original / target`` using cubic (order-3) spline
    interpolation.

    Args:
        image: Array to resample.
        original_spacing: Per-axis spacing of ``image``.
        target_spacing: Desired per-axis spacing.

    Returns:
        The resampled array.
    """
    scale_per_axis = []
    for current, wanted in zip(original_spacing, target_spacing):
        scale_per_axis.append(current / wanted)
    return zoom(image, scale_per_axis, order=3)

def resample_segmentation(segmentation, original_spacing, target_spacing):
    """Resample a label volume from ``original_spacing`` to ``target_spacing``.

    Each axis is scaled by ``original / target`` using order-0
    (nearest-neighbour) interpolation, so no new label values are introduced.

    Args:
        segmentation: Label array to resample.
        original_spacing: Per-axis spacing of ``segmentation``.
        target_spacing: Desired per-axis spacing.

    Returns:
        The resampled label array.
    """
    scale_per_axis = []
    for current, wanted in zip(original_spacing, target_spacing):
        scale_per_axis.append(current / wanted)
    return zoom(segmentation, scale_per_axis, order=0)

def crop_to_nonzero(image, segmentation):
    """Crop ``image`` and ``segmentation`` to the bounding box of ``image > 0``.

    The bounding box is computed from the voxels of ``image`` that are strictly
    greater than zero and the same box is applied to ``segmentation``. As in
    the original implementation, at most the first three axes are cropped.

    Args:
        image: Array whose positive voxels define the bounding box.
        segmentation: Array cropped with the same bounding box; expected to
            share the leading axes of ``image``.

    Returns:
        Tuple ``(cropped_image, cropped_segmentation)``. When ``image`` has no
        voxel greater than zero there is no bounding box, and both inputs are
        returned unchanged instead of raising.
    """
    positive_coords = np.argwhere(image > 0)
    if positive_coords.size == 0:
        # min()/max() over zero rows would raise; nothing to crop to.
        return image, segmentation

    lower = positive_coords.min(axis=0)
    upper = positive_coords.max(axis=0) + 1  # exclusive upper bound

    bounding_box = tuple(slice(lo, hi) for lo, hi in zip(lower[:3], upper[:3]))
    return image[bounding_box], segmentation[bounding_box]

def ct_normalize(image, clip_range=(-61, 305), mean=103, std=75.17):
    """Clip intensities to ``clip_range`` and standardise them.

    Args:
        image: Array of intensities.
        clip_range: ``(low, high)`` bounds applied before standardising.
        mean: Value subtracted after clipping.
        std: Value the centred intensities are divided by.

    Returns:
        ``(clip(image) - mean) / std`` as a new array.
    """
    low, high = clip_range[0], clip_range[1]
    clipped = np.clip(image, low, high)
    return (clipped - mean) / std

In [None]:
def extract_random_patches(image, segment, patch_size=(128, 128, 128), num_patches=10, foreground_ratio=0.4):
    """Sample fixed-size 3-D patches from a volume and its segmentation.

    ``int(num_patches * foreground_ratio)`` patches are placed around randomly
    chosen voxels whose label is ``>= 2`` (presumably the lesion classes of
    this dataset — confirm). Each of these patches is shifted as needed to
    stay inside the volume and always contains the chosen voxel. The remaining
    patches are taken at uniformly random positions and may or may not contain
    foreground. If the segmentation has no voxel with label ``>= 2``, all
    patches are taken at random positions.

    Sampling uses the global ``np.random`` state, so seed it for
    reproducibility.

    Args:
        image: 3-D array to sample from.
        segment: 3-D label array with the same shape as ``image``.
        patch_size: Patch extent along each axis (any length-3 sequence).
        num_patches: Total number of patches to return.
        foreground_ratio: Fraction of patches centred on foreground voxels.

    Returns:
        Tuple ``(patches, ground_truths)`` of equally long lists; element ``i``
        of each list is the image / label patch cut at the same location.

    Raises:
        ValueError: If ``image`` and ``segment`` differ in shape, or if the
            volume is smaller than ``patch_size`` along any axis. Previously
            the latter case (and a ``patch_size`` given as a list) made the
            sampling loops spin forever because no patch ever passed the
            shape check.
    """
    # Normalise so list-valued patch sizes behave like tuples.
    patch_size = tuple(int(side) for side in patch_size)

    if image.shape != segment.shape:
        raise ValueError(
            f"image shape {image.shape} and segment shape {segment.shape} must match"
        )
    if any(dim < side for dim, side in zip(image.shape, patch_size)):
        raise ValueError(
            f"volume of shape {image.shape} is smaller than patch size {patch_size}"
        )

    patches = []
    ground_truths = []
    x_max, y_max, z_max = image.shape
    x_patch, y_patch, z_patch = patch_size

    def _append_patch(x_start, y_start, z_start):
        """Cut the patch starting at the given corner from both volumes."""
        window = (
            slice(x_start, x_start + x_patch),
            slice(y_start, y_start + y_patch),
            slice(z_start, z_start + z_patch),
        )
        patches.append(image[window])
        ground_truths.append(segment[window])

    num_foreground_patches = int(num_patches * foreground_ratio)

    foreground_indices = np.argwhere(segment >= 2)
    if len(foreground_indices) > 0:
        for _ in range(num_foreground_patches):
            idx = np.random.randint(0, len(foreground_indices))
            x, y, z = foreground_indices[idx]

            # Centre the patch on the voxel, then clamp the corner so the
            # whole patch lies inside the volume.
            _append_patch(
                min(max(0, x - x_patch // 2), x_max - x_patch),
                min(max(0, y - y_patch // 2), y_max - y_patch),
                min(max(0, z - z_patch // 2), z_max - z_patch),
            )

    # Fill up with uniformly placed patches. randint's upper bound is
    # exclusive, hence the +1 so the last valid corner can be drawn too.
    for _ in range(num_patches - len(patches)):
        _append_patch(
            np.random.randint(0, x_max - x_patch + 1),
            np.random.randint(0, y_max - y_patch + 1),
            np.random.randint(0, z_max - z_patch + 1),
        )

    return patches, ground_truths

In [None]:
import torch
import torch.nn.functional as F

def preprocessed_to_tensor(image, label):
    """Convert an image/label pair to torch tensors.

    Args:
        image: Array-like image data.
        label: Array-like label data.

    Returns:
        Tuple ``(image_tensor, label_tensor)``: the image as ``float32`` with a
        new leading axis of size 1, and the label as ``long`` with its shape
        unchanged.
    """
    as_float = torch.tensor(image, dtype=torch.float32)
    as_long = torch.tensor(label, dtype=torch.long)
    return torch.unsqueeze(as_float, 0), as_long

In [None]:
def preprocess_to_patches(image, segmentation, patch_size, num_patches, output_data_dir, output_label_dir, csv_path, case_id, df):
    """Extract random patches from one case, save them and update the manifest.

    Each patch pair is converted to tensors and written with ``torch.save`` as
    ``image_{case_id}_{i}.pth`` / ``segment_{case_id}_{i}.pth``. One manifest
    row per patch is appended to ``df`` and the result is written to
    ``csv_path``.

    Args:
        image: Image volume to sample patches from.
        segmentation: Label volume matching ``image``.
        patch_size: Patch extent passed to ``extract_random_patches``.
        num_patches: Number of patches to extract.
        output_data_dir: Directory receiving the image patch files.
        output_label_dir: Directory receiving the label patch files.
        csv_path: Where the updated manifest is written.
        case_id: Identifier used in the file names and the 'case' column.
        df: Existing manifest with 'image', 'label' and 'case' columns. It is
            not modified in place.

    Returns:
        The updated manifest DataFrame. The assignment inside this function
        only rebinds a local name, so callers must use the return value to
        keep accumulating rows across cases.
    """
    image_patches, segmentation_patches = extract_random_patches(image, segmentation, patch_size, num_patches)

    new_rows = []
    for i, (img_patch, seg_patch) in enumerate(zip(image_patches, segmentation_patches)):
        img_tensor, seg_tensor = preprocessed_to_tensor(img_patch, seg_patch)
        output_data_path = os.path.join(output_data_dir, f"image_{case_id}_{i}.pth")
        output_label_path = os.path.join(output_label_dir, f"segment_{case_id}_{i}.pth")
        torch.save(img_tensor, output_data_path)
        torch.save(seg_tensor, output_label_path)
        new_rows.append({'image': output_data_path, 'label': output_label_path, 'case': case_id})

    # Append all rows at once rather than re-copying the frame for each patch.
    if new_rows:
        df = pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True)

    df.to_csv(csv_path, index=False)
    return df

In [None]:
def preprocess(file_paths, seg_paths, train, target_spacing, output_data_dir, output_label_dir, csv_path):
    """Resample, normalise and save a list of cases, recording them in a CSV.

    Every case is loaded, resampled to ``target_spacing`` (cubic for the
    image, nearest-neighbour for the segmentation) and normalised with
    ``ct_normalize``. In training mode ten 128x128x128 patches are extracted
    and saved per case; otherwise the whole volume pair is saved as
    ``image_{case_id}.pth`` / ``segment_{case_id}.pth``. The manifest at
    ``csv_path`` is rewritten after each case, and rows already present in an
    existing file are kept.

    Args:
        file_paths: Image paths. The second '/'-separated component is used as
            the case id.
        seg_paths: Segmentation paths, in the same order as ``file_paths``.
        train: If true, save random patches; otherwise save whole volumes.
        target_spacing: Per-axis spacing to resample to.
        output_data_dir: Directory for image tensors (created if missing).
        output_label_dir: Directory for label tensors (created if missing).
        csv_path: Manifest file with 'image', 'label' and 'case' columns.
    """
    os.makedirs(output_data_dir, exist_ok=True)
    os.makedirs(output_label_dir, exist_ok=True)

    if exists(csv_path):
        df = pd.read_csv(csv_path)
    else:
        df = pd.DataFrame(columns=['image', 'label', 'case'])

    # Materialise the pairs so tqdm knows the total and can show progress/ETA.
    case_pairs = list(zip(file_paths, seg_paths))

    for ct_path, seg_path in tqdm(case_pairs):
        ct_array, original_spacing = load_ct_image(ct_path)
        segmentation, _ = load_ct_image(seg_path)
        # Expects paths shaped like 'dataset/case_00108/imaging.nii.gz'.
        case_id = ct_path.split('/')[1]

        # Resample image and segmentation
        resampled_image = resample_image(ct_array, original_spacing, target_spacing)
        resampled_segmentation = resample_segmentation(segmentation, original_spacing, target_spacing)

        # Normalize the image
        normalized_image = ct_normalize(resampled_image)

        if train:
            updated = preprocess_to_patches(normalized_image, resampled_segmentation, (128, 128, 128), 10, output_data_dir, output_label_dir, csv_path, case_id, df)
            # The helper writes `df` plus this case's rows to csv_path, but it
            # cannot rebind our local `df`. Without picking up the new rows
            # here, every case would start again from the initial frame and
            # the final CSV would list only the last case. Use the returned
            # frame when there is one, otherwise reload what was just written.
            df = updated if updated is not None else pd.read_csv(csv_path)
        else:
            # Convert To Tensor
            normalized_image, resampled_segmentation = preprocessed_to_tensor(normalized_image, resampled_segmentation)

            # Save the images
            output_data_path = os.path.join(output_data_dir, f"image_{case_id}.pth")
            output_label_path = os.path.join(output_label_dir, f"segment_{case_id}.pth")
            df = pd.concat([df, pd.DataFrame({'image': [output_data_path], 'label': [output_label_path], 'case': [case_id]})], ignore_index=True)
            torch.save(normalized_image, output_data_path)
            torch.save(resampled_segmentation, output_label_path)
            # Written after every case so an interrupted run keeps its progress.
            df.to_csv(csv_path, index=False)

In [None]:
# Run training-mode preprocessing (random patch extraction) on the selected
# training cases and record the saved patches in the manifest CSV.
# NOTE(review): the output directories end in "_2nd" while the manifest is
# named "preprocessed_train_1.csv" — confirm this pairing is intended.
preprocess(
    imaging_paths_train,
    segmentation_paths_train,
    train=True,
    target_spacing=(0.78, 0.78, 1.0),
    output_data_dir='../preprocessed_image_train_2nd',
    output_label_dir='../preprocessed_segment_train_2nd',
    csv_path='../preprocessed_train_1.csv',
    )

# preprocess(
#     imaging_paths_train,
#     segmentation_paths_train,
#     train=True,
#     target_spacing=(0.78, 0.78, 1.0),
#     output_data_dir='../preprocessed_image_train',
#     output_label_dir='../preprocessed_segment_train',
#     csv_path='../preprocessed_train_2.csv',
#     )

# preprocess(
#     imaging_paths_train,
#     segmentation_paths_train,
#     train=True,
#     target_spacing=(0.78, 0.78, 1.0),
#     output_data_dir='../preprocessed_image_train',
#     output_label_dir='../preprocessed_segment_train',
#     csv_path='../preprocessed_train_3.csv',
#     )

# preprocess(
#     imaging_paths_val,
#     segmentation_paths_val,
#     train=False,
#     target_spacing=(0.78, 0.78, 1.0),
#     output_data_dir='../preprocessed_image_val',
#     output_label_dir='../preprocessed_segment_val',
#     csv_path='../preprocessed_val.csv',
#     )
# preprocess(
#     imaging_paths_test,
#     segmentation_paths_test,
#     train=False,
#     target_spacing=(0.78, 0.78, 1.0),
#     output_data_dir='../preprocessed_image_test',
#     output_label_dir='../preprocessed_segment_test',
#     csv_path='../preprocessed_test.csv',
#     )

60it [1:20:44, 80.74s/it]
