# **Setup the Environment**

In [None]:
!pip install nilearn==0.5.2 niwidgets==0.2.2 numpy-stl==2.11.2
!pip install open3d k3d
!pip3 install ipympl

**After succesful installation, restart and clear cell outputs.**
Comment the above lines, hit run all

# **Import all dependencies**

In [None]:
import os, glob
import nibabel as nib
import numpy as np
import matplotlib.pyplot as plt
import cv2
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator 
from tensorflow.keras.models import load_model
from tensorflow import keras
import shutil, pathlib, fnmatch
import PIL
from niwidgets import NiftiWidget
import numpy as np
import open3d as o3d
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
from skimage import measure
from stl import mesh
from sklearn.metrics import confusion_matrix

# **Pre-processing**

In [None]:
# Load image and see max min Hounsfield units
imgPath = os.path.join(imagePathInput, "coronacases_org_002.nii")
img = nib.load(imgPath).get_fdata()
np.min(img), np.max(img), img.shape, type(img)

In [None]:
# CONSTANTS!!!

# STEP 1 - Load and visualize data
dataInputPath = '../input/covid19-ct-scans'
imagePathInput = os.path.join(dataInputPath, 'ct_scans/')
maskPathInput = os.path.join(dataInputPath, 'infection_mask/')

dataOutputPath = './slices/'
imageSliceOutput = os.path.join(dataOutputPath, 'img/')
maskSliceOutput = os.path.join(dataOutputPath, 'mask/')
os.makedirs(imageSliceOutput,mode = 0o666)
os.makedirs(maskSliceOutput,mode = 0o666)
# STEP 2 - Image normalization
HOUNSFIELD_MIN = -1000
HOUNSFIELD_MAX = 2000
HOUNSFIELD_RANGE = HOUNSFIELD_MAX - HOUNSFIELD_MIN

# STEP 3 - Slicing and saving
SLICE_X = True
SLICE_Y = True
SLICE_Z = True

SLICE_DECIMATE_IDENTIFIER = 3

In [None]:
# Load image mask and see max min Hounsfield units
maskPath = os.path.join(maskPathInput, "coronacases_002.nii")
mask = nib.load(maskPath).get_fdata()
np.min(mask), np.max(mask), mask.shape, type(mask)

In [None]:
# Show image slice
imgSlice = mask[150,:,:]
plt.imshow(imgSlice, cmap='gray')
plt.show()

In [None]:
# Normalize image
def normalizeImageIntensityRange(img):
    img[img < HOUNSFIELD_MIN] = HOUNSFIELD_MIN
    img[img > HOUNSFIELD_MAX] = HOUNSFIELD_MAX
    return (img - HOUNSFIELD_MIN) / HOUNSFIELD_RANGE

nImg = normalizeImageIntensityRange(img)
np.min(nImg), np.max(nImg), nImg.shape, type(nImg)

In [None]:
# Read image or mask volume
def readImageVolume(imgPath, normalize=False):
    img = nib.load(imgPath).get_fdata()
    if normalize:
        return normalizeImageIntensityRange(img)
    else:
        return img
    
readImageVolume(imgPath, normalize=True)
readImageVolume(maskPath, normalize=False)

In [None]:
# Save volume slice to file
def saveSlice(img, fname, path):
    print("image shape:",img.shape) 
    img = np.uint8(img * 255)
    fout = os.path.join(path, f'{fname}.png')
    cv2.imwrite(fout, img)
    print(f'[+] Slice saved: {fout}', end='\r')
    
# saveSlice(nImg[20,:,:], 'test', imageSliceOutput)
# saveSlice(mask[20,:,:], 'test', maskSliceOutput)

In [None]:
# Slice image in all directions and save
def sliceAndSaveVolumeImage(vol, fname, path):
    (dimx, dimy, dimz) = vol.shape
    print(dimx, dimy, dimz)
    cnt = 0
    if SLICE_X:
        cnt += dimx
        print('Slicing X: ')
        for i in range(dimx):
            saveSlice(vol[i,:,:], fname+f'-slice{str(i).zfill(SLICE_DECIMATE_IDENTIFIER)}_x', path)
            
    if SLICE_Y:
        cnt += dimy
        print('Slicing Y: ')
        for i in range(dimy):
            saveSlice(vol[:,i,:], fname+f'-slice{str(i).zfill(SLICE_DECIMATE_IDENTIFIER)}_y', path)
            
    if SLICE_Z:
        cnt += dimz
        print('Slicing Z: ')
        for i in range(dimz):
            saveSlice(vol[:,:,i], fname+f'-slice{str(i).zfill(SLICE_DECIMATE_IDENTIFIER)}_z', path)
    return cnt

In [None]:
# Read and process image volumes
for index, filename in enumerate(sorted(glob.iglob(imagePathInput+'coronacases*.nii'))):
    img = readImageVolume(filename, True)
    print(filename, img.shape, np.sum(img.shape), np.min(img), np.max(img))
    numOfSlices = sliceAndSaveVolumeImage(img, 'lungs'+str(index), imageSliceOutput)
    print(f'\n{filename}, {numOfSlices} slices created \n')

In [None]:
# Read and process image mask volumes
for index, filename in enumerate(sorted(glob.iglob(maskPathInput+'coronacases*.nii'))):
    img = readImageVolume(filename, False)
    print(filename, img.shape, np.sum(img.shape), np.min(img), np.max(img))
    numOfSlices = sliceAndSaveVolumeImage(img, 'lungs'+str(index), maskSliceOutput)
    print(f'\n{filename}, {numOfSlices} slices created \n')

In [None]:
count=0
for filename in os.listdir("./slices/mask"):
    count+=1
count*2

# **Lung Segmentation using U-net**

Organizing data in appropriate train & test folders

In [None]:
dataPath = './'
traindataSliceOutput = os.path.join(dataPath, 'training/img/img/')
trainmaskSliceOutput = os.path.join(dataPath, 'training/mask/mask/')
testdataSliceOutput = os.path.join(dataPath, 'test/img/img/')
testmaskSliceOutput = os.path.join(dataPath, 'test/mask/mask/')


In [None]:
#function to move data from source to destination using string regex
def move_dir(src: str, dst: str, pattern: str = '*'):
    if not os.path.isdir(dst):
        pathlib.Path(dst).mkdir(parents=True, exist_ok=True)
    for f in fnmatch.filter(os.listdir(src), pattern):
        shutil.move(os.path.join(src, f), os.path.join(dst, f))

In [None]:
os.makedirs(traindataSliceOutput,mode = 0o666)
os.makedirs(trainmaskSliceOutput,mode = 0o666)
os.makedirs(testdataSliceOutput,mode = 0o666)
os.makedirs(testmaskSliceOutput,mode = 0o666)

Uncomment if more test samples are required. Currently using 2 test, 8 train.

In [None]:
# move_dir(imageSliceOutput,testdataSliceOutput,'lungs6*.png')
# move_dir(imageSliceOutput,testdataSliceOutput,'lungs7*.png')
move_dir(imageSliceOutput,testdataSliceOutput,'lungs8*.png')
move_dir(imageSliceOutput,testdataSliceOutput,'lungs9*.png')
# move_dir(maskSliceOutput,testmaskSliceOutput,'lungs6*.png')
# move_dir(maskSliceOutput,testmaskSliceOutput,'lungs7*.png')
move_dir(maskSliceOutput,testmaskSliceOutput,'lungs8*.png')
move_dir(maskSliceOutput,testmaskSliceOutput,'lungs9*.png')

In [None]:
#Move the remaining files into training data folder
move_dir(imageSliceOutput,traindataSliceOutput,'*.png')
move_dir(maskSliceOutput,trainmaskSliceOutput,'*.png')

In [None]:
# Define constants
SEED = 909
BATCH_SIZE_TRAIN = 16
BATCH_SIZE_TEST = 16

IMAGE_HEIGHT = 512
IMAGE_WIDTH = 256
IMG_SIZE = (IMAGE_HEIGHT, IMAGE_WIDTH)

data_dir = './'
data_dir_train = os.path.join(data_dir, 'training')
# The images should be stored under: "data/slices/training/img/img"
data_dir_train_image = os.path.join(data_dir_train, 'img')
# The images should be stored under: "data/slices/training/mask/img"
data_dir_train_mask = os.path.join(data_dir_train, 'mask')

data_dir_test = os.path.join(data_dir, 'test')
# The images should be stored under: "data/slices/test/img/img"
data_dir_test_image = os.path.join(data_dir_test, 'img')
# The images should be stored under: "data/slices/test/mask/img"
data_dir_test_mask = os.path.join(data_dir_test, 'mask')

NUM_TRAIN = len(os.listdir(traindataSliceOutput))
NUM_TEST = len(os.listdir(testdataSliceOutput))

NUM_OF_EPOCHS = 5

Folder structure organization done! Create Data generators for training.

In [None]:
#Uncomment below lines, if Data augmentation during training is required.
def create_segmentation_generator_train(img_path, msk_path, BATCH_SIZE):
    data_gen_args = dict(rescale=1./255
#                      featurewise_center=True,
#                      featurewise_std_normalization=True,
#                      rotation_range=90
#                      width_shift_range=0.2,
#                      height_shift_range=0.2,
#                      zoom_range=0.3
                        )
    datagen = ImageDataGenerator(**data_gen_args)
    
    img_generator = datagen.flow_from_directory(img_path, target_size=IMG_SIZE, class_mode=None, color_mode='grayscale', batch_size=BATCH_SIZE, seed=SEED)
    msk_generator = datagen.flow_from_directory(msk_path, target_size=IMG_SIZE, class_mode=None, color_mode='grayscale', batch_size=BATCH_SIZE, seed=SEED)
    return zip(img_generator, msk_generator)

# Remember not to perform any image augmentation in the test generator!
def create_segmentation_generator_test(img_path, msk_path, BATCH_SIZE):
    data_gen_args = dict(rescale=1./255)
    datagen = ImageDataGenerator(**data_gen_args)
    
    img_generator = datagen.flow_from_directory(img_path, target_size=IMG_SIZE, class_mode=None, color_mode='grayscale', batch_size=BATCH_SIZE, seed=SEED)
    msk_generator = datagen.flow_from_directory(msk_path, target_size=IMG_SIZE, class_mode=None, color_mode='grayscale', batch_size=BATCH_SIZE, seed=SEED)
    return zip(img_generator, msk_generator)

In [None]:
train_generator = create_segmentation_generator_train(data_dir_train_image, data_dir_train_mask, BATCH_SIZE_TRAIN)
test_generator = create_segmentation_generator_test(data_dir_test_image, data_dir_test_mask, BATCH_SIZE_TEST)

In [None]:
#Function to compare between input, mask and prediction
def display(display_list):
    plt.figure(figsize=(15,15))
    
    title = ['Input Image', 'True Mask', 'Predicted Mask']
    
    for i in range(len(display_list)):
        plt.subplot(1, len(display_list), i+1)
        plt.title(title[i])
        plt.imshow(tf.keras.preprocessing.image.array_to_img(display_list[i]), cmap='gray')
    plt.show()

In [None]:
#Function to display top n images in datagen
def show_dataset(datagen, num=1):
    for i in range(0,num):
        image,mask = next(datagen)
        display([image[0], mask[0]])

In [None]:
#Visualize top 3 image comparisons in datagen
show_dataset(train_generator, 3)

U-net model development

In [None]:
def unet(n_levels, initial_features=32, n_blocks=2, kernel_size=3, pooling_size=2, in_channels=1, out_channels=1):
    inputs = keras.layers.Input(shape=(IMAGE_HEIGHT, IMAGE_WIDTH, in_channels))
    x = inputs
    
    convpars = dict(kernel_size=kernel_size, activation='relu', padding='same')
    
    #downstream
    skips = {}
    for level in range(n_levels):
        for _ in range(n_blocks):
            x = keras.layers.Conv2D(initial_features * 2 ** level, **convpars)(x)
        if level < n_levels - 1:
            skips[level] = x
            x = keras.layers.MaxPool2D(pooling_size)(x)
            
    # upstream
    for level in reversed(range(n_levels-1)):
        x = keras.layers.Conv2DTranspose(initial_features * 2 ** level, strides=pooling_size, **convpars)(x)
        x = keras.layers.Concatenate()([x, skips[level]])
        for _ in range(n_blocks):
            x = keras.layers.Conv2D(initial_features * 2 ** level, **convpars)(x)
            
    # output
    activation = 'sigmoid' if out_channels == 1 else 'softmax'
    x = keras.layers.Conv2D(out_channels, kernel_size=1, activation=activation, padding='same')(x)
    
    return keras.Model(inputs=[inputs], outputs=[x], name=f'UNET-L{n_levels}-F{initial_features}')
        

In [None]:
EPOCH_STEP_TRAIN = NUM_TRAIN // BATCH_SIZE_TRAIN
EPOCH_STEP_TEST = NUM_TEST // BATCH_SIZE_TEST
#Instantiate U-net with 4 levels
model = unet(4)
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
model.summary()

In [None]:
model.fit_generator(generator=train_generator, 
                    steps_per_epoch=EPOCH_STEP_TRAIN, 
                    validation_data=test_generator, 
                    validation_steps=EPOCH_STEP_TEST,
                   epochs=NUM_OF_EPOCHS)

In [None]:
model.save(f'UNET-LungSegmentation_{IMAGE_HEIGHT}_{IMAGE_WIDTH}.h5')

In [None]:
test_generator = create_segmentation_generator_test(data_dir_test_image, data_dir_test_mask, 1)

In [None]:
def show_prediction(datagen, num=1):
    for i in range(0,num):
        image,mask = next(datagen)
        pred_mask = model.predict(image)[0] > 0.5
        display([image[0], mask[0], pred_mask])

In [None]:
show_prediction(test_generator, 3)

# **3D mesh generation**

In [None]:
#normalization. All constants have been defined above
def normalizeImageIntensityRange(img):
    img[img < HOUNSFIELD_MIN] = HOUNSFIELD_MIN
    img[img > HOUNSFIELD_MAX] = HOUNSFIELD_MAX
    return (img - HOUNSFIELD_MIN)/HOUNSFIELD_RANGE

In [None]:
#Linear interpolation to target width & height
def scaleImg(img, height, width):
    return cv2.resize(img, dsize=(width, height), interpolation=cv2.INTER_LINEAR)

In [None]:
sliceIndex = 100

Load a sample test case, inference it and visualize

In [None]:
targetImagePath = f'../input/covid19-ct-scans/ct_scans/coronacases_org_010.nii'
targetMaskPath  = f'../input/covid19-ct-scans/infection_mask/coronacases_010.nii'

imgTargetNii = nib.load(targetImagePath)
imgMaskNii = nib.load(targetMaskPath)

imgTarget = normalizeImageIntensityRange(imgTargetNii.get_fdata())
imgMask = imgMaskNii.get_fdata()

In [None]:
# show input image slice
plt.figure(figsize=(15,15))
imgSlice = imgTarget[sliceIndex,:,:]
imgDimX, imgDimY = imgSlice.shape
imgSliceScaled = scaleImg(imgSlice, IMAGE_HEIGHT, IMAGE_WIDTH)
plt.subplot(1,2,1)
plt.imshow(imgSlice, cmap='gray')
plt.subplot(1,2,2)
plt.imshow(imgSliceScaled, cmap='gray')
plt.show()
imgSlice.shape, imgSliceScaled.shape

In [None]:
# show input mask slice
plt.figure(figsize=(15,15))
maskSlice = imgMask[sliceIndex,:,:]
maskSliceScaled = scaleImg(maskSlice, IMAGE_HEIGHT, IMAGE_WIDTH)
plt.subplot(1,2,1)
plt.imshow(maskSlice, cmap='gray')
plt.subplot(1,2,2)
plt.imshow(maskSliceScaled, cmap='gray')
plt.show()
maskSlice.shape, maskSliceScaled.shape

In [None]:
# Predict with UNET model
plt.figure(figsize=(15,15))
imageInput = imgSliceScaled[np.newaxis,:,:,np.newaxis]
maskPredict = model.predict(imageInput)[0,:,:,0]
maskPredictScaled = scaleImg(maskPredict, imgDimX, imgDimY)
plt.subplot(1,2,2)
plt.imshow(maskPredict, cmap='gray')
plt.subplot(1,2,1)
plt.imshow(maskPredictScaled, cmap='gray')
plt.show()
maskPredictScaled.shape, maskPredict.shape

In [None]:
def predictVolume(inImg, toBin=True):
    (xMax, yMax, zMax) = inImg.shape
    
    outImgX = np.zeros((xMax, yMax, zMax))
    outImgY = np.zeros((xMax, yMax, zMax))
    outImgZ = np.zeros((xMax, yMax, zMax))
    
    cnt = 0.0
    if SLICE_X:
        cnt += 1.0
        for i in range(xMax):
            img = scaleImg(inImg[i,:,:], IMAGE_HEIGHT, IMAGE_WIDTH)[np.newaxis,:,:,np.newaxis]
            tmp = model.predict(img)[0,:,:,0]
            outImgX[i,:,:] = scaleImg(tmp, yMax, zMax)
    if SLICE_Y:
        cnt += 1.0
        for i in range(yMax):
            img = scaleImg(inImg[:,i,:], IMAGE_HEIGHT, IMAGE_WIDTH)[np.newaxis,:,:,np.newaxis]
            tmp = model.predict(img)[0,:,:,0]
            outImgY[:,i,:] = scaleImg(tmp, xMax, zMax)
    if SLICE_Z:
        cnt += 1.0
        for i in range(zMax):
            img = scaleImg(inImg[:,:,i], IMAGE_HEIGHT, IMAGE_WIDTH)[np.newaxis,:,:,np.newaxis]
            tmp = model.predict(img)[0,:,:,0]
            outImgZ[:,:,i] = scaleImg(tmp, xMax, yMax)
            
    outImg = (outImgX + outImgY + outImgZ)/cnt
    if(toBin):
        outImg[outImg>0.5] = 1.0
        outImg[outImg<=0.5] = 0.0
    return outImg

In [None]:
predImg = predictVolume(imgTarget)

Visualize Input image

In [None]:
my_widget = NiftiWidget(imgTargetNii)
my_widget.nifti_plotter(colormap='gray')

Visualize Predicted/inferenced mask

In [None]:
my_widget = NiftiWidget(nib.dataobj_images.DataobjImage(predImg))
my_widget.nifti_plotter(colormap='gray')

Visualize annotated mask

In [None]:
my_widget = NiftiWidget(imgMaskNii)
my_widget.nifti_plotter(colormap='gray')

In [None]:
vertices,faces,_,_ = measure.marching_cubes(predImg) #inference mask
ctvertices,ctfaces,ct,ct=measure.marching_cubes(imgTarget) #input scan 
maskvertices,maskfaces,mk,mk=measure.marching_cubes(imgMask) #annotated mask

Convert vertices and faces to mesh 

In [None]:
def dataToMesh(vert, faces):
    mm = mesh.Mesh(np.zeros(faces.shape[0], dtype=mesh.Mesh.dtype))
    for i, f in enumerate(faces):
        for j in range(3):
            mm.vectors[i][j] = vert[f[j],:]
    return mm

Save all .stl mesh files

In [None]:
mm = dataToMesh(vertices, faces)
mm.save('./Inferenced_lung_010.stl')
mm = dataToMesh(ctvertices, ctfaces)
mm.save('./Input_lung_010.stl')
mm = dataToMesh(maskvertices, maskfaces)
mm.save('./Mask_lung_010.stl')

# **Metrics**

In [None]:
def confusion_matrix_gen(y_true,y_pred):
    y_true=y_true.flatten()
    y_pred=y_pred.flatten()
    y_true[y_true>0.5]=1.0
    y_pred[y_pred>0.5]=1.0
    tn, fp, fn, tp=confusion_matrix(y_true, y_pred).ravel()
    return (tn,fp,fn,tp)

def accuracy(TN, FP, FN, TP):
    return (TP+TN)/(TN+FP+FN+TP)

def precision(TN, FP, FN, TP):
    return TP/(TP+FP)

def recall(TN, FP, FN, TP):
    return TP/(TP+FN)

def dsc(TN, FP, FN, TP):
    return (2*TP)/((2*TP)+FP+FN)

def jsi(TN, FP, FN, TP):
    return TP/(TP+FP+FN)

def f1_score(TN, FP, FN, TP):
    precision=TP/(TP+FP)
    recall=TP/(TP+FN)
    return 2*(precision*recall)/(precision+recall)

In [None]:
testset=["009","010"]
TN, FP, FN, TP = 0, 0, 0, 0
for i in testset:
    targetImagePath = f'../input/covid19-ct-scans/ct_scans/coronacases_org_{}.nii'.format(i)
    targetMaskPath  = f'../input/covid19-ct-scans/infection_mask/coronacases_{}.nii'.format(i)

    imgTargetNii = nib.load(targetImagePath)
    imgMaskNii = nib.load(targetMaskPath)

    imgTarget = normalizeImageIntensityRange(imgTargetNii.get_fdata())
    imgMask = imgMaskNii.get_fdata()
    
    predImg = predictVolume(imgTarget)
    tn, fp, fn, tp = confusion_matrix_gen(imgMask,predImg)
    TN+=tn
    FP+=fp
    FN+=fn
    TP+=tp
    
print("acuracy:",accuracy(TN, FP, FN, TP))
print("precision:",precision(TN, FP, FN, TP))
print("recall:",recall(TN, FP, FN, TP))
print("dsc:",dsc(TN, FP, FN, TP))
print("jsi:",jsi(TN, FP, FN, TP))
print("f1_score:",f1_score(TN, FP, FN, TP))

# **Point cloud generation**

In [None]:
mesh = o3d.io.read_triangle_mesh("./Inferenced_lung_010.stl")
print("_")
pointcloud = mesh.sample_points_poisson_disk(100000)
xyz_load = np.asarray(pointcloud.points,dtype=np.float32)
print('xyz_load shape', xyz_load.shape)

In [None]:
%matplotlib widget

fig = plt.figure()
ax = Axes3D(fig)
  
# creating the plot
plot_geeks = ax.scatter(xyz_load[:10000,0], xyz_load[:10000,1], xyz_load[:10000,2], color='green')
  
# setting title and labels
ax.set_title("3D plot")
ax.set_xlabel('x-axis')
ax.set_ylabel('y-axis')
ax.set_zlabel('z-axis')
  
# displaying the plot
plt.show()

In [None]:
fig = plt.figure()
ax = plt.axes(projection ="3d")
ax.scatter3D(xyz_load[:,0], xyz_load[:,1], xyz_load[:,2], color = "green")
plt.title("simple 3D scatter plot")
plt.show()

In [None]:
with open('Inferenced_lung_010_pcd.xyz', 'w') as f:
    for line in xyz_load:
        f.write("{}\t {}\t {}\t".format(str(line[0]),str(line[1]),str(line[2])))
        f.write('\n')

One-time pcd loop generation

In [None]:
for filename in sorted(glob.iglob("./" + '*.stl')):
    mesh = o3d.io.read_triangle_mesh(filename)
    print("Mesh read complete")
    pointcloud = mesh.sample_points_poisson_disk(100000)
    print("PCD generation complete")
    xyz_load = np.asarray(pointcloud.points,dtype=np.float32)
    print('xyz_load shape', xyz_load.shape)
    with open('{}_pcd.xyz'.format(filename[:-4]), 'w') as f:
        for line in xyz_load:
            f.write("{}\t {}\t {}\t".format(str(line[0]),str(line[1]),str(line[2])))
            f.write('\n')