In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers

import warnings
warnings.filterwarnings('ignore')

In [None]:
class DataGenerator():
    """Factory for batched image datasets read from class-labelled directories."""

    def __init__(self, batch_size, target_size=(256, 256)):
        # Both values are forwarded unchanged to the Keras directory loader.
        self.target_size = target_size
        self.batch_size = batch_size

    def generate_data(self, subset, directory):
        """Load one split from ``directory`` and return it as a dataset.

        Args:
            subset: Split name. 'train' and 'validation' are shuffled; any
                other value (e.g. 'test') keeps the loader's file order.
            directory: Root folder holding the class sub-folders '0' and '1'.

        Returns:
            The dataset built by ``image_dataset_from_directory`` with
            categorical labels inferred from the folder names.
        """
        print("Loading", subset, "...")

        # Only the training and validation splits are shuffled.
        should_shuffle = subset in ('train', 'validation')

        return tf.keras.preprocessing.image_dataset_from_directory(
            directory,
            labels='inferred',
            label_mode='categorical',
            class_names=['0', '1'],
            color_mode='rgb',
            batch_size=self.batch_size,
            image_size=self.target_size,
            shuffle=should_shuffle,
            seed=42,
        )

In [None]:
def augmentation_layer(dataset):
    """Return ``dataset`` with random augmentation applied to every batch.

    The pipeline is a horizontal flip, a contrast jitter (factor 0.2) and a
    brightness jitter (factor 0.2). Labels pass through untouched.
    """
    augmenter = tf.keras.Sequential(
        [
            tf.keras.layers.RandomFlip('horizontal'),
            tf.keras.layers.RandomContrast(0.2),
            tf.keras.layers.RandomBrightness(0.2),
        ]
    )

    def _augment_batch(images, targets):
        # training=True keeps the random layers active inside the map call.
        return augmenter(images, training=True), targets

    return dataset.map(_augment_batch)

In [None]:
batch_size = 256
target_size = (256, 256)
data_gen = DataGenerator(batch_size, target_size)

# One dataset per split; each split lives in its own folder under the Kaggle input root.
train_set = data_gen.generate_data(
    subset='train',
    directory='/kaggle/input/real-vs-ai-generated-faces-dataset/dataset/dataset/train',
)
val_set = data_gen.generate_data(
    subset='validation',
    directory='/kaggle/input/real-vs-ai-generated-faces-dataset/dataset/dataset/validate',
)
test_set = data_gen.generate_data(
    subset='test',
    directory='/kaggle/input/real-vs-ai-generated-faces-dataset/dataset/dataset/test',
)

In [None]:
# Gather the ground-truth class index of every test image, in dataset order.
# The test split is loaded with shuffle=False, so this order matches the order
# in which predictions are later produced from the same dataset.
# Batches are collected first and concatenated once: concatenating inside the
# loop re-copies the growing array on every batch, and seeding it with
# np.array([]) silently turns the integer class indices into floats.
label_batches = [np.argmax(y.numpy(), axis=-1) for _, y in test_set]
labels = (
    np.concatenate(label_batches)
    if label_batches
    else np.array([], dtype=np.int64)  # empty dataset: keep an integer dtype
)

print(len(labels))

In [None]:
# Apply the random flip/contrast/brightness pipeline to the training split only.
# NOTE: this rebinds train_set, so re-running the cell stacks another
# augmentation pass on top of the previous one.
train_set = augmentation_layer(train_set)

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Input, Dropout
from tensorflow.keras.applications import MobileNetV3Large, EfficientNetB2

from tensorflow.keras.utils import plot_model
from IPython.display import Image


class CustomModels:
    """Builds a transfer-learning classifier on top of a frozen ImageNet backbone.

    The backbone is selected from ``name``: a name containing "mobilenet" uses
    MobileNetV3Large, a name containing "efficientnet" uses EfficientNetB2.
    """

    def __init__(self, num_classes=1000, input_shape=(256, 256, 3), name='resnet'):
        self.num_classes = num_classes
        self.input_shape = input_shape
        self.name = name
        self.model = None  # populated by build()

    def build(self):
        """Assemble the model, store it on ``self.model`` and return it.

        Raises:
            ValueError: if ``name`` contains neither "mobilenet" nor
                "efficientnet" (this includes the default name 'resnet').
        """
        # Resolve the backbone first so an unsupported name fails with a clear
        # message instead of an UnboundLocalError further down.
        if "mobilenet" in self.name:
            backbone_factory = MobileNetV3Large
        elif "efficientnet" in self.name:
            backbone_factory = EfficientNetB2
        else:
            raise ValueError(
                f"Unsupported model name {self.name!r}: it must contain "
                "'mobilenet' or 'efficientnet'."
            )

        base_model = backbone_factory(include_top=False, weights='imagenet')

        # Freeze every backbone layer; only the new head is trained at first.
        for layer in base_model.layers:
            layer.trainable = False

        inputs = Input(shape=self.input_shape)
        # The backbone is called with training=False.
        x = base_model(inputs, training=False)
        x = GlobalAveragePooling2D()(x)
        x = Dropout(0.1)(x)
        x = Dense(units=2048, activation='relu')(x)
        outputs = Dense(units=self.num_classes, activation='softmax')(x)
        model = Model(inputs=inputs, outputs=outputs, name=self.name)

        self.model = model
        return model

    def _require_model(self):
        """Return the built model, or fail clearly if build() has not run yet."""
        if self.model is None:
            raise RuntimeError("Model has not been built yet; call build() first.")
        return self.model

    def visualize(self):
        """Render the architecture to '<name>.png' and display the image."""
        # Imported locally so the method does not depend on the name `display`
        # being injected into the namespace by the notebook front end.
        from IPython.display import display

        plot_model(self._require_model(), show_shapes=True, to_file=self.name + '.png')
        image = Image(self.name + '.png')
        display(image)

    def summary(self):
        """Print the layer summary, including the per-layer trainable flag."""
        # summary() prints by itself and returns None, so it is not wrapped in
        # print(), which would emit a trailing "None" line.
        self._require_model().summary(show_trainable=True)


In [None]:
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
import os


class Trainer:
    """Convenience wrapper that compiles, trains, fine-tunes and saves a Keras model.

    The wrapped model is expected to hold its pretrained backbone at
    ``model.layers[1]`` (index 0 being the input layer); ``get_model_info`` and
    ``fine_tune`` rely on that layout.
    """

    def __init__(self, model, loss, optimizer=None, metrics=None):
        """
        Args:
            model: The Keras model to train.
            loss: Loss passed to ``model.compile``.
            optimizer: Optimizer for the initial training phase. Defaults to a
                fresh ``Adam(learning_rate=0.001)`` per Trainer.
            metrics: Metrics passed to ``model.compile``. Defaults to
                ``['accuracy']``.
        """
        self.model = model
        # Defaults are created per instance: a default written in the signature
        # is evaluated once, so every Trainer would share one optimizer object
        # and one metrics list.
        self.optimizer = Adam(learning_rate=0.001) if optimizer is None else optimizer
        self.loss = loss
        self.metrics = ['accuracy'] if metrics is None else metrics
        self.history = None  # dict of metric name -> per-epoch values
        self.epochs = 0  # total epochs actually run (training + fine-tuning)
        self.fine_tune_epochs = 0  # how many of those were fine-tuning epochs

    def compile(self):
        """
        Method to compile model with optimizer, loss and metrics
        """
        self.model.compile(
            optimizer=self.optimizer, loss=self.loss, metrics=self.metrics
        )

    def fit(self, train_set, val_set, batch_size, epochs, callbacks=None):
        """
        Method to train model with dataset and store the history

        Starts a new history: any previously recorded epochs are discarded.

        NOTE(review): ``batch_size`` is forwarded unchanged. The Keras docs say
        not to specify it for dataset inputs, which are already batched;
        confirm the installed version tolerates it.
        """
        train_history = self.model.fit(
            train_set,
            batch_size=batch_size,
            epochs=epochs,
            validation_data=val_set,
            callbacks=[] if callbacks is None else callbacks,
        )
        # Copy the lists so later in-place extension by fine_tune() does not
        # mutate the History object returned by Keras.
        self.history = {
            key: list(values) for key, values in train_history.history.items()
        }
        # Early stopping can end training before `epochs`, so count what ran.
        self.epochs = len(train_history.history['val_loss'])
        self.fine_tune_epochs = 0

    def get_model_info(self):
        """Method to print all informations of model"""
        base_model = self.model.layers[1]
        print("- Model name: ", self.model.name)
        print("- Number of layers: ", len(self.model.layers))
        print("- Base model name: ", base_model.name)
        print("- Number of layers of base model: ", len(base_model.layers))
        print("- Number of parameters: ", self.model.count_params())

    def fine_tune(
        self,
        train_set,
        val_set,
        batch_size,
        epochs,
        new_optimizer,
        callbacks=None,
        unfreeze_position=0,
        layer_info_path="/kaggle/working/finetune_layer_info.txt",
    ):
        """
        Method to fine tune model with dataset and append to the history

        Args:
            train_set, val_set, batch_size, epochs, callbacks: forwarded to
                ``model.fit`` (see ``fit``).
            new_optimizer: Optimizer used when the model is recompiled.
            unfreeze_position: Index into the backbone's layer list; every
                layer from here to the end is unfrozen, except
                BatchNormalization layers, which are left as they are.
            layer_info_path: Text file that receives one
                "<index> <name> <trainable>" line per backbone layer.
        """
        base_model = self.model.layers[1]

        # unfreeze layers
        for layer in base_model.layers[unfreeze_position:]:
            if not isinstance(layer, layers.BatchNormalization):
                layer.trainable = True

        # recompile model: this runs after the trainable flags were changed
        self.model.compile(
            optimizer=new_optimizer, loss=self.loss, metrics=self.metrics
        )

        print("------------")
        print(
            f"Unfreezed from position {unfreeze_position} to {len(base_model.layers) - 1}"
        )
        # summary() prints by itself and returns None; wrapping it in print()
        # would add a stray "None" line.
        self.model.summary(show_trainable=True)

        info_dir = os.path.dirname(layer_info_path)
        if info_dir:
            os.makedirs(info_dir, exist_ok=True)
        with open(layer_info_path, "w") as f:
            for i, layer in enumerate(base_model.layers):
                f.write(f"{i} {layer.name} {layer.trainable}\n")
        print("------------")
        # print total trainable param of basemodel
        total_trainable_param = sum(
            layer.count_params() for layer in base_model.layers if layer.trainable
        )
        print(f"Total trainable param of base model: {total_trainable_param}")

        # fine tune model
        fine_tune_history = self.model.fit(
            train_set,
            batch_size=batch_size,
            epochs=epochs,
            validation_data=val_set,
            callbacks=[] if callbacks is None else callbacks,
        )

        # add fine tune history to train history; works even when fit() was
        # never called, and keeps every recorded key in step, not just four.
        if self.history is None:
            self.history = {}
        for key, values in fine_tune_history.history.items():
            self.history.setdefault(key, []).extend(values)

        ran_epochs = len(fine_tune_history.history['val_loss'])
        self.epochs += ran_epochs
        self.fine_tune_epochs += ran_epochs

    def plot_history(self):
        """
        Method to plot model training history

        Draws accuracy (top) and loss (bottom) for train and validation, with
        a dashed red line at the epoch where fine-tuning started, if it ran.

        Raises:
            RuntimeError: if nothing has been trained yet.
        """
        if self.history is None:
            raise RuntimeError("No training history yet; call fit() or fine_tune() first.")

        fig, axes = plt.subplots(2, 1, figsize=(16, 8), sharex=True)

        panels = (
            (axes[0], 'accuracy', 'val_accuracy', 'Accuracy'),
            (axes[1], 'loss', 'val_loss', 'Loss'),
        )
        for ax, train_key, val_key, ylabel in panels:
            ax.plot(self.history[train_key])
            ax.plot(self.history[val_key])
            # draw line at fine tune position
            if self.fine_tune_epochs > 0:
                ax.axvline(
                    x=self.epochs - self.fine_tune_epochs, color='red', linestyle='--'
                )
            ax.set_ylabel(ylabel)

        axes[1].set_xlabel('Epoch')

        fig.suptitle('Model performance during training')
        axes[0].legend(('Train accuracy', 'Validation accuracy'))
        axes[1].legend(('Train loss', 'Validation loss'))

    def save_model(self, path='/kaggle/working/new_saved_model'):
        """
        Method to save model after training

        Writes '<path>/<model name>.h5', creating ``path`` if needed, and
        returns the full file path.
        """
        os.makedirs(path, exist_ok=True)
        save_path = os.path.join(path, f'{self.model.name}.h5')
        self.model.save(save_path)
        return save_path


In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
from sklearn.metrics import classification_report


class Evaluator:
    """Scores a model on a dataset once, at construction, and caches the results."""

    def __init__(self, model, test_set, y_true):
        """Run evaluation and prediction on ``test_set`` and store the metrics.

        ``y_true`` holds the ground-truth class indices and must be in the
        same order as the samples yielded by ``test_set``.
        """
        self.model = model

        # evaluate() is unpacked into exactly two values: the loss and one metric.
        evaluation = self.model.evaluate(test_set, verbose=1)
        self.test_loss, self.test_acc = evaluation

        # Turn the per-class scores into hard class indices.
        class_scores = self.model.predict(test_set)
        predicted_classes = np.argmax(class_scores, axis=1)

        self.cm = confusion_matrix(y_true, predicted_classes)
        self.report = classification_report(y_true, predicted_classes)

    def evaluate(self):
        """
        Return the cached ``(loss, accuracy)`` pair computed at construction
        """
        return self.test_loss, self.test_acc

    def confusion_matrix(self):
        """
        Show the cached confusion matrix as an annotated heatmap
        """
        sns.heatmap(self.cm, annot=True, fmt="d")
        plt.show()

    def classification_report(self):
        """
        Print the cached classification report for the test set
        """
        print(self.report)

In [None]:
# Two-class classifier; the name contains "mobilenet", so build() takes the
# MobileNetV3Large branch. The name is also reused for the checkpoint folder
# and for the architecture image written by visualize().
custom_model = CustomModels(
    num_classes=2, input_shape=(256, 256, 3), name='mobilenet_new_50_12'
)
# build() stores the assembled network on custom_model.model.
custom_model.build()
# Writes '<name>.png' to the working directory and displays it.
custom_model.visualize()

In [None]:
# Print the layer-by-layer summary, including which layers are trainable.
custom_model.summary()

In [None]:
# Define hyperparameters for model trainer
import math
import os

epochs = 30
batch_size = 256

metrics = ['accuracy']
loss = tf.keras.losses.BinaryCrossentropy(from_logits=False)

initial_learning_rate = 0.01
optimizer = tf.keras.optimizers.Adam(initial_learning_rate)


def lr_step_decay(epoch, lr, base_lr=initial_learning_rate, drop_rate=0.5, epochs_drop=5.0):
    """Step schedule: multiply ``base_lr`` by ``drop_rate`` every ``epochs_drop`` epochs.

    ``base_lr`` is captured as a default argument at definition time. Reading
    the global ``initial_learning_rate`` inside the body would pick up whatever
    value a later cell assigns to that name. ``lr`` (the current rate) is
    accepted because the scheduler callback passes it, but it is not used.
    """
    return base_lr * math.pow(drop_rate, math.floor(epoch / epochs_drop))


checkpoint_filepath = os.path.join(
    '/kaggle/working/checkpoints',
    custom_model.model.name,
    "training",
    "{epoch:03d}-{val_loss:.4f}.keras",
)

# Create only the parent folder. The last path component is a filename
# template, so calling makedirs on the full path would create a directory
# literally named "{epoch:03d}-{val_loss:.4f}.keras".
os.makedirs(os.path.dirname(checkpoint_filepath), exist_ok=True)
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=False,
    monitor='val_loss',
    mode='min',
    save_best_only=True,
)

early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', mode='min', verbose=1, patience=10
)
lr_schedule = tf.keras.callbacks.LearningRateScheduler(lr_step_decay, verbose=1)

In [None]:
# Wrap the built network together with the loss, optimizer and metrics defined above.
trainer = Trainer(custom_model.model, loss, optimizer, metrics)

# The model must be compiled before it can be trained.
trainer.compile()

# Initial training phase: LR schedule, early stopping and best-model checkpointing.
trainer.fit(
    train_set=train_set,
    val_set=val_set,
    batch_size=batch_size,
    epochs=epochs,
    callbacks=[lr_schedule, early_stopping, checkpoint],
)

In [None]:
# Accuracy and loss curves for the initial training phase.
trainer.plot_history()

In [None]:
# Score the model on the test split; `labels` holds the ground-truth class
# indices collected from test_set earlier in the notebook.
evaluate = Evaluator(trainer.model, test_set, labels)

In [None]:
# Heatmap of the confusion matrix computed when the Evaluator was created.
evaluate.confusion_matrix()

In [None]:
# Print the classification report computed when the Evaluator was created.
evaluate.classification_report()

In [None]:
# Print layer and parameter counts; the base-model layer count is useful when
# picking the unfreeze position for fine-tuning.
trainer.get_model_info()

In [None]:
# Define hyperparameters for finetuning
import os
import math

epochs = 12
batch_size = 256


metrics = ['accuracy']
loss = tf.keras.losses.BinaryCrossentropy(from_logits=False)

initial_learning_rate = 0.001
new_optimizer = tf.keras.optimizers.Adam(initial_learning_rate)


def lr_step_decay_2(epoch, lr, base_lr=initial_learning_rate, drop_rate=0.5, epochs_drop=3.0):
    """Step schedule: multiply ``base_lr`` by ``drop_rate`` every ``epochs_drop`` epochs.

    ``base_lr`` is captured as a default argument at definition time, so the
    schedule keeps this cell's learning rate even if the global
    ``initial_learning_rate`` is reassigned afterwards. ``lr`` (the current
    rate) is accepted because the scheduler callback passes it, but it is not
    used.
    """
    return base_lr * math.pow(drop_rate, math.floor(epoch / epochs_drop))


checkpoint_filepath = os.path.join(
    '/kaggle/working/checkpoints',
    custom_model.model.name,
    "finetuning",
    "{epoch:03d}-{val_loss:.4f}.keras",
)

# Create only the parent folder. The last path component is a filename
# template, so calling makedirs on the full path would create a directory
# literally named "{epoch:03d}-{val_loss:.4f}.keras".
os.makedirs(os.path.dirname(checkpoint_filepath), exist_ok=True)
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=False,
    monitor='val_loss',
    mode='min',
    save_best_only=True,
)

early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', mode='min', verbose=1, patience=6
)
lr_schedule = tf.keras.callbacks.LearningRateScheduler(lr_step_decay_2, verbose=1)

In [None]:
# Second phase: unfreeze the backbone from layer index 200 onwards and continue
# training with the fine-tuning optimizer and callbacks.
trainer.fine_tune(
    train_set=train_set,
    val_set=val_set,
    batch_size=batch_size,
    epochs=epochs,
    new_optimizer=new_optimizer,
    callbacks=[lr_schedule, early_stopping, checkpoint],
    unfreeze_position=200,
)

In [None]:
# Curves for both phases; the dashed red line marks where fine-tuning began.
trainer.plot_history()

In [None]:
# Re-score the fine-tuned model on the same test split; this replaces the
# earlier `evaluate` object.
evaluate = Evaluator(trainer.model, test_set, labels)

In [None]:
# Confusion matrix of the fine-tuned model.
evaluate.confusion_matrix()

In [None]:
# Classification report of the fine-tuned model.
evaluate.classification_report()

In [None]:
# Make sure the target folder exists before saving: nothing earlier in the
# notebook creates it, and the save call may fail when it is missing.
os.makedirs("/kaggle/working/new_saved_model", exist_ok=True)
trainer.model.save("/kaggle/working/new_saved_model/model.keras")

In [None]:
import numpy as np
from tensorflow.keras.utils import load_img, img_to_array
from keras.models import load_model


def preprocess_image(image_path, target_size=(256, 256)):
    """Load an image file and return it as a single-item batch.

    The image is resized to ``target_size`` while loading, converted to an
    array, and given a new leading axis so it can be passed straight to
    ``model.predict``.
    """
    pixels = img_to_array(load_img(image_path, target_size=target_size))
    return np.expand_dims(pixels, axis=0)


def predict_image(image_data, model_path):
    """Classify image(s) as "real" (class index 0) or "fake" (any other index).

    Args:
        image_data: Batch of images to score; ``preprocess_image`` produces a
            suitable single-image batch.
        model_path: Either the path of a saved model, which is loaded with
            ``compile=False``, or an already loaded model object exposing
            ``predict``. Passing a loaded model avoids re-reading the file
            from disk on every call.

    Returns:
        The label string when the batch holds one image (the original
        behaviour), otherwise a list with one label per image.
    """
    if hasattr(model_path, "predict"):
        model = model_path
    else:
        model = load_model(model_path, compile=False)

    prediction = model.predict(image_data)
    print(prediction)

    # argmax per row: without an axis, argmax runs over the flattened batch and
    # only gives a meaningful class index when the batch holds a single image.
    class_indices = np.argmax(np.atleast_2d(prediction), axis=-1)
    predicted_labels = ["real" if index == 0 else "fake" for index in class_indices]

    if len(predicted_labels) == 1:
        return predicted_labels[0]
    return predicted_labels

In [None]:
# download a real image to test (it is saved as 'real.png' and used as the real sample)
from IPython.display import Image
import requests

URL = "https://cbsaustin.com/resources/media/8b2d4079-8b82-465b-9514-8da9af8d5b49-full1x1_AP23326229180956.jpg"
# Bound the wait and fail on HTTP errors, so an error page is never written to
# disk and later fed to the model as if it were an image.
response = requests.get(URL, timeout=30)
response.raise_for_status()

# NOTE(review): the URL ends in .jpg but the file is stored with a .png name;
# the name is kept because the prediction cell reads 'real.png'.
image_name = 'real.png'

with open(image_name, "wb") as f:
    f.write(response.content)

image = Image(image_name)
display(image)

In [None]:
# Run the saved model on the downloaded real sample
model_path = '/kaggle/working/new_saved_model/model.keras'
image_name = 'real.png'

# Preprocess image into a single-item batch, then classify it
image = preprocess_image(image_path=image_name)
predicted_class = predict_image(image_data=image, model_path=model_path)
print("Predicted class:", predicted_class)

In [None]:
# download a fake image to test
from IPython.display import Image
import requests
URL = "https://thispersondoesnotexist.com/"
# Bound the wait and fail on HTTP errors, so an error page is never written to
# disk and later fed to the model as if it were an image.
response = requests.get(URL, timeout=30)
response.raise_for_status()

image_name = 'fake1.png'

with open(image_name, "wb") as f:
    f.write(response.content)

image = Image(image_name)
display(image)

In [None]:
# Run the saved model on the downloaded fake sample
model_path = '/kaggle/working/new_saved_model/model.keras'
image_name = 'fake1.png'

# Preprocess image into a single-item batch, then classify it
image = preprocess_image(image_path=image_name)
predicted_class = predict_image(image_data=image, model_path=model_path)
print("Predicted class:", predicted_class)