In [None]:
#  Copyright 2024 Marko Shevchuk, Yevhen Selepii, Marian Starovoitov, Vadym
#  Tsvyk, Nataliia Tymkiv, Nadiia Honcharyk 

#  This file is part of cifar-10-stable-diffusion-detection on GitHub.

#  cifar-10-stable-diffusion-detection is free software: you can redistribute
#  it and/or modify it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.

#  cifar-10-stable-diffusion-detection is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.

#  You should have received a copy of the GNU General Public License
#  along with cifar-10-stable-diffusion-detection. If not, see http://www.gnu.org/licenses/.


In [11]:
import pandas as pd
import numpy as np
import os
import tensorflow as tf
from keras.utils import image_dataset_from_directory
from tensorflow import keras
from keras import layers
from keras.activations import swish
from keras.callbacks import TensorBoard
import time
import matplotlib.pyplot as plt
from matplotlib import gridspec

In [12]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [13]:
!ls "/content/drive/MyDrive/COLAB/cifake"

logs  projects	test  train


In [14]:
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

Num GPUs Available:  0


In [15]:
input_dir = "/content/drive/MyDrive/COLAB/cifake/" #! Drive path
working_dir = "/content/drive/MyDrive/COLAB/cifake/" #! Drive path

In [16]:
NAME = "cifakeCNN{}".format(time.strftime("%Y%m%d-%H%M%S"))
tensorboard = TensorBoard(log_dir="logs/{}".format(NAME))

In [17]:
# gpus = tf.config.list_physical_devices('GPU')
# if gpus:
#   # """Restrict TensorFlow to only allocate 1GB of memory on the first GPU"""
#   try:
#     tf.config.set_logical_device_configuration(
#         gpus[0],
#         [tf.config.LogicalDeviceConfiguration(memory_limit=1024)])
#     logical_gpus = tf.config.list_logical_devices('GPU')
#     print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
#   except RuntimeError as e:
#     # Virtual devices must be set before GPUs have been initialized
#     print(e)

In [18]:
#! """When running locally, limits the threads to 1 (only uses one thread of one CPU core)"""
# tf.config.threading.set_intra_op_parallelism_threads(1)
# tf.config.threading.set_inter_op_parallelism_threads(1)

In [19]:
#! """When running on Kaggle, copies the input directory cifake_hyperband to the working directory for
#! tuner continuity / cumulative session training (loading backups from the BackupAndRestore callback)."""
# from distutils.dir_util import copy_tree
# source_dir = "/kaggle/input/cifake_hyperband/"
# destination_dir = "/kaggle/working/"
# copy_tree(source_dir, destination_dir)

In [None]:
ds_train = image_dataset_from_directory(
    f'{input_dir}train',
    labels='inferred',
    label_mode='binary',
    image_size=[32,32],
    interpolation='nearest',
    batch_size=32,
    shuffle=True,
#     seed=69,
    #validation_split=None,
    subset=None,
    follow_links=False,
    crop_to_aspect_ratio=False,
)

In [None]:
ds_test = image_dataset_from_directory(
    f'{input_dir}test',
    labels='inferred',
    label_mode='binary',
    image_size=[32,32],
    interpolation='nearest',
    batch_size=32,
    shuffle=True,
#     seed=69,
    #validation_split=None,
    subset=None,
    follow_links=False,
    crop_to_aspect_ratio=False,
)

In [None]:
def convert_to_float(image, label):
    image = tf.image.convert_image_dtype(image, dtype=tf.float32)
    return image, label

AUTOTUNE = tf.data.experimental.AUTOTUNE


In [None]:
ds_train = (
    ds_train
    .map(convert_to_float)
    .cache()
    .prefetch(buffer_size=AUTOTUNE)
)


In [None]:
ds_valid = (
    ds_test
    .map(convert_to_float)
    .cache()
    .prefetch(buffer_size=AUTOTUNE)
)

In [None]:
early_stopping = keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=14,
    min_delta=0.001,
    restore_best_weights=True)

In [None]:
from keras.callbacks import ReduceLROnPlateau

reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=7, min_lr=0.0008)

In [None]:
from keras.callbacks import ModelCheckpoint
timestamp = time.strftime("%Y%m%d-%H%M%S")
model_checkpoint = ModelCheckpoint(
    f'{working_dir}/best/best_model_{timestamp}.keras', monitor='val_loss', save_best_only=True, mode='min'
)

In [None]:
from keras.callbacks import LearningRateScheduler
def scheduler(epoch, lr):
    if epoch < 26:
        return lr
    else:
        return lr * 0.97 if lr > 0.001 else lr * 2

schedule_lr = LearningRateScheduler(scheduler)

In [None]:
backup_dir = f"{working_dir}/backup/latest"
backup_callback = keras.callbacks.BackupAndRestore(
    backup_dir, save_freq="epoch", delete_checkpoint=True
)

In [None]:
def MBConv6(input_tensor, hp=None, out_channels=32, expansion=6, use_se=True, dropout_rate=0.0):
    # inverted residual structure
    # pointwise convolution 1 expansion
    x = layers.Conv2D(expansion * input_tensor.shape[-1], (1,  1), padding='same', use_bias=False)(input_tensor)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(swish)(x)

    # Depthwise Separable Convolution
    x = layers.DepthwiseConv2D(kernel_size=(3,  3), strides=(1,  1), padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(swish)(x)

    # pointwise convolution 2 bottleneck
    x = layers.Conv2D(out_channels, (1,  1), padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)

    # Dropout
    if dropout_rate >  0.0:
        x = layers.Dropout(dropout_rate)(x)

    # bottleneck
    x = layers.Conv2D(out_channels, (1,  1), padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)

    # Squeeze-and-Excitation
    if use_se:
        se_shape = (1,  1, out_channels)
        se = layers.GlobalAveragePooling2D()(x)
        se = layers.Reshape(se_shape)(se)
        se = layers.Conv2D(out_channels //  4, (1,  1), padding='same', use_bias=True)(se)
        se = layers.Activation(swish)(se)
        se = layers.Conv2D(out_channels, (1,  1), padding='same', use_bias=True)(se)
        se = layers.Activation('sigmoid')(se)
        x = layers.Multiply()([x, se])

    # Residual
    if input_tensor.shape[-1] == out_channels:
        shortcut = input_tensor
    else:
        shortcut = layers.Conv2D(out_channels, (1,  1), strides=(1,  1), padding='same', use_bias=False)(input_tensor)
        shortcut = layers.BatchNormalization()(shortcut)

    x = layers.Add()([x, shortcut])

    return x
input_tensor = layers.Input(shape=(32,  32,  16))
output_tensor = MBConv6(input_tensor)
model = keras.models.Model(inputs=input_tensor, outputs=output_tensor)

model.summary()

In [None]:

def MBConv1(input_tensor, hp=None, expansion=4, out_channels=16, strides=(1,   1), use_se=True):

    x = layers.Conv2D(expansion, (1,   1), padding='same', use_bias=False)(input_tensor)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(swish)(x)


    x = layers.DepthwiseConv2D(kernel_size=(3,   3), strides=strides, padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(swish)(x)


    x = layers.Conv2D(out_channels, (1,   1), padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)

    # Squeeze-and-Excitation
    if use_se:
        se_shape = (1,  1, out_channels)
        se = layers.GlobalAveragePooling2D()(x)
        se = layers.Reshape((1,  1, out_channels))(se)
        se = layers.Conv2D(out_channels //  2, (1,  1), padding='same', use_bias=True)(se)
        se = layers.Activation(swish)(se)
        se = layers.Conv2D(out_channels, (1,  1), padding='same', use_bias=True)(se)
        se = layers.Activation('sigmoid')(se)
        se = layers.Reshape(se_shape[1:])(se)  # Reshape to original
        x = layers.Multiply()([x, se])

    # Residual
    if input_tensor.shape[-1] == out_channels and strides == (1,   1):
        x = layers.Add()([x, input_tensor])
    else:
        input_tensor = layers.Conv2D(out_channels, (1,   1), strides=strides, padding='same', use_bias=False)(input_tensor)
        x = layers.Add()([x, input_tensor])

    return x

input_tensor = layers.Input(shape=(32,  32,  3))
output_tensor = MBConv1(input_tensor)
model = keras.models.Model(inputs=input_tensor, outputs=output_tensor)

model.summary()

In [None]:
!pip install keras-tuner
import keras_tuner
from keras_tuner import RandomSearch, Hyperband
from keras.models import Model
from keras.metrics import Accuracy, Precision, Recall, F1Score

def build_model(hp=None):
    activation_function = 'swish'  #gelu, silu, leaky_relu etc were worse
    input_tensor = layers.Input(shape=[32,  32,  3])
    x = layers.Reshape(target_shape=(32, 32, 3))(input_tensor)

    if hp.Boolean('PREPROCESSING_horizontal'):
        x = layers.RandomFlip(mode='horizontal')(x)
    if hp.Boolean('PRECONV_norm'):
        x = layers.BatchNormalization()(x)
    x = MBConv6(input_tensor, hp=hp, out_channels=hp.Int('out_channels_2', min_value=20, max_value=44, step=12),
                expansion=hp.Int('expansion_2', min_value=4, max_value=8, step=2),
                use_se=hp.Boolean('use_se_2'),
                dropout_rate=hp.Float('conv6_dropout_rate_2', min_value=0.0, max_value=0.2, step=0.1))

    x = layers.Conv2D(filters=hp.Int('conv_filter', min_value=32, max_value=96, step=64),
                      kernel_size=hp.Choice('conv_kernel', values=[3,  7]),
                      activation=activation_function, padding='same',
                      kernel_regularizer=keras.regularizers.l2(0.003) if hp.Boolean('l2_reg') else None)(x)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPool2D()(x)
    x = layers.Conv2D(filters=hp.Int('conv_filter', min_value=32, max_value=96, step=64),
                      kernel_size=hp.Choice('conv_kernel', values=[3,  7]),
                      activation=activation_function, padding='same',
                      kernel_regularizer=keras.regularizers.l2(0.003) if hp.Boolean('l2_reg') else None)(x)
    if hp.Boolean('use_maxpool'):
        x = layers.MaxPool2D()(x)
    x = layers.MaxPool2D()(x)
    x = layers.BatchNormalization()(x)
    x = layers.Flatten()(x)


    for i in range(hp.Int('num_dense_layers', min_value=4, max_value=12, step=2)):
        x = layers.Dense(hp.Int(f'dense_units', min_value=256, max_value=512, step=256),
                         activation=activation_function,
                         kernel_regularizer=keras.regularizers.l2(0.003) if hp.Boolean('l2_reg') else None)(x)
        x = layers.Dropout(hp.Float(f'dropout_{i}_rate', min_value=0.0, max_value=0.6, step=0.1))(x)

    x = layers.Dense(1, activation='sigmoid')(x)
    optimizer_choice = hp.Choice('optimizer', values=['Adam', 'Adamax'])#'Adagrad', 'Adadelta'

    if optimizer_choice == 'Adamax':
        optimizer = tf.keras.optimizers.Adamax(learning_rate=hp.Float('adamax_learning_rate', min_value=0.002, max_value=0.01, sampling='LOG'))
    elif optimizer_choice == 'Adam':
        optimizer = tf.keras.optimizers.Adam(learning_rate=hp.Float('adam_learning_rate', min_value=0.0035, max_value=0.01, sampling='LOG'))


    model = Model(inputs=input_tensor, outputs=x)
    model.compile(optimizer=optimizer_choice,
                  loss='binary_crossentropy',
                  metrics=['binary_accuracy',
                      Precision(name='precision'),
                      Recall(name='recall'),
                      F1Score(name='f1_score')])
    return model


In [None]:
tuner = Hyperband(
    build_model,
    objective='val_binary_accuracy',
    max_epochs=2,
    factor=3,
    directory=f'{working_dir}/projects',
    project_name='cifake_hyperband')

In [None]:
tuner.search(ds_train, epochs=2, validation_data=ds_valid, callbacks=[early_stopping,  reduce_lr, schedule_lr, backup_callback]) #,tensorboard, verbose=0 model_checkpoint

best_model = tuner.get_best_models(num_models=1)[0]

In [None]:
best_model.summary()

In [None]:
tuner.results_summary()

In [None]:
best_model.save(f"{working_dir}/BEST{NAME}.keras")

In [None]:

best_hyperparameters = tuner.get_best_hyperparameters(num_trials=1)[0]

# Output the best model's metrics
best_metrics = best_model.evaluate(ds_valid)
print("Validation Loss:", best_metrics[0])
print("Validation Binary Accuracy:", best_metrics[1])
print("Validation Precision:", best_metrics[2])
print("Validation Recall:", best_metrics[3])
print("Validation F1 Score:", best_metrics[4])

In [None]:
drive.flush_and_unmount()
print('All changes made in this colab session should now be visible in Drive.')