In [56]:
#!pip install torchinfo
#!pip install scikit-image

In [57]:
import os
import numpy as np
import pandas as pd
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
#from torch.nn.Functional
import torchvision.models as models
import torchvision.transforms as transforms
from sklearn.metrics import f1_score, accuracy_score
from torch.optim.lr_scheduler import StepLR
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import cv2 as cv
from torchinfo import summary
import einops
from skimage import feature
import timm

In [58]:
# Define paths to dataset files
path = '/kaggle/input/ai-vs-human-generated-dataset'
train_csv = '/kaggle/input/detect-ai-vs-human-generated-images/train.csv'
test_csv = '/kaggle/input/detect-ai-vs-human-generated-images/test.csv'

# Load the training and test datasets
train = pd.read_csv(train_csv)
test = pd.read_csv(test_csv)

# Print dataset shapes
print(f'Training dataset shape: {train.shape}')
print(f'Test dataset shape: {test.shape}')

# Preprocess column names for consistency
train = train[['file_name', 'label']]
train.columns = ['id', 'label']

# Display columns for reference
print("Train columns:", train.columns)
print("Test columns:", test.columns)

Training dataset shape: (79950, 3)
Test dataset shape: (5540, 1)
Train columns: Index(['id', 'label'], dtype='object')
Test columns: Index(['id'], dtype='object')


In [59]:
print("To check the data distribution for training")
train['label'].value_counts()

To check the data distribution for training


label
1    39975
0    39975
Name: count, dtype: int64

In [60]:
# Split the training data into training and validation sets (95% train, 5% validation)
train_df, val_df = train_test_split(
    train, 
    test_size=0.05, 
    random_state=42,  
    stratify=train['label'] 
)

# Print shapes of the splits
print(f'Train shape: {train_df.shape}')
print(f'Validation shape: {val_df.shape}')

# Check class distribution in both sets
print("\nTrain class distribution:")
print(train_df['label'].value_counts(normalize=True))

print("\nValidation class distribution:")
print(val_df['label'].value_counts(normalize=True))

Train shape: (75952, 2)
Validation shape: (3998, 2)

Train class distribution:
label
0    0.5
1    0.5
Name: proportion, dtype: float64

Validation class distribution:
label
0    0.5
1    0.5
Name: proportion, dtype: float64


In [61]:
# Training augmentations
train_transforms = transforms.Compose([
    transforms.Resize((250,250)),  # Resize to match ConvNeXt preprocessing
    #transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Validation and Test transforms
val_test_transforms = transforms.Compose([
    transforms.Resize((250,250)),  # Resize to 232 as per ConvNeXt documentation
    #transforms.CenterCrop(224), 
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [62]:
# Dataset class for training and validation
class AIImageDataset(Dataset):
    def __init__(self, dataframe, root_dir, transform=None):
        self.dataframe = dataframe
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir, self.dataframe.iloc[idx, 0])
        image = Image.open(img_name).convert('RGB')
        
        if self.transform:
            image = self.transform(image)
        
        label = self.dataframe.iloc[idx, 1]
        return image, label

# Dataset class for inference (validation and test)
class TestAIImageDataset(Dataset):
    def __init__(self, file_list, transform=None):
        self.file_list = file_list
        self.transform = transform

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        img_path = self.file_list[idx]
        img = Image.open(img_path).convert("RGB")
        if self.transform:
            img = self.transform(img)
        return img, os.path.basename(img_path)  # Return image and filename

In [63]:
# Create datasets
train_dataset = AIImageDataset(train_df, root_dir=path, transform=train_transforms)

# For validation, create a list of file paths and store labels separately
val_file_list = [os.path.join(path, fname) for fname in val_df['id']]
val_labels = val_df['label'].values  # Store labels separately for later use
val_dataset = TestAIImageDataset(file_list=val_file_list, transform=val_test_transforms)

# For testing, create a list of file paths
test_file_list = [os.path.join(path, fname) for fname in test['id']]
test_dataset = TestAIImageDataset(file_list=test_file_list, transform=val_test_transforms)

print(f"Training dataset size: {len(train_dataset)}")
print(f"Validation dataset size: {len(val_dataset)}")
print(f"Test dataset size: {len(test_dataset)}")

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

Training dataset size: 75952
Validation dataset size: 3998
Test dataset size: 5540


In [64]:
class DetectionModel(nn.Module):
    def __init__(self, num_classes, backbone='Resnet-50', 
                 freeze_backbone=True, add_magnitude_channel=True, add_fft_channel=True, add_lbp_channel=True,
                 learning_rate=1e-4, pos_weight=1):
        super(DetectionModel, self).__init__()
        self.num_classes = num_classes
        self.learning_rate = learning_rate
        self.epoch_outs = []
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        self.add_magnitude_channel = add_magnitude_channel
        self.add_fft_channel = add_fft_channel
        self.add_lbp_channel = add_lbp_channel
        self.new_channels = sum([self.add_magnitude_channel, self.add_fft_channel, self.add_lbp_channel])
        self.adapter = nn.Conv2d(in_channels=3+self.new_channels, out_channels=3, 
                                     kernel_size=3, stride=1, padding=1)
        self.base_model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
        self.inplanes = self.base_model.fc.in_features
        #self.base_model.deactive_last_layer=True
        for param in model.parameters():
            param.requires_grad = False
        # for param in self.base_model.layer1.parameters():
        #     param.requires_grad = False
        # for param in self.base_model.layer2.parameters():
        #     param.requires_grad = False
        self.base_model.fc = nn.Identity()
        #self.freeze_backbone = freeze_backbone
        self.fc1 = nn.Linear(self.inplanes, 512)
        self.activation=nn.ReLU() 
        self.fc2=nn.Linear(512,1)
        
    def _add_new_channels_worker(self, image):
            # convert the image to grayscale
            gray = cv.cvtColor((image.cpu().numpy() * 255).astype(np.uint8), cv.COLOR_BGR2GRAY)
            
            new_channels = []
            if self.add_magnitude_channel:
                new_channels.append(np.sqrt(cv.Sobel(gray,cv.CV_64F,1,0,ksize=7)**2 + cv.Sobel(gray,cv.CV_64F,0,1,ksize=7)**2) )
            
            #if fast_fourier is required, calculate it
            if self.add_fft_channel:
                new_channels.append(20*np.log(np.abs(np.fft.fftshift(np.fft.fft2(gray))) + 1e-9))
            
            #if localbinary pattern is required, calculate it
            if self.add_lbp_channel:
                new_channels.append(feature.local_binary_pattern(gray, 3, 6, method='uniform'))
    
            new_channels = np.stack(new_channels, axis=2) / 255
            return torch.from_numpy(new_channels).to(self.device).float()
    
    def add_new_channels(self, images):
            #copy the input image to avoid modifying the originalu
            images_copied = einops.rearrange(images, "b c h w -> b h w c")
            
            # parallelize over each image in the batch using pool
            new_channels = torch.stack([self._add_new_channels_worker(image) for image in images_copied], dim=0)
            
            # concatenates the new channels to the input image in the channel dimension
            images_copied = torch.concatenate([images_copied, new_channels], dim=-1)
            # cast img again to torch tensor and then reshape to (B, C, H, W)
            images_copied = einops.rearrange(images_copied, "b h w c -> b c h w")
            return images_copied
        
    def forward(self, x):
        out = {}
        # eventually concat the edge sharpness to the input image in the channel dimension
        #print(x.shape)
        if self.add_magnitude_channel or self.add_fft_channel or self.add_lbp_channel:
            x = self.add_new_channels(x)
        #print(x.shape)
        # extracts the features
        x_adapted = self.adapter(x)
        x_adapted=self.activation(x_adapted)
        #print(x.shape)
        # normalizes the input image
        #x_adapted = (x_adapted - torch.as_tensor(timm.data.constants.IMAGENET_DEFAULT_MEAN, device=self.device).view(1, -1, 1, 1)) / torch.as_tensor(timm.data.constants.IMAGENET_DEFAULT_STD, device=self.device).view(1, -1, 1, 1)
        features = self.base_model(x_adapted)
        
        # outputs the logits
        fc1_out = self.fc1(features)
        fc1_out=self.activation(fc1_out)
        out=self.fc2(fc1_out)
        return out

In [65]:
# device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# print(device)
# model = DetectionModel(num_classes=2, backbone='Resnet-50', 
#                  freeze_backbone=True, add_magnitude_channel=False, add_fft_channel=True, add_lbp_channel=True,
#                  learning_rate=1e-4, pos_weight=1).to(device)

# summary(model, input_size=(1, 3, 250, 250))


In [66]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model = DetectionModel(num_classes=2, backbone='Resnet-50', freeze_backbone=True, add_magnitude_channel=False, add_fft_channel=True, add_lbp_channel=True,learning_rate=1e-4, pos_weight=1).to(device)
#model = model.to(device)
print(device)
# Define loss function, optimizer, and learning rate scheduler
# optimizer = torch.optim.AdamW([
#     {'params': model.features[-2:].parameters(), 'lr': 1e-5},  # Lower LR for backbone
#     {'params': model.classifier.parameters(), 'lr': 1e-4}      # Higher LR for classifier
# ])
optimizer=torch.optim.AdamW(model.parameters(),lr=1e-4)

#criterion = nn.CrossEntropyLoss()
criterion=nn.BCEWithLogitsLoss()
scheduler = StepLR(optimizer, step_size=5, gamma=0.7)

In [None]:

# Training Loop
epochs = 25

train_losses, train_accuracies, val_losses, val_accuracies, val_f1s = [], [], [], [], []

for epoch in range(epochs):
    # -- Training --
    model.train()
    epoch_loss = 0.0
    epoch_accuracy = 0.0
    
    for data, label in tqdm(train_loader, desc=f"Training Epoch {epoch+1}"):
        data, label = data.to(device), label.to(device)
        #print(label.shape)
        optimizer.zero_grad()
        output = model(data)
        #print(output.shape)
        label = label.unsqueeze(1).float()
        loss = criterion(output, label)
        loss.backward()
        optimizer.step()
        #print(f"Train Loss: {loss.item():.4f}")
        epoch_loss += loss.item()
        #preds = output.argmax(dim=1)
        preds = (torch.sigmoid(output) >= 0.5).int()
        acc = (preds == label).float().mean().item()
        epoch_accuracy += acc
    
    epoch_loss /= len(train_loader)
    epoch_accuracy /= len(train_loader)
    
    train_losses.append(epoch_loss)
    train_accuracies.append(epoch_accuracy)
    
    # -- Validation --
    model.eval()
    val_loss = 0.0
    val_acc = 0.0
    val_pred_classes = []  # To store predictions
    val_labels_list = []   # To store true labels
    
    with torch.no_grad():
        for i, (data, _) in enumerate(tqdm(val_loader, desc=f"Validation Epoch {epoch+1}")):
            data = data.to(device)
            output = model(data)
            
            # Get true labels from val_df
            batch_labels = val_labels[i * val_loader.batch_size : (i + 1) * val_loader.batch_size]
            batch_labels = torch.tensor(batch_labels, device=device)
            batch_labels = batch_labels.unsqueeze(1).float()
            # Compute loss
            loss = criterion(output, batch_labels)
            val_loss += loss.item()
            
            # Compute predictions and accuracy
            #preds = output.argmax(dim=1)
            
            preds = (torch.sigmoid(output) >= 0.5).int()
            acc = (preds == batch_labels).float().mean().item()
            val_acc += acc
            
            # Store predictions and true labels
            val_pred_classes.extend(preds.cpu().numpy())
            val_labels_list.extend(batch_labels.cpu().numpy())
    
    # Compute average validation metrics
    val_loss /= len(val_loader)
    val_acc /= len(val_loader)
    val_f1 = f1_score(val_labels_list, val_pred_classes, average='binary')  # Binary classification
    
    # Append metrics
    val_losses.append(val_loss)
    val_accuracies.append(val_acc)
    val_f1s.append(val_f1)
    
    print(
        f"Epoch [{epoch+1}/{epochs}] "
        f"Train Loss: {epoch_loss:.4f} | Train Acc: {epoch_accuracy:.4f} | "
        f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f} | Val F1: {val_f1:.4f}"
    )
    torch.save(model.state_dict(), f"/kaggle/working/{epoch+1}_{epoch_loss:.4f}_{val_loss:.4f}_{val_f1:.4f}.pth")
    # Step the learning rate scheduler
    scheduler.step()

Training Epoch 1: 100%|██████████| 2374/2374 [52:59<00:00,  1.34s/it]
Validation Epoch 1: 100%|██████████| 125/125 [01:05<00:00,  1.92it/s]


Epoch [1/25] Train Loss: 0.1425 | Train Acc: 0.9441 | Val Loss: 0.0573 | Val Acc: 0.9795 | Val F1: 0.9797


Training Epoch 2: 100%|██████████| 2374/2374 [1:08:23<00:00,  1.73s/it]
Validation Epoch 2: 100%|██████████| 125/125 [01:08<00:00,  1.82it/s]


Epoch [2/25] Train Loss: 0.0718 | Train Acc: 0.9737 | Val Loss: 0.0509 | Val Acc: 0.9830 | Val F1: 0.9829


Training Epoch 3: 100%|██████████| 2374/2374 [1:12:52<00:00,  1.84s/it]
Validation Epoch 3: 100%|██████████| 125/125 [01:10<00:00,  1.78it/s]


Epoch [3/25] Train Loss: 0.0526 | Train Acc: 0.9806 | Val Loss: 0.0482 | Val Acc: 0.9812 | Val F1: 0.9814


Training Epoch 4:  77%|███████▋  | 1825/2374 [52:06<15:46,  1.72s/it] 

In [None]:
#!rm -rf /kaggle/working/*

In [None]:
# Generate predictions and logits for the test set
model.eval()
test_logits = []  # To store logits
test_pred_classes = []

with torch.no_grad():
    for data, _ in tqdm(test_loader, desc="Generating Test Predictions"):
        data = data.to(device)
        output = model(data)  # Raw logits (before softmax)
        
        # Save logits
        #test_logits.extend(output.cpu().numpy())  # Store raw logits
        
        # Get predicted class (0 or 1)
        #preds = output.argmax(dim=1)
        preds = (output >= 0.5).float()
        test_pred_classes.extend(preds.cpu().numpy())

# Convert logits to a DataFrame
#logits_df = pd.DataFrame(test_logits, columns=['logit_class_0', 'logit_class_1'])
#logits_df['id'] = test['id'].values  # Add image IDs for reference

# Save logits to a CSV file
#logits_df.to_csv('test_logits.csv', index=False)

# Add predictions to the test DataFrame
test['label'] = test_pred_classes
test[['id', 'label']].to_csv('/kaggle/working/submission.csv', index=False)

print("Test logits saved to 'test_logits.csv'")
print("Test predictions saved to 'submission.csv'")

In [None]:
chkpt_file="/kaggle/working/3_0.0957_0.0518_0.9816.pth"
checkpoint = torch.load(chkpt_file, map_location=torch.device('cpu'))
pretrained_dict = checkpoint
model.load_state_dict(pretrained_dict)

In [None]:
# Generate predictions and logits for the test set using a checkpoint
model.eval()
test_pred_classes = []

with torch.no_grad():
    for data, _ in tqdm(test_loader, desc="Generating Test Predictions"):
        data = data.to(device)
        output = model(data)
        #print(output)
        output=torch.sigmoid(output)
        #print(output)
        preds = (output >= 0.5).int()
        #print(preds)
        test_pred_classes.extend(preds.cpu().numpy().flatten())
test['label'] = test_pred_classes
test[['id', 'label']].to_csv('/kaggle/working/submission.csv', index=False)

print("Test predictions saved to 'submission.csv'")

In [None]:
pd.read_csv('submission.csv')['label'].value_counts()