In [None]:
import os
import numpy as np
import tensorflow as tf
import glob
import matplotlib.pyplot as plt

from keras.models import Model
from tensorflow.keras.layers import *
from tensorflow.keras import optimizers
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import Callback, EarlyStopping, ModelCheckpoint
from MLD import multi_lens_distortion

# GPU runtime configuration, passed to TensorFlow/CUDA through the process environment:
#   TF_FORCE_GPU_ALLOW_GROWTH -> grow GPU memory on demand
#   CUDA_VISIBLE_DEVICES      -> expose only the GPU with index 1
os.environ.update({
    "TF_FORCE_GPU_ALLOW_GROWTH": "true",
    "CUDA_VISIBLE_DEVICES": "1",
})

In [None]:
# Spatial input resolution (height, width) and the matching channels-last RGB shape.
IMG_SIZE = (224, 224)
IMG_SHAPE = (*IMG_SIZE, 3)

def network_1():
    """Build and compile the two-branch DenseNet201 + ResNet101V2 classifier.

    Both ImageNet-pretrained backbones are truncated at layer index 178 and
    receive the same input image. Their feature maps are global-average-pooled,
    concatenated and passed through a dropout/dense head that ends in a 2-way
    softmax. The model summary is printed as a side effect.

    Returns:
        tf.keras.Model: the model, compiled with Adam(1e-4), categorical
        cross-entropy loss and an accuracy metric.
    """
    # Index of the last backbone layer that is kept (layers 0..178 inclusive).
    last_layer_index = 178

    # Load the pre-trained backbones without their classification heads.
    dense_net_full = tf.keras.applications.DenseNet201(input_shape=IMG_SHAPE, include_top=False, weights='imagenet')
    res_net_full = tf.keras.applications.ResNet101V2(input_shape=IMG_SHAPE, include_top=False, weights='imagenet')

    # Truncated feature extractors: each one ends at `last_layer_index`.
    dense_net = tf.keras.Model(inputs=dense_net_full.input, outputs=dense_net_full.layers[last_layer_index].output)
    res_net = tf.keras.Model(inputs=res_net_full.input, outputs=res_net_full.layers[last_layer_index].output)

    # Single image input shared by both branches
    # (named `image_input` so the builtin `input` is not shadowed).
    image_input = layers.Input(shape=IMG_SHAPE)

    # Extract features with each truncated backbone.
    den_features = dense_net(image_input)
    res_features = res_net(image_input)

    # Global Average Pooling (GAP) collapses each feature map to one vector.
    den_features = layers.GlobalAveragePooling2D()(den_features)
    res_features = layers.GlobalAveragePooling2D()(res_features)

    # Fuse the two feature vectors.
    concatenated = layers.Concatenate()([den_features, res_features])

    # Classification head.
    z = layers.Dropout(0.4)(concatenated)
    z = layers.Dense(512, activation='relu')(z)
    z = layers.Dense(2, activation='softmax')(z)

    # Build the final model with tf.keras.Model, the same Keras implementation
    # that provides the layers and backbones above; mixing it with the
    # standalone `keras` package can fail when the two installs differ.
    model = tf.keras.Model(inputs=image_input, outputs=z)
    model.compile(optimizer=optimizers.Adam(1e-4), loss="CategoricalCrossentropy", metrics=['accuracy'])

    model.summary()
    return model

# Build the compiled two-backbone classifier (network_1 also prints the model summary).
model = network_1()


In [None]:
from os import walk
def _top_level_files(directory):
    """Return the names of the files directly inside `directory` ([] if nothing can be walked)."""
    for _root, _subdirs, files in walk(directory):
        # Only the first tuple produced by walk() describes `directory` itself.
        return files
    return []


filenames = _top_level_files('./NLCB/Data3/')
filenames_val = _top_level_files('./NLCB/Data3/Validation/')

In [None]:
def custom_data_generator(directory):
    """Yield ``(image, label)`` pairs for every PNG file directly inside ``directory``.

    The class marker is the last character of the file name before the
    ``.png`` extension, i.e. the 5th character from the end of the path:
    ``'n'`` gives the one-hot label ``[1, 0]``, anything else gives ``[0, 1]``.

    Args:
        directory: Folder that is searched (non-recursively) for ``*.png`` files.

    Yields:
        tuple: the decoded 3-channel image and its two-element one-hot label list.
    """
    for filepath in glob.glob(os.path.join(directory, '*.png')):
        image = tf.io.read_file(filepath)
        # The glob only matches PNG files, so decode them as PNG.
        image = tf.image.decode_png(image, channels=3)

        # NOTE(review): the marker position follows the original comment
        # ("5th character from the end") - confirm against the naming scheme.
        stem = os.path.splitext(os.path.basename(filepath))[0]
        is_n_class = stem.endswith('n')

        # Build a proper one-hot pair; exactly one slot is set.
        label = [1, 0] if is_n_class else [0, 1]
        yield image, label



def custom_preprocessing_function(img):
    """Randomly augment a single training image.

    The image is scaled by 1/255 first, then randomly rotated by a multiple of
    90 degrees (with probability 0.5), colour-jittered (hue, contrast,
    brightness, saturation), randomly flipped, cropped/resized to 224x224 and
    finally passed through ``multi_lens_distortion``.

    Args:
        img: One H x W x 3 image. Assumed to hold 0-255 values, since the
            function divides by 255 - TODO confirm against the caller.

    Returns:
        The augmented image as returned by ``tf.numpy_function``.
    """
    # Scale BEFORE the colour augmentations: the brightness delta (0.2) and the
    # HSV-based hue/saturation ops are meant for floats in [0, 1]. On a 0-255
    # image the brightness shift is negligible.
    img = img / 255.

    # With probability 0.5 rotate by 1, 2 or 3 quarter turns (maxval is exclusive).
    if tf.random.uniform((), minval=0, maxval=1) > 0.5:
        nbr_rot = tf.random.uniform(shape=[], minval=1, maxval=4, dtype=tf.int32)
        img = tf.image.rot90(img, k=nbr_rot)

    # Colour jitter.
    img = tf.image.random_hue(img, 0.08)
    img = tf.image.random_contrast(img, 0.7, 1.3)
    img = tf.image.random_brightness(img, 0.2)
    img = tf.image.random_saturation(img, 0.7, 1.3)

    # Geometric jitter.
    img = tf.image.random_flip_left_right(img)
    img = tf.image.random_flip_up_down(img)
    img = tf.image.random_crop(img, (224, 224, 3))
    img = tf.image.resize(img, (224, 224))

    # NOTE(review): the output type is declared as uint8 although `img` is a
    # float image here - confirm what multi_lens_distortion actually returns.
    img = tf.numpy_function(
        multi_lens_distortion,
        [img, 4, (80, 110), (-0.4, 0.4)],
        tf.uint8
    )

    return img

def validation_preprocessing_function(img):
    """Deterministic preprocessing for validation images (no augmentation).

    Args:
        img: One image; it is divided by 255 and resized to 224x224.

    Returns:
        The scaled and resized image.
    """
    img = img / 255.
    img = tf.image.resize(img, (224, 224))
    # The result must be returned: without it the caller receives None
    # instead of the preprocessed image.
    return img

# Dataset locations (one sub-folder per class is expected by flow_from_directory).
train_data_dir = "./NLCB/Data3/Training/"
validation_data_dir = "./NLCB/Data3/Validation/"

# Options shared by the training and validation directory iterators.
_flow_options = dict(
    target_size=(224, 224),
    batch_size=16,
    class_mode='categorical',
)

# Training pipeline: Keras' built-in shear / zoom / horizontal flip followed by
# the custom augmentation function. No `rescale` is set here; the
# preprocessing functions divide by 255 themselves.
train_datagen = ImageDataGenerator(
    preprocessing_function=custom_preprocessing_function,
    horizontal_flip=True,
    zoom_range=0.2,
    shear_range=0.2,
)

# Validation pipeline: only the validation preprocessing function, no augmentation options.
validation_datagen = ImageDataGenerator(preprocessing_function=validation_preprocessing_function)

# Directory iterators: shuffled for training, fixed order for validation.
train_generator = train_datagen.flow_from_directory(
    train_data_dir, shuffle=True, **_flow_options
)
validation_generator = validation_datagen.flow_from_directory(
    validation_data_dir, shuffle=False, **_flow_options
)



In [None]:
# Re-compile with Adamax; this replaces the Adam optimizer configured inside network_1().
model.compile(
    optimizer=optimizers.Adamax(1e-4),
    loss="CategoricalCrossentropy",
    metrics=['accuracy'],
)

# Number of epochs without val_loss improvement tolerated before training stops.
patience = 40

# Stop once val_loss stalls for `patience` epochs and roll back to the best weights.
early_stopping_callback = EarlyStopping(
    monitor='val_loss',
    mode='min',
    patience=patience,
    restore_best_weights=True,
    verbose=1,
)

# Write a checkpoint only when val_loss reaches a new minimum.
model_checkpoint_callback = ModelCheckpoint(
    filepath='./PWCModel/best_model.h5',
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    verbose=1,
)

# Train for at most 200 epochs, running every batch of both generators each epoch.
history = model.fit(
    train_generator,
    validation_data=validation_generator,
    epochs=200,
    steps_per_epoch=len(train_generator),
    validation_steps=len(validation_generator),
    callbacks=[early_stopping_callback, model_checkpoint_callback],
)

# Also persist the model as it stands after fit() returns
# (optional: the checkpoint callback already stored the best epoch).
model.save('./PWCModel/best_PWC_model.h5')


In [None]:
import tf2onnx
import onnx

# Convert the Keras model to ONNX (opset 13) and write it to disk.
# from_keras returns a pair; only its first element (the ONNX model) is needed.
onnx_model = tf2onnx.convert.from_keras(model, opset=13)[0]
onnx.save(onnx_model, "./PWCModel.onnx")

In [None]:
# Draw one figure per tracked quantity (accuracy first, then loss),
# each comparing the training curve with the validation curve.
for metric in ('accuracy', 'loss'):
    plt.plot(history.history[metric])
    plt.plot(history.history['val_' + metric])
    plt.title('model ' + metric)
    plt.ylabel(metric)
    plt.xlabel('epoch')
    plt.legend(['Training', 'Validation'], loc='upper left')
    plt.show()