<a href="https://colab.research.google.com/github/WilliamJWen/Project42/blob/main/colab_notebooks/model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Project42: AI-Generated Image Detector**



# Data Preprocessing

# Baseline Model

## Architecture

In [3]:
import torch.nn as nn
import torch.nn.functional as F


class Baseline(nn.Module):
  """Small ResNet-style CNN that outputs one raw logit per image.

  The network is a strided 7x7 stem followed by four two-convolution residual
  blocks and a single fully connected layer. The shortcuts are parameter-free:
  they are average-pooled when the block halves the resolution and zero-padded
  along the channel axis when the block widens, so the set of learnable
  parameters (and therefore the state_dict keys) is exactly the layers
  declared in __init__.

  The shape comments in forward() assume a 3x256x256 input, which is what the
  fc layer's 512 * 2 * 2 input size implies given the strides below.

  forward() returns a tensor of shape (batch,) holding logits (no sigmoid is
  applied), so it should be paired with a logit-based loss.
  """

  def __init__(self):
    super(Baseline, self).__init__()

    # Hidden layer activation
    self.activation = F.relu

    # Average pooling
    self.pool2 = nn.AvgPool2d(kernel_size=2, stride=2) # 2w x 2h -> w x h
    self.pool4 = nn.AvgPool2d(kernel_size=4, stride=4) # 4w x 4h -> w x h

    # Convolutional layers
    self.conv1 = nn.Conv2d(in_channels=3,
                           out_channels=64,
                           kernel_size=7,
                           padding=3,
                           stride=2)
    self.conv2 = nn.Conv2d(in_channels=64,
                           out_channels=64,
                           kernel_size=3,
                           padding=1,
                           stride=1)
    self.conv3 = nn.Conv2d(in_channels=64,
                           out_channels=64,
                           kernel_size=3,
                           padding=1,
                           stride=1)
    self.conv4 = nn.Conv2d(in_channels=64,
                           out_channels=128,
                           kernel_size=3,
                           padding=1,
                           stride=2)
    self.conv5 = nn.Conv2d(in_channels=128,
                           out_channels=128,
                           kernel_size=3,
                           padding=1,
                           stride=1)
    self.conv6 = nn.Conv2d(in_channels=128,
                           out_channels=256,
                           kernel_size=3,
                           padding=1,
                           stride=2)
    self.conv7 = nn.Conv2d(in_channels=256,
                           out_channels=256,
                           kernel_size=3,
                           padding=1,
                           stride=1)
    self.conv8 = nn.Conv2d(in_channels=256,
                           out_channels=512,
                           kernel_size=3,
                           padding=1,
                           stride=2)
    self.conv9 = nn.Conv2d(in_channels=512,
                           out_channels=512,
                           kernel_size=3,
                           padding=1,
                           stride=1)

    # FC layer
    self.fc = nn.Linear(512 * 2 * 2, 1)


  def _shortcut(self, skip, like):
    """Reshape a block input `skip` so it can be added to the branch `like`.

    Uses only parameter-free operations: 2x2 average pooling when the spatial
    size differs, and zero-padding of the channel axis when `like` has more
    channels than `skip`.
    """
    if skip.shape[-2:] != like.shape[-2:]:
      skip = self.pool2(skip)
    missing_channels = like.size(1) - skip.size(1)
    if missing_channels > 0:
      # F.pad pads from the last dimension backwards: (W, W, H, H, C, C)
      skip = F.pad(skip, (0, 0, 0, 0, 0, missing_channels))
    return skip


  def _residual_block(self, x, first_conv, second_conv):
    """Apply two convolutions and add the (reshaped) block input back in."""
    # The shortcut is the live tensor, not a detached copy, so gradients flow
    # through both the convolution branch and the skip path.
    skip = x
    x = self.activation(first_conv(x))
    x = second_conv(x)
    return self.activation(self._shortcut(skip, x) + x)


  def forward(self, x):
    # Layer 1
    x = self.activation(self.conv1(x))                   # Output: 64x128x128
    x = self.pool2(x)                                    # Output: 64x64x64

    # Layers 2-3 (same resolution and width: identity shortcut)
    x = self._residual_block(x, self.conv2, self.conv3)  # Output: 64x64x64

    # Layers 4-5
    x = self._residual_block(x, self.conv4, self.conv5)  # Output: 128x32x32

    # Layers 6-7
    x = self._residual_block(x, self.conv6, self.conv7)  # Output: 256x16x16

    # Layers 8-9
    x = self._residual_block(x, self.conv8, self.conv9)  # Output: 512x8x8
    x = self.pool4(x)                                    # Output: 512x2x2

    # Layer 10
    # flatten(start_dim=1) keeps the batch dimension fixed, so a wrongly sized
    # input fails loudly in self.fc instead of being folded into the batch.
    x = x.flatten(start_dim=1)
    x = self.fc(x)
    x = x.squeeze(1)

    return x


# Instantiate the baseline and report how many scalar parameters it holds
baseline_model = Baseline()
num_params = sum(weights.numel() for weights in baseline_model.parameters())
print("There are", num_params, "parameters in the baseline model")


There are 4732033 parameters in the baseline model


## Training

### Non-Baseline-Specific Functions

In [None]:
import matplotlib.pyplot as plt
import torch


# Plot the saved error and loss curves that belong to a model path
def plot_training_curve(path):
  """Show train-vs-validation error and loss figures for a saved run.

  Reads "<path>_train_err.csv", "<path>_val_err.csv", "<path>_train_loss.csv"
  and "<path>_val_loss.csv" with np.loadtxt and draws two figures (error, then
  loss) with one point per epoch.
  """
  # All four files are read before anything is drawn
  err_train = np.loadtxt("{}_train_err.csv".format(path))
  err_val = np.loadtxt("{}_val_err.csv".format(path))
  loss_train = np.loadtxt("{}_train_loss.csv".format(path))
  loss_val = np.loadtxt("{}_val_loss.csv".format(path))

  # Epochs are numbered from 1; the count comes from the training-error file
  epochs = range(1, len(err_train) + 1)

  figures = (
      ("Train vs Validation Error", "Error", err_train, err_val),
      ("Train vs Validation Loss", "Loss", loss_train, loss_val),
  )
  for title, y_label, train_curve, val_curve in figures:
    plt.title(title)
    plt.plot(epochs, train_curve, label="Train")
    plt.plot(epochs, val_curve, label="Validation")
    plt.xlabel("Epoch")
    plt.ylabel(y_label)
    plt.legend(loc='best')
    plt.show()


# Given a tensor containing 2 possible values, normalize this to 0/1
def normalize_label(labels, min_val=None, max_val=None):
  """Linearly rescale two-valued labels so the smaller maps to 0 and the larger to 1.

  Args:
    labels: tensor (or array-like) of labels.
    min_val: value that should map to 0. Defaults to the minimum found in
      `labels`.
    max_val: value that should map to 1. Defaults to the maximum found in
      `labels`.

  Returns:
    A floating point tensor with the same shape as `labels`.

  Raises:
    ValueError: if the bounds coincide (for example, a batch containing a
      single class) and the labels are not already 0/1, because the mapping
      cannot be inferred from the batch alone. Pass `min_val` and `max_val`
      explicitly in that case.
  """
  labels = torch.as_tensor(labels)
  if not torch.is_floating_point(labels):
    labels = labels.float()
  if labels.numel() == 0:
    return labels

  low = torch.min(labels) if min_val is None else min_val
  high = torch.max(labels) if max_val is None else max_val
  span = high - low

  if span == 0:
    # A single-class batch would otherwise compute 0/0 and yield NaN labels.
    if bool(((labels == 0) | (labels == 1)).all()):
      return labels
    raise ValueError(
        "Cannot normalize labels: minimum and maximum are equal. "
        "Pass min_val and max_val explicitly.")

  return (labels - low) / span


### Baseline-Specific Functions

In [4]:
import numpy as np
import time
import torch.optim as optim
from torch.utils.data import DataLoader


# Build the checkpoint name used for the baseline model at a given epoch
def get_baseline_name(epoch):
  """Return the checkpoint path string for the baseline model at `epoch`."""
  return f"model_baseline_epoch{epoch}"


# Evaluate data from loader on net
def evaluate_baseline(net, loader, criterion=None):
  """Compute the classification error rate and mean batch loss of `net`.

  Args:
    net: model that returns one raw logit per sample, shape (batch,).
    loader: iterable yielding (imgs, labels) batches. Labels are passed
      through normalize_label before use.
    criterion: loss applied to (logits, float labels). Defaults to
      nn.BCEWithLogitsLoss(); a custom criterion must also accept logits,
      because predictions are obtained by thresholding the logits at 0.

  Returns:
    (err, loss): the fraction of misclassified samples, and the loss averaged
    over batches.

  Raises:
    ValueError: if `loader` yields no samples.
  """
  # Enable GPU usage
  use_cuda = torch.cuda.is_available()
  if use_cuda:
    net = net.cuda()

  if criterion is None:
    criterion = nn.BCEWithLogitsLoss()

  total_loss = 0.0
  total_err = 0
  num_samples = 0
  num_batches = 0

  # Evaluate in eval mode without building autograd graphs, then restore the
  # caller's mode so calling this in the middle of training has no side effect.
  was_training = net.training
  net.eval()
  try:
    with torch.no_grad():
      for imgs, labels in loader:
        labels = normalize_label(labels).float()

        # Enable GPU usage
        if use_cuda:
          imgs = imgs.cuda()
          labels = labels.cuda()

        # Forward pass
        outputs = net(imgs)
        total_loss += criterion(outputs, labels).item()

        # A logit above 0 corresponds to a sigmoid probability above 0.5
        preds = (outputs > 0).float()
        total_err += (preds != labels).sum().item()

        num_samples += labels.numel()
        num_batches += 1
  finally:
    net.train(was_training)

  if num_samples == 0:
    raise ValueError("loader yielded no samples; cannot compute error and loss")

  err = float(total_err) / num_samples
  loss = float(total_loss) / num_batches

  return err, loss


# Train baseline model on given datasets
def train_baseline(net, train_set, val_set,
                   batch_size=64,
                   initial_learning_rate=0.01,
                   num_epochs=30):
  """Train `net` as a binary classifier and record per-epoch metrics.

  Args:
    net: model that returns one raw logit per sample, shape (batch,).
    train_set: dataset of (img, label) pairs used for optimization.
    val_set: dataset of (img, label) pairs evaluated after every epoch.
    batch_size: mini-batch size for both loaders.
    initial_learning_rate: Adam learning rate before the scheduled decays.
    num_epochs: number of passes over `train_set`.

  Side effects:
    - Seeds torch's RNG with 1000.
    - Saves net.state_dict() after every epoch to get_baseline_name(epoch).
    - Writes "<last checkpoint>_{train,val}_{err,loss}.csv" when training ends.
    - Prints per-epoch metrics and the total elapsed time.
  """
  torch.manual_seed(1000)

  # Note: we want to train over ~3000 iterations. With the default batch size
  # of 64 and 30 epochs that corresponds to ~100 iterations per epoch.
  # NOTE(review): the original note said "a batch size of ~6k", which
  # contradicts batch_size = 64; presumably ~6k is the training-set size.
  # TODO confirm.

  # Enable GPU usage
  use_cuda = torch.cuda.is_available()
  if use_cuda:
    net = net.cuda()

  # Criterion: the model emits raw logits, so use the logit form of BCE
  criterion = nn.BCEWithLogitsLoss()

  # Optimizer
  betas = (0.9, 0.99)
  milestones = [10, 20]
  lr_factor = 0.1
  optimizer = optim.Adam(net.parameters(),
                         lr=initial_learning_rate,
                         betas=betas)
  learning_rate_schedule = optim.lr_scheduler.MultiStepLR(optimizer,
                                                          milestones=milestones,
                                                          gamma=lr_factor)

  # Arrays to record loss and error
  train_err = np.zeros(num_epochs)
  train_loss = np.zeros(num_epochs)
  val_err = np.zeros(num_epochs)
  val_loss = np.zeros(num_epochs)

  # Load data
  train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
  val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)

  # Train
  start_time = time.time()
  for epoch in range(num_epochs):
    # Make sure the model is in training mode at the start of every epoch
    net.train()

    total_train_loss = 0.0
    total_train_err = 0
    num_samples = 0

    # Iterate through mini batches
    for imgs, labels in train_loader:
      # Same label handling as evaluation: map to 0/1 floats for the BCE loss
      labels = normalize_label(labels).float()
      num_samples += labels.numel()

      # Enable GPU usage
      if use_cuda:
        imgs = imgs.cuda()
        labels = labels.cuda()

      # Forward pass, backward pass, update
      optimizer.zero_grad()
      outputs = net(imgs)
      loss = criterion(outputs, labels)
      loss.backward()
      optimizer.step()

      # Update training loss, error
      total_train_loss += loss.item()
      # A logit above 0 corresponds to a sigmoid probability above 0.5
      preds = (outputs.detach() > 0).float()
      total_train_err += (preds != labels).sum().item()

    # Calculate total loss, error
    train_err[epoch] = total_train_err / num_samples
    train_loss[epoch] = total_train_loss / len(train_loader)
    val_err[epoch], val_loss[epoch] = evaluate_baseline(net, val_loader)

    # Print loss, error
    print(("Epoch {}: Train err: {}, Train loss: {} | " +
           "Validation err: {}, Validation loss: {}").format(
              epoch + 1,
              train_err[epoch],
              train_loss[epoch],
              val_err[epoch],
              val_loss[epoch]))

    # Save the current model (checkpoint) to a file
    model_path = get_baseline_name(epoch + 1)
    torch.save(net.state_dict(), model_path)

    # Update LR scheduler
    learning_rate_schedule.step()

  # Finish up
  print('Finished Training')
  end_time = time.time()
  elapsed_time = end_time - start_time
  print("Total time elapsed: {:.2f} seconds".format(elapsed_time))

  # Write the train/test loss/err into CSV file for plotting later.
  # The CSVs are named after the final checkpoint; computing the name here
  # keeps it defined even if the loop body never ran.
  model_path = get_baseline_name(num_epochs)
  np.savetxt("{}_train_err.csv".format(model_path), train_err)
  np.savetxt("{}_train_loss.csv".format(model_path), train_loss)
  np.savetxt("{}_val_err.csv".format(model_path), val_err)
  np.savetxt("{}_val_loss.csv".format(model_path), val_loss)
