# Feature extraction for test set

In [None]:
# Mount Google Drive at /content/drive; the model weights, datasets and
# output CSVs referenced in this notebook all live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import models, transforms
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import os
import matplotlib.pyplot as plt
from tqdm import tqdm
import torch.nn.functional as F

# Device setup: use the GPU when CUDA is available, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Data transformations (ResNet-style transformations): resize to 224x224,
# convert to a tensor, then normalize each channel with the mean/std below.
# NOTE(review): these look like the usual ImageNet statistics — confirm they
# match what the ResNet checkpoint was trained with.
data_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Custom dataset class
class CustomDatasetNew(Dataset):
    """Image dataset over a folder that contains ``0_real`` and ``1_fake``.

    Each item is ``(image, img_path)``: the (optionally transformed) RGB image
    and the path it was loaded from as a plain string. Labels are collected in
    ``self.labels`` (0 = real, 1 = fake) but are not part of the returned item;
    callers derive them from the path.

    Args:
        root_dir: Directory holding the ``0_real`` and ``1_fake`` sub-folders.
        transform: Optional callable applied to the PIL image.
        limit: Optional maximum number of images to keep per class folder.
    """

    # Accepted image extensions; compared case-insensitively so that e.g.
    # ``.PNG`` or ``.Jpg`` files are not silently dropped.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def __init__(self, root_dir, transform=None, limit=None):
        self.root_dir = root_dir
        self.transform = transform

        self.image_files = []
        self.labels = []
        for label_folder in tqdm(['0_real', '1_fake'], desc="Loading dataset"):
            full_path = os.path.join(root_dir, label_folder)
            label = 0 if 'real' in label_folder else 1
            kept = 0
            # os.listdir returns entries in arbitrary order; sort so the
            # dataset order (and any ``limit`` subset) is reproducible.
            for file_name in sorted(os.listdir(full_path)):
                if limit and kept >= limit:
                    break  # Enough images collected for this class
                if not file_name.lower().endswith(self.IMAGE_EXTENSIONS):
                    continue  # Skip non-image entries without using up the limit
                self.image_files.append(os.path.join(full_path, file_name))
                self.labels.append(label)
                kept += 1

    def __len__(self):
        """Return the number of images collected."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Return ``(image, img_path)`` for the image at position ``idx``."""
        img_path = self.image_files[idx]  # This is a string path
        # Close the file handle promptly; convert() returns an independent image.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert("RGB")  # Ensure image is 3 channels

        if self.transform:
            image = self.transform(image)

        return image, img_path  # The path is returned as a string (not a tensor)


# Load ResNet model and capture features
def load_saved_resnet_model(model_path):
    """Build the ResNet-50 real/fake classifier, load its weights and attach hooks.

    Args:
        model_path: Path to a state dict for a ResNet-50 whose ``fc`` layer
            has two outputs.

    Returns:
        The model on ``device`` in eval mode, with forward hooks on the first
        block of ``layer1``, ``layer3`` and ``layer4`` that append their
        outputs to the module-level low/mid/high feature lists.
    """
    # weights=None: every parameter and buffer is overwritten by the checkpoint
    # loaded below, so downloading the ImageNet weights first would be wasted work.
    model = torchvision.models.resnet50(weights=None)
    for param in model.parameters():
        param.requires_grad = False  # Freeze all layers

    num_ftrs = model.fc.in_features
    model.fc = nn.Linear(num_ftrs, 2)  # Binary classification (real/fake)

    # map_location lets a checkpoint saved on a GPU load on a CPU-only runtime.
    model.load_state_dict(torch.load(model_path, map_location=device))
    model = model.to(device)

    # Hook functions to capture low, mid, and high-level features
    model.layer1[0].register_forward_hook(lambda m, i, o: hook_fn(m, i, o, low_level_features))
    model.layer3[0].register_forward_hook(lambda m, i, o: hook_fn(m, i, o, mid_level_features))
    model.layer4[0].register_forward_hook(lambda m, i, o: hook_fn(m, i, o, high_level_features))

    model.eval()
    return model

# Hook functions to capture ResNet features
# Buffers that the ResNet forward hooks append their captured outputs to.
low_level_features = []
mid_level_features = []
high_level_features = []


def hook_fn(module, input, output, storage_list):
    """Forward hook: append a detached copy of ``output`` to ``storage_list``."""
    snapshot = output.clone().detach()
    storage_list.append(snapshot)

# Define linear layers to convert ResNet features to 768 dimensions.
# NOTE(review): these layers are freshly constructed here and nothing in the
# visible code loads weights into them, so the projection depends on this
# session's random initialisation — confirm that is intended and consistent
# with the features the downstream models were trained on.
low_to_768 = nn.Linear(256, 768).to(device)   # For low-level features
mid_to_768 = nn.Linear(1024, 768).to(device)  # For mid-level features
high_to_768 = nn.Linear(2048, 768).to(device) # For high-level features

def extract_resnet_features(model, image):
    """Run one image through ``model`` and project the hooked features to 768-d.

    Args:
        model: ResNet returned by ``load_saved_resnet_model`` (its forward
            hooks fill the low/mid/high feature lists).
        image: A single image tensor without batch dimension.

    Returns:
        Tuple ``(low_768, mid_768, high_768)`` of 1-D tensors of length 768
        (``squeeze()`` removes the batch and spatial dimensions).
    """
    low_level_features.clear()
    mid_level_features.clear()
    high_level_features.clear()

    # Everything here is inference only, so the projections also run under
    # no_grad; otherwise an autograd graph is built that callers just detach.
    with torch.no_grad():
        image = image.unsqueeze(0).to(device)  # Add batch dimension and move to device
        model(image)

        # Global-average-pool each captured feature map down to a channel vector.
        low_pooled = F.adaptive_avg_pool2d(low_level_features[-1].to(device), (1, 1)).squeeze()
        mid_pooled = F.adaptive_avg_pool2d(mid_level_features[-1].to(device), (1, 1)).squeeze()
        high_pooled = F.adaptive_avg_pool2d(high_level_features[-1].to(device), (1, 1)).squeeze()

        low_768 = low_to_768(low_pooled)    # Shape [768]
        mid_768 = mid_to_768(mid_pooled)    # Shape [768]
        high_768 = high_to_768(high_pooled) # Shape [768]

    return low_768, mid_768, high_768


# Function to preprocess the image using ViT's transforms
def pipeline_preprocessor():
    """Return the preprocessing transform shipped with the default ViT-B/16 weights."""
    return torchvision.models.ViT_B_16_Weights.DEFAULT.transforms()

# Function to extract ViT embeddings
def get_vit_embedding(vit_model, image_path):
    """Return the ViT class-token embedding for the image at ``image_path``.

    Args:
        vit_model: A torchvision ViT in eval mode.
        image_path: Path (string) of the image to embed.

    Returns:
        Tensor holding the encoder output at the class-token position, with a
        leading batch dimension of 1.
    """
    preprocessing = pipeline_preprocessor()  # Preprocessing from ViT
    # Use a context manager so the file handle is released immediately;
    # convert() returns an image that no longer depends on the open file.
    with Image.open(image_path) as raw_image:
        img = raw_image.convert("RGB")
    img = preprocessing(img).unsqueeze(0).to(device)  # Add batch dimension

    with torch.no_grad():
        # Mirror the model's own forward pass up to the encoder, stopping
        # before the classification head.
        feats = vit_model._process_input(img)
        batch_class_token = vit_model.class_token.expand(img.shape[0], -1, -1)
        feats = torch.cat([batch_class_token, feats], dim=1)
        feats = vit_model.encoder(feats)
        vit_hidden = feats[:, 0]  # CLS token
    return vit_hidden

# Load ViT model
def load_vit_model(pretrained_weights_path):
    """Create a ViT-B/16 on ``device`` and load weights from a state-dict file.

    Args:
        pretrained_weights_path: Path to the saved ViT state dict.

    Returns:
        The ViT model in evaluation mode.
    """
    vit_model = torchvision.models.vit_b_16(weights=None).to(device)
    pretrained_vit_weights = torch.load(pretrained_weights_path, map_location=device)
    # strict=False tolerates key mismatches; report them so a checkpoint that
    # does not match the architecture is not loaded silently.
    load_result = vit_model.load_state_dict(pretrained_vit_weights, strict=False)
    if load_result.missing_keys or load_result.unexpected_keys:
        print(
            f"Warning: ViT checkpoint mismatch - missing keys: {load_result.missing_keys}, "
            f"unexpected keys: {load_result.unexpected_keys}"
        )
    vit_model.eval()  # Set to evaluation mode
    return vit_model

# Add a sequence dimension (if missing) before applying attention
def ensure_correct_shape(tensor):
    """Return ``tensor`` as ``[batch, seq, dim]`` when it has fewer dimensions.

    A 1-D ``[dim]`` input becomes ``[1, 1, dim]`` and a 2-D ``[batch, dim]``
    input becomes ``[batch, 1, dim]``; any other rank is returned unchanged.
    """
    rank = tensor.dim()
    if rank == 1:
        # Add batch and sequence dimensions in front.
        return tensor[None, None, :]
    if rank == 2:
        # Insert a singleton sequence dimension after the batch dimension.
        return tensor[:, None, :]
    return tensor


# Scaled dot product attention function
def scaled_dot_product_attention(Q, K, V):
    """Compute ``softmax(Q K^T / sqrt(d_k)) V`` after shape normalisation.

    Each argument is first passed through ``ensure_correct_shape``. ``d_k`` is
    the size of the last dimension of ``Q``.

    NOTE(review): when K holds a single sequence position (which is what
    ``ensure_correct_shape`` produces for 1-D and 2-D inputs) the softmax runs
    over one score, so the weight is 1 and the result equals ``V`` regardless
    of ``Q`` — confirm this is the intended behaviour.
    """
    query, key, value = (ensure_correct_shape(t) for t in (Q, K, V))

    dim = query.size(-1)
    scale = torch.sqrt(torch.tensor(dim, dtype=torch.float32).to(query.device))
    logits = torch.matmul(query, key.transpose(-2, -1)) / scale
    weights = F.softmax(logits, dim=-1)
    return torch.matmul(weights, value)

# Save features for each dataset (train/val/test)
import csv

# Save features for each dataset (train/val/test) as CSV
def save_features_to_csv(model, vit_model, data_loader, save_path):
    """Extract fused ResNet/ViT features for every image and write them to a CSV.

    The CSV has the columns ``image_name`` (file name only), ``features``
    (the flattened feature vector written as a Python list) and ``label``
    (0 = real, 1 = fake). Images for which any extraction step raises are
    reported on stdout and skipped, so one bad file does not abort the run.

    Args:
        model: ResNet with feature hooks (see ``load_saved_resnet_model``).
        vit_model: ViT used for the class-token embedding.
        data_loader: Yields ``(images, img_paths)`` batches.
        save_path: Destination CSV path; its parent directory is created.
    """
    # dirname is '' for a bare file name, and os.makedirs('') would raise.
    out_dir = os.path.dirname(save_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    with open(save_path, mode="w", newline="") as file:
        writer = csv.writer(file)
        # Write the CSV header
        writer.writerow(["image_name", "features", "label"])

        for images, img_paths in tqdm(data_loader, desc="Extracting features"):
            for image, img_path in zip(images, img_paths):
                image = image.to(device)  # Move image to the correct device

                # Ensure img_path is a string
                if isinstance(img_path, torch.Tensor):
                    img_path = img_path.item() if img_path.dim() == 0 else str(img_path)

                # Extract ResNet features
                try:
                    low_768, mid_768, high_768 = extract_resnet_features(model, image)
                except Exception as e:
                    print(f"Error extracting ResNet features for {img_path}: {e}")
                    continue

                # Extract ViT features
                try:
                    vit_hidden = get_vit_embedding(vit_model, img_path)  # img_path should be a string
                except Exception as e:
                    print(f"Error extracting ViT features for {img_path}: {e}")
                    continue

                # Apply attention between ResNet and ViT features
                try:
                    output_1 = scaled_dot_product_attention(vit_hidden, low_768, low_768)
                    output_2 = scaled_dot_product_attention(output_1, mid_768, mid_768)
                    final_output = scaled_dot_product_attention(output_2, high_768, high_768)
                except Exception as e:
                    print(f"Error applying attention for {img_path}: {e}")
                    continue

                # Convert features to a flattened list
                features = final_output.detach().cpu().numpy().flatten().tolist()

                # Derive the label from the class folder (``0_real`` / ``1_fake``)
                # only; testing the whole path would mislabel a fake image whose
                # file name or any parent directory happens to contain "real".
                label_folder = os.path.basename(os.path.dirname(img_path))
                label = 0 if "real" in label_folder else 1

                # Write the row to the CSV
                writer.writerow([os.path.basename(img_path), features, label])

    print(f"Features saved to {save_path}")


# Load models: the fine-tuned ResNet-50 (with feature hooks attached) and the ViT.
resnet_model = load_saved_resnet_model('/content/drive/MyDrive/Final_folder_code_thesis/Original_model/pretrained_resnet_state_dict.pth')
vit_model = load_vit_model('/content/drive/MyDrive/Final_folder_code_thesis/Original_model/pretrained_vit_state_dict.pth')

# Only the test split is processed in this notebook; the train/val lines are
# kept commented out for reference.
#train_dir = "/content/WildRF/train"
#val_dir = "/content/WildRF/val"
test_dir="/content/drive/MyDrive/Final_folder_code_thesis/DeepWild_Final/test"

#train_dataset = CustomDatasetNew(root_dir=train_dir, transform=data_transforms)
#val_dataset = CustomDatasetNew(root_dir=val_dir, transform=data_transforms)
#test_dataset = CustomDatasetNew(root_dir=test_dir, transform=data_transforms)
# Each sub-folder of the test set is processed on its own and gets its own CSV.
for test_subdir in ['twitter', 'facebook', 'reddit']:
    test_dataset = CustomDatasetNew(root_dir=os.path.join(test_dir, test_subdir), transform=data_transforms)
    # batch_size=1 / shuffle=False: images are handled one at a time, in dataset order.
    test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)
    print(f"Processing Test Dataset: {test_subdir}")

    # Save features to a separate CSV file for each subdirectory
    save_path = f"/content/drive/MyDrive/Final_folder_code_thesis/New_test_set_test_features_{test_subdir}.csv"
    save_features_to_csv(resnet_model, vit_model, test_loader, save_path=save_path)


#train_loader = DataLoader(train_dataset, batch_size=1, shuffle=False)
#val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False)
#test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

#print("Processing Train Dataset:")
#save_features_to_csv(resnet_model, vit_model, train_loader, save_path="features_WildRF/train_features.csv")

#print("Processing Validation Dataset:")
#save_features_to_csv(resnet_model, vit_model, val_loader, save_path="features_WildRF/val_features.csv")

#print("Processing Test Dataset:")
#save_features_to_csv(resnet_model, vit_model, test_loader, save_path="features_WildRF/test_features.csv")

# Importing libraries


In [None]:
!pip uninstall -y protobuf

In [None]:
!pip install protobuf==5.26.1

In [None]:
import google.protobuf
print(google.protobuf.__version__)


In [None]:
!pip install mediapipe==0.10.21

In [None]:
import IPython
IPython.Application.instance().kernel.do_shutdown(True)

In [None]:
!pip install mediapipe==0.10.21
!pip install pretrainedmodels
!pip install ultralytics

#Twitter test data


In [None]:
import os
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from torchvision import transforms
from PIL import Image
import cv2
from ultralytics import YOLO  # For YOLOv8 face detection
from tqdm import tqdm
import pretrainedmodels  # For Xception model
import numpy as np
import mediapipe as mp  # For facial landmark extraction
from torch.cuda.amp import autocast, GradScaler

In [None]:
# Device configuration: use the GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Paths: the per-image feature CSV written by the feature-extraction cell for
# the Twitter subset, and the image folder those features were computed from.
test_csv = "/content/drive/MyDrive/Final_folder_code_thesis/New_test_set_test_features_twitter.csv"
test_folder = "/content/drive/MyDrive/Final_folder_code_thesis/DeepWild_Final/test/twitter"

# Transforms for images: resize to 299x299, convert to a tensor, then
# normalize every channel with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.Resize((299, 299)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

class DeepfakeClassifier(torch.nn.Module):
    """Module 1: fuses Xception, Sobel-edge, YOLO and face-landmark features.

    ``initialize_sobel_linear`` must be called once before ``forward`` (and
    before loading a state dict), because ``sobel_linear`` and ``fc1`` are only
    created there.
    """

    def __init__(self):
        super().__init__()
        self.xception = xception_model  # Outputs 128 features
        self.sobel_cnn = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.sobel_linear = None  # Created in initialize_sobel_linear
        self.fc_landmarks = nn.Linear(936, 128).to(device)  # 936 = flattened landmarks
        self.fc_yolo = nn.Linear(80, 64).to(device)  # Adjust YOLO features to 64
        self.fc1 = None  # Created in initialize_sobel_linear
        self.fc2 = nn.Linear(128, 2).to(device)

    def initialize_sobel_linear(self, input_shape):
        """Create ``sobel_linear`` and ``fc1`` for Sobel inputs of ``input_shape``.

        Args:
            input_shape: Shape of one Sobel image without the batch dimension.
        """
        with torch.no_grad():
            # Probe the CNN with a dummy input to learn its flattened output size.
            sample_input = torch.zeros(1, *input_shape).to(device)
            output = self.sobel_cnn(sample_input)
            flattened_size = output.numel()
            self.sobel_linear = nn.Linear(flattened_size, 128).to(device)

            # Total width of the concatenated vector built in forward():
            # xception (128) + sobel (128) + YOLO (64) + landmarks (128).
            total_feature_size = 128 + 128 + 64 + 128
            self.fc1 = nn.Linear(total_feature_size, 128).to(device)

    def forward(self, image, sobel_image, yolo_features, face_landmarks):
        """Return the two-class logits for a batch.

        Raises:
            RuntimeError: If ``initialize_sobel_linear`` has not been called.
        """
        # Fail with a clear message instead of "'NoneType' object is not callable".
        if self.sobel_linear is None or self.fc1 is None:
            raise RuntimeError(
                "DeepfakeClassifier.initialize_sobel_linear() must be called before forward()"
            )

        # Process features
        yolo_features = yolo_features.float()  # Fix for dtype mismatch
        image_features = self.xception(image)  # Output: [batch_size, 128]
        sobel_features = self.sobel_cnn(sobel_image)  # Output: [batch_size, C, H, W]
        sobel_features = self.sobel_linear(sobel_features.view(sobel_features.size(0), -1))
        yolo_features = torch.relu(self.fc_yolo(yolo_features))
        landmark_features = torch.relu(self.fc_landmarks(face_landmarks))

        # Combine features (this order must match the trained fc1 weights)
        combined = torch.cat((image_features, sobel_features, yolo_features, landmark_features), dim=1)

        # Fully connected layers
        x = torch.relu(self.fc1(combined))
        x = self.fc2(x)
        return x


# Module 2 definition (DNN)
class DNN(nn.Module):
    """Module 2: a three-layer MLP with ReLU and dropout after the first two layers.

    Args:
        input_dim: Size of each input feature vector.
        hidden_dim_1: Width of the first hidden layer.
        hidden_dim_2: Width of the second hidden layer.
        output_dim: Number of output logits.
        dropout_prob: Dropout probability used after each hidden layer.
    """

    def __init__(self, input_dim, hidden_dim_1, hidden_dim_2, output_dim, dropout_prob=0.2):
        super().__init__()
        # Attribute names and creation order are kept so saved state dicts still load.
        self.fc1 = nn.Linear(input_dim, hidden_dim_1)
        self.relu = nn.ReLU()
        self.dropout1 = nn.Dropout(p=dropout_prob)
        self.fc2 = nn.Linear(hidden_dim_1, hidden_dim_2)
        self.dropout2 = nn.Dropout(p=dropout_prob)
        self.fc3 = nn.Linear(hidden_dim_2, output_dim)

    def forward(self, x):
        """Return the output logits for input batch ``x``."""
        hidden = self.dropout1(self.relu(self.fc1(x)))
        hidden = self.dropout2(self.relu(self.fc2(hidden)))
        return self.fc3(hidden)

# Function to process images
def process_images(folder_path, transform, limit=None):
    """Load and transform every readable image under ``0_real`` and ``1_fake``.

    Args:
        folder_path: Directory containing the ``0_real`` and ``1_fake`` folders.
        transform: Callable applied to each PIL image; must return a tensor.
        limit: Optional cap on the total number of images loaded.

    Returns:
        Tuple ``(X, filenames, y)``: stacked image tensor, the file names in
        the same order, and a long tensor of labels (0 = real, 1 = fake).

    Raises:
        RuntimeError: If no readable image is found.
    """
    images = []
    filenames = []
    labels = []

    for label_dir in ["0_real", "1_fake"]:
        label_path = os.path.join(folder_path, label_dir)
        label = 0 if label_dir == "0_real" else 1

        # os.listdir order is arbitrary; sort so runs are reproducible.
        for fname in tqdm(sorted(os.listdir(label_path)), desc=f"Processing {label_dir}"):
            if limit and len(images) >= limit:
                break
            image = cv2.imread(os.path.join(label_path, fname))
            if image is None:
                continue  # cv2 could not decode this entry; skip it
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            images.append(transform(Image.fromarray(image)))
            filenames.append(fname)
            labels.append(label)

    # torch.stack([]) fails with an unhelpful message; report the real cause.
    if not images:
        raise RuntimeError(f"No readable images found under {folder_path}")

    # Convert to tensors
    X = torch.stack(images)
    y = torch.tensor(labels, dtype=torch.long)
    return X, filenames, y


# Process CSV features for Module 2
def process_csv(path, limit=None):
    """Load Module 2 inputs from a feature CSV.

    The CSV must have the columns ``image_name``, ``features`` (a bracketed,
    comma-separated list of floats) and ``label``.

    Args:
        path: Path of the CSV file.
        limit: Optional number of leading rows to keep (falsy = all rows).

    Returns:
        Tuple ``(X, filenames, y)``: float32 feature tensor, list of image
        names, and a long tensor of labels.
    """
    df = pd.read_csv(path)
    # Truncate before parsing so feature strings of dropped rows are never parsed.
    if limit:
        df = df.head(limit)

    features = [
        [float(value) for value in row.strip('[]').split(',')]
        for row in df['features']
    ]
    filenames = df['image_name'].tolist()  # Ensure CSV has 'image_name' column
    X = torch.tensor(features, dtype=torch.float32)
    y = torch.tensor(df['label'].values, dtype=torch.long)

    return X, filenames, y


# Set up Mediapipe for facial landmarks extraction. One shared FaceMesh
# instance is created here and reused for every image.
# NOTE(review): static_image_mode=True / max_num_faces=1 presumably mean
# "treat each input as an unrelated still image" and "return at most one
# face" — confirm against the MediaPipe documentation.
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1)

#import xception
import torch
import pretrainedmodels
import os

# Define the path where the model is saved
drive_path = "/content/drive/MyDrive/Final_folder_code_thesis/REALxception_model.pth"

# Ensure you are using the correct device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize the Xception model without pretrained weights; the weights come
# from the checkpoint loaded below.
xception_model = pretrainedmodels.__dict__["xception"](pretrained=None).to(device)

# Modify the last linear layer to match the saved model (128 output features)
xception_model.last_linear = torch.nn.Linear(xception_model.last_linear.in_features, 128).to(device)

# Load the state dictionary; fail loudly if the checkpoint file is missing
# rather than continuing with randomly initialised weights.
if os.path.exists(drive_path):
    print("Loading Xception model from Google Drive...")
    xception_model.load_state_dict(torch.load(drive_path, map_location=device))
else:
    raise FileNotFoundError(f"Model file not found at {drive_path}")





# Now the model is fully loaded with the correct architecture and weights


# Load YOLOv8 model from the "yolov8n.pt" checkpoint and move it to the
# selected device (GPU when available).
yolo_model = YOLO("yolov8n.pt").to(device)
# COCO class names (people, vehicles, animals, household items, etc.).
# In this file only the length of the list (80) is used, to size the
# multi-hot detection vector built in extract_yolo_features_and_landmarks.
COCO_CLASSES = [
    "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck", "boat", "traffic light",
    "fire hydrant", "stop sign", "parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow",
    "elephant", "bear", "zebra", "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee",
    "skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove", "skateboard", "surfboard",
    "tennis racket", "bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple",
    "sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair", "couch",
    "potted plant", "bed", "dining table", "toilet", "tv", "laptop", "mouse", "remote", "keyboard", "cell phone",
    "microwave", "oven", "toaster", "sink", "refrigerator", "book", "clock", "vase", "scissors", "teddy bear",
    "hair drier", "toothbrush"
]

def generate_sobel_edges(image, transform):
    """Return the transformed Sobel edge-magnitude image of ``image``.

    ``image`` is a channel-first RGB tensor. It is converted to grayscale,
    the horizontal and vertical Sobel gradients are combined into a magnitude
    map, that map is converted to 8-bit, replicated to three channels and
    passed through ``transform``.

    NOTE(review): callers in this file pass tensors that were already
    normalized by ``transform``, so gradient magnitudes are small before the
    8-bit conversion — confirm that is intended.
    """
    hwc = image.permute(1, 2, 0).cpu().numpy()
    gray = cv2.cvtColor(hwc, cv2.COLOR_RGB2GRAY)
    grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
    edges = cv2.convertScaleAbs(cv2.magnitude(grad_x, grad_y))
    edges_rgb = cv2.merge([edges] * 3)
    return transform(Image.fromarray(edges_rgb))

def extract_yolo_features_and_landmarks(image):
    """Extract YOLO object-presence features and face landmarks from ``image``.

    Args:
        image: Channel-first image tensor.

    Returns:
        Tuple ``(yolo_features, landmarks)``:
        ``yolo_features`` is an integer tensor of length ``len(COCO_CLASSES)``
        with 1 at every detected class id; ``landmarks`` is a float32 tensor
        of flattened (x, y) face-mesh coordinates taken from a detected
        "person" box (the last one with a face wins), or 936 zeros when no
        face was found.

    NOTE(review): callers pass tensors already normalized by the image
    transform, and the crop is converted with COLOR_BGR2RGB although the
    tensor is RGB — left unchanged here so features stay consistent with the
    trained models; confirm both are intended.
    """
    # Convert once (CHW tensor -> HWC array) and reuse for detection and cropping.
    image_np = image.permute(1, 2, 0).cpu().numpy()
    results = yolo_model(image_np)
    detected_ids = set()
    landmarks = np.zeros((936,), dtype=np.float32)

    for box in results[0].boxes:
        class_id = int(box.cls[0])
        detected_ids.add(class_id)

        if yolo_model.names[class_id] != "person":
            continue

        x1, y1, x2, y2 = (int(v) for v in box.xyxy[0])
        crop = image_np[y1:y2, x1:x2]
        if crop.size == 0:
            continue  # Degenerate box: crop.max() would raise on an empty array

        crop = (crop * 255).astype(np.uint8) if crop.max() <= 1.0 else crop.astype(np.uint8)
        face_result = face_mesh.process(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))
        if face_result.multi_face_landmarks:
            # float32 so the result has the same dtype as the zero fallback;
            # a float64 tensor cannot be fed to the float32 landmark layer.
            landmarks = np.array(
                [[p.x, p.y] for p in face_result.multi_face_landmarks[0].landmark],
                dtype=np.float32,
            ).flatten()

    yolo_features = torch.tensor([1 if i in detected_ids else 0 for i in range(len(COCO_CLASSES))])
    return yolo_features, torch.tensor(landmarks)

def prepare_ensemble_data(csv_path, folder_path, transform, module1, module2, batch_size=4):
    """Build the ensemble dataset from Module 1 and Module 2 outputs.

    Images from ``folder_path`` are paired with rows of ``csv_path`` through
    the key ``"<file name without extension>_<label>"``. Only images that have
    a CSV row are kept. Both modules are then run in eval mode, batch by batch.

    Args:
        csv_path: Feature CSV for Module 2 (see ``process_csv``).
        folder_path: Image folder for Module 1 (see ``process_images``).
        transform: Image transform, also applied to the Sobel edge images.
        module1: Image-based classifier taking
            ``(images, sobel_images, yolo_features, face_landmarks)``.
        module2: Classifier over the CSV feature vectors.
        batch_size: Number of samples processed per forward pass.

    Returns:
        ``TensorDataset(module1_outputs, module2_outputs, labels)``.

    Raises:
        RuntimeError: If no image matches any CSV row.
    """
    images, img_filenames, labels1 = process_images(folder_path, transform)
    csv_features, csv_filenames, labels2 = process_csv(csv_path)

    # Key -> index on each side. On duplicate keys the later index wins, while
    # iteration order stays that of the first image carrying the key.
    img_index_by_key = {
        f"{os.path.splitext(fname)[0]}_{label}": i
        for i, (fname, label) in enumerate(zip(img_filenames, labels1.tolist()))
    }
    csv_index_by_key = {}
    for i, (fname, label) in enumerate(zip(csv_filenames, labels2.tolist())):
        key = f"{os.path.splitext(fname)[0]}_{label}"
        if key in img_index_by_key:
            csv_index_by_key[key] = i

    matched = [
        (img_idx, csv_index_by_key[key])
        for key, img_idx in img_index_by_key.items()
        if key in csv_index_by_key
    ]
    if not matched:
        # Otherwise torch.cat([]) further down fails with an unhelpful message.
        raise RuntimeError(
            f"No image in {folder_path} matches a row of {csv_path}"
        )
    img_indices = [img_idx for img_idx, _ in matched]
    csv_indices = [csv_idx for _, csv_idx in matched]

    images = images[img_indices]
    labels1 = labels1[img_indices]
    csv_features = csv_features[csv_indices]

    # Switching to eval mode is loop-invariant; do it once.
    module1.eval()
    module2.eval()

    combined_outputs_module1, combined_outputs_module2 = [], []
    with torch.no_grad():
        for start in range(0, len(images), batch_size):
            batch_images = images[start:start + batch_size].to(device)
            sobel_images, yolo_features, face_landmarks = [], [], []

            # The auxiliary inputs are computed image by image.
            for img in batch_images:
                sobel_images.append(generate_sobel_edges(img, transform))
                yolo, landmarks = extract_yolo_features_and_landmarks(img)
                yolo_features.append(yolo)
                face_landmarks.append(landmarks)

            sobel_images = torch.stack(sobel_images).to(device)
            yolo_features = torch.stack(yolo_features).to(device)
            face_landmarks = torch.stack(face_landmarks).to(device)

            # Get outputs from module1
            module1_output = module1(batch_images, sobel_images, yolo_features, face_landmarks)
            combined_outputs_module1.append(module1_output.cpu())

            # Get outputs from module2
            module2_output = module2(csv_features[start:start + batch_size].to(device))
            combined_outputs_module2.append(module2_output.cpu())

    module1_outputs = torch.cat(combined_outputs_module1, dim=0)
    module2_outputs = torch.cat(combined_outputs_module2, dim=0)

    # Ensure labels match the outputs
    min_size = min(module1_outputs.size(0), labels1.size(0), module2_outputs.size(0))
    module1_outputs = module1_outputs[:min_size]
    module2_outputs = module2_outputs[:min_size]
    labels1 = labels1[:min_size]

    # Return the dataset
    return TensorDataset(module1_outputs, module2_outputs, labels1)
# Initialize models. sobel_linear / fc1 must exist before the state dict is
# loaded, so initialize_sobel_linear is called first.
module1 = DeepfakeClassifier().to(device)
module1.initialize_sobel_linear(input_shape=(3, 299, 299))
# map_location=device lets checkpoints saved on a GPU load on a CPU-only runtime.
# NOTE(review): the file loaded into module1 is named "...module2.pth" and the
# one loaded into module2 is named "...model1...": confirm the files are not swapped.
module1.load_state_dict(torch.load("/content/drive/MyDrive/Final_folder_code_thesis/Original_model/bedst_model_module2.pth", map_location=device))

module2 = DNN(input_dim=768, hidden_dim_1=128, hidden_dim_2=256, output_dim=2).to(device)
module2.load_state_dict(torch.load("/content/drive/MyDrive/Final_folder_code_thesis/Original_model/final_model1_weights.pth", map_location=device))

# Freeze weights of module1 and module2
for param in module1.parameters():
    param.requires_grad = False

for param in module2.parameters():
    param.requires_grad = False

test_dataset = prepare_ensemble_data(test_csv, test_folder, transform, module1, module2)

# Evaluation only: shuffling has no effect on the metrics, so keep a fixed order.
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# Ensemble model
class EnsembleModel(nn.Module):
    """Linear combiner over the class probabilities of Module 1 and Module 2.

    Args:
        module1_dim: Number of logits produced by Module 1.
        module2_dim: Number of logits produced by Module 2.
        output_dim: Number of output logits.
    """

    def __init__(self, module1_dim, module2_dim, output_dim):
        super().__init__()
        # Single layer over the concatenated probability vectors.
        self.fc1 = nn.Linear(module1_dim + module2_dim, output_dim)

    def forward(self, x1_logits, x2_logits):
        """Softmax both logit sets, concatenate them and apply ``fc1``."""
        probabilities = [
            torch.softmax(logits, dim=1) for logits in (x1_logits, x2_logits)
        ]
        return self.fc1(torch.cat(probabilities, dim=1))

In [None]:
import torch
from torch import nn

# Define the accuracy function
def compute_accuracy(model, dataloader, device):
    """Return the accuracy (in percent) of ``model`` over ``dataloader``.

    Args:
        model: Callable module taking ``(module1_out, module2_out)``.
        dataloader: Iterable of ``(module1_out, module2_out, labels)`` batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Percentage of correctly classified samples, or ``0.0`` when the
        dataloader yields no samples (instead of dividing by zero).
    """
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0

    with torch.no_grad():  # No need to track gradients for evaluation
        for module1_out, module2_out, labels in dataloader:
            module1_out, module2_out, labels = module1_out.to(device), module2_out.to(device), labels.to(device)

            outputs = model(module1_out, module2_out)
            _, predicted = torch.max(outputs, 1)  # Class with the highest score

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        return 0.0  # Nothing to evaluate
    return 100 * correct / total

# Build the ensemble head: two logits from each module in, two logits out.
ensemble_model = EnsembleModel(module1_dim=2, module2_dim=2, output_dim=2).to(device)

# Load the trained weights from the saved model. map_location=device lets a
# checkpoint saved on a GPU load on a CPU-only runtime.
ensemble_model.load_state_dict(torch.load("/content/drive/MyDrive/Final_folder_code_thesis/Original_model/ensemble_model_weights.pth", map_location=device))

# Set the model to evaluation mode
ensemble_model.eval()

# Now compute the accuracy on the test set
test_accuracy = compute_accuracy(ensemble_model, test_loader, device)

# Print the test accuracy
print(f"Test Accuracy twitter: {test_accuracy:.2f}%")


Test Accuracy twitter: 55.39%


### ROC and AUC


In [None]:
from sklearn.metrics import (
    roc_curve, auc, confusion_matrix, ConfusionMatrixDisplay,
    precision_score, recall_score, f1_score, accuracy_score
)
import matplotlib.pyplot as plt

# Store predictions, ground-truth labels and fake-class probabilities
all_preds = []
all_labels = []
all_probs = []

ensemble_model.eval()
with torch.no_grad():
    for module1_out, module2_out, labels in test_loader:
        module1_out, module2_out = module1_out.to(device), module2_out.to(device)
        outputs = ensemble_model(module1_out, module2_out)

        probs = torch.softmax(outputs, dim=1)[:, 1]  # Probability for class 1 (fake)
        _, preds = torch.max(outputs, 1)

        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())
        all_probs.extend(probs.cpu().numpy())

# === ROC & AUC ===
fpr, tpr, _ = roc_curve(all_labels, all_probs)
roc_auc = auc(fpr, tpr)

plt.figure()
plt.plot(fpr, tpr, color='magenta', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='gray', linestyle='--')  # Chance-level diagonal
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve for Deepfake Detection (Twitter Test Set)')
plt.legend(loc='lower right')
plt.grid(True)
plt.show()

# === Confusion Matrix ===
cm = confusion_matrix(all_labels, all_preds)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["Real", "Fake"])
# Draw into an explicit axes: without ``ax`` the display creates its own
# figure and a separately created 6x6 figure would be shown empty.
fig, ax = plt.subplots(figsize=(6, 6))
disp.plot(ax=ax, cmap="RdPu", values_format="d")
ax.set_title("Confusion Matrix for Deepfake Detection (Twitter Test Set)")
ax.grid(False)
plt.show()

# === Metrics === (precision / recall / F1 are reported for the "fake" class)
accuracy = accuracy_score(all_labels, all_preds)
precision = precision_score(all_labels, all_preds, pos_label=1)
recall = recall_score(all_labels, all_preds, pos_label=1)
f1 = f1_score(all_labels, all_preds, pos_label=1)

print("✅ Evaluation Results for Twitter Test Set")
print(f"Accuracy       : {accuracy:.4f}")
print(f"AUC Score      : {roc_auc:.4f}")
print(f"Precision (Fake): {precision:.4f}")
print(f"Recall (Fake)  : {recall:.4f}")
print(f"F1 Score (Fake): {f1:.4f}")



# Reddit test data

In [None]:
import os
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from torchvision import transforms
from PIL import Image
import cv2
from ultralytics import YOLO  # For YOLOv8 face detection
from tqdm import tqdm
import pretrainedmodels  # For Xception model
import numpy as np
import mediapipe as mp  # For facial landmark extraction
from torch.cuda.amp import autocast, GradScaler

In [None]:
# Device configuration: use the GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Paths: the per-image feature CSV written by the feature-extraction cell for
# the Reddit subset, and the image folder those features were computed from.
test_csv = "/content/drive/MyDrive/Final_folder_code_thesis/New_test_set_test_features_reddit.csv"
test_folder = "/content/drive/MyDrive/Final_folder_code_thesis/DeepWild_Final/test/reddit"

# Transforms for images: resize to 299x299, convert to a tensor, then
# normalize every channel with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.Resize((299, 299)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

class DeepfakeClassifier(torch.nn.Module):
    """Module 1: fuses Xception, Sobel-edge, YOLO and face-landmark features.

    ``initialize_sobel_linear`` must be called once before ``forward`` (and
    before loading a state dict), because ``sobel_linear`` and ``fc1`` are only
    created there.
    """

    def __init__(self):
        super().__init__()
        self.xception = xception_model  # Outputs 128 features
        self.sobel_cnn = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.sobel_linear = None  # Created in initialize_sobel_linear
        self.fc_landmarks = nn.Linear(936, 128).to(device)  # 936 = flattened landmarks
        self.fc_yolo = nn.Linear(80, 64).to(device)  # Adjust YOLO features to 64
        self.fc1 = None  # Created in initialize_sobel_linear
        self.fc2 = nn.Linear(128, 2).to(device)

    def initialize_sobel_linear(self, input_shape):
        """Create ``sobel_linear`` and ``fc1`` for Sobel inputs of ``input_shape``.

        Args:
            input_shape: Shape of one Sobel image without the batch dimension.
        """
        with torch.no_grad():
            # Probe the CNN with a dummy input to learn its flattened output size.
            sample_input = torch.zeros(1, *input_shape).to(device)
            output = self.sobel_cnn(sample_input)
            flattened_size = output.numel()
            self.sobel_linear = nn.Linear(flattened_size, 128).to(device)

            # Total width of the concatenated vector built in forward():
            # xception (128) + sobel (128) + YOLO (64) + landmarks (128).
            total_feature_size = 128 + 128 + 64 + 128
            self.fc1 = nn.Linear(total_feature_size, 128).to(device)

    def forward(self, image, sobel_image, yolo_features, face_landmarks):
        """Return the two-class logits for a batch.

        Raises:
            RuntimeError: If ``initialize_sobel_linear`` has not been called.
        """
        # Fail with a clear message instead of "'NoneType' object is not callable".
        if self.sobel_linear is None or self.fc1 is None:
            raise RuntimeError(
                "DeepfakeClassifier.initialize_sobel_linear() must be called before forward()"
            )

        # Process features
        yolo_features = yolo_features.float()  # Fix for dtype mismatch
        image_features = self.xception(image)  # Output: [batch_size, 128]
        sobel_features = self.sobel_cnn(sobel_image)  # Output: [batch_size, C, H, W]
        sobel_features = self.sobel_linear(sobel_features.view(sobel_features.size(0), -1))
        yolo_features = torch.relu(self.fc_yolo(yolo_features))
        landmark_features = torch.relu(self.fc_landmarks(face_landmarks))

        # Combine features (this order must match the trained fc1 weights)
        combined = torch.cat((image_features, sobel_features, yolo_features, landmark_features), dim=1)

        # Fully connected layers
        x = torch.relu(self.fc1(combined))
        x = self.fc2(x)
        return x


# Module 2 definition (DNN)
class DNN(nn.Module):
    """Fully connected classifier: two hidden layers, each followed by ReLU and dropout.

    Args:
        input_dim: Width of the input feature vector.
        hidden_dim_1: Width of the first hidden layer.
        hidden_dim_2: Width of the second hidden layer.
        output_dim: Width of the output layer.
        dropout_prob: Dropout probability used after both hidden layers.
    """

    def __init__(self, input_dim, hidden_dim_1, hidden_dim_2, output_dim, dropout_prob=0.2):
        super().__init__()
        # First hidden stage
        self.fc1 = nn.Linear(input_dim, hidden_dim_1)
        self.relu = nn.ReLU()  # shared by both hidden stages
        self.dropout1 = nn.Dropout(p=dropout_prob)
        # Second hidden stage
        self.fc2 = nn.Linear(hidden_dim_1, hidden_dim_2)
        self.dropout2 = nn.Dropout(p=dropout_prob)
        # Output projection
        self.fc3 = nn.Linear(hidden_dim_2, output_dim)

    def forward(self, x):
        """Return the output of ``fc3`` for input batch ``x``."""
        hidden = self.dropout1(self.relu(self.fc1(x)))
        hidden = self.dropout2(self.relu(self.fc2(hidden)))
        return self.fc3(hidden)

# Function to process images
def process_images(folder_path, transform, limit=None):
    """Load and transform every readable image under ``0_real`` and ``1_fake``.

    Args:
        folder_path: Directory containing the ``0_real`` and ``1_fake`` sub-folders.
        transform: Callable applied to each image after it is wrapped as a PIL image.
        limit: Optional cap on the total number of images collected across
            both folders (``0_real`` is read first); a falsy value disables it.

    Returns:
        Tuple ``(X, filenames, y)``: the stacked transformed images, the file
        names in load order, and a long tensor of labels (0 for ``0_real``,
        1 for ``1_fake``).
    """
    images, filenames, labels = [], [], []

    for label_dir in ("0_real", "1_fake"):
        label_path = os.path.join(folder_path, label_dir)
        label = int(label_dir != "0_real")

        for fname in tqdm(os.listdir(label_path), desc=f"Processing {label_dir}"):
            if limit and len(images) >= limit:
                break
            bgr = cv2.imread(os.path.join(label_path, fname))
            if bgr is None:
                # Files that could not be read as images are skipped silently.
                continue
            rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
            images.append(transform(Image.fromarray(rgb)))
            filenames.append(fname)
            labels.append(label)

    # Convert the collected samples to tensors
    return torch.stack(images), filenames, torch.tensor(labels, dtype=torch.long)


# Process CSV features for Module 2
def process_csv(path, limit=None):
    """Load precomputed feature vectors, file names and labels from a CSV file.

    The CSV must provide the columns ``features`` (a bracketed,
    comma-separated list of numbers stored as text), ``image_name`` and
    ``label``.

    Args:
        path: Location of the CSV file.
        limit: Optional number of leading rows to keep; a falsy value keeps
            every row.

    Returns:
        Tuple ``(X, filenames, y)``: float32 feature tensor, list of image
        names, and long tensor of labels.
    """
    df = pd.read_csv(path)

    # Truncate before parsing so rows that would be discarded anyway are never
    # converted. This also means they cannot abort the load if malformed.
    if limit:
        df = df.iloc[:limit]

    features = [
        [float(value) for value in row.strip('[]').split(',')]
        for row in df['features']
    ]
    filenames = df['image_name'].tolist()  # Ensure CSV has 'image_name' column
    X = torch.tensor(features, dtype=torch.float32)
    y = torch.tensor(df['label'].values, dtype=torch.long)

    return X, filenames, y


# Mediapipe FaceMesh, configured for still images and at most one face per image
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1)

# Xception backbone
import torch
import pretrainedmodels
import os

# Location of the saved Xception weights on Google Drive
drive_path = "/content/drive/MyDrive/Final_folder_code_thesis/REALxception_model.pth"

# Use the GPU when torch can see one, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build Xception without pretrained weights, then swap its final linear layer
# for a 128-unit one so the architecture matches the saved state dict
xception_model = pretrainedmodels.xception(pretrained=None).to(device)
xception_model.last_linear = torch.nn.Linear(xception_model.last_linear.in_features, 128).to(device)

# Fail early if the checkpoint is missing; otherwise load it onto `device`
if not os.path.exists(drive_path):
    raise FileNotFoundError(f"Model file not found at {drive_path}")
print("Loading Xception model from Google Drive...")
xception_model.load_state_dict(torch.load(drive_path, map_location=device))





# Now the model is fully loaded with the correct architecture and weights


# Load the YOLOv8 "n" checkpoint and move it to `device`
yolo_model = YOLO("yolov8n.pt").to(device)  # runs on the GPU when `device` is CUDA
# Object class names (80 entries). In this section only len(COCO_CLASSES) is
# used, to size the per-image YOLO presence vector that feeds nn.Linear(80, 64).
COCO_CLASSES = [
    "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck", "boat", "traffic light",
    "fire hydrant", "stop sign", "parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow",
    "elephant", "bear", "zebra", "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee",
    "skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove", "skateboard", "surfboard",
    "tennis racket", "bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple",
    "sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair", "couch",
    "potted plant", "bed", "dining table", "toilet", "tv", "laptop", "mouse", "remote", "keyboard", "cell phone",
    "microwave", "oven", "toaster", "sink", "refrigerator", "book", "clock", "vase", "scissors", "teddy bear",
    "hair drier", "toothbrush"
]

def generate_sobel_edges(image, transform):
    """
    Build a three-channel Sobel edge-magnitude image and pass it through ``transform``.

    Args:
        image: Tensor that is permuted with (1, 2, 0) into channel-last layout
            and converted from RGB to grayscale.
        transform: Callable applied to the edge map once it is wrapped as a
            PIL image.

    Returns:
        Whatever ``transform`` returns for the edge image.
    """
    channel_last = image.permute(1, 2, 0).cpu().numpy()
    gray = cv2.cvtColor(channel_last, cv2.COLOR_RGB2GRAY)

    # First derivatives along x and y, 3x3 kernel, 64-bit float output
    grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)

    # Gradient magnitude, converted to 8-bit absolute values
    edges = cv2.convertScaleAbs(cv2.magnitude(grad_x, grad_y))

    # Replicate the single edge channel three times before applying the transform
    edges_rgb = cv2.merge([edges] * 3)
    return transform(Image.fromarray(edges_rgb))

def extract_yolo_features_and_landmarks(image):
    """
    Extract a YOLO class-presence vector and face landmarks from one image.

    Args:
        image: Tensor that is permuted with (1, 2, 0) into channel-last layout
            before being handed to YOLO.

    Returns:
        Tuple ``(yolo_features, landmarks)``:
        ``yolo_features`` is an integer tensor of length ``len(COCO_CLASSES)``
        holding 1 for every class id YOLO detected and 0 otherwise;
        ``landmarks`` is a float32 tensor of flattened (x, y) pairs from the
        first face FaceMesh finds inside a detected "person" box, or 936
        zeros when no face is found. If several person boxes contain a face,
        the last one wins.
    """
    # Convert once and reuse the array for detection and for every crop.
    frame = image.permute(1, 2, 0).cpu().numpy()
    results = yolo_model(frame)
    detected_objects = set()
    landmarks = np.zeros((936,), dtype=np.float32)

    for box in results[0].boxes:
        class_id = int(box.cls[0])
        detected_objects.add(class_id)

        if yolo_model.names[class_id] != "person":
            continue

        x1, y1, x2, y2 = (int(coord) for coord in box.xyxy[0])
        crop = frame[y1:y2, x1:x2]
        if crop.size == 0:
            # A box that truncates to zero width or height has nothing to
            # search. crop.max() would raise on the empty array.
            continue

        crop = (crop * 255).astype(np.uint8) if crop.max() <= 1.0 else crop.astype(np.uint8)
        # NOTE(review): `image` looks like it is already RGB when it comes from
        # process_images, so this BGR->RGB swap may reverse the channels before
        # FaceMesh. Left unchanged to keep features identical to what the saved
        # model saw. Confirm against the training pipeline.
        face_result = face_mesh.process(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))
        if face_result.multi_face_landmarks:
            # Build as float32 so the result matches the zero default and the
            # float32 layers that consume it (np.array would default to float64).
            landmarks = np.array(
                [[p.x, p.y] for p in face_result.multi_face_landmarks[0].landmark],
                dtype=np.float32,
            ).flatten()

    yolo_features = torch.tensor([1 if i in detected_objects else 0 for i in range(len(COCO_CLASSES))])
    return yolo_features, torch.tensor(landmarks)

def prepare_ensemble_data(csv_path, folder_path, transform, module1, module2, batch_size=4):
    """
    Run both modules over the matched images/CSV rows and cache their outputs.

    Images from ``folder_path`` and feature rows from ``csv_path`` are paired
    by "<file name without extension>_<label>"; unpaired entries are dropped.
    The paired samples are then pushed through ``module1`` (image, Sobel
    edges, YOLO vector, face landmarks) and ``module2`` (CSV features) in
    batches, with both modules in eval mode and gradients disabled.

    Args:
        csv_path: CSV file read by ``process_csv``.
        folder_path: Image root read by ``process_images``.
        transform: Image transform, also applied to the Sobel edge images.
        module1: Model called as ``module1(images, sobel, yolo, landmarks)``.
        module2: Model called as ``module2(csv_features)``.
        batch_size: Number of samples processed per step.

    Returns:
        ``TensorDataset(module1_outputs, module2_outputs, labels)`` with all
        tensors on the CPU.

    Raises:
        RuntimeError: If no image could be paired with a CSV row.
    """
    images, img_filenames, labels1 = process_images(folder_path, transform)
    csv_features, csv_filenames, labels2 = process_csv(csv_path)

    # Key every image by base name and label; the CSV index is filled in below.
    mapping = {
        f"{os.path.splitext(fname)[0]}_{labels1[i].item()}": (i, None)
        for i, fname in enumerate(img_filenames)
    }
    for i, fname in enumerate(csv_filenames):
        key = f"{os.path.splitext(fname)[0]}_{labels2[i].item()}"
        if key in mapping:
            mapping[key] = (mapping[key][0], i)

    matched = [(img_idx, csv_idx) for img_idx, csv_idx in mapping.values() if csv_idx is not None]
    if not matched:
        raise RuntimeError(
            f"No image under {folder_path} could be matched to a row of {csv_path}"
        )
    img_indices = [img_idx for img_idx, _ in matched]
    csv_indices = [csv_idx for _, csv_idx in matched]

    images = images[img_indices]
    labels1 = labels1[img_indices]
    csv_features = csv_features[csv_indices]

    # Switching to eval mode is loop-invariant, so do it once up front.
    module1.eval()
    module2.eval()

    combined_outputs_module1, combined_outputs_module2 = [], []
    for start in range(0, len(images), batch_size):
        batch_images = images[start:start + batch_size].to(device)
        sobel_images, yolo_features, face_landmarks = [], [], []

        for img in batch_images:
            sobel_images.append(generate_sobel_edges(img, transform))
            yolo, landmarks = extract_yolo_features_and_landmarks(img)
            yolo_features.append(yolo)
            face_landmarks.append(landmarks)

        sobel_images = torch.stack(sobel_images).to(device)
        yolo_features = torch.stack(yolo_features).to(device)
        face_landmarks = torch.stack(face_landmarks).to(device)

        with torch.no_grad():
            module1_output = module1(batch_images, sobel_images, yolo_features, face_landmarks)
            module2_output = module2(csv_features[start:start + batch_size].to(device))

        combined_outputs_module1.append(module1_output.cpu())
        combined_outputs_module2.append(module2_output.cpu())

    module1_outputs = torch.cat(combined_outputs_module1, dim=0)
    module2_outputs = torch.cat(combined_outputs_module2, dim=0)

    # All three tensors have one row per matched pair by construction, and
    # TensorDataset itself rejects tensors whose first dimensions differ.
    return TensorDataset(module1_outputs, module2_outputs, labels1)
# Initialize models
module1 = DeepfakeClassifier().to(device)
module1.initialize_sobel_linear(input_shape=(3, 299, 299))
# map_location remaps the saved tensors onto `device`, so a checkpoint written
# on a GPU still loads when `device` has fallen back to the CPU.
module1.load_state_dict(
    torch.load(
        "/content/drive/MyDrive/Final_folder_code_thesis/Original_model/bedst_model_module2.pth",
        map_location=device,
    )
)

module2 = DNN(input_dim=768, hidden_dim_1=128, hidden_dim_2=256, output_dim=2).to(device)
module2.load_state_dict(
    torch.load(
        "/content/drive/MyDrive/Final_folder_code_thesis/Original_model/final_model1_weights.pth",
        map_location=device,
    )
)

# Freeze weights of module1 and module2
for param in module1.parameters():
    param.requires_grad = False

for param in module2.parameters():
    param.requires_grad = False

# Run both frozen modules over the test set once and keep their outputs
test_dataset = prepare_ensemble_data(test_csv, test_folder, transform, module1, module2)

test_loader = DataLoader(test_dataset, batch_size=16, shuffle=True)

# Ensemble model
class EnsembleModel(nn.Module):
    """Linear fusion layer over the softmax probabilities of two upstream models.

    Args:
        module1_dim: Number of logits produced by the first model.
        module2_dim: Number of logits produced by the second model.
        output_dim: Number of outputs of the fusion layer.
    """

    def __init__(self, module1_dim, module2_dim, output_dim):
        super().__init__()
        # One linear layer over the concatenated probability vectors
        fused_dim = module1_dim + module2_dim
        self.fc1 = nn.Linear(fused_dim, output_dim)

    def forward(self, x1_logits, x2_logits):
        """Softmax each set of logits along dim 1, concatenate, and apply ``fc1``."""
        probabilities = [torch.softmax(logits, dim=1) for logits in (x1_logits, x2_logits)]
        return self.fc1(torch.cat(probabilities, dim=1))

In [None]:
import torch
from torch import nn

# Define the accuracy function
def compute_accuracy(model, dataloader, device):
    """Return the percentage of samples the model classifies correctly.

    Args:
        model: Model called as ``model(module1_out, module2_out)``; it is
            switched to eval mode.
        dataloader: Iterable yielding ``(module1_out, module2_out, labels)``
            batches.
        device: Device every batch is moved to before the forward pass.

    Returns:
        Accuracy in percent (0-100). Returns 0.0 when the dataloader yields
        no samples, instead of dividing by zero.
    """
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0

    with torch.no_grad():  # No need to track gradients for evaluation
        for module1_out, module2_out, labels in dataloader:
            module1_out = module1_out.to(device)
            module2_out = module2_out.to(device)
            labels = labels.to(device)

            outputs = model(module1_out, module2_out)
            _, predicted = torch.max(outputs, 1)  # Index of the largest output per sample

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        return 0.0
    return 100 * correct / total

# Build the fusion model: two logits from each module in, two classes out
ensemble_model = EnsembleModel(module1_dim=2, module2_dim=2, output_dim=2).to(device)

# Load the trained weights from the saved model. map_location remaps the saved
# tensors onto `device`, so a GPU-saved checkpoint also loads on the CPU fallback.
ensemble_model.load_state_dict(
    torch.load(
        "/content/drive/MyDrive/Final_folder_code_thesis/Original_model/ensemble_model_weights.pth",
        map_location=device,
    )
)

# Set the model to evaluation mode
ensemble_model.eval()

# Now compute the accuracy on the test set
test_accuracy = compute_accuracy(ensemble_model, test_loader, device)

# Print the test accuracy
print(f"Test Accuracy reddit: {test_accuracy:.2f}%")


Test Accuracy reddit: 55.99%


### ROC, AUC and other performance measures


In [None]:
from sklearn.metrics import (
    roc_curve, auc, confusion_matrix, ConfusionMatrixDisplay,
    precision_score, recall_score, f1_score, accuracy_score
)
import matplotlib.pyplot as plt

# Collect predictions, fake-class probabilities and labels over the test set
all_preds = []
all_labels = []
all_probs = []

ensemble_model.eval()
with torch.no_grad():
    for module1_out, module2_out, labels in test_loader:
        module1_out, module2_out = module1_out.to(device), module2_out.to(device)
        outputs = ensemble_model(module1_out, module2_out)

        probs = torch.softmax(outputs, dim=1)[:, 1]  # Probability for class 1 (fake)
        _, preds = torch.max(outputs, 1)

        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())
        all_probs.extend(probs.cpu().numpy())

# === ROC & AUC ===
fpr, tpr, _ = roc_curve(all_labels, all_probs)
roc_auc = auc(fpr, tpr)

plt.figure()
plt.plot(fpr, tpr, color='magenta', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='gray', linestyle='--')  # chance diagonal
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve for Deepfake Detection (Reddit Test Set)')
plt.legend(loc='lower right')
plt.grid(True)
plt.show()

# === Confusion Matrix ===
# Fix the label order so the matrix is always 2x2 and lines up with the
# display labels, even if one class never occurs.
cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["Real", "Fake"])
# Draw onto an explicit 6x6 axes. disp.plot() opens its own figure when no
# `ax` is passed, which left a preceding plt.figure(figsize=(6, 6)) as an
# empty extra figure and ignored the requested size.
fig, ax = plt.subplots(figsize=(6, 6))
disp.plot(ax=ax, cmap="RdPu", values_format="d")
ax.set_title("Confusion Matrix for Deepfake Detection (Reddit Test Set)")
ax.grid(False)
plt.show()

# === Metrics ===
# Precision, recall and F1 are reported for the positive class 1 (fake).
accuracy = accuracy_score(all_labels, all_preds)
precision = precision_score(all_labels, all_preds, pos_label=1)
recall = recall_score(all_labels, all_preds, pos_label=1)
f1 = f1_score(all_labels, all_preds, pos_label=1)

print(f"✅ Evaluation Results for Reddit Test Set")
print(f"Accuracy       : {accuracy:.4f}")
print(f"AUC Score      : {roc_auc:.4f}")
print(f"Precision (Fake): {precision:.4f}")
print(f"Recall (Fake)  : {recall:.4f}")
print(f"F1 Score (Fake): {f1:.4f}")



# Facebook test data

In [None]:
import os
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from torchvision import transforms
from PIL import Image
import cv2
from ultralytics import YOLO  # For YOLOv8 face detection
from tqdm import tqdm
import pretrainedmodels  # For Xception model
import numpy as np
import mediapipe as mp  # For facial landmark extraction
from torch.cuda.amp import autocast, GradScaler

In [None]:
# Use the GPU when torch can see one, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Input locations for the Facebook test set
test_csv = "/content/drive/MyDrive/Final_folder_code_thesis/New_test_set_test_features_facebook.csv"
test_folder = "/content/drive/MyDrive/Final_folder_code_thesis/DeepWild_Final/test/facebook"

# Image preprocessing: resize to 299x299, convert to a tensor, then normalise
# every channel with mean 0.5 and std 0.5
transform = transforms.Compose(
    [
        transforms.Resize((299, 299)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5] * 3, std=[0.5] * 3),
    ]
)

class DeepfakeClassifier(torch.nn.Module):
    """Four-branch classifier over image, Sobel edges, YOLO flags and face landmarks.

    The image branch reuses the module-level ``xception_model``; the Sobel
    branch is a small CNN followed by a linear layer whose input width is only
    known after ``initialize_sobel_linear`` has run. ``initialize_sobel_linear``
    must therefore be called before ``forward`` or ``load_state_dict``.
    """

    def __init__(self):
        super(DeepfakeClassifier, self).__init__()
        self.xception = xception_model  # Outputs 128 features
        self.sobel_cnn = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.sobel_linear = None  # Created in initialize_sobel_linear
        self.fc_landmarks = nn.Linear(936, 128).to(device)  # 936 = flattened landmarks
        self.fc_yolo = nn.Linear(80, 64).to(device)  # Adjust YOLO features to 64
        self.fc1 = None  # Created in initialize_sobel_linear
        self.fc2 = nn.Linear(128, 2).to(device)

    def initialize_sobel_linear(self, input_shape):
        """Create ``sobel_linear`` and ``fc1`` for Sobel inputs of ``input_shape``.

        Args:
            input_shape: Shape of one Sobel input without the batch axis; it
                is unpacked after a leading batch size of 1.
        """
        with torch.no_grad():
            # Measure the flattened CNN output with a zero-filled dummy batch of one
            sample_input = torch.zeros(1, *input_shape).to(device)
            output = self.sobel_cnn(sample_input)
            flattened_size = output.view(-1).size(0)
            self.sobel_linear = nn.Linear(flattened_size, 128).to(device)

            # Total width of the concatenated branches feeding fc1
            total_feature_size = 128 + 128 + 128 + 64  # xception + sobel + landmarks + YOLO
            self.fc1 = nn.Linear(total_feature_size, 128).to(device)

    def forward(self, image, sobel_image, yolo_features, face_landmarks):
        """Classify a batch from its four input branches.

        Args:
            image: Batch passed to ``self.xception``.
            sobel_image: Batch passed to ``self.sobel_cnn``; its output is
                flattened per sample before ``self.sobel_linear``.
            yolo_features: Per-sample vector passed to ``self.fc_yolo``.
            face_landmarks: Per-sample vector passed to ``self.fc_landmarks``.

        Returns:
            The output of ``self.fc2`` for every sample in the batch.
        """
        # Cast both vector inputs to float32. Integer presence flags and
        # float64 landmark arrays would otherwise fail inside nn.Linear with a
        # dtype mismatch against the float32 weights.
        yolo_features = yolo_features.float()
        face_landmarks = face_landmarks.float()

        image_features = self.xception(image)  # Output: [batch_size, 128]
        sobel_features = self.sobel_cnn(sobel_image)  # Output: [batch_size, C, H, W]
        sobel_features = self.sobel_linear(sobel_features.view(sobel_features.size(0), -1))
        yolo_features = torch.relu(self.fc_yolo(yolo_features))
        landmark_features = torch.relu(self.fc_landmarks(face_landmarks))

        # Concatenation order is image, sobel, YOLO, landmarks. fc1's weight
        # columns are tied to this order, so it must not be rearranged.
        combined = torch.cat((image_features, sobel_features, yolo_features, landmark_features), dim=1)

        # Fully connected head
        x = torch.relu(self.fc1(combined))
        x = self.fc2(x)
        return x


# Module 2 definition (DNN)
class DNN(nn.Module):
    """Fully connected classifier: two hidden layers, each followed by ReLU and dropout.

    Args:
        input_dim: Width of the input feature vector.
        hidden_dim_1: Width of the first hidden layer.
        hidden_dim_2: Width of the second hidden layer.
        output_dim: Width of the output layer.
        dropout_prob: Dropout probability used after both hidden layers.
    """

    def __init__(self, input_dim, hidden_dim_1, hidden_dim_2, output_dim, dropout_prob=0.2):
        super().__init__()
        # First hidden stage
        self.fc1 = nn.Linear(input_dim, hidden_dim_1)
        self.relu = nn.ReLU()  # shared by both hidden stages
        self.dropout1 = nn.Dropout(p=dropout_prob)
        # Second hidden stage
        self.fc2 = nn.Linear(hidden_dim_1, hidden_dim_2)
        self.dropout2 = nn.Dropout(p=dropout_prob)
        # Output projection
        self.fc3 = nn.Linear(hidden_dim_2, output_dim)

    def forward(self, x):
        """Return the output of ``fc3`` for input batch ``x``."""
        hidden = self.dropout1(self.relu(self.fc1(x)))
        hidden = self.dropout2(self.relu(self.fc2(hidden)))
        return self.fc3(hidden)

# Function to process images
def process_images(folder_path, transform, limit=None):
    """Load and transform every readable image under ``0_real`` and ``1_fake``.

    Args:
        folder_path: Directory containing the ``0_real`` and ``1_fake`` sub-folders.
        transform: Callable applied to each image after it is wrapped as a PIL image.
        limit: Optional cap on the total number of images collected across
            both folders (``0_real`` is read first); a falsy value disables it.

    Returns:
        Tuple ``(X, filenames, y)``: the stacked transformed images, the file
        names in load order, and a long tensor of labels (0 for ``0_real``,
        1 for ``1_fake``).
    """
    images, filenames, labels = [], [], []

    for label_dir in ("0_real", "1_fake"):
        label_path = os.path.join(folder_path, label_dir)
        label = int(label_dir != "0_real")

        for fname in tqdm(os.listdir(label_path), desc=f"Processing {label_dir}"):
            if limit and len(images) >= limit:
                break
            bgr = cv2.imread(os.path.join(label_path, fname))
            if bgr is None:
                # Files that could not be read as images are skipped silently.
                continue
            rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
            images.append(transform(Image.fromarray(rgb)))
            filenames.append(fname)
            labels.append(label)

    # Convert the collected samples to tensors
    return torch.stack(images), filenames, torch.tensor(labels, dtype=torch.long)


# Process CSV features for Module 2
def process_csv(path, limit=None):
    """Load precomputed feature vectors, file names and labels from a CSV file.

    The CSV must provide the columns ``features`` (a bracketed,
    comma-separated list of numbers stored as text), ``image_name`` and
    ``label``.

    Args:
        path: Location of the CSV file.
        limit: Optional number of leading rows to keep; a falsy value keeps
            every row.

    Returns:
        Tuple ``(X, filenames, y)``: float32 feature tensor, list of image
        names, and long tensor of labels.
    """
    df = pd.read_csv(path)

    # Truncate before parsing so rows that would be discarded anyway are never
    # converted. This also means they cannot abort the load if malformed.
    if limit:
        df = df.iloc[:limit]

    features = [
        [float(value) for value in row.strip('[]').split(',')]
        for row in df['features']
    ]
    filenames = df['image_name'].tolist()  # Ensure CSV has 'image_name' column
    X = torch.tensor(features, dtype=torch.float32)
    y = torch.tensor(df['label'].values, dtype=torch.long)

    return X, filenames, y


# Mediapipe FaceMesh, configured for still images and at most one face per image
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1)

# Xception backbone
import torch
import pretrainedmodels
import os

# Location of the saved Xception weights on Google Drive
drive_path = "/content/drive/MyDrive/Final_folder_code_thesis/REALxception_model.pth"

# Use the GPU when torch can see one, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build Xception without pretrained weights, then swap its final linear layer
# for a 128-unit one so the architecture matches the saved state dict
xception_model = pretrainedmodels.xception(pretrained=None).to(device)
xception_model.last_linear = torch.nn.Linear(xception_model.last_linear.in_features, 128).to(device)

# Fail early if the checkpoint is missing; otherwise load it onto `device`
if not os.path.exists(drive_path):
    raise FileNotFoundError(f"Model file not found at {drive_path}")
print("Loading Xception model from Google Drive...")
xception_model.load_state_dict(torch.load(drive_path, map_location=device))





# Now the model is fully loaded with the correct architecture and weights


# Load the YOLOv8 "n" checkpoint and move it to `device`
yolo_model = YOLO("yolov8n.pt").to(device)  # runs on the GPU when `device` is CUDA
# Object class names (80 entries). In this section only len(COCO_CLASSES) is
# used, to size the per-image YOLO presence vector that feeds nn.Linear(80, 64).
COCO_CLASSES = [
    "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck", "boat", "traffic light",
    "fire hydrant", "stop sign", "parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow",
    "elephant", "bear", "zebra", "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee",
    "skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove", "skateboard", "surfboard",
    "tennis racket", "bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple",
    "sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair", "couch",
    "potted plant", "bed", "dining table", "toilet", "tv", "laptop", "mouse", "remote", "keyboard", "cell phone",
    "microwave", "oven", "toaster", "sink", "refrigerator", "book", "clock", "vase", "scissors", "teddy bear",
    "hair drier", "toothbrush"
]

def generate_sobel_edges(image, transform):
    """
    Build a three-channel Sobel edge-magnitude image and pass it through ``transform``.

    Args:
        image: Tensor that is permuted with (1, 2, 0) into channel-last layout
            and converted from RGB to grayscale.
        transform: Callable applied to the edge map once it is wrapped as a
            PIL image.

    Returns:
        Whatever ``transform`` returns for the edge image.
    """
    channel_last = image.permute(1, 2, 0).cpu().numpy()
    gray = cv2.cvtColor(channel_last, cv2.COLOR_RGB2GRAY)

    # First derivatives along x and y, 3x3 kernel, 64-bit float output
    grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)

    # Gradient magnitude, converted to 8-bit absolute values
    edges = cv2.convertScaleAbs(cv2.magnitude(grad_x, grad_y))

    # Replicate the single edge channel three times before applying the transform
    edges_rgb = cv2.merge([edges] * 3)
    return transform(Image.fromarray(edges_rgb))

def extract_yolo_features_and_landmarks(image):
    """
    Extract a YOLO class-presence vector and face landmarks from one image.

    Args:
        image: Tensor that is permuted with (1, 2, 0) into channel-last layout
            before being handed to YOLO.

    Returns:
        Tuple ``(yolo_features, landmarks)``:
        ``yolo_features`` is an integer tensor of length ``len(COCO_CLASSES)``
        holding 1 for every class id YOLO detected and 0 otherwise;
        ``landmarks`` is a float32 tensor of flattened (x, y) pairs from the
        first face FaceMesh finds inside a detected "person" box, or 936
        zeros when no face is found. If several person boxes contain a face,
        the last one wins.
    """
    # Convert once and reuse the array for detection and for every crop.
    frame = image.permute(1, 2, 0).cpu().numpy()
    results = yolo_model(frame)
    detected_objects = set()
    landmarks = np.zeros((936,), dtype=np.float32)

    for box in results[0].boxes:
        class_id = int(box.cls[0])
        detected_objects.add(class_id)

        if yolo_model.names[class_id] != "person":
            continue

        x1, y1, x2, y2 = (int(coord) for coord in box.xyxy[0])
        crop = frame[y1:y2, x1:x2]
        if crop.size == 0:
            # A box that truncates to zero width or height has nothing to
            # search. crop.max() would raise on the empty array.
            continue

        crop = (crop * 255).astype(np.uint8) if crop.max() <= 1.0 else crop.astype(np.uint8)
        # NOTE(review): `image` looks like it is already RGB when it comes from
        # process_images, so this BGR->RGB swap may reverse the channels before
        # FaceMesh. Left unchanged to keep features identical to what the saved
        # model saw. Confirm against the training pipeline.
        face_result = face_mesh.process(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))
        if face_result.multi_face_landmarks:
            # Build as float32 so the result matches the zero default and the
            # float32 layers that consume it (np.array would default to float64).
            landmarks = np.array(
                [[p.x, p.y] for p in face_result.multi_face_landmarks[0].landmark],
                dtype=np.float32,
            ).flatten()

    yolo_features = torch.tensor([1 if i in detected_objects else 0 for i in range(len(COCO_CLASSES))])
    return yolo_features, torch.tensor(landmarks)

def prepare_ensemble_data(csv_path, folder_path, transform, module1, module2, batch_size=4):
    """
    Run both modules over the matched images/CSV rows and cache their outputs.

    Images from ``folder_path`` and feature rows from ``csv_path`` are paired
    by "<file name without extension>_<label>"; unpaired entries are dropped.
    The paired samples are then pushed through ``module1`` (image, Sobel
    edges, YOLO vector, face landmarks) and ``module2`` (CSV features) in
    batches, with both modules in eval mode and gradients disabled.

    Args:
        csv_path: CSV file read by ``process_csv``.
        folder_path: Image root read by ``process_images``.
        transform: Image transform, also applied to the Sobel edge images.
        module1: Model called as ``module1(images, sobel, yolo, landmarks)``.
        module2: Model called as ``module2(csv_features)``.
        batch_size: Number of samples processed per step.

    Returns:
        ``TensorDataset(module1_outputs, module2_outputs, labels)`` with all
        tensors on the CPU.

    Raises:
        RuntimeError: If no image could be paired with a CSV row.
    """
    images, img_filenames, labels1 = process_images(folder_path, transform)
    csv_features, csv_filenames, labels2 = process_csv(csv_path)

    # Key every image by base name and label; the CSV index is filled in below.
    mapping = {
        f"{os.path.splitext(fname)[0]}_{labels1[i].item()}": (i, None)
        for i, fname in enumerate(img_filenames)
    }
    for i, fname in enumerate(csv_filenames):
        key = f"{os.path.splitext(fname)[0]}_{labels2[i].item()}"
        if key in mapping:
            mapping[key] = (mapping[key][0], i)

    matched = [(img_idx, csv_idx) for img_idx, csv_idx in mapping.values() if csv_idx is not None]
    if not matched:
        raise RuntimeError(
            f"No image under {folder_path} could be matched to a row of {csv_path}"
        )
    img_indices = [img_idx for img_idx, _ in matched]
    csv_indices = [csv_idx for _, csv_idx in matched]

    images = images[img_indices]
    labels1 = labels1[img_indices]
    csv_features = csv_features[csv_indices]

    # Switching to eval mode is loop-invariant, so do it once up front.
    module1.eval()
    module2.eval()

    combined_outputs_module1, combined_outputs_module2 = [], []
    for start in range(0, len(images), batch_size):
        batch_images = images[start:start + batch_size].to(device)
        sobel_images, yolo_features, face_landmarks = [], [], []

        for img in batch_images:
            sobel_images.append(generate_sobel_edges(img, transform))
            yolo, landmarks = extract_yolo_features_and_landmarks(img)
            yolo_features.append(yolo)
            face_landmarks.append(landmarks)

        sobel_images = torch.stack(sobel_images).to(device)
        yolo_features = torch.stack(yolo_features).to(device)
        face_landmarks = torch.stack(face_landmarks).to(device)

        with torch.no_grad():
            module1_output = module1(batch_images, sobel_images, yolo_features, face_landmarks)
            module2_output = module2(csv_features[start:start + batch_size].to(device))

        combined_outputs_module1.append(module1_output.cpu())
        combined_outputs_module2.append(module2_output.cpu())

    module1_outputs = torch.cat(combined_outputs_module1, dim=0)
    module2_outputs = torch.cat(combined_outputs_module2, dim=0)

    # All three tensors have one row per matched pair by construction, and
    # TensorDataset itself rejects tensors whose first dimensions differ.
    return TensorDataset(module1_outputs, module2_outputs, labels1)
# Initialize models
module1 = DeepfakeClassifier().to(device)
module1.initialize_sobel_linear(input_shape=(3, 299, 299))
# map_location remaps the saved tensors onto `device`, so a checkpoint written
# on a GPU still loads when `device` has fallen back to the CPU.
module1.load_state_dict(
    torch.load(
        "/content/drive/MyDrive/Final_folder_code_thesis/Original_model/bedst_model_module2.pth",
        map_location=device,
    )
)

module2 = DNN(input_dim=768, hidden_dim_1=128, hidden_dim_2=256, output_dim=2).to(device)
module2.load_state_dict(
    torch.load(
        "/content/drive/MyDrive/Final_folder_code_thesis/Original_model/final_model1_weights.pth",
        map_location=device,
    )
)

# Freeze weights of module1 and module2
for param in module1.parameters():
    param.requires_grad = False

for param in module2.parameters():
    param.requires_grad = False

# Run both frozen modules over the test set once and keep their outputs
test_dataset = prepare_ensemble_data(test_csv, test_folder, transform, module1, module2)

test_loader = DataLoader(test_dataset, batch_size=16, shuffle=True)

# Ensemble model
class EnsembleModel(nn.Module):
    """Linear fusion layer over the softmax probabilities of two upstream models.

    Args:
        module1_dim: Number of logits produced by the first model.
        module2_dim: Number of logits produced by the second model.
        output_dim: Number of outputs of the fusion layer.
    """

    def __init__(self, module1_dim, module2_dim, output_dim):
        super().__init__()
        # One linear layer over the concatenated probability vectors
        fused_dim = module1_dim + module2_dim
        self.fc1 = nn.Linear(fused_dim, output_dim)

    def forward(self, x1_logits, x2_logits):
        """Softmax each set of logits along dim 1, concatenate, and apply ``fc1``."""
        probabilities = [torch.softmax(logits, dim=1) for logits in (x1_logits, x2_logits)]
        return self.fc1(torch.cat(probabilities, dim=1))

In [None]:
import torch
from torch import nn

# Define the accuracy function
def compute_accuracy(model, dataloader, device):
    """Return the percentage of samples the model classifies correctly.

    Args:
        model: Model called as ``model(module1_out, module2_out)``; it is
            switched to eval mode.
        dataloader: Iterable yielding ``(module1_out, module2_out, labels)``
            batches.
        device: Device every batch is moved to before the forward pass.

    Returns:
        Accuracy in percent (0-100). Returns 0.0 when the dataloader yields
        no samples, instead of dividing by zero.
    """
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0

    with torch.no_grad():  # No need to track gradients for evaluation
        for module1_out, module2_out, labels in dataloader:
            module1_out = module1_out.to(device)
            module2_out = module2_out.to(device)
            labels = labels.to(device)

            outputs = model(module1_out, module2_out)
            _, predicted = torch.max(outputs, 1)  # Index of the largest output per sample

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        return 0.0
    return 100 * correct / total

# Build the fusion model: two logits from each module in, two classes out
ensemble_model = EnsembleModel(module1_dim=2, module2_dim=2, output_dim=2).to(device)

# Load the trained weights from the saved model. map_location remaps the saved
# tensors onto `device`, so a GPU-saved checkpoint also loads on the CPU fallback.
# NOTE(review): this checkpoint lives under HFMFmednytdataset/, whereas the
# module1/module2 checkpoints in this section are loaded from
# Final_folder_code_thesis/Original_model/. Confirm this is the intended file.
ensemble_model.load_state_dict(
    torch.load(
        "/content/drive/MyDrive/HFMFmednytdataset/ensemble_model_weights.pth",
        map_location=device,
    )
)

# Set the model to evaluation mode
ensemble_model.eval()

# Now compute the accuracy on the test set
test_accuracy = compute_accuracy(ensemble_model, test_loader, device)

# Print the test accuracy
print(f"Test Accuracy facebook: {test_accuracy:.2f}%")


Test Accuracy facebook: 52.10%


In [None]:
from sklearn.metrics import (
    roc_curve, auc, confusion_matrix, ConfusionMatrixDisplay,
    precision_score, recall_score, f1_score, accuracy_score
)
import matplotlib.pyplot as plt

# Collect predictions, fake-class probabilities and labels over the test set
all_preds = []
all_labels = []
all_probs = []

ensemble_model.eval()
with torch.no_grad():
    for module1_out, module2_out, labels in test_loader:
        module1_out, module2_out = module1_out.to(device), module2_out.to(device)
        outputs = ensemble_model(module1_out, module2_out)

        probs = torch.softmax(outputs, dim=1)[:, 1]  # Probability for class 1 (fake)
        _, preds = torch.max(outputs, 1)

        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())
        all_probs.extend(probs.cpu().numpy())

# === ROC & AUC ===
fpr, tpr, _ = roc_curve(all_labels, all_probs)
roc_auc = auc(fpr, tpr)

plt.figure()
plt.plot(fpr, tpr, color='magenta', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='gray', linestyle='--')  # chance diagonal
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve for Deepfake Detection (Facebook Test Set)')
plt.legend(loc='lower right')
plt.grid(True)
plt.show()

# === Confusion Matrix ===
# Fix the label order so the matrix is always 2x2 and lines up with the
# display labels, even if one class never occurs.
cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["Real", "Fake"])
# Draw onto an explicit 6x6 axes. disp.plot() opens its own figure when no
# `ax` is passed, which left a preceding plt.figure(figsize=(6, 6)) as an
# empty extra figure and ignored the requested size.
fig, ax = plt.subplots(figsize=(6, 6))
disp.plot(ax=ax, cmap="RdPu", values_format="d")
ax.set_title("Confusion Matrix for Deepfake Detection (Facebook Test Set)")
ax.grid(False)
plt.show()

# === Metrics ===
# Precision, recall and F1 are reported for the positive class 1 (fake).
accuracy = accuracy_score(all_labels, all_preds)
precision = precision_score(all_labels, all_preds, pos_label=1)
recall = recall_score(all_labels, all_preds, pos_label=1)
f1 = f1_score(all_labels, all_preds, pos_label=1)

print(f"✅ Evaluation Results for Facebook Test Set")
print(f"Accuracy       : {accuracy:.4f}")
print(f"AUC Score      : {roc_auc:.4f}")
print(f"Precision (Fake): {precision:.4f}")
print(f"Recall (Fake)  : {recall:.4f}")
print(f"F1 Score (Fake): {f1:.4f}")

