In [None]:
import torch
from PIL import Image
import torch
import torchvision
from torchvision import transforms, datasets
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
from torchvision.models import resnet50
from google.colab import drive
import os
import random
import shutil
from torchvision import models


In [None]:
# Mount Google Drive (Colab) at /content/drive; every dataset and checkpoint path below lives under it.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Root folder on Drive that holds the raw images and the generated split folders.
BASE_PATH = '/content/drive/MyDrive/AI_Real_Dataset/raw/'

# Source folder of the un-split images, keyed by class label.
src_dir = dict(
    ai=os.path.join(BASE_PATH, 'generatedAI'),
    real=os.path.join(BASE_PATH, 'realPic/pic'),
)

# Destination folders: dataset[split][label] -> "<BASE_PATH><split>/<label>".
dataset = {
    split_name: {
        label: os.path.join(BASE_PATH, f'{split_name}/{label}')
        for label in ('ai', 'real')
    }
    for split_name in ('train', 'validation', 'test')
}

# Fractions for the train / validation / test partitions (read by split_and_copy).
SPLIT = (0.7, 0.15, 0.15)

In [None]:
def split_and_copy(src_dir, dst_dirs, split=None, seed=42):
    """Partition the images in ``src_dir`` and copy them into per-split folders.

    The listing is sorted and shuffled with a private, seeded RNG, so calling
    the function again with the same arguments copies exactly the same files
    into the same splits. Without that, a re-run would layer a different
    random partition on top of the previous one and the same image could end
    up in more than one split.

    Args:
        src_dir: Folder containing the source images (.jpg/.jpeg/.png,
            case-insensitive). Other files are ignored.
        dst_dirs: Mapping with keys "train", "validation" and "test" giving the
            destination folder of each split. Folders are created if missing.
        split: Optional sequence of fractions. Only the first two entries
            (train, validation) are read; the remaining files go to "test".
            Defaults to the module-level ``SPLIT``.
        seed: Seed for the shuffle. Pass ``None`` for a non-reproducible
            shuffle.

    Returns:
        None. A one-line summary of the split sizes is printed.
    """
    fractions = SPLIT if split is None else split

    # os.listdir order is arbitrary; sort so a given seed always yields the same partition.
    files = sorted(
        f for f in os.listdir(src_dir)
        if f.lower().endswith(('.jpg', '.png', '.jpeg'))
    )
    # Private RNG: reproducible, and leaves the global `random` state untouched.
    random.Random(seed).shuffle(files)

    n = len(files)
    n_train = int(fractions[0] * n)
    n_val = int(fractions[1] * n)

    splits = {
        "train": files[:n_train],
        "validation": files[n_train:n_train + n_val],
        # Whatever is left after the two truncated counts goes to the test split.
        "test": files[n_train + n_val:]
    }

    for split_name, split_files in splits.items():
        os.makedirs(dst_dirs[split_name], exist_ok=True)
        for f in split_files:
            shutil.copy(
                os.path.join(src_dir, f),
                os.path.join(dst_dirs[split_name], f)
            )

    print(f"{src_dir} -> "
          f"train: {len(splits['train'])}, "
          f"val: {len(splits['validation'])}, "
          f"test: {len(splits['test'])}")

In [None]:
# Copy the AI-generated images into the "ai" folder of every split.
split_and_copy(src_dir["ai"], {k: dataset[k]["ai"] for k in dataset})

/content/drive/MyDrive/AI_Real_Dataset/raw/generatedAI -> train: 11349, val: 2431, test: 2433


In [None]:
# Copy the real photos into the "real" folder of every split.
split_and_copy(src_dir["real"], {k: dataset[k]["real"] for k in dataset})

/content/drive/MyDrive/AI_Real_Dataset/raw/realPic/pic -> train: 22248, val: 4767, test: 4768


In [None]:
# Count the entries in each split/class folder right after copying (shell `ls | wc -l`).
!echo "TRAIN AI:" && ls /content/drive/MyDrive/AI_Real_Dataset/raw/train/ai | wc -l
!echo "VAL AI:" && ls /content/drive/MyDrive/AI_Real_Dataset/raw/validation/ai | wc -l
!echo "TEST AI:" && ls /content/drive/MyDrive/AI_Real_Dataset/raw/test/ai | wc -l

!echo "TRAIN REAL:" && ls /content/drive/MyDrive/AI_Real_Dataset/raw/train/real | wc -l
!echo "VAL REAL:" && ls /content/drive/MyDrive/AI_Real_Dataset/raw/validation/real | wc -l
!echo "TEST REAL:" && ls /content/drive/MyDrive/AI_Real_Dataset/raw/test/real | wc -l

TRAIN AI:
11349
VAL AI:
2431
TEST AI:
2433
TRAIN REAL:
22248
VAL REAL:
4767
TEST REAL:
4768


In [None]:
# Balance the classes: randomly delete images from each "real" split until it
# is the same size as the matching "ai" split.
# WARNING: this permanently removes files from the split folders on Drive.
random.seed(42)

BASE = BASE_PATH
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')


def _list_images(dir_path):
    """Return the image file names in ``dir_path`` in sorted order.

    Sorting matters: os.listdir order is arbitrary, and random.sample picks by
    position, so an unsorted listing would make the seed above meaningless.
    """
    return sorted(
        f for f in os.listdir(dir_path)
        if f.lower().endswith(IMAGE_EXTENSIONS)
    )


# Target size of each real split = current size of the corresponding ai split,
# so the numbers stay correct if the split fractions or the source data change.
TARGET = {
    f"{split_name}/real": len(_list_images(os.path.join(BASE, split_name, "ai")))
    for split_name in ("train", "validation", "test")
}

for rel_path, target_count in TARGET.items():
    dir_path = os.path.join(BASE, rel_path)
    files = _list_images(dir_path)

    current_count = len(files)
    to_delete = current_count - target_count

    # Already at or below the target (e.g. the cell was run before): leave it alone.
    if to_delete <= 0:
        print(f"{rel_path}: nothing to delete")
        continue

    delete_files = random.sample(files, to_delete)

    for f in delete_files:
        os.remove(os.path.join(dir_path, f))

    print(f"{rel_path}: deleted {to_delete}, remaining {target_count}")

train/real: deleted 10899, remaining 11349
validation/real: deleted 2336, remaining 2431
test/real: deleted 2335, remaining 2433


In [None]:
# Re-count every split/class folder after the balancing step to confirm the class sizes match.
!echo "TRAIN AI:" && ls /content/drive/MyDrive/AI_Real_Dataset/raw/train/ai | wc -l
!echo "VAL AI:" && ls /content/drive/MyDrive/AI_Real_Dataset/raw/validation/ai | wc -l
!echo "TEST AI:" && ls /content/drive/MyDrive/AI_Real_Dataset/raw/test/ai | wc -l

!echo "TRAIN REAL:" && ls /content/drive/MyDrive/AI_Real_Dataset/raw/train/real | wc -l
!echo "VAL REAL:" && ls /content/drive/MyDrive/AI_Real_Dataset/raw/validation/real | wc -l
!echo "TEST REAL:" && ls /content/drive/MyDrive/AI_Real_Dataset/raw/test/real | wc -l

TRAIN AI:
11349
VAL AI:
2431
TEST AI:
2433
TRAIN REAL:
11349
VAL REAL:
2431
TEST REAL:
2433


In [None]:
# Values shared by all three preprocessing pipelines.
_INPUT_SIZE = (224, 224)
_NORMALIZE_MEAN = [0.485, 0.456, 0.406]
_NORMALIZE_STD = [0.229, 0.224, 0.225]


def _eval_transform():
    """Build the deterministic pipeline: resize -> tensor -> per-channel normalize."""
    return transforms.Compose([
        transforms.Resize(_INPUT_SIZE),
        transforms.ToTensor(),
        transforms.Normalize(mean=_NORMALIZE_MEAN, std=_NORMALIZE_STD),
    ])


# Training uses the same steps plus a random horizontal flip as augmentation.
transform_train = transforms.Compose([
    transforms.Resize(_INPUT_SIZE),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORMALIZE_MEAN, std=_NORMALIZE_STD),
])

# Validation and test share one augmentation-free pipeline (separate instances).
transform_val = _eval_transform()
transform_test = _eval_transform()


In [None]:
def _image_folder(split_name, transform):
    """Open the ImageFolder dataset stored under ``BASE_PATH/<split_name>``."""
    return datasets.ImageFolder(
        root=os.path.join(BASE_PATH, split_name),
        transform=transform,
    )


# One dataset per split, each with its own preprocessing pipeline.
train_dataset = _image_folder('train', transform_train)
validation_dataset = _image_folder('validation', transform_val)
test_dataset = _image_folder('test', transform_test)

Dataset ImageFolder
    Number of datapoints: 4862
    Root location: /content/drive/MyDrive/AI_Real_Dataset/raw/validation
    StandardTransform
Transform: Compose(
               Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
               ToTensor()
               Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
           )


In [None]:
# Settings common to all loaders; only the training loader shuffles.
_LOADER_KWARGS = dict(batch_size=24, num_workers=2, pin_memory=True)

train_loader = DataLoader(train_dataset, shuffle=True, **_LOADER_KWARGS)
validation_loader = DataLoader(validation_dataset, shuffle=False, **_LOADER_KWARGS)
test_loader = DataLoader(test_dataset, shuffle=False, **_LOADER_KWARGS)

In [None]:
# Pull a single batch from the training loader to sanity-check tensor shape and labels.
first_batch = next(iter(train_loader))
images, labels = first_batch
print(images.shape)
print(labels)

torch.Size([24, 3, 224, 224])
tensor([1, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 1, 0, 1, 0, 0, 0, 1, 1, 0])


In [None]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

print(device)

cuda


In [None]:
# ResNet-50 created without pretrained weights; its classifier is swapped for
# a single-output linear layer (one logit per image).
model = models.resnet50(weights=None)
in_features = model.fc.in_features
model.fc = nn.Linear(in_features=in_features, out_features=1)
model = model.to(device=device)

In [None]:
# Starting weights for fine-tuning.
ckpt_path = '/content/drive/MyDrive/AI_Real_Dataset/pretrainedRestnet/resnet50_final.pth'

# weights_only=True restricts unpickling to tensors and plain containers, which
# is all a state_dict needs, and avoids executing arbitrary pickled code.
state_dict = torch.load(ckpt_path, map_location=device, weights_only=True)
model.load_state_dict(state_dict)


<All keys matched successfully>

In [None]:
# Stage 1: freeze everything except the classifier head (parameters named "fc.*").
for name, param in model.named_parameters():
    param.requires_grad = name.startswith('fc.')

In [None]:
# Binary cross-entropy on raw logits; Adam only receives the head's parameters.
criterion = nn.BCEWithLogitsLoss()
head_params = model.fc.parameters()
optimizer = optim.Adam(head_params, lr=1e-4)

In [None]:
# Gradient scaler for mixed-precision training; train() reads it from module scope
# to scale the loss before backward() and to drive the optimizer step.
scaler = torch.amp.GradScaler()

In [None]:
def train(model, loader, optimizer, criterion):
    """Run one training epoch and return the mean per-sample loss.

    Reads ``device`` and ``scaler`` from module scope. The forward pass runs
    under autocast and the backward/step go through the gradient scaler.

    Args:
        model: Network producing one logit per sample, shape (batch, 1).
        loader: Iterable yielding ``(inputs, labels)`` batches; labels are
            integer class indices and are converted to float column vectors.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(outputs, labels)``.

    Returns:
        Loss averaged over the samples actually processed (0.0 if the loader
        yields nothing). Counting processed samples, rather than dividing by
        ``len(loader.dataset)``, keeps the average correct when the loader
        drops the last batch or uses a sampler, and mirrors ``validate``.
    """
    # NOTE: train() mode also makes BatchNorm layers update their running
    # statistics, including layers whose parameters are frozen.
    model.train()

    # autocast wants the bare device type ("cuda"/"cpu"), even if `device` carries an index.
    device_type = torch.device(device).type
    running_loss = 0.0
    seen = 0

    for inputs, labels in loader:
        # non_blocking on both transfers so pinned-memory copies can overlap.
        inputs = inputs.to(device, non_blocking=True)
        labels = labels.float().unsqueeze(1).to(device, non_blocking=True)

        optimizer.zero_grad()

        with torch.amp.autocast(device_type=device_type):
            outputs = model(inputs)
            loss = criterion(outputs, labels)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # The criterion returns a batch mean; weight it by batch size so a
        # smaller final batch does not skew the epoch average.
        batch_size = inputs.size(0)
        running_loss += loss.item() * batch_size
        seen += batch_size

    return running_loss / max(seen, 1)

In [None]:
def validate(model, loader, criterion):
    """Evaluate ``model`` on ``loader`` and return ``(mean_loss, accuracy)``.

    Runs in eval mode with gradients disabled and reads ``device`` from module
    scope. A sample counts as predicted positive when the sigmoid of its logit
    is strictly greater than 0.5.

    Args:
        model: Network producing one logit per sample, shape (batch, 1).
        loader: Iterable yielding ``(inputs, labels)`` batches with integer labels.
        criterion: Loss called as ``criterion(outputs, labels)``.

    Returns:
        Tuple of the per-sample average loss and the fraction of correct predictions.
    """
    model.eval()

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for inputs, labels in loader:
            inputs = inputs.to(device)
            labels = labels.float().unsqueeze(1).to(device, non_blocking=True)

            logits = model(inputs)
            batch_loss = criterion(logits, labels)

            # Hard 0/1 predictions from the sigmoid probabilities.
            preds = (torch.sigmoid(logits) > 0.5).float()
            hits = torch.eq(preds.squeeze(1), labels.squeeze(1))

            n_correct += hits.sum().item()
            n_seen += labels.size(0)
            # Weight the batch-mean loss by batch size for a true per-sample average.
            loss_sum += batch_loss.item() * inputs.size(0)

    return loss_sum / n_seen, n_correct / n_seen

In [None]:
# Stage 1: train only the classifier head (the rest of the network is frozen)
# for a fixed number of epochs, reporting train/validation metrics per epoch.
epochs = 6

for epoch in range(epochs):

    # One full pass over the training data, then an evaluation on the validation split.
    train_loss = train(model, train_loader, optimizer, criterion)
    val_loss, val_acc = validate(model, validation_loader, criterion)

    print(f"""Epoch [{epoch+1}/{epochs}]
            Train Loss: {train_loss:.4f}
            Val Loss:   {val_loss:.4f}
            Val Acc:    {val_acc:.4f}
    """)

Epoch [1/6]
            Train Loss: 0.6662
            Val Loss:   0.6523
            Val Acc:    0.6035
    
Epoch [2/6]
            Train Loss: 0.6463
            Val Loss:   0.6380
            Val Acc:    0.6543
    
Epoch [3/6]
            Train Loss: 0.6327
            Val Loss:   0.6213
            Val Acc:    0.6958
    
Epoch [4/6]
            Train Loss: 0.6192
            Val Loss:   0.6371
            Val Acc:    0.6456
    


KeyboardInterrupt: 

In [None]:
# Save the weights as they stand after the head-only training stage.
torch.save(model.state_dict(), "/content/drive/MyDrive/AI_Real_Dataset/pretrainedRestnet/restnet_train_Head.pth")


In [None]:
# Stage 2: train only the last residual stage ("layer4.*") and the head ("fc.*");
# every other parameter stays frozen.
_TRAINABLE_PREFIXES = ('fc.', 'layer4.')

for name, param in model.named_parameters():
    param.requires_grad = name.startswith(_TRAINABLE_PREFIXES)

In [None]:
# Two parameter groups: a smaller learning rate for layer4 than for the head.
stage2_param_groups = [
    dict(params=model.layer4.parameters(), lr=1e-5),
    dict(params=model.fc.parameters(), lr=1e-4),
]
optimizer = torch.optim.AdamW(stage2_param_groups, weight_decay=1e-4)

criterion = nn.BCEWithLogitsLoss()


In [None]:
# Stage 2: fine-tune layer4 + head with early stopping on validation accuracy.
EPOCHS = 10
PATIENCE = 3          # epochs without a new best accuracy before stopping
best_val_acc = 0.0
best_val_loss = None
best_epoch = 0
best_state = None     # CPU copy of the weights from the best epoch so far
patience_counter = 0


for epoch in range(EPOCHS):
    train_loss = train(model, train_loader, optimizer, criterion)
    val_loss, val_acc = validate(model, validation_loader, criterion)

    print(
        f"Epoch [{epoch+1}/{EPOCHS}] | "
        f"Train Loss: {train_loss:.4f} | "
        f"Val Loss: {val_loss:.4f} | "
        f"Val Acc: {val_acc:.4f}"
    )

    if val_acc > best_val_acc:
        best_val_acc = val_acc
        best_val_loss = val_loss
        best_epoch = epoch + 1
        patience_counter = 0
        # Snapshot the weights: state_dict() returns live tensors that later
        # optimizer steps overwrite in place, so they must be cloned.
        best_state = {
            key: value.detach().cpu().clone()
            for key, value in model.state_dict().items()
        }
    else:
        patience_counter += 1

    if patience_counter >= PATIENCE:
        print("Early stopping triggered")
        break

# Early stopping (or a weak final epoch) can leave the model on weights that
# are worse than the best ones seen; roll back so the cells that save and test
# the model use the best epoch, and keep the reported metrics in step with it.
if best_state is not None and best_epoch != epoch + 1:
    model.load_state_dict(best_state)
    val_loss, val_acc = best_val_loss, best_val_acc
    print(f"Restored weights from epoch {best_epoch} (Val Acc: {best_val_acc:.4f})")

Epoch [1/10] | Train Loss: 0.5197 | Val Loss: 0.5115 | Val Acc: 0.7361
Epoch [2/10] | Train Loss: 0.4697 | Val Loss: 0.4826 | Val Acc: 0.7727
Epoch [3/10] | Train Loss: 0.4526 | Val Loss: 0.4182 | Val Acc: 0.8058
Epoch [4/10] | Train Loss: 0.4419 | Val Loss: 0.4146 | Val Acc: 0.8083
Epoch [5/10] | Train Loss: 0.4333 | Val Loss: 0.5595 | Val Acc: 0.7240
Epoch [6/10] | Train Loss: 0.4252 | Val Loss: 0.3969 | Val Acc: 0.8190
Epoch [7/10] | Train Loss: 0.4205 | Val Loss: 0.5060 | Val Acc: 0.7657
Epoch [8/10] | Train Loss: 0.4128 | Val Loss: 0.3873 | Val Acc: 0.8241
Epoch [9/10] | Train Loss: 0.4047 | Val Loss: 0.3941 | Val Acc: 0.8229
Epoch [10/10] | Train Loss: 0.4049 | Val Loss: 0.3774 | Val Acc: 0.8359


In [None]:
# Save the weights after the layer4 + head fine-tuning stage; this file is reloaded below for testing.
torch.save(model.state_dict(), "/content/drive/MyDrive/AI_Real_Dataset/pretrainedRestnet/restnet_train_Head_and_layer.pth")


In [None]:
# Re-select the device for the evaluation section: GPU when available, CPU otherwise.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

print(device)

cuda


In [None]:
# Rebuild the same architecture (ResNet-50 with a single-output head) so the
# fine-tuned weights can be loaded into a fresh model for evaluation.
model = models.resnet50(weights=None)
in_features = model.fc.in_features
model.fc = nn.Linear(in_features=in_features, out_features=1)
model = model.to(device=device)

In [None]:
# Weights written at the end of the layer4 + head fine-tuning stage.
ckpt_path = '/content/drive/MyDrive/AI_Real_Dataset/pretrainedRestnet/restnet_train_Head_and_layer.pth'

# weights_only=True restricts unpickling to tensors and plain containers, which
# is all a state_dict needs, and avoids executing arbitrary pickled code.
state_dict = torch.load(ckpt_path, map_location=device, weights_only=True)
model.load_state_dict(state_dict)


<All keys matched successfully>

In [None]:
# Put the reloaded model in evaluation mode before testing (the cell also displays the module structure).
model.eval()

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [None]:
# Display the preprocessing pipeline that the test set goes through.
transform_test

Compose(
    Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
    ToTensor()
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
)

In [None]:
# Confirm the test loader object exists in this session.
test_loader

<torch.utils.data.dataloader.DataLoader at 0x79bf997e0c20>

In [None]:
# Same loss as in training, re-created here for the test evaluation.
criterion = nn.BCEWithLogitsLoss()






In [None]:
# Score the reloaded model on the test split using the validation routine.
test_metrics = validate(model, test_loader, criterion)
test_loss, test_acc = test_metrics

In [None]:
# Report the test-set loss and accuracy to four decimals.
print(f"TEST Loss: {test_loss:.4f}")
print(f"TEST Accuracy: {test_acc:.4f}")

TEST Loss: 0.3777
TEST Accuracy: 0.8333


In [None]:
# Save the weights of the model that was just evaluated on the test split (state_dict only).
torch.save(
    model.state_dict(),
    "/content/drive/MyDrive/AI_Real_Dataset/pretrainedRestnet/resnet50_ai_real_final.pth"
)

In [None]:
# Full stage-2 checkpoint: weights, optimizer state and the metrics that go
# with them. Every metric is read from the variables produced by the training
# and test cells above instead of being typed in by hand, so the file stays
# correct when the notebook is re-run and results change.
# NOTE: requires the stage-2 training loop and the test evaluation to have run
# in this session (epoch, val_acc, val_loss, test_loss, test_acc, optimizer).
torch.save(
    {
        "epoch": epoch + 1,  # number of stage-2 epochs that were run
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "val_acc": val_acc,
        "val_loss": val_loss,
        "loss": test_loss,
        "test_acc": test_acc
    },
    "/content/drive/MyDrive/AI_Real_Dataset/pretrainedRestnet/resnet50_checkpoint_stage2_best.pth"
)