<a href="https://colab.research.google.com/github/Ru1chi/deepfake_detection/blob/main/CNN(MobileNetV2_16)%2BLSTM%2BVision_transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive

# Mount Google Drive so the shared dataset and checkpoints are reachable from this runtime.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
import os
import cv2
import numpy as np
import torch
import torch.nn as nn
from torchvision import transforms, models
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from transformers import ViTModel
from torch.cuda.amp import GradScaler, autocast

In [None]:
# Dataset class remains unchanged
class DeepfakeDataset(Dataset):
    """Pairs each frame in a directory with the optical flow to the next file.

    Files are taken from ``frame_dir`` in sorted order; item ``idx`` is built
    from file ``idx`` and file ``idx + 1``. The label is 1 when the first
    file's name contains ``"FAKE"`` and 0 otherwise.

    NOTE(review): pairing is purely positional, so a pair can combine the last
    frame of one video with the first frame of the next, and lexicographic
    sorting places ``frame10`` before ``frame2`` -- confirm this is intended.
    """

    def __init__(self, frame_dir, transform=None):
        """
        :param frame_dir: Directory containing the extracted frame images.
        :param transform: Optional callable applied to both the frame and the
            flow image before they are returned.
        """
        self.frame_dir = frame_dir
        self.transform = transform
        self.file_names = sorted(os.listdir(frame_dir))

    def __len__(self):
        # One sample per consecutive pair. Clamp at 0: an empty directory would
        # otherwise yield -1, which makes len() raise ValueError.
        return max(len(self.file_names) - 1, 0)

    def __getitem__(self, idx):
        """Return ``(frame, flow, label)`` for the pair starting at ``idx``.

        :raises FileNotFoundError: If either image of the pair cannot be read.
        """
        frame_name1 = self.file_names[idx]
        frame_name2 = self.file_names[idx + 1]

        image1 = self._read_frame(frame_name1)
        image2 = self._read_frame(frame_name2)

        flow = self.calculate_optical_flow(image1, image2)

        label = 1 if "FAKE" in frame_name1 else 0

        if self.transform:
            # NOTE(review): image1 is still in OpenCV's BGR channel order here;
            # the inference cells feed BGR as well, so this is left unchanged.
            image1 = self.transform(image1)
            flow = self.transform(flow)

        return image1, flow, label

    def _read_frame(self, frame_name):
        """Load one image from ``frame_dir``, raising if OpenCV cannot read it."""
        frame_path = os.path.join(self.frame_dir, frame_name)
        image = cv2.imread(frame_path)
        # cv2.imread reports a missing or undecodable file by returning None.
        if image is None:
            raise FileNotFoundError(f"Could not read image: {frame_path}")
        return image

    def calculate_optical_flow(self, img1, img2):
        """Return a 3-channel uint8 image of the Farneback flow magnitude.

        Both inputs are resized to 640x480 and converted to grayscale first.
        """
        img1 = cv2.resize(img1, (640, 480))
        img2 = cv2.resize(img2, (640, 480))

        gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
        gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)

        flow = cv2.calcOpticalFlowFarneback(gray1, gray2, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        magnitude, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        # Min-max normalisation alone already maps the magnitudes onto 0..255.
        # The former pre-scaling by 255 / np.max(magnitude) added nothing and
        # divided by zero (yielding NaNs) whenever the two frames were identical.
        magnitude = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)

        # Replicate the single channel so the result has the same layout as a colour frame.
        return cv2.merge([magnitude] * 3)

In [None]:
# Preprocessing applied to every frame and flow image:
# ndarray -> PIL image -> 112x112 resize -> tensor.
_preprocessing_steps = [
    transforms.ToPILImage(),
    transforms.Resize((112, 112)),
    transforms.ToTensor(),
]
transform = transforms.Compose(_preprocessing_steps)

In [None]:
# Step 3: Build the dataset from the directory of extracted training frames.
frame_dir = "/content/drive/Shareddrives/Deepfake Detection/extracted/Training_frames"
dataset = DeepfakeDataset(frame_dir=frame_dir, transform=transform)

In [None]:
# Split dataset into training (80%) and testing (20%) sets.
# A dedicated, seeded generator makes the split identical on every run, so the
# held-out set behind the reported accuracy is reproducible.
# NOTE(review): the split is made per frame pair, so frames from one video can
# end up in both subsets -- consider splitting by video instead.
SPLIT_SEED = 42
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

In [None]:
# Batch both subsets with the same batch size; only the training loader shuffles.
_batch_size = 16
train_loader = DataLoader(dataset=train_dataset, batch_size=_batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=_batch_size, shuffle=False)

In [None]:
from torchvision import models

class DeepfakeModel(nn.Module):
    """MobileNetV2 features of frames and flow images, run through an LSTM to one logit.

    ``forward`` takes two tensors indexed as ``[batch, time, ...]``. Every time
    step of each tensor goes through the same MobileNetV2 backbone, the two
    feature vectors are concatenated, and the LSTM output at the last time step
    is projected to a single value per sample.
    """

    def __init__(self):
        super().__init__()

        # Shared backbone for frames and flow images. Its final Linear layer is
        # replaced by Identity so the network returns features, not class scores.
        self.mobilenet = models.mobilenet_v2(weights='DEFAULT')
        feature_dim = self.mobilenet.classifier[1].in_features
        self.mobilenet.classifier[1] = nn.Identity()

        # NOTE(review): this ViT is instantiated but never called in forward().
        # It is kept because its weights are part of the saved state_dict.
        self.vit = ViTModel.from_pretrained("google/vit-base-patch16-224-in21k")

        # The LSTM consumes the frame and flow features side by side, hence 2x.
        self.lstm = nn.LSTM(input_size=feature_dim * 2, hidden_size=256, num_layers=2, batch_first=True)

        # Maps the last LSTM output to a single logit.
        self.fc = nn.Linear(256, 1)

    def forward(self, x_frames, x_flows):
        """Return one logit per sample, shaped ``(batch, 1)``."""
        frame_features = []
        flow_features = []

        # Frame and flow are processed alternately, step by step.
        for step in range(x_frames.size(1)):
            frame_features.append(self.mobilenet(x_frames[:, step]))
            flow_features.append(self.mobilenet(x_flows[:, step]))

        # Rebuild the time axis, then join frame and flow features per step.
        frame_sequence = torch.stack(frame_features, dim=1)
        flow_sequence = torch.stack(flow_features, dim=1)
        sequence = torch.cat((frame_sequence, flow_sequence), dim=2)

        hidden_states, _ = self.lstm(sequence)

        # Only the output at the final time step is classified.
        return self.fc(hidden_states[:, -1])

In [None]:
# Pick the compute device: CUDA when a GPU is visible, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
# Build the network on the chosen device, with a logit-based binary loss and Adam.
model = DeepfakeModel()
model = model.to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)

Downloading: "https://download.pytorch.org/models/mobilenet_v2-7ebf99e0.pth" to /root/.cache/torch/hub/checkpoints/mobilenet_v2-7ebf99e0.pth
100%|██████████| 13.6M/13.6M [00:00<00:00, 37.7MB/s]
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/502 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/346M [00:00<?, ?B/s]

In [None]:
# Initialize GradScaler for mixed precision training
scaler = GradScaler()

  scaler = GradScaler()


In [None]:

# Training Loop
NUM_EPOCHS = 10
for epoch in range(NUM_EPOCHS):
    model.train()
    running_loss = 0.0
    num_batches = 0
    for images_frame, images_flow, labels in train_loader:
        images_frame = images_frame.to(device)
        images_flow = images_flow.to(device)
        labels = labels.float().to(device)

        optimizer.zero_grad()

        # torch.autocast replaces the deprecated torch.cuda.amp.autocast.
        # Mixed precision stays limited to CUDA, as before.
        with torch.autocast(device_type=device.type, enabled=device.type == 'cuda'):
            # The model expects a time axis; each sample is a length-1 sequence.
            outputs = model(images_frame.unsqueeze(1), images_flow.unsqueeze(1))
            # squeeze(1) removes only the logit axis. A bare squeeze() would also
            # drop the batch axis for a final batch of one sample, and the loss
            # would then reject the shape mismatch with the labels.
            loss = criterion(outputs.squeeze(1), labels)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item()
        num_batches += 1

    # Report the mean loss over the epoch rather than the last batch's loss,
    # which is noisy and undefined when the loader is empty.
    epoch_loss = running_loss / max(num_batches, 1)
    print(f'Epoch [{epoch+1}/{NUM_EPOCHS}], Loss: {epoch_loss:.4f}')

  with autocast():


Epoch [1/10], Loss: 0.6335
Epoch [2/10], Loss: 0.3191
Epoch [3/10], Loss: 0.5432
Epoch [4/10], Loss: 0.2469
Epoch [5/10], Loss: 0.2918
Epoch [6/10], Loss: 0.2078
Epoch [7/10], Loss: 0.2161
Epoch [8/10], Loss: 0.3659
Epoch [9/10], Loss: 0.1210
Epoch [10/10], Loss: 0.0735


In [None]:
# Persist only the learned weights (the state_dict) to the shared drive.
model_path = "/content/drive/Shareddrives/Deepfake Detection/deepfake_detection_model_with_mobileNetV2.pth"
trained_weights = model.state_dict()
torch.save(trained_weights, model_path)
print(f'Model saved to {model_path}')

Model saved to /content/drive/Shareddrives/Deepfake Detection/deepfake_detection_model_with_mobileNetV2.pth


In [None]:
# Function to load the model remains unchanged
def load_model(model_path):
    """Build a DeepfakeModel and restore its weights from ``model_path``.

    :param model_path: Path to a state_dict written with ``torch.save``.
    :return: The model, switched to evaluation mode. It is not moved to any
        device here; callers do that with ``.to(device)``.
    """
    model = DeepfakeModel()
    # map_location="cpu" lets a checkpoint saved from a GPU run load on a
    # CPU-only machine. weights_only=True restricts unpickling to tensors and
    # plain containers, which is all a state_dict holds, and avoids the
    # FutureWarning raised by the default.
    state_dict = torch.load(model_path, map_location="cpu", weights_only=True)
    model.load_state_dict(state_dict)
    model.eval()
    return model

In [None]:
# Rebuild the model from the checkpoint at model_path.
loaded_model = load_model(model_path=model_path)
print('Model loaded successfully.')

  model.load_state_dict(torch.load(model_path))


Model loaded successfully.


In [None]:
# Evaluate the Model: inference mode, on the same device as the batches.
# Assigning the result (eval() and to() both return the module) keeps the cell
# from echoing the full module tree as its output.
loaded_model = loaded_model.eval().to(device)

DeepfakeModel(
  (mobilenet): MobileNetV2(
    (features): Sequential(
      (0): Conv2dNormActivation(
        (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU6(inplace=True)
      )
      (1): InvertedResidual(
        (conv): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
            (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
            (2): ReLU6(inplace=True)
          )
          (1): Conv2d(32, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
      )
      (2): InvertedResidual(
        (conv): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(16, 96, kernel_siz

In [None]:
# Accuracy of the reloaded model on the held-out split.
total_correct = 0
total_samples = 0

with torch.no_grad():
    for images_frame, images_flow, labels in test_loader:
        images_frame = images_frame.to(device)
        images_flow = images_flow.to(device)

        labels = labels.float().to(device)

        # Each sample is passed as a length-1 sequence.
        outputs = loaded_model(images_frame.unsqueeze(1), images_flow.unsqueeze(1))

        # squeeze(1) drops only the logit axis, so predictions keep the same
        # shape as labels even when a batch holds a single sample.
        predicted_labels = (torch.sigmoid(outputs.squeeze(1)) > 0.5).float()

        total_correct += (predicted_labels == labels).sum().item()
        total_samples += labels.size(0)

# Fail with a clear message instead of a bare ZeroDivisionError.
if total_samples == 0:
    raise ValueError("test_loader produced no samples; cannot compute accuracy")

accuracy = total_correct / total_samples * 100
print(f'Accuracy on test set: {accuracy:.2f}%')

Accuracy on test set: 87.75%


In [None]:
pip install torch torchvision transformers opencv-python




In [None]:
import torch
import cv2
from torchvision import transforms

# Load the model
# NOTE(review): this path (..._16.pth) differs from the checkpoint written by
# the save cell (...mobileNetV2.pth) -- confirm which file is intended.
model_path = "/content/drive/Shareddrives/Deepfake Detection/deepfake_detection_model_with_mobileNetV2_16.pth"
model = DeepfakeModel()
# weights_only=True restricts unpickling to tensors and plain containers,
# which is all a state_dict holds, and avoids the FutureWarning raised by the default.
model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu'), weights_only=True))
model.eval()

# Preprocessing for inference: ndarray -> PIL image -> 112x112 resize -> tensor.
_inference_steps = [
    transforms.ToPILImage(),
    transforms.Resize((112, 112)),
    transforms.ToTensor(),
]
transform = transforms.Compose(_inference_steps)

# Optical flow calculation function (simulated for single image)
def calculate_dummy_optical_flow(image):
    """Return an all-zero uint8 array shaped like ``image``.

    Used as a stand-in for optical flow when only one image is available.
    """
    return np.zeros_like(image, dtype=np.uint8)

# Inference function
def detect_deepfake_single_image(image_path):
    """Classify one image file as FAKE or REAL and print the verdict.

    The model takes a frame and an optical-flow image. With a single image
    there is no second frame, so an all-zero flow is substituted.

    :param image_path: Path of the image to classify.
    :raises FileNotFoundError: If OpenCV cannot read ``image_path``.
    """
    img = cv2.imread(image_path)
    # cv2.imread reports a missing or undecodable file by returning None rather
    # than raising, so check here instead of failing obscurely further down.
    if img is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")

    # Simulate optical flow using a dummy flow
    flow = calculate_dummy_optical_flow(img)

    # Apply transformations
    img = transform(img)
    flow = transform(flow)

    # Add a leading batch axis, then a sequence axis: the model expects
    # inputs indexed as [batch, time, ...].
    img = img.unsqueeze(0).unsqueeze(1)
    flow = flow.unsqueeze(0).unsqueeze(1)

    # Run inference; the model returns a logit, sigmoid turns it into a probability.
    with torch.no_grad():
        output = model(img, flow)
        prob = torch.sigmoid(output).item()

    # Interpret result
    if prob > 0.5:
        print(f"The input image is FAKE with probability {prob:.2f}")
    else:
        print(f"The input image is REAL with probability {1 - prob:.2f}")

# Example usage: classify a single frame from the frames directory.
image_path = "/content/drive/Shareddrives/Deepfake Detection/extracted/Training_frames/FAKE_aagfhgtpmv_frame0.jpg"  # Update with actual path
detect_deepfake_single_image(image_path=image_path)


  model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))


The input image is FAKE with probability 0.87


In [None]:
import cv2
import torch
from torchvision import transforms

# Function to extract frames from a video
def extract_frames(video_path, frame_rate=1):
    """
    Sample frames from a video at a fixed time interval.

    :param video_path: Path to the video file.
    :param frame_rate: Sampling period in seconds: one frame is kept every
        ``frame_rate`` seconds (despite the name, this is not frames per second).
    :return: List of frames (images) in playback order; empty if no frame
        could be read.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        # Round fractional rates such as 29.97 instead of truncating them.
        try:
            interval = int(round(fps * frame_rate))
        except (ValueError, OverflowError):
            # A NaN or infinite FPS value cannot be converted to a frame step.
            interval = 0
        # Never let the step reach 0: an FPS reported as 0, or a period shorter
        # than one frame, would make `count % interval` divide by zero.
        # In those cases every frame is kept.
        interval = max(interval, 1)

        count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if count % interval == 0:
                frames.append(frame)
            count += 1
    finally:
        # Release the capture even if reading fails part-way.
        cap.release()

    return frames

# Function to classify a single frame
def classify_frame(frame):
    """Return the model's FAKE probability for a single frame.

    An all-zero image from ``calculate_dummy_optical_flow`` is fed to the model
    in place of real optical flow.
    """
    zero_flow = calculate_dummy_optical_flow(frame)

    # Indexing with [None, None] prepends the batch and sequence axes in one step.
    frame_batch = transform(frame)[None, None]
    flow_batch = transform(zero_flow)[None, None]

    with torch.no_grad():
        logit = model(frame_batch, flow_batch)
        # The model emits a logit; sigmoid converts it to a probability.
        probability = torch.sigmoid(logit).item()

    return probability

# Function to classify an entire video
def detect_deepfake_in_video(video_path):
    """
    Classify a video from its sampled frames.

    One frame per second is scored with ``classify_frame``. The video takes the
    label of its highest-scoring frame, so it is reported as FAKE as soon as
    any sampled frame scores above 0.5.

    :param video_path: Path to the video file.
    :return: ``(label, probability)`` where ``label`` is "FAKE" or "REAL" and
        ``probability`` is the highest per-frame FAKE probability.
    :raises ValueError: If no frames could be extracted from the video.
    """
    # Extract frames from the video
    frames = extract_frames(video_path, frame_rate=1)
    # Without frames there is nothing to classify; fail loudly instead of
    # printing and returning a None label.
    if not frames:
        raise ValueError(f"No frames could be read from video: {video_path}")

    # The verdict is driven by the single most suspicious frame.
    highest_prob = max(classify_frame(frame) for frame in frames)
    highest_label = "FAKE" if highest_prob > 0.5 else "REAL"

    print(f"Video Classification (Highest Probability): {highest_label}")
    print(f"Probability: {highest_prob:.2f}")

    return highest_label, highest_prob

# Example usage: classify one of the test videos.
video_path = "/content/drive/Shareddrives/Deepfake Detection/extracted/Testing_videos/aassnaulhq.mp4"  # Update with actual path
detect_deepfake_in_video(video_path=video_path)


Video Classification (Highest Probability): FAKE
Probability: 0.81


('FAKE', 0.8072839975357056)