<h1>YouTube</h1>

<h2>Download YouTube Video</h2>

<h3>Sanitize YouTube video title to use in file names</h3>

In [22]:
import re

def sanitize_filename(title):
    """Sanitize the string to be safe for filenames."""
    # Remove any character that is not alphanumeric, a space, underscore, or hyphen
    sanitized = re.sub(r'[^\w\s-]', '', title)
    # Replace spaces or multiple hyphens with a single hyphen
    sanitized = re.sub(r'\s+', '-', sanitized).strip('-_')
    return sanitized

In [23]:
from pytube import YouTube

def download_youtube_video(url, path):
    print(f"Downloading content from {url}...")
    yt = YouTube(url)

    # Sanitize the video title to create a valid filename
    base_filename = sanitize_filename(yt.title)

    if not os.path.exists(path):
        os.makedirs(path)

    # Download the video at 720p resolution
    video_filename = f"{base_filename}.mp4"
    video_stream = yt.streams.filter(file_extension='mp4', res="720p").first()

    video_path = video_stream.download(output_path=path, filename=video_filename)
    print(f"Video downloaded to {video_path}")

    # Download the best audio stream and save it as an MP4 (which is what pytube does natively)
    audio_filename = f"{base_filename}.mp3"
    audio_stream = yt.streams.filter(only_audio=True).first()
    audio_path = audio_stream.download(output_path=path, filename=audio_filename)
    print(f"Audio downloaded to {audio_path}")
    
    return video_path, audio_path

<h2>Get Frames From Video</h2>

<h3>Create directory if it doesn't exist</h3>

In [24]:
import os
import shutil 

def create_frame_output_dir(output_dir):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    else:
        shutil.rmtree(output_dir)
        os.makedirs(output_dir)

In [25]:
import cv2

def extract_frame_from_video(video_file_path):
    print(f"Extracting {video_file_path} at 1 frame per second. This might take a bit...")
    #get the name of the file from the path
    file_name = os.path.basename(video_file_path)
    file_name, file_extension = os.path.splitext(file_name)
    
    frame_extraction_directory = f"content/frames/{file_name}"
    create_frame_output_dir(frame_extraction_directory)
    vidcap = cv2.VideoCapture(video_file_path)
    fps = vidcap.get(cv2.CAP_PROP_FPS)
    frame_count = 0
    count = 0
    while vidcap.isOpened():
        success, frame = vidcap.read()
        if not success:  # End of video
            break
        if int(count / fps) == frame_count:  # Extract a frame every second
            min = frame_count // 60
            sec = frame_count % 60
            time_string = f"{min:02d}_{sec:02d}"
            image_name = f"vid_frame{time_string}.jpg"
            output_filename = frame_extraction_directory + "/" + image_name
            cv2.imwrite(output_filename, frame)
            frame_count += 1
        count += 1
    vidcap.release()
    print(f"Completed video frame extraction!\n\nExtracted: {frame_count} frames")
    return frame_extraction_directory

<h2>Download video and extract frames</h2>

In [26]:
def get_video(url: str, extract: bool = True):
    # Download video
    video_file_path, audio_file_path = download_youtube_video(url, "content/videos")
    # Extract frames
    extract_frame_from_video(video_file_path)
    return video_file_path, audio_file_path

<h2>Example Run</h2>

In [27]:
# video_url = 'https://www.youtube.com/watch?v=dQw4w9WgXcQ'
# get_video(video_url)

Downloading content from https://www.youtube.com/watch?v=dQw4w9WgXcQ...
Video downloaded to c:\Users\JR\Desktop\coding\TeachMe\server\experiments\content/videos\Never-Gonna-Give-You-Up.mp4
Audio downloaded to c:\Users\JR\Desktop\coding\TeachMe\server\experiments\content/videos\Never-Gonna-Give-You-Up.mp3
Extracting c:\Users\JR\Desktop\coding\TeachMe\server\experiments\content/videos\Never-Gonna-Give-You-Up.mp4 at 1 frame per second. This might take a bit...
Completed video frame extraction!

Extracted: 213 frames


('c:\\Users\\JR\\Desktop\\coding\\TeachMe\\server\\experiments\\content/videos\\Never-Gonna-Give-You-Up.mp4',
 'c:\\Users\\JR\\Desktop\\coding\\TeachMe\\server\\experiments\\content/videos\\Never-Gonna-Give-You-Up.mp3')

<h1>Gemini</h1>

<h2>Cycling API Key</h2>

In [29]:
import google.generativeai as genai
import ast
import os
import random
# Load environment variables from a .env file
from dotenv import load_dotenv
load_dotenv()

# Parse API keys stored in an environment variable and convert them into a Python list
GEMINI_API_KEYS = os.environ.get("GEMINI_API_KEYS")
KEY_LIST = ast.literal_eval(GEMINI_API_KEYS)

# Shuffle the API keys list to ensure usage of different keys over time
random.shuffle(KEY_LIST)

# Initialize a global index to track the current API key in use
current_api_key_index = 0

In [30]:
def cycle_api_key():
    """Retrieve the next API key from the list, cycling back to the start if necessary."""
    global current_api_key_index
    if current_api_key_index >= len(KEY_LIST) - 1:
        current_api_key_index = 0
    else:
        current_api_key_index += 1
    return KEY_LIST[current_api_key_index]

<h2>Generate new model</h2>

In [31]:
def generate_new_model():
    """Generate and configure a new AI model instance with a cycled API key."""
    global current_api_key_index
    global messages
    api_key = cycle_api_key()  # Cycle to the next API key
    
    # Configure the generative AI model with the new API key
    genai.configure(api_key=api_key)
    
    # Initialize the model with specific configurations
    model = genai.GenerativeModel(
        "gemini-1.5-pro-latest",
        generation_config=genai.GenerationConfig(
            temperature=0,  # Set deterministic behavior for the model
        ),
    )
    return model

<h3>Upload a file to gemini</h3>

In [33]:
def upload_file(path):
    """Upload a file to the generative AI server and return the response.
    
    Args:
        path (str): The path to the file being uploaded.
    
    Returns:
        object: Server's response after file upload.
    """
    if path is None:
        return None
    return genai.upload_file(path=path)

<h3>Class to help send frame files to gemini</h3>

In [34]:
def get_timestamp(filename):
    """Extract the timestamp from a file name based on a predefined format.
    
    Args:
        filename (str): Filename from which to extract the timestamp.
    
    Returns:
        int or None: Extracted timestamp if format is correct, otherwise None.
    """
    parts = filename.split("_frame")
    if len(parts) != 2:
        return None  # Filename format is incorrect
    return parts[1].split('_')[0]

class File:
    """A class to represent a file and its related metadata for processing."""
    def __init__(self, file_path: str, display_name: str = None):
        self.file_path = file_path
        self.display_name = display_name if display_name else file_path
        self.timestamp = get_timestamp(file_path)

    def set_file_response(self, response):
        """Associate a server response with the file.
        
        Args:
            response (object): The response received after uploading the file.
        """
        self.response = response

<h2>Aggregate Data</h2>

<h3>Url -> (Video and Audio) -> Gemini</h3>

In [35]:
def video_to_gemini(frame_directory: str):
    """Returns a list of timestamps and responses for the uploaded video frames for gemini"""
    # Retrieve and sort video frames from the directory
    files = os.listdir(frame_directory)
    files = sorted(files)
    files_to_upload = [File(os.path.join(frame_directory, file)) for file in files]

    full_video = False  # Flag to control whether to process all video files
    
    uploaded_files = []
    print(f'Uploading {len(files_to_upload) if full_video else 10} files. This might take a bit...')

    # Upload selected video files to the server
    for file in files_to_upload if full_video else files_to_upload[40:50]:
        print(f'Uploading: {file.file_path}...')
        response = genai.upload_file(path=file.file_path)
        file.set_file_response(response)
        uploaded_files.append(file)

    print(f"Completed file uploads!\n\nUploaded: {len(uploaded_files)} files")
    
    return [attr for file in uploaded_files for attr in (file.timestamp, file.response)]

In [36]:
def audio_to_gemini(audio_file_path: str):
    """Returns a list of audio context for gemini"""
    return upload_file(audio_file_path)

<h3>Example</h3>

In [38]:
# Example video url
url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"
save_path = "content/videos"

# Download video and extract frames
video_file_path, audio_file_path = download_youtube_video(url, save_path)
frame_directory = extract_frame_from_video(video_file_path)

Downloading content from https://www.youtube.com/watch?v=dQw4w9WgXcQ...
Video downloaded to c:\Users\JR\Desktop\coding\TeachMe\server\experiments\content/videos\Never-Gonna-Give-You-Up.mp4
Audio downloaded to c:\Users\JR\Desktop\coding\TeachMe\server\experiments\content/videos\Never-Gonna-Give-You-Up.mp3
Extracting c:\Users\JR\Desktop\coding\TeachMe\server\experiments\content/videos\Never-Gonna-Give-You-Up.mp4 at 1 frame per second. This might take a bit...
Completed video frame extraction!

Extracted: 213 frames


In [40]:
# Generate a new model
model = generate_new_model()

In [42]:
# Get the audio and video contexts
audio_context = audio_to_gemini(audio_file_path)
video_context = video_to_gemini(frame_directory)

Uploading 10 files. This might take a bit...
Uploading: content/frames/Never-Gonna-Give-You-Up\vid_frame00_40.jpg...
Uploading: content/frames/Never-Gonna-Give-You-Up\vid_frame00_41.jpg...
Uploading: content/frames/Never-Gonna-Give-You-Up\vid_frame00_42.jpg...
Uploading: content/frames/Never-Gonna-Give-You-Up\vid_frame00_43.jpg...
Uploading: content/frames/Never-Gonna-Give-You-Up\vid_frame00_44.jpg...
Uploading: content/frames/Never-Gonna-Give-You-Up\vid_frame00_45.jpg...
Uploading: content/frames/Never-Gonna-Give-You-Up\vid_frame00_46.jpg...
Uploading: content/frames/Never-Gonna-Give-You-Up\vid_frame00_47.jpg...
Uploading: content/frames/Never-Gonna-Give-You-Up\vid_frame00_48.jpg...
Uploading: content/frames/Never-Gonna-Give-You-Up\vid_frame00_49.jpg...
Completed file uploads!

Uploaded: 10 files


In [44]:
audio_context

<google.generativeai.types.file_types.File at 0x20a6bfaaf10>

In [45]:
video_context

['00',
 <google.generativeai.types.file_types.File at 0x20a6923f150>,
 '00',
 <google.generativeai.types.file_types.File at 0x20a6924d5d0>,
 '00',
 <google.generativeai.types.file_types.File at 0x20a6bf33fd0>,
 '00',
 <google.generativeai.types.file_types.File at 0x20a6bf722d0>,
 '00',
 <google.generativeai.types.file_types.File at 0x20a6bd41d90>,
 '00',
 <google.generativeai.types.file_types.File at 0x20a664d63d0>,
 '00',
 <google.generativeai.types.file_types.File at 0x20a6bfb7f50>,
 '00',
 <google.generativeai.types.file_types.File at 0x20a6b9d7f10>,
 '00',
 <google.generativeai.types.file_types.File at 0x20a6bf63510>,
 '00',
 <google.generativeai.types.file_types.File at 0x20a6bf73550>]

In [47]:
prompt = "Describe the context"
response = model.generate_content([prompt, audio_context] + video_context, request_options={"timeout": 1000})

In [48]:
response.text

'The music video is for the song "Never Gonna Give You Up" by Rick Astley. The video features Astley singing and dancing in various locations, including a warehouse, a church, and a street. The video is intercut with shots of a woman dancing and a man running. The video has a retro feel, with Astley\'s clothing and hairstyle being typical of the 1980s. The video is also notable for its use of the "Rickroll" meme, in which people are tricked into watching the video by clicking on a link that appears to be something else.'