In [1]:
import os
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from PIL import Image
from torchvision import transforms

FRAMES = 16  # number of frame images every video folder is required to contain


class VideoFrameDataset(Dataset):
    """Dataset of pre-extracted video frames laid out as ``root/<class>/<video>/<frame>``.

    The two class folders are ``fake`` (label 0) and ``real`` (label 1). Each
    sub-directory of a class folder is treated as one video sample.

    Args:
        root_dir: Directory containing the ``fake`` and ``real`` class folders.
        transform: Optional callable applied to every RGB ``PIL.Image`` frame.
            ``__getitem__`` stacks the results with ``torch.stack``, so in
            practice the transform must return equally-shaped tensors.
    """

    def __init__(self, root_dir, transform=None):
        self.samples = []  # list of (video_dir, label) pairs
        self.transform = transform

        for label, cls in enumerate(["fake", "real"]):
            cls_path = os.path.join(root_dir, cls)
            for video in sorted(os.listdir(cls_path)):
                video_path = os.path.join(cls_path, video)
                # Only directories are videos. Stray files in a class folder
                # (e.g. Thumbs.db, .DS_Store) would otherwise be registered as
                # samples and crash later in __getitem__.
                if not os.path.isdir(video_path):
                    continue
                self.samples.append((video_path, label))

    def __len__(self):
        """Return the number of video folders found."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load one video as ``(frames, label)``.

        Returns:
            x: The transformed frames stacked along a new leading dimension
               (``[16, 3, 224, 224]`` with the notebook's transform).
            y: Scalar ``torch.long`` tensor, 0 for fake and 1 for real.

        Raises:
            AssertionError: If the folder does not hold exactly ``FRAMES`` entries.
        """
        video_path, label = self.samples[idx]
        # Lexicographic order: frame file names need zero-padded indices if
        # the temporal order of the stacked frames matters.
        frame_files = sorted(os.listdir(video_path))

        # hard assertion — fail fast, not silently
        assert len(frame_files) == FRAMES, f"{video_path} has {len(frame_files)} frames"

        frames = []
        for f in frame_files:
            # convert() returns a fully loaded copy, so the file handle can be
            # released immediately instead of waiting for garbage collection.
            with Image.open(os.path.join(video_path, f)) as raw:
                img = raw.convert("RGB")
            if self.transform:
                img = self.transform(img)
            frames.append(img)

        x = torch.stack(frames)  # [16, 3, 224, 224]
        y = torch.tensor(label, dtype=torch.long)
        return x, y


In [2]:
# Per-frame preprocessing applied to every PIL image before frames are stacked.
transform = transforms.Compose(
    [
        # Force a fixed 224x224 spatial size so all frames stack cleanly.
        transforms.Resize(size=(224, 224)),
        # PIL image -> float tensor in channel-first layout.
        transforms.ToTensor(),
        # Channel-wise (R, G, B) normalisation with the commonly published
        # ImageNet statistics.
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)


In [3]:
ROOT = r"J:\Chapter\IEEE-CS\frames_new\train"
SPLIT_SEED = 42  # fixed so the train/val partition is identical on every run

dataset = VideoFrameDataset(ROOT, transform)

# 80/20 split at video level. Without a seeded generator the partition (and
# therefore every reported validation accuracy) would change on each run.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_ds, val_ds = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

train_loader = DataLoader(train_ds, batch_size=4, shuffle=True, num_workers=0)
val_loader   = DataLoader(val_ds, batch_size=4, shuffle=False, num_workers=0)

# sanity check — if this fails, stop
x, y = next(iter(train_loader))
print(x.shape)  # MUST be [4, 16, 3, 224, 224]
print(y)
# Enforce the per-sample shape instead of relying on someone reading the
# printout. The batch dimension is not checked because a very small dataset
# may yield a short first batch.
assert tuple(x.shape[1:]) == (FRAMES, 3, 224, 224), f"unexpected batch shape {tuple(x.shape)}"


torch.Size([4, 16, 3, 224, 224])
tensor([0, 1, 0, 0])


In [4]:
import os

# Integrity scan: list every entry under the class folders that cannot be
# loaded as a FRAMES-frame video.
ROOT = r"J:\Chapter\IEEE-CS\frames_new\train"
BAD = []  # (path, frame_count) or (path, "not a directory")

for cls in ["fake", "real"]:
    cls_path = os.path.join(ROOT, cls)
    # sorted() keeps the report order stable between runs
    for vid in sorted(os.listdir(cls_path)):
        vid_path = os.path.join(cls_path, vid)
        if not os.path.isdir(vid_path):
            # Report stray files instead of letting os.listdir() below abort
            # the whole scan with NotADirectoryError.
            BAD.append((vid_path, "not a directory"))
            continue
        n = len(os.listdir(vid_path))
        # Compare against the same FRAMES constant the dataset asserts on,
        # rather than a second hard-coded 16 that could drift out of sync.
        if n != FRAMES:
            BAD.append((vid_path, n))

print("Broken videos:", len(BAD))
for b in BAD[:10]:
    print(b)


Broken videos: 0


In [4]:
import torch
import torch.nn as nn
import torchvision.models as models

class VideoMaxPoolModel(nn.Module):
    """Binary video classifier: per-frame ResNet-18 features, max-pooled over time.

    Every frame is passed through a ResNet-18 with its final fully connected
    layer removed. The per-frame feature vectors are reduced with an
    element-wise max over the frame dimension, and one linear unit followed by
    a sigmoid produces a probability per video.

    Args:
        pretrained: If True, initialise the ResNet-18 backbone with ImageNet
            weights; otherwise use random initialisation.
    """

    def __init__(self, pretrained=True):
        super().__init__()
        # Frame-level feature extractor
        if hasattr(models, "ResNet18_Weights"):
            # Newer torchvision: `pretrained=` is deprecated in favour of
            # `weights=`. IMAGENET1K_V1 is the checkpoint `pretrained=True`
            # used to load.
            weights = models.ResNet18_Weights.IMAGENET1K_V1 if pretrained else None
            resnet = models.resnet18(weights=weights)
        else:
            # Older torchvision without the weights enum.
            resnet = models.resnet18(pretrained=pretrained)
        self.cnn = nn.Sequential(*list(resnet.children())[:-1])  # remove FC
        self.feature_dim = resnet.fc.in_features

        # Classifier
        self.fc = nn.Linear(self.feature_dim, 1)

    def forward(self, x):
        """Classify a batch of clips.

        Args:
            x: Tensor of shape [B, T, C, H, W] (the notebook feeds
               [B, 16, 3, 224, 224]). It does not need to be contiguous.

        Returns:
            Tensor of shape [B] holding sigmoid probabilities, not logits.
        """
        B, T, C, H, W = x.shape
        # reshape (not view): also accepts non-contiguous input, e.g. a clip
        # tensor that was transposed/permuted upstream; view would raise.
        x = x.reshape(B * T, C, H, W)        # treat frames as batch
        feats = self.cnn(x)                  # [B*T, feat, 1, 1]
        feats = feats.reshape(B, T, -1)      # [B, T, feat]
        feats, _ = torch.max(feats, dim=1)   # temporal max pooling
        out = self.fc(feats)                 # [B, 1]
        return torch.sigmoid(out).squeeze(1) # [B]


In [5]:
import torch.optim as optim

# Train on the GPU when one is visible, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

model = VideoMaxPoolModel(pretrained=True).to(device)
criterion = nn.BCELoss()  # the model already applies sigmoid, so plain BCE
optimizer = optim.Adam(model.parameters(), lr=1e-4)

EPOCHS = 10

for epoch in range(EPOCHS):
    # ---- training pass ----
    model.train()
    running_loss = 0
    for clips, labels in train_loader:
        clips = clips.to(device)
        labels = labels.float().to(device)  # BCELoss needs float targets

        optimizer.zero_grad()
        batch_loss = criterion(model(clips), labels)
        batch_loss.backward()
        optimizer.step()

        # Weight the batch-mean loss by batch size so that a short final
        # batch does not skew the per-sample epoch average.
        running_loss += batch_loss.item() * clips.size(0)

    avg_loss = running_loss / len(train_loader.dataset)

    # ---- validation pass ----
    model.eval()
    correct = 0
    with torch.no_grad():
        for clips, labels in val_loader:
            # Threshold the probabilities at 0.5; the bool predictions are
            # compared with the integer labels via type promotion.
            preds = model(clips.to(device)) > 0.5
            correct += (preds == labels.to(device)).sum().item()

    val_acc = correct / len(val_loader.dataset)
    print(f"Epoch {epoch+1}/{EPOCHS} | Loss: {avg_loss:.4f} | Val Acc: {val_acc:.3f}")




Epoch 1/10 | Loss: 0.8242 | Val Acc: 0.571
Epoch 2/10 | Loss: 0.4811 | Val Acc: 0.411
Epoch 3/10 | Loss: 0.3874 | Val Acc: 0.375
Epoch 4/10 | Loss: 0.3386 | Val Acc: 0.571
Epoch 5/10 | Loss: 0.3446 | Val Acc: 0.446
Epoch 6/10 | Loss: 0.3036 | Val Acc: 0.518
Epoch 7/10 | Loss: 0.2748 | Val Acc: 0.482
Epoch 8/10 | Loss: 0.2405 | Val Acc: 0.518
Epoch 9/10 | Loss: 0.1785 | Val Acc: 0.518
Epoch 10/10 | Loss: 0.1880 | Val Acc: 0.446


In [9]:
# Persist the trained model. The checkpoint bundles the weights with the
# optimizer state, the number of epochs run, and the validation accuracy
# measured in the last epoch of the training loop.
checkpoint = {
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'epoch': EPOCHS,
    'val_acc': val_acc,
}
torch.save(checkpoint, 'deepfake_model.pth')

print("\n✅ Model saved as 'deepfake_model.pth'")



✅ Model saved as 'deepfake_model.pth'
