In [None]:
from google.colab import files

# Open Colab's upload dialog; the returned mapping is keyed by uploaded filename.
uploaded = files.upload()

# Echo each upload with its size so a missing or truncated file is obvious.
for fn, payload in uploaded.items():
    message = 'User uploaded file "{name}" with length {length} bytes'.format(
        name=fn, length=len(payload))
    print(message)


In [None]:
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!pip install kaggle

In [None]:
!if [ ! -d "/content/automathon-deepfake" ]; then kaggle competitions download -c automathon-deepfake -p /content/automathon-deepfake; unzip /content/automathon-deepfake/automathon-deepfake.zip -d /content/automathon-deepfake; else echo "Directory /content/automathon-deepfake already contains data."; fi



In [None]:
!pip install torch torchvision


In [None]:
!mv /content/automathon-deepfake/dataset/experimental_dataset/metadata.json /content/automathon-deepfake/dataset

In [None]:
%cd /content/automathon-deepfake
%mkdir frames

In [None]:
import torch
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision.transforms import Compose, ToPILImage, Resize, CenterCrop, ToTensor, Normalize
import torchvision.transforms as transforms
from torchvision.transforms.functional import to_tensor, normalize
from torchvision.io import read_video
from torch.utils.data import DataLoader, Dataset
import pandas as pd
import os
import glob
import numpy as np
from tqdm import tqdm
import json
from PIL import Image
from torchvision.utils import save_image
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
import torchvision.transforms.functional as F
from torchsummary import summary
import torch.optim as optim

In [None]:
# --- Paths and sampling configuration -------------------------------------
DATASET_VIDEO_PATH = "/content/automathon-deepfake/dataset/experimental_dataset"
DATASET_METADATA_PATH = "/content/automathon-deepfake/dataset/metadata.json"
FRAME_SAVE_PATH = "/content/automathon-deepfake/frames"
FRAME_RATE = 1  # Sampling rate for frames (1 = one frame per second)

# The frame output folder must exist before anything is written into it.
os.makedirs(FRAME_SAVE_PATH, exist_ok=True)

# --- Labels ----------------------------------------------------------------
# With orient='index' every top-level JSON key becomes a row label; reset_index()
# turns that label into an ordinary column, which is then named 'Filename'.
df_labels = pd.read_json(DATASET_METADATA_PATH, orient='index').reset_index()
df_labels.columns = ['Filename', 'Label']
# Numeric target: 1 where Label is exactly 'real', 0 for every other value.
df_labels['label_value'] = (df_labels['Label'] == 'real').astype(int)

In [None]:
class VideoDataset(Dataset):
    """Map-style dataset that yields evenly spaced frames from a video plus its label.

    Each item is a ``(frames, label)`` pair where ``frames`` stacks
    ``sequence_length`` frames taken from the first 10 seconds of the video.
    """

    def __init__(self, dataframe, root_dir, sequence_length=10, transform=None):
        """
        Args:
            dataframe (DataFrame): DataFrame containing video filenames ('Filename')
                and numeric labels ('label_value').
            root_dir (str): Directory path where video files are stored.
            sequence_length (int): Number of frames to extract from each video.
            transform (callable, optional): Optional transform to be applied on a frame.

        Raises:
            ValueError: If ``sequence_length`` is smaller than 1 (there would be
                nothing to stack in ``__getitem__``).
        """
        if sequence_length < 1:
            raise ValueError(f"sequence_length must be >= 1, got {sequence_length}")
        self.dataframe = dataframe
        self.root_dir = root_dir
        self.sequence_length = sequence_length
        self.transform = transform

    def __len__(self):
        """Return the number of videos, i.e. the number of rows in the dataframe."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Decode the video at position ``idx`` and return ``(frames_tensor, label)``.

        Raises:
            RuntimeError: If no frame could be decoded from the video file.
        """
        # Look the row up once instead of once per column.
        row = self.dataframe.iloc[idx]
        video_path = os.path.join(self.root_dir, row['Filename'])
        label = row['label_value']

        # Decode the first 10 seconds; 'TCHW' puts the frame axis first.
        frames, _, _ = read_video(video_path, pts_unit='sec', start_pts=0, end_pts=10, output_format='TCHW')
        total_frames = len(frames)
        if total_frames == 0:
            # Without this check the indexing below fails with an IndexError that
            # does not say which file is unreadable.
            raise RuntimeError(f"No frames could be decoded from '{video_path}'")

        # Evenly spaced indices over the decoded frames. When the clip has fewer
        # frames than sequence_length, some indices simply repeat.
        frame_indices = torch.linspace(0, total_frames - 1, steps=self.sequence_length).long()
        selected_frames = frames[frame_indices]

        if self.transform:
            processed_frames = [self.transform(frame) for frame in selected_frames]
        else:
            processed_frames = list(selected_frames)

        frames_tensor = torch.stack(processed_frames)
        return frames_tensor, label

# Example wiring of the dataset and a DataLoader over the complete label table.
# Per-frame preprocessing: tensor -> PIL -> resize -> centre crop -> tensor -> normalise.
# (The mean/std values match the widely used ImageNet channel statistics.)
channel_mean = [0.485, 0.456, 0.406]
channel_std = [0.229, 0.224, 0.225]
transform = transforms.Compose([
    transforms.ToPILImage(),            # the resize/crop steps below operate on PIL images
    transforms.Resize((256, 256)),
    transforms.CenterCrop((224,224)),
    transforms.ToTensor(),              # back to a float tensor
    transforms.Normalize(mean=channel_mean, std=channel_std),
])

dataset = VideoDataset(dataframe=df_labels, root_dir=DATASET_VIDEO_PATH, transform=transform)
loader = DataLoader(dataset=dataset, batch_size=1, shuffle=True)

In [None]:

# Shuffle the label table with a fixed seed so that train/val/test membership is the
# same on every run. The shuffled copy gets its own name, which keeps this cell
# idempotent: re-running it does not reshuffle an already shuffled frame.
SPLIT_SEED = 42
shuffled_labels = df_labels.sample(frac=1, random_state=SPLIT_SEED).reset_index(drop=True)

# 70 % train / 15 % validation / remainder test
train_size = int(0.7 * len(shuffled_labels))
val_size = int(0.15 * len(shuffled_labels))
test_size = len(shuffled_labels) - train_size - val_size

# Consecutive, non-overlapping slices of the shuffled table
train_df = shuffled_labels.iloc[:train_size]
val_df = shuffled_labels.iloc[train_size:train_size + val_size]
test_df = shuffled_labels.iloc[train_size + val_size:]
assert len(train_df) + len(val_df) + len(test_df) == len(shuffled_labels)

# Per-frame preprocessing shared by all three splits
transform = Compose([
    ToPILImage(),  # Convert raw video frame to PIL Image for transformations
    Resize((256, 256)),
    CenterCrop((224, 224)),
    ToTensor(),  # Convert the PIL Image to a tensor
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize the tensor
])

# One dataset per split
train_dataset = VideoDataset(train_df, DATASET_VIDEO_PATH, transform=transform)
val_dataset = VideoDataset(val_df, DATASET_VIDEO_PATH, transform=transform)
test_dataset = VideoDataset(test_df, DATASET_VIDEO_PATH, transform=transform)

# Only the training loader shuffles; evaluation order stays fixed.
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Now you can use these loaders in your training loop


In [None]:
import torch
from torch.utils.data import DataLoader

# Sanity check: pull a single batch and report the tensor shapes.
# Point data_loader at val_loader or test_loader to inspect those instead.
data_loader = train_loader

first_batch = next(iter(data_loader), None)  # None when the loader yields nothing
if first_batch is not None:
    data, labels = first_batch
    print(f"Data shape: {data.shape}")  # Shape of the video frames tensor
    print(f"Labels shape: {labels.shape}")  # Shape of the labels tensor


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class VideoCNN3D(nn.Module):
    """Three-stage 3D CNN classifier for short RGB clips.

    Input is expected as ``(batch, sequence_length, 3, frame_size, frame_size)``.
    ``forward`` returns raw class scores (logits) of shape ``(batch, num_classes)``,
    which is what ``nn.CrossEntropyLoss`` consumes. Use ``predict`` for class indices.

    Args:
        sequence_length (int): Number of frames per clip. The pooling layers use a
            temporal kernel of 1, so this length is preserved up to the flatten step.
        frame_size (int): Height and width of the (square) input frames.
        num_classes (int): Number of output classes.

    The defaults reproduce the original fixed layout (10 frames of 224x224, 2 classes).
    """

    def __init__(self, sequence_length=10, frame_size=224, num_classes=2):
        super().__init__()
        # Each stage: 3x3x3 convolution (padding=1 keeps T, H, W) followed by a
        # spatial-only max-pool that halves H and W and leaves T untouched.
        self.conv1 = nn.Conv3d(3, 32, kernel_size=(3, 3, 3), padding=1)
        self.pool1 = nn.MaxPool3d(kernel_size=(1, 2, 2))

        self.conv2 = nn.Conv3d(32, 64, kernel_size=(3, 3, 3), padding=1)
        self.pool2 = nn.MaxPool3d(kernel_size=(1, 2, 2))

        self.conv3 = nn.Conv3d(64, 128, kernel_size=(3, 3, 3), padding=1)
        self.pool3 = nn.MaxPool3d(kernel_size=(1, 2, 2))

        # Three 2x spatial poolings shrink each side by a factor of 8 (floor division,
        # matching the pooling layers' default rounding).
        pooled_size = frame_size // 8

        # Flattening and dense layers
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(128 * sequence_length * pooled_size * pooled_size, 512)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return logits of shape ``(batch, num_classes)`` for a batch of clips.

        No softmax/argmax is applied here: argmax has no gradient, and
        ``nn.CrossEntropyLoss`` applies log-softmax to the logits itself.
        """
        # (batch, T, C, H, W) -> (batch, C, T, H, W), the layout Conv3d expects.
        # This must act on the argument `x`, not on any module-level tensor.
        x = x.permute(0, 2, 1, 3, 4)

        x = self.pool1(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))
        x = self.pool3(F.relu(self.conv3(x)))

        x = self.flatten(x)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        return self.fc2(x)

    @torch.no_grad()
    def predict(self, x):
        """Return the predicted class index per clip, shape ``(batch,)``.

        Does not switch the module to eval mode; call ``model.eval()`` first so that
        dropout is disabled.
        """
        return torch.argmax(self(x), dim=1)

# Smoke test: build the network, print its layers, and push one random clip through it.
model = VideoCNN3D()
print(model)

# Dummy clip laid out as (batch size, sequence length, channels, height, width)
dummy_shape = (1, 10, 3, 224, 224)
input_tensor = torch.randn(*dummy_shape)
output = model(input_tensor)
print("Output shape:", output.shape)


In [None]:
# Fresh model plus the loss, optimiser and LR schedule used for training.
model = VideoCNN3D()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Cut the learning rate by 10x when the monitored loss has not improved for 5 epochs.
# The `verbose` argument is deprecated in recent PyTorch releases and therefore not
# passed; read the current rate via optimizer.param_groups[0]['lr'] if needed.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5)


In [None]:
! pip install av

In [None]:
import copy
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=25):
    """Train ``model`` and return it loaded with the weights of its best validation epoch.

    Args:
        model: Network to optimise. Batches are moved to the device its parameters
            live on.
        train_loader: DataLoader yielding ``(inputs, labels)`` training batches.
        val_loader: DataLoader yielding ``(inputs, labels)`` validation batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimiser over ``model``'s parameters.
        scheduler: Scheduler stepped once per epoch with the validation loss.
        num_epochs (int): Number of passes over ``train_loader``.

    Returns:
        The same ``model`` object, with the best-scoring weights restored.

    Raises:
        ValueError: If either loader's dataset is empty (the per-sample averages
            would otherwise divide by zero).

    Side effects:
        Prints progress and writes ``best_model.pth`` to the working directory each
        time the validation loss improves.
    """
    n_train = len(train_loader.dataset)
    n_val = len(val_loader.dataset)
    if n_train == 0 or n_val == 0:
        raise ValueError("train_loader and val_loader must each contain at least one sample")

    # Run on whatever device the model already lives on (CPU for parameter-free models).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = float('inf')

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)

        # ---- Training phase ----
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Weight by batch size so the epoch figure is a per-sample mean.
            running_loss += loss.item() * inputs.size(0)

        epoch_loss = running_loss / n_train
        print(f'Training Loss: {epoch_loss:.4f}')

        # ---- Validation phase (no gradients needed) ----
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item() * inputs.size(0)

        val_loss /= n_val
        print(f'Validation Loss: {val_loss:.4f}')

        # Snapshot the weights whenever validation improves.
        if val_loss < best_loss:
            best_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())
            torch.save(model.state_dict(), 'best_model.pth')
            print('Best model saved.')

        # The plateau scheduler decides on the validation loss.
        scheduler.step(val_loss)

    # '.4f' (four decimals), consistent with the per-epoch prints above.
    print('Training complete. Best val Loss: {:.4f}'.format(best_loss))
    model.load_state_dict(best_model_wts)
    return model

# Launch training with the objects built in the earlier cells (default epoch count).
trained_model = train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
)
