# Installation and importing of relevant packages

In [None]:
# !pip install numpy
# !pip install pandas
# !pip install tensorflow
# !pip install scikit-image

In [None]:
# Import relevant libraries
import numpy as np
import pandas as pd
import tensorflow as tf
import keras
import skimage as skimg
import matplotlib.pyplot as plt
from scipy.signal import convolve2d
import os

## Setup file paths

In [None]:
# Root of the CIFAKE dataset. Defined once so the notebook can be pointed at a
# different copy of the data by editing a single line.
DATA_ROOT = '/kaggle/input/cifake-real-and-ai-generated-synthetic-images'

# Show file directory structure
# print(os.listdir(DATA_ROOT))

# Setup file directories. The trailing '/' is kept on purpose: later cells
# build full paths by plain string concatenation (directory + filename).
fake_test_dir = f'{DATA_ROOT}/test/FAKE/'
real_test_dir = f'{DATA_ROOT}/test/REAL/'
fake_train_dir = f'{DATA_ROOT}/train/FAKE/'
real_train_dir = f'{DATA_ROOT}/train/REAL/'

# os.listdir() returns entries in arbitrary order; sorting makes the file
# lists (and everything derived from them) identical on every run.
fake_test_files = sorted(os.listdir(fake_test_dir))
real_test_files = sorted(os.listdir(real_test_dir))
fake_train_files = sorted(os.listdir(fake_train_dir))
real_train_files = sorted(os.listdir(real_train_dir))

# Basic data pre-processing

## Construct training data

In [None]:
# Real training images: one row per file with its full path, labelled 0
real_train_paths = pd.DataFrame([real_train_dir + name for name in real_train_files])
real_train_paths.columns = ['path']
real_train_paths['label'] = 0

print(f"Length of real_train_paths: {len(real_train_paths)}")

# Fake training images: same layout, labelled 1
fake_train_paths = pd.DataFrame([fake_train_dir + name for name in fake_train_files])
fake_train_paths.columns = ['path']
fake_train_paths['label'] = 1

print(f"Length of fake_train_paths: {len(fake_train_paths)}")

# Stack the real and fake frames row-wise into the full training set
train_set = pd.concat([real_train_paths, fake_train_paths], axis=0)
print(f"train_set shape: {train_set.shape}")

In [None]:
# Shuffle rows so real and fake samples are interleaved before the split.
# A fixed random_state makes the permutation repeatable for a given input row
# order, so the train/validation split derived from it does not change between
# kernel restarts.
train_set = train_set.sample(frac=1, random_state=42).reset_index(drop=True)

train_set.head()

In [None]:
# Split into training (first 80% of rows) and validation (remaining 20%) sets.
# Positional slicing is used rather than np.split(): np.split on a DataFrame
# goes through DataFrame.swapaxes, which pandas has deprecated.
# NOTE: this cell overwrites train_set, so re-running it without re-running the
# cells above would split the already-reduced training set again.
split_at = int(0.8 * len(train_set))
train_set, val_set = train_set.iloc[:split_at], train_set.iloc[split_at:]

print(train_set.head())
print(val_set.head())

## Extract pixel data

In [None]:
# Read every training image from disk and stack them into a single array
train_set_data = [skimg.io.imread(path) for path in train_set['path']]
image_array = np.array(train_set_data)

# Divide by 255 to bring pixel values into [0, 1] (assumes 8-bit images),
# then print the shape as a sanity check
X_train = image_array / 255
print(X_train.shape)

# Training labels, in the same row order as the images above
y_train = train_set['label']
y_train.head()

In [None]:
# Read every validation image from disk and stack them into a single array
val_set_data = [skimg.io.imread(path) for path in val_set['path']]
image_array = np.array(val_set_data)

# Divide by 255 to bring pixel values into [0, 1] (assumes 8-bit images),
# then print the shape as a sanity check
X_val = image_array / 255
print(X_val.shape)

# Validation labels, in the same row order as the images above
y_val = val_set['label']
y_val.head()

## Construct testing data

In [None]:
# Real test images: one row per file with its full path, labelled 0
real_test_paths = pd.DataFrame([real_test_dir + name for name in real_test_files])
real_test_paths.columns = ['path']
real_test_paths['label'] = 0

print(f"Length of real_test_paths: {len(real_test_paths)}")

# Fake test images: same layout, labelled 1
fake_test_paths = pd.DataFrame([fake_test_dir + name for name in fake_test_files])
fake_test_paths.columns = ['path']
fake_test_paths['label'] = 1

print(f"Length of fake_test_paths: {len(fake_test_paths)}")

# Stack the real and fake frames row-wise into the full test set
test_set = pd.concat([real_test_paths, fake_test_paths], axis=0)

print(f"test_set shape: {test_set.shape}")
test_set.sample(5)

## Extract pixel data (test set)

In [None]:
# Read every test image from disk and stack them into a single array
test_set_data = [skimg.io.imread(path) for path in test_set['path']]
image_array = np.array(test_set_data)

# Divide by 255 to bring pixel values into [0, 1] (assumes 8-bit images),
# then print the shape as a sanity check
X_test = image_array / 255
print(X_test.shape)

# Test labels, in the same row order as the images above
y_test = test_set['label']
y_test.head()

# Build baseline model

In [None]:
# Baseline CNN for 32x32 RGB input: one conv + max-pool stage followed by a
# small dense head ending in a single sigmoid unit
base_model = tf.keras.models.Sequential()
base_model.add(tf.keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(32, 32, 3)))
base_model.add(tf.keras.layers.MaxPooling2D((2, 2)))
base_model.add(tf.keras.layers.Flatten())
base_model.add(tf.keras.layers.Dense(10, activation='relu'))
base_model.add(tf.keras.layers.Dense(1, activation='sigmoid'))

# Binary-classification setup: Adam, cross-entropy loss, ROC-AUC and accuracy
base_model.compile(
    loss="binary_crossentropy",
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    metrics=[tf.keras.metrics.AUC(curve='ROC'), 'acc'],
)

## Train model

In [None]:
# Fit the baseline model for 5 epochs, tracking metrics on the validation split
history = base_model.fit(
    x=X_train,
    y=y_train,
    validation_data=(X_val, y_val),
    epochs=5,
)

In [None]:
# Evaluate the baseline model on the test images and labels; the cell output
# is the value returned by evaluate() for the compiled loss and metrics.
base_model.evaluate(X_test, y_test)

# Grayscale testing

## Convert image data to grayscale

In [None]:
# Grayscale conversion function
def toGrayscale(imgArr):
    """Convert a batch of 3-channel images to grayscale with the NTSC weights.

    Parameters
    ----------
    imgArr : array-like
        Images whose last axis holds the three colour channels, e.g. an array
        of shape (n_images, height, width, 3). The weights are applied to
        channels 0, 1, 2 in that order (R, G, B for images stored as RGB).

    Returns
    -------
    numpy.ndarray
        Array with the channel axis removed, e.g. (n_images, height, width).
        Each value is 0.299*c0 + 0.587*c1 + 0.114*c2. An empty input yields an
        empty 1-D array.
    """
    ratio = np.array([0.299, 0.587, 0.114])                  # NTSC Formula

    images = np.asarray(imgArr)
    if images.size == 0:
        # Nothing to convert; np.dot would reject the shape mismatch here.
        return np.array([])

    # np.dot of an N-D array with a 1-D vector contracts the last axis, so the
    # weighted channel sum is computed for every pixel of every image in one
    # vectorised call instead of a Python loop per pixel.
    return np.dot(images, ratio)

In [None]:
# Grayscale version of each image split, converted in train / val / test order
X_gs_train, X_gs_val, X_gs_test = (toGrayscale(split) for split in (X_train, X_val, X_test))

# Labels do not change with the colour conversion, so these are plain aliases
y_gs_train, y_gs_val, y_gs_test = y_train, y_val, y_test

## Build model

In [None]:
# Same architecture as the baseline, but declared for single-channel 32x32 input
gs_layers = [
    tf.keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(32, 32, 1)),
    tf.keras.layers.MaxPooling2D((2, 2)),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(10, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid'),
]
gs_model = tf.keras.models.Sequential(gs_layers)

# Binary-classification setup: Adam, cross-entropy loss, ROC-AUC and accuracy
gs_model.compile(
    loss="binary_crossentropy",
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    metrics=[tf.keras.metrics.AUC(curve='ROC'), 'acc'],
)

In [None]:
# Fit the grayscale model, tracking metrics on the grayscale validation split
gs_history = gs_model.fit(
    x=X_gs_train,
    y=y_gs_train,
    validation_data=(X_gs_val, y_gs_val),
    epochs=5,    # 50 for real training
)

In [None]:
# Evaluate on the grayscale test images using this section's own label alias
# (y_gs_test), matching how the model was trained with y_gs_train / y_gs_val.
gs_model.evaluate(X_gs_test, y_gs_test)

# Frequency Domain extraction testing

## Convert image to frequency domain

In [None]:
def toFreqDomain(imgArr):
    """Turn a batch of images into min-max normalised log-magnitude spectra.

    Each image is converted to grayscale with toGrayscale, transformed with a
    2-D FFT, shifted so the zero-frequency term sits in the centre, reduced to
    a log magnitude and finally rescaled to [0, 1] per image.

    Parameters
    ----------
    imgArr : array-like
        Batch of images accepted by toGrayscale.

    Returns
    -------
    numpy.ndarray
        One normalised spectrum per input image, stacked along the first axis.
        A spectrum whose magnitudes are all equal is returned as all zeros.
    """
    gsArr = toGrayscale(imgArr)
    freqDomArr = []

    # Floor for the magnitudes so log() never sees an exact zero. A zero
    # coefficient (e.g. every non-DC term of a constant image) would otherwise
    # give -inf and turn the whole normalised image into NaN.
    eps = np.finfo(float).eps

    for img in gsArr:
        fftImg = np.fft.fft2(img)
        fftImgShift = np.fft.fftshift(fftImg)
        # Natural log is used; the constant factor cancels in the min-max
        # normalisation below.
        imgMagnitude = 20 * np.log(np.maximum(np.abs(fftImgShift), eps))

        minMag = np.min(imgMagnitude)
        maxMag = np.max(imgMagnitude)
        magRange = maxMag - minMag

        if magRange > 0:
            normalized = (imgMagnitude - minMag) / magRange
        else:
            # Flat spectrum: avoid 0/0 and return a well-defined constant image
            normalized = np.zeros_like(imgMagnitude)

        freqDomArr.append(normalized)

    return np.array(freqDomArr)

In [None]:
# Frequency-domain version of each image split, in train / val / test order
X_fft_train, X_fft_val, X_fft_test = (toFreqDomain(split) for split in (X_train, X_val, X_test))

# Labels do not change with the transform, so these are plain aliases
y_fft_train, y_fft_val, y_fft_test = y_train, y_val, y_test

In [None]:
# Display the first transformed training sample as a quick sanity check
X_fft_train[0]

## Build model

In [None]:
# Same architecture as the baseline, declared for single-channel 32x32 input
# (here the frequency-domain images)
fft_model = tf.keras.models.Sequential()
fft_model.add(tf.keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(32, 32, 1)))
fft_model.add(tf.keras.layers.MaxPooling2D((2, 2)))
fft_model.add(tf.keras.layers.Flatten())
fft_model.add(tf.keras.layers.Dense(10, activation='relu'))
fft_model.add(tf.keras.layers.Dense(1, activation='sigmoid'))

# Binary-classification setup: Adam, cross-entropy loss, ROC-AUC and accuracy
fft_model.compile(
    loss="binary_crossentropy",
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    metrics=[tf.keras.metrics.AUC(curve='ROC'), 'acc'],
)

In [None]:
# Fit the frequency-domain model, tracking metrics on the validation split
fft_history = fft_model.fit(
    X_fft_train,
    y_fft_train,
    epochs=5,    # 50 for real training
    validation_data=(X_fft_val, y_fft_val)
)

# Score the model on the frequency-domain test data so this experiment reports
# a test result like the baseline, grayscale and edge-detection experiments do.
fft_model.evaluate(X_fft_test, y_fft_test)

# Edge detection testing

## Perform edge detection on image

In [None]:
def edgeDetect(imgArr):
    """Compute a Sobel gradient-magnitude map for every image in a batch.

    The images are first converted to grayscale with toGrayscale. Each
    grayscale image is convolved with the horizontal and vertical Sobel
    kernels (output the same size as the input, borders extended
    symmetrically), and the two responses are combined as
    sqrt(gx**2 + gy**2).

    Parameters
    ----------
    imgArr : array-like
        Batch of images accepted by toGrayscale.

    Returns
    -------
    numpy.ndarray
        One gradient-magnitude map per input image, stacked along the first
        axis. The values are not rescaled.
    """
    sobelX = np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]])
    sobelY = np.array([[-1, -2, -1], [0, 0, 0], [1, 2, 1]])

    def gradientMagnitude(gray):
        # Horizontal and vertical responses for a single grayscale image
        gx = convolve2d(gray, sobelX, mode='same', boundary='symm')
        gy = convolve2d(gray, sobelY, mode='same', boundary='symm')
        return np.sqrt(np.square(gx) + np.square(gy))

    return np.array([gradientMagnitude(gray) for gray in toGrayscale(imgArr)])

In [None]:
# Edge-magnitude version of each image split, in train / val / test order
X_edge_train, X_edge_val, X_edge_test = (edgeDetect(split) for split in (X_train, X_val, X_test))

# Labels do not change with the transform, so these are plain aliases
y_edge_train, y_edge_val, y_edge_test = y_train, y_val, y_test

In [None]:
# Same architecture as the baseline, declared for single-channel 32x32 input
# (here the edge-magnitude images)
edge_layers = [
    tf.keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(32, 32, 1)),
    tf.keras.layers.MaxPooling2D((2, 2)),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(10, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid'),
]
edge_model = tf.keras.models.Sequential(edge_layers)

# Binary-classification setup: Adam, cross-entropy loss, ROC-AUC and accuracy
edge_model.compile(
    loss="binary_crossentropy",
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    metrics=[tf.keras.metrics.AUC(curve='ROC'), 'acc'],
)

In [None]:
# Show the shape of the edge-detected training data before fitting
X_edge_train.shape

In [None]:
# Fit the edge-detection model, tracking metrics on the validation split
edge_history = edge_model.fit(
    x=X_edge_train,
    y=y_edge_train,
    validation_data=(X_edge_val, y_edge_val),
    epochs=5,    # 50 for real training
)

In [None]:
# Evaluate on the edge-detected test images using this section's own label
# alias (y_edge_test), matching how the model was trained with y_edge_train.
edge_model.evaluate(X_edge_test, y_edge_test)

# Hyperparameter testing

## Model A

In [None]:
# Model A: three conv + max-pool stages with 128, 64 and 32 filters, followed
# by a 32-unit dense layer and a single sigmoid output
model_a = tf.keras.models.Sequential()
model_a.add(tf.keras.layers.Conv2D(128, (3, 3), activation='relu', input_shape=(32, 32, 3)))
model_a.add(tf.keras.layers.MaxPooling2D((2, 2)))
model_a.add(tf.keras.layers.Conv2D(64, (3, 3), activation='relu'))
model_a.add(tf.keras.layers.MaxPooling2D((2, 2)))
model_a.add(tf.keras.layers.Conv2D(32, (3, 3), activation='relu'))
model_a.add(tf.keras.layers.MaxPooling2D((2, 2)))
model_a.add(tf.keras.layers.Flatten())
model_a.add(tf.keras.layers.Dense(32, activation='relu'))
model_a.add(tf.keras.layers.Dense(1, activation='sigmoid'))

# Binary-classification setup: Adam, cross-entropy loss, ROC-AUC and accuracy
model_a.compile(
    loss="binary_crossentropy",
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    metrics=[tf.keras.metrics.AUC(curve='ROC'), 'acc'],
)

In [None]:
# Fit Model A for the full 50 epochs, tracking metrics on the validation split
history_a = model_a.fit(
    x=X_train,
    y=y_train,
    validation_data=(X_val, y_val),
    epochs=50,    # 50 for real training
)

In [None]:
# Evaluate Model A on the test images and labels
model_a.evaluate(X_test, y_test)

## Model B

In [None]:
# Model B: three conv + max-pool stages with 64, 32 and 16 filters (half of
# Model A's widths), followed by a 32-unit dense layer and a sigmoid output
model_b_layers = [
    tf.keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(32, 32, 3)),
    tf.keras.layers.MaxPooling2D((2, 2)),
    tf.keras.layers.Conv2D(32, (3, 3), activation='relu'),
    tf.keras.layers.MaxPooling2D((2, 2)),
    tf.keras.layers.Conv2D(16, (3, 3), activation='relu'),
    tf.keras.layers.MaxPooling2D((2, 2)),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(32, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid'),
]
model_b = tf.keras.models.Sequential(model_b_layers)

# Binary-classification setup: Adam, cross-entropy loss, ROC-AUC and accuracy
model_b.compile(
    loss="binary_crossentropy",
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    metrics=[tf.keras.metrics.AUC(curve='ROC'), 'acc'],
)

In [None]:
# Fit Model B for the full 50 epochs, tracking metrics on the validation split
history_b = model_b.fit(
    x=X_train,
    y=y_train,
    validation_data=(X_val, y_val),
    epochs=50,    # 50 for real training
)

In [None]:
# Evaluate Model B on the test images and labels
model_b.evaluate(X_test, y_test)

# Final model build

In [None]:
# Final model: the same layer stack as Model B (64 / 32 / 16 conv filters,
# 32-unit dense layer, single sigmoid output)
final_model = tf.keras.models.Sequential()
final_model.add(tf.keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(32, 32, 3)))
final_model.add(tf.keras.layers.MaxPooling2D((2, 2)))
final_model.add(tf.keras.layers.Conv2D(32, (3, 3), activation='relu'))
final_model.add(tf.keras.layers.MaxPooling2D((2, 2)))
final_model.add(tf.keras.layers.Conv2D(16, (3, 3), activation='relu'))
final_model.add(tf.keras.layers.MaxPooling2D((2, 2)))
final_model.add(tf.keras.layers.Flatten())
final_model.add(tf.keras.layers.Dense(32, activation='relu'))
final_model.add(tf.keras.layers.Dense(1, activation='sigmoid'))

# Binary-classification setup: Adam, cross-entropy loss, ROC-AUC and accuracy
final_model.compile(
    loss="binary_crossentropy",
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    metrics=[tf.keras.metrics.AUC(curve='ROC'), 'acc'],
)

In [None]:
# Fit the final model for the full 50 epochs, tracking validation metrics
final_history = final_model.fit(
    x=X_train,
    y=y_train,
    validation_data=(X_val, y_val),
    epochs=50,    # 50 for real training
)

In [None]:
# Evaluate the final model on the test images and labels
final_model.evaluate(X_test, y_test)

In [None]:
# Save the trained final model to "ai_image_prediction_model.keras"; the path
# is relative, so the file lands in the current working directory.
final_model.save("ai_image_prediction_model.keras")

# DUMMY TESTING

# Building a model that handles multiple inputs

In [None]:
# Base (RGB) branch: conv + max-pool feature extractor, flattened so it can be
# concatenated with the other branch. In the functional API the input shape
# comes from keras.Input, so no input_shape argument is passed to the layer.
baseInput = keras.Input(shape=(32,32,3))
baseLayers = keras.layers.Conv2D(filters=64, kernel_size=(3,3), activation='relu')(baseInput)
baseLayers = keras.layers.MaxPooling2D(pool_size=(2, 2))(baseLayers)
baseLayers = keras.layers.Flatten()(baseLayers)

baseModel = keras.Model(inputs=baseInput, outputs=baseLayers, name="Base_model")

# Grayscale branch: same feature extractor on single-channel input
gsInput = keras.Input(shape=(32,32,1))
gsLayers = keras.layers.Conv2D(filters=64, kernel_size=(3,3), activation='relu')(gsInput)
gsLayers = keras.layers.MaxPooling2D(pool_size=(2, 2))(gsLayers)
gsLayers = keras.layers.Flatten()(gsLayers)

gsModel = keras.Model(inputs=gsInput, outputs=gsLayers, name="Grayscale_model")

# TODO: FFT and edge-detection branches are not implemented yet.

# Shared head on the concatenated branch features. The hidden layer uses ReLU
# (two stacked linear layers would collapse into a single linear map), and the
# output uses a sigmoid so the model emits probabilities, which is what the
# "binary_crossentropy" loss and the AUC metric used at compile time expect.
combinedOutput = keras.layers.concatenate([baseModel.output, gsModel.output])
denseLayers = keras.layers.Dense(64, activation='relu')(combinedOutput)
denseLayers = keras.layers.Dense(1, activation='sigmoid')(denseLayers)

combinedModel = keras.Model(inputs=[baseModel.input, gsModel.input], outputs=denseLayers, name="Combined_model")

In [None]:
# Draw the combined model's layer graph and write it to "combinedModel.png".
# NOTE(review): plot_model presumably needs pydot and graphviz installed —
# confirm they are available in this environment.
keras.utils.plot_model(combinedModel, "combinedModel.png")

In [None]:
# combinedModel is built from the standalone `keras` import, so the optimizer
# and metric objects are taken from the same package. Mixing in `tf.keras`
# objects fails when the two refer to different Keras versions.
combinedModel.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.001),
    loss="binary_crossentropy",
    metrics=[keras.metrics.AUC(curve='ROC'), 'acc'],
)

In [None]:
# Fit the two-input model: inputs are passed as [RGB, grayscale] in the same
# order as the model's input list, with a single shared label vector
combinedHistory = combinedModel.fit(
    [X_train, X_gs_train],
    y_train,
    validation_data=([X_val, X_gs_val], y_val),
    epochs=5,
)