## Imports

In [None]:
# For OS interaction and system-specific parameters
import os
import sys

# PyTorch libraries
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split, Dataset, WeightedRandomSampler, Subset
from torch.optim.lr_scheduler import StepLR

# Torchvision
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
import torchvision.datasets as datasets


# Albumentations for Data Augmentation
import albumentations as A
from albumentations.pytorch import ToTensorV2

# PIL for image operations
from PIL import Image

# Matplotlib for plotting and visualizations
import matplotlib.pyplot as plt

# Import numpy
import numpy as np

# TensorBoardX - TensorBoard for PyTorch
from tensorboardX import SummaryWriter

# CodeCarbon for tracking our carbon emissions
from codecarbon import EmissionsTracker

# tqdm for showing progress bars
from tqdm.notebook import tqdm

# Import Netron for visualizing our model
import netron

# Add the directory containing our helper scripts to the module search path
sys.path.append('/Users/jacob/OneDrive/Desktop/SyntheticEye/Development/src/utils')
# Import custom helper functions from the scripts directory
import helper_functions

## Gain Insights Regarding Data
This helps us better understand our data and decide which fixed image size to choose.

In [None]:
# Import necessary function from helper_functions.py
from helper_functions import plot_image_dimensions_bar_graph
from helper_functions import plot_class_distribution
from helper_functions import check_accuracy_aletheia4

### Plot Image Dimensions

In [None]:
# Plot the dimensions of the AI-generated images.
# `img_dir` is a notebook-level variable that is re-assigned by each of the
# plotting cells in this section.
img_dir = "/Users/jacob/OneDrive/Desktop/Aletheia4Dataset/AI/"
plot_image_dimensions_bar_graph(img_dir, heading='AI Image Dimensions')

In [None]:
# Plot the dimensions of the GAN images (re-assigns the notebook-level `img_dir`)
img_dir = "/Users/jacob/OneDrive/Desktop/Aletheia4Dataset/GAN/"
plot_image_dimensions_bar_graph(img_dir, heading='GAN Image Dimensions')

In [None]:
# Plot the dimensions of the real images (re-assigns the notebook-level `img_dir`)
img_dir = "/Users/jacob/OneDrive/Desktop/Aletheia4Dataset/REAL/"
plot_image_dimensions_bar_graph(img_dir, heading='Real Image Dimensions')

### Plot Class Distribution

In [None]:
# Plot the class distribution for the dataset root (the directory that holds
# the AI/, GAN/ and REAL/ folders plotted above)
plot_class_distribution('/Users/jacob/OneDrive/Desktop/Aletheia4Dataset/')

## Prepare Data

In [None]:
# Imports
from helper_functions import show_img

In [None]:
class CustomImageDataset(Dataset):
    """Image-folder dataset yielding ``(image, label)`` pairs.

    ``img_dir`` must contain one sub-directory per class. Class ids are
    assigned by enumerating those sub-directories in sorted order, and the
    files inside each class are listed in sorted order as well, so that two
    instances built from the same directory always agree on the index ->
    (file, label) mapping.

    Args:
        img_dir: Root directory that holds the class sub-directories.
        transforms: Optional albumentations-style callable. It is invoked as
            ``transforms(image=<numpy array>)`` and must return a mapping with
            an ``'image'`` entry.
        transform: Alias for ``transforms`` (accepted because callers in this
            notebook use both spellings). Only one of the two may be given.

    Raises:
        ValueError: If both ``transforms`` and ``transform`` are supplied.
    """

    # Lower-case file extensions that are treated as images
    IMG_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.webp')

    def __init__(self, img_dir, transforms=None, transform=None):
        if transforms is not None and transform is not None:
            raise ValueError("Pass either 'transforms' or 'transform', not both.")

        self.img_dir = img_dir
        self.transforms = transforms if transforms is not None else transform
        self.img_labels = []
        self.img_names = []

        # Only directories are classes; stray files in the root (e.g. README,
        # .DS_Store) are ignored instead of crashing the scan.
        self.classes = sorted(
            entry for entry in os.listdir(img_dir)
            if os.path.isdir(os.path.join(img_dir, entry))
        )

        # Iterate through all classes
        for class_id, class_name in enumerate(self.classes):
            class_dir = os.path.join(img_dir, class_name)
            # Iterate through all images of a class
            for img in sorted(os.listdir(class_dir)):
                if img.lower().endswith(self.IMG_EXTENSIONS):
                    self.img_names.append(os.path.join(class_name, img))
                    self.img_labels.append(class_id)

    def __len__(self):
        """Return the number of images found under ``img_dir``."""
        return len(self.img_names)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB, apply the transforms, and return ``(image, label)``.

        Without transforms the image is returned as a PIL image; with
        transforms it is whatever the transform pipeline puts under ``'image'``.
        """
        img_path = os.path.join(self.img_dir, self.img_names[idx])
        # The context manager closes the underlying file once the pixel data
        # has been converted, so long epochs do not leak file handles.
        with Image.open(img_path) as img_file:
            image = img_file.convert('RGB')

        # Apply transforms
        if self.transforms is not None:
            image = np.array(image)  # Convert PIL image to numpy array
            image = self.transforms(image=image)['image']  # Apply albumentations transforms

        label = self.img_labels[idx]
        return image, label

### Apply Data Augmentation
We augment the training images so that the model becomes more robust and is less prone to overfitting.

In [None]:
# Training pipeline: resize, crop to 256x256, apply random augmentations,
# then normalize and convert to a tensor.
train_transforms = A.Compose(
    [
        A.SmallestMaxSize(max_size=304),
        A.CenterCrop(height=256, width=256),
        A.HorizontalFlip(p=0.5),
        A.ShiftScaleRotate(
            shift_limit=0.05,
            scale_limit=0.05,
            rotate_limit=10,
            p=0.5,
        ),
        A.PixelDropout(dropout_prob=0.015, p=0.35),
        A.Normalize(
            mean=(0.499, 0.415, 0.372),
            std=(0.245, 0.223, 0.220),
        ),
        ToTensorV2(),
    ]
)

# Evaluation pipeline: the same resize / crop / normalize steps as training,
# but without any random augmentation.
test_transforms = A.Compose(
    [
        A.SmallestMaxSize(max_size=304),
        A.CenterCrop(height=256, width=256),
        A.Normalize(
            mean=(0.499, 0.415, 0.372),
            std=(0.245, 0.223, 0.220),
        ),
        ToTensorV2(),
    ]
)

In [None]:
def show_images(dataset, num_images=12):
    """Display the first ``num_images`` samples of ``dataset`` in a single row.

    Args:
        dataset: Indexable dataset with ``__len__``. Each item may be an image
            or an ``(image, label, ...)`` tuple/list, in which case the first
            element is shown.
        num_images: Maximum number of images to display. It is capped at
            ``len(dataset)``; nothing is drawn when the result is zero.

    Tensor images are assumed to be channel-first (C, H, W) and are transposed
    to (H, W, C) for matplotlib. Normalized tensors are shown as-is, so
    matplotlib may clip values outside its displayable range.
    """
    num_images = min(num_images, len(dataset))
    if num_images <= 0:
        return

    # squeeze=False always yields a 2-D array of axes, so a single image
    # (num_images == 1) is handled the same way as several.
    fig, axes = plt.subplots(1, num_images, figsize=(num_images * 3, 3), squeeze=False)

    for i, ax in enumerate(axes[0]):
        # Datasets in this notebook return (image, label); keep only the image
        sample = dataset[i]
        image = sample[0] if isinstance(sample, (tuple, list)) else sample

        # If the image is a tensor, convert it to a numpy array
        if torch.is_tensor(image):
            image = image.detach().cpu().numpy().transpose((1, 2, 0))

        # Display the image
        ax.imshow(image)
        ax.axis('off')

    plt.show()

### Create Dataset

In [None]:
# Create the dataset without any transforms. This instance is used to count
# samples, to compute class weights, and as the source of the split indices.
dataset = CustomImageDataset(img_dir="C:\\Users\\jacob\\OneDrive\\Desktop\\Aletheia4Dataset")

In [None]:
# Report how many images were found across all class folders
num_samples = len(dataset)
print(f"Number of samples in the dataset: {num_samples}")

In [None]:
# Define weights for classes in dataset

# Number of classes in dataset; passed to define_class_weights to size its
# per-class counters
classes = 3

def define_class_weights(labels, classes):
    """Return one sampling weight per entry of ``labels``.

    Each class receives the weight ``total_samples / class_count`` (or 0 when
    the class never occurs), so rarer classes get larger weights. The result
    has the same length and order as ``labels``.

    Args:
        labels: Sequence of integer class ids in ``range(classes)``.
        classes: Total number of classes.
    """
    # Tally how often every class id occurs
    frequency = [0] * classes
    for class_id in labels:
        frequency[class_id] += 1

    total = float(sum(frequency))

    # Inverse-frequency weight per class; absent classes keep a weight of 0
    per_class = [total / float(n) if n != 0 else 0 for n in frequency]

    # Expand the per-class weights to a per-sample list
    return [per_class[class_id] for class_id in labels]

# Per-sample weights computed over the full dataset.
# NOTE: `weights` and `sampler` are re-assigned from the training labels in
# the split cell below, and it is that later sampler the training DataLoader uses.
weights = define_class_weights(dataset.img_labels, classes)
weights = torch.DoubleTensor(weights)
sampler = WeightedRandomSampler(weights, len(dataset))

In [None]:
# Set manual seed to ensure reproducibility
torch.manual_seed(42)

# Split dataset into train, validation, and test sets (85% / 5% / remainder)
train_size = int(0.85 * len(dataset))
val_size = int(0.05 * len(dataset))
test_size = len(dataset) - train_size - val_size

train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

# The splits need different transforms, so build one view of the full dataset
# per pipeline and restrict each view to the indices drawn by random_split.
# Without the Subset wrappers every loader would iterate the whole dataset and
# the validation/test sets would overlap with the training data.
augmented_dataset = CustomImageDataset(img_dir=dataset.img_dir, transforms=train_transforms)
plain_dataset = CustomImageDataset(img_dir=dataset.img_dir, transforms=test_transforms)

# The split indices are only meaningful if every view lists the files in the
# same order as the dataset the split was drawn from.
if augmented_dataset.img_names != dataset.img_names or plain_dataset.img_names != dataset.img_names:
    raise RuntimeError("Dataset directory listing changed between scans; split indices are not valid.")

transformed_train_dataset = Subset(augmented_dataset, train_dataset.indices)
transformed_val_dataset = Subset(plain_dataset, val_dataset.indices)
transformed_test_dataset = Subset(plain_dataset, test_dataset.indices)

# Extract labels for the training set (same order as the training Subset)
train_labels = [dataset.img_labels[idx] for idx in train_dataset.indices]

# Calculate weights for the training set
weights = define_class_weights(train_labels, classes)
weights = torch.DoubleTensor(weights)

# Create a sampler for the training set; it draws positions within the
# training Subset, so weights[i] belongs to transformed_train_dataset[i]
sampler = WeightedRandomSampler(weights, len(weights))

# Now create the DataLoaders with the transformed datasets
train_loader = DataLoader(transformed_train_dataset, batch_size=32, sampler=sampler, shuffle=False)
val_loader = DataLoader(transformed_val_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(transformed_test_dataset, batch_size=64, shuffle=False)

## Create Neural Networks
We experimented with multiple model architectures. The "AdjustedResCNN" is the architecture of Aletheia 2.5 and currently in use on our website.

In [None]:
class ResBlock(nn.Module):
    """Two-branch convolutional block with a skip connection.

    The input is processed by a 3x3 and a 5x5 convolution branch in parallel
    (each followed by batch norm, LeakyReLU and dropout). Each branch produces
    ``out_channels // 2`` channels; the branch outputs are concatenated along
    the channel dimension and added to the skip connection. Spatial size is
    preserved (stride 1, "same" padding).

    Args:
        in_channels: Number of input channels.
        out_channels: Number of output channels. Must be a positive even
            number, because it is split evenly between the two branches.
        dropout_prob: Dropout probability used in both branches.

    Raises:
        ValueError: If ``out_channels`` is not a positive even number.
    """

    def __init__(self, in_channels, out_channels, dropout_prob=0.3):
        super().__init__()

        # An odd value would make the concatenated branches one channel short
        # of the skip connection, which only fails later inside forward().
        if out_channels < 2 or out_channels % 2 != 0:
            raise ValueError(
                f"out_channels must be a positive even number, got {out_channels}"
            )

        # Half the out_channels for each branch
        branch_channels = out_channels // 2

        # Attribute names and layer order are kept stable so existing
        # state_dict checkpoints continue to load.
        self.branch3x3 = self._make_branch(in_channels, branch_channels, 3, dropout_prob)
        self.branch5x5 = self._make_branch(in_channels, branch_channels, 5, dropout_prob)

        # Define skip connection and adapt channels with a 1x1 conv when needed
        self.residual = nn.Conv2d(in_channels, out_channels, 1) if in_channels != out_channels else nn.Identity()

    @staticmethod
    def _make_branch(in_channels, branch_channels, kernel_size, dropout_prob):
        """Build one conv -> batch norm -> LeakyReLU -> dropout branch."""
        return nn.Sequential(
            # padding = kernel_size // 2 keeps the spatial size at stride 1
            nn.Conv2d(in_channels, branch_channels, kernel_size, 1, kernel_size // 2),
            nn.BatchNorm2d(branch_channels),
            nn.LeakyReLU(),
            nn.Dropout(dropout_prob)
        )

    def forward(self, x):
        """Return ``concat(branch3x3(x), branch5x5(x)) + residual(x)``."""
        # Apply both branches
        out3x3 = self.branch3x3(x)
        out5x5 = self.branch5x5(x)

        # Concatenate along channel dimension
        out = torch.cat([out3x3, out5x5], dim=1)

        # Apply the residual connection
        res = self.residual(x)
        return out + res

class Aletheia4Net(nn.Module):
    """Convolutional classifier built from six ResBlocks and a three-layer head.

    The convolutional stack widens the channels 3 -> 16 -> 32 -> 64 -> 128 ->
    256 -> 512 with a 2x max-pool after each of the first five blocks. Global
    average pooling reduces the result to a 512-dimensional vector that the
    fully connected head maps to ``num_classes`` logits.

    Args:
        dropout_prob: Dropout probability of the fully connected head. The
            ResBlocks are built with their own default dropout.
        num_classes: Number of output logits (defaults to the 3 classes this
            notebook trains on).
    """

    def __init__(self, dropout_prob=0.3, num_classes=3):
        super().__init__()

        # Convolutional layers with residual blocks and max-pooling
        self.conv_layers = nn.Sequential(
            ResBlock(3, 16),
            nn.MaxPool2d(2),
            ResBlock(16, 32),
            nn.MaxPool2d(2),
            ResBlock(32, 64),
            nn.MaxPool2d(2),
            ResBlock(64, 128),
            nn.MaxPool2d(2),
            ResBlock(128, 256),
            nn.MaxPool2d(2),
            ResBlock(256, 512)
        )

        # Global Average Pooling
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))

        # Fully connected layers
        self.fc_layers = nn.Sequential(
            nn.Linear(512, 1024),
            nn.LeakyReLU(),
            nn.Dropout(dropout_prob),
            nn.Linear(1024, 512),
            nn.LeakyReLU(),
            nn.Dropout(dropout_prob),
            nn.Linear(512, num_classes)
        )

    def feature_size(self):
        """Return the flattened size of the conv output for one 256x256 RGB input.

        This measures the output of ``conv_layers`` before global average
        pooling. The probe runs on the model's own device, without gradient
        tracking and in eval mode, so it neither fails on a GPU model nor
        disturbs the BatchNorm running statistics; the previous train/eval
        mode is restored afterwards.
        """
        probe_device = next(self.parameters()).device
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                features = self.conv_layers(torch.zeros(1, 3, 256, 256, device=probe_device))
        finally:
            self.train(was_training)
        return features.view(1, -1).size(1)

    def forward(self, x):
        """Map a batch of images (N, 3, H, W) to logits of shape (N, num_classes)."""
        x = self.conv_layers(x)
        x = self.global_avg_pool(x)
        # Flatten (N, 512, 1, 1) to (N, 512) for the fully connected head
        x = x.view(x.size(0), -1)
        x = self.fc_layers(x)
        return x

## Set Up Device Agnostic Code

In [None]:
# Prefer CUDA when PyTorch can see a GPU; otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Device: {device}")

## Hyperparameters

In [None]:
# Define Hyperparameters
# Three classes: the dataset has the folders AI/, GAN/ and REAL/, and the
# network's final layer emits 3 logits.
num_classes = 3
learning_rate = 0.0002  # Initial learning rate handed to the optimizer
batch_size = 32
num_epochs = 36  # Number of passes over the training loader

## Training

### Prepare Training 

In [None]:
# Seed PyTorch's global RNG before building the model so that the weight
# initialization is reproducible
torch.manual_seed(42)

# Initialize model and move it to the selected device (GPU if available)
model = Aletheia4Net().to(device)

In [None]:
# Use CrossEntropyLoss: the model outputs one logit per class and the labels
# are integer class ids (multi-class classification)
loss_function = torch.nn.CrossEntropyLoss()
# Define NAdam (a variant of the Adam optimizer) as our optimizer
optimizer = optim.NAdam(model.parameters(), lr=learning_rate)
# Learning rate scheduler: multiplies the learning rate by 0.9 on every
# scheduler.step() call, which the training loop issues once per epoch
scheduler = StepLR(optimizer, step_size=1, gamma=0.9)

In [None]:
# Initialize TensorBoard summary writer; logs go to runs/Aletheia4_0
writer = SummaryWriter(f'runs/Aletheia4_0')
# NOTE(review): `step` is not referenced by the other cells shown in this
# notebook — the training loop derives its own global step
step = 0

In [None]:
# Import necessary function for checking accuracy of our model from helper_functions.py
from helper_functions import check_accuracy

### Train Model

In [None]:
# Initialize the EmissionsTracker to monitor carbon emissions using the CodeCarbon library
carbon_tracker = EmissionsTracker(project_name="Aletheia4_0", log_level="critical")
carbon_tracker.start()

torch.manual_seed(3)

model = model.to(device)

# Set up log interval (in batches) for recording training metrics
metrics_interval = 100

emissions = None
try:
    # Start training
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0  # Loss accumulated since the last logging point
        correct = 0         # Correct training predictions so far this epoch
        total = 0           # Training samples seen so far this epoch

        train_progress = tqdm(enumerate(train_loader), total=len(train_loader))
        for batch_idx, (inputs, labels) in train_progress:
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass
            outputs = model(inputs)
            loss = loss_function(outputs, labels)

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            if (batch_idx + 1) % metrics_interval == 0:
                # Global step counts batches across epochs for TensorBoard
                global_step = epoch * len(train_loader) + batch_idx
                writer.add_scalar('Training Loss', running_loss / metrics_interval, global_step)
                writer.add_scalar('Training Accuracy', 100 * correct / total, global_step)
                running_loss = 0.0
                # Print training results (accuracy is cumulative over the epoch)
                print(f'Epoch {epoch+1}/{num_epochs}, Training Accuracy: {100 * correct / total:.2f}%', flush=True)

        # Validation
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = loss_function(outputs, labels)
                val_loss += loss.item()
                predicted = outputs.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        # Guard against an empty validation loader instead of dividing by zero
        val_loss /= max(len(val_loader), 1)
        val_accuracy = 100 * correct / total if total else 0.0
        writer.add_scalar('Validation Loss', val_loss, epoch)
        writer.add_scalar('Validation Accuracy', val_accuracy, epoch)

        # Print validation results
        print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%')

        # Scheduler step
        scheduler.step()

        # Save model checkpoint (file names use the zero-based epoch index)
        torch.save(model.state_dict(), f'model_epoch_{epoch}.pth')
finally:
    # Always finalize carbon tracking and close the TensorBoard writer, even
    # when training is interrupted or raises
    try:
        emissions = carbon_tracker.stop()
    finally:
        writer.close()

# stop() may return None when no measurement is available; only format a number
if emissions is not None:
    print(f"Emissions: {emissions:.5f} kgCO2eq")
else:
    print("Emissions: not available")

## Test state_dict of Trained Model

In [None]:
model = Aletheia4Net().to(device)

# Sanity-check the freshly built network with a dummy 256x256 batch. No
# gradients are needed, and the weights are replaced by the checkpoint below.
sample_input = torch.randn(1, 3, 256, 256).to(device)
with torch.no_grad():
    model(sample_input)

# Specify path to the trained model weights
model_path = "./model_epoch_18.pth"

# Load trained weights into the model. map_location lets a checkpoint saved on
# a GPU be loaded on a CPU-only machine as well.
model.load_state_dict(torch.load(model_path, map_location=device))

# Switch to evaluation mode so dropout is disabled and batch norm uses its
# running statistics during the accuracy checks that follow
model = model.to(device).eval()

In [None]:
# Check accuracy of the trained model.
# NOTE(review): this evaluates val_loader (the validation split), not
# test_loader — confirm which split is intended here.
check_accuracy_aletheia4(val_loader, model, device)

In [None]:
# Visualize a model using Netron.
# NOTE(review): the path points to an Aletheia 3_0 checkpoint file, not to a
# checkpoint written by this notebook's training loop — confirm this is intended.
netron.start("C:\\Users\\jacob\\OneDrive\\Desktop\\SyntheticEye\\SyntheticEyeLocal\\StateDicts\\Aletheia\\3_0\\al3_0_epoch_35_correct240789.pth")

### Test on Specific Source

In [None]:
# Load the new dataset
new_root_directory = "/Users/jacob/OneDrive/StyleGAN images"

new_full_dataset = datasets.ImageFolder(root=new_root_directory)

# Apply the evaluation transforms. This notebook defines CustomImageDataset and
# test_transforms; the previously referenced CustomDataset / test_augmentation
# do not exist here and raised a NameError.
# NOTE(review): labels are assigned from the sub-folders of new_root_directory,
# so they only match the training labels if that folder uses the same class
# layout as the training set — confirm before trusting the accuracy.
new_test_dataset = CustomImageDataset(
    img_dir=new_root_directory,
    transforms=test_transforms
)

# Create a DataLoader for the new dataset
new_test_loader = DataLoader(new_test_dataset, batch_size=batch_size, shuffle=False)

# Evaluate accuracy on new dataset
check_accuracy(new_test_loader, model, device)

### Test on Custom Images

In [None]:
# Load model that will be used to predict individual images.
# NOTE(review): the path points to an Aletheia 3_0 checkpoint; load_state_dict
# raises if its keys do not match Aletheia4Net — confirm the intended file.
model_path = "C:\\Users\\jacob\\OneDrive\\Desktop\\SyntheticEye\\SyntheticEyeLocal\\StateDicts\\Aletheia\\3_0\\al3_0_epoch_33_correct240773.pth"
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine
model.load_state_dict(torch.load(model_path, map_location=device))
# Evaluation mode disables dropout so single-image predictions are deterministic
model.to(device).eval()

In [None]:
def single_image_transforms():
    """
    Combine torchvision and albumentations transforms for an individual image.

    Returns:
        A callable that takes one image, applies the torchvision resize to
        224x224 first and the albumentations pipeline second, and returns the
        result of the albumentations step.

    NOTE(review): TorchvisionBridge, AlbumentationsTransform and
    test_augmentation are not defined in the cells of this notebook; they must
    be provided elsewhere before this function is called. Also confirm the
    224x224 resize, since the evaluation pipeline in this notebook crops to
    256x256.
    """

    # Removed the duplicated chained assignment (`tv_transform = tv_transform = ...`)
    tv_transform = TorchvisionBridge(torchvision.transforms.Compose([torchvision.transforms.Resize((224, 224))]))
    alb_transform = AlbumentationsTransform(test_augmentation)

    # Apply both transformations to the given image
    def combined_transforms(img):
        img = tv_transform(img)
        return alb_transform(img)

    return combined_transforms

In [None]:
# Import function for getting predictions on a single image
from helper_functions import predict_single_image
# Import function for displaying predictions on multiple images
from helper_functions import display_folder_images

In [None]:
# Path of the single image to classify
img_path = "/Users/jacob/OneDrive/Desktop/clearlyai.webp"

# Use the predict_single_image function to predict a single image.
# NOTE(review): AlbumentationsTransform and test_augmentation are not defined
# in the cells of this notebook — confirm where they come from.
predicted_label = predict_single_image(
    img_path, 
    model,
    # Use Albumentations to transform the image as needed
    AlbumentationsTransform(test_augmentation)
)

# Print prediction for the given image.
# NOTE(review): the message says "probability" while the variable is named
# predicted_label — confirm what predict_single_image returns.
print(f"Predicted probability for image: {predicted_label}")

In [None]:
# Build the combined transform callable and pass it, together with the model,
# to display_folder_images for the images in the evaluation folder
combined_transforms = single_image_transforms()
display_folder_images("/Users/jacob/OneDrive/Desktop/SyntheticEyeLocal/EvalData/Aletheia/MultipleEval/", model, combined_transforms)