In [1]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision.transforms import Compose, Lambda, RandomHorizontalFlip
from torchvision.transforms._transforms_video import CenterCropVideo
from pytorchvideo.data import LabeledVideoDataset, make_clip_sampler
from pytorchvideo.transforms import (
    ApplyTransformToKey,
    Normalize,
    RandomShortSideScale,
    UniformTemporalSubsample
)
from IPython.display import HTML, display
from base64 import b64encode
import torchmetrics
import time


# Define the model
class OurModel(nn.Module):
    """Video classifier: pretrained EfficientX3D-XS backbone plus a small head.

    The backbone is fetched through ``torch.hub``; its output is passed through
    a ReLU and a ``Linear(400, 1)`` layer, so ``forward`` yields a single
    un-activated value per input sample.
    """

    def __init__(self):
        super().__init__()
        # Attribute names double as state_dict key prefixes, so they must stay
        # stable for previously saved checkpoints to load.
        self.model = torch.hub.load(
            'facebookresearch/pytorchvideo',
            'efficient_x3d_xs',
            pretrained=True,
        )
        self.relu = nn.ReLU()
        self.linear = nn.Linear(400, 1)

    def forward(self, x):
        """Run backbone -> ReLU -> linear head and return the raw output."""
        return self.linear(self.relu(self.model(x)))


# Define the video transformation pipeline used for inference.
# It is kept fully deterministic: random augmentations (scale jitter and
# horizontal flipping) are training-time tools and would make repeated
# predictions on the same clip disagree with each other.
video_transform = Compose([
    ApplyTransformToKey(
        key='video',
        transform=Compose([
            UniformTemporalSubsample(20),
            Lambda(lambda x: x / 255.0),
            Normalize((0.45, 0.45, 0.45), (0.225, 0.225, 0.225)),
            # min_size == max_size pins the short side to 256, turning the
            # random scale into a fixed resize without requiring a new import.
            RandomShortSideScale(min_size=256, max_size=256),
            CenterCropVideo(224),
        ])
    ),
])

# Uniform clip sampler with a clip duration of 2.0
clip_sampler = make_clip_sampler("uniform", 2.0)

# Restore the fine-tuned weights, move the network to the chosen device and
# switch it to evaluation mode
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model = OurModel()
model.load_state_dict(
    torch.load("trained_model/efficient_x3d_xs_finetuned.pth", map_location=device)
)
model.to(device).eval()
print("Model loaded and ready.")


  from .autonotebook import tqdm as notebook_tqdm
Using cache found in C:\Users\bimsa/.cache\torch\hub\facebookresearch_pytorchvideo_main


Model loaded and ready.


In [2]:
def predict_video(video_path):
    """Classify the first sampled clip of a video and print stage timings.

    A one-item ``LabeledVideoDataset`` is built around ``video_path`` using the
    module-level ``clip_sampler`` and ``video_transform``. Only the first batch
    produced by the loader is fed to the module-level ``model``; any further
    clips of the video are not evaluated.

    Args:
        video_path: Path of the video file to classify.

    Returns:
        tuple: ``(pred, prob)`` where ``prob`` is the sigmoid of the model
        output as a Python float and ``pred`` is ``1`` when ``prob > 0.5``,
        otherwise ``0``.
    """
    t_begin = time.time()

    # Wrap the single video in a dataset/loader; the label is a placeholder
    # required by the dataset format.
    t_setup = time.time()
    single_video_loader = DataLoader(
        LabeledVideoDataset(
            labeled_video_paths=[(video_path, {"label": 0})],
            clip_sampler=clip_sampler,
            transform=video_transform,
            decode_audio=False,
            decoder="pyav",
        ),
        batch_size=1,
        num_workers=0,
    )
    setup_elapsed = time.time() - t_setup
    print(f"Dataset and DataLoader creation time: {setup_elapsed:.3f} seconds")

    # Decode + transform the first clip and move it to the inference device.
    t_fetch = time.time()
    first_batch = next(iter(single_video_loader))
    clip = first_batch['video'].to(device)
    fetch_elapsed = time.time() - t_fetch
    print(f"Batch loading and tensor transfer to device time: {fetch_elapsed:.3f} seconds")

    # Forward pass without gradient tracking.
    t_forward = time.time()
    with torch.no_grad():
        prob = torch.sigmoid(model(clip))
        pred = (prob > 0.5).long().item()
    forward_elapsed = time.time() - t_forward
    print(f"Model inference time: {forward_elapsed:.3f} seconds")

    total_elapsed = time.time() - t_begin
    print(f"Total prediction time: {total_elapsed:.3f} seconds")

    return pred, prob.item()



def show_video(video_path, width=400):
    """Build an inline HTML5 player for a video file.

    The whole file is read into memory and embedded in the markup as a base64
    ``data:video/mp4`` URL, so the resulting HTML is self-contained.

    Args:
        video_path: Path of the video file to embed.
        width: Value placed in the ``<video>`` element's ``width`` attribute.

    Returns:
        The ``HTML`` display object wrapping the ``<video>`` markup.
    """
    # The context manager closes the handle deterministically, even if the
    # read fails, instead of leaving it to garbage collection.
    with open(video_path, 'rb') as video_file:
        mp4 = video_file.read()
    data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
    return HTML(f"""
    <video width="{width}" controls>
        <source src="{data_url}" type="video/mp4">
    </video>
    """)


In [3]:
# Classify a single video file and report the decision with its sigmoid score.
video_path = "unseen_testing_videos/t2.mp4"
pred, prob = predict_video(video_path)

# The integer class returned by predict_video indexes into these display names.
class_names = ['NonViolence', 'Violence']
print(f"Predicted class: {class_names[pred]} (Score: {prob:.4f})")



Dataset and DataLoader creation time: 0.001 seconds
Batch loading and tensor transfer to device time: 4.110 seconds
Model inference time: 22.924 seconds
Total prediction time: 27.054 seconds
Predicted class: NonViolence (Score: 0.0007)


In [15]:
import cv2
import threading
import time
import os
import tempfile

# Display names indexed by the integer class returned by predict_video.
class_names = ['NonViolence', 'Violence']
# Most recent (label, score) pair drawn on the frames; this placeholder is
# shown until a background prediction replaces it.
latest_prediction = ("Analyzing...", 0.0)
# Held by the background prediction threads while they update latest_prediction.
lock = threading.Lock()

def predict_clip_thread(video_clip_path):
    """Thread target: classify one clip file and publish the outcome.

    Runs ``predict_video`` on ``video_clip_path`` and stores the resulting
    ``(class name, probability)`` pair in the module-level
    ``latest_prediction`` while holding ``lock``.
    """
    global latest_prediction
    class_index, score = predict_video(video_clip_path)
    lock.acquire()
    try:
        latest_prediction = (class_names[class_index], score)
    finally:
        lock.release()

def _predict_and_cleanup(clip_path):
    """Run the background prediction for a clip, then delete its temp file."""
    try:
        predict_clip_thread(clip_path)
    finally:
        # Best-effort removal: the file may already be gone, or may still be
        # held open by the decoder on some platforms.
        try:
            os.remove(clip_path)
        except OSError:
            pass


def real_time_classify(video_path, output_path="output_prediction.mp4", fps=30, chunk_duration=3):
    """Play a video while classifying it chunk by chunk in the background.

    Frames are read sequentially from ``video_path``. Every
    ``int(fps * chunk_duration)`` frames, the buffered frames are written to a
    temporary ``.mp4`` file and classified on a daemon thread; the temporary
    file is deleted once that prediction finishes. Each frame is overlaid with
    the most recent prediction, shown in an OpenCV window and appended to
    ``output_path``. Pressing ``q`` in the window stops processing early.
    Trailing frames that do not fill a complete chunk are displayed and saved
    but never classified.

    Args:
        video_path: Path of the input video.
        output_path: Path of the annotated output video (``mp4v`` codec).
        fps: Frame rate used for chunk sizing and for both video writers.
            Pass ``None`` to use the rate reported by the input video; the
            default of 30 is used regardless of the input's own rate.
        chunk_duration: Length of each classified chunk, in seconds.

    Returns:
        None. Prints an error and returns early if the video cannot be opened.
    """
    global latest_prediction

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Could not open video.")
        cap.release()
        return

    out_writer = None
    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if fps is None:
            fps = cap.get(cv2.CAP_PROP_FPS)
        chunk_size = int(fps * chunk_duration)

        # Start from the placeholder so a label left over from a previous run
        # is not drawn on the first frames of this video.
        with lock:
            latest_prediction = ("Analyzing...", 0.0)

        # Video writer for output
        out_writer = cv2.VideoWriter(
            output_path,
            cv2.VideoWriter_fourcc(*'mp4v'),
            fps,
            (width, height)
        )

        frame_buffer = []

        print("Starting real-time classification and saving...")

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_buffer.append(frame)

            # Every chunk_duration seconds, run prediction in background
            if len(frame_buffer) == chunk_size:
                # Only the unique path is needed here; the handle is closed
                # right away so the video writer can open the file itself.
                with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_file:
                    temp_path = temp_file.name

                temp_writer = cv2.VideoWriter(
                    temp_path,
                    cv2.VideoWriter_fourcc(*'mp4v'),
                    fps,
                    (frame.shape[1], frame.shape[0])
                )
                try:
                    for buffered_frame in frame_buffer:
                        temp_writer.write(buffered_frame)
                finally:
                    temp_writer.release()

                # The worker owns the temp file from here on and removes it
                # when the prediction is done.
                threading.Thread(
                    target=_predict_and_cleanup,
                    args=(temp_path,),
                    daemon=True,
                ).start()

                frame_buffer = []

            # Overlay prediction label on frame; read under the lock so the
            # pair is consistent with the worker threads' updates.
            with lock:
                label, score = latest_prediction
            display_frame = frame.copy()
            cv2.putText(display_frame, f"{label} ({score:.2f})", (10, 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), 4)

            # Show and save the processed frame
            cv2.imshow("Real-time Classification", display_frame)
            out_writer.write(display_frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release capture, writer and window even if processing fails midway.
        cap.release()
        if out_writer is not None:
            out_writer.release()
        cv2.destroyAllWindows()

    print(f"Processing complete. Saved to: {output_path}")


In [16]:
# Run the chunked classifier on one video with the default settings; the
# annotated copy is written to real_time_classify's default output path.
video_path = "unseen_testing_videos/footage.mp4"
real_time_classify(video_path)

Starting real-time classification and saving...
Dataset and DataLoader creation time: 0.001 seconds
Batch loading and tensor transfer to device time: 2.497 seconds
Model inference time: 0.416 seconds
Total prediction time: 2.919 seconds
Dataset and DataLoader creation time: 0.001 seconds
Batch loading and tensor transfer to device time: 1.757 seconds
Model inference time: 0.318 seconds
Total prediction time: 2.115 seconds
Dataset and DataLoader creation time: 0.001 seconds
Batch loading and tensor transfer to device time: 1.768 seconds
Model inference time: 0.152 seconds
Total prediction time: 1.926 seconds
Dataset and DataLoader creation time: 0.001 seconds
Batch loading and tensor transfer to device time: 1.700 seconds
Model inference time: 0.395 seconds
Total prediction time: 2.105 seconds
Dataset and DataLoader creation time: 0.001 seconds
Batch loading and tensor transfer to device time: 1.694 seconds
Model inference time: 0.227 seconds
Total prediction time: 1.948 seconds
Dataset