Process testing data

In [None]:
# %% [markdown]
# # Images

# %% [markdown]
# ## Reading

# %%
import os
import zipfile
import numpy as np
import tensorflow as tf  # for data preprocessing
import nibabel as nib
from scipy import ndimage

import keras
from keras import layers
import gc
from tensorflow.keras import Input, Model
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, MaxPooling3D, Flatten, Dense

# AFTER:
from tensorflow.keras.callbacks import ModelCheckpoint
import os


# tf.enable_eager_execution()


# %%
def read_nifti_file(filepath):
    """Read and load volume"""
    # Read file
    try:
        scan = nib.load(filepath)
        # Get raw data
        scan = scan.get_fdata()
        return scan
    except Exception as e:
        print(e)
        return None


# %%
def normalize(volume):
    """Normalize the volume"""
    min = -1000
    max = 400
    volume[volume < min] = min
    volume[volume > max] = max
    volume = (volume - min) / (max - min)
    volume = volume.astype("float32")
    return volume


def resize_volume(img):
    """Resize across z-axis"""
    # Set the desired depth
    desired_depth = 250
    desired_width = 350
    desired_height = 350
    # Get current depth
    current_depth = img.shape[-1]
    current_width = img.shape[0]
    current_height = img.shape[1]
    # Compute depth factor
    depth = current_depth / desired_depth
    width = current_width / desired_width
    height = current_height / desired_height
    depth_factor = 1 / depth
    width_factor = 1 / width
    height_factor = 1 / height
    # Rotate
    # img = ndimage.rotate(img, 90, reshape=False)
    # Resize across z-axis
    img = ndimage.zoom(img, (width_factor, height_factor, depth_factor), order=1)
    img = img.astype(np.float16)
    return img


def process_scan(scan):
    """Read and resize volume"""
    # Normalize
    volume = normalize(scan)
    # Resize width, height and depth
    volume = resize_volume(volume)
    return volume


scan_paths_main = [
    os.path.join("/d/hpc/projects/training/RIS/data/RIS", x)
    for x in os.listdir("/d/hpc/projects/training/RIS/data/RIS")
]

print("all paths: " + str(len(scan_paths_main)))
scan_paths_main = sorted(scan_paths_main)

scan_paths = scan_paths_main
print("used paths: " + str(len(scan_paths_main)))
combined_scans_main = []
pos_indices_main = []


# %%
# Scan masks to sort between positive and negative samples
mask_scans_list = [read_nifti_file(path + "/MASK.nii.gz") for path in scan_paths]
ok_idx_mask = [i for i, x in enumerate(mask_scans_list) if x is not None]
ct_scans_list = [read_nifti_file(path + "/CT.nii.gz") for path in scan_paths]
ok_idx_ct = [i for i, x in enumerate(ct_scans_list) if x is not None]
pet_scans_list = [read_nifti_file(path + "/PET.nii.gz") for path in scan_paths]
ok_idx_pet = [i for i, x in enumerate(pet_scans_list) if x is not None]

# Filter for corrupted files
ok_indices = set(ok_idx_mask) & set(ok_idx_ct) & set(ok_idx_pet)
print(ok_indices)

ct_scans_list = [ct_scans_list[i] for i in ok_indices]
mask_scans_list = [mask_scans_list[i] for i in ok_indices]
pet_scans_list = [pet_scans_list[i] for i in ok_indices]

# %%
# find positive and negative samples: if the mask has any pixel > 0 it is a positive sample
pos_indices = np.where([np.max(mask) > 0 for mask in mask_scans_list])[0]
print(pos_indices)
pos_indices_main.extend(pos_indices)
del mask_scans_list

# %% [markdown]
# ## Processing

# %%
# mask_scans_processed = np.array([process_scan(scan) for scan in mask_scans_list])
ct_scans_processed = np.array([process_scan(scan) for scan in ct_scans_list])
combined_scans = 0.02 * ct_scans_processed
del ct_scans_processed
del ct_scans_list

pet_scans_processed = np.array([process_scan(scan) for scan in pet_scans_list])
combined_scans += 0.98 * pet_scans_processed
del pet_scans_processed
del pet_scans_list
gc.collect()