In [None]:
import os
import cv2
import numpy as np
from tqdm import tqdm
import pandas as pd
from glob import glob

In [None]:
import os

# Dataset locations on Kaggle; edit these if the dataset is mounted elsewhere.
real_folder = '/kaggle/input/final-dataset/final_dataset/Celeb-real'
fake_folder = '/kaggle/input/final-dataset/final_dataset/Celeb-synthesis'

# Sanity check: report whether each folder is actually present.
print(f"Fake folder exists: {os.path.exists(fake_folder)}")
print(f"Real folder exists: {os.path.exists(real_folder)}")

In [None]:
from glob import glob

# Collect every .mp4 in each class folder; sorting keeps the order deterministic.
real_videos = sorted(glob(os.path.join(real_folder, '*.mp4')))
fake_videos = sorted(glob(os.path.join(fake_folder, '*.mp4')))

print(f"Number of fake videos: {len(fake_videos)}")
print(f"Number of real videos: {len(real_videos)}")

In [None]:
# Real clips come first, then fakes; labels follow the same order (0 = Real, 1 = Fake).
all_videos = [*real_videos, *fake_videos]
labels = [0 for _ in real_videos] + [1 for _ in fake_videos]

# List every clip next to its class name.
for video, label in zip(all_videos, labels):
    print(video, '->', 'Fake' if label == 1 else 'Real')

In [None]:
# Count the frames of every video.
# Each capture is released as soon as it has been queried; the previous loop
# never called release(), leaving one open capture handle per video.
frame_count = []
for video_file in all_videos:
    cap = cv2.VideoCapture(video_file)
    try:
        frame_count.append(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)))
    finally:
        cap.release()
print("frames are " , frame_count)
print("Total no of video: " , len(frame_count))
print('Average frame per video:',np.mean(frame_count))

In [None]:
# Accumulates (video, label) pairs while the videos are processed.
label_entries = []

# Working directory for generated artefacts; labels.csv is written inside it.
output_video_folder = "/kaggle/working/output_videos"
label_file_path = os.path.join(output_video_folder, "labels.csv")
os.makedirs(output_video_folder, exist_ok=True)


In [None]:
# Install facenet-pytorch without pulling in its dependencies (--no-deps), quietly.
# NOTE(review): nothing in the visible notebook imports facenet_pytorch — confirm this install is still needed.
!pip install facenet-pytorch --no-deps --quiet



In [None]:
import os
import cv2

def filter_and_trim_videos(video_path, output_folder, label, required_frames=150, fps=25.0):
    """Write the first ``required_frames`` frames of a video to ``output_folder``.

    Videos that report fewer than ``required_frames`` frames, or from which
    that many frames cannot actually be decoded, are skipped.

    Args:
        video_path: Path of the source video.
        output_folder: Directory that receives ``<video_name>.mp4`` (created if missing).
        label: Class label, passed through unchanged in the return value.
        required_frames: Number of leading frames to keep.
        fps: Frame rate written to the output file. Defaults to 25.0, the value
            that was previously hard-coded.

    Returns:
        ``(video_name, label)`` on success, where ``video_name`` is the source
        file's base name WITHOUT extension; ``None`` when the video is skipped
        or the output file cannot be opened for writing.
    """
    video_name = os.path.splitext(os.path.basename(video_path))[0]

    cap = cv2.VideoCapture(video_path)
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Only process if video has at least required_frames
        if total_frames < required_frames:
            print(f"Skipping {video_name}: only {total_frames} frames (< {required_frames})")
            return None

        frames = []
        for _ in range(required_frames):
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
    finally:
        # Release the capture on every path, including unexpected exceptions.
        cap.release()

    # The reported frame count can be larger than what is really decodable.
    if len(frames) < required_frames:
        print(f"Skipping {video_name}: could not read {required_frames} frames")
        return None

    # Reached only when required_frames is 0 (nothing was requested).
    if not frames:
        print(f"Skipping {video_name}: empty frame list")
        return None

    # Frame size is taken from the first decoded frame; VideoWriter wants (width, height).
    height, width = frames[0].shape[:2]
    size = (width, height)

    os.makedirs(output_folder, exist_ok=True)
    output_path = os.path.join(output_folder, f"{video_name}.mp4")
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, size)

    # Without this check a writer that failed to open drops every frame silently
    # and the caller would record a label for a file that was never written.
    if not out.isOpened():
        out.release()
        print(f"Skipping {video_name}: could not open writer for {output_path}")
        return None

    try:
        for frame in frames:
            out.write(frame)
    finally:
        out.release()

    print(f"Saved trimmed video: {output_path}")
    return video_name, label


In [None]:
from tqdm import tqdm
import cv2
import os
from glob import glob
import os
from tqdm import tqdm

# Source folders for the two classes.
fake_folder = '/kaggle/input/final-dataset/final_dataset/Celeb-synthesis'
real_folder = '/kaggle/input/final-dataset/final_dataset/Celeb-real'

fake_videos = sorted(glob(os.path.join(fake_folder, '*.mp4')))
real_videos = sorted(glob(os.path.join(real_folder, '*.mp4')))

# (path, label) pairs: fakes (label 1) first, then reals (label 0).
all_video_paths = [(path, 1) for path in fake_videos]
all_video_paths.extend((path, 0) for path in real_videos)

output_video_folder = '/kaggle/working/trimmed_videos'
label_entries = []

# The trimmed clips are written into this folder, so create it up front.
os.makedirs(output_video_folder, exist_ok=True)

for path, label in tqdm(all_video_paths):
    try:
        # filter_and_trim_videos returns (name_without_extension, label),
        # or None when the clip was skipped.
        result = filter_and_trim_videos(path, output_video_folder, label, required_frames=150)
        if not result:
            continue

        video_filename, processed_label = result

        # Output location of the clip; the name carries no ".mp4" suffix here.
        full_video_path = os.path.join(output_video_folder, video_filename)
        label_entries.append((full_video_path, processed_label))

    except Exception as e:
        # Best effort: report the failing clip and carry on with the next one.
        print(f"\nError processing video: {path}")
        print(f"Error details: {e}")


In [None]:
# Turn the collected (video, label) pairs into a table and persist it as CSV.
df_labels = pd.DataFrame(data=label_entries, columns=['video_name', 'label'])
df_labels.to_csv(path_or_buf=label_file_path, index=False)

In [None]:
# Remove repeated rows and renumber the index from 0.
df_labels = df_labels.drop_duplicates().reset_index(drop=True)


In [None]:
# Features are the video names/paths, targets are the class labels.
X, y = df_labels['video_name'], df_labels['label']

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 20% of the clips for testing; the fixed seed keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

In [None]:
# Renumber both path Series from 0 so they can be indexed by position.
X_train, X_test = (split.reset_index(drop=True) for split in (X_train, X_test))

In [None]:
# Add the ".mp4" extension to every training path.
# The suffix is only appended when it is missing, so re-running this cell is
# harmless (previously a second run produced "...mp4.mp4" paths).
X_train = X_train.apply(lambda x: str(x) if str(x).endswith(".mp4") else str(x) + ".mp4")

In [None]:
# Display the training paths to check the ".mp4" suffix added in the previous cell.
X_train


In [None]:
from torchvision import transforms

# Training preprocessing: resize to 112x112, convert to a tensor, then
# normalise each channel with the given mean/std.
train_transform = transforms.Compose(
    [
        transforms.Resize(size=(112, 112)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)

# Test preprocessing: deliberately the same steps and constants as for training.
test_transform = transforms.Compose(
    [
        transforms.Resize(size=(112, 112)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)


In [None]:
# Add the ".mp4" extension to the stored video paths.
# The suffix is only appended when it is missing, so re-running this cell does
# not produce "...mp4.mp4" keys that no longer match the dataset paths.
df_labels['video_name'] = df_labels['video_name'].apply(
    lambda x: str(x) if str(x).endswith(".mp4") else str(x) + ".mp4"
)

print("\ndf_labels['video_name'] AFTER adding .mp4:")
print(df_labels['video_name'].head())


# Lookup table used by the dataset: video path -> label.
video_label_dict = dict(zip(df_labels['video_name'], df_labels['label']))

In [None]:
# Display the full path -> label mapping.
# NOTE(review): this renders every entry of the dict; consider showing only a few items for large datasets.
video_label_dict

In [None]:
# face_recognition is imported below and used by VideoDataSet for face detection.
# NOTE(review): the version is not pinned — consider pinning it for reproducible runs.
!pip install face_recognition

In [None]:
import face_recognition
# Report which face_recognition release is installed.
print(f"face_recognition version: {face_recognition.__version__}")

In [None]:
import torch
from torch.utils.data import Dataset
import cv2
import os
import numpy as np
from torchvision import transforms
import face_recognition
from PIL import Image
import matplotlib.pyplot as plt

class VideoDataSet(Dataset):
    """Dataset that turns each video into a fixed-length stack of frame tensors.

    For every video, up to ``num_frames`` frames are sampled at positions
    ``0, frame_stride, 2 * frame_stride, ...``. In each sampled frame the first
    face reported by ``face_recognition`` is cropped; when no usable face is
    found the whole frame is used instead. Every image is resized to 112x112
    and converted to a tensor (through ``transform`` when one is given).

    Args:
        video_paths: Sequence of video paths (list, pandas Series, ...).
        label_dict: Mapping from video path to label.
        num_frames: Number of frames returned per video.
        frame_stride: Distance, in source frames, between sampled frames.
        transform: Optional callable taking a PIL image and returning a tensor.
        device: Unused; kept so existing calls keep working.
        debug: When True, print warnings and plot selected frames.
    """

    def __init__(self, video_paths, label_dict, num_frames=10, frame_stride=20, transform=None, device='cpu', debug=False):
        # Materialise the paths as a plain list so that __getitem__ indexes by
        # POSITION. A pandas Series with a non-default index (e.g. straight out
        # of train_test_split) would otherwise be indexed by label and raise KeyError.
        self.video_paths = list(video_paths)
        self.label_dict = label_dict
        self.num_frames = num_frames
        self.frame_stride = frame_stride
        self.transform = transform
        self.debug = debug

    def __len__(self):
        """Return the number of videos."""
        return len(self.video_paths)

    def __getitem__(self, index):
        """Return ``(video_tensor, label)`` for the video at position ``index``.

        ``video_tensor`` stacks ``num_frames`` frame tensors. ``label`` is
        looked up in ``label_dict`` and is ``-1`` when the path is not a key.
        """
        path = self.video_paths[index]

        label = self.label_dict.get(path, -1)

        frames_tensor_list = self.extract_and_process_frames(path, debug=self.debug, index=index)

        # Nothing could be extracted: fall back to all-zero frames (3 x 112 x 112).
        if len(frames_tensor_list) == 0:
            dummy = torch.zeros((3, 112, 112))
            frames_tensor_list = [dummy] * self.num_frames

        # Fewer frames than requested: repeat the last available frame.
        while len(frames_tensor_list) < self.num_frames:
            frames_tensor_list.append(frames_tensor_list[-1].clone())

        video_tensor = torch.stack(frames_tensor_list)
        return video_tensor, label

    def extract_and_process_frames(self, path, debug=False, index=0):
        """Sample frames from ``path`` and return them as a list of tensors.

        Returns an empty list when the video cannot be opened. Sampling stops
        at the first position beyond the reported frame count; frames that
        fail to decode are skipped, so the list may hold fewer than
        ``num_frames`` tensors.
        """
        processed_frames_tensors = []
        cap = cv2.VideoCapture(path)
        try:
            if not cap.isOpened():
                print(f"Error: Could not open video {path}")
                return []

            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            for i in range(self.num_frames):
                frame_idx = i * self.frame_stride
                if frame_idx >= total_frames:
                    break

                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
                success, frame = cap.read()
                if not success:
                    continue

                # OpenCV decodes to BGR; face_recognition and PIL expect RGB.
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                # Each location is a (top, right, bottom, left) tuple.
                face_locations = face_recognition.face_locations(rgb_frame)

                if len(face_locations) == 0:
                    # No face found: use the whole frame, resized.
                    pil_img = Image.fromarray(rgb_frame)
                    pil_image_to_transform = pil_img.resize((112, 112))

                    if debug:
                        print(f"[Warning] No face detected at frame {frame_idx} in video index {index} (path: {os.path.basename(path)})")
                        # Show the fallback image before self.transform is applied.
                        temp_tensor_for_plot = transforms.ToTensor()(pil_image_to_transform)
                        self.plot_image(temp_tensor_for_plot, index, frame_idx, "No Face Fallback")
                else:
                    # Use the first detected face, clamped to the image bounds.
                    top, right, bottom, left = face_locations[0]
                    top = max(0, top)
                    left = max(0, left)
                    bottom = min(rgb_frame.shape[0], bottom)
                    right = min(rgb_frame.shape[1], right)

                    face_crop = rgb_frame[top:bottom, left:right]

                    if face_crop.size == 0:
                        # Degenerate box: fall back to the whole frame.
                        pil_img = Image.fromarray(rgb_frame)
                        pil_image_to_transform = pil_img.resize((112, 112))
                        if debug:
                            print(f"[Warning] Empty face crop at frame {frame_idx} in video index {index}, using full frame. (path: {os.path.basename(path)})")
                            temp_tensor_for_plot = transforms.ToTensor()(pil_image_to_transform)
                            self.plot_image(temp_tensor_for_plot, index, frame_idx, "Empty Crop Fallback")
                    else:
                        pil_image_to_transform = Image.fromarray(face_crop).resize((112, 112))

                # Apply the user transform (PIL image -> tensor), or plain ToTensor.
                if self.transform:
                    final_tensor_frame = self.transform(pil_image_to_transform)
                else:
                    final_tensor_frame = transforms.ToTensor()(pil_image_to_transform)

                # Debug: show the final tensor of the first frame of the first three samples.
                if debug and i == 0 and index < 3:
                    self.plot_image(final_tensor_frame, index, frame_idx, "Processed Frame")

                processed_frames_tensors.append(final_tensor_frame)
        finally:
            # Always release the capture, even when face detection or a
            # transform raises part-way through the video.
            cap.release()

        return processed_frames_tensors

    def plot_image(self, image_tensor, video_index, frame_index, title_prefix=""):
        """Show a (C, H, W) tensor as an image, min-max rescaled to [0, 1].

        The title shows ``video_index + 1`` and ``frame_index``.
        """
        npimg = image_tensor.cpu().float().permute(1, 2, 0).numpy()

        # Rescale for display so normalised tensors are visible too; skip the
        # division when the image is (almost) constant.
        min_val = npimg.min()
        max_val = npimg.max()
        if max_val - min_val > 1e-5:
            npimg = (npimg - min_val) / (max_val - min_val)
        npimg = np.clip(npimg, 0, 1)

        plt.imshow(npimg)
        plt.title(f'{title_prefix} - Video #{video_index + 1} - Frame {frame_index}')
        plt.axis('off')
        plt.show()

In [None]:
# Build the training dataset. debug=True makes the dataset print warnings and
# plot selected frames while videos are loaded.
X_train_dataset = VideoDataSet(X_train, video_label_dict, num_frames=10, transform=train_transform, debug=True)

# Inspect one sample: which video to load and which of its sampled frames to show.
sample_index = 65
frame_slot = 4  # 5th sampled frame (0-based position in the stack)

video_tensor, label = X_train_dataset[sample_index]
frame_to_plot = video_tensor[frame_slot]

# Title the plot with the indices that were really used. The previous call
# passed unrelated numbers (video 34, frame 9) and so mislabelled the figure.
# frame_index is the source-frame position, i.e. slot * stride.
X_train_dataset.plot_image(
    frame_to_plot,
    video_index=sample_index,
    frame_index=frame_slot * X_train_dataset.frame_stride,
)



In [None]:
from torch.utils.data import DataLoader

train_loader = DataLoader(X_train_dataset, batch_size=4, shuffle=True)

# Sanity-check only the first three batches. Every batch decodes videos and
# runs face detection, so looping over the whole loader just to print shapes
# costs a full pass over the training set.
for batch_idx, (videos, labels) in enumerate(train_loader):
    print("Batch video tensor shape:", videos.shape)
    print("Batch labels:", labels)

    if batch_idx == 2:  # batches 0, 1 and 2 have been shown
        break


In [None]:
# X_test_dataset was referenced here before any cell had created it, which
# raised NameError on a fresh "Restart & Run All". Build the test dataset right
# here instead. X_test itself is left untouched: the ".mp4" suffix is added to
# a copy, and only where it is missing, so the paths match video_label_dict.
_test_video_paths = X_test.apply(lambda p: str(p) if str(p).endswith(".mp4") else str(p) + ".mp4")
X_test_dataset = VideoDataSet(_test_video_paths, video_label_dict, num_frames=10, transform=test_transform, debug=True)
test_loader=DataLoader(X_test_dataset,batch_size=4,shuffle=True)


In [None]:
import torch
import torch.nn as nn
import torchvision.models as models

class DeepfakeModel(nn.Module):
    """Per-frame ResNeXt-50 features followed by an LSTM over the frame sequence.

    Args:
        num_classes: Number of output logits.
        latent_dim: Feature size per frame fed to the LSTM. Must equal the
            channel count produced by the backbone.
        lstm_layers: Number of stacked LSTM layers.
        hidden_dim: LSTM hidden size.
        bidirectional: Whether the LSTM runs in both directions.

    Compatibility note: earlier versions passed ``bidirectional`` positionally,
    which landed in the ``bias`` slot of ``nn.LSTM``, so the LSTM was in fact
    unidirectional. Weights saved from such a model have the parameter shapes
    of ``bidirectional=False``.
    """

    def __init__(self, num_classes=2, latent_dim=2048, lstm_layers=1, hidden_dim=2048, bidirectional=True):
        super(DeepfakeModel, self).__init__()

        # Pretrained ResNeXt-50 without its last two children, used as a
        # feature extractor that outputs spatial feature maps.
        model = models.resnext50_32x4d(pretrained=True)
        self.model = nn.Sequential(*list(model.children())[:-2])

        # Keyword arguments on purpose: the 4th positional parameter of nn.LSTM
        # is `bias`, not `bidirectional`. batch_first=True because forward()
        # feeds tensors laid out as (batch, seq, feature); without it the LSTM
        # would recur over the batch dimension and mix different videos.
        self.lstm = nn.LSTM(
            latent_dim,
            hidden_dim,
            lstm_layers,
            batch_first=True,
            bidirectional=bidirectional,
        )
        self.relu = nn.LeakyReLU()  # not used in forward(); kept for compatibility
        self.dp = nn.Dropout(0.4)

        # A bidirectional LSTM concatenates both directions, doubling the output size.
        lstm_out_dim = hidden_dim * (2 if bidirectional else 1)
        self.linear1 = nn.Linear(lstm_out_dim, num_classes)
        self.avgpool = nn.AdaptiveAvgPool2d(1)

    def forward(self, x):
        """Classify a batch of frame sequences.

        Args:
            x: Tensor of shape (batch, seq_length, channels, height, width).

        Returns:
            ``(fmap, logits)``: the backbone feature maps for all
            ``batch * seq_length`` frames, and logits of shape
            (batch, num_classes).
        """
        batch_size, seq_length, c, h, w = x.shape
        # Fold time into the batch dimension so the CNN sees individual frames.
        x = x.reshape(batch_size * seq_length, c, h, w)
        fmap = self.model(x)
        # Global average pool to one feature vector per frame.
        x = self.avgpool(fmap)
        x = x.view(batch_size, seq_length, -1)
        # Initial hidden/cell states default to zeros.
        x_lstm, _ = self.lstm(x, None)
        # Average over time, then dropout BEFORE the classifier. Dropping the
        # logits themselves (as before) randomly zeroes class scores during
        # training. In eval mode both orderings give the same output.
        pooled = torch.mean(x_lstm, dim=1)
        return fmap, self.linear1(self.dp(pooled))


        


In [None]:
def train_model(model, train_loader, num_epochs, optimizer, device, class_weights=(1.0, 5.0)):
    """Train ``model`` with class-weighted cross-entropy and print progress.

    Args:
        model: Module whose forward returns ``(features, logits)``; only the
            logits (shape [B, num_classes]) are used.
        train_loader: Iterable yielding ``(videos, labels)`` batches.
        num_epochs: Number of passes over ``train_loader``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        device: Device on which the model, the data and the loss weights are placed.
        class_weights: Per-class weights for the loss, indexed by class id.
            Defaults to the previously hard-coded (1.0, 5.0).
            NOTE(review): confirm these weights match the actual class balance.

    Returns:
        None. The model is updated in place; metrics are printed.
    """
    model = model.to(device)
    model.train()

    # Put the weights on `device` instead of calling .cuda(), so the function
    # also runs on CPU-only machines.
    weight = torch.as_tensor(class_weights, dtype=torch.float32).to(device)
    criterion = nn.CrossEntropyLoss(weight=weight)

    for epoch in range(num_epochs):
        total_loss = 0.0
        total_correct = 0
        total_samples = 0
        num_batches = 0

        print(f"\nEpoch {epoch + 1}/{num_epochs}")

        for batch_idx, (videos, labels) in enumerate(train_loader):
            videos = videos.to(device)          # [B, T, 3, H, W]
            labels = labels.to(device).long()   # [B], class indices

            optimizer.zero_grad()

            # Forward pass; the feature maps are not needed for training.
            _, outputs = model(videos)          # outputs: [B, num_classes]

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Predicted class = index of the largest logit.
            preds = torch.argmax(outputs, dim=1)  # shape: [B]

            batch_correct = (preds == labels).sum().item()
            total_correct += batch_correct
            total_samples += labels.size(0)
            total_loss += loss.item()
            num_batches += 1

            batch_accuracy = batch_correct / labels.size(0)
            running_accuracy = total_correct / total_samples

            print(f"Batch {batch_idx+1}: Loss={loss.item():.4f}, "
                  f"Batch Acc={batch_accuracy*100:.2f}%, "
                  f"Running Acc={running_accuracy*100:.2f}%")

        # An empty loader previously crashed on the undefined batch_idx.
        if num_batches == 0:
            print(f"Epoch {epoch+1} Summary: train_loader produced no batches")
            continue

        avg_loss = total_loss / num_batches
        final_accuracy = total_correct / total_samples
        print(f"Epoch {epoch+1} Summary: Avg Loss={avg_loss:.4f}, "
              f"Accuracy={final_accuracy*100:.2f}%")


In [None]:
import torch.optim as optim

# Use the GPU when PyTorch can see one, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Model on the chosen device, optimised with Adam and a small weight decay.
model = DeepfakeModel()
model = model.to(device)
optimizer = optim.Adam(params=model.parameters(), lr=1e-4, weight_decay=1e-5)

# train_loader yields (videos, labels) batches.
train_model(model, train_loader, num_epochs=10, optimizer=optimizer, device=device)


In [None]:
# Save only the model parameters (state_dict) to 'model_weights.pth' in the current working directory.
torch.save(model.state_dict(), 'model_weights.pth')



In [None]:
# Add the ".mp4" extension to every test path.
# The suffix is only appended when it is missing, so re-running this cell is
# harmless (previously a second run produced "...mp4.mp4" paths).
X_test = X_test.apply(lambda x: str(x) if str(x).endswith(".mp4") else str(x) + ".mp4")

In [None]:
# Display the test paths to check the ".mp4" suffix added in the previous cell.
X_test

In [None]:
# Test dataset. Use the dedicated test_transform rather than train_transform:
# the two pipelines are identical today, but evaluation must not silently pick
# up any augmentation later added to the training pipeline.
X_test_dataset = VideoDataSet(X_test, video_label_dict, num_frames=10, transform=test_transform, debug=True)

In [None]:
# Batches of 4 test videos.
# NOTE(review): shuffle=True is not needed for evaluation — confirm it is intentional.
test_loader = DataLoader(X_test_dataset, batch_size=4, shuffle=True)

In [None]:
# Peek at the first three test batches to check tensor shapes and labels.
for i, (videos, labels) in enumerate(test_loader):
    print(f"\nBatch {i+1}:")
    print("Batch video tensor shape:", videos.shape)  # (batch, frames, channels, height, width); expected torch.Size([4, 10, 3, 112, 112]) for a full batch
    print("Batch labels:", labels)

    if i == 2:  # Stop after printing 3 batches (index 0, 1, 2)
        break


In [None]:
# Make sure you have this import at the top of your file
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report # MODIFIED IMPORT

def test_model(model, test_loader, device):
    """Evaluate ``model`` on ``test_loader`` and report metrics.

    Prints per-batch loss/accuracy, an overall summary and a classification
    report, then shows a confusion-matrix heatmap.

    Args:
        model: Module whose forward returns ``(features, logits)``.
        test_loader: Iterable yielding ``(videos, labels)`` batches.
        device: Device on which the model and the data are placed.

    Returns:
        None. The model is left in eval mode.
    """
    model = model.to(device)
    model.eval()

    # These weights only affect the reported loss, not the accuracy,
    # classification report or confusion matrix.
    class_weights = torch.tensor([1.0, 5.0]).to(device)
    criterion = nn.CrossEntropyLoss(weight=class_weights).to(device)

    total_loss = 0.0
    total_correct = 0
    total_samples = 0
    num_batches = 0

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch_idx, (videos, labels) in enumerate(test_loader):
            videos = videos.to(device)
            labels = labels.to(device).long()

            _, outputs = model(videos)
            loss = criterion(outputs, labels)

            preds = torch.argmax(outputs, dim=1)

            batch_correct = (preds == labels).sum().item()
            total_correct += batch_correct
            total_samples += labels.size(0)
            total_loss += loss.item()
            num_batches += 1

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

            batch_accuracy = batch_correct / labels.size(0)
            running_accuracy = total_correct / total_samples

            print(f"Test Batch {batch_idx+1}: Loss={loss.item():.4f}, "
                  f"Batch Acc={batch_accuracy*100:.2f}%, "
                  f"Running Acc={running_accuracy*100:.2f}%")

    # An empty loader previously crashed on the undefined batch_idx.
    if num_batches == 0:
        print("\nTest Summary: test_loader produced no batches; nothing to evaluate")
        return

    avg_loss = total_loss / num_batches
    final_accuracy = total_correct / total_samples
    print(f"\nTest Summary: Avg Loss={avg_loss:.4f}, Accuracy={final_accuracy*100:.2f}%")

    # Pin the class ids explicitly. Otherwise, when only one class occurs in the
    # labels/predictions, classification_report raises because target_names has
    # two entries, and the confusion matrix is not 2x2 although the heatmap
    # ticks below assume it is.
    class_ids = [0, 1]

    print("\nClassification Report:")
    target_names = ['Real (Class 0)', 'Deepfake (Class 1)']
    report = classification_report(
        all_labels,
        all_preds,
        labels=class_ids,
        target_names=target_names,
        zero_division=0,  # a class without predictions scores 0 instead of warning
    )
    print(report)

    # Confusion Matrix
    cm = confusion_matrix(all_labels, all_preds, labels=class_ids)
    plt.figure(figsize=(6,5))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_ids, yticklabels=class_ids)
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.title('Confusion Matrix')
    plt.show()

In [None]:
# Evaluate the trained model on the held-out test loader (prints metrics and shows the confusion matrix).
test_model(model, test_loader, device)
