In [14]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F
from PIL import Image
from torch.optim.lr_scheduler import ReduceLROnPlateau
import os
import numpy as np
import gc
import pandas as pd


In [15]:
mode="rgb"

In [16]:
class CustomImageDataset(Dataset):
    def __init__(self, directory):
        self.directory = directory
        self.image_paths = [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith((".png", ".jpg", ".jpeg"))]

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        img = Image.open(img_path).convert("RGB")

        transform = transforms.ToTensor()
        
        img= transform(img)
        # Extract label from the filename (fake -> 1, real -> 0)
        label = 1 if "fake" in os.path.basename(img_path).lower() else 0

        return img, label, img_path

In [17]:
# Load datasets using CustomImageDataset
train_data = CustomImageDataset(directory=f"data/processed/32/train/{mode}")
test_data = CustomImageDataset(directory=f"data/processed/32/valid/{mode}")

train_loader = DataLoader(train_data, batch_size=10000, shuffle=True)
test_loader = DataLoader(test_data, batch_size=10000, shuffle=False)

train_data.__getitem__(0)[0].shape

torch.Size([3, 32, 32])

In [18]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Classifier32(nn.Module):
    def __init__(self, IMAGE_SIZE):
        super(Classifier32, self).__init__()
        self.input_size = IMAGE_SIZE
        # First convolutional block
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding='same', stride=1)  # Input: 3xHxW, Output: 32xHxW
        self.bn1 = nn.BatchNorm2d(16)

        # Second convolutional block
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding='same', stride=1)  # Output: 64xH/2xW/2
        self.bn2 = nn.BatchNorm2d(32)

        # Third convolutional block
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding='same', stride=1)  # Output: 128xH/4xW/4
        self.bn3 = nn.BatchNorm2d(64)

        # Calculate the flattened size after convolutions
        self._flattened_size = self._compute_flattened_size(self.input_size)

        # Fully connected layers
        self.fc1 = nn.Linear(self._flattened_size, 512)
        self.fc2 = nn.Linear(512, 2)

    def _compute_flattened_size(self, input_size):
        """Compute the size of the tensor after all convolutional and pooling layers."""
        x = torch.zeros(1, 3, *input_size)
        # print(f"Initial size: {x.size()}")
        x = F.relu(self.bn1(self.conv1(x)))
        # print(f"After pool1: {x.size()}")
        x = F.relu(self.bn2(self.conv2(x)))
        # print(f"After pool2: {x.size()}")
        x = F.relu(self.bn3(self.conv3(x)))
        # print(f"After pool3: {x.size()}")
        return x.numel()

    def forward(self, x):
        # Convolutional layers
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        # Flatten the output for the fully connected layers
        x = x.view(x.size(0), -1)

        # Fully connected layers
        x = F.relu(self.fc1(x))
        x = self.fc2(x)

        return x


In [19]:
torch.cuda.empty_cache()

In [20]:
total_bytes = sum(torch.cuda.memory_stats().values())
total_gbs = total_bytes / (1024 ** 3)
print(f"Total size: {total_gbs:.2f} GB")


Total size: 75951.30 GB


In [21]:
print(torch.cuda.is_available())
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
IMAGE_SIZE=(32,32)

def train(train_loader):
    # Initialize the model, loss function, and optimizer
    model = Classifier32(IMAGE_SIZE).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.03)
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=1)

    #early stopping variables
    prev_loss=float('inf')
    worse_loss_counter=0
    
    # Train the model
    num_epochs = 10
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        batches=0
        for inputs, labels, paths in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            print(f"Batches: {batches}", end="\r")
            batches+=1
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        prev_loss=running_loss    
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}")
        if (running_loss>prev_loss):
            worse_loss_counter+=1
            if (worse_loss_counter>4):
                print("Early stopping, results are not improving fast enough")
                break;


    return model

True


In [22]:
# Test the model


def predict (model, test_loader):
    results={}
    
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels, paths in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
            for path, true_label, pred_label in zip(paths, labels.cpu().numpy(), predicted.cpu().numpy()):
                results[path]=[true_label, pred_label]
    
    accuracy = 100 * correct / total
    print(f"Test Accuracy: {accuracy}%")
    return results


In [23]:
model=train(train_loader)

Batches: 11

KeyboardInterrupt: 

In [None]:
result= predict(model, test_loader)

In [None]:
df_result=pd.DataFrame(result).transpose()
df_result.reset_index(inplace=True)

In [None]:
df_result.to_csv(f"result_32_{mode}.csv", index=False)
