In [None]:
!pip install torch==2.0.1 torchvision==0.15.2 pytorchvideo==0.1.5 ffmpeg-python opencv-python

In [None]:
import torch
from pytorchvideo.models.hub import slowfast_r50  # SlowFast model

# Choose the compute device: GPU when CUDA is usable, CPU otherwise.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Build the pre-trained SlowFast-R50 network and move it to that device.
model = slowfast_r50(pretrained=True)
model = model.to(device)

# Switch to evaluation mode (left as the last expression so the cell still shows the model).
model.eval()


In [None]:
from google.colab import files

uploaded = files.upload()  # Upload a video from your computer

# The result is keyed by filename; it is empty when the dialog is cancelled,
# so fail with a clear message instead of an IndexError further down.
if not uploaded:
    raise RuntimeError("No file was uploaded. Re-run this cell and select a video.")

# Use the first uploaded file as the video to analyse
video_path = next(iter(uploaded))
print(f"Uploaded video: {video_path}")


In [None]:
import ffmpeg
import numpy as np
import torch
from pytorchvideo.transforms import (
    ApplyTransformToKey, UniformTemporalSubsample, ShortSideScale
)
from torchvision.transforms import Compose
from torchvision.transforms._transforms_video import NormalizeVideo

# Target device for tensors produced in this cell: GPU if CUDA is usable, else CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

def load_and_preprocess_video(video_path, target_frames=32, short_side=256, fps=10):
    """Decode a video with FFmpeg and turn it into a normalized clip tensor.

    The video is resized so that its shorter side is ``short_side`` pixels,
    resampled to ``fps`` frames per second, reduced (or zero-padded) to exactly
    ``target_frames`` frames, scaled to [0, 1] and normalized per channel.

    Args:
        video_path: Path of the video file to decode.
        target_frames: Number of frames in the returned clip. Longer videos are
            subsampled at evenly spaced positions; shorter ones are padded with
            black frames at the end.
        short_side: Length in pixels of the shorter spatial side after resizing.
        fps: Frame rate FFmpeg resamples the video to before frames are selected.

    Returns:
        A float tensor of shape ``[1, 3, target_frames, H, W]`` placed on the
        module-level ``device``.

    Raises:
        ValueError: If the file has no video stream or no frame could be decoded.
        ffmpeg.Error: Propagated from ffmpeg-python if probing or decoding fails.
    """
    # Probe the container and pick the first video stream to learn the frame size.
    probe = ffmpeg.probe(video_path)
    video_stream = next(
        (stream for stream in probe['streams'] if stream['codec_type'] == 'video'),
        None,
    )

    if video_stream is None:
        raise ValueError("No video stream found in file!")

    width, height = int(video_stream['width']), int(video_stream['height'])
    # NOTE(review): these are the stored stream dimensions. If the file carries a
    # rotation tag, FFmpeg may auto-rotate frames before the scale filter, which
    # would distort the aspect ratio -- confirm with a portrait phone video.

    # Resize so the shorter side equals `short_side` while keeping the aspect ratio.
    if height < width:
        new_height, new_width = short_side, int((short_side / height) * width)
    else:
        new_width, new_height = short_side, int((short_side / width) * height)

    # Decode to raw RGB bytes on stdout; the reduced frame rate keeps the buffer small.
    out, _ = (
        ffmpeg.input(video_path)
        .filter('scale', new_width, new_height)
        .filter('fps', fps=fps)
        .output('pipe:', format='rawvideo', pix_fmt='rgb24')
        .run(capture_stdout=True, quiet=True)
    )

    # One rgb24 frame occupies height * width * 3 bytes.
    frame_bytes = new_height * new_width * 3
    num_frames = len(out) // frame_bytes

    if num_frames == 0:
        raise ValueError("FFmpeg failed to extract frames. Check the video format!")

    # Read whole frames only, so a trailing partial frame cannot break the reshape.
    video_frames = np.frombuffer(out, np.uint8, count=num_frames * frame_bytes).reshape(
        [num_frames, new_height, new_width, 3]
    )  # [T, H, W, C]

    # Select the frames to keep while the data is still uint8. Converting every
    # decoded frame to float32 first would need 4x the memory for frames that are
    # dropped anyway. The indices are evenly spaced (linspace, then truncation).
    if num_frames > target_frames:
        keep = torch.linspace(0, num_frames - 1, target_frames).long().numpy()
        video_frames = video_frames[keep]

    # Convert to a float tensor in [0, 1], laid out as [T, C, H, W].
    video_tensor = torch.tensor(video_frames, dtype=torch.uint8).permute(0, 3, 1, 2).float() / 255.0

    # Short videos: append black frames until the clip has `target_frames` frames.
    pad_frames = target_frames - video_tensor.shape[0]
    if pad_frames > 0:
        padding = torch.zeros((pad_frames, 3, new_height, new_width))
        video_tensor = torch.cat([video_tensor, padding], dim=0)

    # The transforms below operate on [C, T, H, W].
    video_tensor = video_tensor.permute(1, 0, 2, 3)  # [T, C, H, W] -> [C, T, H, W]

    # Apply PyTorchVideo transforms
    transform = ApplyTransformToKey(
        key="video",
        transform=Compose([
            # The clip already has exactly `target_frames` frames at this point;
            # kept as a guard so the output length is enforced in one place.
            UniformTemporalSubsample(target_frames),
            ShortSideScale(size=short_side),
            NormalizeVideo(mean=[0.45, 0.45, 0.45], std=[0.225, 0.225, 0.225]),  # Applied on [C, T, H, W]
        ])
    )

    video_tensor = transform({"video": video_tensor})["video"]

    return video_tensor.unsqueeze(0).to(device)  # Add batch dimension: [1, C, T, H, W]

# Decode the uploaded clip into a model-ready batch and report its dimensions.
video_tensor = load_and_preprocess_video(video_path)
tensor_shape = video_tensor.shape  # Expected layout: [1, 3, 32, H, W]
print("Video Tensor Shape:", tensor_shape)


In [None]:
def pack_pathway(video_tensor, alpha=4):
    """
    Split a clip into the two inputs expected by the SlowFast model.

    Slow pathway -> ``T // alpha`` frames taken at evenly spaced positions
    Fast pathway -> every frame of the input clip

    Args:
        video_tensor: Clip tensor whose dimension 2 is time, e.g. [B, C, T, H, W].
        alpha: Ratio between the number of fast-pathway and slow-pathway frames.

    Returns:
        ``[slow_pathway, fast_pathway]``. The fast pathway is the input tensor itself.

    Raises:
        ValueError: If ``alpha`` is smaller than 1, or the clip has fewer than
            ``alpha`` frames (which would leave the slow pathway empty).
    """
    if alpha < 1:
        raise ValueError(f"alpha must be >= 1, got {alpha}")

    num_frames = video_tensor.shape[2]
    num_slow_frames = num_frames // alpha
    if num_slow_frames == 0:
        raise ValueError(
            f"Clip has {num_frames} frame(s); at least {alpha} are needed to build the slow pathway."
        )

    # Evenly spaced positions across the clip, truncated to integer frame indices.
    slow_indices = torch.linspace(0, num_frames - 1, num_slow_frames).long().to(video_tensor.device)
    slow_pathway = torch.index_select(video_tensor, 2, slow_indices)
    fast_pathway = video_tensor  # Keep full frame rate

    return [slow_pathway, fast_pathway]  # Return as a list

# Build the two-pathway model input from the preprocessed clip
video_tensor = load_and_preprocess_video(video_path)  # [1, 3, 32, H, W]
input_tensor = pack_pathway(video_tensor)  # [slow, fast]

# Forward pass without gradient tracking
with torch.no_grad():
    outputs = model(input_tensor)

# Turn the raw scores into probabilities and keep the five most likely classes
class_probs = outputs.softmax(dim=1)
top5_probs, top5_classes = class_probs.topk(5)
print("Top 5 Predicted Action Classes:", top5_classes.tolist())
print("Top 5 Probabilities:", top5_probs.tolist())


In [None]:
import urllib.request

# Fetch the Kinetics-400 label list; each line of the file is one class name
KINETICS_LABELS_URL = "https://raw.githubusercontent.com/deepmind/kinetics-i3d/master/data/label_map.txt"
with urllib.request.urlopen(KINETICS_LABELS_URL) as response:
    kinetics_labels = []
    for raw_line in response:
        kinetics_labels.append(raw_line.decode('utf-8').strip())

# Report the five most likely actions by name, best first
print("Top 5 Predicted Actions:")
ranked_predictions = zip(top5_classes[0].tolist(), top5_probs[0].tolist())
for rank, (class_id, class_prob) in enumerate(ranked_predictions, start=1):
    print(f"{rank}. {kinetics_labels[class_id]} - Probability: {class_prob:.4f}")


Display Video with Predictions

In [None]:
import cv2
import os

# Load the video using OpenCV
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise IOError(f"Could not open video: {video_path}")

out = None
try:
    # Get video properties
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Keep the frame rate as a float: truncating e.g. 29.97 to 29 would change the
    # playback speed of the output. Fall back to 30 fps when the reported rate is
    # missing or invalid (0, negative or NaN).
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps > 0:
        fps = 30.0

    # Create a VideoWriter to save the processed video
    output_path = "output_video.mp4"
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec
    out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
    if not out.isOpened():
        raise IOError(f"Could not open video writer for: {output_path}")

    # Get the top predicted action
    predicted_action = kinetics_labels[top5_classes[0, 0].item()]
    probability = top5_probs[0, 0].item()

    # The overlay text is the same for every frame, so build it once.
    label = f"{predicted_action} ({probability:.2f})"

    # Read and process video frame by frame
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Draw directly on the BGR frame. The text colour (0, 255, 0) is green in
        # both BGR and RGB order, so no colour-space round trip is needed.
        cv2.putText(frame, label, (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

        # Write frame to the output video
        out.write(frame)
finally:
    # Release the handles even if something above failed, so the output file is closed.
    cap.release()
    if out is not None:
        out.release()

print("Processed video saved as:", output_path)
