In [1]:
import os
import cv2
import random
import numpy as np

from matplotlib import pyplot as plt
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import *
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from sklearn.model_selection import train_test_split
from keras_unet.models import custom_unet
from skimage.io import imread

import warnings
warnings.filterwarnings('ignore')

import tensorflow as tf
physical_devices = tf.config.list_physical_devices('GPU') 
tf.config.experimental.set_memory_growth(physical_devices[0], True)

-----------------------------------------
keras-unet init: TF version is >= 2.0.0 - using `tf.keras` instead of `Keras`
-----------------------------------------


In [2]:
def showRandomPredict(images, masks, predicts, sample_count, if_predicted= False):
    sample_count = sample_count * 4
    plt.figure(figsize=(15, 30))
    for i in range(0, sample_count, 4):
        img_index = random.choice(range(len(images)))
        plt.subplot(5,4,i + 1)
        random_img = images[img_index,:,:]
        random_img = cv2.resize(random_img, (IMG_HEIGHT, IMG_WIDTH))
        plt.imshow(random_img, cmap=plt.cm.bone)
        plt.axis('off')
        plt.title('Lung X-Ray')

        plt.subplot(5,4,i + 2)
        random_mask = masks[img_index,:,:]
        random_mask = cv2.resize(random_mask, (IMG_HEIGHT, IMG_WIDTH))
        plt.imshow(random_mask, cmap='gray')
        plt.axis('off')
        plt.title('Mask Ground Truth')
        
        if not if_predicted:
            continue
        random_pred = predicts[img_index,:,:,0]
        plt.subplot(5,4,i + 3)
        plt.imshow(random_pred, cmap='gray')
        plt.axis('off')
        plt.title('Predicted Mask')

        plt.subplot(5,4,i + 4)
        plt.imshow(cv2.bitwise_and(images[img_index,:,:], images[img_index,:,:], mask=random_pred.astype(np.uint8)), cmap=plt.cm.bone)
        plt.axis('off')
        plt.title('Predicted Lung Segmentation')
    plt.show()

def SaveGeneratedMask(Image_collection_path, save_path, modelName):
    
    test_images = os.listdir(Image_collection_path)
    x = np.zeros((len(test_images), IMG_HEIGHT, IMG_WIDTH))
    
    for i, img_id in enumerate(test_images):
        img_collection = Image_collection_path  + img_id

        cv2_image = cv2.imread(img_collection,0)
        cv2_image = cv2.resize(cv2_image, (IMG_HEIGHT, IMG_WIDTH))
        cv2.imwrite('img.jpg', cv2_image)
        img = imread('img.jpg')
        x[i] = img
        os.remove('img.jpg')
    
    model2 = load_model(modelName)
    pred_img = model2.predict(x/255.)*255.0
    
    for j, img_id in enumerate(test_images):   
        new_img_path =save_path + 'mask_' + img_id
        cv2.imwrite(new_img_path, pred_img[j])




In [4]:
IMG_HEIGHT = 128
IMG_WIDTH = 128

image_path = "C:\\Users\\ugur_\\Python Projects\\DATA\\Lung Segmentation\\CXR_png\\"
mask_path = "C:\\Users\\ugur_\\Python Projects\\DATA\Lung Segmentation\\masks\\"

images = os.listdir(image_path)
mask = os.listdir(mask_path)

print(len(images), len(mask))

800 704


In [None]:
mask2 = [fName.split(".png")[0] for fName in mask]
image_file_name = [fName.split("_mask")[0] for fName in mask2]
print(len(image_file_name), len(mask))

In [None]:
check = [i for i in mask if "mask" in i]
print(len(check))

In [None]:
x = np.zeros((len(mask), IMG_HEIGHT, IMG_WIDTH))
y = np.zeros((len(mask), IMG_HEIGHT, IMG_WIDTH))

for i, img_id in enumerate(image_file_name):
    
    file_path = image_path  + img_id + ".png"
    
    # read the file as an array
    cv2_image = cv2.imread(file_path, 0)
    # resize the image
    cv2_image = cv2.resize(cv2_image, (IMG_HEIGHT, IMG_WIDTH))
    # save the image at the destination as a jpg file
    cv2.imwrite('img.jpg', cv2_image)
    
    # read the image using skimage
    img = imread('img.jpg')
    x[i] = img
    
    os.remove('img.jpg')

    
for i, img_id in enumerate(mask):
    
    file_path = mask_path  + img_id 
    
    # read the file as an array
    cv2_image = cv2.imread(file_path, 0)
    # resize the image
    cv2_image = cv2.resize(cv2_image, (IMG_HEIGHT, IMG_WIDTH))
    # save the image at the destination as a jpg file
    cv2.imwrite('img.jpg', cv2_image)
    
    # read the image using skimage
    img = imread('img.jpg')
    y[i] = img
    
    os.remove('img.jpg')

In [None]:
X_train, X_test, y_train_m, y_test_m = train_test_split(x, y, test_size=0.2, random_state=123)

x2 = np.flip(X_train , axis = 2)
y2 = np.flip(y_train_m , axis = 2)

X_train= np.append(X_train, x2,axis=0)
y_train_m = np.append(y_train_m, y2,axis=0)

In [None]:
print(X_train.shape,y_train_m.shape)
showRandomPredict(X_train, y_train_m, y_train_m, 2)

In [None]:
model = custom_unet(
    (128, 128, 1),
    use_batch_norm=True,
    num_classes=1,
    filters=64,
    dropout=0.2,
    output_activation='sigmoid',)

callbacks = [
    EarlyStopping(monitor='val_accuracy', patience=4, verbose=1, min_delta=1e-4),
    ModelCheckpoint('model.h5', monitor='val_accuracy', verbose=1, save_best_only=True, mode='max'),]

model.compile(
    optimizer=Adam(),
    loss='binary_crossentropy',
    metrics=['accuracy'],)

In [None]:
model.summary()

In [None]:
y_test_m2 = (y_test_m/255.> .5).astype(int)
y_train_m2 = (y_train_m/255.> .5).astype(int)

In [None]:
h = model.fit(
    X_train/255., y_train_m2,   
    callbacks=callbacks,
    validation_data=(X_test/255., y_test_m2),
    epochs=15, batch_size=20,
)

In [None]:
model = load_model("model.h5")

plt.plot(h.history['accuracy'])
plt.plot(h.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()
# summarize history for loss
plt.plot(h.history['loss'])
plt.plot(h.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()


losses = h.history['loss']
accs = h.history['accuracy']
val_losses = h.history['val_loss']
val_accs = h.history['val_accuracy']
epochs = len(losses)

preds = model.predict(X_test/255.)*255
showRandomPredict(X_test, y_test_m, preds, 2, True)

In [None]:
callbacks = [
    ReduceLROnPlateau(monitor='loss', patience=3, verbose=1, factor=0.5, min_lr=0.00001),
    ModelCheckpoint('model.h5', monitor='val_accuracy', verbose=1, save_best_only=True, mode='max'),
]

In [None]:
h2 = model.fit(
    X_train/255., y_train_m2,
    validation_data=(X_test/255., y_test_m2),
    epochs=50, batch_size=16,
    shuffle=True,
    verbose=2,
    callbacks=callbacks
)

In [None]:
model = load_model('model.h5')

plt.plot(h2.history['accuracy'])
plt.plot(h2.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()
# summarize history for loss
plt.plot(h2.history['loss'])
plt.plot(h2.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()


losses = h2.history['loss']
accs = h2.history['accuracy']
val_losses = h2.history['val_loss']
val_accs = h2.history['val_accuracy']
epochs = len(losses)
showRandomPredict(X_test, y_test_m, preds, 3, True)

In [5]:
Image_collection_path = 'C:\\Users\\ugur_\\Python Projects\\LungClassifier\\images\\images_001\\images\\'
save_path ='C:\\Users\\ugur_\\Python Projects\\LungClassifier\\Predicted Mask\\'
SaveGeneratedMask(Image_collection_path=Image_collection_path, save_path= save_path, modelName='model.h5')

