In [None]:
import tensorflow as tf
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from PIL import Image
from tensorflow.keras import regularizers
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
import matplotlib.pyplot as plt
import cv2
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Dropout, Conv2D, MaxPooling2D, Flatten
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications import InceptionV3
from tensorflow.keras.callbacks import ReduceLROnPlateau
from tensorflow.keras.utils import plot_model

import numpy as np

from io import BytesIO


In [None]:
import os
import random
import shutil

# Set the path to the main folder containing real and fake subfolders
main_folder = '/media/real/data/dataset'

# Set the paths to the train and test folders
train_folder = 'dataset/train'
test_folder = 'dataset/test'

# Set the train/test split ratio
train_ratio = 0.8

# Create train and test folders if they don't exist
os.makedirs(train_folder, exist_ok=True)
os.makedirs(test_folder, exist_ok=True)

balance = True

subfolders = ['real', 'fake', 'blurred', 'low_quality', 'distorted']


# Function to copy files from source to destination folder
def copy_files(file_list, src_folder, dest_folder):
    for file_name in file_list:

        #check if the image file is valid
        try:
            img = Image.open(os.path.join(src_folder, file_name)) # open the image file
            img.verify() # verify that it is, in fact an image
        except (IOError, SyntaxError) as e:
            print('Bad file:', file_name) # print out the names of corrupt files
            continue
        src_path = os.path.join(src_folder, file_name)
        dest_path = os.path.join(dest_folder, file_name)
        shutil.copy(src_path, dest_path)

# Iterate through the subfolders in the main folder
for subfolder in subfolders:
    subfolder_path = os.path.join(main_folder, subfolder)
    files = os.listdir(subfolder_path)
    random.shuffle(files)

    # Split the files into train and test sets
    train_files = files[:int(train_ratio * len(files))]
    test_files = files[int(train_ratio * len(files)):]

    # Copy train files to the train folder
    train_subfolder_path = os.path.join(train_folder, subfolder)
    os.makedirs(train_subfolder_path, exist_ok=True)
    copy_files(train_files, subfolder_path, train_subfolder_path)

    # Copy test files to the test folder
    test_subfolder_path = os.path.join(test_folder, subfolder)
    os.makedirs(test_subfolder_path, exist_ok=True)
    copy_files(test_files, subfolder_path, test_subfolder_path)

In [None]:

# Load ResNet50 without the top classification layers
# Load InceptionV3 without the top classification layers
base_model = InceptionV3(input_shape=(299, 299, 3), include_top=False, weights='imagenet')
#base_model = ResNet50(input_shape=(224, 224, 3), include_top=False, weights='imagenet')

# Define the number of layers to unfreeze
num_layers_to_unfreeze = 150  # For example, unfreeze the first 100 layers

# Unfreeze layers starting from the specified index
for layer in base_model.layers[:num_layers_to_unfreeze]:
    layer.trainable = True
for layer in base_model.layers[num_layers_to_unfreeze:]:
    layer.trainable = False

# Add custom classification layers on top of the base model
num_classes = 5  # Number of classes: real, fake, blurred, low_quality, distorted
model = tf.keras.Sequential([
    base_model,
    GlobalAveragePooling2D(),
    Dense(512, activation='relu'),
    Dropout(0.5),
    Dense(256, activation='relu'),
    Dropout(0.5),
    Dense(num_classes, activation='softmax')  # Use softmax activation for multi-class
])
plot_model(model, to_file='model_architecture.png', show_shapes=True, show_layer_names=True, expand_nested=True)

# Compile the model
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, concatenate, GlobalAveragePooling2D, Dense, Dropout

def inception_module(x, filters):
    # 1x1 Conv
    conv1 = Conv2D(filters=filters[0], kernel_size=(1, 1), padding='same', activation='relu')(x)

    # 1x1 Conv followed by 3x3 Conv
    conv3 = Conv2D(filters=filters[1], kernel_size=(1, 1), padding='same', activation='relu')(x)
    conv3 = Conv2D(filters=filters[2], kernel_size=(3, 3), padding='same', activation='relu')(conv3)

    # 1x1 Conv followed by 5x5 Conv
    conv5 = Conv2D(filters=filters[3], kernel_size=(1, 1), padding='same', activation='relu')(x)
    conv5 = Conv2D(filters=filters[4], kernel_size=(5, 5), padding='same', activation='relu')(conv5)

    # 3x3 MaxPooling followed by 1x1 Conv
    pool = MaxPooling2D(pool_size=(3, 3), strides=(1, 1), padding='same')(x)
    pool = Conv2D(filters=filters[5], kernel_size=(1, 1), padding='same', activation='relu')(pool)

    # Concatenate all the branches
    output = concatenate([conv1, conv3, conv5, pool], axis=-1)
    return output

# Create the model
input_layer = Input(shape=(480, 640, 3))

x = Conv2D(filters=64, kernel_size=(7, 7), padding='same', strides=(2, 2), activation='relu')(input_layer)
x = MaxPooling2D(pool_size=(3, 3), padding='same', strides=(2, 2))(x)

x = inception_module(x, filters=[64, 128, 128, 32, 32, 32])
x = inception_module(x, filters=[128, 256, 256, 64, 64, 64])

x = GlobalAveragePooling2D()(x)
x = Dense(512, activation='relu')(x)
x = Dropout(0.5)(x)
x = Dense(256, activation='relu')(x)
x = Dropout(0.5)(x)
x = Dense(5, activation='softmax')(x)  # Assuming 5 classes as in your previous example

custom_model = tf.keras.Model(inputs=input_layer, outputs=x)
custom_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
plot_model(custom_model, to_file='./model_architecture.png', show_shapes=True, show_layer_names=True, expand_nested=True)

custom_model.summary()

In [None]:

# Specify the paths to the dataset folders
train_data_dir = 'dataset/train'
test_data_dir = 'dataset/test'

# Set the image size and batch size
image_size = (480, 640)
#image_size = (299, 299)
batch_size = 16

# Create an ImageDataGenerator for data augmentation and preprocessing
data_generator = ImageDataGenerator(rescale=1./255)

# List of class names
class_names = ['real', 'fake', 'blurred', 'low_quality', 'distorted']

# Load and prepare the training data
train_data = data_generator.flow_from_directory(
    train_data_dir,
    target_size=image_size,
    batch_size=batch_size,
    class_mode='categorical',  # Use 'categorical' for multi-class classification
    classes=class_names,       # List of class names
    shuffle=True
)
# Load and prepare the testing data
test_data = data_generator.flow_from_directory(
    test_data_dir,
    target_size=image_size,
    batch_size=batch_size,
    class_mode='categorical',
    classes=class_names,
    shuffle=False
)

In [None]:
 # Define the number of training and testing steps per epoch
train_steps_per_epoch = train_data.samples // batch_size
test_steps_per_epoch = test_data.samples // batch_size
# Define the paths for saving the checkpoints and the best model
checkpoint_path = "checkpoint/model_checkpoint"
best_model_path = "best_model/best_model_savedmodel"

# Ensure the checkpoint directory exists
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)

# Create the ModelCheckpoint callback to save the model at the end of each epoch
checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_path,
    monitor='val_loss',  # Save based on validation loss
    save_best_only=True,  # Save only the best model
    save_weights_only=False,  # Save the entire model
    mode='min',  # Minimize validation loss
    verbose=1
)
# early_stopping_callback = EarlyStopping(
#     monitor='val_loss',
#     patience=5,  # Number of epochs with no improvement before stopping
#     mode='min',  # Minimize validation loss
#     verbose=1
# )
# Create the ReduceLROnPlateau callback
reduce_lr_callback = ReduceLROnPlateau(
    monitor='val_loss',  # Monitor validation loss
    factor=0.5,           # Reduce learning rate by half
    patience=3,           # Number of epochs with no improvement before reducing LR
    min_lr=1e-6,          # Minimum learning rate
    verbose=1
)

# Train the model
history=custom_model.fit(
    train_data,
    steps_per_epoch=train_steps_per_epoch,
    epochs=50,
    validation_data=test_data,
    validation_steps=test_steps_per_epoch,
    callbacks=[checkpoint_callback, reduce_lr_callback]
)

custom_model_best = tf.keras.models.load_model(checkpoint_path)  # Load the best model from the checkpoint


custom_model_best.save('best_model(480x640).savedmodel')
custom_model.save('last_model(480x640).savedmodel')

In [None]:

best_model = tf.keras.models.load_model(checkpoint_path)  # Load the best model from the checkpoint
best_model.save(best_model_path)

best_model.save('best_model(480x640).savedmodel')
model.save('last_model.savedmodel')

In [None]:
 #last_layer_name 
print(model.layers[-1].name)

In [None]:
# Plot the training and validation loss over epochs
plt.figure(figsize=(10, 6))
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Training and Validation Loss')
plt.show()

# Plot the training and validation accuracy over epochs
plt.figure(figsize=(10, 6))
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.title('Training and Validation Accuracy')
plt.show()

In [None]:
val_loss, val_acc = best_model.evaluate(test_data)
print("Validation Loss:", val_loss)
print("Validation Accuracy:", val_acc)

In [None]:
from sklearn.metrics import confusion_matrix, classification_report
y_pre= best_model.predict(test_data)
y_pred = np.argmax(y_pre, axis=1)
print('Confusion Matrix')
print(confusion_matrix(test_data.classes, y_pred))
print('Classification Report')
target_names = ['real', 'fake', 'blurred', 'low_quality', 'distorted']
print(classification_report(test_data.classes, y_pred, target_names=target_names))

In [None]:
#round y_pre first prediction
y_pre_round = np.round(y_pre)
print(y_pre_round)

In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img, img_to_array
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

# Define the paths for the validation data
validation_folder = '/home/real/datasets/validation'
real_folder = os.path.join(validation_folder, 'real')
fake_folder = os.path.join(validation_folder, 'fake')


best_model_path = "best_model/best_model_savedmodel"
best_model = tf.keras.models.load_model(best_model_path)
# Load the model
# Function to preprocess the image
def preprocess_image(image_path, image_size):
    img = load_img(image_path, target_size=image_size)
    img_array = img_to_array(img)
    img_array = np.expand_dims(img_array, axis=0)
    img_array = img_array / 255.0
    return img_array

# Function to predict on the validation data
def predict_on_validation_data(validation_folder, image_size, model):
    real_images = []
    fake_images = []

    for image_file in os.listdir(real_folder):
        image_path = os.path.join(real_folder, image_file)
        real_images.append(preprocess_image(image_path, image_size))

    for image_file in os.listdir(fake_folder):
        image_path = os.path.join(fake_folder, image_file)
        fake_images.append(preprocess_image(image_path, image_size))

    real_images = np.vstack(real_images)
    fake_images = np.vstack(fake_images)

    real_predictions = model.predict(real_images)
    fake_predictions = model.predict(fake_images)

    return real_predictions, fake_predictions

# Set the image size for the model
# image_size = (640, 480)  # Replace with the appropriate image size based on your model's input requirements
image_size = (299, 299)
def plot_images_and_predictions(images, predictions, title):
    plt.figure(figsize=(12, 8))
    for i in range(len(images)):
        # plt.subplot( 2, len(images)//2, i+1)
        plt.subplot( len(images)//2, 4, i+1)
        # image in rgb format
        # plt.imshow(mpimg.imread(images[i]))
        plt.imshow(cv2.cvtColor(cv2.imread(images[i]), cv2.COLOR_BGR2RGB))
        plt.title(f'Prediction: {predictions[i][0]:.2f}')
        plt.axis('off')
    plt.suptitle(title, fontsize=16)
    plt.show()
# Run validation on the data
real_predictions, fake_predictions = predict_on_validation_data(validation_folder, image_size, model)

# Assuming the model is a binary classifier, you can calculate the accuracy for both classes
real_accuracy = np.mean(real_predictions > 0.5)
fake_accuracy = np.mean(fake_predictions < 0.5)

print("Real Accuracy:", real_accuracy)
print("Fake Accuracy:", fake_accuracy)
# Get the file paths for the real and fake images
real_images_paths = [os.path.join(real_folder, image_file) for image_file in os.listdir(real_folder)]
fake_images_paths = [os.path.join(fake_folder, image_file) for image_file in os.listdir(fake_folder)]

# Show images and associated predictions in a grid
plot_images_and_predictions(real_images_paths[:10], real_predictions[:10], title="Real Images and Predictions")
plot_images_and_predictions(fake_images_paths[:30], fake_predictions[:30], title="Fake Images and Predictions")

In [None]:
# get first layer name
print(model.layers[0].name)

# get last layer name
print(model.layers[-1].name)


In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Define your image size
image_size = (224, 224)  # Adjust this to your desired image size

# Define the path to the main folder containing 'real' and 'fake' subfolders
data_dir = 'path/to/dataset'

# Define the batch size for the data generator
batch_size = 32

# Use ImageDataGenerator for data loading and augmentation
datagen = ImageDataGenerator(rescale=1.0 / 255, validation_split=0.2)  # Normalize pixel values and split for validation

# Load the data from the main folder, split into training and validation sets
train_generator = datagen.flow_from_directory(
    data_dir,
    target_size=image_size,
    batch_size=batch_size,
    class_mode='binary',
    subset='training'
)

validation_generator = datagen.flow_from_directory(
    data_dir,
    target_size=image_size,
    batch_size=batch_size,
    class_mode='binary',
    subset='validation'
)
# Evaluate the model on the validation set
validation_loss, validation_accuracy = model.evaluate(validation_generator)
print("Validation Loss:", validation_loss)
print("Validation Accuracy:", validation_accuracy)

In [None]:
#save model;
model.save('model.h5')

In [None]:
#load model;
model = tf.keras.models.load_model('model.h5')

In [None]:
model.summary()

In [None]:
# run model on a single image
fake_image = 'dataset/test/real/0c73777d1a944251869584b56d0997ef.jpg'

fake_image = tf.keras.preprocessing.image.load_img(
    fake_image, target_size=image_size
)

fake_image = tf.keras.preprocessing.image.img_to_array(fake_image)
fake_image = np.expand_dims(fake_image, axis=0)
fake_image = fake_image / 255


prediction = model.predict(fake_image)
print("Prediction:", prediction)
print("predicted class:", "fake" if prediction < 0.5 else "real")

In [None]:
#save model tensortrt model for inference
import tensorflow as tf
from tensorflow.python.compiler.tensorrt import trt_convert as trt
from tensorflow.python.saved_model import tag_constants

# Convert the model
converter = trt.TrtGraphConverterV2(
    input_saved_model_dir='model.h5',
    conversion_params=trt.DEFAULT_TRT_CONVERSION_PARAMS)
converter.convert()
converter.save(output_saved_model_dir='model_trt')



In [None]:
#save model tensorflow saved model format
model.save('model.savedmodel')


In [None]:
# get first and last layer of the model name and s