Name : KEUNG Yat LONG

*   List item
*   List item


SID : 5714 6792

# Background
___
- Image generation techniques have existed for decades.
- Recent deep learning advances (GANs, diffusion models) have boosted photorealism in AI-generated content (AIGC).
- While these advancements have entertainment value, they also pose risks of weaponization.
- Detecting AIGC is now a critical issue and a prominent research focus.

# Basic Information
---
- **Main task**: Binary classification (detect whether an image is AI-generated or not).
- **Input**: RGB images.
- **Output**: Binary label indicating if the image is AI-generated.
- **Training set**: 45,000 images.
- **Validation set**: 5,000 images.

# Dataset Exploration
---
- Dataset includes both photographic and AI-generated images.
- Photographic images are sourced from ImageNet with varying sizes.
- AI-generated images are 512 × 512 × 3, created using Stable Diffusion v1.4, trained on the LAION dataset.
- Photographic and AI-generated images have similar semantic content to avoid content bias.
- Only binary labels are available for training and testing.

## Load dataset

In [1]:
# from google.colab import drive
# drive.mount('/content/drive')

In [2]:
import zipfile
import os

# zip_file_path = '/content/drive/MyDrive/CS4487/CS4487 Project/AIGC-Detection-Dataset.zip'

# # Create a directory to extract the dataset
# output_dir = 'AIGC_Dataset'
# os.makedirs(output_dir, exist_ok=True)

# # Extract the zip file
# with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
#     zip_ref.extractall(output_dir)

# print(f'Dataset extracted to {output_dir}')

## Data Preprocessing

In [3]:
import os
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

  from .autonotebook import tqdm as notebook_tqdm


Define transformations for training data (with augmentation)

In [4]:
# Augmentation pipeline applied to each training image, in the order listed.
train_transforms = transforms.Compose(
    [
        # Bring every image to a common 512x512 resolution first.
        transforms.Resize(size=(512, 512)),
        # Rotate by a random angle drawn from [-20, +20] degrees.
        transforms.RandomRotation(degrees=20),
        # Random left/right mirroring.
        transforms.RandomHorizontalFlip(),
        # No additional rotation; translate by up to 20% per axis and shear by up to 20 degrees.
        transforms.RandomAffine(degrees=0, translate=(0.2, 0.2), shear=20),
        # Random crop that is resized back to 512x512 ("zoom").
        # NOTE(review): the upper scale bound (1.2) exceeds the full image area — confirm this is intended.
        transforms.RandomResizedCrop(size=512, scale=(0.8, 1.2)),
        # Random photometric perturbation.
        transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3),
        # PIL image -> float tensor.
        transforms.ToTensor(),
        # Per-channel mean/std normalisation.
        # NOTE(review): these are not the usual ImageNet constants; presumably computed on this dataset — confirm.
        transforms.Normalize(
            mean=[0.4778, 0.4559, 0.4175],
            std=[0.2794, 0.2739, 0.2902],
        ),
    ]
)

Define transformations for validation data (no augmentation, only resizing and normalization)

In [5]:
# Deterministic preprocessing for evaluation: resize, tensor conversion, normalisation.
val_transforms = transforms.Compose(
    [
        # Same 512x512 target resolution as the training pipeline.
        transforms.Resize(size=(512, 512)),
        transforms.ToTensor(),
        # Same per-channel statistics as the training pipeline.
        transforms.Normalize(
            mean=[0.4778, 0.4559, 0.4175],
            std=[0.2794, 0.2739, 0.2902],
        ),
    ]
)

Load the datasets with the ImageFolder utility

In [6]:
import torch

# Build the labelled image collections from their class-per-folder directories.
# The training folder gets the augmenting pipeline, the validation folder the deterministic one.
dataset_root = 'AIGC_Dataset/AIGC-Detection-Dataset'
train_dataset = datasets.ImageFolder(root=f'{dataset_root}/train', transform=train_transforms)
val_dataset = datasets.ImageFolder(root=f'{dataset_root}/val', transform=val_transforms)

def random_split(dataset, lengths, generator=None):
    """Randomly partition ``dataset`` into consecutive, non-overlapping subsets.

    Args:
        dataset: Any object supporting ``len()``; it is wrapped, not copied.
        lengths: Sizes of the subsets to produce, in order.
        generator: Optional ``torch.Generator`` used for the permutation, so
            the split can be made reproducible. ``None`` uses the global RNG.

    Returns:
        A list of ``torch.utils.data.Subset`` objects, one per entry in
        ``lengths``.

    Raises:
        ValueError: If the requested lengths add up to more than
            ``len(dataset)`` (the slices would otherwise be silently truncated).
    """
    # Local import keeps this definition self-contained; it replaces the
    # private helper ``torch._utils._accumulate``, which is not public API.
    from itertools import accumulate

    lengths = list(lengths)
    if sum(lengths) > len(dataset):
        raise ValueError(
            f"Sum of lengths ({sum(lengths)}) exceeds dataset size ({len(dataset)})"
        )

    indices = torch.randperm(len(dataset), generator=generator).tolist()
    # ``accumulate`` yields the running end offset of each subset.
    return [
        torch.utils.data.Subset(dataset, indices[end - length:end])
        for end, length in zip(accumulate(lengths), lengths)
    ]
# Split training dataset into train and dev subsets
train_size = int(0.9 * len(train_dataset))  # 90% training
dev_size = len(train_dataset) - train_size  # 10% development
train_subset, dev_subset = random_split(train_dataset, [train_size, dev_size])

# The dev subset drives early stopping and LR scheduling, so it must not be
# randomly augmented. Re-wrap the same dev indices around a second view of the
# training folder that uses the deterministic validation transforms.
dev_subset = torch.utils.data.Subset(
    datasets.ImageFolder(train_dataset.root, transform=val_transforms),
    dev_subset.indices,
)

In [7]:
# Batch iterators for the three splits. All share batch size and worker count;
# only the training loader reshuffles its samples.
batch_size = 32
_loader_kwargs = {'batch_size': batch_size, 'num_workers': 4}
train_loader = DataLoader(train_subset, shuffle=True, **_loader_kwargs)
dev_loader = DataLoader(dev_subset, shuffle=False, **_loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_kwargs)

## Model Training

In [8]:
from tqdm import tqdm  # Progress bar library
import numpy as np

def train_model(model, train_loader, dev_loader, val_loader, criterion, optimizer, scheduler, num_epochs=50, patience=5, device='cuda'):
    """Train ``model`` with early stopping on the dev loss, then report held-out metrics.

    Each epoch runs one pass over ``train_loader``, evaluates on ``dev_loader``,
    steps ``scheduler`` with the dev loss, and keeps a snapshot of the weights
    whenever the dev loss improves. After training, the best snapshot is loaded
    back into ``model`` and evaluated once on ``val_loader``.

    Args:
        model: Network whose output is squeezed on dim 1, i.e. shaped (batch, 1).
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        dev_loader: Batches used for early stopping and LR scheduling.
        val_loader: Batches used only for the final report.
        criterion: Loss called as ``criterion(outputs, labels)`` on the raw
            model outputs (predictions are thresholded at 0, so outputs are
            treated as logits).
        optimizer: Optimizer updating ``model``'s parameters.
        scheduler: Scheduler whose ``step`` accepts the dev loss.
        num_epochs: Maximum number of epochs.
        patience: Number of consecutive non-improving epochs tolerated before stopping.
        device: Device the model and batches are moved to.

    Returns:
        None. Progress and metrics are printed; ``model`` is modified in place.
    """
    model.to(device)
    best_val_loss = np.inf
    epochs_without_improvement = 0
    best_model_state = None

    for epoch in range(num_epochs):
        model.train()
        running_loss, correct, total = 0.0, 0, 0

        # Progress bar for training phase
        print(f"\nEpoch {epoch+1}/{num_epochs}")
        train_loader_tqdm = tqdm(train_loader, desc="Training", leave=False)

        # Training Phase
        for inputs, labels in train_loader_tqdm:
            inputs, labels = inputs.to(device), labels.float().to(device)
            optimizer.zero_grad()
            outputs = model(inputs).squeeze(1)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            preds = (outputs > 0).float()  # Logit > 0 <=> sigmoid probability > 0.5
            correct += (preds == labels).sum().item()
            total += labels.size(0)

            # Update progress bar with current loss
            train_loader_tqdm.set_postfix(loss=loss.item())

        train_loss = running_loss / len(train_loader)
        train_acc = correct / total
        print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}")

        # Validation Phase on Dev Subset
        print("Evaluating on Dev Set...")
        dev_loss, dev_acc = evaluate_model_with_progress(model, dev_loader, criterion, device)
        print(f"Dev Loss: {dev_loss:.4f}, Dev Accuracy: {dev_acc:.4f}")

        # Adjust learning rate based on validation loss
        scheduler.step(dev_loss)

        # Early Stopping Logic
        if dev_loss < best_val_loss:
            best_val_loss = dev_loss
            epochs_without_improvement = 0
            # state_dict() returns references to the live tensors, which later
            # optimizer steps overwrite in place. Clone them so the snapshot
            # really holds the weights of the best epoch.
            best_model_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }
            print("Validation loss improved. Saving model...")
        else:
            epochs_without_improvement += 1
            print(f"No improvement for {epochs_without_improvement} epochs.")

        if epochs_without_improvement >= patience:
            print(f"Early stopping triggered after {patience} epochs without improvement.")
            break

    # Restore the best model state (absent only if no epoch ever improved)
    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    # Final Test on the Validation Set
    print("Evaluating on Test Set...")
    val_loss, val_acc = evaluate_model_with_progress(model, val_loader, criterion, device)
    print(f"Final Test Loss: {val_loss:.4f}, Test Accuracy: {val_acc:.4f}")

# Evaluation with progress bar
def evaluate_model_with_progress(model, loader, criterion, device='cuda'):
    """Compute mean batch loss and accuracy of ``model`` over ``loader``.

    The model outputs are passed to ``criterion`` unchanged and are treated as
    logits when forming predictions, matching the thresholding used during
    training.

    Args:
        model: Network whose output is squeezed on dim 1, i.e. shaped (batch, 1).
        loader: Iterable of ``(inputs, labels)`` batches that supports ``len()``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        ``(loss, acc)``: loss averaged over batches and the fraction of
        correctly classified samples.
    """
    model.eval()
    running_loss, correct, total = 0.0, 0, 0

    with torch.no_grad():
        loader_tqdm = tqdm(loader, desc="Evaluating", leave=False)
        for inputs, labels in loader_tqdm:
            inputs, labels = inputs.to(device), labels.float().to(device)

            outputs = model(inputs).squeeze(1)
            loss = criterion(outputs, labels)
            running_loss += loss.item()

            # Outputs are raw logits: a logit above 0 corresponds to a sigmoid
            # probability above 0.5. Thresholding the logit itself at 0.5
            # would misclassify samples with logits in (0, 0.5].
            preds = (outputs > 0).float()
            correct += (preds == labels).sum().item()
            total += labels.size(0)

            # Update progress bar with current loss
            loader_tqdm.set_postfix(loss=loss.item())

    loss = running_loss / len(loader)
    acc = correct / total
    return loss, acc


## Model Evaluation

In [9]:
def evaluate_model(model, loader, criterion, device='cuda'):
    """Compute mean batch loss and accuracy of ``model`` over ``loader``.

    The model is assumed to emit raw logits. Predictions are taken from the
    sigmoid probabilities at a 0.5 threshold. The loss is computed on the
    logits when ``criterion`` is ``BCEWithLogitsLoss`` (which applies the
    sigmoid itself) and on the probabilities for any other criterion.

    Args:
        model: Network returning a tensor shaped (batch, 1), or a tuple whose
            first element is such a tensor.
        loader: Iterable of ``(inputs, labels)`` batches that supports ``len()``.
        criterion: Loss called as ``criterion(predictions, labels)``.
        device: Device the batches are moved to.

    Returns:
        ``(val_loss, val_acc)``: loss averaged over batches and the fraction of
        correctly classified samples.
    """
    model.eval()  # Set model to evaluation mode
    running_loss, correct, total = 0.0, 0, 0

    # Feeding already-sigmoided values to BCEWithLogitsLoss would apply the
    # sigmoid twice and report a meaningless loss.
    expects_logits = isinstance(criterion, torch.nn.BCEWithLogitsLoss)

    with torch.no_grad():  # Disable gradient computation
        for i, (inputs, labels) in enumerate(loader):
            # Print progress every 10 batches
            if i % 10 == 0:
                print(f"Processing batch {i + 1}/{len(loader)}")

            # Move data to the specified device
            inputs, labels = inputs.to(device), labels.float().to(device)

            # Forward pass
            outputs = model(inputs)

            # Handle tuple outputs (e.g., InceptionV3)
            if isinstance(outputs, tuple):
                outputs = outputs[0]

            logits = outputs.squeeze(1)
            probs = torch.sigmoid(logits)

            # Compute loss on whichever representation the criterion expects
            loss = criterion(logits if expects_logits else probs, labels)
            running_loss += loss.item()

            # Compute predictions and update metrics
            preds = (probs > 0.5).float()  # Binary threshold on probabilities
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    # Calculate average loss and accuracy
    val_loss = running_loss / len(loader)
    val_acc = correct / total
    return val_loss, val_acc

## Model Selection

In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models

In [11]:
# Persist a model's weights under a name-derived file in the working directory
def save_model(model, model_name):
    """Write ``model``'s state dict to ``<model_name>_model.pth`` and print a confirmation."""
    destination = "{}_model.pth".format(model_name)  # One file per model name
    weights = model.state_dict()
    torch.save(weights, destination)
    print("{} model saved to {}".format(model_name, destination))


In [12]:
# Prefer the GPU when PyTorch reports one as available; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

DenseNet121

In [13]:
import torch.nn as nn
import torch.optim as optim
from torchvision import models
from collections import Counter

# Build DenseNet121 Model
def build_densenet_model(weights='IMAGENET1K_V1'):
    """Create a DenseNet121 with a single-logit classification head.

    Args:
        weights: Value forwarded to ``models.densenet121(weights=...)``. The
            default loads the pretrained ImageNet weights; pass ``None`` to
            skip the download when a checkpoint is loaded right afterwards.

    Returns:
        The DenseNet121 module with its ``classifier`` replaced by
        Linear(num_features, 512) -> ReLU -> Dropout(0.5) -> Linear(512, 1).
        No sigmoid is applied, so the output is a raw score per image.
    """
    base_model = models.densenet121(weights=weights)
    num_features = base_model.classifier.in_features
    base_model.classifier = nn.Sequential(
        nn.Linear(num_features, 512),
        nn.ReLU(),
        nn.Dropout(0.5),  # Dropout for regularization
        nn.Linear(512, 1)
    )
    return base_model

# Enable gradients for parameters selected by name fragment
def unfreeze_layers(model, unfreeze_layer_names):
    """Set ``requires_grad = True`` on every parameter whose name contains any given fragment.

    Parameters whose names match none of the fragments are left untouched.
    """
    for param_name, parameter in model.named_parameters():
        for fragment in unfreeze_layer_names:
            if fragment in param_name:
                parameter.requires_grad = True
                break  # One matching fragment is enough

# Calculate class imbalance
def compute_class_weights(dataset):
    """Return the positive-class weight (count of class 0 / count of class 1).

    Args:
        dataset: Either an object exposing ``targets``, or a wrapper exposing
            ``dataset`` (and optionally ``indices``) around such an object.
            When ``indices`` is present, only the labels of those samples are
            counted, so the weight reflects the subset, not the parent dataset.

    Returns:
        A one-element tensor holding the weight, moved to the module-level
        ``device``.

    Raises:
        ZeroDivisionError: If no sample of class 1 is present.
    """
    if hasattr(dataset, 'dataset'):
        targets = dataset.dataset.targets
        if hasattr(dataset, 'indices'):
            # Restrict to the samples that actually belong to this subset.
            targets = [targets[i] for i in dataset.indices]
    else:
        targets = dataset.targets

    class_counts = Counter(int(t) for t in targets)
    class_0 = class_counts[0]
    class_1 = class_counts[1]
    pos_weight = class_0 / class_1
    return torch.tensor([pos_weight]).to(device)

In [None]:
# Instantiate the DenseNet121-based classifier
densenet_model = build_densenet_model()

# Enable gradients for parameters whose names contain these fragments.
# NOTE(review): nothing visible in this notebook freezes parameters beforehand,
# so this call may currently change nothing — confirm the intended freezing policy.
unfreeze_layers(densenet_model, ["denseblock4", "transition3"])

# Two parameter groups: a smaller learning rate for the feature extractor,
# a larger one for the classification head.
backbone_group = {'params': densenet_model.features.parameters(), 'lr': 1e-4}
head_group = {'params': densenet_model.classifier.parameters(), 'lr': 1e-3}
optimizer = optim.Adam([backbone_group, head_group], weight_decay=1e-4)

# Reduce the learning rate by a factor of 10 once the monitored loss has not
# decreased for 3 scheduler steps.
# NOTE(review): `verbose` is reported as deprecated in recent PyTorch releases —
# confirm against the installed version.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.1,
    patience=3,
    verbose=True,
)

# Weight the positive class in the loss by the class-count ratio
pos_weight = compute_class_weights(train_subset)
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

In [None]:
# Run training with early stopping (at most 50 epochs, patience of 5)
train_model(
    model=densenet_model,
    train_loader=train_loader,
    dev_loader=dev_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    num_epochs=50,
    patience=5,
    device=device,
)

# Persist the weights the model holds after training
save_model(densenet_model, "DenseNet121")


Epoch 1/50


Training:   2%|▏         | 19/1266 [26:16<28:50:47, 83.28s/it, loss=0.44] 

## Show Accuracy Metric

In [None]:
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, roc_auc_score
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

# Function to evaluate model and compute metrics
def evaluate_metrics(model, loader, device='cuda'):
    """Print and plot binary-classification metrics for ``model`` over ``loader``.

    The raw model outputs (logits) are used as ranking scores for the ROC
    computations and thresholded at 0 for hard predictions. Prints the
    confusion matrix, classification report, F1/recall/precision for class 1
    and the ROC AUC, then shows the ROC curve and a confusion-matrix heatmap.

    Args:
        model: Network whose output is squeezed on dim 1, i.e. shaped (batch, 1).
        loader: Iterable of ``(inputs, labels)`` batches with labels in {0, 1}.
        device: Device the input batches are moved to.

    Returns:
        None.
    """
    model.eval()
    y_true = []  # Ground truth labels
    y_scores = []  # Raw model outputs (logits)

    with torch.no_grad():
        for inputs, labels in loader:
            inputs = inputs.to(device)
            labels = labels.cpu().numpy()
            outputs = model(inputs).squeeze(1).cpu().numpy()  # Get logits

            # Append labels and logits (no sigmoid is applied; ROC metrics
            # depend only on the ordering of the scores)
            y_true.extend(labels)
            y_scores.extend(outputs)

    # Convert to NumPy arrays
    y_true = np.array(y_true)
    y_scores = np.array(y_scores)

    # Predictions based on threshold (logit > 0 <=> probability > 0.5)
    y_preds = (y_scores > 0).astype(int)

    # Confusion Matrix. Pin both labels so the matrix is always 2x2, even when
    # only one class occurs in the data or in the predictions.
    cm = confusion_matrix(y_true, y_preds, labels=[0, 1])
    print("Confusion Matrix:")
    print(cm)

    # Classification Report
    report = classification_report(
        y_true,
        y_preds,
        labels=[0, 1],
        target_names=["Class 0 (Real)", "Class 1 (Fake)"],
    )
    print("\nClassification Report:")
    print(report)

    # F1 Score, Recall, and Precision for class 1; a zero denominator is
    # reported as 0.0 instead of producing NaN.
    tp, fn, fp = int(cm[1, 1]), int(cm[1, 0]), int(cm[0, 1])
    f1 = (2 * tp) / (2 * tp + fn + fp) if (2 * tp + fn + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    print(f"\nF1 Score: {f1:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"Precision: {precision:.4f}")

    # ROC AUC and ROC curve are only defined when both classes are present
    if np.unique(y_true).size < 2:
        print("ROC AUC Score: undefined (only one class present in the labels)")
    else:
        roc_auc = roc_auc_score(y_true, y_scores)
        print(f"ROC AUC Score: {roc_auc:.4f}")

        # ROC Curve
        fpr, tpr, thresholds = roc_curve(y_true, y_scores)
        plt.figure(figsize=(8, 6))
        plt.plot(fpr, tpr, label=f"ROC Curve (AUC = {roc_auc:.4f})")
        plt.plot([0, 1], [0, 1], linestyle="--", color="gray")  # Diagonal line
        plt.xlabel("False Positive Rate")
        plt.ylabel("True Positive Rate")
        plt.title("ROC Curve")
        plt.legend(loc="lower right")
        plt.grid()
        plt.show()

    # Confusion Matrix Heatmap
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=["Real", "Fake"], yticklabels=["Real", "Fake"])
    plt.xlabel("Predicted Labels")
    plt.ylabel("True Labels")
    plt.title("Confusion Matrix")
    plt.show()

# Report metrics for the DenseNet121 model on the validation loader
evaluate_metrics(model=densenet_model, loader=val_loader, device=device)

## Test Code

In [None]:
import os
import torch
import numpy as np
from PIL import Image
from torchvision import transforms
from sklearn import metrics
from torch.utils.data import Dataset

In [None]:
# Define a custom dataset loader for binary classification
class TestDataset(Dataset):
    """Image dataset read from ``<data_dir>/0_real`` (label 0) and ``<data_dir>/1_fake`` (label 1).

    Each item is ``(image, label)`` where ``image`` is the RGB image resized
    to 512x512, converted to a tensor and normalised per channel.
    """

    def __init__(self, data_dir):
        """Collect file paths and labels from the two class directories.

        Args:
            data_dir: Directory containing the ``0_real`` and ``1_fake`` sub-directories.
        """
        self.image_paths = []
        self.labels = []
        for label, class_dir_name in enumerate(('0_real', '1_fake')):
            class_dir = os.path.join(data_dir, class_dir_name)
            # List each directory exactly once so paths and labels cannot get
            # out of step, and sort so the sample order is deterministic.
            for file_name in sorted(os.listdir(class_dir)):
                path = os.path.join(class_dir, file_name)
                # Skip sub-directories; they cannot be opened as images.
                if os.path.isfile(path):
                    self.image_paths.append(path)
                    self.labels.append(label)

        # Image transformations
        self.transform = transforms.Compose([
            transforms.Resize((512, 512)),
            transforms.ToTensor(),
            # Per-channel mean/std; same values as the training/validation pipelines.
            # NOTE(review): these are not the usual ImageNet constants — presumably dataset statistics; confirm.
            transforms.Normalize([0.4778, 0.4559, 0.4175], [0.2794, 0.2739, 0.2902])
        ])

    def __len__(self):
        """Return the number of image files found."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load, convert to RGB and transform the image at ``idx``; return ``(image, label)``."""
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        image = Image.open(img_path).convert('RGB')
        image = self.transform(image)
        return image, label

In [None]:
# Define the test function
def test(model, test_dataset_path):
    """Return the accuracy of ``model`` on the images under ``test_dataset_path``.

    Args:
        model: Network producing one raw score (logit) per image; it is moved
            to the GPU when available and switched to evaluation mode.
        test_dataset_path: Directory passed to ``TestDataset``.

    Returns:
        Accuracy as computed by ``sklearn.metrics.accuracy_score``.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    model.eval()

    # Load test dataset
    test_dataset = TestDataset(test_dataset_path)
    y_true, y_pred = [], []

    # Inference only: avoid building autograd graphs for every image.
    with torch.no_grad():
        for idx in range(len(test_dataset)):
            img, label = test_dataset[idx]
            img = img.unsqueeze(0).to(device)  # Add batch dimension

            # The model emits a raw logit; convert it to a probability before
            # applying the 0.5 decision threshold.
            prob = torch.sigmoid(model(img)).item()

            pred = 1 if prob > 0.5 else 0  # Sigmoid threshold
            y_true.append(label)
            y_pred.append(pred)

    # Calculate accuracy
    accuracy = metrics.accuracy_score(y_true, y_pred)
    return accuracy

In [None]:
# Fill in both locations before running the evaluation cells
test_dataset_path = ""  # Path to the test dataset folder
model_path = "DenseNet121_model.pth"  # Path to the saved model file

In [None]:
# Load the trained DenseNet121 model
model = build_densenet_model()
# Map tensors to the CPU while loading so a checkpoint saved on a GPU machine
# can also be restored where no GPU is available; the model is moved to the
# target device later by the test function.
model.load_state_dict(torch.load(model_path, map_location='cpu'))
model.eval()

In [None]:
# Run the test routine and report the resulting accuracy
accuracy = test(model, test_dataset_path)
print("Test Accuracy: {}".format(accuracy))