In [None]:
# Install a pinned torch/torchvision pair first, then the video and training helpers.
!pip install torch==2.1.0 torchvision==0.16.0 --upgrade --quiet
!pip install pytorchvideo accelerate --upgrade --quiet

In [None]:
# Mount Google Drive at /content/drive (Colab only). The dataset directory and the
# model save path used later in this notebook both live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Remaining notebook dependencies. Only transformers is pinned in this command;
# every other package resolves to whatever version pip selects at install time.
!pip install \
    python-dotenv \
    opencv-python \
    timesformer-pytorch \
    transformers==4.48.0 \
    accelerate \
    pytorchvideo \
    imageio \
    ipython \
    evaluate \
    decord \
    av

In [None]:
# Force NumPy to 1.26.1. This runs after the installs above, so this pin is the
# version that remains installed.
!pip install numpy==1.26.1 --upgrade --quiet

In [None]:
import os
import av

import torch
from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification
import pytorchvideo.data

import imageio
import numpy as np
from IPython.display import Image

from pytorchvideo.transforms import (
    ApplyTransformToKey,
    Normalize,
    RandomShortSideScale,
    RemoveKey,
    ShortSideScale,
    UniformTemporalSubsample,
)

from torchvision.transforms import (
    Compose,
    Lambda,
    RandomCrop,
    RandomHorizontalFlip,
    Resize,
)

import evaluate
from transformers import TrainingArguments, Trainer

In [None]:
import av

import torch
from pytorchvideo.transforms import UniformTemporalSubsample
from torch.utils.data import Dataset
from pytorchvideo.data.encoded_video import EncodedVideo
from typing import Callable, Dict

class DeceptionDataset(Dataset):
    """Map-style dataset that decodes one labelled video clip per index.

    Each item is produced by decoding the whole file, padding it (by repeating
    the last frame) up to ``num_frames`` when it is shorter, uniformly
    subsampling ``num_frames`` frames along the time axis and finally applying
    the optional ``transform``.

    Args:
        video_label_map: Maps a video file path to its integer label.
            The insertion order of the mapping defines the index order.
        transform: Optional callable that receives ``{"video": tensor}`` and
            returns a dict that still contains the ``"video"`` key.
        num_frames: Number of frames every returned clip has before ``transform``.
    """

    def __init__(
        self,
        video_label_map: Dict[str, int],
        transform: Callable = None,
        num_frames: int = 16,
    ):
        self.video_label_map = video_label_map
        self.video_paths = list(video_label_map.keys())
        self.transform = transform
        self.num_frames = num_frames
        self.num_videos = len(video_label_map)

    def __len__(self):
        """Return the number of clips in the dataset."""
        return len(self.video_paths)

    def __getitem__(self, index):
        """Decode, pad, subsample and transform the clip at ``index``.

        Returns:
            dict: ``{"video": tensor, "label": int}``; the tensor is laid out as
            (C, T, H, W) before ``transform`` is applied.

        Raises:
            IndexError: If ``index`` is out of range. Plain ``for sample in dataset``
                loops rely on this to terminate.
            ValueError: If the file cannot be decoded or contains no frames.
        """
        video_path = self.video_paths[index]
        label = self.video_label_map[video_path]

        video = EncodedVideo.from_path(video_path, decoder="decord")
        try:
            video_data = video.get_clip(0, video.duration)
        finally:
            # Release the decoder even when decoding fails; otherwise one reader
            # per visited clip stays alive for the lifetime of the process.
            video.close()
        video_tensor = video_data['video']  # shape: (C, T, H, W)

        if video_tensor is None:
            raise ValueError(f"Could not load video: {video_path}")

        decoded_frames = video_tensor.shape[1]
        if decoded_frames == 0:
            # There is no last frame to repeat, and subsampling an empty time
            # axis would fail with an unrelated indexing error.
            raise ValueError(f"Video contains no frames: {video_path}")

        if decoded_frames < self.num_frames:
            # Too short: repeat the final frame until the clip has num_frames frames.
            pad = video_tensor[:, -1:].repeat(1, self.num_frames - decoded_frames, 1, 1)
            video_tensor = torch.cat([video_tensor, pad], dim=1)

        # Subsample num_frames frames uniformly along the time axis.
        video_clip = UniformTemporalSubsample(self.num_frames)(video_tensor)

        if self.transform:
            video_clip = self.transform({"video": video_clip})["video"]

        return {
            "video": video_clip,  # torch.Tensor of shape (C, T, H, W)
            "label": label        # 0 (truth) or 1 (lie)
        }


In [None]:
import os
from typing import Dict


def build_clip_label_map(video_clips_dir: str) -> Dict[str, int]:
    """
    Build a ``{clip_path: label}`` mapping for every ``.mp4`` file in a directory.

    The label comes from the second underscore-separated token of the filename
    (e.g. ``trial_lie_002_002.mp4``): ``"lie"`` (case-insensitive) maps to 1 and
    any other token maps to 0. Filenames with fewer than three tokens are
    reported on stdout and left out.

    Args:
        video_clips_dir (str): Path to directory containing segmented video clips.

    Returns:
        Dict[str, int]: Clip paths (in sorted filename order) mapped to 0 or 1.
    """
    labels_by_path = {}
    # Sorted so that the mapping order, and therefore the dataset order, is stable.
    for entry in sorted(os.listdir(video_clips_dir)):
        if entry.endswith(".mp4"):
            tokens = entry.split('_')
            if len(tokens) >= 3:
                clip_path = os.path.join(video_clips_dir, entry)
                labels_by_path[clip_path] = int(tokens[1].lower() == "lie")
            else:
                print(f"Skipping malformed filename: {entry}")

    return labels_by_path


In [None]:
# Dataset root on the mounted Drive; each split lives in its own sub-directory.
ROOT_DIR = "/content/drive/MyDrive/deception_detection"

video_clips_dir_train, video_clips_dir_val, video_clips_dir_test = (
    os.path.join(ROOT_DIR, split_name) for split_name in ("train", "val", "test")
)

# One {clip path: label} mapping per split, built in train / val / test order.
clip_to_label_train, clip_to_label_val, clip_to_label_test = (
    build_clip_label_map(split_dir)
    for split_dir in (video_clips_dir_train, video_clips_dir_val, video_clips_dir_test)
)

# Ordered on purpose: the position in this list is the class id, and it has to agree
# with build_clip_label_map (0 = truth, 1 = lie). A set must not be used here because
# the iteration order of a set of strings can change from one interpreter run to the
# next, which could silently swap the two ids.
class_labels = ["truth", "lie"]
label2id = {label: i for i, label in enumerate(class_labels)}
id2label = {i: label for label, i in label2id.items()}

In [None]:
# Checkpoint to fine-tune. Other checkpoint names left in the original comment:
# "OpenGVLab/VideoMAEv2-Huge", "MCG-NJU/videomae-base".
model_ckpt = "MCG-NJU/videomae-large"
image_processor = VideoMAEImageProcessor.from_pretrained(model_ckpt)

# The label mappings are handed to the model so its config carries the class names;
# ignore_mismatched_sizes lets weights whose shapes differ from the checkpoint load
# without raising.
label_mapping_kwargs = {"label2id": label2id, "id2label": id2label}
model = VideoMAEForVideoClassification.from_pretrained(
    model_ckpt,
    ignore_mismatched_sizes=True,
    **label_mapping_kwargs,
)

In [None]:
# Normalisation statistics come from the image processor of the checkpoint.
mean, std = image_processor.image_mean, image_processor.image_std

# The processor stores its target size either as one "shortest_edge" value or as an
# explicit height/width pair; both forms end up as a (height, width) tuple.
processor_size = image_processor.size
if "shortest_edge" not in processor_size:
    height, width = processor_size["height"], processor_size["width"]
else:
    height = width = processor_size["shortest_edge"]
resize_to = (height, width)

# Number of frames per clip is taken from the model configuration.
num_frames_to_sample = model.config.num_frames
print(num_frames_to_sample)

# Clip length in seconds when every sample_rate-th frame of an fps-frames-per-second
# video is kept.
sample_rate, fps = 4, 30
clip_duration = num_frames_to_sample * sample_rate / fps

In [None]:
# Steps applied, in this order, to the tensor stored under the "video" key:
# temporal subsampling, division by 255, normalisation with the processor statistics,
# random short-side rescale, random crop to the model input size, random horizontal flip.
train_video_steps = [
    UniformTemporalSubsample(num_frames_to_sample),
    Lambda(lambda frames: frames / 255.0),
    Normalize(mean, std),
    RandomShortSideScale(min_size=256, max_size=320),
    RandomCrop(resize_to),
    RandomHorizontalFlip(p=0.5),
]

train_transform = Compose(
    [ApplyTransformToKey(key="video", transform=Compose(train_video_steps))]
)

# Training split: clips are cut to the model's frame count, then augmented.
train_dataset = DeceptionDataset(
    num_frames=num_frames_to_sample,
    transform=train_transform,
    video_label_map=clip_to_label_train,
)

In [None]:
# Deterministic preprocessing for validation and test clips: temporal subsampling,
# division by 255, normalisation with the processor statistics and a plain resize
# to the model input size (no random augmentation).
eval_video_steps = [
    UniformTemporalSubsample(num_frames_to_sample),
    Lambda(lambda frames: frames / 255.0),
    Normalize(mean, std),
    Resize(resize_to),
]

val_transform = Compose(
    [ApplyTransformToKey(key="video", transform=Compose(eval_video_steps))]
)

# Validation and test splits share the deterministic transform. num_frames is passed
# explicitly, exactly as for the training split, so that all three datasets follow
# the model's frame count instead of silently falling back to the class default.
val_dataset = DeceptionDataset(
    video_label_map=clip_to_label_val,
    transform=val_transform,
    num_frames=num_frames_to_sample,
)

test_dataset = DeceptionDataset(
    video_label_map=clip_to_label_test,
    transform=val_transform,
    num_frames=num_frames_to_sample,
)

In [None]:
def unnormalize_img(img, img_mean=None, img_std=None):
    """Un-normalizes the image pixels and converts them to uint8.

    Args:
        img: NumPy array whose last axis is the channel axis.
        img_mean: Per-channel mean used for normalisation. Defaults to the
            notebook-level ``mean``.
        img_std: Per-channel standard deviation used for normalisation. Defaults
            to the notebook-level ``std``.

    Returns:
        numpy.ndarray: uint8 array with every value inside [0, 255].
    """
    if img_mean is None:
        img_mean = mean
    if img_std is None:
        img_std = std

    pixels = (img * np.asarray(img_std) + np.asarray(img_mean)) * 255
    # Clamp BEFORE the integer cast: casting an out-of-range float to uint8 does not
    # saturate, so clipping afterwards cannot repair the value any more.
    return pixels.clip(0, 255).astype("uint8")


def create_gif(video_tensor, filename="sample.gif"):
    """Write a video tensor to disk as a GIF and return the file name.

    The tensor is iterated along its first axis, so it is expected to be laid
    out as (num_frames, num_channels, height, width). Each frame is moved to
    channels-last, un-normalized and stored as one GIF frame.
    """
    gif_frames = [
        unnormalize_img(frame.permute(1, 2, 0).numpy()) for frame in video_tensor
    ]
    imageio.mimsave(filename, gif_frames, "GIF", duration=0.25)
    return filename


def display_gif(video_tensor, gif_name="sample.gif"):
    """Render a video tensor as a GIF file and return it as a displayable Image.

    The first two axes are swapped before the GIF is written, so that the frame
    axis comes first as ``create_gif`` expects.
    """
    frames_first = video_tensor.permute(1, 0, 2, 3)
    return Image(filename=create_gif(frames_first, gif_name))

In [None]:
# Output directory / Hub repository name is derived from the checkpoint name.
model_name = model_ckpt.split("/")[-1]
new_model_name = f"{model_name}-finetuned-deception-dataset_v2"

# Training hyper-parameters. The effective batch size per optimizer update is
# batch_size * gradient_accumulation_steps.
num_epochs = 1
batch_size = 5
gradient_accumulation_steps = 8

args = TrainingArguments(
    new_model_name,
    # The collate function reads the "video"/"label" keys itself, so the Trainer
    # must not drop dataset columns it does not recognise.
    remove_unused_columns=False,
    eval_strategy="epoch",
    save_strategy="epoch",
    learning_rate=3e-5,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    gradient_accumulation_steps=gradient_accumulation_steps,
    warmup_ratio=0.1,
    logging_steps=10,
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
    push_to_hub=True,
    # The training set is a sized, map-style dataset, so the epoch count can be given
    # directly. The previous max_steps=(num_videos // batch_size) * num_epochs counted
    # optimizer updates while ignoring gradient accumulation, which made the run
    # gradient_accumulation_steps times longer than num_epochs states.
    num_train_epochs=num_epochs,
)

In [None]:
# Accuracy metric used for evaluation during training.
metric = evaluate.load("accuracy")


def compute_metrics(eval_pred):
    """Turn raw model outputs into the accuracy metric.

    The predicted class of each example is the arg-max over axis 1 of
    ``eval_pred.predictions``; it is compared with ``eval_pred.label_ids``.
    """
    scores, references = eval_pred.predictions, eval_pred.label_ids
    return metric.compute(predictions=np.argmax(scores, axis=1), references=references)


def collate_fn(examples):
    """Batch dataset samples into the keyword arguments the model expects.

    Every sample's "video" tensor has its first two axes swapped (channels and
    frames trade places) before the clips are stacked along a new leading batch
    axis; the "label" values become one tensor.

    Returns:
        dict: ``{"pixel_values": stacked clips, "labels": label tensor}``.
    """
    clips, targets = [], []
    for example in examples:
        clips.append(example["video"].permute(1, 0, 2, 3))
        targets.append(example["label"])
    return {"pixel_values": torch.stack(clips), "labels": torch.tensor(targets)}


In [None]:
# Authenticate with the Hugging Face Hub; the training arguments in this notebook
# set push_to_hub=True.
from huggingface_hub import notebook_login
notebook_login()


In [None]:
# Wire model, hyper-parameters, data splits, batching and metric into one Trainer.
trainer = Trainer(
    model=model,
    args=args,
    data_collator=collate_fn,
    compute_metrics=compute_metrics,
    processing_class=image_processor,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
)

In [None]:
# Run fine-tuning, then write the resulting model to the mounted Google Drive.
train_results = trainer.train()
trainer.save_model("/content/drive/MyDrive/my_finetuned_model-mae-large-v2")

In [None]:
import torch
from tqdm import tqdm

def evaluate_direct_model(model, dataloader, device: str = "cuda"):
    """
    Evaluate a Hugging Face video model clip by clip and report plain accuracy.

    Samples are processed one at a time: a leading axis of size 1 is squeezed
    away, the first two axes are swapped to (T, C, H, W) and a batch axis of
    size 1 is added again before the forward pass.

    Args:
        model (torch.nn.Module): Video classification model whose output exposes ``.logits``.
        dataloader: Iterable yielding {"video": tensor, "label": int} with the video
            shaped (C, T, H, W). A dataset can be passed directly; a DataLoader
            has to use batch_size=1.
        device (str): "cuda" or "cpu".

    Returns:
        float: Accuracy over the dataset (0.0 when no sample was yielded).
    """
    model = model.to(device)
    model.eval()

    correct = 0
    total = 0

    for sample in tqdm(dataloader):
        video = sample["video"].squeeze(0)  # (C, T, H, W)
        label = sample["label"]

        # Model input layout is (batch, T, C, H, W).
        video = video.permute(1, 0, 2, 3)
        inputs = {"pixel_values": video.unsqueeze(0).to(device)}

        with torch.no_grad():
            pred_label_id = model(**inputs).logits.argmax(dim=-1).item()

        if pred_label_id == label:
            correct += 1
        total += 1

    # Guard the empty case instead of raising ZeroDivisionError.
    accuracy = correct / total if total > 0 else 0.0
    print(f"Direct model accuracy: {accuracy:.4f}")
    return accuracy


In [None]:
import torch
from tqdm import tqdm
from collections import defaultdict, Counter
from pathlib import Path

def extract_scene_id_from_path(video_path: str) -> str:
    """
    Derive the scene/group ID a clip belongs to from its filename.

    The ID is made of the first three underscore-separated tokens of the file
    stem, e.g. trial_lie_002_003.mp4 → trial_lie_002. Stems with fewer tokens
    are returned unchanged.
    """
    return '_'.join(Path(video_path).stem.split('_')[:3])

def evaluate_scene_level_model(model, dataloader, video_paths, device: str = "cuda"):
    """
    Evaluates model using majority voting at the scene level.

    Every clip is classified on its own. Clips whose filenames share a scene ID
    (see ``extract_scene_id_from_path``) then vote, and the most frequent
    predicted class becomes the prediction of the scene. On a tie the class that
    was predicted first within the scene wins.

    Args:
        model: Hugging Face video classification model.
        dataloader: Iterable yielding one {"video": tensor, "label": int} sample at a
            time, with the video shaped (C, T, H, W).
        video_paths: Ordered list of video file paths; ``video_paths[i]`` has to be
            the file behind the i-th yielded sample, so the iteration order must
            not be shuffled.
        device: "cuda" or "cpu"

    Returns:
        float: scene-level accuracy (0.0 when no sample was yielded).
    """
    model = model.to(device)
    model.eval()

    scene_preds = defaultdict(list)
    scene_labels = {}

    for idx, sample in enumerate(tqdm(dataloader)):
        video = sample["video"].squeeze(0)  # (C, T, H, W)
        label = sample["label"]
        scene_id = extract_scene_id_from_path(video_paths[idx])

        video = video.permute(1, 0, 2, 3)  # (T, C, H, W)
        inputs = {"pixel_values": video.unsqueeze(0).to(device)}

        with torch.no_grad():
            # Only the logits are needed; attention maps are no longer requested
            # because nothing used them.
            logits = model(**inputs).logits.squeeze(0).cpu()
            pred_label = logits.argmax(dim=-1).item()

        scene_preds[scene_id].append(pred_label)
        # Overwritten by every clip of the scene; the label of the last clip wins.
        scene_labels[scene_id] = label

    correct = 0
    for scene_id, preds in scene_preds.items():
        majority = Counter(preds).most_common(1)[0][0]
        if majority == scene_labels[scene_id]:
            correct += 1

    total = len(scene_preds)
    # Guard the empty case instead of raising ZeroDivisionError.
    accuracy = correct / total if total > 0 else 0.0
    print(f"Scene-level accuracy (majority vote): {accuracy:.4f}")
    return accuracy


In [None]:
import torch
from tqdm import tqdm
from collections import defaultdict
from pathlib import Path

def extract_scene_id_from_path(video_path: str) -> str:
    """
    Return the scene/group ID encoded in a clip filename.

    Only the first three underscore-separated tokens of the stem are kept,
    e.g. trial_lie_002_003.mp4 → scene ID: trial_lie_002.
    """
    stem = Path(video_path).stem
    # At most three splits are needed: everything after the third token is dropped.
    scene_tokens = stem.split('_', 3)[:3]
    return '_'.join(scene_tokens)

def evaluate_scene_level_model_logit_sum(model, dataloader, video_paths, device: str = "cuda"):
    """
    Evaluates model using confidence (logit) sum voting at the scene level.

    The raw logits of all clips that share a scene ID are added up and the
    arg-max of the summed vector is the prediction of the scene. The ID of every
    misclassified scene is printed.

    Args:
        model: Hugging Face video classification model.
        dataloader: Iterable yielding one {"video": tensor, "label": int} sample at a
            time, with the video shaped (C, T, H, W).
        video_paths: Ordered list of video file paths; ``video_paths[i]`` has to be
            the file behind the i-th yielded sample.
        device: "cuda" or "cpu"

    Returns:
        float: Scene-level accuracy (0.0 when no sample was yielded).
    """
    model = model.to(device)
    model.eval()

    scene_logits = defaultdict(list)
    scene_labels = {}

    for idx, sample in enumerate(tqdm(dataloader)):
        video = sample["video"].squeeze(0)  # (C, T, H, W)
        label = sample["label"]
        scene_id = extract_scene_id_from_path(video_paths[idx])

        video = video.permute(1, 0, 2, 3)  # (T, C, H, W)
        inputs = {"pixel_values": video.unsqueeze(0).to(device)}

        with torch.no_grad():
            logits = model(**inputs).logits.squeeze(0).cpu()  # (num_classes,)

        scene_logits[scene_id].append(logits)
        # Overwritten by every clip of the scene; the label of the last clip wins.
        scene_labels[scene_id] = label

    correct = 0
    for scene_id, logits_list in scene_logits.items():
        total_logits = torch.stack(logits_list).sum(dim=0)  # (num_classes,)
        final_prediction = total_logits.argmax().item()

        if final_prediction == scene_labels[scene_id]:
            correct += 1
        else:
            # Report which scenes were misclassified.
            print(scene_id)

    total = len(scene_logits)
    # Guard the empty case instead of raising ZeroDivisionError.
    accuracy = correct / total if total > 0 else 0.0
    print(f"Scene-level accuracy (logit-sum voting): {accuracy:.4f}")
    return accuracy


In [None]:
import torch
from tqdm import tqdm
from collections import defaultdict
from pathlib import Path

def extract_scene_id_from_path(video_path: str) -> str:
    """
    Map a clip path to the ID of the scene it was cut from.

    The file stem is split on underscores and truncated to its first three
    tokens: trial_lie_002_003.mp4 → scene ID: trial_lie_002.
    """
    tokens = Path(video_path).stem.split('_')
    del tokens[3:]  # drop the per-clip suffix, if there is one
    return '_'.join(tokens)

def evaluate_scene_level_model_softmax_sum(model, dataloader, video_paths, device: str = "cuda"):
    """
    Evaluates model using probability (softmax) sum voting at the scene level.

    The softmax probabilities of all clips that share a scene ID are added up
    and the arg-max of the summed vector is the prediction of the scene.

    Args:
        model: Hugging Face video classification model.
        dataloader: Iterable yielding one {"video": tensor, "label": int} sample at a
            time, with the video shaped (C, T, H, W).
        video_paths: Ordered list of video file paths; ``video_paths[i]`` has to be
            the file behind the i-th yielded sample.
        device: "cuda" or "cpu"

    Returns:
        float: Scene-level accuracy (0.0 when no sample was yielded).
    """
    model = model.to(device)
    model.eval()

    scene_probs = defaultdict(list)
    scene_labels = {}

    for idx, sample in enumerate(tqdm(dataloader)):
        video = sample["video"].squeeze(0)  # (C, T, H, W)
        label = sample["label"]
        scene_id = extract_scene_id_from_path(video_paths[idx])

        video = video.permute(1, 0, 2, 3)  # (T, C, H, W)
        inputs = {"pixel_values": video.unsqueeze(0).to(device)}

        with torch.no_grad():
            clip_logits = model(**inputs).logits.squeeze(0)
            clip_probs = torch.nn.functional.softmax(clip_logits, dim=-1).cpu()

        scene_probs[scene_id].append(clip_probs)
        # Overwritten by every clip of the scene; the label of the last clip wins.
        scene_labels[scene_id] = label

    correct = 0
    for scene_id, probs_list in scene_probs.items():
        summed_probs = torch.stack(probs_list).sum(dim=0)  # (num_classes,)
        final_prediction = summed_probs.argmax().item()

        if final_prediction == scene_labels[scene_id]:
            correct += 1

    total = len(scene_probs)
    # Guard the empty case instead of raising ZeroDivisionError.
    accuracy = correct / total if total > 0 else 0.0
    print(f"Scene-level accuracy (softmax-sum voting): {accuracy:.4f}")
    return accuracy


In [None]:
import torch
from tqdm import tqdm
from collections import defaultdict, Counter
from pathlib import Path
import torch.nn.functional as F

def extract_scene_id_from_path(video_path: str) -> str:
    """
    Build the scene/group ID of a clip from its filename.

    Keeps the first three underscore-separated tokens of the file stem
    (trial_lie_002_003.mp4 → trial_lie_002) and joins them with underscores.
    """
    stem_tokens = Path(video_path).stem.split('_')
    return '_'.join(
        token for position, token in enumerate(stem_tokens) if position < 3
    )

def evaluate_scene_level_model_topk_confidence(model, dataloader, video_paths, threshold=0.7, device: str = "cuda"):
    """
    Scene-level evaluation using top-1 voting only for confident predictions (above threshold).

    A clip votes for its top-1 class only when the softmax probability of that
    class is at least ``threshold``. The scene prediction is the majority among
    those votes. A scene in which no clip reaches the threshold has no prediction
    and is counted as incorrect (pessimistic fallback); it still counts towards
    the total, so abstaining cannot raise the accuracy.

    Args:
        model: Hugging Face video classification model.
        dataloader: Iterable yielding one {"video": tensor, "label": int} sample at a
            time, with the video shaped (C, T, H, W).
        video_paths: List of video file paths (ordered); ``video_paths[i]`` has to be
            the file behind the i-th yielded sample.
        threshold: Min softmax confidence to count a chunk vote.
        device: "cuda" or "cpu".

    Returns:
        float: Scene-level accuracy over all scenes seen (0.0 when there are none).
    """
    model = model.to(device)
    model.eval()

    scene_confident_preds = defaultdict(list)
    scene_labels = {}

    for idx, sample in enumerate(tqdm(dataloader)):
        video = sample["video"].squeeze(0)  # (C, T, H, W)
        label = sample["label"]
        scene_id = extract_scene_id_from_path(video_paths[idx])

        video = video.permute(1, 0, 2, 3)  # (T, C, H, W)
        inputs = {"pixel_values": video.unsqueeze(0).to(device)}

        with torch.no_grad():
            logits = model(**inputs).logits.squeeze(0)  # (num_classes,)
            probs = F.softmax(logits, dim=-1)
            conf, pred = probs.max(dim=-1)

        if conf.item() >= threshold:
            scene_confident_preds[scene_id].append(pred.item())

        # Recorded for every clip, so scenes without a confident vote are known too.
        scene_labels[scene_id] = label

    correct = 0
    # Iterate over ALL scenes. Iterating over the vote dictionary would silently skip
    # every scene that never produced a confident vote.
    for scene_id, true in scene_labels.items():
        confident_preds = scene_confident_preds.get(scene_id)
        if not confident_preds:
            continue  # no confident vote: counted as incorrect
        majority = Counter(confident_preds).most_common(1)[0][0]
        if majority == true:
            correct += 1

    total = len(scene_labels)
    accuracy = correct / total if total > 0 else 0.0
    print(f"Scene-level accuracy (top-k voting, threshold={threshold}): {accuracy:.4f}")
    return accuracy


In [None]:
from transformers import AutoProcessor, AutoModelForVideoClassification

# Fine-tuned checkpoint on the Hub. Other name left in the original comment:
# "NiklasTUM/videomae-base-finetuned-deception-dataset".
# NOTE: this rebinds the notebook-level `model_name` and `model`.
model_name = "NiklasTUM/videomae-large-finetuned-deception-dataset_v2"
model = AutoModelForVideoClassification.from_pretrained(model_name)
model = model.to("cuda")
model.eval()
# Clip-level accuracy; the test dataset itself is passed as the iterable of samples.
evaluate_direct_model(dataloader=test_dataset, model=model)

In [None]:
from transformers import AutoProcessor, AutoModelForVideoClassification

# Fine-tuned checkpoint on the Hub. Other name left in the original comment:
# "NiklasTUM/videomae-base-finetuned-deception-dataset".
model_name = "NiklasTUM/videomae-large-finetuned-deception-dataset_v2"
model = AutoModelForVideoClassification.from_pretrained(model_name)
model = model.to("cuda")
model.eval()

# Same order as the test dataset, which is built from this very mapping.
video_paths = list(clip_to_label_test)

# Scene-level accuracy with a majority vote over the clips of each scene.
accuracy = evaluate_scene_level_model(
    device="cuda",
    video_paths=video_paths,
    dataloader=test_dataset,
    model=model,
)

In [None]:
from transformers import AutoProcessor, AutoModelForVideoClassification

# NOTE(review): this cell loads the "base" checkpoint, whereas the first two
# evaluation cells load the "large ... _v2" checkpoint. Confirm this is intended.
model_name = "NiklasTUM/videomae-base-finetuned-deception-dataset"
model = AutoModelForVideoClassification.from_pretrained(model_name)
model = model.to("cuda")
model.eval()

# Same order as the test dataset, which is built from this very mapping.
video_paths = list(clip_to_label_test)

# Scene-level accuracy where the clip logits of each scene are summed.
accuracy = evaluate_scene_level_model_logit_sum(
    device="cuda",
    video_paths=video_paths,
    dataloader=test_dataset,
    model=model,
)

In [None]:
from transformers import AutoProcessor, AutoModelForVideoClassification

# NOTE(review): this cell loads the "base" checkpoint, whereas the first two
# evaluation cells load the "large ... _v2" checkpoint. Confirm this is intended.
model_name = "NiklasTUM/videomae-base-finetuned-deception-dataset"
model = AutoModelForVideoClassification.from_pretrained(model_name)
model = model.to("cuda")
model.eval()

# Same order as the test dataset, which is built from this very mapping.
video_paths = list(clip_to_label_test)

# Scene-level accuracy where the clip softmax probabilities of each scene are summed.
accuracy = evaluate_scene_level_model_softmax_sum(
    device="cuda",
    video_paths=video_paths,
    dataloader=test_dataset,
    model=model,
)

In [None]:
from transformers import AutoProcessor, AutoModelForVideoClassification

# NOTE(review): this cell loads the "base" checkpoint, whereas the first two
# evaluation cells load the "large ... _v2" checkpoint. Confirm this is intended.
model_name = "NiklasTUM/videomae-base-finetuned-deception-dataset"
model = AutoModelForVideoClassification.from_pretrained(model_name)
model = model.to("cuda")
model.eval()

# Same order as the test dataset, which is built from this very mapping.
video_paths = list(clip_to_label_test)

# Scene-level accuracy where only clips with softmax confidence >= 0.6 may vote.
accuracy = evaluate_scene_level_model_topk_confidence(
    device="cuda",
    threshold=0.6,
    video_paths=video_paths,
    dataloader=test_dataset,
    model=model,
)