In [1]:
!pip install -q ultralytics supervision gdown

## Video processing

In [1]:
from pytube import YouTube
import os

import cv2
import math
import json
import copy
import base64
import requests

import numpy as np
import pandas as pd
import supervision as sv

from tqdm.notebook import tqdm
from ultralytics import YOLO
from typing import List, Optional, Dict, Iterator, Tuple

In [2]:
# Load the video metadata table; its 'URLs' column is what gets passed to the
# download helper further down.
df = pd.read_csv('../data/transform_data/Grassroot_data_15.csv')
df

Unnamed: 0.1,Unnamed: 0,Date,Title,URLs,Views,Is_Video,Video_ID,Video_Duration,Duration_Minutes
0,VM513:1,30/01/2024,U23 GetScouted Showcase Match Footage | London...,https://www.youtube.com/watch?v=8ZabZYk8tBg&t=...,1162.0,True,8ZabZYk8tBg,PT1H7M12S,67.200000
1,VM513:1,30/01/2024,TALENT ID MATCH - GETSCOUTED ELITE V MUNDI GEN...,https://www.youtube.com/watch?v=OPddmoSOwsk,526.0,True,OPddmoSOwsk,PT1H30M37S,90.616667
2,VM513:1,30/01/2024,(GetScouted Webinar) What does it take to get ...,https://www.youtube.com/watch?v=2muV6yWpF1A,227.0,True,2muV6yWpF1A,PT49M48S,49.800000
3,VM644:1,30/01/2024,A LATE FREE KICK GOLAZO! | LONDON LEGACY VS TI...,https://www.youtube.com/watch?v=PNuf6akaVAE,207.0,True,PNuf6akaVAE,PT1H34M33S,94.550000
4,VM644:1,30/01/2024,OUTPLAYED ON HOME TURF! | LEGACY VS AC MILANO ...,https://www.youtube.com/watch?v=yRLWlLB4-fo,215.0,True,yRLWlLB4-fo,PT1H33M7S,93.116667
...,...,...,...,...,...,...,...,...,...
584,VM607:1,30/01/2024,La Roca HW vs Omni-U8 Indoor soccer 11-29-14,https://www.youtube.com/watch?v=UIE7-CW-ukk,649.0,True,UIE7-CW-ukk,PT15M28S,15.466667
585,VM607:1,30/01/2024,La Roca RH vs XBOX-U8 Indoor Soccer 11-22-14,https://www.youtube.com/watch?v=M3Gl3UjKBlc,1554.0,True,M3Gl3UjKBlc,PT16M36S,16.600000
586,VM607:1,30/01/2024,La Roca HW vs Outlaws BY-U8 Indoor Soccer 11-1...,https://www.youtube.com/watch?v=YhGj2voXjes,1988.0,True,YhGj2voXjes,PT15M42S,15.700000
587,VM607:1,30/01/2024,La Roca RH vs United-U8 Indoor Soccer 11-15-14,https://www.youtube.com/watch?v=6m8WoDyjrMk,497.0,True,6m8WoDyjrMk,PT18M14S,18.233333


In [3]:
# Pick a small random subset of videos to download.
# A seeded generator keeps the selection reproducible across kernel restarts, and
# replace=False guarantees the same row is never drawn twice (np.random.randint
# samples with replacement and was unseeded).
SAMPLE_SIZE = 5
SAMPLE_SEED = 42
_sample_rng = np.random.default_rng(SAMPLE_SEED)
random_indices = _sample_rng.choice(
    df.shape[0],
    size=min(SAMPLE_SIZE, df.shape[0]),  # never ask for more rows than exist
    replace=False,
)
data_download = df.iloc[random_indices]
data_download

Unnamed: 0.1,Unnamed: 0,Date,Title,URLs,Views,Is_Video,Video_ID,Video_Duration,Duration_Minutes
176,VM607:1,30/01/2024,La Roca TC vs Club America - U15 Tournament So...,https://www.youtube.com/watch?v=3OfgiG0uis0,9322.0,True,3OfgiG0uis0,PT24M42S,24.7
248,VM607:1,30/01/2024,La Roca TC vs La Roca TC - U13 Futsal,https://www.youtube.com/watch?v=0QOHSa60CpA,4835.0,True,0QOHSa60CpA,PT19M8S,19.133333
378,VM607:1,30/01/2024,Wasatch Soccer Classic-Wasatch SD vs Wasatch T...,https://www.youtube.com/watch?v=xyktcUuWb3c,48809.0,True,xyktcUuWb3c,PT25M45S,25.75
387,VM607:1,30/01/2024,Forza DN vs Wasatch SD - U11 Premier Soccer,https://www.youtube.com/watch?v=o3_oyr-EJtw,23987.0,True,o3_oyr-EJtw,PT23M29S,23.483333
379,VM607:1,30/01/2024,Wasatch Soccer Classic - Wasatch SD vs Cima FC...,https://www.youtube.com/watch?v=8a0jqEAccsQ,62553.0,True,8a0jqEAccsQ,PT20M54S,20.9


In [4]:
def download_videos(data, url_column, download_path='.'):
    """
    Fetch the YouTube videos whose URLs are listed in one column of a table.

    Args:
    data (pd.DataFrame): Table holding the video URLs.
    url_column (str): Name of the column in `data` that contains the URLs.
    download_path (str): Directory the videos are written to; created if missing.

    For each URL the first progressive MP4 stream is downloaded. Any exception
    raised while handling a URL is printed and the loop moves on to the next URL.
    """
    # Make sure the target directory is there before any download starts
    os.makedirs(download_path, exist_ok=True)

    for video_url in data[url_column]:
        try:
            youtube = YouTube(video_url)

            # Progressive MP4 streams only; take whichever one is listed first
            mp4_streams = youtube.streams.filter(progressive=True, file_extension='mp4')
            stream = mp4_streams.first()

            if not stream:
                print(f"No available video stream for {video_url}.")
                continue

            stream.download(download_path)
            print(f"Downloaded {youtube.title} successfully.")
        except Exception as error:
            # Best effort: report the failure and keep going with the remaining URLs
            print(f"Failed to download {video_url}: {str(error)}")
            



def download_best_available(data, url_column, download_path='.'):
    """
    Download YouTube videos as progressive MP4 files, preferring 720p.

    Args:
    data (pd.DataFrame): The DataFrame containing the video URLs.
    url_column (str): The name of the column containing the YouTube URLs.
    download_path (str): The path to the directory where videos will be downloaded.

    For each URL the 720p progressive MP4 stream is used when one exists; otherwise
    the first progressive MP4 stream after ordering by resolution in descending
    order is used. Any exception raised while handling a URL is printed and the
    loop continues with the next URL.
    """
    # Ensure the download directory exists (same contract as download_videos)
    os.makedirs(download_path, exist_ok=True)

    for url in data[url_column]:
        try:
            yt = YouTube(url)

            # Preferred quality first
            video = yt.streams.filter(progressive=True, file_extension='mp4', res="720p").first()

            if not video:
                # No 720p stream: fall back to the top entry of the resolution-sorted list
                video = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()

            if video:
                video.download(output_path=download_path)

                print(f"Downloaded {yt.title} at {video.resolution} successfully")

            else:
                print(f"No video available for {url}")

        except Exception as e:
            # Best effort: report the failure and keep going with the remaining URLs
            print(f"Failed to download {url}: {str(e)}")
            
import cv2

def extract_frames(video_path, frames_dir, skip_frames=0):
    """
    Extract frames from a video file.

    Every (skip_frames + 1)-th frame, starting with frame 0, is written to
    `frames_dir` as `frame_<index>.jpg`, where <index> is the frame's position
    in the video. Reading stops at the first frame that cannot be read.

    Args:
    video_path (str): Path to the video file.
    frames_dir (str): Directory to save the extracted frames; created if missing.
    skip_frames (int): Number of frames to skip between extractions.
    """
    os.makedirs(frames_dir, exist_ok=True)
    vidcap = cv2.VideoCapture(video_path)

    try:
        success, image = vidcap.read()
        count = 0

        while success:
            if count % (skip_frames + 1) == 0:
                cv2.imwrite(f"{frames_dir}/frame_{count}.jpg", image)     # save frame as JPEG file
            success, image = vidcap.read()
            count += 1
    finally:
        # Always free the capture handle, even if reading or writing fails midway
        vidcap.release()

    print("All frame are collected")
    

def extract_frames_from_all_videos(videos_dir, frames_root_dir, skip_frames=0):
    """
    Run frame extraction for every file found directly inside a directory.

    Each file gets its own output directory under `frames_root_dir`, named after
    the file without its extension. Sub-directories of `videos_dir` are ignored.

    Args:
    videos_dir (str): Path to the directory containing video files.
    frames_root_dir (str): Root directory under which one frames directory per video is created.
    skip_frames (int): Number of frames to skip between extractions.
    """
    os.makedirs(frames_root_dir, exist_ok=True)

    # Snapshot the regular files up front, before any output is produced
    video_files = []
    for entry_name in os.listdir(videos_dir):
        if os.path.isfile(os.path.join(videos_dir, entry_name)):
            video_files.append(entry_name)

    for video_file in video_files:
        source_path = os.path.join(videos_dir, video_file)

        # Output directory is named after the file, extension stripped
        stem, _extension = os.path.splitext(video_file)
        target_dir = os.path.join(frames_root_dir, stem)
        os.makedirs(target_dir, exist_ok=True)

        extract_frames(source_path, target_dir, skip_frames)

In [11]:
# Download the sampled rows' videos into the directory that the
# frame-extraction cell below reads from.
download_path = '../data/videos'

download_best_available(data_download, 'URLs', download_path)

Downloaded La Roca S-FS vs Wasatch JS- U12 D1 Soccer at 720p successfully
Downloaded Cache Valley Cup - U13 Tournament Soccer Game #2 at 720p successfully
Downloaded Strikers Green vs Wasatch JS-U12 Indoor Soccer at 720p successfully
Downloaded USA Adidas Cup tournament U13- Rangers vs La Roca at 720p successfully
Downloaded La Roca AV vs Rush FC - U16 D2 Soccer at 720p successfully


In [14]:
# Extract frames for every file in the videos directory; skip_frames=0 keeps
# every frame, written to one sub-directory per video.
videos_dir = '../data/videos'
frames_root_dir = '../data/videos_frames'

extract_frames_from_all_videos(videos_dir, frames_root_dir, skip_frames=0)

All frame are collected
All frame are collected
All frame are collected
All frame are collected
All frame are collected


## Helper functions

In [5]:
# Hex colors turned into the palette used by the annotators in annotate_result,
# where colors are looked up by class (sv.ColorLookup.CLASS).
COLOR_HEX_LIST = [
    "#EE4B2B",
    "#FFFF00",
    "#D3D3D3"
]


def annotate_prompt(
    image: np.ndarray,
    detections: sv.Detections,
    labels: Optional[List[str]] = None
) -> np.ndarray:
    """
    Draws black bounding boxes and white-on-black text labels for each detection.

    Parameters:
        image (np.ndarray): Source image. It is not modified; drawing happens on a copy.
        detections (sv.Detections): Detections whose boxes are drawn on the image.
        labels (Optional[List[str]]): One label per detection. When omitted, the labels
            are the detection positions "0", "1", "2", ... in order.

    Returns:
        np.ndarray: A copy of `image` with the boxes and labels drawn on it.

    """
    if labels is None:
        # Default to the running index of each detection
        labels = [str(index) for index in range(len(detections))]

    box_annotator = sv.BoundingBoxAnnotator(
        color=sv.Color.black(),
        color_lookup=sv.ColorLookup.CLASS)
    text_annotator = sv.LabelAnnotator(
        color=sv.Color.black(),
        text_color=sv.Color.white(),
        color_lookup=sv.ColorLookup.CLASS,
        text_scale=0.7)

    # Boxes first, then labels on top, all on a private copy of the input
    canvas = image.copy()
    canvas = box_annotator.annotate(canvas, detections=detections)
    canvas = text_annotator.annotate(canvas, detections=detections, labels=labels)
    return canvas


def annotate_result(
    image: np.ndarray,
    detections: sv.Detections
) -> np.ndarray:
    """
    Draws an ellipse and a "#<tracker_id>" label for every detection.

    Text scale and line thickness are derived from the image resolution, with the
    text scale capped at 0.8. Colors come from COLOR_HEX_LIST and are looked up by class.

    Parameters:
        image (np.ndarray): Source image with shape (height, width, channels). It is
            not modified; drawing happens on a copy.
        detections (sv.Detections): Detections to draw. `detections.tracker_id` is
            iterated to build the labels, so it must be populated.

    Returns:
        np.ndarray: A copy of `image` with ellipses and tracker-id labels drawn on it.
    """
    height, width, _ = image.shape
    resolution_wh = (width, height)

    text_scale = min(sv.calculate_dynamic_text_scale(resolution_wh=resolution_wh), 0.8)
    line_thickness = sv.calculate_dynamic_line_thickness(resolution_wh=resolution_wh)

    # Same palette for both annotators so ellipse and label colors match
    palette = sv.ColorPalette.from_hex(color_hex_list=COLOR_HEX_LIST)
    ellipse_annotator = sv.EllipseAnnotator(
        color=palette,
        color_lookup=sv.ColorLookup.CLASS,
        thickness=line_thickness)
    label_annotator = sv.LabelAnnotator(
        color=palette,
        text_color=sv.Color.black(),
        color_lookup=sv.ColorLookup.CLASS,
        text_position=sv.Position.BOTTOM_CENTER,
        text_scale=text_scale)

    tracker_labels = []
    for tracker_id in detections.tracker_id:
        tracker_labels.append(f"#{tracker_id}")

    canvas = image.copy()
    canvas = ellipse_annotator.annotate(canvas, detections)
    canvas = label_annotator.annotate(canvas, detections, labels=tracker_labels)
    return canvas

In [6]:
# Endpoint that prompt_image sends its POST request to.
OPENAI_API_URL = "https://api.openai.com/v1/chat/completions"


def encode_image_to_base64(image: np.ndarray) -> str:
    """
    Encodes an image as JPEG and returns the bytes as a base64 text string.

    Parameters:
        image (np.ndarray): Image to encode; passed as-is to cv2.imencode.

    Returns:
        str: Base64 representation of the JPEG data, decoded as UTF-8 text.

    Raises:
        ValueError: If cv2.imencode reports that encoding failed.
    """
    ok, jpeg_bytes = cv2.imencode('.jpg', image)
    if ok:
        return base64.b64encode(jpeg_bytes).decode('utf-8')
    raise ValueError("Could not encode image to JPEG format.")


def compose_payload(
    images: List[np.ndarray],
    prompt: str,
    model: str = "gpt-4-vision-preview",
    max_tokens: int = 300
) -> dict:
    """
    Builds the JSON body for a chat-completions request with one text prompt and
    any number of images.

    Parameters:
        images (List[np.ndarray]): Images to attach. Each one is JPEG-encoded and
            embedded as a base64 data URL, in the given order, after the text part.
        prompt (str): Text sent as the first content part of the user message.
        model (str): Model name placed in the payload. Defaults to the value this
            notebook has always used.
            NOTE(review): confirm this model is still served by the API.
        max_tokens (int): Value of the payload's "max_tokens" field.

    Returns:
        dict: Payload with the keys "model", "messages" and "max_tokens".
    """
    text_content = {
        "type": "text",
        "text": prompt
    }
    image_content = [
        {
            "type": "image_url",
            "image_url": {
                "url": f"data:image/jpeg;base64,{encode_image_to_base64(image=image)}"
            }
        }
        for image
        in images
    ]
    return {
        "model": model,
        "messages": [
            {
                "role": "user",
                # Text part first, then one entry per image
                "content": [text_content] + image_content
            }
        ],
        "max_tokens": max_tokens
    }


def compose_headers(api_key: str) -> dict:
    """
    Builds the HTTP headers for a JSON request authenticated with a bearer token.

    Parameters:
        api_key (str): Key placed after "Bearer " in the Authorization header.

    Returns:
        dict: Headers with "Content-Type" and "Authorization" entries.
    """
    headers = {"Content-Type": "application/json"}
    headers["Authorization"] = f"Bearer {api_key}"
    return headers


def prompt_image(
    api_key: str,
    images: List[np.ndarray],
    prompt: str,
    timeout: float = 120.0
) -> str:
    """
    Sends a text prompt together with images to OPENAI_API_URL and returns the
    text of the first choice in the response.

    Parameters:
        api_key (str): API key used for the Authorization header.
        images (List[np.ndarray]): Images attached to the prompt.
        prompt (str): Text prompt.
        timeout (float): Seconds to wait for the server before requests gives up.
            Without a timeout a stalled connection would block the cell forever.

    Returns:
        str: The "content" of the first choice's message.

    Raises:
        ValueError: If the JSON response contains an "error" entry; the message is
            taken from that entry.
    """
    headers = compose_headers(api_key=api_key)
    payload = compose_payload(images=images, prompt=prompt)
    response = requests.post(
        url=OPENAI_API_URL,
        headers=headers,
        json=payload,
        timeout=timeout
    ).json()

    if 'error' in response:
        raise ValueError(response['error']['message'])
    return response['choices'][0]['message']['content']

In [7]:
def resize_images(images: List[np.ndarray], size: Tuple[int, int]) -> List[np.ndarray]:
    """
    Applies cv2.resize with the same target size to each image in a list.

    Args:
        images (List[np.ndarray]): Images to resize.
        size (Tuple[int, int]): Target size handed to cv2.resize, given as (width, height).

    Returns:
        List[np.ndarray]: The resized images, in the same order as the input.
    """
    resized: List[np.ndarray] = []
    for image in images:
        resized.append(cv2.resize(image, size))
    return resized


def blend_images(images: List[np.ndarray]) -> np.ndarray:
    """
    Averages a list of images element-wise into one image.

    Args:
    images: Images as NumPy arrays. They are stacked, so all must share one shape.

    Returns:
    The element-wise mean of the inputs, cast to uint8.

    Raises:
    ValueError: If the input list is empty.
    """
    if not images:
        raise ValueError("The list of images is empty.")

    # Stack along a new leading axis and average across it
    mean_image = np.stack(images).mean(axis=0)
    return mean_image.astype(np.uint8)


def chunk_list(lst: List, n: int) -> Iterator[List]:
    """
    Lazily splits a list into consecutive slices of length `n`.

    Parameters:
        lst (List): The list to split.
        n (int): Number of elements per slice.

    Yields:
        Iterator[List]: Slices of `lst` in order; the final slice holds the
            remaining elements and may be shorter than `n`.
    """
    chunk_starts = range(0, len(lst), n)
    for start in chunk_starts:
        stop = start + n
        yield lst[start:stop]

## Set the OpenAI API key

In [8]:
# Read the key from the OPENAI_API_KEY environment variable so the real secret never
# has to be written into the notebook. The placeholder is only the fallback used
# when the variable is not set.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "my_apy_key")

## Load detection model

In [9]:
# Detection model, built from the 'yolov8x.pt' weights file.
model = YOLO('yolov8x.pt')

In [10]:
# Clip used for the single-video experiments below; the last line displays its
# VideoInfo (width, height, fps, total_frames).
single_video_path = "../data/test_video/test2.mp4"

sv.VideoInfo.from_video_path(single_video_path)

VideoInfo(width=1280, height=720, fps=25, total_frames=2992)

In [None]:
# Pull one frame from the test clip and display it.
# start=80 is presumably the index of the first frame yielded - confirm against
# the supervision documentation.
frame_generator = sv.get_video_frames_generator(single_video_path, start=80)
frame_iterator = iter(frame_generator)
frame = next(frame_iterator)
sv.plot_image(frame)