# Final Project

#### ``Objectives``


#### ``Motivation``

#### ``Data``

---
### Step 1: Import libraries

In [None]:
# standard
import os
import numpy as np
import pandas as pd

# tf and keras
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam

# sklearn
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn import utils as sk_utils

# plots
import seaborn as sns
import matplotlib.pyplot as plt

---
### Step 2: Data ingestion

In [None]:
# Root folder of the dataset; each class lives in its own sub-folder
data_dir = "data"
fake_dir, real_dir = (
    os.path.join(data_dir, class_folder) for class_folder in ("fake", "real")
)

def get_dataset(fake_dir, real_dir, balance=False):
    """Collect image paths and class labels from the two class folders.

    Directory entries are sorted by name because ``os.listdir`` returns them
    in arbitrary order; without sorting, the ``balance`` truncation and any
    later seeded shuffle would not be reproducible. Entries that are not
    regular files (for example a ``.ipynb_checkpoints`` folder) are skipped.

    Args:
        fake_dir: Directory holding the "fake" images.
        real_dir: Directory holding the "real" images.
        balance: When True, keep only the first ``n`` paths of each class,
            where ``n`` is the size of the smaller class.

    Returns:
        A ``(image_paths, labels)`` pair of parallel lists, with all "fake"
        entries first and all "real" entries after them.
    """

    def _sorted_files(directory):
        # Full paths of the regular files in `directory`, in name order.
        candidates = (os.path.join(directory, name) for name in sorted(os.listdir(directory)))
        return [path for path in candidates if os.path.isfile(path)]

    fake_images = _sorted_files(fake_dir)
    real_images = _sorted_files(real_dir)

    if balance:
        # Truncate each class to the size of the smaller one.
        min_images = min(len(fake_images), len(real_images))
        fake_images = fake_images[:min_images]
        real_images = real_images[:min_images]

    image_paths = fake_images + real_images
    labels = ["fake"] * len(fake_images) + ["real"] * len(real_images)
    return image_paths, labels

def split_indices(indices, split_ratio):
    """Cut ``indices`` into consecutive train / validation / test slices.

    Args:
        indices: Sliceable sequence of indices.
        split_ratio: Sequence whose first two entries are the train and
            validation fractions. The test slice is whatever remains, so a
            third entry is never read.

    Returns:
        A ``(train_idx, val_idx, test_idx)`` tuple of slices of ``indices``.
    """
    total = len(indices)
    train_end = int(split_ratio[0] * total)
    val_end = train_end + int(split_ratio[1] * total)
    return indices[:train_end], indices[train_end:val_end], indices[val_end:]

def split_data(image_paths, labels, split_ratio, seed=42, balance=False):
    """Shuffle the dataset and split it into train / validation / test parts.

    Args:
        image_paths: Sequence of image paths.
        labels: Sequence of labels, parallel to ``image_paths``.
        split_ratio: Fractions passed on to ``split_indices``.
        seed: Value used to seed NumPy's global random generator.
        balance: When True, the "fake" and "real" samples are shuffled and
            split separately, then merged per split and shuffled again.

    Returns:
        Three ``(paths, labels)`` tuples of NumPy arrays, in the order
        train, validation, test.
    """
    paths_arr = np.array(image_paths)
    labels_arr = np.array(labels)

    np.random.seed(seed)

    if balance:
        # Shuffle each class on its own ("fake" first, then "real").
        shuffled_by_class = []
        for class_name in ("fake", "real"):
            class_idx = np.where(labels_arr == class_name)[0]
            np.random.shuffle(class_idx)
            shuffled_by_class.append(class_idx)

        # Split every class with the same ratios, then merge split by split.
        splits_by_class = [split_indices(class_idx, split_ratio) for class_idx in shuffled_by_class]
        merged = [np.concatenate(same_split) for same_split in zip(*splits_by_class)]

        # Mix the classes inside each split (train, then val, then test).
        for split_idx in merged:
            np.random.shuffle(split_idx)
        train_idx, val_idx, test_idx = merged
    else:
        order = np.arange(len(paths_arr))
        np.random.shuffle(order)
        train_idx, val_idx, test_idx = split_indices(order, split_ratio)

    return tuple(
        (paths_arr[split_idx], labels_arr[split_idx])
        for split_idx in (train_idx, val_idx, test_idx)
    )

def print_dataset(name, labels_array):
    """Print the size of a split and how many "fake" / "real" labels it has.

    Args:
        name: Display name of the split.
        labels_array: NumPy array of labels; counts come from element-wise
            comparison against 'fake' and 'real'.
    """
    total = len(labels_array)
    print(f"{name} Dataset: {total}")

    fake_count = np.sum(labels_array == 'fake')
    print(f"fake: {fake_count}")

    real_count = np.sum(labels_array == 'real')
    print(f"real: {real_count}")

# Build the class-balanced dataset and split it 60/20/20
image_paths, labels = get_dataset(fake_dir, real_dir, balance=True)
splits = split_data(image_paths, labels, split_ratio=[0.6, 0.2, 0.2], balance=True)
(train_x, train_y), (val_x, val_y), (test_x, test_y) = splits

# Report the class counts of every split, then of all splits together
for split_name, split_labels in (
    ("Train", train_y),
    ("Validation", val_y),
    ("Test", test_y),
):
    print_dataset(split_name, split_labels)
print_dataset("Full", np.concatenate([train_y, val_y, test_y]))

---
### Step 3: Exploratory Data Analysis (EDA)

In [None]:
# Class distribution. Labels are categorical, so count them and draw one bar
# per class; a numeric histogram with two bins places the bars between the
# category ticks instead of on them.
class_names, class_counts = np.unique(labels, return_counts=True)
plt.bar(class_names, class_counts, edgecolor='black')
plt.xlabel('Class')
plt.ylabel('Count')
plt.title('Histogram of Labels')
plt.show()

In [None]:
# Load every image with Keras' load_img, in the same order as image_paths
images = [load_img(image_path) for image_path in image_paths]

# Positions of each class within `labels`
real_ind = [position for position, label in enumerate(labels) if label == "real"]
fake_ind = [position for position, label in enumerate(labels) if label == "fake"]

# Per-class image lists
real_images = [images[position] for position in real_ind]
fake_images = [images[position] for position in fake_ind]

In [None]:
# Preview grid: first row shows real faces, second row shows fake faces
samples_per_row = 4
plt.figure(figsize=(10, 5))

for row, (caption, face_list) in enumerate((("Real", real_images), ("Fake", fake_images))):
    for col in range(samples_per_row):
        # Subplot positions are 1-based and run row by row
        plt.subplot(2, samples_per_row, row * samples_per_row + col + 1)
        plt.imshow(face_list[col])
        plt.axis('off')
        plt.title(caption)

plt.suptitle("Real vs Fake Faces", fontsize=16)
plt.show()

In [None]:
# Average pixel intensity of every image.
# Each image is converted to an array on its own: stacking the whole list with
# np.array(...) requires all images to share one size and materialises the
# entire dataset as a single array before any mean is taken.
avg_pixels_real = [np.mean(np.asarray(img)) for img in real_images]
avg_pixels_fake = [np.mean(np.asarray(img)) for img in fake_images]

# 5-wide bins covering the 0-255 intensity range
intensity_bins = range(0, 256 + 5, 5)

fig, axs = plt.subplots(1, 2, figsize=(12, 5))

# Real
axs[0].hist(avg_pixels_real, bins=intensity_bins, color='gray', edgecolor='black')
axs[0].set_xlabel('Average Pixel Intensity')
axs[0].set_ylabel('Number of Images')
axs[0].set_title('Histogram of Average Pixel Intensities (Real)')
axs[0].set_xlim(0, 255)

# Fake
axs[1].hist(avg_pixels_fake, bins=intensity_bins, color='gray', edgecolor='black')
axs[1].set_xlabel('Average Pixel Intensity')
axs[1].set_ylabel('Number of Images')
axs[1].set_title('Histogram of Average Pixel Intensities (fake)')
axs[1].set_xlim(0, 255)

plt.show()

In [None]:
# Overlay both classes on one axis to compare their average intensities
intensity_bins = range(0, 256+5, 5)
fig, ax = plt.subplots(figsize=(8, 5))

for values, class_label, class_color in (
    (avg_pixels_real, 'real', 'blue'),
    (avg_pixels_fake, 'fake', 'red'),
):
    ax.hist(values, bins=intensity_bins, label=class_label, color=class_color, alpha=0.5)

ax.set(
    title="Histogram of Average Pixel Intensities",
    xlabel="Average Pixel Intensity",
    ylabel="Number of Images",
    xlim=(0, 255),
)
ax.legend()

plt.show()

In [None]:
# Average intensity per colour channel (RGB).
# Each image is converted on its own; stacking the whole list with
# np.array(...) requires uniform image sizes and copies the entire dataset
# into a single array.
avg_per_channel_real = [np.asarray(img).mean(axis=(0, 1)) for img in real_images]
avg_per_channel_fake = [np.asarray(img).mean(axis=(0, 1)) for img in fake_images]

channel_bins = range(0, 256 + 5, 5)
# (plot colour, legend label) for channel index 0, 1, 2
channel_styles = (('red', 'Red'), ('green', 'Green'), ('blue', 'Blue'))

fig, axs = plt.subplots(1, 2, figsize=(12, 5))

# One panel per class, three overlaid histograms (one per channel) in each
for panel, channel_means, class_title in (
    (axs[0], avg_per_channel_real, "Real"),
    (axs[1], avg_per_channel_fake, "Fake"),
):
    for channel, (channel_color, channel_label) in enumerate(channel_styles):
        panel.hist(
            [means[channel] for means in channel_means],
            bins=channel_bins,
            color=channel_color,
            alpha=0.5,
            label=channel_label,
        )
    panel.set_title(f"Histogram of Average Intensity per Channel ({class_title})")
    panel.set_xlabel("Average Intensity")
    panel.set_ylabel("Number of Images")
    panel.set_xlim(0, 255)
    panel.legend()

plt.show()

---
### Step 4: Data preprocessing

In [None]:
# One DataFrame per split, with the columns flow_from_dataframe reads below
train_df, val_df, test_df = (
    pd.DataFrame({"image_path": split_paths, "label": split_labels})
    for split_paths, split_labels in (
        (train_x, train_y),
        (val_x, val_y),
        (test_x, test_y),
    )
)

# Image properties
img_height, img_width = 300, 300
batch_size = 32

# Random augmentations are applied to the training images only
augmentation_options = dict(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
)
train_datagen = ImageDataGenerator(rescale=1.0 / 255, **augmentation_options)

# Validation and test images are only rescaled to [0, 1]
val_test_datagen = ImageDataGenerator(rescale=1.0 / 255)

# Settings shared by all three iterators
flow_options = dict(
    x_col="image_path",
    y_col="label",
    target_size=(img_height, img_width),
    batch_size=batch_size,
    class_mode="binary",
)

train_set = train_datagen.flow_from_dataframe(dataframe=train_df, **flow_options)
val_set = val_test_datagen.flow_from_dataframe(dataframe=val_df, **flow_options)
# The test iterator is the only one created with shuffling turned off
test_set = val_test_datagen.flow_from_dataframe(
    dataframe=test_df, shuffle=False, **flow_options
)

---
### Step 5: Modeling

In [None]:
# Custom early stopping callback
class CustomEarlyStopping(tf.keras.callbacks.Callback):
    """Stop training once train and validation accuracy are both high enough.

    Args:
        stop_acc: Threshold that both the "accuracy" and "val_accuracy"
            metrics must reach at the end of an epoch. Defaults to 0.9.
    """

    def __init__(self, stop_acc=0.9):
        super().__init__()
        self.stop_acc = stop_acc

    def on_epoch_end(self, epoch, logs=None):
        """Check the epoch's metrics and request a stop when both pass.

        Args:
            epoch: Zero-based epoch index (printed one-based).
            logs: Metric dictionary for the epoch; may be None.
        """
        logs = logs or {}
        train_acc = logs.get("accuracy")
        val_acc = logs.get("val_accuracy")

        # Without both metrics (e.g. no validation data) there is nothing
        # to compare, so training simply continues.
        if train_acc is None or val_acc is None:
            return

        if train_acc >= self.stop_acc and val_acc >= self.stop_acc:
            print(
                f"\nStopping early at epoch {epoch + 1} - train accuracy: {train_acc:.4f}, validation accuracy: {val_acc:.4f}"
            )
            self.model.stop_training = True

In [None]:
# Baseline: feed-forward network on the flattened pixels
baseline_model = Sequential(
    [
        Flatten(),
        Dense(units=128, activation="relu"),
        Dense(units=128, activation="relu"),
        Dense(units=1, activation="sigmoid"),
    ]
)

# Training hyper-parameters
learning_rate = 0.0001
max_epochs = 50

# Binary cross-entropy matches the single sigmoid output unit
baseline_model.compile(
    loss="binary_crossentropy",
    optimizer=Adam(learning_rate=learning_rate),
    metrics=["accuracy"],
)

# Train for up to max_epochs; the callback may stop training earlier
history = baseline_model.fit(
    train_set,
    epochs=max_epochs,
    validation_data=val_set,
    callbacks=[CustomEarlyStopping()],
)

In [None]:
# CNN: three conv + max-pool stages, then a dense head with dropout
model = Sequential(
    [
        Conv2D(
            filters=32,
            kernel_size=(3, 3),
            activation="relu",
            input_shape=(img_height, img_width, 3),
        ),
        MaxPooling2D(pool_size=2, strides=2),
        Conv2D(filters=64, kernel_size=(3, 3), activation="relu"),
        MaxPooling2D(pool_size=2, strides=2),
        Conv2D(filters=128, kernel_size=(3, 3), activation="relu"),
        MaxPooling2D(pool_size=2, strides=2),
        Flatten(),
        Dense(units=128, activation="relu"),
        Dropout(rate=0.5),
        Dense(units=1, activation="sigmoid"),  # Binary classification
    ]
)

# Training hyper-parameters
learning_rate = 0.0001
max_epochs = 50

model.compile(
    loss="binary_crossentropy",
    optimizer=Adam(learning_rate=learning_rate),
    metrics=["accuracy"],
)

# Train for up to max_epochs; the callback may stop training earlier
history = model.fit(
    train_set,
    epochs=max_epochs,
    validation_data=val_set,
    callbacks=[CustomEarlyStopping()],
)

---
### Step 6: Evaluation

In [None]:
# Score the CNN on the test split (loss first, accuracy second)
evaluation = model.evaluate(test_set)
test_loss, test_acc = evaluation
print(f"Test Accuracy: {test_acc:.4f}")