In [1]:

import IPython
import sys

def clean_notebook():
    """Clear the current cell's output and print a short confirmation.

    ``clear_output`` is called with ``wait=True``, so the existing output
    (e.g. verbose installer logs) is only removed once the confirmation
    message is ready to replace it.
    """
    clear = IPython.display.clear_output
    clear(wait=True)
    print("Notebook cleaned.")
!pip install facenet-pytorch --no-deps

!pip install pytubefix


# Clean up the notebook
clean_notebook()

Notebook cleaned.


In [1]:
import cv2
from IPython.display import display, Image, clear_output
import numpy as np
from pytubefix import YouTube
from facenet_pytorch import MTCNN
import torch
import time  # For adding delay

# Initialize MTCNN model (runs on the first CUDA device when one is
# available, otherwise on the CPU); keep_all=True is passed so that more
# than one face per frame can be reported.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
mtcnn = MTCNN(keep_all=True, device=device)

# YouTube video URL
# Alternative clip: "https://youtu.be/EsHQB9gT96k?si=bdi3dDrVnYVIgVYi"
video_url = "https://youtu.be/cmdMopdk6lo?si=vK7azxlZu4PKgiHW"

# Target width for resizing
target_width = 800
# Start and stop times in seconds
start_time = 75  # Start at 75 seconds
end_time = 120   # Stop at 120 seconds

# Download the video stream (use pytubefix to fetch the stream URL)
yt = YouTube(video_url)
video_stream = yt.streams.filter(file_extension='mp4', progressive=True).first()

# Raise instead of calling exit(): inside a notebook kernel exit() targets
# the kernel itself rather than cleanly stopping this cell, so the code
# below could still run against a missing stream.
if video_stream is None:
    raise RuntimeError("No compatible video stream found.")

# Get the stream URL
stream_url = video_stream.url

# Open the YouTube stream in OpenCV
cap = cv2.VideoCapture(stream_url)

if not cap.isOpened():
    # Free the capture object before aborting the cell.
    cap.release()
    raise RuntimeError("Cannot open YouTube video stream")

# Set start time in the video (in milliseconds)
cap.set(cv2.CAP_PROP_POS_MSEC, start_time * 1000)

# Play the clip frame by frame, drawing a green box around every detected
# face. The loop is wrapped in try/finally so the capture is released even
# when the cell is interrupted or a frame fails to process.
try:
    while cap.isOpened():
        clear_output(wait=True)  # Clear previous frame for smoother playback
        ret, frame = cap.read()

        if not ret:
            print("Stream stopped.")
            break

        # Get the current playback time in milliseconds
        current_time = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000  # Convert to seconds

        # Stop playback if the end time is reached
        if current_time >= end_time:
            print("Reached the specified end time.")
            break

        # OpenCV decodes frames as BGR; the detector is fed an RGB copy
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Detect faces (boxes is None when nothing was found)
        boxes, _ = mtcnn.detect(rgb_frame)

        # Draw bounding boxes on the original frame
        if boxes is not None:
            for box in boxes:
                # Box coordinates are truncated to ints for drawing
                x1, y1, x2, y2 = map(int, box)
                # Draw rectangle around face
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

        # Get original dimensions
        original_height, original_width = frame.shape[:2]

        # Calculate aspect ratio and new dimensions
        aspect_ratio = original_height / original_width
        new_height = int(target_width * aspect_ratio)

        # Resize the frame while maintaining the aspect ratio
        resized_frame = cv2.resize(frame, (target_width, new_height))

        # Convert the resized frame to JPEG format for display in Jupyter
        _, buffer = cv2.imencode('.jpg', resized_frame)
        img_bytes = buffer.tobytes()

        # Display the resized frame in Jupyter Notebook
        display(Image(data=img_bytes))

        # Add a delay for smoother playback
        time.sleep(0.015)  # Delay in seconds
finally:
    # Always free the capture handle, including on KeyboardInterrupt
    cap.release()

print("Video stream ended.")


Reached the specified end time.
Video stream ended.


# Face panel

In [2]:
import cv2
from IPython.display import display, Image, clear_output
import numpy as np
from pytubefix import YouTube
from facenet_pytorch import MTCNN
import torch
import time

# Initialize MTCNN model (runs on the first CUDA device when one is
# available, otherwise on the CPU); keep_all=True is passed so that more
# than one face per frame can be reported.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
mtcnn = MTCNN(keep_all=True, device=device)

# YouTube video URL
# Alternative clip: "https://youtu.be/EsHQB9gT96k?si=bdi3dDrVnYVIgVYi"
video_url = "https://youtu.be/cmdMopdk6lo?si=vK7azxlZu4PKgiHW"

# Target width for resizing (main video area)
target_width = 800

# Face panel settings
face_panel_width = 150  # Width of the face panel on the right
face_thumbnail_size = 120  # Size of each face thumbnail
max_faces_display = 4  # Number of faces to display vertically

# Start and stop times in seconds
start_time = 75
end_time = 120

# Store recent detected faces
recent_faces = []

# Download the video stream
yt = YouTube(video_url)
video_stream = yt.streams.filter(file_extension='mp4', progressive=True).first()

# Raise instead of calling exit(): inside a notebook kernel exit() targets
# the kernel itself rather than cleanly stopping this cell, so the code
# below could still run against a missing stream.
if video_stream is None:
    raise RuntimeError("No compatible video stream found.")

stream_url = video_stream.url

# Open the YouTube stream in OpenCV
cap = cv2.VideoCapture(stream_url)

if not cap.isOpened():
    # Free the capture object before aborting the cell.
    cap.release()
    raise RuntimeError("Cannot open YouTube video stream")

# Set start time in the video (OpenCV expects milliseconds)
cap.set(cv2.CAP_PROP_POS_MSEC, start_time * 1000)

def extract_face(frame, box, margin=20):
    """Crop a face from ``frame`` and return it as a square thumbnail.

    Args:
        frame: Image array whose first two dimensions are (height, width).
        box: Four values ``(x1, y1, x2, y2)``; each is truncated to ``int``.
        margin: Pixels added on every side of the box before cropping.

    Returns:
        The crop resized to ``face_thumbnail_size`` x ``face_thumbnail_size``,
        or ``None`` when the padded box does not overlap the frame.
    """
    h, w = frame.shape[:2]
    x1, y1, x2, y2 = map(int, box)

    # Pad the box, then clamp every coordinate into the frame. The lower
    # clamp on x2/y2 matters: a negative slice bound would be interpreted
    # by NumPy as "count from the end" and yield a large bogus crop.
    x1 = min(max(0, x1 - margin), w)
    y1 = min(max(0, y1 - margin), h)
    x2 = min(max(0, x2 + margin), w)
    y2 = min(max(0, y2 + margin), h)

    # Nothing left to crop (box outside the frame or inverted)
    if x2 <= x1 or y2 <= y1:
        return None

    face = frame[y1:y2, x1:x2]

    # Resize to square thumbnail
    return cv2.resize(face, (face_thumbnail_size, face_thumbnail_size))

def create_face_panel(faces, panel_height):
    """Build the vertical side panel showing face thumbnails.

    Args:
        faces: Sequence of thumbnails (``None`` entries are skipped); only
            the first ``max_faces_display`` entries are considered.
        panel_height: Height of the panel in pixels.

    Returns:
        A ``uint8`` image of shape ``(panel_height, face_panel_width, 3)``
        with a dark gray background, a two-line title and the thumbnails
        that fit, each with a green border and a ``#n`` label.
    """
    thumb = face_thumbnail_size
    font = cv2.FONT_HERSHEY_SIMPLEX

    # Dark gray background
    canvas = np.full((panel_height, face_panel_width, 3), 40, dtype=np.uint8)

    # Vertical gap between thumbnails, never smaller than 10 pixels
    gap = max(10, (panel_height - max_faces_display * thumb) // (max_faces_display + 1))

    # Two-line title at the top of the panel
    for line, baseline in (("Detected", 25), ("Faces", 45)):
        cv2.putText(canvas, line, (10, baseline), font, 0.5, (255, 255, 255), 1)

    top_offset = 60  # first thumbnail starts below the title
    left = (face_panel_width - thumb) // 2  # center thumbnails horizontally
    right = left + thumb

    for slot, thumb_img in enumerate(faces[:max_faces_display]):
        if thumb_img is None:
            continue

        top = top_offset + slot * (thumb + gap)
        bottom = top + thumb

        # Skip thumbnails that would run past the bottom of the panel
        if bottom > panel_height:
            continue

        # Border first, then the thumbnail, then its number label
        cv2.rectangle(canvas, (left - 2, top - 2), (right + 2, bottom + 2), (0, 255, 0), 2)
        canvas[top:bottom, left:right] = thumb_img
        cv2.putText(canvas, f"#{slot+1}", (left, top - 5), font, 0.4, (255, 255, 0), 1)

    return canvas

# Play the clip frame by frame with a side panel of face thumbnails.
# The loop is wrapped in try/finally so the capture is released even when
# the cell is interrupted or a frame fails to process.
try:
    while cap.isOpened():
        clear_output(wait=True)
        ret, frame = cap.read()

        if not ret:
            print("Stream stopped.")
            break

        # Playback position reported by OpenCV, converted from ms to seconds
        current_time = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000

        if current_time >= end_time:
            print("Reached the specified end time.")
            break

        # Convert frame to RGB for face detection
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Detect faces (boxes is None when nothing was found)
        boxes, probs = mtcnn.detect(rgb_frame)

        # Faces found in *this* frame; stays 0 when nothing is detected so
        # the overlay never reports a stale count.
        face_count = 0

        # Process detected faces
        if boxes is not None:
            if probs is not None:
                # Sort by probability (highest first)
                sorted_indices = np.argsort(probs)[::-1]
                boxes = boxes[sorted_indices]

            # Crop every thumbnail before anything is drawn, so the panel
            # shows clean faces without the green boxes and labels.
            recent_faces = []
            for box in boxes:
                face = extract_face(frame, box)
                if face is not None:
                    recent_faces.append(face)
            face_count = len(recent_faces)

            # Now annotate the main frame
            for i, box in enumerate(boxes):
                x1, y1, x2, y2 = map(int, box)

                # Draw rectangle on main frame
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

                # Add face number label
                cv2.putText(frame, f"#{i+1}", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

        # Get original dimensions
        original_height, original_width = frame.shape[:2]

        # Calculate new dimensions for main video
        aspect_ratio = original_height / original_width
        new_height = int(target_width * aspect_ratio)

        # Resize the main frame
        resized_frame = cv2.resize(frame, (target_width, new_height))

        # Create face panel with matching height; when no face was detected
        # in this frame the panel keeps showing the most recent thumbnails.
        face_panel = create_face_panel(recent_faces, new_height)

        # Combine main frame and face panel horizontally
        combined_frame = np.hstack([resized_frame, face_panel])

        # Add timestamp overlay
        cv2.putText(combined_frame, f"Time: {current_time:.1f}s", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

        # Add face count
        cv2.putText(combined_frame, f"Faces: {face_count}", (10, 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        # Convert to JPEG for display
        _, buffer = cv2.imencode('.jpg', combined_frame)
        img_bytes = buffer.tobytes()

        # Display in Jupyter Notebook
        display(Image(data=img_bytes))

        # Delay for smoother playback
        time.sleep(0.015)
finally:
    # Always free the capture handle, including on KeyboardInterrupt
    cap.release()

print("Video stream ended.")

Reached the specified end time.
Video stream ended.
