In [19]:
%pip install opencv-python
%pip install numpy
%pip install torch torchvision
%pip install wget
%pip install pytorchvideo
%pip install json

Note: you may need to restart the kernel to use updated packages.


ERROR: You must give at least one requirement to install (see "pip help install")


Note: you may need to restart the kernel to use updated packages.


ERROR: You must give at least one requirement to install (see "pip help install")


In [1]:
# IMPORTS ALL PACKAGE
import time
import cv2
import numpy as np
import torch
import json
import wget
import os

from pytorchvideo.data.encoded_video import EncodedVideo
from torchvision.transforms import Compose, Lambda
from torchvision.transforms._transforms_video import (
    CenterCropVideo,
    NormalizeVideo,
)
from pytorchvideo.transforms import (
    ApplyTransformToKey,
    ShortSideScale,
    UniformTemporalSubsample
)

# IMPORT DETECTION PACKAGE
from PIL import Image, ImageDraw
from torchvision.models.detection import fasterrcnn_mobilenet_v3_large_320_fpn, FasterRCNN_MobileNet_V3_Large_320_FPN_Weights
from torchvision.utils import draw_bounding_boxes
from torchvision.transforms.functional import pil_to_tensor, to_pil_image



In [2]:
# DOWNLOAD THE RECOGNITION MODEL

# Device on which to run the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# Pick a pretrained model and load the pretrained weights
model_name = "x3d_xs"
model_recognition = torch.hub.load("facebookresearch/pytorchvideo", model=model_name, pretrained=True)

# Set to eval mode and move to desired device
model_recognition = model_recognition.eval()
model_recognition = model_recognition.to(device)

Using cache found in C:\Users\nyok/.cache\torch\hub\facebookresearch_pytorchvideo_main


In [3]:
# DOWNLOAD THE KINETIC-400 LABEL

url = "https://dl.fbaipublicfiles.com/pyslowfast/dataset/class_names/kinetics_classnames.json"
file_name = "kinetics_classnames.json"

if os.path.isfile(file_name):
    print(f"{file_name} is already in directory. Skipping Download")
else:
    print("Downloading kinetics__classnames")
    wget.download(url, "kinetics_classnames.json")

with open("kinetics_classnames.json", "r") as f:
    kinetics_classnames = json.load(f)
    labels = [line.strip() for line in kinetics_classnames]

# Create an id to label name mapping
kinetics_id_to_classname = {}
for k, v in kinetics_classnames.items():
    kinetics_id_to_classname[v] = str(k).replace('"', "")

print(labels[0:10], np.shape(labels))

kinetics_classnames.json is already in directory. Skipping Download
['"sharpening knives"', '"eating ice cream"', '"cutting nails"', '"changing wheel"', '"bench pressing"', 'deadlifting', '"eating carrots"', 'marching', '"throwing discus"', '"playing flute"'] (400,)


In [4]:
# TRANSFORM PARAMETER
mean = [0.45, 0.45, 0.45]
std = [0.225, 0.225, 0.225]
frames_per_second = 6

# SPECIFIC PARAMETER FOR MODEL
model_transform_params = {
    "x3d_xs": {"side_size": 182, "crop_size": 182, "num_frames": 4, "sampling_rate": 12},
    "x3d_s": {"side_size": 182, "crop_size": 182, "num_frames": 13, "sampling_rate": 6},
    "x3d_m": {"side_size": 256, "crop_size": 256, "num_frames": 16, "sampling_rate": 5}
}

transform_params = model_transform_params[model_name]

transform = ApplyTransformToKey(
    key="video",
    transform=Compose(
        [
            UniformTemporalSubsample(transform_params["num_frames"]),
            Lambda(lambda x: x/255.0),
            NormalizeVideo((0.45, 0.45, 0.45), (0.225, 0.225, 0.225)),
            ShortSideScale(size=transform_params["side_size"]),
            CenterCropVideo(
                crop_size=(transform_params["crop_size"], transform_params["crop_size"])
            )
        ]
    ),
)

# UniformTemporalSubsample : Reduces the number of frames to the required num_frames by uniformly sampling.
# Lambda(lambda x: x/255.0): Normalizes pixel values to the range [0, 1].
# NormalizeVideo           : Applies mean and standard deviation normalization using the provided values.
# ShortSideScale           : Resizes the video frames so that the shorter side is of length side_size.
# CenterCropVideo          : Crops the center crop_size×crop_size region from each frame.

In [5]:
# COMBINE THE DETECTION AND RECOGNITION

# Step 1: Initialize model with the best available weights
weights = FasterRCNN_MobileNet_V3_Large_320_FPN_Weights.DEFAULT
model_detection = fasterrcnn_mobilenet_v3_large_320_fpn(weights=weights, box_score_thresh=0.9)
model_detection.eval()

# Step 2: Initialize the inference transforms
preprocess = weights.transforms()

In [6]:
# MAKE MODEL WITH ACCURACY SCORE FOR EACH FRAME

In [14]:
# DETECTION FUNCTION
def detects(resized_frame):
    # Convert OpenCV BGR frame to PIL Image
    pil_img = Image.fromarray(cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGB))

    # Convert PIL Image to tensor and add batch dimension
    img_tensor = pil_to_tensor(pil_img).unsqueeze(0)

    # Apply inference preprocessing transforms
    batch = [preprocess(img_tensor[0])]

    # Step 5: Perform object detection
    with torch.no_grad():
        prediction = model_detection(batch)[0]

    # Draw bounding boxes on the frame
    boxes = prediction["boxes"]
    box = draw_bounding_boxes(img_tensor[0], boxes=boxes, colors="red", width=2)

    # Convert tensor back to OpenCV format for display
    result_frame = cv2.cvtColor(np.array(to_pil_image(box.detach())), cv2.COLOR_RGB2BGR)
    return result_frame

In [15]:
def recognizes(video_data):
    # Move the inputs to the desired device
    inputs = video_data["video"].to(device).unsqueeze(0)

    print(f"Shape of inputs: {inputs.shape}")

    # Pass the input through the model
    with torch.no_grad():
        prediction = model_recognition(inputs).squeeze(0).softmax(0)
        # Get the top prediction (label with the highest score)
        label = prediction.argmax().item()
        score = prediction[label].item()

        # Print the top prediction and its confidence score
        print(f"Top prediction: {kinetics_id_to_classname[label]} ({100 * score:.2f}%)")
        pred_class = kinetics_id_to_classname[label]

    return pred_class, score

In [16]:
# MAIN PROCESSING FUNCTION
def process_videos(use_webcam=False, video_path=None):
    # Initialize webcam or video file
    if use_webcam:
        capture = cv2.VideoCapture(0)
    elif video_path:
        capture = cv2.VideoCapture(video_path)
    else:
        print("Error: Provide video path or function for using webcam")
        return
    if not capture.isOpened():
        print("Error: Could not open video source.")
        return
    
    input_size = (480, 480)
    frame_buffer = []
    frame_count = 0
    prev_time = 0
    start_time = time.time()   
    
    try:
        while True:
            ret, img = capture.read()
            if not ret:
                print("Error: Could not read frame.")
                break
            
            frame_count += 1
            
            # Skip frames based on the value of skip_frames
#             if frame_count % 2 != 0:
#                 continue 
            
            # Resize image based on input size
            resized_frame = cv2.resize(img, input_size)
            
            # Call detection function 
            result_frame = detects(resized_frame)
            
            # Add frame to buffer
            frame_buffer.append(cv2.cvtColor(result_frame, cv2.COLOR_BGR2RGB))
            if len(frame_buffer) >= transform_params["num_frames"]:
                try:
                    # Create a batch of repeated frames to match with required number for model
                    video_frames = frame_buffer[-transform_params["num_frames"]:]

                    # Convert to PyTorch tensor and permute the tensor into shape (C,T,H,W)
                    video_data = {"video": torch.tensor(np.array(video_frames)).permute(3, 0, 1, 2).float()}

                    print(f"Shape of video_data['video']: {video_data['video'].shape}")

                    # Apply the transform to normalize the input for model
                    video_data = transform(video_data)
                    
                    (pred_class, score) = recognizes(video_data)
                    
                    print(pred_class, score)
                    
                    cv2.putText(result_frame, f"{pred_class} ({10000 * score:.2f}%)", (20, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 153, 0), 2)
                    153
#                     print(predict_class, score)

                    # Display the prediction on the video frame
#                     for i, class_name in enumerate(label):
#                         cv2.putText(result_frame, f"{class_name}", (20, 40 + i * 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
                        
                except RuntimeError as e:
                    print(f"RuntimeError in action recognition: {e}")
                    print(f"Error occurred at frame {frame_count}")
        
        
            current_time = time.time()
            total_time = current_time - start_time  # Total elapsed time since the start
            
            # Calculate FPS (instantaneous and average)
            fps = 1 / (current_time - prev_time) if (current_time - prev_time) > 0 else 0
            average_fps = frame_count / total_time if total_time > 0 else 0
            prev_time = current_time
    
        
            cv2.putText(result_frame, f"FPS: {float(fps):.2f}", (20, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
            cv2.putText(result_frame, f"Average FPS: {float(average_fps):.2f}", (20, 140), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
            
            # Show the video stream with predictions
            cv2.imshow('X3D-S Action Recognition', result_frame)
                            
            # Press 'Esc' to exit
            if cv2.waitKey(30) & 0xFF == 27:
                break
    finally:
        capture.release()
        cv2.destroyAllWindows()


In [17]:
# RUNNING THE MODEL WITH OR WITHOUT WEBCAM
# For webcam:
process_videos(use_webcam=True)

# For video file:
# process_videos(video_path="C:/Users/nyok/Desktop/OpenCV/Videos/fixinghairlive.MOV")

Shape of video_data['video']: torch.Size([3, 4, 480, 480])
Shape of inputs: torch.Size([1, 3, 4, 182, 182])
Top prediction: beatboxing (0.37%)
beatboxing 0.003670202801004052
Shape of video_data['video']: torch.Size([3, 4, 480, 480])
Shape of inputs: torch.Size([1, 3, 4, 182, 182])
Top prediction: beatboxing (0.33%)
beatboxing 0.003289933083578944
Shape of video_data['video']: torch.Size([3, 4, 480, 480])
Shape of inputs: torch.Size([1, 3, 4, 182, 182])
Top prediction: beatboxing (0.41%)
beatboxing 0.004067048896104097
Shape of video_data['video']: torch.Size([3, 4, 480, 480])
Shape of inputs: torch.Size([1, 3, 4, 182, 182])
Top prediction: playing harmonica (0.37%)
playing harmonica 0.0036784049589186907
Shape of video_data['video']: torch.Size([3, 4, 480, 480])
Shape of inputs: torch.Size([1, 3, 4, 182, 182])
Top prediction: eating burger (0.35%)
eating burger 0.003452070290222764
Shape of video_data['video']: torch.Size([3, 4, 480, 480])
Shape of inputs: torch.Size([1, 3, 4, 182, 18

In [None]:
import torch
print(torch.cuda.is_available())