In [None]:
import sys
import gc
import os, shutil
import tempfile
from os import listdir
import clr_callback

import numpy as np
import pandas as pd

import matplotlib
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from matplotlib.pyplot import imshow
import seaborn as sns
from PIL import Image

from keras import backend as K

import tensorflow as tf
from tensorflow.keras import models
from tensorflow.keras.preprocessing import image
from tensorflow.keras import mixed_precision, regularizers
from tensorflow.keras.metrics import top_k_categorical_accuracy
from tensorflow.keras.layers import Input, Add, Dropout, Dense, Activation, ZeroPadding2D, BatchNormalization, Flatten, Conv2D, AveragePooling2D, MaxPooling2D, GlobalMaxPooling2D
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.initializers import random_uniform, glorot_uniform, constant, identity, he_normal
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, LearningRateScheduler, CSVLogger
from tensorflow.keras.losses import CategoricalCrossentropy, BinaryCrossentropy
from tensorflow.keras.applications import InceptionV3, Xception, MobileNetV3Large,EfficientNetB0,EfficientNetV2B0
from resnet import resnet18

from sklearn.metrics import classification_report,confusion_matrix, matthews_corrcoef
from sklearn.utils.class_weight import compute_class_weight

In [None]:
from pathlib import Path
from PIL import Image
import os, shutil
from os import listdir
from skimage.io import imshow, imread, imsave

In [None]:
loss = BinaryCrossentropy(
    from_logits=False, label_smoothing=0.0, axis=-1,
    name='binary_crossentropy'
)

In [None]:
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

def scheduler(epoch, lr):
    if epoch < 10:
        return lr
    else:
        return lr * tf.math.exp(-0.05)

clr = clr_callback.CyclicLR(base_lr=1e-4, max_lr=5e-3,
               step_size=2000, mode='triangular2')
    
es = EarlyStopping(monitor='val_accuracy', mode='max', min_delta=0.005, patience = 7,  restore_best_weights=True)
lrs = LearningRateScheduler(scheduler)

## Data Preprocessing

In [None]:
plt.rcParams["figure.figsize"] = (9,12)

def plot_model_history(model, path):
    fig, (ax1, ax2) = plt.subplots(2)
    ax1.plot(model.history['accuracy'])
    ax1.plot(model.history['val_accuracy'])
    ax1.set_title('Model Accuracy')
    ax1.set_ylabel('Accuracy')
    ax1.set_xlabel('Epoch')
    ax1.legend(['Train', 'Val'], loc='upper left')
    
    ax2.plot(model.history['loss'], 'b')
    ax2.plot(model.history['val_loss'], 'r')
    ax2.set_title('Training and Validation loss')
    ax2.set_ylabel('Loss')
    ax2.set_xlabel('Epoch')
    ax2.legend(['Train Loss', 'Val Loss'], loc='upper left')
    fig.savefig(path, bbox_inches = 'tight')

    plt.show()

In [None]:
def plot_confusion_matrix(df_confusion, path, title='Confusion Matrix'):
    print(df_confusion)
    plt.figure(figsize=(8, 6))
    plt.title(title)

    heatmaps = sns.heatmap(df_confusion, annot=True, cmap = "viridis",
               vmin = 0, vmax = 1)
    plt.setp(heatmaps.get_xticklabels(), rotation=30)
    plt.setp(heatmaps.get_yticklabels(), rotation=30)
    plt.tight_layout()
    plt.savefig(path)
    plt.show()

In [None]:
def report_classifier(model,generator, testing_size, batch_size, evaluate = False, efficient_net = False, 
                      path_report = None):
    Y_pred = model.predict(generator)
    y_pred = np.argmax(Y_pred, axis=1)
    k = matthews_corrcoef(generator.classes, y_pred)
    print(f'Matthew Correlation Coefficient : {k:.2f}')
    
    df_confusion_ori = pd.crosstab(generator.classes, y_pred, 
                               rownames=['Actual'], colnames=['Predicted'], margins=True)
    df_confusion = pd.crosstab(generator.classes, y_pred, 
                               rownames=['Actual'], colnames=['Predicted'], margins=True, normalize = "index")
    df_confusion.rename(columns={0: 'Confined Masonry', 1 : 'RC Infilled', 2 : 'Timber', 3 : 'Unconfined'}, 
              index={0: 'Confined Masonry', 1 : 'RC Infilled', 2 : 'Timber', 3 : 'Unconfined'}, inplace = True)
    plot_confusion_matrix(df_confusion[:4][:], path = path_report)
    
    print('Classification Report')
    target_names = ['Confined', 'RC', 'Timber', 'Unconfined']
    print(classification_report(generator.classes, y_pred, target_names=target_names))
    
    if evaluate:
        tipologi = {0 : 'Confined', 1 : 'RC Infilled', 2 : 'Timber', 3 : 'Unconfined'}
        generator.reset()
        print_index = 0
        showimg = 1
        plt.figure(figsize=(16, 32))
        while(print_index < len(y_pred)):
            x_batch, y_batch = next(generator)
            if(showimg == 1 or showimg == 32):
                plt.figure(figsize=(16, 32))
            for k, (img, lbl) in enumerate(zip(x_batch, y_batch)):
                if(showimg == 32):
                    showimg = 1
                if (y_pred[print_index] != np.argmax(lbl)):
                    if efficient_net:
                        plt.subplot(8, 4, showimg)#4 rows with 8 images.
                        showimg += 1
                        plt.title('Prediksi :' + str(tipologi[y_pred[print_index]]) + ', Aktual :' + str(tipologi[np.argmax(lbl)]), 
                                  fontsize = 9)
                        plt.imshow(img/255.)
                    else:
                        plt.subplot(8, 4, showimg)#4 rows with 8 images.
                        showimg += 1
                        plt.title('Prediksi :' + str(y_pred[print_index]) + ', Aktual :' + str(np.argmax(lbl)), 
                                  fontsize = 9)
                        plt.imshow(img)
                print_index += 1
    return

In [None]:
def create_datagen(data_path, target_size = (256,256), batch_size = 8, split = 0.3):
    train_datagen = image.ImageDataGenerator(
        rescale = 1.,
        horizontal_flip = True,
        rotation_range = 5,
        fill_mode = 'reflect',
        width_shift_range= 0.05,
        zoom_range = [0.8, 1.1],
        brightness_range = [0.8, 1.2],
        channel_shift_range= 10.0,
        #preprocessing_function= crop,
        validation_split = split
    )

    train_generator = train_datagen.flow_from_directory(
        data_path,
        target_size=target_size,
        batch_size= batch_size,
        class_mode='binary',
        subset='training')

    validation_generator = train_datagen.flow_from_directory(
        data_path,
        target_size=target_size,
        batch_size= batch_size,
        class_mode='binary',
        subset='validation') # set as validation data
    return train_generator, validation_generator

In [None]:
data_path = 'building_nonbuilding/train'

In [None]:
train_generator, validation_generator = create_datagen(data_path)

In [None]:
class_weights = compute_class_weight(
               class_weight = 'balanced',
                classes = np.unique(validation_generator.classes), 
                y = validation_generator.classes)

In [None]:
class_weights

In [None]:
d_class_weights = dict(enumerate(class_weights))
print(d_class_weights)

In [None]:
def experiment(model, train_gen, validation_gen, 
               epoch_sch, fine_tune_sch, lr_sch, 
               validation_sample, class_weight, model_name,
               label_smooth = 0.15, es = es, lrs = lrs, spe = None, save_epoch = 0):
    for i in range(len(epoch_sch)):
        fine_tune = fine_tune_sch[i]
        epochs = epoch_sch[i]
        learning_rates = lr_sch[i]
    
        for layer in model.layers[:fine_tune]:
            layer.trainable = False
        for layer in model.layers[fine_tune:]:
            layer.trainable = True
        model.compile(loss = BinaryCrossentropy(from_logits=False, label_smoothing=label_smooth, axis=-1, name='binary_crossentropy'), 
                                optimizer = Adam(learning_rate=learning_rates, beta_1=0.9, beta_2=0.999, 
                                            epsilon=None, amsgrad=False), 
                               metrics = ['accuracy'])
        print(f'Training Model for {epochs} epoch, fine-tuned at {fine_tune}, and with learning rate of {learning_rates}')
        model_title = model_name + str(i+1)
        csv_logger = CSVLogger('Misc/temp Graph/'+model_title+' Report.csv', append=True)
        hist = model.fit(
            train_gen,
            epochs = epochs,
            validation_data=validation_gen,
            class_weight = class_weight,
            callbacks=[es, lrs, csv_logger],
            steps_per_epoch = spe,
            verbose = 2
        )
        plot_model_history(hist, path = 'Misc/temp Graph/'+str(model_title)+' trainlog.jpg')
        if(i >= (len(epoch_sch)-5)):
            tf.keras.backend.clear_session()
        if(i >= save_epoch):
            model.save(r'Deep Learning Models/Typology Classifier/' + model_title + '.h5')
            gc.collect()

In [None]:
initializer = he_normal()

def add_regularization(model, regularizer = regularizers.l2(0.01)):
    if not isinstance(regularizer, regularizers.Regularizer):
        print("Regularizer must be a subclass of tf.keras.regularizers.Regularizer")
        return model

    for layer in model.layers:
        for attr in ['kernel_regularizer']:
            if hasattr(layer, attr):
                setattr(layer, attr, regularizer)

    model_json = model.to_json()

    tmp_weights_path = os.path.join(tempfile.gettempdir(), 'tmp_weights.h5')
    model.save_weights(tmp_weights_path)

    model = models.model_from_json(model_json)
    
    model.load_weights(tmp_weights_path, by_name=True)
    return model

In [None]:
def loadimage_path(source_dir,filename):
    file_path = source_dir + '/' + filename
    image = imread(file_path)
    plt.imshow(image)
    plt.show()
    return image

In [None]:
print(train_generator.class_indices)
print(validation_generator.class_indices)

In [None]:
x_batch, y_batch = next(train_generator)

plt.figure(figsize=(16, 32))
for k, (img, lbl) in enumerate(zip(x_batch, y_batch)):
    plt.subplot(8, 4, k+1)#4 rows with 8 images.
    plt.title(str(lbl))
    plt.imshow(img/255.)

In [None]:
batch_size = 32

In [None]:
K.clear_session()

## EfficientNet B0

In [None]:
def make_EfficientB0():
    base = EfficientNetB0(
        include_top = False,
        weights = "imagenet",
        input_shape = (256, 256, 3),
        pooling = "max",
        classes = 2
    )
    out = Dropout(0.5)(base.output)
    out = Dense(16, activation='relu', kernel_initializer=initializer)(out)
    out = BatchNormalization()(out)
    out = Dropout(0.5)(out)
    out = Dense(1, activation='sigmoid', kernel_initializer="glorot_uniform")(out)

    model = Model(inputs = base.input,outputs=out)
    add_regularization(model)

    return model

In [None]:
model = make_EfficientB0()

In [None]:
epoch_schedule = [10, 25, 25, 15]
fine_tune_schedule = [238, 234, 162, 3]
lr_schedule = [12e-4, 9e-4, 7e-4, 5e-4]

In [None]:
model.summary(show_trainable = True)

In [None]:
experiment(model, train_generator, validation_generator,
           epoch_schedule, fine_tune_schedule, lr_schedule, 
           validation_sample = 1178, class_weight = d_class_weights, 
           model_name = 'Model B-NB ',
           label_smooth = 0.2, es = es, lrs = lrs, save_epoch = 0)

## Prediksi Foto Baru

In [None]:
model_eff = models.load_model('Deep Learning Models/Model B-NB.h5')

In [None]:
test_path = 'Generator Results 3/'
save_path = 'Generator Results 3/'
for i, fname in enumerate(listdir(test_path)):
        fpath = os.path.join(test_path, fname)
        img = image.load_img(fpath, target_size=(256, 256))
        img_array = image.img_to_array(img)
        img_batch = np.expand_dims(img_array, axis=0)
        img_preprocessed = img_batch
        
        prediction = model_eff.predict(img_preprocessed)
        prediction = np.squeeze(prediction)
    
        file_name = save_path + "{:.4f}".format(prediction) + '_' + str(fname)
        img_array = img_array/255.
        matplotlib.image.imsave(file_name, img_array)
        
        if(i % 500 == 0):
            print("Prediksi ke-" + str(i))
        #im.save(file_name)