# Live video feed test with SlowFast: LAB 1 Bonus

> Author : Hugo BABIN-RIBY

Meant to run on Windows.

Runs on CPU!

In [None]:
# Install the third-party packages imported by the cells below.
# Each %pip magic runs pip inside the kernel's environment; keep comments on
# their own lines so they are not passed to pip as arguments.
%pip install opencv-python
# Upgrade pip itself before installing the remaining packages.
%pip install --upgrade pip
%pip install torch
%pip install pytorchvideo
# Second pass asks pip to upgrade pytorchvideo if a newer release exists.
%pip install --upgrade pytorchvideo
%pip install torchvision

In [None]:
import cv2
import torch

# Smoke test: open the default webcam (index 0), print every frame as a
# tensor and show it in a window until the user presses 'q'.
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    print("Error: Could not open webcam")
try:
    # Skip the loop entirely when the camera could not be opened; reading
    # from a closed capture only yields (False, None).
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret or frame is None:
            # A failed grab returns no image; stop instead of crashing in
            # torch.tensor / cv2.imshow on a None frame.
            print("Error: Could not read frame from webcam")
            break
        print(torch.tensor(frame))
        cv2.imshow('Webcam Feed', frame)
        # waitKey also pumps the GUI event loop; 'q' ends the preview.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the preview window.
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# import slowfast
import torch
# Load the pretrained `slowfast_r50` entry point from the PyTorchVideo
# repository through torch.hub.
model = torch.hub.load(
    'facebookresearch/pytorchvideo',
    'slowfast_r50',
    pretrained=True,
)


In [None]:
from typing import Dict
import json
import urllib
from torchvision.transforms import Compose, Lambda
from torchvision.transforms._transforms_video import (
    CenterCropVideo,
    NormalizeVideo,
)
from pytorchvideo.data.encoded_video import EncodedVideo
from pytorchvideo.transforms import (
    ApplyTransformToKey,
    ShortSideScale,
    UniformTemporalSubsample,
    UniformCropVideo
) 

# Use CUDA when PyTorch reports it as available, otherwise fall back to CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(device)

# Switch the network to evaluation mode and place it on the chosen device.
model = model.eval().to(device)

In [5]:
# urllib.request must be imported explicitly: a bare `import urllib` does not
# guarantee that the `request` submodule is loaded.
import urllib.request

json_url = "https://dl.fbaipublicfiles.com/pyslowfast/dataset/class_names/kinetics_classnames.json"
json_filename = "kinetics_classnames.json"

# Download the class-name file next to the notebook. Any network error is
# allowed to propagate instead of being hidden behind a bare `except`.
urllib.request.urlretrieve(json_url, json_filename)

# Read with an explicit encoding so the result does not depend on the
# platform's default code page.
with open(json_filename, "r", encoding="utf-8") as f:
    kinetics_classnames = json.load(f)

# The file maps class name -> class id; invert it into id -> name and strip
# the literal double quotes embedded in some names.
kinetics_id_to_classname = {
    class_id: str(class_name).replace('"', "")
    for class_name, class_id in kinetics_classnames.items()
}


In [6]:
# Preprocessing / sampling settings shared by the cells below.
side_size = 256  # passed to ShortSideScale as the target size
mean = [0.45, 0.45, 0.45]  # per-channel mean passed to NormalizeVideo
std = [0.225, 0.225, 0.225]  # per-channel std passed to NormalizeVideo
crop_size = 256  # passed to CenterCropVideo
num_frames = 32  # frames kept by UniformTemporalSubsample; also the webcam buffer size
sampling_rate = 2  # used together with frames_per_second to derive clip_duration
frames_per_second = 30  # used to derive clip_duration
slowfast_alpha = 4  # read by PackPathway: the slow pathway keeps 1/alpha of the frames
num_clips = 10  # not referenced in the visible cells
num_crops = 3  # not referenced in the visible cells

class PackPathway(torch.nn.Module):
    """Turn one clip tensor into the ``[slow, fast]`` input list.

    The fast pathway is the clip unchanged. The slow pathway keeps
    ``T // alpha`` frames picked at evenly spaced positions along dimension 1
    (the time axis of a ``[C, T, H, W]`` clip). Both tensors get a leading
    batch dimension of size 1.

    Args:
        alpha: Ratio between the number of fast and slow frames. When left
            as ``None`` the module-level ``slowfast_alpha`` is read each time
            ``forward`` runs, which matches the previous behavior.
    """

    def __init__(self, alpha=None):
        super().__init__()
        self.alpha = alpha

    def forward(self, frames: torch.Tensor):
        """Return ``[slow_pathway, fast_pathway]`` for ``frames``."""
        alpha = slowfast_alpha if self.alpha is None else self.alpha
        total_frames = frames.shape[1]

        # Build the indices on CPU (same values as before), then move them to
        # the clip's device: index_select requires both on the same device.
        slow_indices = torch.linspace(
            0, total_frames - 1, total_frames // alpha
        ).long().to(frames.device)

        fast_pathway = frames
        slow_pathway = torch.index_select(frames, 1, slow_indices)
        return [slow_pathway.unsqueeze(0), fast_pathway.unsqueeze(0)]

# Preprocessing pipeline for one raw clip; the steps run in the order listed.
transform = Compose([
    # Reduce the clip to num_frames frames along the time axis.
    UniformTemporalSubsample(num_frames),
    # Rescale pixel values by 1/255 before normalization.
    Lambda(lambda clip: clip / 255.0),
    NormalizeVideo(mean, std),
    ShortSideScale(size=side_size),
    CenterCropVideo(crop_size),
    # Final step: produce the [slow, fast] list of tensors.
    PackPathway(),
])

# Clip length in seconds implied by the sampling settings above.
clip_duration = num_frames * sampling_rate / frames_per_second


In [None]:
import cv2
import torch
import time
from torchvision import transforms
from collections import deque
import numpy as np

# Pick the compute device: CUDA if PyTorch can use it, CPU otherwise.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(device)

class WebcamProcessor:
    """Classify a live webcam feed with a video model and show the result.

    Every second captured frame is stored in a fixed-size buffer. Once the
    buffer holds ``buffer_size`` frames they are stacked into one clip, run
    through ``transform`` and ``model``, and the three most likely class
    names are drawn in a strip below the live image.

    Args:
        model: Callable taking the transformed clip and returning class
            scores with the class axis at dimension 1.
        device: Device the transformed inputs are moved to before inference;
            it should be the device the model lives on.
        transform: Callable applied to the stacked ``[C, T, H, W]`` clip.
        buffer_size: Number of frames collected before each inference.
    """

    def __init__(self, model, device, transform, buffer_size=32):
        self.model = model
        self.device = device
        self.transform = transform
        self.buffer_size = buffer_size
        self.frame_buffer = deque(maxlen=buffer_size)
        self.pred_text = ""

    def process_frame(self, frame):
        """Convert an OpenCV BGR frame to an RGB tensor.

        OpenCV delivers channels in BGR order; the channel axis (last) is
        reversed so the model receives RGB. The dtype is preserved; value
        scaling and normalization are left to ``transform``.
        """
        # Reversing yields a negative-stride view; make it contiguous so it
        # can be wrapped as a tensor.
        rgb = np.ascontiguousarray(np.asarray(frame)[..., ::-1])
        return torch.from_numpy(rgb)

    def process_buffer(self):
        """Run the model on the buffered frames and empty the buffer.

        Returns:
            The raw model output for the clip built from the buffer.
        """
        # Stacked frames are [T, H, W, C]; reorder to [C, T, H, W].
        clip = torch.stack(list(self.frame_buffer)).permute((3, 0, 1, 2))
        inputs = self.transform(clip)

        # Move the inputs to the configured device so they match the model.
        if isinstance(inputs, torch.Tensor):
            inputs = inputs.to(self.device)
        else:
            inputs = [pathway.to(self.device) for pathway in inputs]

        with torch.no_grad():
            predictions = self.model(inputs)

        # Start collecting a fresh clip.
        self.frame_buffer.clear()
        return predictions

    def create_text_display(self, text, width=500, height=50):
        """Return a black ``height`` x ``width`` BGR image with ``text`` centered."""
        text_display = np.zeros((height, width, 3), np.uint8)

        font = cv2.FONT_HERSHEY_COMPLEX
        font_scale = 0.5
        font_thickness = 1
        font_color = (255, 255, 255)  # White text

        # Measure the rendered text to center it in the strip.
        (text_width, text_height), _ = cv2.getTextSize(
            text, font, font_scale, font_thickness
        )
        text_x = (width - text_width) // 2
        text_y = (height + text_height) // 2

        # Draw with the same thickness that was used for the measurement.
        cv2.putText(
            text_display, text, (text_x, text_y),
            font, font_scale, font_color, font_thickness,
        )
        return text_display

    def _format_predictions(self, predictions):
        """Build the caption listing the three highest-scoring class names."""
        probabilities = torch.softmax(predictions, dim=1)
        top_classes = probabilities.topk(k=3).indices[0]
        # kinetics_id_to_classname is the module-level id -> name mapping.
        names = [kinetics_id_to_classname[int(i)] for i in top_classes]
        return "T3 preds: %s" % ", ".join(names)

    def start_capture(self):
        """Capture from webcam 0 and display predictions until 'q' is pressed."""
        cap = cv2.VideoCapture(0)

        if not cap.isOpened():
            print("Error: Could not open webcam")
            return

        frame_counter = 0

        try:
            while True:
                ret, frame = cap.read()
                if not ret or frame is None:
                    # A failed grab yields no image; stop instead of crashing
                    # further down on a None frame.
                    print("Error: Could not read frame from webcam")
                    break

                if frame_counter % 2 == 0:  # process 1/2 of frames
                    self.frame_buffer.append(self.process_frame(frame))

                    # Run inference once a full clip has been collected.
                    if len(self.frame_buffer) == self.frame_buffer.maxlen:
                        predictions = self.process_buffer()
                        self.pred_text = self._format_predictions(predictions)

                # Size the text strip from the actual frame so that vstack
                # always receives matching widths.
                text_display = self.create_text_display(
                    self.pred_text, width=frame.shape[1]
                )
                combined_display = np.vstack((frame, text_display))

                cv2.imshow('Webcam Feed with Predictions', combined_display)
                frame_counter += 1

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

        finally:
            cap.release()
            cv2.destroyAllWindows()

# Put the network on the selected device and in evaluation mode.
model = model.to(device).eval()

# Wire the model, device and preprocessing pipeline into the webcam loop;
# one inference is run per num_frames buffered frames.
processor = WebcamProcessor(model, device, transform, buffer_size=num_frames)

# Blocks until the user presses 'q' in the preview window.
processor.start_capture()