In [5]:
import math
import os
import subprocess

import matplotlib
import matplotlib.pyplot as plt
import nibabel as nib
import numpy as np
import pydicom
import torch
from numpy import load
from sklearn.preprocessing import StandardScaler

from model import RegressionModel

In [6]:
# Directory that holds the DICOM files of the CT scan to analyse.
# It is listed and read by read_dicom() and passed to TotalSegmentator as input.
path = 'ct_scans/chest_ct/'

In [None]:
#read the dicom examination and sort the slices based on the slice location
def read_dicom(path):
    """Read every DICOM slice in a directory and return them ordered along z.

    Parameters
    ----------
    path : str
        Directory that holds the files of one CT series. A trailing path
        separator is accepted.

    Returns
    -------
    list
        The datasets returned by ``pydicom.dcmread``, sorted in ascending
        order of the third component of ``ImagePositionPatient``.

    Notes
    -----
    Sub-directories, and files whose dataset carries no
    ``ImagePositionPatient`` attribute (for example an index file or another
    stray file that ``force=True`` lets through), are skipped so that they
    cannot break the sort.
    """
    slices = []
    # os.listdir gives no ordering guarantee; iterating over sorted names keeps
    # the result reproducible when two slices share the same z position.
    for file_name in sorted(os.listdir(path)):
        file_path = os.path.join(path, file_name)
        if not os.path.isfile(file_path):
            continue
        dataset = pydicom.dcmread(file_path, force=True)
        if hasattr(dataset, 'ImagePositionPatient'):
            slices.append(dataset)

    slices.sort(key=lambda x: float(x.ImagePositionPatient[2]))
    return slices

# Load the series stored under `path`; read_dicom returns the slices already
# sorted by the z component of ImagePositionPatient.
dicom_image_serie = read_dicom(path)

#convert the dicom image to a numpy array
def dicom_to_array(slices):
    """Stack the pixel data of the given slices into a single array.

    Parameters
    ----------
    slices : sequence
        Objects exposing a ``pixel_array`` attribute, in the order they should
        appear along the first axis of the result.

    Returns
    -------
    numpy.ndarray
        The per-slice arrays stacked along a new leading axis and cast to
        ``int16``.
    """
    pixel_planes = []
    for dicom_slice in slices:
        pixel_planes.append(dicom_slice.pixel_array)

    return np.stack(pixel_planes).astype(np.int16)

# Stack the pixel data of the sorted slices into one int16 volume
# (first axis = slice index).
dicom_image_array = dicom_to_array(dicom_image_serie)

#converting pixel values to Hounsfield units
def convert_to_hu(dicom_image_serie, dicom_image_array):
    """Apply the linear rescale of the series to the raw pixel values.

    Parameters
    ----------
    dicom_image_serie : sequence
        Slices of the series; only the first one is consulted, for its
        ``RescaleIntercept`` and ``RescaleSlope`` attributes.
    dicom_image_array : numpy.ndarray
        Raw pixel values of the whole series.

    Returns
    -------
    numpy.ndarray
        ``dicom_image_array * RescaleSlope + RescaleIntercept``.
    """
    reference_slice = dicom_image_serie[0]
    rescale_intercept = reference_slice.RescaleIntercept
    rescale_slope = reference_slice.RescaleSlope
    return dicom_image_array * rescale_slope + rescale_intercept

# Raw volume rescaled with the slope/intercept of the first slice.
hu_image = convert_to_hu(dicom_image_serie, dicom_image_array)

# Tube current (XRayTubeCurrent) of every slice, and its mean over the series.
mA = list(map(lambda dicom_slice: dicom_slice.XRayTubeCurrent, dicom_image_serie))
mean_mA = np.mean(mA)

# SliceLocation of every slice, in the same (sorted) order as the volume.
slice_location = list(map(lambda dicom_slice: dicom_slice.SliceLocation, dicom_image_serie))

#creating organ masks
# The masks directory is reused between runs (exist_ok=True), so a segmentation
# run that fails silently would leave missing or stale masks for the next steps.
# check=True makes a non-zero exit status raise CalledProcessError instead.
mask_dir = 'inference/organ_masks/'
os.makedirs('inference/organ_masks', exist_ok=True)

# One run with the default task, then the 'body' and 'lung_vessels' tasks.
# Arguments are passed as a list, so no shell is involved and the paths need
# no quoting.
for task_args in ([], ['-ta', 'body'], ['-ta', 'lung_vessels']):
    subprocess.run(['TotalSegmentator', '-i', path, '-o', mask_dir] + task_args, check=True)

#WED calculation

# in-plane voxel spacing, read from the PixelSpacing of the first slice
first_slice_spacing = dicom_image_serie[0].PixelSpacing
x_dim = float(first_slice_spacing[0])
y_dim = float(first_slice_spacing[1])

# body mask written by the segmentation step; it is rotated and then transposed
# so that its axes match the dimensions of hu_image
body_mask = nib.load('inference/organ_masks/body.nii.gz').get_fdata()
body_mask = np.transpose(np.rot90(body_mask), (2, 0, 1))
# everything that is not labelled 1 becomes NaN so it is ignored by nanmean
body_mask[body_mask != 1] = np.nan

wed_array = []

for j in range(len(dicom_image_serie)):
    slice_mask = body_mask[j]
    # area covered by the body in this slice: voxel count times voxel size
    area = np.count_nonzero(slice_mask == 1)*x_dim*y_dim
    # mean HU over the body voxels only
    mean_HU = np.nanmean(hu_image[j]*slice_mask)
    # wed = 2 * sqrt((mean_HU / 1000 + 1) * area / pi)
    wed = 2*(math.sqrt(((mean_HU/1000)+1)*(area/math.pi)))
    wed_array.append(wed)

#inference array creation

def inference_array(organ):
    """Build and save the per-slice feature table of one organ.

    The mask ``inference/organ_masks/<organ>.nii.gz`` is loaded, rotated and
    transposed like the body mask, and every voxel not labelled 1 is set to
    NaN. For each slice that still contains at least one organ voxel a row
    ``[mean HU, HU standard deviation, mA, WED, slice location]`` is produced;
    slices without organ voxels are left out.

    The resulting float32 table with 5 columns is written to
    ``inference/inference_arrays/<organ>/<organ>_inference_array.npy``.

    Reads the notebook-level variables ``hu_image``, ``dicom_image_serie``,
    ``mA``, ``wed_array`` and ``slice_location``. Returns nothing.
    """
    organ_mask = nib.load('inference/organ_masks/' + organ + '.nii.gz').get_fdata()
    organ_mask = np.transpose(np.rot90(organ_mask), (2, 0, 1))
    organ_mask[organ_mask != 1] = np.nan

    # HU values inside the organ, NaN everywhere else
    organ_hu = hu_image*organ_mask

    feature_rows = []
    for slice_idx in range(len(dicom_image_serie)):
        slice_hu = organ_hu[slice_idx]
        if np.all(np.isnan(slice_hu)):
            # the organ does not appear in this slice
            continue
        feature_rows.append([
            np.nanmean(slice_hu),
            np.nanstd(slice_hu),
            float(mA[slice_idx]),
            wed_array[slice_idx],
            float(slice_location[slice_idx]),
        ])

    # reshape keeps the (0, 5) shape when no slice contains the organ
    organ_feature_table = np.array(feature_rows, dtype=np.float32).reshape(-1, 5)

    #save the inference array
    os.makedirs('inference/inference_arrays/' + organ, exist_ok=True)
    np.save(f'inference/inference_arrays/{organ}/{organ}_inference_array.npy', organ_feature_table)
    


# Structures for which an inference array is built and saved.
organs_list = [
    'esophagus',
    'aorta',
    'pulmonary_artery',
    'lung_lower_lobe_left',
    'lung_lower_lobe_right',
    'lung_upper_lobe_left',
    'lung_upper_lobe_right',
    'lung_middle_lobe_right',
    'heart_atrium_left',
    'heart_atrium_right',
    'heart_ventricle_left',
    'heart_ventricle_right',
    'heart_myocardium',
    'skin',
    'lung',
    'lung_vessels',
    'lung_trachea_bronchia',
    'thyroid_gland',
]

# one .npy file per organ is written under inference/inference_arrays/
for organ in organs_list:
    inference_array(organ)

In [None]:
#dose prediction

#model
# run on the GPU when one is available, otherwise on the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# a single network instance is created here; evaluate() loads the weights of
# each organ into it
hidden_sizes = [512, 256, 128, 64]
model = RegressionModel(
    input_size=5,
    hidden_sizes=hidden_sizes,
    output_size=1,
    num_layers=5,
    dropout=0.1,
)

print('Predicted dose for the organs of interest' )
print('-----------------------------------------')

def evaluate(organ):
    """Predict the dose of one organ and print the mean over its slices.

    Steps:
      1. load the inference array saved for ``organ`` and drop rows with NaN;
      2. load the organ's weights from ``chest_models/<organ>_model.pth`` into
         the notebook-level ``model``;
      3. fit a ``StandardScaler`` on the organ's training array (all columns
         except the last one, rows containing NaN removed) and apply it to the
         inference rows;
      4. run the model and print the mean prediction, labelled in mGy.

    If no valid inference row is left (for example when the organ does not
    appear in the scan), a notice is printed and nothing is predicted, so the
    remaining organs can still be evaluated.

    Parameters
    ----------
    organ : str
        Organ name used to build the model, training-array and
        inference-array file names.

    Returns
    -------
    None
    """
    #inference features for the organ, rows containing NaN removed
    X_test = load(f'inference/inference_arrays/{organ}/{organ}_inference_array.npy')
    X_test = X_test[~np.isnan(X_test).any(axis=1)]

    # The scaler rejects an array with zero rows; report and skip instead of
    # aborting the loop over all organs.
    if X_test.shape[0] == 0:
        print(f'{organ} : no valid slices in this scan, dose not predicted')
        return

    #model loading for the organ
    # map_location lets weights that were saved on a GPU load on a CPU-only
    # machine, which the device selection explicitly allows for.
    state_dict = torch.load('chest_models/'+ organ + '_model.pth', map_location=device)
    model.load_state_dict(state_dict)
    model.to(device)

    #scaler for the organ, refitted on the training features
    data = load('training_arrays/chest_merged_training_arrays/'+ organ + '_training_array.npy')

    # every column except the last one (presumably the target - TODO confirm)
    X = data[:,:-1]

    # Drop rows with NaN values from X
    X = X[~np.isnan(X).any(axis=1)]

    scaler = StandardScaler()
    scaler.fit(X)

    #scaling
    X_test = scaler.transform(X_test)

    #data to tensor
    X_test = torch.tensor(X_test, dtype=torch.float32).to(device)

    #evaluate
    model.eval()
    with torch.no_grad():
        predictions = model(X_test).cpu().numpy()

    print(f'{organ} : {np.mean(predictions):.1f} mGy')

# Structures for which a dose is predicted.
organs_list = [
    'esophagus',
    'aorta',
    'pulmonary_artery',
    'lung_lower_lobe_left',
    'lung_lower_lobe_right',
    'lung_upper_lobe_left',
    'lung_upper_lobe_right',
    'lung_middle_lobe_right',
    'heart_atrium_left',
    'heart_atrium_right',
    'heart_ventricle_left',
    'heart_ventricle_right',
    'heart_myocardium',
    'skin',
    'lung',
    'lung_vessels',
    'lung_trachea_bronchia',
    'thyroid_gland',
]

# one printed line per organ
for organ in organs_list:
    evaluate(organ)