In [None]:
# from google.colab import drive
# drive.mount('/content/drive')


In [None]:
# !ls drive/MyDrive

In [None]:
!pip install tensorflow scikit-learn tqdm



In [None]:
# # Libraries
# import os, random
# import cv2
# import matplotlib.pyplot as plt
# from tqdm import tqdm

In [None]:
# # frames path
# raw_root = "/content/drive/MyDrive/frames"
# REAL_DIR = "/content/drive/MyDrive/frames/reals 2.o"
# FAKE_DIR = "/content/drive/MyDrive/frames/fakes"

In [None]:
# import os

# print("Checking REAL_DIR:", REAL_DIR, " → files:", len(os.listdir(REAL_DIR)) if os.path.exists(REAL_DIR) else "MISSING")
# print("Checking FAKE_DIR:", FAKE_DIR, " → files:", len(os.listdir(FAKE_DIR)) if os.path.exists(FAKE_DIR) else "MISSING")

# # List first few files
# if os.path.exists(REAL_DIR):
#     print("Sample real frames:", os.listdir(REAL_DIR)[:10])
# if os.path.exists(FAKE_DIR):
#     print("Sample fake frames:", os.listdir(FAKE_DIR)[:10])


In [None]:
# def show_samples(label, num=2):
#     folder = os.path.join(raw_root, label)
#     files  = random.sample(os.listdir(folder), min(len(os.listdir(folder)), num))
#     plt.figure(figsize=(6,6))
#     for i, f in enumerate(files):
#         img = cv2.cvtColor(
#             cv2.imread(os.path.join(folder, f)),
#             cv2.COLOR_BGR2RGB
#         )
#         plt.subplot(2,2,i+1)
#         plt.imshow(img)
#         plt.title(f"{label}: {f}")
#         plt.axis('off')
#     plt.show()

# print("Sample FAKE faces:")
# show_samples('fakes')
# print("\nSample REAL faces:")
# show_samples('reals 2.o')

In [None]:
# 1) Mount Drive (run in Colab)
import os

from google.colab import drive
drive.mount('/content/drive', force_remount=True)

# 2) Paths (your uploaded location). The class folders are derived from
#    RAW_ROOT so relocating the dataset means editing a single line.
RAW_ROOT = "/content/drive/MyDrive/frames"
REAL_DIR = os.path.join(RAW_ROOT, "reals 2.o")   # your folder name (note the space + dot)
FAKE_DIR = os.path.join(RAW_ROOT, "fakes")

# Quick checks
print("RAW_ROOT exists:", os.path.isdir(RAW_ROOT))

# List every class folder exactly once and reuse the listing for both the
# count and the filename preview (the folders live on the mounted Drive, where
# repeated listings of large folders can be slow).
_dir_listings = {}
for _tag, _path in (("REAL_DIR", REAL_DIR), ("FAKE_DIR", FAKE_DIR)):
    _present = os.path.isdir(_path)
    _dir_listings[_tag] = os.listdir(_path) if _present else None
    print(f"{_tag} exists:", _present, " ->", len(_dir_listings[_tag]) if _present else "MISSING")

# show few filenames (to confirm naming convention)
for _kind, _tag in (("real", "REAL_DIR"), ("fake", "FAKE_DIR")):
    if _dir_listings[_tag] is not None:
        print(f"Few {_kind} filenames:", _dir_listings[_tag][:10])


In [None]:
# 3) Group frames by video base name
import re
from collections import defaultdict
from tqdm import tqdm

SEQ_LEN = 16   # frames per sequence (you can change later)
def collect_video_sequences(folder, seq_len=SEQ_LEN):
    """Group the image files directly inside ``folder`` into per-video frame lists.

    A file belongs to the video named by everything before its *last*
    ``_f<number>`` marker (case-insensitive), e.g. ``vid001_f003.jpg`` ->
    ``vid001``. Files without such a marker fall back to the text before the
    last underscore, or to the file stem when the name has no underscore.

    Args:
        folder: Directory containing ``.jpg`` / ``.jpeg`` / ``.png`` frames.
        seq_len: Kept for interface compatibility; it is not used here. No
            trimming or padding is performed by this function.

    Returns:
        A list with one entry per video; each entry is a non-empty list of
        full file paths. Frames carrying an ``_f<number>`` marker come first,
        ordered by that number; un-numbered frames follow, ordered by name.
    """
    # One pattern yields both the video base name and the frame number, so
    # grouping and ordering always agree on which "_f<number>" they mean.
    frame_pat = re.compile(r"^(.*)_f(\d+)", re.IGNORECASE)
    image_exts = ('.jpg', '.jpeg', '.png')

    groups = defaultdict(list)
    for fn in sorted(os.listdir(folder)):
        if not fn.lower().endswith(image_exts):
            continue
        m = frame_pat.match(fn)
        if m:
            base = m.group(1)
            # Leading 0 ranks numbered frames before un-numbered ones.
            order = (0, int(m.group(2)), fn)
        else:
            # fallback: everything before last underscore (if present)
            if "_" in fn:
                base = "_".join(fn.split("_")[:-1])
            else:
                base = os.path.splitext(fn)[0]  # treat full name as base
            order = (1, 0, fn)
        groups[base].append((order, os.path.join(folder, fn)))

    # The sort key is always a tuple of the same shape, so numbered and
    # un-numbered frames can share a group without an int-vs-str comparison.
    sequences = []
    for members in groups.values():
        members.sort(key=lambda item: item[0])
        sequences.append([path for _, path in members])
    return sequences

# Scan the real folder first, then the fake folder, and report how many
# videos each one produced.
print("Collecting sequences (this might take a moment)...")
real_seq, fake_seq = (
    collect_video_sequences(class_dir, SEQ_LEN) for class_dir in (REAL_DIR, FAKE_DIR)
)
print("Video sequences found -> real:", len(real_seq), " fake:", len(fake_seq))


In [None]:
# 4) Per-class sample size: the smaller class sets the ceiling, capped at 5000
#    (edit the 5000 below to use more or fewer videos per class)
MIN_AVAILABLE = min(map(len, (real_seq, fake_seq)))
SAMPLE_PER_CLASS = min(5000, MIN_AVAILABLE)
print(f"Min available per class: {MIN_AVAILABLE}")
print(f"Using sample per class: {SAMPLE_PER_CLASS}")

# 5) Draw the same number of sequences from each class (real first, then fake)
#    after seeding the global random module
import random
RNG = 42
random.seed(RNG)
real_sample, fake_sample = (
    random.sample(pool, SAMPLE_PER_CLASS) for pool in (real_seq, fake_seq)
)

# 6) Create a local copy for faster training
LOCAL_ROOT = "/content/local_dataset"   # Colab VM (fast)
import shutil, os
def prepare_local_copy(sequences, out_folder):
    """Copy every frame sequence into its own ``video_NNNNNN`` folder under ``out_folder``.

    Any ``video_*`` directory already present in ``out_folder`` is deleted
    first. Without that, re-running with a smaller or different sample would
    leave old video folders and old trailing frames mixed into the new copy.

    Args:
        sequences: Iterable of frame-path lists, one list per video, already
            in the order the frames should be read back.
        out_folder: Destination directory; created if missing.

    Returns:
        None. The result is the directory tree written to disk.
    """
    os.makedirs(out_folder, exist_ok=True)

    # Only directories this function itself creates (video_*) are removed;
    # anything else stored in out_folder is left alone.
    for entry in os.listdir(out_folder):
        stale = os.path.join(out_folder, entry)
        if entry.startswith("video_") and os.path.isdir(stale):
            shutil.rmtree(stale)

    for i, seq in enumerate(tqdm(sequences, desc=f"Copying to {out_folder}")):
        # create a folder per video: video_000000, video_000001, ...
        vd = os.path.join(out_folder, f"video_{i:06d}")
        os.makedirs(vd, exist_ok=True)
        for j, src in enumerate(seq):
            # Frames are renamed f0000.jpg, f0001.jpg, ... so a plain name
            # sort restores sequence order. The source extension is not kept.
            dst = os.path.join(vd, f"f{j:04d}.jpg")
            shutil.copy2(src, dst)

# One destination sub-folder per class under LOCAL_ROOT
out_real, out_fake = (os.path.join(LOCAL_ROOT, cls) for cls in ("real", "fake"))
# Mirror the sampled sequences onto the VM disk: real class first, then fake
for class_sample, class_out in ((real_sample, out_real), (fake_sample, out_fake)):
    prepare_local_copy(class_sample, class_out)
print("Local copy prepared:", LOCAL_ROOT)


In [None]:
# 7) Keras Sequence to yield batches of sequences
import numpy as np
import cv2
from tensorflow.keras.utils import Sequence
import math

IMG_SIZE = 128    # side length (pixels) every frame is resized to
SEQ_LEN = 16      # number of frames per clip
BATCH_SIZE = 8  # adjust to fit GPU memory (lower if OOM)

def preprocess_img_cv2(path):
    """Load one frame and return it as a float32 RGB array scaled to [0, 1].

    The image is read with OpenCV, converted from BGR to RGB and resized to
    IMG_SIZE x IMG_SIZE. A path that OpenCV cannot read yields an all-black
    frame instead of raising.
    """
    bgr = cv2.imread(path)
    if bgr is None:
        # unreadable file -> substitute a black frame
        bgr = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8)
    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    resized = cv2.resize(rgb, (IMG_SIZE, IMG_SIZE))
    return resized.astype(np.float32) / 255.0

class VideoSequence(Sequence):
    """Keras ``Sequence`` yielding batches of fixed-length frame stacks, one stack per video folder.

    Each item is ``(batch_x, batch_y)`` where ``batch_x`` stacks ``seq_len``
    preprocessed frames per video and ``batch_y`` holds the matching labels
    as float32.

    Args:
        video_folders: List of directories, each holding the frames of one video.
        labels: Labels aligned index-by-index with ``video_folders``.
        batch_size: Number of videos per batch.
        seq_len: Number of frames taken from every video.
        shuffle: Reshuffle the video order at construction and after each epoch.
        augment: When true, each clip is mirrored horizontally with probability 0.5.
        **kwargs: Forwarded to the Keras base class constructor. Leave empty on
            Keras versions whose ``Sequence`` takes no constructor arguments.
    """

    # Same extensions the frame collector accepts, so no frame type is dropped.
    _IMAGE_EXTS = ('.jpg', '.jpeg', '.png')

    def __init__(self, video_folders, labels, batch_size=BATCH_SIZE, seq_len=SEQ_LEN, shuffle=True, augment=False, **kwargs):
        # Initialise the Keras base class; newer Keras warns when this is skipped.
        super().__init__(**kwargs)
        self.video_folders = video_folders
        self.labels = np.array(labels)
        self.batch_size = batch_size
        self.seq_len = seq_len
        self.shuffle = shuffle
        self.augment = augment
        self.indexes = np.arange(len(self.video_folders))
        self.on_epoch_end()

    def __len__(self):
        """Number of batches per epoch (the last batch may be smaller)."""
        return math.ceil(len(self.video_folders) / float(self.batch_size))

    def _select_frames(self, folder):
        """Return exactly ``seq_len`` frame paths for ``folder``.

        Raises:
            ValueError: If the folder contains no image files.
        """
        imgs = sorted(
            os.path.join(folder, f)
            for f in os.listdir(folder)
            if f.lower().endswith(self._IMAGE_EXTS)
        )
        n = len(imgs)
        if n == 0:
            raise ValueError(f"No image frames found in video folder: {folder}")
        if n >= self.seq_len:
            # take the central window of the clip
            start = max(0, (n - self.seq_len)//2)
            return imgs[start:start+self.seq_len]
        # too short: pad by repeating the last frame
        return imgs + [imgs[-1]]*(self.seq_len - n)

    def __getitem__(self, idx):
        """Build batch number ``idx``.

        Raises:
            IndexError: If ``idx`` is outside ``range(len(self))``. This also
                lets plain ``for batch in sequence`` loops stop after the last
                batch instead of receiving empty batches forever.
        """
        if not 0 <= idx < len(self):
            raise IndexError(f"batch index {idx} out of range for {len(self)} batches")
        batch_idx = self.indexes[idx*self.batch_size:(idx+1)*self.batch_size]
        batch_x = []
        batch_y = []
        for i in batch_idx:
            chosen = self._select_frames(self.video_folders[i])
            frames = np.stack([preprocess_img_cv2(p) for p in chosen], axis=0)  # (seq_len, H, W, 3)
            # The flip decision is drawn once per clip, so every frame of a
            # clip is mirrored the same way.
            if self.augment and np.random.rand() < 0.5:
                frames = frames[:, :, ::-1, :]  # flip horizontally
            batch_x.append(frames)
            batch_y.append(self.labels[i])
        batch_x = np.array(batch_x, dtype=np.float32)  # (batch, seq_len, H, W, 3)
        batch_y = np.array(batch_y, dtype=np.float32)
        return batch_x, batch_y

    def on_epoch_end(self):
        """Reshuffle the video order in place when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indexes)


In [None]:
# 8) Build lists of local video folders (we copied them earlier)
import glob
# A dedicated, seeded RNG makes this cell produce the same split every time it
# runs; the global `random` state depends on which cells ran before and how
# many times.
SPLIT_SEED = 42
split_rng = random.Random(SPLIT_SEED)

real_folders = sorted(glob.glob(os.path.join(out_real, "video_*")))
fake_folders = sorted(glob.glob(os.path.join(out_fake, "video_*")))

print("Local video folders -> real:", len(real_folders), " fake:", len(fake_folders))

# balanced lists: both classes are cut down to the size of the smaller one
min_count = min(len(real_folders), len(fake_folders))
if min_count == 0:
    # Fail here with a clear message instead of an unpacking error further down.
    raise RuntimeError(
        f"No local video folders for at least one class (real={len(real_folders)}, "
        f"fake={len(fake_folders)}); run the local-copy cell first."
    )
real_folders = split_rng.sample(real_folders, min_count)
fake_folders = split_rng.sample(fake_folders, min_count)

all_folders = real_folders + fake_folders
labels = [0]*len(real_folders) + [1]*len(fake_folders)   # 0 = real, 1 = fake

# shuffle combined
pairs = list(zip(all_folders, labels))
split_rng.shuffle(pairs)
all_folders, labels = zip(*pairs)
all_folders = list(all_folders)
labels = list(labels)

# splits (70/15/15): first carve off 30%, then halve that into val and test;
# both steps are stratified so each subset keeps the class balance
from sklearn.model_selection import train_test_split
train_f, rest_f, train_y, rest_y = train_test_split(all_folders, labels, test_size=0.30, random_state=SPLIT_SEED, stratify=labels)
val_f, test_f, val_y, test_y = train_test_split(rest_f, rest_y, test_size=0.50, random_state=SPLIT_SEED, stratify=rest_y)

print("Splits -> train:", len(train_f), " val:", len(val_f), " test:", len(test_f))

# create generators: only the training data is shuffled and augmented
train_gen = VideoSequence(train_f, train_y, batch_size=BATCH_SIZE, seq_len=SEQ_LEN, shuffle=True, augment=True)
val_gen   = VideoSequence(val_f, val_y, batch_size=BATCH_SIZE, seq_len=SEQ_LEN, shuffle=False, augment=False)
test_gen  = VideoSequence(test_f, test_y, batch_size=BATCH_SIZE, seq_len=SEQ_LEN, shuffle=False, augment=False)


In [None]:
# 9) Build model (TimeDistributed frame encoder -> BiLSTM -> Dense)
import tensorflow as tf
from tensorflow.keras import layers, models

IMG_SIZE = 128     # must match the frame size produced by the data generator
FEATURE_DIM = 512  # projection after frame encoder
# Frame encoder (EfficientNetB0, frozen initially)
base_cnn = tf.keras.applications.EfficientNetB0(include_top=False, weights='imagenet', input_shape=(IMG_SIZE, IMG_SIZE, 3))
base_cnn.trainable = False
frame_input = layers.Input(shape=(IMG_SIZE, IMG_SIZE, 3))
x = tf.keras.applications.efficientnet.preprocess_input(frame_input*255.0)  # our generator returns [0,1]
# training=False pins the backbone to inference mode: its internal dropout is
# off during fit(), and its BatchNorm statistics stay fixed even if the
# backbone is unfrozen later for fine-tuning.
x = base_cnn(x, training=False)
x = layers.GlobalAveragePooling2D()(x)
frame_encoder = models.Model(frame_input, x, name="frame_encoder")

# Sequence model
video_input = layers.Input(shape=(SEQ_LEN, IMG_SIZE, IMG_SIZE, 3), name="video_input")
# apply encoder to each frame
encoded = layers.TimeDistributed(frame_encoder)(video_input)   # (batch, seq_len, feat_dim)
# per-frame projection to FEATURE_DIM before the recurrent layer
proj = layers.TimeDistributed(layers.Dense(FEATURE_DIM, activation='relu'))(encoded)
# return_sequences=False: only the final state of each direction feeds the classifier
lstm = layers.Bidirectional(layers.LSTM(256, return_sequences=False, dropout=0.2))(proj)
x = layers.Dense(128, activation='relu')(lstm)
x = layers.Dropout(0.4)(x)
# single sigmoid unit; the data cell labels real as 0 and fake as 1
out = layers.Dense(1, activation='sigmoid')(x)

model = models.Model(video_input, out)
model.compile(optimizer=tf.keras.optimizers.Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy', tf.keras.metrics.AUC(name='auc')])
model.summary()


In [None]:
# 10) Compute class weights
import numpy as np
from sklearn.utils import class_weight
train_labels_array = np.array(train_y)
present_classes = np.unique(train_labels_array)
cw = class_weight.compute_class_weight("balanced", classes=present_classes, y=train_labels_array)
# Key each weight by its actual class label rather than by its position, so the
# mapping stays right even if a label is missing from the training split.
class_weights = {int(label): float(weight) for label, weight in zip(present_classes, cw)}
print("Class weights:", class_weights)

# callbacks
checkpoint_path = "/content/cnn_lstm_best.h5"
callbacks = [
    # keep only the epoch with the highest validation AUC on disk
    tf.keras.callbacks.ModelCheckpoint(checkpoint_path, save_best_only=True, monitor='val_auc', mode='max'),
    # halve the learning rate after 3 epochs without val_loss improvement
    tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=1e-6),
    # stop after 6 epochs without val_auc improvement
    tf.keras.callbacks.EarlyStopping(monitor='val_auc', mode='max', patience=6, restore_best_weights=True)
]

# Train (set epochs as needed)
EPOCHS = 12
history = model.fit(train_gen, validation_data=val_gen, epochs=EPOCHS, callbacks=callbacks, class_weight=class_weights)
# NOTE(review): if training runs all EPOCHS without early stopping, the weights
# saved here may be the last epoch's rather than the best - confirm for the
# installed Keras version; the best-val_auc weights are in checkpoint_path.
model.save("/content/drive/MyDrive/deepfake_model_v1.h5")


In [None]:
# 11) Evaluate
# Evaluate the checkpoint with the best validation AUC
model.load_weights(checkpoint_path)
y_true = []
y_prob = []
# Index the batches explicitly instead of relying on the dataset object's
# iteration protocol, so the loop always visits exactly len(test_gen) batches.
# NOTE(review): newer Keras dataset base classes appear not to define
# __iter__ - confirm for the installed version.
for batch_no in range(len(test_gen)):
    Xb, yb = test_gen[batch_no]
    probs = model.predict(Xb, verbose=0).ravel()   # verbose=0: no progress bar per batch
    y_true.extend(yb.astype(int).tolist())          # integer labels for the sklearn reports
    y_prob.extend(probs.tolist())
# threshold the sigmoid output at 0.5 (1 = fake)
y_pred = [int(p > 0.5) for p in y_prob]

from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
print(classification_report(y_true, y_pred, digits=4))
print("ROC AUC:", roc_auc_score(y_true, y_prob))
print("Confusion matrix:\n", confusion_matrix(y_true, y_pred))

# Save final model
model.save("/content/cnn_lstm_final.h5")
print("Saved model to /content/cnn_lstm_final.h5")
