In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import kagglehub

# Download latest version
# `path` holds whatever dataset_download returns; it is used below as the root
# directory when building the CSV and image file paths.
path = kagglehub.dataset_download("xhlulu/140k-real-and-fake-faces")

print("Path to dataset files:", path)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import numpy as np

# Report the PyTorch build and the accelerator, then pick the device used by later cells.
print(torch.__version__)
if torch.cuda.is_available():
    print("CUDA available:", True)
    print("GPU:", torch.cuda.get_device_name(0))
    device = torch.device("cuda")
else:
    print("CUDA available:", False)
    print("GPU:", "None")
    device = torch.device("cpu")

In [None]:
# import first dataset
import pandas as pd

# Read the train/valid/test manifests; the first CSV column is used as the index.
df_train, df_val, df_test = (
    pd.read_csv(f'{path}/{split_name}.csv', index_col=0)
    for split_name in ('train', 'valid', 'test')
)

In [None]:
# Preview the first rows of the training manifest.
df_train.head()

In [None]:
# Preview the first rows of the validation manifest.
df_val.head()

In [None]:
# Preview the first rows of the test manifest.
df_test.head()

In [None]:
# remove not needed columns and shuffle
# errors='ignore' keeps this cell re-runnable: on a second execution the columns are
# already gone and a plain drop() would raise KeyError.
# A fixed random_state makes the shuffle reproducible across kernel restarts.
UNUSED_COLUMNS = ['original_path', 'id', 'label_str']
SHUFFLE_SEED = 42

df_train = df_train.drop(columns=UNUSED_COLUMNS, errors='ignore').sample(frac=1, random_state=SHUFFLE_SEED)
df_val = df_val.drop(columns=UNUSED_COLUMNS, errors='ignore').sample(frac=1, random_state=SHUFFLE_SEED)
df_test = df_test.drop(columns=UNUSED_COLUMNS, errors='ignore').sample(frac=1, random_state=SHUFFLE_SEED)
df_train.head()

In [None]:
# rectify labels: replace each label with (1 - label) in all three splits.
# NOTE: this flip is not idempotent - executing the cell a second time flips the
# labels back, so run it exactly once per fresh load of the CSVs.
for _split_frame in (df_train, df_val, df_test):
    _split_frame['label'] = 1 - _split_frame['label']
df_train.head()

In [None]:
# map each image name to its path
# The startswith guard makes the cell idempotent: re-running it no longer prepends the
# image root a second time (which would silently break every path).
image_root = f'{path}/real_vs_fake/real-vs-fake/'


def _with_image_root(rel_path):
    """Return `rel_path` prefixed with `image_root` unless it already carries that prefix."""
    return rel_path if rel_path.startswith(image_root) else image_root + rel_path


df_train["path"] = df_train["path"].map(_with_image_root)
df_val["path"] = df_val["path"].map(_with_image_root)
df_test["path"] = df_test["path"].map(_with_image_root)
df_train.head()

In [None]:
# show a sample of images from the dataset with corresponding labels
import matplotlib.pyplot as plt
label_arg = {0:'real', 1:'fake'}
fig, axs = plt.subplots(2,3,figsize=(10,8))
for i in range(2):
    for j in range(3):
        # Draw a scalar index. randint(..., size=1) returns a 1-element array, and
        # calling int() on an array with ndim > 0 is deprecated since NumPy 1.25.
        random_idx = int(np.random.randint(0, df_train.shape[0]))
        img = Image.open(df_train["path"].iloc[random_idx])
        label = df_train['label'].iloc[random_idx]
        axs[i,j].imshow(img)
        axs[i,j].set_title(label_arg[label])

In [None]:
# Preprocessing pipelines. Both resize to 224x224 and normalise with the ImageNet
# channel statistics; only the training pipeline adds random augmentation.
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

_train_steps = [
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
]
train_transform = transforms.Compose(_train_steps)

# Deterministic pipeline for validation and test data
_eval_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
]
test_transform = transforms.Compose(_eval_steps)


In [None]:
# Dataset class
class DeepfakeDataset(Dataset):
    """Map-style dataset that yields ``(image, label)`` pairs described by a DataFrame.

    Args:
        df: Table with a ``"path"`` column (image file path) and a ``"label"`` column.
            Rows are addressed positionally via ``iloc``.
        transform: Optional callable applied to the RGB PIL image before it is returned.
    """

    def __init__(self, df, transform=None):
        self.df = df
        self.transform = transform

    def __len__(self):
        """Return the number of rows (samples) in the underlying table."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load the image at positional index ``idx`` and return ``(image, label)``."""
        row = self.df.iloc[idx]
        img_path = row["path"]
        # The context manager releases the file handle deterministically instead of
        # waiting for garbage collection; convert() returns a separate, fully loaded
        # image, so it stays usable after the source file is closed.
        with Image.open(img_path) as source:
            image = source.convert("RGB")
        label = row["label"]
        if self.transform:
            image = self.transform(image)
        return image, label

# Wrap each split in a dataset (requires df_train, df_val, df_test from the cells above);
# only the training split uses the augmenting transform.
train_dataset = DeepfakeDataset(df_train, transform=train_transform)
val_dataset = DeepfakeDataset(df_val, transform=test_transform)
test_dataset = DeepfakeDataset(df_test, transform=test_transform)

# Loader settings shared by all splits; only the training loader shuffles.
_loader_kwargs = dict(batch_size=32, num_workers=2)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **_loader_kwargs)

# Sanity check: fetch a single training batch (if there is one) and print its shapes
_first_batch = next(iter(train_loader), None)
if _first_batch is not None:
    images, labels = _first_batch
    print(images.shape)  # Expect [batch_size, 3, 224, 224]
    print(labels.shape)  # Expect [batch_size]


In [None]:
# Define the drop_connect function for stochastic depth (as used in EfficientNet)
def drop_connect(inputs: torch.Tensor, p: float, training: bool) -> torch.Tensor:
    """Randomly drop whole samples of a batch and rescale the survivors.

    Args:
        inputs: Tensor whose first dimension is the batch dimension (any rank >= 1).
        p: Probability of dropping each sample.
        training: When False the input is returned untouched.

    Returns:
        ``inputs`` itself when not training or ``p <= 0``; an all-zero tensor when
        ``p >= 1``; otherwise a tensor in which each sample is either zeroed or
        divided by ``1 - p``.
    """
    if not training or p <= 0.0:
        return inputs
    if p >= 1.0:
        # keep_prob would be 0; dividing by it yields inf * 0 = NaN, so drop explicitly.
        return torch.zeros_like(inputs)
    keep_prob = 1.0 - p
    # One draw per sample, broadcast across every remaining dimension (not only 4-D input)
    mask_shape = [inputs.shape[0]] + [1] * (inputs.dim() - 1)
    # floor(keep_prob + U[0, 1)) equals 1 with probability keep_prob, else 0
    random_tensor = keep_prob + torch.rand(mask_shape, dtype=inputs.dtype, device=inputs.device)
    binary_tensor = torch.floor(random_tensor)
    # Scale output so its expected value matches the input
    outputs = inputs.div(keep_prob) * binary_tensor
    return outputs

# Squeeze-and-Excitation block
class SqueezeExcitation(nn.Module):
    """Channel gating: global average pool -> 1x1 conv bottleneck -> sigmoid scale.

    Args:
        in_ch: Number of channels of the input feature map.
        se_ratio: Bottleneck width as a fraction of ``in_ch`` (at least one channel is kept).
        activation: Zero-argument factory for the activation between the two 1x1 convs.
    """

    def __init__(self, in_ch: int, se_ratio: float = 0.25, activation=nn.SiLU):
        super().__init__()
        squeezed_ch = int(in_ch * se_ratio)
        if squeezed_ch < 1:
            squeezed_ch = 1
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        # Layers are created in forward order so parameter initialisation order is unchanged
        reduce_conv = nn.Conv2d(in_ch, squeezed_ch, kernel_size=1)
        inner_act = activation()
        expand_conv = nn.Conv2d(squeezed_ch, in_ch, kernel_size=1)
        self.fc = nn.Sequential(reduce_conv, inner_act, expand_conv, nn.Sigmoid())

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Scale every channel of ``x`` by its learned gate in (0, 1)."""
        gate = self.fc(self.avg_pool(x))
        return x * gate

# MBConvBlock (Mobile Inverted Residual Block with SE and drop connect)
class MBConvBlock(nn.Module):
    """Inverted residual block: optional 1x1 expansion, depthwise conv, optional SE, 1x1 projection.

    A skip connection is added only when ``stride == 1`` and ``in_ch == out_ch``;
    in training mode the residual branch then also passes through ``drop_connect``.

    NOTE(review): the SE bottleneck is sized from the expanded width (``hidden_dim``),
    not from ``in_ch`` - confirm this is intended before comparing with reference weights.
    """

    def __init__(self, in_ch: int, out_ch: int, kernel_size: int, stride: int,
                 expand_ratio: int, se_ratio: float = 0.25, drop_connect_rate: float = 0.0,
                 norm_layer=nn.BatchNorm2d, activation=nn.SiLU):
        super().__init__()
        self.in_ch = in_ch
        self.out_ch = out_ch
        self.stride = stride
        self.expand_ratio = expand_ratio
        self.has_residual = (stride == 1 and in_ch == out_ch)
        self.drop_connect_rate = drop_connect_rate

        hidden_dim = in_ch * expand_ratio
        stages = []

        # 1x1 expansion (skipped when the block does not widen its input)
        if expand_ratio != 1:
            stages.append(nn.Conv2d(in_ch, hidden_dim, kernel_size=1, bias=False))
            stages.append(norm_layer(hidden_dim))
            stages.append(activation())

        # Depthwise conv: one group per channel, padding chosen as kernel_size // 2
        stages.append(
            nn.Conv2d(hidden_dim, hidden_dim, kernel_size=kernel_size, stride=stride,
                      padding=kernel_size // 2, groups=hidden_dim, bias=False)
        )
        stages.append(norm_layer(hidden_dim))
        stages.append(activation())

        # Channel attention, only for a ratio in (0, 1]
        wants_se = se_ratio is not None and 0.0 < se_ratio <= 1.0
        if wants_se:
            stages.append(SqueezeExcitation(hidden_dim, se_ratio=se_ratio, activation=activation))

        # Linear 1x1 projection (no activation afterwards)
        stages.append(nn.Conv2d(hidden_dim, out_ch, kernel_size=1, bias=False))
        stages.append(norm_layer(out_ch))

        self.block = nn.Sequential(*stages)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run the block; add the skip connection when the shapes allow it."""
        out = self.block(x)
        if not self.has_residual:
            return out
        if self.drop_connect_rate and self.training:
            out = drop_connect(out, p=self.drop_connect_rate, training=self.training)
        return out + x

# EfficientNet-B0 definition
class EfficientNetB0(nn.Module):
    """
    EfficientNet-B0 trained from scratch (no pretrained weights).

    Layout: 3x3 stride-2 stem -> 16 MBConv blocks in 7 stages -> 1x1 head with 1280
    channels -> global average pool -> dropout + linear classifier.

    Args:
        num_classes: Width of the final linear layer (1 gives a single logit).
        drop_connect_rate: Drop-connect rate of the last block; earlier blocks get a
            linearly smaller rate, starting at 0 for the first block.
    """
    def __init__(self, num_classes: int = 1, drop_connect_rate: float = 0.2):
        super().__init__()
        norm_layer = nn.BatchNorm2d
        activation = nn.SiLU

        # Stem: initial convolution layer
        stem_conv = nn.Conv2d(3, 32, kernel_size=3, stride=2, padding=1, bias=False)
        self.stem = nn.Sequential(stem_conv, norm_layer(32), activation())

        # One row per stage: (expansion, out_channels, repeats, kernel, stride)
        b0_settings = (
            (1, 16, 1, 3, 1),
            (6, 24, 2, 3, 2),
            (6, 40, 2, 5, 2),
            (6, 80, 3, 3, 2),
            (6, 112, 3, 5, 1),
            (6, 192, 4, 5, 2),
            (6, 320, 1, 3, 1),
        )

        # Build MBConv blocks
        total_blocks = sum(stage[2] for stage in b0_settings)
        rate_denominator = max(1, total_blocks - 1)
        mbconv_layers = []
        prev_ch = 32
        for expansion, width, repeats, kernel, first_stride in b0_settings:
            for rep in range(repeats):
                # The list length doubles as the running block index
                block_idx = len(mbconv_layers)
                # Drop-connect rate grows linearly with block depth
                block_rate = drop_connect_rate * float(block_idx) / rate_denominator
                mbconv_layers.append(
                    MBConvBlock(in_ch=prev_ch, out_ch=width,
                                kernel_size=kernel,
                                # only the first block of a stage may downsample
                                stride=first_stride if rep == 0 else 1,
                                expand_ratio=expansion, se_ratio=0.25,
                                drop_connect_rate=block_rate,
                                norm_layer=norm_layer, activation=activation)
                )
                prev_ch = width

        self.blocks = nn.Sequential(*mbconv_layers)

        # Head: final layers
        head_conv = nn.Conv2d(prev_ch, 1280, kernel_size=1, bias=False)
        self.head = nn.Sequential(head_conv, norm_layer(1280), activation())
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.classifier = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(1280, num_classes),
        )

        # Initialize weights
        self._initialize_weights()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return raw logits with ``num_classes`` values per sample."""
        features = self.head(self.blocks(self.stem(x)))
        pooled = torch.flatten(self.pool(features), 1)
        return self.classifier(pooled)

    def _initialize_weights(self):
        """Re-initialise every conv, batch-norm and linear layer in the network."""
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')
                if module.bias is not None:
                    nn.init.zeros_(module.bias)
                continue
            if isinstance(module, nn.BatchNorm2d):
                nn.init.ones_(module.weight)
                nn.init.zeros_(module.bias)
                continue
            if isinstance(module, nn.Linear):
                nn.init.normal_(module.weight, 0, 0.01)
                nn.init.zeros_(module.bias)


In [None]:
# Build the custom EfficientNet-B0 with a single-logit head (num_classes=1) for binary
# classification, then move it to the selected device and print its layer structure.
channels = 3
num_classes = 1
model = EfficientNetB0(num_classes=num_classes)
model = model.to(device)
print(model)


In [None]:
# Use BCEWithLogitsLoss for binary classification and compute pos_weight if needed
learning_rate=1e-3
num_pos = df_train['label'].sum()
num_neg = len(df_train) - num_pos
if num_pos == 0:
    # Without positives the ratio below is a division by zero (inf/NaN loss).
    raise ValueError("df_train contains no positive (label == 1) samples; cannot compute pos_weight")
# pos_weight = negatives / positives, so the positive class is re-weighted to balance
# the loss. The dtype is pinned to float32 rather than left to torch's inference from
# a NumPy scalar, and the tensor is created directly on the target device.
pos_weight = torch.tensor([float(num_neg) / float(num_pos)], dtype=torch.float32, device=device)
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


In [None]:
from tqdm import tqdm
from sklearn.metrics import roc_curve, auc

# Training configuration. The checkpoint is written to the current working directory
# after every epoch; this cell never loads it, so start_epoch stays at 0.
num_epochs = 20
start_epoch = 0
checkpoint_path = "checkpoint.pth"

for epoch in range(start_epoch, num_epochs):
    # -------------------- Training --------------------
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    train_pbar = tqdm(train_loader, desc=f"Epoch [{epoch+1}/{num_epochs}] Training", leave=False)
    for images, labels in train_pbar:
        images = images.to(device)
        # BCEWithLogitsLoss expects float targets with the same shape as the logits
        labels = labels.float().unsqueeze(1).to(device)

        optimizer.zero_grad()
        outputs = model(images)                   # raw logits
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # The criterion returns the batch mean, so weight it by batch size before summing
        running_loss += loss.item() * images.size(0)
        # Metric only: threshold the sigmoid probabilities outside of autograd
        with torch.no_grad():
            preds = (torch.sigmoid(outputs) > 0.5).float()
        correct += (preds == labels).sum().item()
        total += labels.size(0)

        train_pbar.set_postfix({
            "Loss": f"{loss.item():.4f}",
            "Acc": f"{(correct / total):.4f}"
        })

    train_loss = running_loss / total
    train_acc = correct / total

    # -------------------- Validation --------------------
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0

    all_labels = []
    all_probs = []

    val_pbar = tqdm(val_loader, desc=f"Epoch [{epoch+1}/{num_epochs}] Validation", leave=False)
    with torch.no_grad():
        for images, labels in val_pbar:
            images = images.to(device)
            labels = labels.float().unsqueeze(1).to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)

            val_loss += loss.item() * images.size(0)
            # Compute the probabilities once and reuse them for accuracy and for ROC
            probs = torch.sigmoid(outputs)
            preds = (probs > 0.5).float()
            val_correct += (preds == labels).sum().item()
            val_total += labels.size(0)

            val_pbar.set_postfix({
                "Val Loss": f"{loss.item():.4f}",
                "Val Acc": f"{(val_correct / val_total):.4f}"
            })

            # Collect labels and predicted probabilities
            all_probs.extend(probs.cpu().numpy().flatten())
            all_labels.extend(labels.cpu().numpy().flatten())

    val_loss /= val_total
    val_acc = val_correct / val_total

    print(f"\nEpoch [{epoch+1}/{num_epochs}] "
          f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} "
          f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    # ROC AUC over the whole validation set for this epoch
    all_labels_np = np.array(all_labels)
    all_probs_np = np.array(all_probs)
    fpr, tpr, _ = roc_curve(all_labels_np, all_probs_np)
    roc_auc = auc(fpr, tpr)
    print(f"Epoch {epoch+1} AUC: {roc_auc:.4f}")

    # Save model and optimizer state (overwrites the previous epoch's file)
    torch.save({'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict()},
               checkpoint_path)
    print(f"Checkpoint saved at epoch {epoch+1}")


In [None]:
# Evaluate the model on the held-out test split: loss, accuracy and ROC AUC.
model.eval()
test_loss = 0.0
test_correct = 0
test_total = 0

all_test_labels = []
all_test_probs = []

with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device)
        # Float targets shaped like the logits, as required by the criterion
        labels = labels.float().unsqueeze(1).to(device)
        outputs = model(images)
        loss = criterion(outputs, labels)

        # The criterion returns the batch mean, so weight it by batch size before summing
        test_loss += loss.item() * images.size(0)
        # Compute the probabilities once and reuse them for accuracy and for ROC
        probs = torch.sigmoid(outputs)
        preds = (probs > 0.5).float()
        test_correct += (preds == labels).sum().item()
        test_total += labels.size(0)

        all_test_probs.extend(probs.cpu().numpy().flatten())
        all_test_labels.extend(labels.cpu().numpy().flatten())

test_loss /= test_total
test_acc = test_correct / test_total
print(f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}")

# ROC AUC over the whole test set
fpr, tpr, _ = roc_curve(np.array(all_test_labels), np.array(all_test_probs))
test_auc = auc(fpr, tpr)
print(f"Test AUC: {test_auc:.4f}")
