In [None]:
import logging
import os

import cv2
import jinja2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report

from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
import keras.backend as K
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, Conv2DTranspose, BatchNormalization, Dropout, Lambda
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Activation, MaxPool2D, Concatenate

In [None]:
def get_f1(y_true, y_pred): #taken from old keras source code
    """F1 score of the tensors passed in, built from Keras backend ops.

    Values are clipped to [0, 1] and rounded before counting, so a score
    above 0.5 counts as a positive. K.epsilon() is added to every denominator
    so that an input without positives yields 0 instead of a division by zero.

    Args:
        y_true: ground-truth labels.
        y_pred: predicted scores.

    Returns:
        Scalar 2 * precision * recall / (precision + recall).
    """
    eps = K.epsilon()

    def _count_ones(values):
        # Clip to [0, 1], round to {0, 1}, then count the ones.
        return K.sum(K.round(K.clip(values, 0, 1)))

    hits = _count_ones(y_true * y_pred)      # true positives
    actual = _count_ones(y_true)             # all real positives
    flagged = _count_ones(y_pred)            # all predicted positives

    precision = hits / (flagged + eps)
    recall = hits / (actual + eps)
    return 2*(precision*recall)/(precision+recall+eps)

In [None]:
def double_conv_block(x, n_filters):
   """Run x through two consecutive 3x3 convolutions.

   Both layers use n_filters filters, "same" padding, ReLU activation and
   he_normal kernel initialisation.

   Returns:
      The output tensor of the second convolution.
   """
   for _ in range(2):
      x = keras.layers.Conv2D(n_filters, 3, padding = "same", activation = "relu", kernel_initializer = "he_normal")(x)
   return x

def downsample_block(x, n_filters):
   """One encoder stage: double convolution, then 2x max-pooling and dropout.

   Returns:
      (features, pooled): the convolution output before pooling (used later
      as the skip connection) and the pooled tensor after Dropout(0.3).
   """
   skip = double_conv_block(x, n_filters)
   pooled = keras.layers.MaxPool2D(2)(skip)
   return skip, keras.layers.Dropout(0.3)(pooled)

def upsample_block(x, conv_features, n_filters):
   """One decoder stage: upsample x, merge it with a skip tensor, convolve.

   Args:
      x: tensor coming from the previous (deeper) stage.
      conv_features: encoder tensor to concatenate with the upsampled x.
      n_filters: filter count for the transposed and the regular convolutions.

   Returns:
      The output tensor of the closing double convolution.
   """
   # 3x3 transposed convolution with stride 2 performs the upsampling.
   upsampled = keras.layers.Conv2DTranspose(n_filters, kernel_size=3, strides=2, padding="same")(x)
   merged = keras.layers.concatenate([upsampled, conv_features])
   merged = keras.layers.Dropout(0.3)(merged)
   return double_conv_block(merged, n_filters)

In [None]:
def build_unet_model(input_shape, optimizer=None):
   """Build and compile a U-Net-style binary image classifier.

   The network is a four-level encoder/decoder with skip connections
   (16-32-64-128 filters, 256 at the bottleneck). Unlike a segmentation
   U-Net, the decoder output is flattened and fed to a single sigmoid unit,
   so the model emits one probability per image.

   Args:
      input_shape: (height, width, channels) of the input images. Height and
         width should be divisible by 16: the four 2x poolings floor odd
         sizes, and the matching 2x upsamplings would then no longer fit the
         skip tensors they are concatenated with.
      optimizer: optional Keras optimizer (instance or name) to compile with.
         When omitted, a fresh optimizer is created from the configuration of
         the notebook-level `lr` optimizer.

   Returns:
      A compiled keras.Model named "U-Net" (binary cross-entropy loss,
      "accuracy" and get_f1 metrics).
   """
   inputs = keras.layers.Input(shape=input_shape)

   # encoder: contracting path - keep each stage's features for the skip connections
   f1, p1 = downsample_block(inputs, 16)
   f2, p2 = downsample_block(p1, 32)
   f3, p3 = downsample_block(p2, 64)
   f4, p4 = downsample_block(p3, 128)

   bottleneck = double_conv_block(p4, 256)

   # decoder: expanding path - each stage is merged with the encoder stage of equal depth
   u6 = upsample_block(bottleneck, f4, 128)
   u7 = upsample_block(u6, f3, 64)
   u8 = upsample_block(u7, f2, 32)
   u9 = upsample_block(u8, f1, 16)

   # classification head: one sigmoid probability per image
   f = keras.layers.Flatten()(u9)
   outputs = keras.layers.Dense(1, activation="sigmoid")(f)

   # unet model with Keras Functional API
   unet_model = keras.Model(inputs, outputs, name="U-Net")

   if optimizer is None:
      # Do not hand the shared `lr` object itself to compile(): an optimizer
      # instance that already trained one model carries that model's state, so
      # re-running this builder would couple the new model to the old one.
      # Clone it from its config instead (same class, same hyper-parameters).
      optimizer = type(lr).from_config(lr.get_config()) if hasattr(lr, "get_config") else lr

   unet_model.compile(optimizer=optimizer, loss="binary_crossentropy", metrics=["accuracy", get_f1])
   return unet_model



In [None]:
# Dataset roots handed to flow_from_directory.
data_dir_train = 'Dataset/train/'
data_dir_test = 'Dataset/test/'

# Images are resized to img_size x img_size and fed to the model with 3 channels.
img_size = 256
input_shape = (img_size, img_size, 3)
target_size =(img_size,img_size)
batch_size = 32
# NOTE(review): num_classes is not referenced by any visible cell.
num_classes = 1

# Run counter; only used to tag the output file names below.
test_num = 1

# NOTE(review): compile_optimizer and compile_loss are not referenced by any visible cell;
# build_unet_model hard-codes its loss and takes its optimizer settings from `lr`.
compile_optimizer = "adam"
compile_loss = "binary_crossentropy"
# Despite its name, `lr` is an Adam optimizer instance (learning rate 0.001), not a number.
# build_unet_model reads it as a global, so this cell must run before the model is built.
lr = keras.optimizers.Adam(learning_rate=0.001)

# Output files: prediction table and best-epoch model weights.
csv_path = f'Results/Dataset_U_net_{batch_size}_{img_size}_Test_{test_num}.csv'
save_model_path = f"Models/Dataset_U_net_{batch_size}_{img_size}_Test_{test_num}.h5"

In [None]:
# Generator for data_dir_train. Every flow created from it receives the random
# augmentations below, whichever subset it is asked for.
training_data = ImageDataGenerator(
    # scaling and hold-out share
    rescale=1./255,
    validation_split=0.2,
    # geometric augmentation
    rotation_range=40,
    width_shift_range=0.25,
    height_shift_range=0.25,
    shear_range=0.25,
    zoom_range=0.3,
    horizontal_flip=True,
    fill_mode='nearest',
    # photometric augmentation
    brightness_range=[0.6, 0.9],
)

# Generator for the test images: rescaling only, no augmentation.
test_data = ImageDataGenerator(rescale=1./255)

In [None]:
# Training batches: augmented images from the "training" share of data_dir_train.
train_ds = training_data.flow_from_directory(
    directory=data_dir_train,
    shuffle = True,
    seed = 42,
    color_mode="rgb",
    class_mode = 'binary',
    target_size=target_size,
    batch_size=batch_size,
    subset="training",
    )

# Validation images are only rescaled. Drawing them from `training_data` would apply
# the random rotations/shifts/zooms to them too, which makes val_accuracy (the metric
# that drives checkpointing and early stopping) noisy and not comparable across epochs.
# flow_from_directory takes the validation share from the file listing by fraction, so
# using the same validation_split here is expected to select the same files that
# `training_data` holds out -- keep the two values identical.
validation_data = ImageDataGenerator(
    rescale = 1./255,
    validation_split = 0.2,
    )

validation_ds = validation_data.flow_from_directory(
    directory=data_dir_train,
    shuffle = True,
    seed = 42,
    class_mode = 'binary',
    color_mode="rgb",
    target_size=target_size,
    batch_size=batch_size,
    subset="validation",
    )

# Test batches keep directory order (shuffle=False) so that predictions line up with
# test_ds.filenames. batch_size is passed explicitly to follow the notebook setting
# instead of the library default.
test_ds = test_data.flow_from_directory(
    directory=data_dir_test,
    target_size=target_size,
    color_mode="rgb",
    batch_size=batch_size,
    shuffle = False,
    )

In [None]:
# Build and compile the classifier for the configured input size.
model = build_unet_model(input_shape=input_shape)
# Uncomment to print the layer-by-layer summary:
#model.summary()

In [None]:
# Write the weights to save_model_path whenever val_accuracy improves (best epoch only).
checkpoint = ModelCheckpoint(
    filepath=save_model_path,
    monitor='val_accuracy',
    mode='auto',
    save_best_only=True,
    save_weights_only=True,
)

# Stop once val_accuracy has not improved for 5 consecutive epochs.
early = EarlyStopping(monitor='val_accuracy', patience=5, verbose=1)

history = model.fit(
    train_ds,
    validation_data=validation_ds,
    epochs=100,
    callbacks=[checkpoint, early],
)

In [None]:
# Restore the weights written by the checkpoint callback, then score them on the
# validation flow; val_data holds whatever model.evaluate returns for the compiled
# loss and metrics.
model.load_weights(filepath=save_model_path)
val_data = model.evaluate(x=validation_ds)

# Optional: plot the training curves.
#df = pd.DataFrame(history.history)
#df.plot(figsize=(10,8))

# Optional: log the validation result.
# NOTE(review): the label below says ResNet34 although this notebook trains the U-Net.
#log = f'Dataset_2_ResNet34_{batch_size}_{img_size}_Test_{test_num} : '+ str(val_data)
#logging.info(log)

In [None]:
# One sigmoid score per test image, in the order of test_ds.filenames
# (the test flow is created with shuffle=False).
pred = model.predict(test_ds, steps = len(test_ds), verbose=1)

# A NaN/inf score rounds to neither 0 nor 1. Fail here with a clear message instead of
# ending up with fewer labels than files.
if not np.isfinite(pred[:,0]).all():
    raise ValueError("model.predict returned non-finite scores; cannot assign class labels")

cl = np.round(pred)
# Rounded score 0 -> 'Cracked', 1 -> 'Uncracked'.
# NOTE(review): this assumes the training folders map to class indices in that order --
# confirm against train_ds.class_indices.
# Kept as a plain list because later cells call list methods on it.
classes_prediction = np.where(cl[:,0] == 1.0, 'Uncracked', 'Cracked').tolist()

filenames=test_ds.filenames
results=pd.DataFrame({"file":filenames,"prediction":pred[:,0], "class":classes_prediction})

In [None]:
# DataFrame.to_csv does not create missing parent folders, so make sure the target
# folder exists before writing (falls back to the current folder for a bare file name).
os.makedirs(os.path.dirname(csv_path) or ".", exist_ok=True)
results.to_csv(csv_path)
results.style

In [None]:
# Preview grid: the first test images with their predicted class as the title.
rez = pd.read_csv(csv_path)

n_rows, n_cols = 7, 8
# The CSV may hold fewer rows than the grid has cells.
n_shown = min(n_rows * n_cols, len(rez))

plt.figure(figsize=(40,28))
for i in range(n_shown):
    img_path = str(rez['file'][i])
    # 'file' holds paths relative to data_dir_test (sub-folder included), so joining
    # them works for any sub-folder name instead of a hard-coded 5-character prefix.
    image_path = os.path.join(data_dir_test, img_path)
    #print(image_path)
    class_pred = str(rez['class'][i])
    img = cv2.imread(image_path)
    if img is None:
        # cv2.imread returns None instead of raising when a file cannot be read.
        raise FileNotFoundError(f"Could not read image: {image_path}")
    plt.subplot(n_rows, n_cols, i+1)
    # OpenCV loads images as BGR; matplotlib expects RGB.
    plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    plt.title(class_pred)

In [None]:
# Number of test images that were labelled 'Cracked'.
cracked_total = classes_prediction.count("Cracked")
print(cracked_total)