In [2]:
pip install facenet-pytorch opencv-python numpy tqdm



In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import cv2
import os
from tqdm import tqdm
from facenet_pytorch import MTCNN
from torch.utils.data import Dataset, DataLoader
from timm import create_model




# Data Preprocessing


class FacePreprocessor:
    """Detect a face with MTCNN and cut out a square patch around its nose tip.

    The patch side is 1.25x the larger side of the first returned bounding
    box (before clipping to the image borders); the patch is resized to
    300x300 pixels.
    """

    def __init__(self, device='cuda:0' if torch.cuda.is_available() else 'cpu'):
        """Create the MTCNN detector on ``device``."""
        self.device = device
        self.mtcnn = MTCNN(keep_all=True, device=device)

    def process_frame(self, frame):
        """Process a single frame: detect, align, and crop face.

        Args:
            frame: image array indexed as (row, column, channel); it is
                converted with ``cv2.COLOR_BGR2RGB`` before detection, so it
                is expected in BGR channel order.

        Returns:
            A 300x300 crop converted back with ``cv2.COLOR_RGB2BGR``, or
            ``None`` when no face is detected or the crop window lies
            entirely outside the frame.
        """
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        boxes, _, landmarks = self.mtcnn.detect(frame_rgb, landmarks=True)

        if boxes is None or len(boxes) == 0:
            return None

        # Nose tip is the 3rd landmark point of the first detection.
        nose_x, nose_y = landmarks[0][2]

        # Half of the crop side (1.25x the larger bounding-box dimension).
        left, top, right, bottom = boxes[0]
        half_side = max(right - left, bottom - top) * 1.25 / 2

        frame_h, frame_w = frame.shape[0], frame.shape[1]

        def clamp(value, upper):
            # Keep every bound inside [0, upper]; a negative bound would
            # otherwise be interpreted by NumPy as an index from the end.
            return min(max(0, int(value)), upper)

        x_lo = clamp(nose_x - half_side, frame_w)
        x_hi = clamp(nose_x + half_side, frame_w)
        y_lo = clamp(nose_y - half_side, frame_h)
        y_hi = clamp(nose_y + half_side, frame_h)

        face = frame_rgb[y_lo:y_hi, x_lo:x_hi]
        if face.size == 0:
            return None

        # Resize to 300x300 and convert back to BGR
        face = cv2.resize(face, (300, 300))
        return cv2.cvtColor(face, cv2.COLOR_RGB2BGR)


class VideoProcessor:
    """Sample frames from a video and run each one through FacePreprocessor."""

    def __init__(self, num_frames=270):
        """Store the maximum number of frames to sample and build the face preprocessor."""
        self.num_frames = num_frames
        self.face_preprocessor = FacePreprocessor()

    def extract_frames(self, video_path):
        """Extract and process frames from a video.

        Frame positions are ``0, step, 2*step, ...`` with
        ``step = max(1, total_frames // num_frames)``, limited to
        ``num_frames`` positions and to positions that exist in the video.
        Each position is visited once, so a video shorter than
        ``num_frames`` yields no repeated frames.

        Args:
            video_path: path handed to ``cv2.VideoCapture``.

        Returns:
            ``np.array`` of the processed frames (frames for which the
            preprocessor returned ``None`` are dropped). The array is empty
            when the video cannot be opened or reports no frames.
        """
        frames = []
        cap = cv2.VideoCapture(video_path)
        try:
            if cap.isOpened():
                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                wanted = max(self.num_frames, 0)
                frame_step = max(1, total_frames // max(1, wanted))

                # range() stops at the last real frame index.
                positions = range(0, max(total_frames, 0), frame_step)[:wanted]
                for pos in positions:
                    cap.set(cv2.CAP_PROP_POS_FRAMES, pos)
                    ret, frame = cap.read()
                    if not ret:
                        break

                    processed = self.face_preprocessor.process_frame(frame)
                    if processed is not None:
                        frames.append(processed)
        finally:
            # Release the capture on every path, including unopened files and errors.
            cap.release()

        return np.array(frames)  # (T, 300, 300, 3) when frames were found


class SequenceGenerator:
    """Cut a frame stack into overlapping fixed-length windows and normalise them."""

    def __init__(self, seq_length=6):
        """Store the number of frames per sequence."""
        self.seq_length = seq_length

    def create_sequences(self, frames):
        """Create consecutive frame sequences.

        Every window of ``seq_length`` consecutive frames (stride 1) becomes
        one sequence.

        Args:
            frames: indexable stack of frames, e.g. an array of shape (T, H, W, C).

        Returns:
            ``np.array`` of the windows, shape (N, seq_length, H, W, C) when
            ``frames`` is such an array; empty when there are fewer than
            ``seq_length`` frames.
        """
        window = self.seq_length
        starts = range(len(frames) - window + 1)
        return np.array([frames[start:start + window] for start in starts])

    def normalize(self, sequence):
        """Normalize sequence for model input.

        Scales values to [0, 1], subtracts a per-channel mean, divides by a
        per-channel std and moves channels in front of the spatial axes.

        Args:
            sequence: array of shape (T, H, W, 3) holding 0-255 values.

        Returns:
            float32 array of shape (T, 3, H, W).
        """
        # Keep the statistics in float32: float64 constants would upcast the
        # whole result and double the size of every saved .npy file.
        # NOTE(review): these values are listed in ImageNet RGB order, while
        # FacePreprocessor.process_frame returns BGR crops - confirm the
        # intended channel order before changing either side.
        mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
        std = np.array([0.229, 0.224, 0.225], dtype=np.float32)

        scaled = sequence.astype(np.float32) / 255.0
        standardized = (scaled - mean) / std

        # Change to (T, C, H, W)
        return np.transpose(standardized, (0, 3, 1, 2))


def preprocess_video(video_path, output_dir, processor=None):
    """Full preprocessing pipeline for a single video.

    Writes ``frame_XXXX.jpg`` crops and normalised ``seq_XXX.npy`` windows
    into ``<output_dir>/<video file name without extension>/``.

    Args:
        video_path: path of the input video.
        output_dir: directory under which the per-video folder is created.
        processor: optional object with an ``extract_frames(video_path)``
            method. Pass one shared ``VideoProcessor`` to avoid building a
            new face detector for every video; a fresh ``VideoProcessor()``
            is created when omitted.

    Returns:
        The number of sequences written, or ``None`` when the video was
        skipped (sequence files already present) or produced no frames.
    """
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    frame_dir = os.path.join(output_dir, video_name)

    # Skip if already processed. Any existing seq_*.npy counts as "done",
    # so a run interrupted while saving is not resumed automatically.
    if os.path.isdir(frame_dir) and any(
        name.startswith("seq_") and name.endswith(".npy") for name in os.listdir(frame_dir)
    ):
        print(f" Skipping already processed: {video_name}")
        return

    # Step 1: Process video
    if processor is None:
        processor = VideoProcessor()
    frames = processor.extract_frames(video_path)

    if len(frames) == 0:
        print(f" No valid frames found in: {video_name}")
        return

    # Create the folder only once there is something to store, so videos
    # without usable frames do not leave empty directories behind.
    os.makedirs(frame_dir, exist_ok=True)

    # Step 2: Save frames
    for i, frame in enumerate(frames):
        cv2.imwrite(os.path.join(frame_dir, f"frame_{i:04d}.jpg"), frame)

    # Step 3: Create sequences
    seq_generator = SequenceGenerator()
    sequences = seq_generator.create_sequences(frames)

    # Step 4: Save sequences
    for i, seq in enumerate(sequences):
        normalized_seq = seq_generator.normalize(seq)
        np.save(os.path.join(frame_dir, f"seq_{i:03d}.npy"), normalized_seq)

    return len(sequences)



class DeepfakeDataset(Dataset):
    """Dataset over the ``seq_*.npy`` files found anywhere below ``data_dir``.

    A sample is labelled 0 (fake) when the directory part of its path
    *relative to* ``data_dir`` contains "fake" (case-insensitive), otherwise
    1 (real). The location of ``data_dir`` itself does not affect labels.
    """

    def __init__(self, data_dir):
        """Remember ``data_dir`` and index all sequence files below it."""
        self.data_dir = data_dir
        self.sequence_paths = self._find_sequences()

    def _find_sequences(self):
        """Return the sorted paths of all ``seq_*.npy`` files under ``data_dir``."""
        found = []
        for root, _, files in os.walk(self.data_dir):
            found.extend(
                os.path.join(root, name)
                for name in files
                if name.startswith('seq_') and name.endswith('.npy')
            )
        # os.walk order is not guaranteed; sorting keeps indices stable between runs.
        return sorted(found)

    def _label_for(self, seq_path):
        """Return 0 for a fake sample and 1 for a real one (see class docstring)."""
        relative_dir = os.path.dirname(os.path.relpath(seq_path, self.data_dir))
        return 0 if "fake" in relative_dir.lower() else 1

    # The Dataset/DataLoader protocol requires the double-underscore names.
    def __len__(self):
        """Number of sequence files."""
        return len(self.sequence_paths)

    def __getitem__(self, idx):
        """Load sequence ``idx`` and return ``(sequence_tensor, float32_label)``."""
        seq_path = self.sequence_paths[idx]
        sequence = np.load(seq_path)

        label = self._label_for(seq_path)
        return torch.tensor(sequence), torch.tensor(label, dtype=torch.float32)


# Model Architecture


from timm import create_model
import torch.nn as nn

from timm import create_model
import torch.nn as nn
import torch

class FeatureExtractor(nn.Module):
    """Apply a timm Xception backbone to every frame of a clip independently."""

    def __init__(self):
        super().__init__()
        # out_indices=(3,) selects a single intermediate feature map; per the
        # original note it is the stage chosen to give 728-dim features.
        backbone_options = dict(pretrained=True, features_only=True, out_indices=(3,))
        self.backbone = create_model('xception', **backbone_options)

    def forward(self, x):
        """Map clips (B, T, C, H, W) to feature maps (B, T, C_f, H_f, W_f)."""
        batch, steps, channels, height, width = x.shape

        # Fold time into the batch axis so the 2-D backbone sees plain images.
        stacked = x.view(batch * steps, channels, height, width)
        feature_map = self.backbone(stacked)[0]

        # Split the leading axis back into (batch, time).
        _, feat_channels, feat_h, feat_w = feature_map.shape
        return feature_map.view(batch, steps, feat_channels, feat_h, feat_w)


class Tokenizer(nn.Module):
    """Turn per-frame feature maps into a token grid with CLS tokens and position embeddings."""

    def __init__(self, num_frames=6, spatial_size=19, token_dim=728):
        super().__init__()
        self.num_frames = num_frames
        self.num_patches = spatial_size ** 2
        self.token_dim = token_dim

        tokens_per_frame = self.num_patches + 1  # patches plus one spatial CLS token

        # Learnable classification tokens: one spatial CLS per frame and one
        # temporal CLS "frame" that spans every spatial position.
        self.spatial_cls = nn.Parameter(torch.randn(1, num_frames, 1, token_dim))
        self.temporal_cls = nn.Parameter(torch.randn(1, 1, tokens_per_frame, token_dim))

        # Learnable position embedding covering the full (T+1, HW+1) grid.
        self.pos_embed = nn.Parameter(torch.randn(1, num_frames + 1, tokens_per_frame, token_dim))

    def forward(self, x):
        """Map features (B, T, C, H, W) to tokens (B, T+1, HW+1, D)."""
        batch, _steps, _channels, _height, _width = x.shape

        # Flatten the spatial grid and move channels last: (B, T, HW, C).
        patches = x.flatten(3).transpose(2, 3)

        # Prepend the spatial CLS token along the patch axis: (B, T, HW+1, D).
        per_frame_cls = self.spatial_cls.expand(batch, -1, -1, -1)
        tokens = torch.cat([per_frame_cls, patches], dim=2)

        # Prepend the temporal CLS row along the time axis: (B, T+1, HW+1, D).
        clip_cls = self.temporal_cls.expand(batch, -1, -1, -1)
        tokens = torch.cat([clip_cls, tokens], dim=1)

        return tokens + self.pos_embed


class DecomposedAttention(nn.Module):
    """Multi-head self-attention restricted to one axis (time or space) of a token grid."""

    def __init__(self, dim, num_heads=8):
        super().__init__()
        self.num_heads = num_heads
        self.head_dim = dim // num_heads
        self.scale = self.head_dim ** -0.5

        self.qkv = nn.Linear(dim, dim * 3)
        self.proj = nn.Linear(dim, dim)

    def _attend(self, q, k, v):
        """Scaled dot-product attention over the second-to-last axis; leading axes are batched."""
        scores = torch.einsum('...td,...ud->...tu', q, k) * self.scale
        weights = scores.softmax(dim=-1)
        return torch.einsum('...tu,...ud->...td', weights, v)

    def temporal_attention(self, q, k, v):
        """Attend across time; q, k, v are (B, N, S, T, D)."""
        return self._attend(q, k, v)

    def spatial_attention(self, q, k, v):
        """Attend across spatial positions; q, k, v are (B, N, T, S, D)."""
        return self._attend(q, k, v)

    def forward(self, x, mode='temporal'):
        """Apply attention to tokens (B, T, S, D).

        ``mode='temporal'`` attends along T for every spatial position; any
        other value attends along S within every time step.
        """
        batch, steps, positions, dim = x.shape

        # Project once, then split into heads: each of q, k, v is (B, N, T, S, head_dim).
        projected = self.qkv(x).reshape(batch, steps, positions, 3, self.num_heads, self.head_dim)
        q, k, v = projected.permute(3, 0, 4, 1, 2, 5).unbind(0)

        if mode == 'temporal':
            # Swap the T and S axes so that time is the attended axis, then swap back.
            q, k, v = q.transpose(2, 3), k.transpose(2, 3), v.transpose(2, 3)
            attended = self.temporal_attention(q, k, v).transpose(2, 3)
        else:  # spatial
            attended = self.spatial_attention(q, k, v)

        # Merge heads back: (B, N, T, S, head_dim) -> (B, T, S, D).
        merged = attended.permute(0, 2, 3, 1, 4).reshape(batch, steps, positions, dim)
        return self.proj(merged)


class SelfSubtract(nn.Module):
    """Replace every frame after the first by its difference to the preceding frame."""

    def forward(self, x):
        """Transform tokens (B, T, S, D) whose time index 0 is the temporal CLS row.

        The output keeps the CLS row and the first frame unchanged and stores
        ``frame[i] - frame[i-1]`` for each later frame, so the shape is preserved.
        """
        cls_row = x[:, :1]        # temporal CLS token
        first_frame = x[:, 1:2]   # kept as-is

        # Frames 2..N minus frames 1..N-1, i.e. consecutive-frame residuals.
        residuals = x[:, 2:] - x[:, 1:-1]

        return torch.cat((cls_row, first_frame, residuals), dim=1)


class ISTVTBlock(nn.Module):
    """Transformer block: temporal attention on self-subtracted tokens, spatial attention, then an MLP.

    Each stage normalises its input with its own LayerNorm and adds the
    stage output back onto the un-normalised input (residual connection).
    """

    def __init__(self, dim, num_heads):
        super().__init__()
        self.norm1, self.norm2, self.norm3 = (nn.LayerNorm(dim) for _ in range(3))

        self.temp_attn = DecomposedAttention(dim, num_heads)
        self.spatial_attn = DecomposedAttention(dim, num_heads)
        self.self_subtract = SelfSubtract()

        hidden_dim = dim * 4
        self.ffn = nn.Sequential(
            nn.Linear(dim, hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, dim),
        )

    def forward(self, x):
        """Run the three residual stages on tokens ``x`` and return tokens of the same shape."""
        # Stage 1: temporal attention sees frame-to-frame residuals of the
        # normalised tokens; the skip connection uses the original tokens.
        residual_tokens = self.self_subtract(self.norm1(x))
        after_temporal = x + self.temp_attn(residual_tokens, mode='temporal')

        # Stage 2: spatial attention.
        after_spatial = after_temporal + self.spatial_attn(self.norm2(after_temporal), mode='spatial')

        # Stage 3: position-wise feed-forward network.
        return after_spatial + self.ffn(self.norm3(after_spatial))


class ISTVT(nn.Module):
    """Full model: frame feature extractor, tokenizer, a stack of ISTVT blocks and a 1-logit head."""

    def __init__(self, num_frames=6, num_blocks=12, dim=728, num_heads=8):
        super().__init__()
        self.feature_extractor = FeatureExtractor()
        # NOTE(review): only num_frames is forwarded; the tokenizer keeps its
        # own default token_dim and spatial_size, independent of `dim`.
        self.tokenizer = Tokenizer(num_frames=num_frames)
        self.blocks = nn.ModuleList(ISTVTBlock(dim, num_heads) for _ in range(num_blocks))
        self.head = nn.Sequential(nn.LayerNorm(dim), nn.Linear(dim, 1))

    def forward(self, x):
        """Return one logit per clip.

        Args:
            x: clips of shape (B, T, C, H, W); the original notes give
               (batch, 6, 3, 300, 300) as the intended input.

        Returns:
            Tensor of shape (B,) with un-normalised scores.
        """
        tokens = self.tokenizer(self.feature_extractor(x))

        for layer in self.blocks:
            tokens = layer(tokens)

        # The token at time index 0 / position index 0 serves as the clip-level CLS token.
        logits = self.head(tokens[:, 0, 0])
        return logits.squeeze(1)

class DeepfakeDataset(Dataset):
    """Dataset over the ``seq_*.npy`` files found anywhere below ``data_dir``.

    A sample is labelled 0 (fake) when the directory part of its path
    *relative to* ``data_dir`` contains "fake" (case-insensitive), otherwise
    1 (real). Matching against the relative path keeps a root directory
    whose own name contains "fake" (for example ".../deepfake_data") from
    turning every sample into a fake.
    """

    def __init__(self, data_dir):
        """Remember ``data_dir`` and index all sequence files below it."""
        self.data_dir = data_dir
        self.sequence_paths = self._find_sequences()

    def _find_sequences(self):
        """Return the sorted paths of all ``seq_*.npy`` files under ``data_dir``."""
        found = []
        for root, _, files in os.walk(self.data_dir):
            found.extend(
                os.path.join(root, name)
                for name in files
                if name.startswith('seq_') and name.endswith('.npy')
            )
        # os.walk order is not guaranteed; sorting keeps indices stable between runs.
        return sorted(found)

    def _label_for(self, seq_path):
        """Return 0 for a fake sample and 1 for a real one (see class docstring)."""
        relative_dir = os.path.dirname(os.path.relpath(seq_path, self.data_dir))
        return 0 if "fake" in relative_dir.lower() else 1

    def __len__(self):
        """Number of sequence files."""
        return len(self.sequence_paths)

    def __getitem__(self, idx):
        """Load sequence ``idx`` and return ``(sequence_tensor, float32_label)``."""
        seq_path = self.sequence_paths[idx]
        sequence = np.load(seq_path)

        label = self._label_for(seq_path)
        return torch.tensor(sequence), torch.tensor(label, dtype=torch.float32)

# Training Setup

def train(model, dataloader, val_loader, epochs=100, lr=0.0005, checkpoint_dir="checkpoints"):
    """Train ``model`` with BCE-with-logits loss and SGD, checkpointing every epoch.

    Args:
        model: network returning one logit per sample.
        dataloader: iterable of ``(sequences, labels)`` training batches; must support ``len``.
        val_loader: iterable of ``(sequences, labels)`` validation batches; must support ``len``.
        epochs: total number of epochs (resumed runs continue up to this number).
        lr: SGD learning rate (momentum is fixed at 0.9).
        checkpoint_dir: directory for ``model_epoch_<n>.pth``, ``last_epoch.txt``
            and ``best_model.pth``.

    Returns:
        The trained model (wrapped in ``nn.DataParallel`` when several GPUs are used).

    Notes:
        Checkpoints hold the weights of the bare module (no ``module.``
        prefix), so they load the same way with one or several GPUs.
        Checkpoints written with the prefix are still accepted on resume.
        Only model weights are persisted: optimizer/scheduler state and the
        best validation accuracy start fresh after a resume.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    # `core_model` is the module whose weights are saved and restored.
    core_model = model.module if isinstance(model, nn.DataParallel) else model
    if torch.cuda.device_count() > 1:
        if not isinstance(model, nn.DataParallel):
            model = nn.DataParallel(model)
        print(f"Using {torch.cuda.device_count()} GPUs")

    criterion = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3)

    os.makedirs(checkpoint_dir, exist_ok=True)
    start_epoch = 0
    best_val_acc = 0.0

    # Resume if last_epoch.txt exists
    last_epoch_file = os.path.join(checkpoint_dir, "last_epoch.txt")
    if os.path.exists(last_epoch_file):
        with open(last_epoch_file, "r") as f:
            last_epoch = int(f.read().strip())
        checkpoint_path = os.path.join(checkpoint_dir, f"model_epoch_{last_epoch}.pth")
        if os.path.exists(checkpoint_path):
            # map_location lets a GPU-written checkpoint be resumed on CPU.
            state = torch.load(checkpoint_path, map_location=device)
            prefix = "module."
            saved_with_wrapper = bool(state) and all(key.startswith(prefix) for key in state)
            model_uses_prefix = any(key.startswith(prefix) for key in core_model.state_dict())
            if saved_with_wrapper and not model_uses_prefix:
                state = {key[len(prefix):]: value for key, value in state.items()}
            core_model.load_state_dict(state)
            start_epoch = last_epoch + 1
            print(f"Resumed from epoch {start_epoch}")
        else:
            # Without the weights, skipping epochs would silently train a
            # fresh model for fewer epochs, so start over instead.
            print(f"Checkpoint {checkpoint_path} not found; starting from epoch 0")

    for epoch in range(start_epoch, epochs):
        # Training
        model.train()
        train_loss = 0.0
        progress = tqdm(dataloader, desc=f"Epoch {epoch+1}/{epochs} [Train]")
        for sequences, labels in progress:
            sequences = sequences.to(device).float()
            labels = labels.to(device).float()

            optimizer.zero_grad()
            outputs = model(sequences)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            progress.set_postfix({"loss": f"{loss.item():.4f}"})

        # Validation
        model.eval()
        val_loss = 0.0
        correct, total = 0, 0
        with torch.no_grad():
            for sequences, labels in val_loader:
                sequences = sequences.to(device).float()
                labels = labels.to(device).float()

                outputs = model(sequences)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                preds = (torch.sigmoid(outputs) > 0.5).float()
                correct += (preds == labels).sum().item()
                total += labels.size(0)

        # Guard the averages so an empty loader does not raise ZeroDivisionError.
        val_acc = 100 * correct / total if total else 0.0
        train_loss /= max(len(dataloader), 1)
        val_loss /= max(len(val_loader), 1)
        scheduler.step(val_loss)

        print(f"Epoch {epoch+1}/{epochs} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}%")

        # Save current epoch
        torch.save(core_model.state_dict(), os.path.join(checkpoint_dir, f"model_epoch_{epoch}.pth"))
        with open(last_epoch_file, "w") as f:
            f.write(str(epoch))

        # Save best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(core_model.state_dict(), os.path.join(checkpoint_dir, "best_model.pth"))
            print("Saved best model!")

    return model



# ======================
# Main Execution
# ======================

if __name__ == "__main__":
    from google.colab import drive
    drive.mount('/content/drive')

    # === Configuration ===
    DATA_DIR = "/content/drive/MyDrive/ISTVT Dataset"  # This should contain 'real/' and 'fake/'
    PROCESSED_DIR = "processed_data"
    CHECKPOINT_DIR = "/content/drive/MyDrive/deepfake_checkpoints"

    BATCH_SIZE = 1
    EPOCHS = 5
    NUM_FRAMES = 6
    SPLIT_SEED = 42  # fixes the train/val split so runs are comparable

    os.makedirs(PROCESSED_DIR, exist_ok=True)
    os.makedirs(CHECKPOINT_DIR, exist_ok=True)

    # === Step 1: Preprocess videos (real + fake) ===
    for label in ["real", "fake"]:
        input_dir = os.path.join(DATA_DIR, label)
        output_dir = os.path.join(PROCESSED_DIR, label)
        os.makedirs(output_dir, exist_ok=True)

        for video_file in os.listdir(input_dir):
            if video_file.lower().endswith(('.mp4', '.avi', '.mov')):
                video_path = os.path.join(input_dir, video_file)
                print(f"Preprocessing {video_path}...")
                preprocess_video(video_path, output_dir)

    # === Step 2: Load dataset ===
    full_dataset = DeepfakeDataset(PROCESSED_DIR)
    print(f"Length of dataset is: {len(full_dataset)}")
    if len(full_dataset) < 2:
        # With fewer than two samples one of the two splits would be empty.
        raise RuntimeError(
            f"Found {len(full_dataset)} sequence(s) under {PROCESSED_DIR!r}; "
            "need at least 2 to build train and validation sets."
        )

    # === Step 3: Split into train/val ===
    # NOTE(review): the split is made per sequence, so overlapping windows
    # from the same video can end up in both the training and validation set.
    train_size = int(0.8 * len(full_dataset))
    val_size = len(full_dataset) - train_size
    split_rng = torch.Generator().manual_seed(SPLIT_SEED)
    train_dataset, val_dataset = torch.utils.data.random_split(
        full_dataset, [train_size, val_size], generator=split_rng
    )

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

    # === Step 4: Initialize Model ===
    model = ISTVT(num_frames=NUM_FRAMES)

    # === Step 5: Train (auto resume + save to Drive) ===
    trained_model = train(model, train_loader, val_loader, epochs=EPOCHS, checkpoint_dir=CHECKPOINT_DIR)

    print(" Training completed and model saved in Google Drive!")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Preprocessing /content/drive/MyDrive/ISTVT Dataset/real/01__talking_against_wall.mp4...
 Skipping already processed: 01__talking_against_wall
Preprocessing /content/drive/MyDrive/ISTVT Dataset/real/01__kitchen_pan.mp4...
 Skipping already processed: 01__kitchen_pan
Preprocessing /content/drive/MyDrive/ISTVT Dataset/real/01__exit_phone_room.mp4...
 Skipping already processed: 01__exit_phone_room
Preprocessing /content/drive/MyDrive/ISTVT Dataset/real/01__hugging_happy.mp4...
 Skipping already processed: 01__hugging_happy
Preprocessing /content/drive/MyDrive/ISTVT Dataset/real/01__outside_talking_pan_laughing.mp4...
 Skipping already processed: 01__outside_talking_pan_laughing
Preprocessing /content/drive/MyDrive/ISTVT Dataset/real/01__podium_speech_happy.mp4...
 Skipping already processed: 01__podium_speech_happy
Preprocessing /content/drive/MyDrive/ISTVT Data

  model = create_fn(
Epoch 1/5 [Train]:   0%|          | 1/1866 [01:39<51:42:27, 99.81s/it, loss=0.6985]

In [5]:
# === Step 6: Evaluate a Single Video ===
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import cv2
import os
from tqdm import tqdm
from facenet_pytorch import MTCNN
from torch.utils.data import Dataset, DataLoader
from timm import create_model

def evaluate_single_video(model, video_path, num_frames=6, max_frames=270):
    """Classify one video by averaging the model's probability over all frame windows.

    Args:
        model: an ``nn.Module`` returning one logit per clip, or the path of
            a saved state dict for ``ISTVT(num_frames=num_frames)``.
        video_path: video to evaluate.
        num_frames: number of frames per sequence fed to the model.
        max_frames: maximum number of frames sampled from the video before
            windows are cut. This is separate from ``num_frames``, so a
            video yields many windows instead of at most one.

    Returns:
        ``(pred_label, avg_prob)`` where ``pred_label`` is 1 (REAL) when the
        mean sigmoid output exceeds 0.5 and 0 (FAKE) otherwise, or ``None``
        when fewer than ``num_frames`` usable frames were found.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    if isinstance(model, (str, os.PathLike)):
        # A checkpoint path was given: rebuild the network and load its weights.
        state = torch.load(model, map_location=device)
        if state and all(key.startswith("module.") for key in state):
            # Written from an nn.DataParallel wrapper; drop the wrapper prefix.
            state = {key[len("module."):]: value for key, value in state.items()}
        model = ISTVT(num_frames=num_frames)
        model.load_state_dict(state)

    # Inputs are moved to `device` below, so the model must live there too.
    model = model.to(device)
    model.eval()

    print(f"\n📹 Evaluating video: {os.path.basename(video_path)}")

    # Extract and process frames
    frames = VideoProcessor(num_frames=max_frames).extract_frames(video_path)
    if len(frames) < num_frames:
        print(" Not enough frames for evaluation.")
        return

    # Create sequences
    seq_gen = SequenceGenerator(seq_length=num_frames)
    sequences = seq_gen.create_sequences(frames)

    # Predict on all sequences
    preds = []
    with torch.no_grad():
        for seq in sequences:
            norm_seq = seq_gen.normalize(seq)
            tensor = torch.tensor(norm_seq).unsqueeze(0).float().to(device)
            preds.append(torch.sigmoid(model(tensor)).item())

    avg_prob = float(np.mean(preds))
    pred_label = int(avg_prob > 0.5)

    print(f"\n Prediction: {'REAL' if pred_label else 'FAKE'}")
    print(f" Confidence Score: {avg_prob:.4f}")
    return pred_label, avg_prob

# === Example usage ===
CHECKPOINT_PATH = "/content/best_istvt_model.pth"
TEST_VIDEO_PATH = "/content/01_03__talking_against_wall__JZUXXFRB.mp4"  # change this to your test video path

# evaluate_single_video needs a model object, not a file path, so build the
# network and load the saved weights first.
# Assumes the checkpoint holds a state dict for ISTVT - TODO confirm.
eval_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
trained_model = ISTVT(num_frames=6)
trained_model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=eval_device))
trained_model = trained_model.to(eval_device)

evaluate_single_video(trained_model, TEST_VIDEO_PATH, num_frames=6)


In [None]:
# === Step 6: Evaluate a Single Video ===
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import cv2
import os
from tqdm import tqdm
from facenet_pytorch import MTCNN
from torch.utils.data import Dataset, DataLoader
from timm import create_model

def evaluate_single_video(model, video_path, num_frames=6, max_frames=270):
    """Classify one video by averaging the model's probability over all frame windows.

    Args:
        model: an ``nn.Module`` returning one logit per clip, or the path of
            a saved state dict for ``ISTVT(num_frames=num_frames)``.
        video_path: video to evaluate.
        num_frames: number of frames per sequence fed to the model.
        max_frames: maximum number of frames sampled from the video before
            windows are cut. This is separate from ``num_frames``, so a
            video yields many windows instead of at most one.

    Returns:
        ``(pred_label, avg_prob)`` where ``pred_label`` is 1 (REAL) when the
        mean sigmoid output exceeds 0.5 and 0 (FAKE) otherwise, or ``None``
        when fewer than ``num_frames`` usable frames were found.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    if isinstance(model, (str, os.PathLike)):
        # A checkpoint path was given: rebuild the network and load its weights.
        state = torch.load(model, map_location=device)
        if state and all(key.startswith("module.") for key in state):
            # Written from an nn.DataParallel wrapper; drop the wrapper prefix.
            state = {key[len("module."):]: value for key, value in state.items()}
        model = ISTVT(num_frames=num_frames)
        model.load_state_dict(state)

    # Inputs are moved to `device` below, so the model must live there too.
    model = model.to(device)
    model.eval()

    print(f"\n Evaluating video: {os.path.basename(video_path)}")

    # Extract and process frames
    frames = VideoProcessor(num_frames=max_frames).extract_frames(video_path)
    if len(frames) < num_frames:
        print(" Not enough frames for evaluation.")
        return

    # Create sequences
    seq_gen = SequenceGenerator(seq_length=num_frames)
    sequences = seq_gen.create_sequences(frames)

    # Predict on all sequences
    preds = []
    with torch.no_grad():
        for seq in sequences:
            norm_seq = seq_gen.normalize(seq)
            tensor = torch.tensor(norm_seq).unsqueeze(0).float().to(device)
            preds.append(torch.sigmoid(model(tensor)).item())

    avg_prob = float(np.mean(preds))
    pred_label = int(avg_prob > 0.5)

    print(f"\n Prediction: {'REAL' if pred_label else 'FAKE'}")
    print(f"Confidence Score: {avg_prob:.4f}")
    return pred_label, avg_prob

# === Example usage ===
CHECKPOINT_PATH = "/content/best_istvt_model.pth"
TEST_VIDEO_PATH = "/content/01__meeting_serious.mp4"  # change this to your test video path

# evaluate_single_video needs a model object, not a file path, so build the
# network and load the saved weights first.
# Assumes the checkpoint holds a state dict for ISTVT - TODO confirm.
eval_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
trained_model = ISTVT(num_frames=6)
trained_model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=eval_device))
trained_model = trained_model.to(eval_device)

evaluate_single_video(trained_model, TEST_VIDEO_PATH, num_frames=6)
