# Import Libraries

In [1]:
import os
import cv2 
import pandas as pd
import numpy as np
import joblib
from keras.models import load_model
import skimage
from skimage import measure
from collections import Counter

# Mount Google Drive and Set Dataset Paths

In [2]:
# Colab-specific step: mount Google Drive at /content/gdrive so that the
# dataset and model paths used in the following cells are readable.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [3]:
# Folder holding the raw CT images; its entries are fed to split_target_dir().
target_dir='/content/gdrive/Shareddrives/Soedirman-Machine-Learning/CT SCAN COVID-19/Demo Patient'
# Folder that receives the processed images; the prediction step lists its files from here.
output_dir='/content/gdrive/Shareddrives/Soedirman-Machine-Learning/CT SCAN COVID-19/Demo Patient Lung'



# Load Model

In [4]:
# Load the saved Keras model (HDF5 file) used to score each image. Its three
# output columns are later stored as NiCT, pCT and nCT in the score table.
model_CT=load_model("/content/gdrive/Shareddrives/Soedirman-Machine-Learning/CT SCAN COVID-19/Model/model_CT.h5")

# Pre-processing Image

In [5]:
def split_target_dir(target_dir, output_dir, size=10999, thr=-96):
    """Run ``split_lung_parenchyma`` on every entry of ``target_dir``.

    Each result is JPEG-encoded and written to ``output_dir`` under the same
    file name as its source entry.

    Args:
        target_dir: Folder whose entries are processed (not recursive).
        output_dir: Folder that receives the results; created if missing.
        size: Block size forwarded to ``split_lung_parenchyma``. The default
            is the value that used to be hard-coded here.
        thr: Threshold constant forwarded to ``split_lung_parenchyma``. The
            default is the value that used to be hard-coded here.
    """
    file_names = os.listdir(target_dir)
    # Create the destination once; exist_ok avoids the check-then-create race.
    os.makedirs(output_dir, exist_ok=True)
    for file_name in file_names:
        target = os.path.join(target_dir, file_name)
        img_split = split_lung_parenchyma(target, size, thr)
        # Build the destination from the file name itself. The former
        # target.replace(target_dir, output_dir) rewrote *every* occurrence
        # of the folder string, corrupting paths in which it appears twice.
        dst = os.path.join(output_dir, file_name)
        # imencode(...)[1] is the encoded byte buffer; tofile() writes it out.
        cv2.imencode('.jpg', img_split)[1].tofile(dst)
    print(f'Target list done with {len(file_names)} items')

In [6]:
def split_lung_parenchyma(target,size,thr):
    """Return the grayscale image at ``target`` with everything outside the
    segmented region set to zero.

    Processing steps:
      1. Decode the file as a grayscale image.
      2. Apply an inverted adaptive mean threshold, then invert the result.
      3. Keep only the largest connected region of that binary image.
      4. Flood-fill from the image corners (``fill_water``) and invert, which
         leaves as foreground the holes enclosed by the largest region.
      5. Label the remaining foreground and keep at most two components,
         picked from the three most frequent labels.
      6. Smooth the mask (dilate, erode, close, median blur) and multiply it
         into the original image.

    Args:
        target: Path of the image file to read.
        size: ``blockSize`` passed to ``cv2.adaptiveThreshold``. If that call
            fails, it is retried once with a block size of 999.
        thr: Constant ``C`` passed to ``cv2.adaptiveThreshold``.

    Returns:
        Array with the shape of the decoded image; pixels outside the final
        mask are zero.

    Raises:
        ValueError: If the file cannot be decoded as an image.
    """
    img = cv2.imdecode(np.fromfile(target, dtype=np.uint8), cv2.IMREAD_GRAYSCALE)
    if img is None:
        # Fail with a readable message instead of an opaque OpenCV error later.
        raise ValueError(f'Could not decode image file: {target}')
    try:
        img_thr = cv2.adaptiveThreshold(img, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY_INV, size, thr).astype(np.uint8)
    except Exception:
        # Same fallback as before, but KeyboardInterrupt/SystemExit are no
        # longer swallowed the way a bare ``except:`` swallowed them.
        img_thr = cv2.adaptiveThreshold(img, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY_INV, 999, thr).astype(np.uint8)
    img_thr = 255 - img_thr

    # Keep only the largest 4-connected region of the thresholded image.
    img_test = measure.label(img_thr, connectivity=1)
    props = measure.regionprops(img_test)
    areas = [prop.area for prop in props]
    ind_max_area = np.argmax(areas) + 1  # region labels start at 1
    del_array = np.zeros(img_test.max() + 1)
    del_array[ind_max_area] = 1
    del_mask = del_array[img_test]  # lookup table: 1 on the largest region, else 0
    img_new = img_thr * del_mask

    # Mark everything reachable from the corners, then invert so that only
    # the enclosed holes remain as foreground.
    mask_fill = fill_water(img_new)
    img_new[mask_fill == 1] = 255
    img_new = 255 - img_new

    labels = cv2.connectedComponentsWithStats(img_new.astype(np.uint8))[1]
    # The ``np.float`` alias was removed in NumPy 1.24; float64 is the same dtype.
    labels = np.array(labels, dtype=np.float64)
    maxnum = Counter(labels.flatten()).most_common(3)
    maxnum = sorted([x[0] for x in maxnum])
    background = np.zeros_like(labels)
    # Skip the smallest of the most frequent labels and keep the rest
    # (presumably label 0 is the background and the others the two lungs --
    # TODO confirm on real data).
    for kept_label in maxnum[1:]:
        background[labels == kept_label] = 1
    img_new[background == 0] = 0

    # Morphological clean-up of the mask.
    img_new = cv2.dilate(img_new, np.ones((5, 5), np.uint8), iterations=3)
    img_new = cv2.erode(img_new, np.ones((5, 5), np.uint8), iterations=2)
    img_new = cv2.morphologyEx(img_new, cv2.MORPH_CLOSE, cv2.getStructuringElement(cv2.MORPH_RECT, (10, 10)), iterations=2)
    img_new = cv2.medianBlur(img_new.astype(np.uint8), 21)
    img_out = img * img_new.astype(bool)
    return img_out

In [7]:
def fill_water(img):
    """Return a mask of the pixels reachable by flood fill from the corners.

    The image is padded with a 10-pixel border of zeros, flood-filled from
    each of the four corners of the padded canvas, and the union of the
    resulting flood-fill masks is cropped back to the shape of ``img``.

    Args:
        img: 2-D array.

    Returns:
        ``uint8`` array with the same shape as ``img``; non-zero where a
        flood fill reached the pixel.
    """
    height, width = img.shape
    # The original called ``copyimg.astype(np.float32)`` and discarded the
    # result; the padded canvas is now created as float32 directly.
    img_exp = np.zeros((height + 20, width + 20), np.float32)
    img_exp[10:-10, 10:-10] = img
    height_exp, width_exp = img_exp.shape

    # OpenCV seed points are (x, y) = (column, row). The previous code passed
    # (row, column), which puts the seed outside the canvas whenever the
    # image is not square.
    corner_seeds = (
        (0, 0),
        (width_exp - 1, height_exp - 1),
        (width_exp - 1, 0),
        (0, height_exp - 1),
    )

    # floodFill requires a mask 2 pixels larger than the image in each dimension.
    mask = np.zeros((height_exp + 2, width_exp + 2), np.uint8)
    for seed in corner_seeds:
        corner_mask = np.zeros_like(mask)
        # floodFill modifies the image it is given, so fill a fresh copy each time.
        cv2.floodFill(img_exp.copy(), corner_mask, seed, 1)
        mask |= corner_mask

    # Strip the 1-pixel floodFill frame, then the 10-pixel padding.
    output = mask[1:-1, 1:-1][10:-10, 10:-10]
    return output

In [21]:
def read_ct_img_bydir(target_dir):
    """Read one image file as grayscale and resize it to 224 x 224.

    Args:
        target_dir: Path of the image file (despite the name, a file path is
            what gets passed to ``np.fromfile``).

    Returns:
        The decoded grayscale image resized to (224, 224).
    """
    raw_bytes = np.fromfile(target_dir, dtype=np.uint8)
    decoded = cv2.imdecode(raw_bytes, cv2.IMREAD_GRAYSCALE)
    return cv2.resize(decoded, (224, 224))

In [9]:
# Full paths (folder + separator + entry name) of everything in the raw input folder.
img_list = [os.sep.join((target_dir, entry)) for entry in os.listdir(target_dir)]

In [10]:
def top_time_series(df,top=10,time_seq=True,del_deficiency=True,invalid_cutoff=0.5):
    """Select the ``top`` rows with the highest ``pCT`` among the valid rows.

    Rows whose ``NiCT`` value exceeds ``invalid_cutoff`` are discarded first.
    The remaining rows are ranked by ``pCT`` in descending order and the
    first ``top`` of them are kept.

    Args:
        df: DataFrame with at least the columns ``'NiCT'`` and ``'pCT'``.
        top: Maximum number of rows to return.
        time_seq: If True, the selected rows are re-ordered by index
            (presumably the acquisition order -- confirm with the caller);
            otherwise they stay in descending ``pCT`` order.
        del_deficiency: Unused; kept so existing callers keep working.
        invalid_cutoff: Largest ``NiCT`` value a row may have to be kept.

    Returns:
        A new DataFrame with at most ``top`` rows. The input is not modified.
        A message is printed when fewer than ``top`` rows are available.
    """
    valid = df[df['NiCT'] <= invalid_cutoff]
    # Sorting out-of-place avoids the SettingWithCopyWarning that the old
    # ``inplace=True`` sort raised on the filtered slice.
    ranked = valid.sort_values('pCT', ascending=False)
    selected = ranked.head(top)
    if time_seq:
        selected = selected.sort_index()
    if len(selected) < top:
        print('Patient with not enough CTs')
    return selected

In [11]:
def is_number(s):
    """Return True if ``s`` can be interpreted as a number.

    Two checks are tried in order:
      1. ``float(s)`` succeeds (this also accepts strings such as ``'nan'``
         and ``'inf'``, and real numeric objects).
      2. ``s`` is a single Unicode character with a numeric value according
         to ``unicodedata.numeric``.

    Values of an unsupported type (for example ``None`` or a list) return
    False instead of raising ``TypeError``.

    Args:
        s: Value to test, normally a string.

    Returns:
        bool: True if either check succeeds, False otherwise.
    """
    import unicodedata

    try:
        float(s)
        return True
    except (TypeError, ValueError):
        # TypeError: float() was given an unsupported type such as None.
        # Previously this escaped, although the second check already
        # treated TypeError as "not a number".
        pass
    try:
        unicodedata.numeric(s)
        return True
    except (TypeError, ValueError):
        pass
    return False

In [12]:
def X_fromdf(df_top):
    """Load the images listed in ``df_top['File']`` into one array.

    The images are stacked along a leading axis, that axis is then moved to
    the last position, and a leading axis of length 1 is added, giving an
    array of shape (1, H, W, N) for N images of size H x W.

    Args:
        df_top: DataFrame with a ``'File'`` column of image paths.

    Returns:
        Array of shape (1, H, W, N).
    """
    slices = [read_ct_img_bydir(path) for path in df_top['File'].tolist()]
    stacked = np.array(slices)                           # (N, H, W)
    with_channel = np.expand_dims(stacked, axis=3)       # (N, H, W, 1)
    images_last = with_channel.transpose(3, 1, 2, 0)     # (1, H, W, N)
    batched = images_last[np.newaxis, :, :, :]           # (1, 1, H, W, N)
    # Concatenating along the first axis removes the extra leading axis again.
    return np.concatenate(batched)

# Segment the Lungs and Save the Images to the Output Directory

In [13]:
# Segment every image in target_dir and write the results to output_dir.
split_target_dir(target_dir,output_dir)
# os.listdir() returns entries in arbitrary order; sort the names so that the
# row order of the predictions and of the saved score table is reproducible.
img_list=[output_dir+os.sep+file for file in sorted(os.listdir(output_dir))]

Target list done with 28 items


# Use the Model to Predict

In [22]:
# Load every segmented image as a 224 x 224 grayscale array -> (N, 224, 224).
X_CT_Valid=np.array([read_ct_img_bydir(file) for file in img_list])
# Add a channel axis and repeat the single gray channel three times -> (N, 224, 224, 3).
X_CT_Valid = np.repeat(np.expand_dims(X_CT_Valid, axis=3), 3, axis=3)
# `predict_proba` was removed from Keras Sequential models in TensorFlow 2.6;
# `predict` returns the same per-class scores and exists on every Keras model.
y_CT_Valid=model_CT.predict(X_CT_Valid)



# Save Predicted Scores to a Tab-Separated Text File

In [24]:
# One row per image: its path plus the three model output columns.
df = pd.DataFrame(
    {
        'File': img_list,
        'NiCT': y_CT_Valid[:, 0],
        'pCT': y_CT_Valid[:, 1],
        'nCT': y_CT_Valid[:, 2],
    }
)
# Write the table tab-separated and without the index column.
df.to_csv('Demo_img_score.txt', index=None, sep='\t')