<a href="https://colab.research.google.com/github/al69114/blank-app/blob/main/Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install opencv-python numpy tqdm pillow inference-sdk yt-dlp

Collecting inference-sdk
  Downloading inference_sdk-0.46.1-py3-none-any.whl.metadata (20 kB)
Collecting yt-dlp
  Downloading yt_dlp-2025.3.31-py3-none-any.whl.metadata (172 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m172.2/172.2 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
Collecting dataclasses-json~=0.6.0 (from inference-sdk)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting opencv-python
  Downloading opencv_python-4.10.0.84-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (20 kB)
Collecting supervision<=0.30.0,>=0.25.1 (from inference-sdk)
  Downloading supervision-0.25.1-py3-none-any.whl.metadata (14 kB)
Collecting aiohttp<=3.10.11,>=3.9.0 (from inference-sdk)
  Downloading aiohttp-3.10.11-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.7 kB)
Collecting backoff~=2.2.0 (from inference-sdk)
  Downloading backoff-2.2.1-py3-none-any.whl.metadata (14 kB)
Collecting marshmallow<4.0.0,>=3.18

In [10]:
import os
import cv2
import numpy as np
from datetime import datetime
import time
from tqdm import tqdm
import io
from PIL import Image
import traceback
import base64

def _download_with_yt_dlp(youtube_url, output_file):
    """Download `youtube_url` to `output_file` with yt_dlp (ImportError if it is not installed)."""
    from yt_dlp import YoutubeDL

    ydl_opts = {
        'format': 'best[ext=mp4]',
        'outtmpl': output_file,
        'quiet': False,
        'no_warnings': False,
        'ignoreerrors': False,
    }

    with YoutubeDL(ydl_opts) as ydl:
        ydl.download([youtube_url])
    print(f"Download complete with yt_dlp: {output_file}")
    return output_file


def _download_with_pytube(youtube_url, output_path, output_file):
    """Download with pytube; returns None when no progressive mp4 stream is offered."""
    from pytube import YouTube

    yt = YouTube(youtube_url)
    # Highest-resolution progressive mp4 stream, if any
    video = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()

    if not video:
        print("No suitable video stream found.")
        return None

    video.download(output_path=output_path, filename=os.path.basename(output_file))
    print(f"Download complete with pytube: {output_file}")
    return output_file


def _download_with_youtube_dl(youtube_url, output_file):
    """Download `youtube_url` to `output_file` with youtube_dl (ImportError if it is not installed)."""
    import youtube_dl

    youtube_dl_opts = {
        'format': 'best[ext=mp4]',
        'outtmpl': output_file,
    }

    with youtube_dl.YoutubeDL(youtube_dl_opts) as ydl:
        ydl.download([youtube_url])
    print(f"Download complete with youtube_dl: {output_file}")
    return output_file


def download_youtube_video(youtube_url, output_path="videos"):
    """
    Downloads a YouTube video, trying yt_dlp, then pytube, then youtube_dl.

    A later backend is only tried when the previous one raises ImportError
    (i.e. it is not installed); a download error from an installed backend
    aborts the whole attempt.

    Args:
        youtube_url (str): URL of the YouTube video
        output_path (str): Directory to save the video to (created if missing)

    Returns:
        str: Path to the downloaded video file, or None if the download failed
        or no suitable stream/backend was available
    """
    try:
        # exist_ok avoids the check-then-create race of exists() + makedirs()
        os.makedirs(output_path, exist_ok=True)

        # Generate a timestamp for unique filename
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        output_file = os.path.join(output_path, f"video_{timestamp}.mp4")

        print(f"Downloading video from: {youtube_url}")

        # Method 1: yt_dlp (errors other than ImportError go straight to the outer handler)
        try:
            return _download_with_yt_dlp(youtube_url, output_file)
        except ImportError:
            print("yt_dlp not installed as Python package. Trying pytube...")

        # Method 2: pytube
        try:
            return _download_with_pytube(youtube_url, output_path, output_file)
        except ImportError:
            print("pytube not installed. Trying youtube_dl...")
        except Exception as e:
            print(f"pytube error: {e}")
            raise

        # Method 3: youtube_dl
        try:
            return _download_with_youtube_dl(youtube_url, output_file)
        except ImportError:
            print("No YouTube download packages found.")
            raise ImportError("Please install one of: yt-dlp, pytube, or youtube-dl")
        except Exception as e:
            print(f"youtube_dl error: {e}")
            raise

    except Exception as e:
        print(f"Error downloading video: {e}")
        print("\nPossible solutions:")
        print("1. Install yt-dlp: pip install yt-dlp")
        print("2. Or install pytube: pip install pytube")
        print("3. Or install youtube-dl: pip install youtube-dl")
        print("4. Check your internet connection")
        print("5. The video might be restricted or unavailable")
        return None

class RoboflowFaceDetector:
    """
    Face detection using the Roboflow API, with OpenCV detectors as a fallback.

    The instance also keeps a list of feature encodings of faces it has
    already seen (see `is_duplicate`) and a counter of rejected duplicates.
    """
    def __init__(self, api_key="RA851UccVU1TP3Ln2aDU", model_id="asasa-mqilf/1"):
        # NOTE(review): the default api_key is a credential committed to source.
        # Prefer passing the key explicitly (e.g. from an environment variable)
        # and rotating this one; the default is kept only for compatibility.
        self.api_key = api_key
        self.model_id = model_id
        self.client = None
        self.initialized = False
        self.face_encodings = []
        self.duplicate_count = 0

        try:
            from inference_sdk import InferenceHTTPClient
            self.client = InferenceHTTPClient(
                api_url="https://serverless.roboflow.com",
                api_key=self.api_key
            )
            self.initialized = True
            print(f"Roboflow Face Detection API initialized successfully with model {model_id}!")
        except ImportError:
            print("Error: inference_sdk not installed. Please install with:")
            print("pip install inference-sdk")
            print("Falling back to OpenCV for face detection.")
            self._initialize_opencv_fallback()
        except Exception as e:
            print(f"Error initializing Roboflow client: {e}")
            print("Falling back to OpenCV for face detection.")
            self._initialize_opencv_fallback()

    def _initialize_opencv_fallback(self):
        """Populate `self.opencv_face_detectors` with whichever OpenCV detectors can be loaded."""
        self.opencv_face_detectors = []

        # Try to load DNN face detector (only if the model files are present locally)
        try:
            face_detector_dir = "face_detector"
            face_detector_prototxt = os.path.join(face_detector_dir, "deploy.prototxt")
            face_detector_model = os.path.join(face_detector_dir, "res10_300x300_ssd_iter_140000.caffemodel")

            if os.path.exists(face_detector_model) and os.path.exists(face_detector_prototxt):
                dnn_face_detector = cv2.dnn.readNetFromCaffe(face_detector_prototxt, face_detector_model)
                self.opencv_face_detectors.append(("dnn", dnn_face_detector))
                print("Loaded OpenCV DNN face detector")
            else:
                print("OpenCV DNN face detector files not found.")
                print("For better face detection, download these files:")
                print("- deploy.prototxt: https://github.com/opencv/opencv/blob/master/samples/dnn/face_detector/deploy.prototxt")
                print("- res10_300x300_ssd_iter_140000.caffemodel: https://github.com/opencv/opencv_3rdparty/blob/dnn_samples_face_detector_20170830/res10_300x300_ssd_iter_140000.caffemodel")
                print("Place them in a 'face_detector' directory")
        except Exception as e:
            print(f"Error loading OpenCV DNN face detector: {e}")

        # Haar Cascade face detector
        try:
            face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
            if not face_cascade.empty():
                self.opencv_face_detectors.append(("haar", face_cascade))
                print("Loaded Haar Cascade face detector")
            else:
                print("Failed to load Haar Cascade face detector")
        except Exception as e:
            print(f"Error loading Haar Cascade: {e}")

    def detect_faces(self, frame):
        """
        Detect faces in a frame using the Roboflow API, falling back to OpenCV.

        When the API call succeeds its result is returned as-is (possibly an
        empty list). When the client is unavailable or the call raises, the
        OpenCV detectors are used instead; they are loaded on first need.

        Args:
            frame: OpenCV BGR image

        Returns:
            list: List of (x, y, w, h) tuples for faces
        """
        if self.initialized and self.client:
            try:
                return self._detect_faces_roboflow(frame)
            except Exception as e:
                print(f"Error with Roboflow API: {e}")
                traceback.print_exc()
                print("Falling back to OpenCV detection")
                # The fallback detectors are only created in __init__ when the
                # client failed to initialise, so create them now if needed;
                # otherwise the announced fallback would always return [].
                if not hasattr(self, 'opencv_face_detectors'):
                    self._initialize_opencv_fallback()

        # Fall back to OpenCV if Roboflow failed or not available
        return self._detect_faces_opencv(frame)

    def _detect_faces_roboflow(self, frame):
        """Send `frame` to the Roboflow model and convert its predictions to (x, y, w, h) boxes."""
        import tempfile

        # Convert OpenCV frame to PIL Image
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        pil_img = Image.fromarray(rgb_frame)

        # The client is given a file path, so the frame is written to a temporary JPEG
        temp_file = tempfile.NamedTemporaryFile(suffix='.jpg', delete=False)
        try:
            pil_img.save(temp_file, format='JPEG')
            temp_file.close()
            result = self.client.infer(temp_file.name, model_id=self.model_id)
        finally:
            # Always remove the temp file, even when saving or inference raises
            temp_file.close()
            os.unlink(temp_file.name)

        faces = []
        for prediction in result.get('predictions', []):
            # Extract bounding box
            if 'x' in prediction and 'y' in prediction and 'width' in prediction and 'height' in prediction:
                # Format where x,y is the center
                x = int(prediction['x'] - prediction['width']/2)
                y = int(prediction['y'] - prediction['height']/2)
                w = int(prediction['width'])
                h = int(prediction['height'])
            elif 'bbox' in prediction:
                # Format with bbox object
                bbox = prediction['bbox']
                x = int(bbox.get('x', 0))
                y = int(bbox.get('y', 0))
                w = int(bbox.get('width', 0))
                h = int(bbox.get('height', 0))
            elif all(k in prediction for k in ['x_min', 'y_min', 'x_max', 'y_max']):
                # Format with min/max coordinates
                x = int(prediction['x_min'])
                y = int(prediction['y_min'])
                w = int(prediction['x_max'] - prediction['x_min'])
                h = int(prediction['y_max'] - prediction['y_min'])
            else:
                continue

            # Make sure coordinates are positive
            x = max(0, x)
            y = max(0, y)

            # Skip if width or height is too small
            if w < 20 or h < 20:
                continue

            faces.append((x, y, w, h))

        return faces

    def _detect_faces_opencv(self, frame):
        """
        Detect faces with every loaded OpenCV detector and merge the results.

        Returns an empty list when no fallback detectors have been set up.
        """
        if not hasattr(self, 'opencv_face_detectors'):
            return []

        height, width = frame.shape[:2]
        all_faces = []

        for detector_name, detector in self.opencv_face_detectors:
            faces = []

            if detector_name == "dnn":
                # DNN-based detection
                blob = cv2.dnn.blobFromImage(
                    cv2.resize(frame, (300, 300)), 1.0, (300, 300),
                    (104.0, 177.0, 123.0), swapRB=False, crop=False
                )
                detector.setInput(blob)
                detections = detector.forward()

                for i in range(detections.shape[2]):
                    confidence = detections[0, 0, i, 2]
                    if confidence > 0.5:
                        # Detections are scaled by the frame size to get pixel corners
                        box = detections[0, 0, i, 3:7] * np.array([width, height, width, height])
                        (startX, startY, endX, endY) = box.astype("int")

                        # Ensure coordinates are within frame
                        startX, startY = max(0, startX), max(0, startY)
                        endX, endY = min(width, endX), min(height, endY)

                        w = endX - startX
                        h = endY - startY

                        # Skip very small faces
                        if w < 30 or h < 30:
                            continue

                        faces.append((startX, startY, w, h))
            else:
                # Haar Cascade detection
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                detected = detector.detectMultiScale(gray, 1.1, 5, minSize=(30, 30))
                faces.extend([(x, y, w, h) for (x, y, w, h) in detected])

            all_faces.extend(faces)

        # Remove overlapping detections using non-maximum suppression
        if len(all_faces) > 1:
            # cv2.dnn.NMSBoxes takes boxes as [x, y, width, height] (not corner
            # pairs) and is strict about plain Python ints.
            boxes = [[int(x), int(y), int(w), int(h)] for (x, y, w, h) in all_faces]
            scores = [1.0] * len(boxes)  # Assign equal confidence

            indices = cv2.dnn.NMSBoxes(boxes, scores, 0.3, 0.3)

            # Depending on the OpenCV version the result is flat, nested
            # ([[0], [2]]) or an empty tuple; flatten handles all of them.
            kept = np.array(indices).reshape(-1)
            return [all_faces[int(i)] for i in kept]

        return all_faces

    def is_duplicate(self, face_img, similarity_threshold=0.75):
        """
        Check if a face is a duplicate using feature-based comparison

        The encoding is a 64-bin grayscale histogram plus a binary edge mask,
        both computed on the upper 60% of the face resized to 64x64. A face
        that matches no stored encoding is remembered for later calls.

        Args:
            face_img: Face image (BGR)
            similarity_threshold: Threshold for duplicate detection

        Returns:
            bool: True if duplicate, False if unique (also False if an error
            occurs while computing the features)
        """
        try:
            # Convert to grayscale
            gray = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY)

            # Extract the upper part of the face (less affected by expressions)
            h, w = gray.shape
            upper_face = gray[:int(h*0.6), :]

            # Resize for consistent comparison
            upper_face = cv2.resize(upper_face, (64, 64))

            # 1. Compute histogram features
            hist = cv2.calcHist([upper_face], [0], None, [64], [0, 256])
            cv2.normalize(hist, hist, 0, 1, cv2.NORM_MINMAX)
            hist_features = hist.flatten()

            # 2. Compute edge features (less affected by lighting)
            sobelx = cv2.Sobel(upper_face, cv2.CV_64F, 1, 0, ksize=3)
            sobely = cv2.Sobel(upper_face, cv2.CV_64F, 0, 1, ksize=3)
            magnitude = cv2.magnitude(sobelx, sobely)
            # 1 where the gradient is stronger than the image's mean gradient
            edge_bits = (magnitude > magnitude.mean()).astype(np.uint8).flatten()

            # Combine features: first 64 values are the histogram, the rest the edge mask
            encoding = np.concatenate([hist_features, edge_bits])

            # Check against existing encodings
            for existing_encoding in self.face_encodings:
                # Calculate similarity for histogram part
                hist_similarity = cv2.compareHist(
                    existing_encoding[:64].reshape(-1, 1),
                    hist_features.reshape(-1, 1),
                    cv2.HISTCMP_CORREL
                )

                # Calculate similarity for edge part (Hamming distance)
                hamming_distance = np.count_nonzero(existing_encoding[64:] != edge_bits)
                edge_similarity = 1.0 - hamming_distance / len(edge_bits)

                # Combined similarity score
                similarity = 0.5 * hist_similarity + 0.5 * edge_similarity

                if similarity > similarity_threshold:
                    self.duplicate_count += 1
                    return True

            # If we get here, it's a new face
            self.face_encodings.append(encoding)
            return False

        except Exception as e:
            print(f"Error in duplicate detection: {e}")
            return False

    def get_duplicate_count(self):
        """Get the number of duplicates detected"""
        return self.duplicate_count

def enhance_face_image(face_img):
    """
    Produce a contrast-equalised, sharpened version of a face crop.

    Crops with a side shorter than 150 px are first upscaled (bicubic) so
    that both sides reach at least 150 px. CLAHE is then applied to the
    lightness channel in LAB space, followed by a 3x3 sharpening filter.

    Args:
        face_img: Input face image (BGR)

    Returns:
        Enhanced face image; None for a missing/empty input; the (possibly
        upscaled) input itself if any processing step raises
    """
    if face_img is None or face_img.size == 0:
        return None

    try:
        rows, cols = face_img.shape[0], face_img.shape[1]

        # Upscale small crops so the later filters have enough pixels to work with
        if rows < 150 or cols < 150:
            factor = max(150 / rows, 150 / cols)
            upscaled_size = (int(cols * factor), int(rows * factor))
            face_img = cv2.resize(face_img, upscaled_size, interpolation=cv2.INTER_CUBIC)

        # Work in LAB so only lightness is equalised, leaving colour untouched
        lightness, chroma_a, chroma_b = cv2.split(cv2.cvtColor(face_img.copy(), cv2.COLOR_BGR2LAB))

        equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        rebuilt_lab = cv2.merge((equalizer.apply(lightness), chroma_a, chroma_b))
        contrast_fixed = cv2.cvtColor(rebuilt_lab, cv2.COLOR_LAB2BGR)

        # Sharpening kernel: -1 everywhere with a centre weight of 9
        sharpen_kernel = np.full((3, 3), -1)
        sharpen_kernel[1, 1] = 9

        return cv2.filter2D(contrast_fixed, -1, sharpen_kernel)

    except Exception as e:
        print(f"Error enhancing face: {e}")
        return face_img  # Return original if enhancement fails

def assess_face_quality(face_img, min_size=(50, 50)):
    """
    Score a face crop on sharpness, brightness and contrast.

    The crop is rejected outright (score 0.0) when it is empty, smaller than
    `min_size`, too blurry, badly lit or low in contrast. Otherwise the score
    is a weighted blend: 50% sharpness, 25% brightness, 25% contrast.

    Args:
        face_img: Input face image (BGR)
        min_size: Minimum acceptable size; index 0 is compared with the image
            height and index 1 with the image width

    Returns:
        tuple: (quality_score, reason) where reason is "Pass", the name of
        the failed check, or the error text if an exception occurred
    """
    try:
        if face_img is None or face_img.size == 0:
            return 0.0, "Empty image"

        # Size gate
        rows, cols = face_img.shape[:2]
        if rows < min_size[0] or cols < min_size[1]:
            return 0.0, "Too small"

        gray = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY)

        # Sharpness: variance of the Laplacian
        sharpness = cv2.Laplacian(gray, cv2.CV_64F).var()
        if sharpness < 100:
            return 0.0, "Too blurry"

        # Lighting: mean gray level must stay away from both extremes
        mean_level = np.mean(gray)
        if mean_level < 40 or mean_level > 215:
            return 0.0, "Poor lighting"

        # Contrast: standard deviation of gray levels
        spread = gray.std()
        if spread < 20:
            return 0.0, "Low contrast"

        # Each component is mapped into [0, 1] before blending
        sharpness_part = min(sharpness / 500, 1.0)
        lighting_part = 1.0 - abs((mean_level - 127.5) / 127.5)
        spread_part = min(spread / 80, 1.0)

        blended = (0.5 * sharpness_part + 0.25 * lighting_part + 0.25 * spread_part)
        return blended, "Pass"

    except Exception as e:
        print(f"Error assessing face quality: {e}")
        return 0.0, str(e)

def _save_faces_montage(output_dir, video_base_name, tile_size=(150, 150), max_cols=5):
    """
    Tile the saved face crops of one video into a single overview image.

    Only files named `<video_base_name>_face_<number>...` in `output_dir` are
    used, so debug frames and a montage left by an earlier run are ignored.

    Returns:
        str: Path of the written montage, or None when there was nothing to tile
    """
    prefix = f"{video_base_name}_face_"

    # Collect (face number, filename) pairs so tiles appear in extraction order
    numbered_files = []
    for name in os.listdir(output_dir):
        if not name.startswith(prefix):
            continue
        number_part = name[len(prefix):].split("_")[0]
        if number_part.isdigit():
            numbered_files.append((int(number_part), name))

    if not numbered_files:
        return None
    numbered_files.sort()

    tile_w, tile_h = tile_size
    tiles = []
    for _, name in numbered_files:
        face = cv2.imread(os.path.join(output_dir, name))
        if face is not None:  # imread returns None for unreadable files
            tiles.append(cv2.resize(face, tile_size))

    if not tiles:
        return None

    # Grid layout: at most `max_cols` tiles per row
    cols = min(max_cols, len(tiles))
    rows = (len(tiles) + cols - 1) // cols
    montage = np.zeros((rows * tile_h, cols * tile_w, 3), dtype=np.uint8)

    for i, tile in enumerate(tiles):
        row, col = divmod(i, cols)
        montage[row * tile_h:(row + 1) * tile_h, col * tile_w:(col + 1) * tile_w] = tile

    montage_path = os.path.join(output_dir, f"{video_base_name}_unique_faces_montage.jpg")
    cv2.imwrite(montage_path, montage)
    return montage_path


def extract_faces_from_video(video_path, output_dir="faces", frame_skip=30, similarity_threshold=0.75,
                           quality_threshold=0.5, max_faces=None, api_key="RA851UccVU1TP3Ln2aDU", model_id="asasa-mqilf/1"):
    """
    Extract unique faces from a video using Roboflow face detection

    Every `frame_skip`-th frame is run through the detector. Each detected
    face is cropped with a margin, quality-checked, de-duplicated, enhanced
    and written to `output_dir`. Annotated debug frames go to
    `<output_dir>/debug`, and a montage of the saved faces is written at the end.

    Args:
        video_path: Path to the video file
        output_dir: Directory to save faces
        frame_skip: Number of frames to skip between processing
        similarity_threshold: Threshold for considering faces as duplicates
        quality_threshold: Minimum quality score for faces
        max_faces: Maximum number of faces to extract
        api_key: Roboflow API key
        model_id: Roboflow model ID

    Returns:
        dict: Statistics about the extraction process, or None if the video
        could not be opened or an unexpected error occurred
    """
    try:
        # Create output directory
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
            print(f"Created directory: {output_dir}")

        # Initialize counters
        frame_count = 0
        processed_frames = 0
        face_count = 0

        # Initialize the face detector
        face_detector = RoboflowFaceDetector(api_key=api_key, model_id=model_id)

        # Open the video; capture and progress bar are released in `finally`
        # so they are not leaked when anything below raises.
        cap = cv2.VideoCapture(video_path)
        pbar = None
        try:
            if not cap.isOpened():
                print(f"Error: Could not open video file {video_path}")
                return None

            # Get video information
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = int(cap.get(cv2.CAP_PROP_FPS))
            duration = total_frames / fps if fps > 0 else 0

            print(f"\nProcessing video: {video_path}")
            print(f"Total frames: {total_frames}")
            print(f"FPS: {fps}")
            print(f"Duration: {duration:.2f} seconds")
            print(f"Frame interval: {frame_skip}")

            # Setup progress bar
            pbar = tqdm(total=total_frames, desc="Processing", unit="frames")

            # Get base video name for saving files
            video_base_name = os.path.splitext(os.path.basename(video_path))[0]

            # For debugging
            save_debug_frames = True
            debug_dir = os.path.join(output_dir, "debug")
            if save_debug_frames:
                os.makedirs(debug_dir, exist_ok=True)

            # Start processing
            processing_start = time.time()
            last_debug_saved = 0  # To track when we last saved a debug frame

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                frame_count += 1
                pbar.update(1)

                # Only process every Nth frame
                if frame_count % frame_skip != 0:
                    continue

                processed_frames += 1

                # Get frame dimensions
                height, width = frame.shape[:2]

                # Debug image for visualization: the first two processed
                # frames, then one every 30 processed frames
                debug_image = None
                if save_debug_frames and (processed_frames - last_debug_saved >= 30 or processed_frames <= 2):
                    debug_image = frame.copy()
                    last_debug_saved = processed_frames

                # Detect faces in frame using Roboflow
                try:
                    faces = face_detector.detect_faces(frame)

                    # Process each detected face
                    for face_id, (x, y, w, h) in enumerate(faces):
                        try:
                            # Add margin around face for better results
                            margin_x = int(w * 0.2)
                            margin_y = int(h * 0.2)
                            top_margin = int(h * 0.3)  # Extra margin for forehead

                            # Calculate face region with margins
                            x1 = max(0, x - margin_x)
                            y1 = max(0, y - top_margin)
                            x2 = min(width, x + w + margin_x)
                            y2 = min(height, y + h + margin_y)

                            # Extract face
                            face_img = frame[y1:y2, x1:x2]

                            # Skip empty or tiny faces
                            if face_img is None or face_img.size == 0 or face_img.shape[0] < 20 or face_img.shape[1] < 20:
                                continue

                            # Assess face quality
                            quality_score, reason = assess_face_quality(face_img)

                            # Skip low quality faces
                            if quality_score < quality_threshold:
                                if debug_image is not None:
                                    cv2.rectangle(debug_image, (x1, y1), (x2, y2), (0, 0, 255), 2)
                                    cv2.putText(debug_image, f"Low quality: {reason}", (x1, y1-10),
                                              cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
                                continue

                            # Check if this is a duplicate face
                            if face_detector.is_duplicate(face_img, similarity_threshold):
                                # Draw red box for duplicates in debug image
                                if debug_image is not None:
                                    cv2.rectangle(debug_image, (x1, y1), (x2, y2), (0, 0, 255), 2)
                                    cv2.putText(debug_image, "Duplicate", (x1, y1-10),
                                              cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
                                continue

                            # If we got here, it's a unique face - enhance and save it
                            enhanced_face = enhance_face_image(face_img)
                            if enhanced_face is None:
                                continue

                            # Save the face
                            face_count += 1
                            face_filename = f"{video_base_name}_face_{face_count:04d}_{quality_score:.2f}.jpg"
                            cv2.imwrite(os.path.join(output_dir, face_filename), enhanced_face)

                            # Draw green box for saved faces in debug image
                            if debug_image is not None:
                                cv2.rectangle(debug_image, (x1, y1), (x2, y2), (0, 255, 0), 2)
                                cv2.putText(debug_image, f"#{face_count}", (x1, y1-10),
                                          cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

                            # Update progress bar
                            pbar.set_description(f"Found: {face_count} faces (filtered: {face_detector.get_duplicate_count()})")

                            # Check if we've reached the maximum number of faces
                            if max_faces and face_count >= max_faces:
                                print(f"\nReached maximum number of faces ({max_faces})")
                                break

                        except Exception as e:
                            print(f"Error processing face {face_id}: {e}")
                            continue

                    # Save debug image if needed
                    if debug_image is not None and save_debug_frames:
                        debug_filename = f"{video_base_name}_debug_{processed_frames:04d}.jpg"
                        cv2.imwrite(os.path.join(debug_dir, debug_filename), debug_image)

                    # Check if we've reached the maximum number of faces
                    if max_faces and face_count >= max_faces:
                        break

                except Exception as e:
                    print(f"Error processing frame {frame_count}: {e}")
                    continue
        finally:
            # Clean up
            cap.release()
            if pbar is not None:
                pbar.close()

        # Calculate processing statistics
        processing_time = time.time() - processing_start
        processing_fps = processed_frames / processing_time if processing_time > 0 else 0

        # Create a montage of all unique faces for quick review; the saved
        # files are read back because the crops are not kept in memory.
        try:
            montage_path = _save_faces_montage(output_dir, video_base_name)
            if montage_path:
                print(f"Created montage of all unique faces: {montage_path}")
        except Exception as e:
            print(f"Error creating faces montage: {e}")

        # Print summary
        print("\n====== Face Extraction Complete ======")
        print(f"Video: {video_path}")
        print(f"Total frames: {total_frames}")
        print(f"Processed frames: {processed_frames}")
        print(f"Unique faces found: {face_count}")
        print(f"Duplicates filtered: {face_detector.get_duplicate_count()}")
        print(f"Processing time: {processing_time:.2f} seconds")
        print(f"Processing speed: {processing_fps:.2f} frames/second")
        print(f"Faces saved to: {output_dir}")

        if face_count == 0:
            print("\nNo faces were detected. Possible reasons:")
            print("1. The video may not contain clear faces")
            print(f"2. The quality threshold may be too high (currently {quality_threshold})")
            print(f"3. The frame interval may be too large (currently every {frame_skip} frames)")

        return {
            "total_frames": total_frames,
            "processed_frames": processed_frames,
            "unique_faces": face_count,
            "duplicates_filtered": face_detector.get_duplicate_count(),
            "processing_time": processing_time,
            "output_dir": output_dir
        }

    except Exception as e:
        print(f"Error in extract_faces_from_video: {e}")
        traceback.print_exc()
        return None

def main():
    """
    Interactive entry point for the YouTube face extractor.

    Prompts for the video URL and the extraction settings, downloads the
    video with download_youtube_video(), runs extract_faces_from_video() on
    it, deletes the downloaded file again and prints where the results are.

    Blank (or whitespace-only) answers select the documented defaults.
    The similarity and quality thresholds are clamped to their advertised
    ranges and the frame skip rate is clamped to at least 1.

    The Roboflow API key is taken from, in order: the typed answer, the
    ROBOFLOW_API_KEY environment variable, the built-in default.

    Returns:
        None. All results and errors are reported with print().
    """
    def ask(prompt, default=""):
        """Return the stripped answer to `prompt`, or `default` if it is blank."""
        return input(prompt).strip() or default

    try:
        print("YouTube Face Extractor with Roboflow API")
        print("=======================================")

        # Get YouTube URL from user
        youtube_url = ask("Enter YouTube video URL: ")

        # Other parameters
        # A skip rate below 1 has no meaning ("every N frames"), so clamp it
        # the same way the thresholds below are clamped.
        frame_skip = max(1, int(ask("Enter frame skip rate (default: 30): ", "30")))
        output_dir = ask("Enter output directory (default: 'faces'): ", "faces")
        max_faces_input = ask("Maximum number of faces to extract (optional): ")
        max_faces = int(max_faces_input) if max_faces_input else None

        # Advanced parameters - with defaults that work well
        print("\nAdvanced parameters (press Enter to use defaults):")
        similarity_threshold = float(ask("Similarity threshold (0.5-0.9, default: 0.75): ", "0.75"))
        similarity_threshold = max(0.5, min(0.9, similarity_threshold))

        quality_threshold = float(ask("Quality threshold (0.3-0.8, default: 0.5): ", "0.5"))
        quality_threshold = max(0.3, min(0.8, quality_threshold))

        # Roboflow parameters
        # NOTE(review): the literal fallback key is committed with the notebook;
        # it is kept only for backward compatibility. Prefer setting the
        # ROBOFLOW_API_KEY environment variable, and rotate/remove the literal.
        api_key = (
            ask("Enter Roboflow API key (press Enter to use default): ")
            or os.environ.get("ROBOFLOW_API_KEY")
            or "RA851UccVU1TP3Ln2aDU"
        )
        model_id = ask("Enter Roboflow model ID (press Enter to use default): ", "asasa-mqilf/1")

        # Download the video
        print("\nDownloading video...")
        video_path = download_youtube_video(youtube_url)

        if video_path and os.path.exists(video_path):
            print(f"Video successfully downloaded to {video_path}")

            # Extract faces
            print("\nStarting face extraction with Roboflow...")
            result = None
            try:
                result = extract_faces_from_video(
                    video_path,
                    output_dir=output_dir,
                    frame_skip=frame_skip,
                    similarity_threshold=similarity_threshold,
                    quality_threshold=quality_threshold,
                    max_faces=max_faces,
                    api_key=api_key,
                    model_id=model_id
                )
            finally:
                # Clean up downloaded video, even if extraction was
                # interrupted or raised.
                try:
                    os.remove(video_path)
                    print("Temporary video file removed")
                except OSError as e:
                    print(f"Could not remove temporary file: {e}")

            # Show final results
            if result and result["unique_faces"] > 0:
                print("\nExtraction complete! To see your faces, check the directory:")
                print(f"  {os.path.abspath(output_dir)}")

                # Montage creation is best-effort in the extractor, so only
                # announce the file when it is really there.
                montage_name = os.path.basename(video_path).split('.')[0] + '_unique_faces_montage.jpg'
                montage_path = os.path.abspath(os.path.join(output_dir, montage_name))
                if os.path.exists(montage_path):
                    print("\nA montage of all unique faces has been created at:")
                    print(f"  {montage_path}")

                print("\nFor the best results, make sure you have the inference SDK installed:")
                print("  pip install inference-sdk")

        else:
            print("\nFailed to download video. Please check the URL or your internet connection.")

    except KeyboardInterrupt:
        print("\nProcess interrupted by user.")
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        print("Please check your dependencies and try again.")

# Start the interactive workflow when this cell/script is executed directly;
# nothing runs when the code is merely imported as a module.
if __name__ == "__main__":
    main()

YouTube Face Extractor with Roboflow API
Enter YouTube video URL: https://www.youtube.com/shorts/-sP9jqNNvGU
Enter frame skip rate (default: 30): 
Enter output directory (default: 'faces'): 
Maximum number of faces to extract (optional): 

Advanced parameters (press Enter to use defaults):
Similarity threshold (0.5-0.9, default: 0.75): 
Quality threshold (0.3-0.8, default: 0.5): 
Enter Roboflow API key (press Enter to use default): 
Enter Roboflow model ID (press Enter to use default): 

Downloading video...
Downloading video from: https://www.youtube.com/shorts/-sP9jqNNvGU
[youtube] Extracting URL: https://www.youtube.com/shorts/-sP9jqNNvGU
[youtube] -sP9jqNNvGU: Downloading webpage
[youtube] -sP9jqNNvGU: Downloading tv client config
[youtube] -sP9jqNNvGU: Downloading player 9599b765-main
[youtube] -sP9jqNNvGU: Downloading tv player API JSON
[youtube] -sP9jqNNvGU: Downloading ios player API JSON
[youtube] -sP9jqNNvGU: Downloading m3u8 information
[info] -sP9jqNNvGU: Downloading 1 form

Found: 1 faces (filtered: 0): 100%|██████████| 1677/1677 [00:20<00:00, 82.76frames/s]

Created montage of all unique faces: faces/video_20250414_021353_unique_faces_montage.jpg

Video: videos/video_20250414_021353.mp4
Total frames: 1677
Processed frames: 55
Unique faces found: 1
Duplicates filtered: 0
Processing time: 20.26 seconds
Processing speed: 2.71 frames/second
Faces saved to: faces
Temporary video file removed

Extraction complete! To see your faces, check the directory:
  /content/faces

A montage of all unique faces has been created at:
  /content/faces/video_20250414_021353_unique_faces_montage.jpg

For the best results, make sure you have the inference SDK installed:
  pip install inference-sdk





In [6]:
!pip install --upgrade inference-sdk





In [11]:
import os
import shutil

def delete_files_in_directory(directory="faces"):
    """
    Empty a directory: remove every file, symlink and sub-directory in it.

    The directory itself is kept. Entries that cannot be removed are
    reported individually and do not stop the remaining deletions.

    Args:
        directory (str): Directory to empty (default: "faces")

    Returns:
        None. The outcome is reported with print().
    """
    try:
        entries = os.listdir(directory)
    except FileNotFoundError:
        print(f"Directory '{directory}' not found.")
        return
    except NotADirectoryError:
        print(f"'{directory}' is not a directory.")
        return

    failures = 0
    for filename in entries:
        file_path = os.path.join(directory, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            failures += 1
            print('Failed to delete %s. Reason: %s' % (file_path, e))

    # Only claim success when nothing was left behind.
    if failures:
        print(f"{failures} item(s) in '{directory}' could not be deleted.")
    else:
        print(f"Files in '{directory}' deleted successfully.")

# Example usage: called without an argument, so the default "faces" directory is emptied.
delete_files_in_directory()  # Deletes everything inside "faces"

Files in 'faces' deleted successfully.
