<a href="https://colab.research.google.com/github/Rishiatweb/Number-plate-and-speed-detection/blob/main/Untitled9.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive (Colab only); a later cell reads the
# dataset ZIP from a path under this mount point.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!ls /content/

dataset-df-dc.zip  drive  sample_data


In [None]:
import os
import zipfile
import json

# Define dataset ZIP file path (update this if needed)
zip_filepath = "/content/drive/MyDrive/dataset-df-dc.zip"  # Change path if your file is elsewhere

# Define extraction folder
extract_folder = "/content/dataset_extracted"
os.makedirs(extract_folder, exist_ok=True)

# Step 1: Extract the ZIP file
with zipfile.ZipFile(zip_filepath, "r") as zip_ref:
    zip_ref.extractall(extract_folder)

print(f"Dataset extracted to: {extract_folder}")

# Step 2: Generate metadata.json
# File-name suffixes (lower-case) that count as video files.
VIDEO_EXTENSIONS = (".mp4", ".avi", ".mov", ".mkv")
# The training cells read videos from `fake_videos` / `real_videos` sub-folders,
# so the containing folder name tells us the class of each video.
FOLDER_LABELS = {"fake_videos": "FAKE", "real_videos": "REAL"}

metadata = {}

# Walk the whole extracted tree: the videos live in nested folders, so listing
# only the top level of `extract_folder` would find nothing.
for dir_path, dir_names, file_names in os.walk(extract_folder):
    dir_names.sort()  # deterministic traversal order
    folder_label = FOLDER_LABELS.get(os.path.basename(dir_path), "UNKNOWN")

    for file_name in sorted(file_names):
        # Process only video files (common formats)
        if not file_name.lower().endswith(VIDEO_EXTENSIONS):
            continue
        # Entries are keyed by bare file name; if two folders contain the same
        # file name, the one visited last wins.
        metadata[file_name] = {
            "label": folder_label,  # "FAKE"/"REAL" from the folder, else "UNKNOWN"
            "original": None,       # Modify if deepfake mapping is known
            "split": "train",       # Default to "train", modify if needed
            "actors": []            # Add actor details if available
        }

# Step 3: Save metadata.json
metadata_path = os.path.join(extract_folder, "metadata.json")
with open(metadata_path, "w") as json_file:
    json.dump(metadata, json_file, indent=4)

print(f"Metadata saved at: {metadata_path}")


Dataset extracted to: /content/dataset_extracted
Metadata saved at: /content/dataset_extracted/metadata.json


In [None]:
# Import necessary libraries
import os
import cv2
import json
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, LSTM, Dense, TimeDistributed
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from google.colab import files
import zipfile
from sklearn.model_selection import train_test_split
import psutil


# Enable GPU memory growth (Colab-specific)
gpus = tf.config.experimental.list_physical_devices('GPU')
try:
    # With no GPU present the list is empty and this loop simply does nothing,
    # so no separate emptiness check is required.
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as err:
    # Report the problem but keep going; the setting is an optimisation only.
    print(err)

def print_memory_usage():
    """Print the resident set size (RSS) of the current process in megabytes."""
    current_process = psutil.Process()
    rss_bytes = current_process.memory_info().rss
    print(f"Memory usage: {rss_bytes / 1024**2:.2f} MB")

# --- Data Loading and Preprocessing Module ---
class DataLoader:
    """Read videos from disk and turn them into fixed-length frame sequences.

    Attributes:
        dataset_path: Root folder of the dataset.
        sequence_length: Number of consecutive frames per sequence.
        frame_size: Size passed to ``cv2.resize`` for every frame.
        metadata: Parsed contents of ``metadata.json`` (see ``load_metadata``).
    """

    # File name of the label file and the location the notebook originally
    # hard-coded; the latter is kept as a last-resort fallback.
    METADATA_FILENAME = 'metadata.json'
    LEGACY_METADATA_PATH = '/content/dataset_extracted/metadata.json'

    def __init__(self, dataset_path, sequence_length=8, frame_size=(128, 128)):
        self.dataset_path = dataset_path
        self.sequence_length = sequence_length
        self.frame_size = frame_size
        self.metadata = self.load_metadata()

    def load_metadata(self):
        """Load metadata.json to get video labels.

        The file is searched for, in order, in ``dataset_path``, in the parent
        folder of ``dataset_path``, and finally at the legacy absolute path.

        Returns:
            The decoded JSON content of the first file found.

        Raises:
            FileNotFoundError: If none of the candidate locations exists.
        """
        parent_dir = os.path.dirname(os.path.normpath(self.dataset_path))
        candidates = [
            os.path.join(self.dataset_path, self.METADATA_FILENAME),
            os.path.join(parent_dir, self.METADATA_FILENAME),
            self.LEGACY_METADATA_PATH,
        ]
        for candidate in candidates:
            if os.path.isfile(candidate):
                with open(candidate, 'r') as f:
                    return json.load(f)
        raise FileNotFoundError(
            "metadata.json not found; looked in: " + ", ".join(candidates)
        )

    @staticmethod
    def _list_videos(folder, max_videos):
        """Return up to ``max_videos`` '.mp4' file names from ``folder``, sorted by name."""
        # Sorted so that the selected subset does not depend on the arbitrary
        # order in which os.listdir returns directory entries.
        names = sorted(f for f in os.listdir(folder) if f.endswith('.mp4'))
        return names[:max_videos]

    def extract_frames(self, video_path):
        """Extract frames from a video and resize them.

        Args:
            video_path: Path of the video file to decode.

        Returns:
            A float32 array holding every decoded frame, resized to
            ``frame_size`` and scaled to [0, 1]. The array is empty when no
            frame could be read.
        """
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                frame = cv2.resize(frame, self.frame_size)
                # float32 halves the memory footprint compared with the
                # float64 result of a plain `frame / 255.0`.
                frames.append(frame.astype(np.float32) / 255.0)  # Normalize to [0, 1]
        finally:
            # Always free the decoder, even if resizing raised.
            cap.release()
        return np.array(frames, dtype=np.float32)

    def load_data_from_subfolder(self, subfolder_path, label, max_videos=20):
        """Loads data from a specific subfolder (fake_videos or real_videos).

        Args:
            subfolder_path: Folder containing the '.mp4' files.
            label: Label attached to every sequence from this folder.
            max_videos: Maximum number of videos to read.

        Returns:
            A tuple ``(X, y)`` of arrays with one entry per sequence. Videos
            that fail to load are reported and skipped.
        """
        X, y = [], []
        video_files = self._list_videos(subfolder_path, max_videos)
        for idx, video_file in enumerate(video_files):
            video_path = os.path.join(subfolder_path, video_file)
            try:
                frames = self.extract_frames(video_path)
                sequences = self.create_sequences(frames)
                print(f"Video {idx + 1}: {len(sequences)} sequences")
                X.extend(sequences)
                y.extend([label] * len(sequences))
                print_memory_usage()
            except Exception as e:
                # Best effort: one unreadable video must not abort the load.
                print(f"Error with {video_file}: {e}")
        return np.array(X), np.array(y)

    def create_sequences(self, frames):
        """Split frames into sequences of fixed length.

        The windows do not overlap; trailing frames that do not fill a whole
        window are dropped.

        Args:
            frames: Sliceable collection of frames.

        Returns:
            A list of slices of ``frames``, each ``sequence_length`` long, or
            an empty list when there are fewer frames than one sequence.
        """
        if len(frames) < self.sequence_length:
            return []  # Skip if too short
        last_start = len(frames) - self.sequence_length
        return [
            frames[start:start + self.sequence_length]
            for start in range(0, last_start + 1, self.sequence_length)
        ]

    def load_dataset(self, max_videos=100):
        """Load videos and generate labeled sequences.

        Videos are read directly from ``dataset_path``; a video is labelled 1
        when its metadata entry has label 'FAKE' and 0 otherwise.

        Args:
            max_videos: Maximum number of videos to read.

        Returns:
            A tuple ``(X, y)`` of arrays with one entry per sequence.
        """
        X, y = [], []
        video_files = self._list_videos(self.dataset_path, max_videos)  # Limit for testing

        for idx, video_file in enumerate(video_files):
            video_path = os.path.join(self.dataset_path, video_file)
            label = 1 if self.metadata.get(video_file, {}).get('label') == 'FAKE' else 0
            frames = self.extract_frames(video_path)
            sequences = self.create_sequences(frames)

            X.extend(sequences)
            y.extend([label] * len(sequences))
            print(f"Processed video {idx + 1}/{len(video_files)}")

        return np.array(X), np.array(y)

    def data_generator(self, subfolder_path, label, max_videos=20, batch_size=2):
        """Yield batches of sequences from one folder, one video at a time.

        Args:
            subfolder_path: Folder containing the '.mp4' files.
            label: Label attached to every sequence from this folder.
            max_videos: Maximum number of videos to read.
            batch_size: Maximum number of sequences per yielded batch.

        Yields:
            Tuples ``(batch, labels)``; the last batch of a video may hold
            fewer than ``batch_size`` sequences. Videos that fail to load are
            reported and skipped.
        """
        video_files = self._list_videos(subfolder_path, max_videos)
        for idx, video_file in enumerate(video_files):
            video_path = os.path.join(subfolder_path, video_file)
            try:
                frames = self.extract_frames(video_path)
                sequences = self.create_sequences(frames)
            except Exception as e:
                # Best effort: one unreadable video must not stop the stream.
                print(f"Error with {video_file}: {e}")
                continue
            print(f"Video {idx + 1}: {len(sequences)} sequences")
            for i in range(0, len(sequences), batch_size):
                batch_seq = sequences[i:i + batch_size]
                yield np.array(batch_seq), np.array([label] * len(batch_seq))
            print_memory_usage()

# --- Model Definition Module ---
class DeepfakeDetector:
    """CNN-LSTM binary classifier operating on fixed-length frame sequences.

    Attributes:
        sequence_length: Number of frames in each input sequence.
        frame_size: Two spatial dimensions of every frame.
        model: The compiled Keras model produced by ``build_model``.
    """

    def __init__(self, sequence_length=16, frame_size=(224, 224)):
        self.sequence_length = sequence_length
        self.frame_size = frame_size
        self.model = self.build_model()

    def build_model(self):
        """Build a CNN-LSTM model for temporal inconsistency detection.

        Returns:
            A compiled ``Sequential`` model with a single sigmoid output.
        """
        clip_shape = (self.sequence_length, *self.frame_size, 3)

        # CNN for spatial feature extraction, applied to each frame independently
        per_frame_cnn = [
            TimeDistributed(Conv2D(32, (3, 3), activation='relu', padding='same'),
                            input_shape=clip_shape),
            TimeDistributed(MaxPooling2D((2, 2))),
            TimeDistributed(Conv2D(64, (3, 3), activation='relu', padding='same')),
            TimeDistributed(MaxPooling2D((2, 2))),
            TimeDistributed(Flatten()),
        ]

        # LSTM for temporal analysis, followed by the classification head
        temporal_head = [
            LSTM(128, return_sequences=False),
            Dense(64, activation='relu'),
            Dense(1, activation='sigmoid'),  # Binary output: real (0) or fake (1)
        ]

        network = Sequential(per_frame_cnn + temporal_head)
        network.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
        return network

    def train_generator(self, train_gen, val_gen, steps_per_epoch, validation_steps, epochs=5):
        """Fit the model from batch sources, with checkpointing and early stopping.

        Args:
            train_gen: Source of training batches, passed to ``fit``.
            val_gen: Source of validation batches, passed as ``validation_data``.
            steps_per_epoch: Forwarded to ``fit``.
            validation_steps: Forwarded to ``fit``.
            epochs: Maximum number of epochs.

        Returns:
            The history object returned by ``fit``.
        """
        callbacks = [
            # Keep the weights of the epoch with the best validation accuracy on disk.
            ModelCheckpoint('best_model.h5', save_best_only=True, monitor='val_accuracy'),
            # Stop once validation loss has not improved for 3 epochs.
            EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True),
        ]
        return self.model.fit(
            train_gen,
            steps_per_epoch=steps_per_epoch,
            validation_data=val_gen,
            validation_steps=validation_steps,
            epochs=epochs,
            callbacks=callbacks,
        )

    def predict(self, X):
        """Predict on new data."""
        predictions = self.model.predict(X)
        return predictions

# --- Main Execution ---

def main():
    """Train the deepfake detector on a class-balanced split and save the model.

    Both classes are down-sampled to the size of the smaller one, split 80/20
    into training and validation videos, and streamed to Keras one video at a
    time. The batch stream is wrapped in a ``tf.data.Dataset`` built from a
    generator *function*, so it is restarted for every epoch; a plain
    generator object is exhausted after the first epoch and leaves the
    remaining epochs without data.

    Returns:
        The history object returned by training.

    Raises:
        ValueError: If either class has fewer than two videos.
    """
    import random  # only this entry point needs it

    # Step 1: Use existing dataset in Colab
    dataset_path = '/content/dataset_extracted/dataset-df-dc-tarini'
    print("Using existing dataset at:", dataset_path)

    # Step 2: Load and preprocess data with generators
    print("Loading dataset...")
    sequence_length = 8
    frame_size = (128, 128)
    batch_size = 2
    train_fraction = 0.8
    loader = DataLoader(dataset_path, sequence_length=sequence_length, frame_size=frame_size)

    # Get video files (sorted: os.listdir order is arbitrary, and the seeded
    # sampling below is only reproducible for a fixed input order)
    fake_dir = os.path.join(dataset_path, 'fake_videos')
    real_dir = os.path.join(dataset_path, 'real_videos')
    fake_videos = sorted(f for f in os.listdir(fake_dir) if f.endswith('.mp4'))
    real_videos = sorted(f for f in os.listdir(real_dir) if f.endswith('.mp4'))
    print(f"Fake videos: {len(fake_videos)}, Real videos: {len(real_videos)}")

    # Balance dataset: down-sample the larger class to the smaller one
    videos_per_class = min(len(fake_videos), len(real_videos))
    if videos_per_class < 2:
        raise ValueError(
            "Need at least 2 videos per class to build a train/validation split"
        )
    rng = random.Random(42)  # private RNG: reproducible without touching global state
    fake_videos_sampled = rng.sample(fake_videos, videos_per_class)
    real_videos_sampled = rng.sample(real_videos, videos_per_class)

    # Split: 80% train, 20% val per class (62 / 15 for 77 videos per class);
    # clamped so that both parts are non-empty.
    n_train = int(round(videos_per_class * train_fraction))
    n_train = max(1, min(n_train, videos_per_class - 1))
    train_fake_videos = fake_videos_sampled[:n_train]
    val_fake_videos = fake_videos_sampled[n_train:]
    train_real_videos = real_videos_sampled[:n_train]
    val_real_videos = real_videos_sampled[n_train:]

    # Generators with explicit video lists
    def video_generator(subfolder_path, video_list, label, batch_size=2):
        """Yield (sequences, labels) batches for each video in ``video_list``."""
        for video_file in video_list:
            video_path = os.path.join(subfolder_path, video_file)
            try:
                frames = loader.extract_frames(video_path)
                sequences = loader.create_sequences(frames)
            except Exception as e:
                # Best effort: skip a video that cannot be decoded.
                print(f"Error with {video_file}: {e}")
                continue
            print(f"Video {video_file}: {len(sequences)} sequences")
            for i in range(0, len(sequences), batch_size):
                batch_seq = sequences[i:i + batch_size]
                # Explicit dtypes so the batches match the dataset signature below.
                yield (np.asarray(batch_seq, dtype=np.float32),
                       np.asarray([label] * len(batch_seq), dtype=np.int32))
            print_memory_usage()

    # The leading (batch) dimension is left open because the last batch of a
    # video can be smaller than batch_size.
    output_signature = (
        tf.TensorSpec(shape=(None, sequence_length, *frame_size, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(None,), dtype=tf.int32),
    )

    def make_dataset(fake_list, real_list):
        """Build a re-iterable dataset alternating fake and real batches."""
        def interleaved_batches():
            streams = [
                video_generator(fake_dir, fake_list, label=1, batch_size=batch_size),
                video_generator(real_dir, real_list, label=0, batch_size=batch_size),
            ]
            # Alternate between the classes; when one runs dry keep draining
            # the other instead of discarding its remaining batches.
            while streams:
                for stream in list(streams):
                    try:
                        batch = next(stream)
                    except StopIteration:
                        streams.remove(stream)
                        continue
                    yield batch

        return tf.data.Dataset.from_generator(
            interleaved_batches, output_signature=output_signature
        )

    train_ds = make_dataset(train_fake_videos, train_real_videos)
    val_ds = make_dataset(val_fake_videos, val_real_videos)

    # Step 3: Initialize and train model
    detector = DeepfakeDetector(sequence_length=sequence_length, frame_size=frame_size)
    print("Training model...")
    # No step counts: every epoch is one full pass over the respective dataset.
    history = detector.train_generator(train_ds, val_ds, None, None, epochs=5)

    # Step 4: Save and download the model
    detector.model.save('deepfake_detector_final.keras')  # Using .keras format
    print("Model saved as 'deepfake_detector_final.keras'")
    files.download('deepfake_detector_final.keras')
    return history

# Start training right away when this code runs as the top-level module
# (as it does when the cell is executed).
if __name__ == "__main__":
    main()

Using existing dataset at: /content/dataset_extracted/dataset-df-dc-tarini
Loading dataset...
Fake videos: 322, Real videos: 77
Training model...
Video errocgcham.mp4: 37 sequences
Video caifxvsozs.mp4: 37 sequences
Epoch 1/5
[1m  36/1147[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m54s[0m 49ms/step - accuracy: 0.7308 - loss: 0.5758Memory usage: 2517.86 MB
[1m  38/1147[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m53s[0m 48ms/step - accuracy: 0.7407 - loss: 0.5585Video deywhkarol.mp4: 37 sequences
Memory usage: 2642.68 MB
[1m  39/1147[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m2:04[0m 112ms/step - accuracy: 0.7453 - loss: 0.5502Video bxzakyopjf.mp4: 37 sequences
[1m  74/1147[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m2:13[0m 124ms/step - accuracy: 0.7983 - loss: 0.4634Memory usage: 2718.17 MB
[1m  76/1147[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m2:10[0m 122ms/step - accuracy: 0.8004 - loss: 0.4607Video agdkmztvby.mp4: 37 sequences
Memory usage: 2751.63 MB
[1m  77/1147[0m [32m━[0m[



Video atvmxvwyns.mp4: 37 sequences
[1m1147/1147[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m243s[0m 209ms/step - accuracy: 0.5847 - loss: 0.5624 - val_accuracy: 0.5019 - val_loss: 0.6931
Epoch 2/5
[1m1147/1147[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 85us/step - accuracy: 0.0000e+00 - loss: 0.0000e+00 - val_accuracy: 0.5000 - val_loss: 0.6932
Epoch 3/5




[1m1147/1147[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 123us/step - accuracy: 0.5004 - loss: 0.7039 - val_accuracy: 0.5000 - val_loss: 0.6936
Epoch 4/5
[1m1147/1147[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 103us/step - accuracy: 0.5004 - loss: 0.6986 - val_accuracy: 0.5000 - val_loss: 0.6939
Model saved as 'deepfake_detector_final.keras'


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
def main():
    """Train on a small subset: 16 training and up to 4 validation videos per class.

    NOTE: this definition replaces the ``main`` defined in the earlier cell
    once this cell has been run.

    Validation videos are taken from positions 16-19 of the sorted listing,
    so they never overlap with the training videos (positions 0-15). The
    batch stream is wrapped in a ``tf.data.Dataset`` built from a generator
    *function*, so it is restarted for every epoch instead of being exhausted
    after the first one.

    Returns:
        The history object returned by training.

    Raises:
        ValueError: If a class has too few videos to leave any for validation.
    """
    # Step 1: Use the dataset already extracted in Colab
    dataset_path = '/content/dataset_extracted/dataset-df-dc-tarini'
    print("Using existing dataset at:", dataset_path)

    # Step 2: Load and preprocess data with generators
    print("Loading dataset...")
    sequence_length = 8
    frame_size = (128, 128)
    batch_size = 2
    n_train, n_val = 16, 4  # videos per class
    loader = DataLoader(dataset_path, sequence_length=sequence_length, frame_size=frame_size)

    fake_dir = os.path.join(dataset_path, 'fake_videos')
    real_dir = os.path.join(dataset_path, 'real_videos')
    # Sorted so the train/validation assignment is the same on every run.
    fake_videos = sorted(f for f in os.listdir(fake_dir) if f.endswith('.mp4'))
    real_videos = sorted(f for f in os.listdir(real_dir) if f.endswith('.mp4'))
    if min(len(fake_videos), len(real_videos)) <= n_train:
        raise ValueError(
            f"Need more than {n_train} videos per class to leave any for validation"
        )

    # Disjoint slices: 16 train, 4 val per class
    train_fake_videos = fake_videos[:n_train]
    val_fake_videos = fake_videos[n_train:n_train + n_val]
    train_real_videos = real_videos[:n_train]
    val_real_videos = real_videos[n_train:n_train + n_val]

    def video_batches(subfolder_path, video_list, label):
        """Yield (sequences, labels) batches for each video in ``video_list``."""
        for idx, video_file in enumerate(video_list):
            video_path = os.path.join(subfolder_path, video_file)
            try:
                frames = loader.extract_frames(video_path)
                sequences = loader.create_sequences(frames)
            except Exception as e:
                # Best effort: skip a video that cannot be decoded.
                print(f"Error with {video_file}: {e}")
                continue
            print(f"Video {idx + 1}: {len(sequences)} sequences")
            for i in range(0, len(sequences), batch_size):
                batch_seq = sequences[i:i + batch_size]
                # Explicit dtypes so the batches match the dataset signature below.
                yield (np.asarray(batch_seq, dtype=np.float32),
                       np.asarray([label] * len(batch_seq), dtype=np.int32))
            print_memory_usage()

    # The leading (batch) dimension is left open because the last batch of a
    # video can be smaller than batch_size.
    output_signature = (
        tf.TensorSpec(shape=(None, sequence_length, *frame_size, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(None,), dtype=tf.int32),
    )

    # Combine generators
    def combined_dataset(fake_list, real_list):
        """Build a re-iterable dataset alternating fake and real batches."""
        def interleaved_batches():
            streams = [
                video_batches(fake_dir, fake_list, label=1),
                video_batches(real_dir, real_list, label=0),
            ]
            # A bare `yield next(gen)` would surface exhaustion as RuntimeError;
            # catch StopIteration and keep draining the remaining stream.
            while streams:
                for stream in list(streams):
                    try:
                        batch = next(stream)
                    except StopIteration:
                        streams.remove(stream)
                        continue
                    yield batch

        return tf.data.Dataset.from_generator(
            interleaved_batches, output_signature=output_signature
        )

    train_ds = combined_dataset(train_fake_videos, train_real_videos)
    val_ds = combined_dataset(val_fake_videos, val_real_videos)

    # Step 3: Initialize and train model
    detector = DeepfakeDetector(sequence_length=sequence_length, frame_size=frame_size)
    print("Training model...")
    # No step counts: every epoch is one full pass over the respective dataset.
    history = detector.train_generator(train_ds, val_ds, None, None, epochs=5)

    # Step 4: Save and download the model
    detector.model.save('deepfake_detector_final.h5')
    print("Model saved as 'deepfake_detector_final.h5'")
    files.download('deepfake_detector_final.h5')
    return history

In [None]:
import psutil

def print_memory_usage():
    """Print the resident set size (RSS) of this process in megabytes."""
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    print(f"Memory usage: {rss_bytes / 1024**2:.2f} MB")  # bytes -> MB
print_memory_usage()

Memory usage: 887.04 MB


In [None]:
!ls /content/dataset_extracted/

In [None]:
import os
def _count_mp4(folder):
    """Return how many entries of ``folder`` have a name ending in '.mp4'."""
    return sum(1 for entry in os.listdir(folder) if entry.endswith('.mp4'))

# Count the videos available for each class.
num_fake_videos = _count_mp4('/content/dataset_extracted/dataset-df-dc-tarini/fake_videos')
num_real_videos = _count_mp4('/content/dataset_extracted/dataset-df-dc-tarini/real_videos')
print(f"Number of fake videos: {num_fake_videos}")
print(f"Number of real videos: {num_real_videos}")

Number of fake videos: 322
Number of real videos: 77
