In [None]:
!pip install torch torchvision pandas scikit-learn matplotlib tqdm torchmetrics
!pip install torchmetrics
!pip install -U albumentations

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models
from torchvision.models import MobileNet_V3_Large_Weights
# from torchvision.models import VGG16_BN_Weights
# from torchvision.models import ConvNeXt_Base_Weights
# from torchvision.models import Swin_B_Weights
from torch.utils.data import ConcatDataset, DataLoader, Dataset, random_split
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
from torchmetrics.classification import BinaryAccuracy, BinaryF1Score, BinaryAUROC
from datasets import load_dataset
import pandas as pd
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import os
from tqdm import tqdm
import torch.amp as amp
from glob import glob
import albumentations as augment
from albumentations.pytorch import ToTensorV2
import cv2
import time

In [3]:
class CustomImageDataset(Dataset):
    """Binary image dataset built from a list of folders on disk.

    Every image found (recursively) under a folder receives the label derived
    from that folder's base name: 1 if the folder is named ``real``, 0 for any
    other name.

    Args:
        img_dir: Iterable of folder paths to scan.
        transform: Optional callable invoked as ``transform(image=array)``
            whose result is indexed with ``"image"``.

    Attributes:
        image_paths: File paths of all collected images.
        annotations: Integer labels, aligned index-by-index with ``image_paths``.
    """

    # Matched case-insensitively and with the leading dot, so "PHOTO.JPG" is
    # accepted while a file merely named "somepng" is not.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def __init__(self, img_dir, transform=None):
        self.image_paths = []
        self.annotations = []
        self.transform = transform
        # Traverse each folder and gather image paths
        for folder in img_dir:
            # normpath drops a trailing separator; without it basename() would
            # return '' for "…/real/" and the folder would be labelled 0.
            folder_name = os.path.basename(os.path.normpath(folder))
            print(folder_name)
            class_val = 1 if folder_name == "real" else 0  # Using folder name to get class value
            print(class_val)
            for root, dirs, files in os.walk(folder):
                # os.walk yields entries in arbitrary order; sorting makes the
                # index -> file mapping reproducible across runs and machines.
                dirs.sort()
                for file in sorted(files):
                    if file.lower().endswith(self.IMAGE_EXTENSIONS):
                        self.image_paths.append(os.path.join(root, file))
                        self.annotations.append(class_val)

    def __len__(self):
        """Return the number of collected images."""
        return len(self.annotations)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB, apply the transform, return ``(image, label)``.

        Raises:
            OSError: If the file cannot be decoded by OpenCV.
        """
        img_path = self.image_paths[idx]
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise OSError(f"Could not read image file: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        label = self.annotations[idx]

        if self.transform:
            image = self.transform(image=image)["image"]

        return image, label

In [4]:
class PexelsDataset(Dataset):
    """Dataset of images found recursively under one directory, all labelled 1.

    Args:
        img_dir: Root directory to scan.
        transform: Optional callable invoked as ``transform(image=array)``
            whose result is indexed with ``"image"``.
        max_images: Optional cap on the number of images collected. A falsy
            value (``None`` or 0) means no limit.
    """

    def __init__(self, img_dir, transform=None, max_images=None):
        self.image_paths = []
        self.transform = transform
        for root, dirs, files in os.walk(img_dir):
            # Sort so that the subset kept under `max_images` is the same on
            # every run instead of depending on filesystem enumeration order.
            dirs.sort()
            for file in sorted(files):
                if file.lower().endswith(('.jpg', '.jpeg', '.png')):
                    self.image_paths.append(os.path.join(root, file))
                    if max_images and len(self.image_paths) >= max_images:
                        return

    def __len__(self):
        """Return the number of collected images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB, apply the transform, return ``(image, 1)``.

        Raises:
            OSError: If the file cannot be decoded by OpenCV.
        """
        img_path = self.image_paths[idx]
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise OSError(f"Could not read image file: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        label = 1  # Same label CustomImageDataset gives to its "real" folder
        if self.transform:
            image = self.transform(image=image)["image"]
        return image, label

In [5]:
# Custom PyTorch dataset for HuggingFace Dataset
class GameIRDataset(Dataset):
    """Expose one split of a dataset mapping as ``(image, 0)`` samples.

    Each record's ``"rgb.png"`` entry is converted to a NumPy array; a fourth
    (alpha) channel, when present, is dropped before the optional transform.
    """

    def __init__(self, dataset_dict, transform=None, split="train"):
        self.transform = transform
        self.dataset = dataset_dict[split]

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        record = self.dataset[idx]
        pixels = np.array(record["rgb.png"])

        # Four channels in the last axis means RGBA: keep only R, G and B.
        has_alpha = pixels.shape[-1] == 4
        if has_alpha:
            pixels = pixels[:, :, :3]

        if self.transform:
            pixels = self.transform(image=pixels)["image"]

        # Every sample coming from this dataset carries label 0.
        return pixels, 0

In [6]:
def create_dataloaders(cust_img_dir, pexels_img_dir, batch_size=32, val_split=0.2):
    """Build the training and validation DataLoaders over the combined data.

    Three sources are concatenated: the folder-based CustomImageDataset, six
    shards of the HuggingFace ``LLLebin/GameIR`` dataset, and up to 12500
    images from the Pexels directory. The combined indices are shuffled and
    split into a training part and a validation part.

    Args:
        cust_img_dir: List of folders passed to CustomImageDataset.
        pexels_img_dir: Root directory passed to PexelsDataset.
        batch_size: Batch size used by both loaders.
        val_split: Fraction of the combined dataset used for validation.

    Returns:
        ``(train_loader, val_loader)``.
    """
    # Local imports: neither name is part of the notebook's import cell.
    import copy
    from torch.utils.data import Subset

    # Define different transforms for train and validation datasets
    train_transform = augment.Compose([
        augment.RandomCrop(width=370, height=370),
        augment.HorizontalFlip(p=0.5),
        augment.VerticalFlip(p=0.5),
        augment.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
        augment.RandomGamma(gamma_limit=(90, 110), p=0.5),
        augment.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ])

    val_transform = augment.Compose([
        augment.Resize(width=370, height=370),
        augment.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ])

    # Initialize custom dataset
    original_dataset = CustomImageDataset(img_dir=cust_img_dir, transform=train_transform)

    # Initialize HuggingFace dataset
    # Specify only the tar files you want to load
    data_files = {
        "train": [
            f"full_dataset/train/GameIR-SR/GameIR-SR-{shard:06d}.tar"
            for shard in range(6)
        ]
    }
    # Load dataset from Hugging Face
    gameIR_SR = load_dataset("LLLebin/GameIR", data_files=data_files)
    gameir_dataset = GameIRDataset(gameIR_SR, transform=train_transform)

    # Initialize the Pexels dataset with a limit of k images
    pexels_dataset = PexelsDataset(img_dir=pexels_img_dir, transform=train_transform, max_images=12500)

    train_sources = [original_dataset, gameir_dataset, pexels_dataset]

    # Each member dataset applies its own `transform` in __getitem__, so the
    # validation view needs its own dataset objects. Setting `.transform` on
    # the ConcatDataset (as was done before) is never read by anything and
    # left validation running with the random training augmentations.
    # Shallow copies share the file lists / HF dataset, so nothing is
    # rescanned or reloaded.
    val_sources = []
    for source in train_sources:
        val_view = copy.copy(source)
        val_view.transform = val_transform
        val_sources.append(val_view)

    # Combine the datasets: same samples, same order, different transform
    combined_dataset = ConcatDataset(train_sources)
    combined_val_dataset = ConcatDataset(val_sources)
    print("Total length of combined dataset:", len(combined_dataset))

    # Create train/validation split from one shuffled index permutation so the
    # two subsets are disjoint even though they live on different wrappers.
    val_size = int(len(combined_dataset) * val_split)
    train_size = len(combined_dataset) - val_size
    shuffled_indices = torch.randperm(len(combined_dataset)).tolist()
    train_dataset = Subset(combined_dataset, shuffled_indices[:train_size])
    val_dataset = Subset(combined_val_dataset, shuffled_indices[train_size:])

    # Create DataLoaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=True)

    return train_loader, val_loader


In [7]:
def get_model(weights=MobileNet_V3_Large_Weights.DEFAULT):
    """Return a MobileNetV3-Large whose last classifier layer emits one logit.

    The network is moved to CUDA when it is available (CPU otherwise) and
    returned in float32.
    """
    net = models.mobilenet_v3_large(weights=weights)

    # Swap the final linear layer for a single-output head (binary task).
    head_inputs = net.classifier[3].in_features
    net.classifier[3] = nn.Linear(head_inputs, 1)

    target = "cuda" if torch.cuda.is_available() else "cpu"
    net.to(torch.device(target))
    return net.float()

# def get_model(weights=VGG16_BN_Weights.DEFAULT):
#     model = models.vgg16_bn(weights=weights)
#     # Modify the classifier for binary classification
#     model.classifier[6] = nn.Linear(4096, 1)
#     device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#     model.to(device)
#     return model.float()

# def get_model(weights=ConvNeXt_Base_Weights.DEFAULT):
#     model = models.convnext_base(weights=weights)
#     # Modify the classifier for binary classification
#     model.classifier[2] = nn.Linear(model.classifier[2].in_features, 1)
#     device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#     model.to(device)
#     return model.float()

# def get_model(weights=Swin_B_Weights.DEFAULT):
#     model = models.swin_b(weights=weights)
#     # Modify the classifier for binary classification
#     model.head = nn.Linear(model.head.in_features, 1)
#     device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#     model.to(device)
#     return model.float()

In [8]:
def train_one_epoch(model, train_loader, criterion, optimizer, scaler, device):
    """Run one optimisation pass over ``train_loader`` with mixed precision.

    Args:
        model: Network to train; put into training mode here.
        train_loader: Iterable of ``(images, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped through ``scaler``.
        scaler: Gradient scaler used for the backward pass and the step.
        device: Target device, as a string (``"cuda"``, ``"cuda:0"``, ``"cpu"``)
            or a ``torch.device``.

    Returns:
        Sum of per-batch losses weighted by batch size, divided by
        ``len(train_loader.dataset)``.
    """
    model.train()
    # autocast only accepts the bare device type ("cuda"/"cpu"); passing
    # "cuda:0" or a torch.device object straight through raises.
    device_type = torch.device(device).type
    running_loss = 0.0
    for images, labels in tqdm(train_loader):
        # One transfer + cast; the former clone().detach() only duplicated the batch.
        images = images.to(device, dtype=torch.float32)
        # Labels become a float column so they line up with the (N, 1) logits.
        labels = labels.to(device).float().unsqueeze(1)

        optimizer.zero_grad()

        with amp.autocast(device_type=device_type):
            outputs = model(images)
            loss = criterion(outputs, labels)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item() * images.size(0)

    epoch_loss = running_loss / len(train_loader.dataset)
    return epoch_loss

def validate(model, val_loader, criterion, device):
    """Evaluate ``model`` on ``val_loader`` and time its forward passes.

    Args:
        model: Network to evaluate; put into eval mode here.
        val_loader: Iterable of ``(images, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(epoch_loss, all_labels, all_outputs, avg_time)`` where
        ``epoch_loss`` is the batch-size-weighted loss divided by
        ``len(val_loader.dataset)``, ``all_labels`` and ``all_outputs`` are
        NumPy arrays concatenated over batches (outputs after sigmoid), and
        ``avg_time`` is the forward-pass time in seconds per image.
    """
    model.eval()
    running_loss = 0.0
    all_labels = []
    all_outputs = []
    forward_seconds = 0.0
    images_seen = 0

    with torch.no_grad():
        for images, labels in tqdm(val_loader):
            # One transfer + cast; the former clone().detach() only duplicated the batch.
            images = images.to(device, dtype=torch.float32)
            labels = labels.to(device).float().unsqueeze(1)

            # CUDA kernels run asynchronously: synchronise on both sides of
            # the forward pass so the clock covers the actual computation
            # rather than just the kernel launches.
            if images.is_cuda:
                torch.cuda.synchronize()
            start_time = time.perf_counter()
            outputs = model(images)
            if images.is_cuda:
                torch.cuda.synchronize()
            forward_seconds += time.perf_counter() - start_time
            images_seen += images.size(0)

            loss = criterion(outputs, labels)

            running_loss += loss.item() * images.size(0)
            all_labels.append(labels.cpu().numpy())
            all_outputs.append(outputs.cpu().numpy())

    epoch_loss = running_loss / len(val_loader.dataset)

    all_labels = np.concatenate(all_labels)
    all_outputs = np.concatenate(all_outputs)
    all_outputs = torch.sigmoid(torch.tensor(all_outputs)).numpy()  # Convert logits to probabilities

    # Per-image time: total forward time over total images. The previous mean
    # of per-batch durations was a per-batch figure, not a per-image one.
    avg_time = forward_seconds / images_seen if images_seen else 0.0
    return epoch_loss, all_labels, all_outputs, avg_time


In [9]:
def _plot_history(curves, filename):
    """Plot each ``label -> values`` series in one figure, save it and show it."""
    plt.figure()
    for label, values in curves.items():
        plt.plot(values, label=label)
    plt.legend()
    plt.savefig(filename)
    plt.show()


def train_model(cust_img_dir, pexels_img_dir, num_epochs=10, batch_size=32, lr=1e-4, val_split=0.2, device='cuda',
                save_path='/kaggle/working/pretrained_MobileNet_V3_Large.pth'):
    """Train the MobileNetV3-Large binary classifier and report validation metrics.

    Per epoch: one training pass, one validation pass, then accuracy, F1 and
    ROC-AUC are computed from the validation probabilities (threshold 0.5) and
    printed. The loss histories are written to ``.npy`` files after every
    epoch. After the last epoch the weights are saved and loss, accuracy and
    F1 curves are plotted to PNG files.

    Args:
        cust_img_dir: List of folders for the folder-labelled dataset.
        pexels_img_dir: Root directory of the Pexels images.
        num_epochs: Number of epochs to run.
        batch_size: Batch size for both loaders.
        lr: Adam learning rate.
        val_split: Fraction of the data held out for validation.
        device: Device the model and batches are placed on.
        save_path: Destination file for the trained ``state_dict``.

    Returns:
        The trained model.
    """
    train_loader, val_loader = create_dataloaders(cust_img_dir, pexels_img_dir, batch_size=batch_size, val_split=val_split)

    model = get_model(weights=MobileNet_V3_Large_Weights.DEFAULT).to(device)
    # model = get_model(weights=VGG16_BN_Weights.DEFAULT).to(device)
    # model = get_model(weights=ConvNeXt_Base_Weights.DEFAULT).to(device)
    # model = get_model(weights=Swin_B_Weights.DEFAULT).to(device)
    criterion = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scaler = amp.GradScaler()

    avg_time = 0.0
    train_losses = []
    val_losses = []
    accuracies = []
    f1_scores = []

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        train_loss = train_one_epoch(model, train_loader, criterion, optimizer, scaler, device)
        val_loss, val_labels, val_outputs, avg_time = validate(model, val_loader, criterion, device)

        train_losses.append(train_loss)
        val_losses.append(val_loss)

        # Calculate metrics on validation set
        val_preds = (val_outputs > 0.5).astype(int)
        accuracy = accuracy_score(val_labels, val_preds)
        f1 = f1_score(val_labels, val_preds)
        auc = roc_auc_score(val_labels, val_outputs)

        accuracies.append(accuracy)
        f1_scores.append(f1)

        print(f'Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
        print(f'Val Accuracy: {accuracy:.4f}, F1: {f1:.4f}, AUC-ROC: {auc:.4f}')

        # Save metrics as .npy (each file holds the full history up to this epoch)
        np.save(f'train_loss_epoch_{epoch}.npy', train_losses)
        np.save(f'val_loss_epoch_{epoch}.npy', val_losses)

    # Save model weights
    torch.save(model.state_dict(), save_path)
    # Report the real destination; the old message named a file that was never written.
    print(f"Model saved successfully as {save_path}")

    # Plot loss, accuracy and F1 histories
    _plot_history({'Train Loss': train_losses, 'Validation Loss': val_losses}, 'loss_plot.png')
    _plot_history({'Accuracy': accuracies}, 'acc_plot.png')
    _plot_history({'F1-Score': f1_scores}, 'f1_plot.png')

    # Print the timing value returned by the last validation pass
    print("Average time to process 1 image = " + str(avg_time))
    return model


In [10]:
# Input folders for the folder-labelled dataset: the eight numbered
# "playing-for-data" parts followed by the folder named "real".
cust_img_dir = [
    f'/kaggle/input/playing-for-data/{part:02d}_images/images'
    for part in range(1, 9)
]
cust_img_dir.append('/kaggle/input/gameandrealclassificationdatasets/GtaCityscapesKittiDataset/real')

# Root directory of the Pexels images.
pexels_img_dir = '/kaggle/input/pexels-110k-768p-min-jpg'

In [None]:
# Run the full training pipeline; the trained network is kept in `model`.
model = train_model(
    cust_img_dir,
    pexels_img_dir,
    num_epochs=10,
    batch_size=16,
    lr=1e-4,
)