Group:

Members:

In [None]:
# Mount Google Drive at /content/drive. Later cells copy the dataset archive
# from it and copy the trained checkpoint back to it.

from google.colab import drive

drive.mount('/content/drive')

In [None]:
!rm -r data/
!rm -r __MACOSX/
!rm -r test/
!rm -r train/
!rm -r valid/
!rm data.*
!rm README.*

In [None]:
!mkdir data
!cp drive/MyDrive/CS4487/data.zip ./data.zip

In [None]:
# Extract the dataset archive into the current working directory.
# NOTE(review): main() reads data/train and data/val, so the archive presumably
# carries a top-level data/ folder — confirm, otherwise extract with `-d data`.
!unzip ./data.zip

In [None]:
import os
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from tqdm import tqdm
from timm import create_model

# Module-level device string ("cuda" when a GPU is visible, else "cpu").
# main() below builds its own torch.device and does not read this variable.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
# ===============================
# Self-Defined Dataset (wrapped by torch DataLoader in main)
# ===============================
class data_loader(Dataset):
    """Real-vs-fake image dataset that yields a clean and an augmented view.

    ``data_dir`` must contain two sub-directories: ``0_real`` (label 0) and
    ``1_fake`` (label 1). Every regular, non-hidden file inside them is treated
    as an image.

    Each item is a tuple ``(image_original, image_aug, label)`` where both
    images are normalised 3x224x224 float tensors and ``label`` is a Python
    int (0 = real, 1 = fake).
    """

    # Channel statistics shared by BOTH views (the standard ImageNet values).
    # The network sees the two views interchangeably, so they must live in the
    # same input range.
    _MEAN = [0.485, 0.456, 0.406]
    _STD = [0.229, 0.224, 0.225]

    @staticmethod
    def _list_files(directory):
        """Return the sorted full paths of regular, non-hidden files in ``directory``.

        Sorting makes the sample order reproducible (``os.listdir`` order is
        arbitrary). Hidden entries such as ``.DS_Store`` and sub-directories are
        skipped because ``Image.open`` would fail on them mid-epoch.
        """
        paths = []
        for name in sorted(os.listdir(directory)):
            full_path = os.path.join(directory, name)
            if name.startswith('.') or not os.path.isfile(full_path):
                continue
            paths.append(full_path)
        return paths

    def __init__(self, data_dir):
        """Index the images under ``data_dir`` and build both transform pipelines.

        Raises:
            FileNotFoundError: if ``0_real`` or ``1_fake`` is missing.
        """
        real = os.path.join(data_dir, '0_real')
        fake = os.path.join(data_dir, '1_fake')

        self.full_filenames_real = self._list_files(real)
        self.full_filenames_fake = self._list_files(fake)
        self.full_filenames = self.full_filenames_real + self.full_filenames_fake

        self.labels_real = [0] * len(self.full_filenames_real)
        self.labels_fake = [1] * len(self.full_filenames_fake)
        self.labels = self.labels_real + self.labels_fake

        # Clean view: deterministic resize + normalisation.
        self.transform_original = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=self._MEAN, std=self._STD),
        ])

        # Augmented view: geometric/colour jitter on the PIL image, then the
        # SAME tensor conversion + normalisation as the clean view. Previously
        # this pipeline skipped Normalize (so the model received two different
        # input distributions) and applied RandomResizedCrop twice.
        self.transform_aug = transforms.Compose([
            transforms.RandomResizedCrop(size=(224, 224)),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(15),
            transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
            transforms.ToTensor(),
            transforms.Normalize(mean=self._MEAN, std=self._STD),
        ])

    def __len__(self):
        """Number of indexed images (real + fake)."""
        return len(self.full_filenames)

    def __getitem__(self, idx):
        """Load image ``idx`` and return ``(clean_view, augmented_view, label)``."""
        # The context manager closes the file handle deterministically;
        # convert() returns a fully loaded copy that outlives the handle.
        with Image.open(self.full_filenames[idx]) as handle:
            image = handle.convert("RGB")
        image_aug = self.transform_aug(image)
        image_original = self.transform_original(image)
        label = self.labels[idx]
        return image_original, image_aug, label


In [None]:
# ===============================
# Neural Network
# ===============================
class CNN(nn.Module):
    """Dual-backbone real-vs-synthetic classifier.

    A ViT-B and a Swin-B backbone (both created through ``timm.create_model``
    with ``num_classes=0``) each embed the input image; the two feature vectors
    are concatenated and passed through a small MLP that emits 2 logits.

    Args:
        pretrained: forwarded to ``create_model`` for both backbones.
        freeze_backbone: when true, every backbone parameter has
            ``requires_grad`` disabled so only the fusion MLP trains.
        dropout: dropout probability used after each hidden MLP layer.
    """

    def __init__(self, pretrained=True, freeze_backbone=True, dropout=0.3):
        super().__init__()

        # Backbones are expected to emit 768-dim (ViT-B) and 1024-dim (Swin-B)
        # feature vectors; the fusion input width below relies on that.
        self.vit = create_model('vit_base_patch16_224', pretrained=pretrained, num_classes=0)
        self.swin = create_model('swin_base_patch4_window7_224', pretrained=pretrained, num_classes=0)

        if freeze_backbone:
            for backbone in (self.vit, self.swin):
                backbone.requires_grad_(False)

        # Fusion MLP: 1792 -> 512 -> 128 -> 2. Layers are appended in a fixed
        # order so the Sequential indices (and state_dict keys) stay stable.
        width = 768 + 1024
        stages = []
        for hidden in (512, 128):
            stages.extend([
                nn.Linear(width, hidden),
                nn.BatchNorm1d(hidden),
                nn.ReLU(inplace=True),
                nn.Dropout(dropout),
            ])
            width = hidden
        stages.append(nn.Linear(width, 2))  # two classes: real vs synthetic
        self.fusion = nn.Sequential(*stages)

    def forward(self, x):
        """Return ``[B, 2]`` logits for a batch of images ``x``."""
        features = [self.vit(x), self.swin(x)]

        # Defensive: if the Swin backbone hands back a token sequence
        # ([B, N, C]) instead of a pooled vector, average over the tokens.
        if features[1].dim() == 3:
            features[1] = features[1].mean(1)

        return self.fusion(torch.cat(features, dim=1))

In [None]:
# ===============================
# Train-Validate (Fixed + High-Performance Version)
# ===============================
def _ensemble_predictions(logits_clean, logits_aug):
    """Return the predicted class index per sample from the mean of both views' logits."""
    # Average the LOGITS, then take argmax. Dividing the argmax result by 2
    # (as the loop used to) turns class 1 into 0.5, which never equals a label.
    return ((logits_clean + logits_aug) / 2).argmax(dim=1)


def _train_one_epoch(model, batches, criterion, optimizer, device):
    """Run one optimisation pass and return ``(mean_loss_per_batch, accuracy)``.

    ``batches`` yields ``(clean_images, augmented_images, labels)``. If it also
    exposes ``set_postfix`` (e.g. a tqdm bar), running statistics are reported
    through it after every step. The loss of a step is the sum of the
    criterion on the clean view and on the augmented view.
    """
    model.train()
    total_loss = 0.0
    correct = 0
    seen = 0
    steps = 0
    report = getattr(batches, "set_postfix", None)

    # Count steps explicitly: tqdm only refreshes its public `n` attribute
    # periodically, so it is not a reliable divisor for the running loss.
    for steps, (img_clean, img_aug, labels) in enumerate(batches, start=1):
        img_clean = img_clean.to(device, non_blocking=True)
        img_aug = img_aug.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)

        optimizer.zero_grad()

        logits_clean = model(img_clean)
        logits_aug = model(img_aug)

        loss = criterion(logits_clean, labels) + criterion(logits_aug, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        pred = _ensemble_predictions(logits_clean, logits_aug)
        correct += (pred == labels).sum().item()
        seen += labels.size(0)

        if report is not None:
            report({
                "Loss": f"{total_loss/steps:.4f}",
                "Acc": f"{correct/seen:.4f}"
            })

    return total_loss / max(steps, 1), correct / max(seen, 1)


def _evaluate(model, batches, device):
    """Return accuracy on ``batches`` using only the clean view (0.0 when empty)."""
    model.eval()
    correct = 0
    seen = 0
    with torch.no_grad():
        for img_clean, _, labels in batches:
            img_clean = img_clean.to(device)
            labels = labels.to(device)

            pred = model(img_clean).argmax(dim=1)
            correct += (pred == labels).sum().item()
            seen += labels.size(0)
    return correct / max(seen, 1)


def main():
    """Train the dual-backbone classifier and keep the best checkpoint.

    Reads ``data/train`` and ``data/val``, trains for a fixed number of epochs
    with AdamW + cosine annealing, writes ``model.pth`` whenever validation
    accuracy improves and ``model_final.pth`` after the last epoch.

    Raises:
        ValueError: if the training loader yields no full batch or the
            validation set is empty.
    """
    data_root   = "data"
    batch_size  = 16
    epochs      = 15
    lr          = 3e-5        # small LR because backbone blocks are fine-tuned

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Training on {device}")

    # ===============================
    # Dataset & Dataloader
    # ===============================
    # NOTE(review): the cleanup cell removes a `valid/` folder while this reads
    # `val` — confirm the folder name inside the archive.
    train_dataset = data_loader(os.path.join(data_root, "train"))
    val_dataset   = data_loader(os.path.join(data_root, "val"))

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,
                              num_workers=4, pin_memory=True, drop_last=True)
    val_loader   = DataLoader(val_dataset,   batch_size=batch_size, shuffle=False,
                              num_workers=4, pin_memory=True)

    # Fail early with a clear message instead of a ZeroDivisionError after
    # the model has already been built.
    if len(train_loader) == 0:
        raise ValueError(
            f"Training set has {len(train_dataset)} images, fewer than one full "
            f"batch of {batch_size} (drop_last=True)."
        )
    if len(val_dataset) == 0:
        raise ValueError("Validation set is empty.")

    # ===============================
    # Model + Optimizer + Loss
    # ===============================
    model = CNN().to(device)

    # CNN() freezes both backbones; re-enable gradients for their last blocks
    # so they are fine-tuned together with the fusion head.
    for p in model.vit.blocks[-3:].parameters():
        p.requires_grad = True
    for p in model.swin.layers[-2:].parameters():
        p.requires_grad = True

    optimizer = torch.optim.AdamW(
        [p for p in model.parameters() if p.requires_grad],
        lr=lr,
        weight_decay=1e-4
    )
    criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs, eta_min=1e-6)

    best_val_acc = 0.0

    for epoch in range(epochs):
        pbar = tqdm(train_loader, desc=f"Epoch {epoch+1:02d}/{epochs} [Train]")
        train_loss, train_acc = _train_one_epoch(model, pbar, criterion, optimizer, device)

        val_acc = _evaluate(model, val_loader, device)

        scheduler.step()

        # Save best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), "model.pth")
            print(f"New best model saved! Val Acc: {val_acc:.4f}")

        print(f"Epoch {epoch+1:02d} | "
              f"Train Loss: {train_loss:.4f} | "
              f"Train Acc: {train_acc:.4f} | "
              f"Val Acc: {val_acc:.4f} | "
              f"Best Val: {best_val_acc:.4f}")

    # Final save
    torch.save(model.state_dict(), "model_final.pth")
    print(f"\nTraining completed! Best Validation Accuracy: {best_val_acc:.4f}")
    print("Models saved as 'model.pth' (recommended) and 'model_final.pth'")

In [None]:
# Kick off training when this cell is executed (a notebook kernel runs with
# __name__ == "__main__"); importing the code as a module would skip it.
if __name__ == "__main__":
    main()


In [None]:
!cp model.pth drive/MyDrive/CS4487/model.pth