# Group Project: AIGC Detection

**Names: Wenkai JiN, Yimian MA, Jacob Sydney Vincent APPLEBAUM, Nobel JAISON**

## Project introduction

### Background
1. Techniques for creating convincing naturalistic images have existed for decades
2. Recent advances in deep learning, particularly in generative adversarial networks (GANs) and diffusion models, have significantly increased the photorealism of generated content 
3. While these techniques have entertaining applications, their potential weaponization has raised serious concerns
4. Detecting AI-generated content (AIGC) has become a pressing issue and a prominent research topic

### Dataset Description
1. The dataset contains photographic and AI-generated images
2. Photographic images, with arbitrary sizes, are gathered from the ImageNet dataset
3. AI-generated images, with a fixed size of 512 × 512 × 3, are created using the text-to-image diffusion model Stable Diffusion v1.4, which is trained on the LAION dataset containing billions of image-text pairs
4. Photographic and AI-generated images have similar semantic content to avoid any content bias
5. Only binary labels are provided for training and testing


### The principle of ResNet
1. **Residual Learning Concept**:
    - In traditional deep neural networks, as the network becomes deeper, issues like vanishing gradients and degradation may arise. This means that simply adding more layers doesn't always result in improved performance; instead, it might even lead to a decline in accuracy.
2. **Introduction of Residual Blocks**:
    - ResNet tackles these problems by incorporating residual blocks. A residual block features a skip connection (also known as a shortcut connection).
    - In essence, within a residual block, rather than just mapping the input through a series of convolutional and other operations to obtain the output, the input is added to the result of the operations performed on it.
    - Mathematically, let \(x\) be the input to a residual block and \(F(x)\) the function computed by the operations inside the block (convolutions, batch normalization, activations). The output of the block is \(y = F(x) + x\). If the mapping the block should ideally represent is \(H(x)\), the layers only have to learn the residual \(F(x) = H(x) - x\), that is, the difference between the desired mapping and the identity mapping. Learning a small correction to the identity is usually easier than learning \(H(x)\) from scratch.
    ![image.png](attachment:image.png)
3. **Example of How it Works**:
    - For instance, suppose a very deep ResNet already performs well at some depth and the next residual block has nothing useful to add. The block can then drive \(F(x)\) toward zero, so that \(y \approx x\) and the block behaves almost like an identity mapping. The skip connection passes the information from earlier layers on to later layers unchanged, so adding more layers does not have to distort the features that have already been learned.
4. **Forward and Backward Pass**:
    - During the forward pass, the input data flows through these residual blocks. The skip connections play a crucial role in facilitating the flow and combination of information.
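    - In the backward pass, the skip connections also help the gradients propagate. Because \(y = F(x) + x\), the derivative of the output with respect to the input is \(\partial F / \partial x + I\), so the gradient always has a direct path through the identity term even when \(\partial F / \partial x\) is small. This largely alleviates the vanishing gradient problem.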
    - As a result, ResNet can train much deeper networks compared to traditional architectures and achieve better performance on tasks like image classification, object detection, and other computer vision tasks. In the context of the described dataset (used to distinguish between photographic and AI-generated images), a ResNet-based model could potentially be trained to perform this differentiation effectively.
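
The relation \(y = F(x) + x\) can be illustrated with a minimal sketch. The class `TinyResidualBlock` below is a hypothetical two-convolution block written only for this illustration; it is not the block used later in this notebook. Forcing \(F(x)\) to zero shows that the block falls back to the identity mapping and that the gradient still reaches the input through the skip connection.

```python
import torch
from torch import nn


class TinyResidualBlock(nn.Module):
    """Hypothetical residual block for illustration: y = relu(F(x) + x)."""

    def __init__(self, channels):
        super().__init__()
        self.conv1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1, bias=False)
        self.conv2 = nn.Conv2d(channels, channels, kernel_size=3, padding=1, bias=False)
        self.relu = nn.ReLU()

    def residual(self, x):
        # F(x): the learned part of the block
        return self.conv2(self.relu(self.conv1(x)))

    def forward(self, x):
        # Skip connection: add the untouched input to F(x)
        return self.relu(self.residual(x) + x)


torch.manual_seed(0)
block = TinyResidualBlock(channels=8)

# Strictly positive input, so relu(x) == x and the comparison below is exact
x = torch.rand(2, 8, 16, 16) + 0.1
x.requires_grad_(True)

# Force F(x) = 0 by zeroing the last convolution
with torch.no_grad():
    block.conv2.weight.zero_()

y = block(x)
y.sum().backward()

print(torch.allclose(y, x))                           # the block acts as the identity
print(torch.allclose(x.grad, torch.ones_like(x)))     # gradient flows through the skip path
```

Both checks hold here only because the residual branch was zeroed by hand; in a trained network \(F(x)\) is generally non-zero.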

## Code and analysis

### 1. Importing Necessary Libraries

In this part, we import a series of necessary Python libraries. `torch` is the core library of PyTorch. `sys` is used for handling some system-related operations (mainly for flushing the standard output here). The `nn` module is used to build neural network structures. `optim` is for defining optimizers to train the model. `DataLoader` and `random_split` help us load and split the dataset. `datasets` and `transforms` from the `torchvision` library are used to obtain common image datasets and perform preprocessing operations on image data. The `os` library is used for operations related to files and directories, such as loading and saving the checkpoint files of the model.

In [None]:
import torch
import sys
from torch import nn, optim
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms
import os

### 2. Defining the Bottleneck Block

Here, the `Bottleneck` class is defined. It inherits from `nn.Module` and is the basic building block of the deeper ResNet variants. The class attribute `expansion` specifies how many times wider the output channels are than the intermediate channels (here 4). The `__init__` method creates three convolutional layers (1×1, 3×3, 1×1), each followed by batch normalization, plus a shared ReLU activation. The `forward` method defines how data passes through these components and adds the residual connection (`out += identity`) before the final activation. When the input and output shapes differ, the optional `downsample` module projects the input so that the addition is valid.

In [None]:
class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(Bottleneck, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.conv3 = nn.Conv2d(out_channels, out_channels * self.expansion, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_channels * self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out
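
To see why the 1×1, 3×3, 1×1 layout is called a bottleneck, the convolution weights can be counted by hand. The sketch below is plain arithmetic and assumes a block whose input already has 256 channels (as in every block of the first stage except the first one); batch normalization parameters are ignored, and `conv_params` is a helper introduced only for this illustration.

```python
def conv_params(c_in, c_out, k):
    """Number of weights in a bias-free k x k convolution."""
    return c_in * c_out * k * k


width, expansion = 64, 4
channels = width * expansion  # 256 channels entering and leaving the block

bottleneck = (
    conv_params(channels, width, 1)      # conv1: 1x1, reduce 256 -> 64
    + conv_params(width, width, 3)       # conv2: 3x3 at 64 channels
    + conv_params(width, channels, 1)    # conv3: 1x1, expand 64 -> 256
)

# Comparison point (an assumption for illustration): two plain 3x3 convs at 256 channels
plain = 2 * conv_params(channels, channels, 3)

print(bottleneck)  # 69632
print(plain)       # 1179648
```

Under these assumptions the bottleneck uses roughly 17 times fewer convolution weights than two full-width 3×3 convolutions, which is what makes 50 or more layers affordable.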

### 3. Defining the ResNet Architecture

The `ResNet` class also inherits from `nn.Module` and is used to build the complete ResNet network structure. In the `__init__` function, various layers of the network are initialized, including the initial convolutional layer, multiple layers composed of `Bottleneck` blocks built by the `_make_layer` method, and the final fully connected layer, etc. The `_make_layer` method is used to generate layers consisting of a specific number of `Bottleneck` blocks, handling channel number changes, downsampling, and other situations according to the input parameters. The `forward` method defines the forward propagation process of the entire network, calculating the input data layer by layer in sequence and finally outputting the prediction results.


In [None]:
class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=1000):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        downsample = None
        if stride != 1 or self.in_channels != out_channels * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels * block.expansion),
            )

        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x
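
The effect of the strides in `conv1`, `maxpool`, and the four stages can be traced with the standard convolution output-size formula. The sketch below is pure arithmetic for a 224 × 224 input and does not instantiate the network; `conv_out` is a helper introduced only for this illustration.

```python
def conv_out(size, kernel, stride, padding):
    """Output size of a convolution or pooling layer along one spatial dimension."""
    return (size + 2 * padding - kernel) // stride + 1


size = 224
size = conv_out(size, kernel=7, stride=2, padding=3)   # conv1   -> 112
size = conv_out(size, kernel=3, stride=2, padding=1)   # maxpool -> 56

shapes = []
for width, stride in [(64, 1), (128, 2), (256, 2), (512, 2)]:
    # Only the 3x3 conv of the first block in each stage applies the stride
    size = conv_out(size, kernel=3, stride=stride, padding=1)
    shapes.append((width * 4, size))  # Bottleneck.expansion = 4

for channels, side in shapes:
    print(f'{channels} channels, {side} x {side}')
```

The last stage therefore produces a 2048-channel, 7 × 7 feature map, which `avgpool` reduces to the 2048 features expected by `fc` (`512 * block.expansion`).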

### 4. Defining ResNet-50

This is a small helper function for creating a ResNet-50 model. It instantiates the `ResNet` class with the `Bottleneck` block and the stage configuration `[3, 4, 6, 3]`, which gives the number of blocks in each of the four stages. The number of output classes can be specified (the default is 1000, as in ImageNet).

In [None]:
def resnet50(num_classes=1000):
    return ResNet(Bottleneck, [3, 4, 6, 3], num_classes)
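
The "50" in the name follows from the configuration. As a quick check, using the usual convention of counting only the main-path convolutions and the final fully connected layer (the 1×1 projection convolutions in `downsample` are not counted):

```python
blocks_per_stage = [3, 4, 6, 3]
convs_per_bottleneck = 3  # 1x1, 3x3, 1x1

# initial 7x7 conv + all bottleneck convs + final fully connected layer
depth = 1 + sum(blocks_per_stage) * convs_per_bottleneck + 1
print(depth)  # 50
```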

### 5. Data Preprocessing and Loading

First, the preprocessing pipeline `transform` is defined. It resizes every image to `(224, 224)`, converts it to a tensor, and normalizes each channel with the standard ImageNet mean and standard deviation. Then the training and validation data are loaded from their directories with `ImageFolder`, which assigns labels from the sub-folder names, and the same preprocessing is applied to both. Next, the data from the training directory is randomly divided into a training subset and a held-out test subset at a ratio of 8:2. Finally, a `DataLoader` is created for each of the training, validation, and test sets with a batch size of 32; only the training loader shuffles its data.

In [None]:
# Pre-process the data
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Load the dataset
dataset = datasets.ImageFolder('./dataset/train', transform=transform) 
val_dataset = datasets.ImageFolder('./dataset/val', transform=transform)

train_size = int(0.8*len(dataset))
test_size = len(dataset) - train_size

train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
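
One thing to be aware of is that `random_split` draws a new random partition every time the cell is run, so the test subset can change between runs (for example when resuming from a checkpoint). A possible way to make the split reproducible is to pass a seeded generator. The sketch below uses a toy `TensorDataset` as a stand-in for the image folder, and the seed value 42 is an arbitrary choice for illustration.

```python
import torch
from torch.utils.data import TensorDataset, random_split

# Toy stand-in for the ImageFolder dataset: 10 samples with binary labels
toy = TensorDataset(torch.arange(10).float().unsqueeze(1), torch.arange(10) % 2)

train_size = int(0.8 * len(toy))
test_size = len(toy) - train_size


def split(seed):
    # A seeded generator makes the permutation used by random_split repeatable
    gen = torch.Generator().manual_seed(seed)
    return random_split(toy, [train_size, test_size], generator=gen)


train_a, test_a = split(42)
train_b, test_b = split(42)

print(len(train_a), len(test_a))                         # 8 2
print(list(test_a.indices) == list(test_b.indices))      # same seed, same split
```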

### 6. Model-related Settings (Device, Loss Function, Optimizer, Learning Rate Scheduler)

Here, the `ResNet-50` model instance is first created and the number of output classes is specified as 2. Then, depending on whether there is an available CUDA device, the model is placed on the corresponding device (GPU or CPU) to run. Next, the loss function is defined as the cross-entropy loss `CrossEntropyLoss`, the optimizer is chosen as the Adam optimizer with a learning rate set to 0.001, and a learning rate scheduler `StepLR` is also defined, which will multiply the learning rate by 0.1 every 7 training epochs to adjust the learning rate change during the training process.

In [None]:
# Use the custom-defined ResNet-50
model = resnet50(num_classes=2)

# CUDA
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Learning rate scheduler
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
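
With `step_size=7` and `gamma=0.1`, the learning rate used in each of the 20 epochs can be worked out by hand. The sketch below reproduces the closed-form rule in plain Python rather than calling the scheduler; `step_lr` is a helper introduced only for this illustration, and it assumes `scheduler.step()` is called once per epoch.

```python
def step_lr(base_lr, epoch, step_size=7, gamma=0.1):
    """Learning rate in a given epoch under a step decay schedule."""
    return base_lr * gamma ** (epoch // step_size)


schedule = [step_lr(0.001, epoch) for epoch in range(20)]

for epoch in (0, 6, 7, 13, 14, 19):
    print(f'epoch {epoch:2d}: lr = {schedule[epoch]:.0e}')
```

Epochs 0 to 6 train at 1e-3, epochs 7 to 13 at 1e-4, and epochs 14 to 19 at 1e-5.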

### 7. Functions for Loading and Saving Model Checkpoints

The `load_checkpoint` function restores the model parameters, the optimizer state, the learning rate scheduler state, the next epoch number, and the best accuracy so far from the specified file (the default is `checkpoint.pth`). If the file does not exist, it prints a message and training starts from epoch 0 with a best accuracy of 0.0. The `save_checkpoint` function writes the same five items to the specified file, so that training can be resumed or the model evaluated later.

In [None]:
# Load checkpoint if exists
def load_checkpoint(model, optimizer, scheduler, filename='checkpoint.pth'):
    if os.path.isfile(filename):
        print(f"Loading checkpoint '{filename}'")
        sys.stdout.flush()
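        # Load onto the CPU first so a checkpoint saved on a GPU also opens on a CPU-only machine;
        # load_state_dict then copies the values to the device the model already lives on
        checkpoint = torch.load(filename, map_location='cpu')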
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
        start_epoch = checkpoint['epoch']
        best_acc = checkpoint['best_acc']
        print(f"Loaded checkpoint '{filename}' (epoch {start_epoch})")
        sys.stdout.flush()
    else:
        print(f"No checkpoint found at '{filename}'")
        sys.stdout.flush()
        start_epoch = 0
        best_acc = 0.0
    return model, optimizer, scheduler, start_epoch, best_acc

# Save checkpoint
def save_checkpoint(model, optimizer, scheduler, epoch, best_acc, filename='checkpoint.pth'):
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': scheduler.state_dict(),
        'best_acc': best_acc,
    }, filename)
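
The save and restore cycle can be tried in isolation. The sketch below is a toy round trip under stated assumptions: a small `nn.Linear` stands in for the ResNet, the file name `toy_checkpoint.pth` and the stored values (epoch 3, accuracy 0.75) are made up for illustration, and the file is written to a temporary directory.

```python
import os
import tempfile

import torch
from torch import nn, optim

# Toy stand-ins for the real model, optimizer, and scheduler
model = nn.Linear(4, 2)
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'toy_checkpoint.pth')

    # Same dictionary layout as save_checkpoint
    torch.save({
        'epoch': 3,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': scheduler.state_dict(),
        'best_acc': 0.75,
    }, path)

    # A freshly initialized model receives the saved weights
    restored = nn.Linear(4, 2)
    checkpoint = torch.load(path, map_location='cpu')
    restored.load_state_dict(checkpoint['model_state_dict'])

start_epoch = checkpoint['epoch']
print(start_epoch, checkpoint['best_acc'])
print(torch.equal(restored.weight, model.weight))
```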

### 8. Function for Training and Validating the Model

The `train_model` function is the core of the notebook: it trains, validates, and finally tests the model. First, it tries to restore the model, optimizer, and learning rate scheduler states from the specified checkpoint file. It then loops over the remaining epochs. Each epoch has a training phase and a validation phase: the function sets the model mode (training or evaluation), loads the data in batches, runs the forward pass, computes the loss, and, in the training phase only, runs the backward pass and updates the weights. The learning rate scheduler is stepped once per epoch after the training phase. The loss and accuracy of each phase are printed, the weights with the best validation accuracy are tracked, and a checkpoint is saved at the end of every epoch. After training, the best weights are saved to `model.pth` and loaded back into the model, the loss and accuracy on the test set are computed, and the trained model is returned. Note that `test_loader` is not a parameter of the function; it is read from the notebook's global scope.

In [None]:
# Train and validate
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=20, checkpoint_file='checkpoint.pth'):
    model, optimizer, scheduler, start_epoch, best_acc = load_checkpoint(model, optimizer, scheduler, checkpoint_file)
    
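    # state_dict() returns references to the live tensors, so clone them to keep a real snapshot.
    # best_acc is NOT reset here: it keeps the value restored by load_checkpoint (0.0 on a fresh start).
    best_model_wts = {k: v.clone() for k, v in model.state_dict().items()}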
    for epoch in range(start_epoch, num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)
        sys.stdout.flush()

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Train mode
                dataloader = train_loader
            else:
                model.eval()   # Evaluate mode
                dataloader = val_loader

            running_loss = 0.0
            running_corrects = 0

            # Go through all the data
            for inputs, labels in dataloader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Zero grad
                optimizer.zero_grad()

                # Forward
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # Backward
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels)

            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / len(dataloader.dataset)
            epoch_acc = running_corrects.double() / len(dataloader.dataset)

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
            sys.stdout.flush()

            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
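                # Clone the tensors; otherwise later training steps would overwrite the "best" weights
                best_model_wts = {k: v.clone() for k, v in model.state_dict().items()}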

        # Save checkpoint
        save_checkpoint(model, optimizer, scheduler, epoch + 1, best_acc, checkpoint_file)

    print(f'Best val Acc: {best_acc:.4f}')
    sys.stdout.flush()
    torch.save(best_model_wts, 'model.pth')
    model.load_state_dict(best_model_wts)

    # Testing phase (test_loader is read from the global scope; it is not a function parameter)
    model.eval()
    test_loss = 0.0
    test_corrects = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)

            test_loss += loss.item() * inputs.size(0)
            test_corrects += torch.sum(preds == labels)

    test_loss = test_loss / len(test_loader.dataset)
    test_acc = test_corrects.double() / len(test_loader.dataset)

    print(f'Test Loss: {test_loss:.4f} Acc: {test_acc:.4f}')
    sys.stdout.flush()
    return model
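
The loop multiplies `loss.item()` by `inputs.size(0)` before accumulating because `CrossEntropyLoss` returns the mean loss of a batch by default, and the last batch of an epoch is usually smaller than the others. The sketch below shows the difference with made-up numbers (three batches of 32, 32, and 6 samples); the values are purely illustrative.

```python
# Per-batch mean losses and batch sizes (made-up values for illustration)
batch_losses = [0.5, 0.3, 0.9]
batch_sizes = [32, 32, 6]

# What the training loop does: weight each batch mean by its size
running_loss = sum(loss * n for loss, n in zip(batch_losses, batch_sizes))
epoch_loss = running_loss / sum(batch_sizes)

# Naive alternative: average the batch means directly
naive_mean = sum(batch_losses) / len(batch_losses)

print(f'per-sample average: {epoch_loss:.4f}')
print(f'mean of batch means: {naive_mean:.4f}')
```

The naive average gives the small final batch the same weight as a full batch, so it overstates the loss in this example; the size-weighted sum divided by `len(dataloader.dataset)` is the true per-sample average.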

### 9. Executing the Training Process

This line calls the `train_model` function with the previously created model, data loaders, loss function, optimizer, and learning rate scheduler. It runs the whole process of training, validation, and final testing for 20 epochs and returns the trained model.

In [None]:
model = train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=20)

## Result analysis

### More points for thoroughness and testing interesting cases (e.g., different parameter settings)
### More points for insightful observations and analysis (e.g., failure analysis)