In [None]:
!pip install torch==2.2.1
!pip install groq
!pip install sentence-transformers==2.6.0
!pip install llama-index==0.10.20
!pip install python-dotenv==1.0.1
!pip install llama-index-embeddings-huggingface==0.1.4
!pip install llama-index-readers-web==0.1.9
!pip install youtube_transcript_api==0.6.2
!pip install llama-index-readers-youtube-transcript==0.1.4

## Setup API keys

In [None]:
import os
import openai
from dotenv import load_dotenv
from dotenv import load_dotenv, find_dotenv

# Load variables from a .env file into the process environment before reading the keys.
load_dotenv()
# Fail fast with a readable message if the OpenAI key is missing.
openai.api_key = os.getenv('OPENAI_API_KEY')
assert openai.api_key, 'Please set OPENAI_API_KEY!'

# The Groq key is kept in a module-level variable; the Groq client cell reads it.
groq_api_key = os.getenv('GROQ_API_KEY')
assert groq_api_key, 'Please set GROQ_API_KEY!'

## Some Utility Functions

In [None]:
from groq import Groq

# Shared Groq client, authenticated with the key read from the environment.
client = Groq(api_key=groq_api_key)


def groq_chat(system_prompt, query, model="llama-3.1-8b-instant"):
    """Send one system + user message pair to Groq and return the reply text.

    Args:
        system_prompt: Content of the "system" message.
        query: Content of the "user" message.
        model: Groq model identifier to run the completion on.

    Returns:
        The ``content`` of the first choice's message.
    """
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": query},
    ]
    completion = client.chat.completions.create(messages=conversation, model=model)
    first_choice = completion.choices[0]
    return first_choice.message.content


In [None]:
from llama_index.core import (
    Document,
    SimpleDirectoryReader,
    VectorStoreIndex,
    DocumentSummaryIndex,
    SummaryIndex,
    StorageContext,
    load_index_from_storage,
    get_response_synthesizer
)
from llama_index.readers.youtube_transcript import YoutubeTranscriptReader
from llama_index.core.tools import FunctionTool, QueryEngineTool, ToolMetadata
from llama_index.llms.openai import OpenAI
from llama_index.core.node_parser import SentenceSplitter

def get_youtube_transcript_doc(yt_links:list)->Document:
    """Load the transcripts of the given YouTube links.

    Args:
        yt_links: YouTube video URLs, forwarded to the reader as ``ytlinks``.

    Returns:
        The value returned by ``YoutubeTranscriptReader.load_data``.
        NOTE(review): later cells index the result (``yt_doc[0]``), so this is
        presumably a list of documents rather than a single ``Document`` as the
        annotation says - confirm.
    """
    reader = YoutubeTranscriptReader()
    return reader.load_data(ytlinks=yt_links)

def build_vector_index(doc:Document, persist_dir=None)->VectorStoreIndex:
    """Build a vector index from documents, or reload one persisted on disk.

    Args:
        doc: Documents to index; ignored when ``persist_dir`` is given.
        persist_dir: Directory of a previously persisted index. When truthy the
            index is loaded from there instead of being rebuilt.

    Returns:
        The loaded or newly built index.
    """
    if persist_dir:
        storage_context = StorageContext.from_defaults(persist_dir=persist_dir)
        # Index classes have no `load_from_storage` classmethod; reloading goes
        # through the module-level `load_index_from_storage` helper.
        return load_index_from_storage(storage_context)
    return VectorStoreIndex.from_documents(doc, show_progress=True)

def build_summary_index(doc:Document, persist_dir=None)->DocumentSummaryIndex:
    """Build a document summary index, or reload one persisted on disk.

    Args:
        doc: Documents to summarize and index; ignored when ``persist_dir`` is given.
        persist_dir: Directory of a previously persisted index. When truthy the
            index is loaded from there instead of being rebuilt.

    Returns:
        The loaded or newly built index.
    """
    if persist_dir:
        storage_context = StorageContext.from_defaults(persist_dir=persist_dir)
        # Index classes have no `load_from_storage` classmethod; reloading goes
        # through the module-level `load_index_from_storage` helper.
        return load_index_from_storage(storage_context)

    # from https://docs.llamaindex.ai/en/stable/examples/index_structs/doc_summary/DocSummary/
    # LLM (gpt-3.5-turbo)
    chatgpt = OpenAI(temperature=0, model="gpt-3.5-turbo")
    splitter = SentenceSplitter(chunk_size=1024)

    # default mode of building the index
    response_synthesizer = get_response_synthesizer(
        response_mode="tree_summarize", use_async=True
    )
    return DocumentSummaryIndex.from_documents(
        doc,
        llm=chatgpt,
        transformations=[splitter],
        response_synthesizer=response_synthesizer,
        show_progress=True,
    )

def get_query_engine(index, similarity_top=3):
    """Create a query engine from ``index``.

    Args:
        index: Any object exposing ``as_query_engine``.
        similarity_top: Value forwarded as ``similarity_top_k``.

    Returns:
        Whatever ``index.as_query_engine`` returns.
    """
    engine_options = {"similarity_top_k": similarity_top}
    return index.as_query_engine(**engine_options)

def get_youtube_tool():
    """Wrap ``get_youtube_transcript_doc`` in a ``FunctionTool`` and return it."""
    transcript_tool = FunctionTool.from_defaults(fn=get_youtube_transcript_doc)
    return transcript_tool


In [None]:
import nest_asyncio

# Patch asyncio so an already-running event loop can be re-entered.
# NOTE(review): presumably needed because build_summary_index requests
# use_async=True while the notebook kernel already runs a loop - confirm.
nest_asyncio.apply()

## Create an example Vector Index from a YouTube Link

In [None]:
# Intro to LLMs, Andrej Karpathy
yt_links = ["https://www.youtube.com/watch?v=zjkBMFhNj_g"]

# AMD at Computex 2024: AMD AI and High-Performance Computing with Dr. Lisa Su
# yt_links = ["https://www.youtube.com/watch?v=MCi8jgALPYA"]

# Load the transcript and build a fresh vector index from it (no persist_dir is passed).
yt_doc = get_youtube_transcript_doc(yt_links)
yt_vector_index = build_vector_index(yt_doc)

# Query engine over the index, using get_query_engine's default similarity_top.
yt_engine = get_query_engine(yt_vector_index)
# yt_tool = get_youtube_tool(transcript_doc)

# Tool list handed to the agent later; the name and description are the tool's metadata.
# The description is specific to the Karpathy video selected in yt_links above.
query_engine_tools = [
    QueryEngineTool(
        query_engine=yt_engine,
        metadata=ToolMetadata(
            name="youtube",
            description=(
                "YouTube transcript of Andrej Karpathy's Introduction to LLMs. "
                "Use a detailed plain text question as input to the tool."
            ),
        ),
    ),
]

## Summarize Document

In [None]:
# Reload the transcript and give its first document a fixed ID so that its
# summary can be looked up by that ID.
yt_doc = get_youtube_transcript_doc(yt_links)
doc_id = "youtube_doc"
yt_doc[0].doc_id = doc_id
# No persist_dir is passed, so the summary index is built from scratch.
yt_summary_index = build_summary_index(yt_doc)

In [None]:
# Show the summary stored in the index for the document ID set when the index was built.
print(yt_summary_index.get_document_summary(doc_id))

## Query the Index

In [None]:
# Ask the vector-index query engine (not the agent) a single question directly.
query = "What's a neural network?"
response = yt_engine.query(query)
print(response)

## Create a ReAct Agent

In [None]:
from llama_index.llms.openai import OpenAI
from llama_index.core.agent import ReActAgent

# `LocalLLM` is not defined or imported anywhere in this notebook, so calling it
# unconditionally raises NameError on a fresh kernel. Use it when it has been
# brought into scope, otherwise fall back to the OpenAI model imported above.
if "LocalLLM" in globals():
    llm = LocalLLM()
else:
    llm = OpenAI(model="gpt-4")

# ReAct agent that can call the tools in `query_engine_tools`; verbose=True makes
# the agent print its intermediate steps.
agent = ReActAgent.from_tools(
    query_engine_tools,
    llm=llm,
    verbose=True,
)

## Test Agent

In [None]:
# Suffix appended to every question, asking the agent to use the "youtube" tool.
hint = "(use the youtube tool)"
queries = [
    # "Summarize to just the main takeaways.",
    # "What is a LLM according to Andrej Karpathy?",
    # "How does it work?",
    "How do you train ChatGPT?",
    "What's system 1 vs. system 2 thinking?",
    "What were the two main stages of AlphaGo?",
    "What will an LLM OS be able to do in a few years?",
    "How do you jailbreak an LLM?",
    "How do you do prompt injection?",
]

# All questions go through the same `agent` object, one agent.chat call each.
for query in queries:
    print(f"Query: {query}")
    # The response is not printed here; the agent was created with verbose=True.
    response = agent.chat(f"{query}\n{hint}")
    print("-------------------------------------------------------------")

## Interactive chat

In [None]:
# Simple REPL: forward each entered line to the agent; an empty line ends the session.
user_input = input()
while user_input:
    print(f"User: {user_input}")
    response = agent.chat(user_input)
    print(f"Agent: {response}")
    user_input = input()

## YouTube Interactive API

In [None]:
import os
from dotenv import load_dotenv
from dotenv import load_dotenv, find_dotenv

# Read the YouTube API key from the environment and fail with the same style of
# message as the OpenAI/Groq key checks at the top of the notebook.
# NOTE(review): no visible cell uses `youtube_api_key` afterwards - confirm it is still needed.
youtube_api_key = os.getenv('YOUTUBE_API_KEY')
assert youtube_api_key, 'Please set YOUTUBE_API_KEY!'

### Fetch and process YouTube transcript

In [None]:
from youtube_transcript_api import YouTubeTranscriptApi
import json
import re

def extract_video_id(url):
    """Extract the video ID from a YouTube URL.

    Recognized forms (scheme and ``www.`` are optional):
      * ``youtube.com/watch?v=ID`` - ``v`` may appear anywhere in the query string
      * ``youtu.be/ID``
      * ``youtube.com/embed/ID``
      * ``youtube.com/v/ID``
      * ``youtube.com/shorts/ID``

    Args:
        url: The URL (or text containing a URL) to inspect.

    Returns:
        The video ID string, or ``None`` when ``url`` is empty, is not a string,
        or matches none of the recognized forms.
    """
    if not isinstance(url, str) or not url:
        return None

    # An ID ends at the next query separator, fragment, slash or whitespace, so
    # trailing "&t=42s", "?t=10" or "#t=30" never leak into the result.
    video_id = r'([^?&#\/\s]+)'
    patterns = [
        # The lazy optional group skips parameters that precede "v", e.g.
        # watch?feature=share&v=ID, while still preferring a leading "v=".
        r'(?:https?:\/\/)?(?:www\.)?youtube\.com\/watch\?(?:[^#\s]*?&)??v=' + video_id,
        r'(?:https?:\/\/)?(?:www\.)?youtu\.be\/' + video_id,
        r'(?:https?:\/\/)?(?:www\.)?youtube\.com\/embed\/' + video_id,
        r'(?:https?:\/\/)?(?:www\.)?youtube\.com\/v\/' + video_id,
        r'(?:https?:\/\/)?(?:www\.)?youtube\.com\/shorts\/' + video_id,
    ]

    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)

    return None

def fetch_transcript(video_id):
    """Fetch the transcript of a YouTube video.

    Args:
        video_id: ID of the video, passed to ``YouTubeTranscriptApi.get_transcript``.

    Returns:
        The transcript returned by the API, or ``None`` if any exception was
        raised (the error is printed, not re-raised).
    """
    try:
        return YouTubeTranscriptApi.get_transcript(video_id)
    except Exception as err:
        print(f"An error occurred: {err}")
    return None

def save_transcript(transcript, output_file):
    """Write ``transcript`` to ``output_file`` as pretty-printed UTF-8 JSON.

    Args:
        transcript: JSON-serializable transcript data.
        output_file: Destination path; an existing file is overwritten.
    """
    serialized = json.dumps(transcript, ensure_ascii=False, indent=2)
    with open(output_file, 'w', encoding='utf-8') as handle:
        handle.write(serialized)

def print_transcript(transcript):
    """Print every transcript entry as ``[HH:MM:SS - HH:MM:SS] text``.

    Args:
        transcript: Iterable of mappings with ``'start'``, ``'duration'`` and
            ``'text'`` keys; the end stamp is ``start + duration``.
    """
    for segment in transcript:
        begin = segment['start']
        length = segment['duration']
        caption = segment['text']
        finish = begin + length

        # Both bounds are rendered as HH:MM:SS.
        begin_stamp, finish_stamp = format_timestamp(begin), format_timestamp(finish)
        print(f"[{begin_stamp} - {finish_stamp}] {caption}")

def format_timestamp(seconds):
    """Render a number of seconds as a zero-padded ``HH:MM:SS`` string.

    Fractional seconds are truncated; the hours field is not wrapped at 24 and
    simply grows wider when needed.
    """
    fields = (
        seconds // 3600,         # whole hours
        (seconds % 3600) // 60,  # whole minutes within the hour
        seconds % 60,            # seconds within the minute
    )
    return ":".join(f"{int(field):02d}" for field in fields)

def save_formatted_transcript(transcript, output_file):
    """Write the transcript to ``output_file``, one timestamped line per entry.

    Each line has the form ``[HH:MM:SS - HH:MM:SS] text``, where the end stamp
    is ``start + duration``.

    Args:
        transcript: Iterable of mappings with ``'start'``, ``'duration'`` and
            ``'text'`` keys.
        output_file: Destination path; opened for writing as UTF-8.
    """
    with open(output_file, 'w', encoding='utf-8') as out:
        for segment in transcript:
            begin = segment['start']
            length = segment['duration']
            caption = segment['text']
            finish = begin + length

            begin_stamp, finish_stamp = format_timestamp(begin), format_timestamp(finish)
            out.write(f"[{begin_stamp} - {finish_stamp}] {caption}\n")

### Playback YouTube video

In [None]:
import os
import tempfile
import subprocess
from pytube import YouTube
import time

def download_youtube_video(video_id, temp_dir=None):
    """Download the highest-resolution progressive MP4 stream of a YouTube video.

    Args:
        video_id: A YouTube video ID. A full ``http(s)://`` video URL is also
            accepted and used as-is.
        temp_dir: Directory to download into. When falsy, a new temporary
            directory is created (the caller is responsible for removing it).

    Returns:
        ``(temp_dir, video_path)`` on success, ``(None, None)`` on any failure
        (the error is printed). A 2-tuple is always returned so callers can
        unpack the result unconditionally.
    """
    video_ref = str(video_id)
    if video_ref.startswith(("http://", "https://")):
        video_url = video_ref
    else:
        video_url = f"https://www.youtube.com/watch?v={video_ref}"

    try:
        yt = YouTube(video_url)
        stream = (
            yt.streams
            .filter(progressive=True, file_extension='mp4')
            .order_by('resolution')
            .desc()
            .first()
        )
        if stream is None:
            raise ValueError("No progressive MP4 stream is available for this video")

        # Create a temporary directory only once we know there is something to download
        if not temp_dir:
            temp_dir = tempfile.mkdtemp()
            print(f"Temporary directory created: {temp_dir}")

        # Download the video
        print("Downloading video...")
        video_path = stream.download(output_path=temp_dir)
        print(f"Video downloaded to: {video_path}")

    except Exception as e:
        print(f"An error occurred while downloading: {e}")
        return None, None

    return temp_dir, video_path


def trim_youtube_video(video_path, start_time, end_time, temp_dir=None):
    """Cut ``[start_time, end_time]`` out of a video with ffmpeg (stream copy).

    The clip is written to ``video.mp4`` inside ``temp_dir``; an existing file
    of that name is overwritten.

    Args:
        video_path: Path of the source video.
        start_time: Clip start, in seconds.
        end_time: Clip end, in seconds; must be greater than ``start_time``.
        temp_dir: Output directory. When falsy, a new temporary directory is
            created (the caller is responsible for removing it).

    Returns:
        ``(temp_dir, output_path)`` on success, ``(None, None)`` on any failure
        (the error is printed).
    """
    try:
        if not video_path or not os.path.isfile(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")

        duration = end_time - start_time
        if duration <= 0:
            raise ValueError(
                f"end_time ({end_time}) must be greater than start_time ({start_time})"
            )

        if not temp_dir:
            temp_dir = tempfile.mkdtemp()
            print(f"Temporary directory created: {temp_dir}")

        output_path = os.path.join(temp_dir, 'video.mp4')
        # With -y below, writing onto the input would destroy it while it is being read.
        if os.path.abspath(output_path) == os.path.abspath(video_path):
            raise ValueError(f"Output path would overwrite the source video: {output_path}")

        # Trim the video using ffmpeg. The command is an argument list (no shell),
        # so paths need no quoting and cannot be interpreted as shell syntax.
        # -y overwrites a clip left by a previous run instead of blocking on
        # ffmpeg's interactive overwrite prompt.
        ffmpeg_command = [
            'ffmpeg', '-y',
            '-i', str(video_path),
            '-ss', str(start_time),
            '-t', str(duration),
            '-c', 'copy',
            output_path,
        ]

        print("Trimming video...")
        print(f"FFmpeg command: {' '.join(ffmpeg_command)}")
        result = subprocess.run(ffmpeg_command, capture_output=True, text=True)

        if result.returncode != 0:
            print(f"FFmpeg error: {result.stderr}")
            raise Exception("FFmpeg failed to trim the video")

        print(f"Trimmed video saved to: {output_path}")

        if not os.path.exists(output_path):
            raise FileNotFoundError(f"Trimmed video file not found: {output_path}")

        return temp_dir, output_path

    except Exception as e:
        print(f"An error occurred while trimming: {e}")
        return None, None


def play_youtube_video(video_path, duration):
    """Open a video in the system's default player and wait while it plays.

    The player is launched without waiting for it; this function then sleeps
    for ``duration + 2`` seconds. Errors are printed, never raised.

    Args:
        video_path: Path of the video file to play.
        duration: Expected playback time in seconds.

    Returns:
        None.
    """
    import sys  # local import: only needed to tell macOS apart from other platforms

    try:
        # Convert to absolute path
        abs_video_path = os.path.abspath(video_path)
        print(f"Absolute video path: {abs_video_path}")

        if not os.path.exists(abs_video_path):
            raise FileNotFoundError(f"Video file not found: {abs_video_path}")

        print(f"Attempting to play video: {abs_video_path}")

        # Hand the file to the platform's "open with default application" launcher
        if os.name == 'nt':  # For Windows
            print("Using subprocess method...")
            # `start` is a cmd.exe builtin, hence shell=True; the empty string
            # fills the window-title argument that `start` expects first.
            subprocess.Popen(['start', '', abs_video_path], shell=True)
        elif sys.platform == 'darwin':  # macOS
            subprocess.Popen(['open', abs_video_path])
        else:  # Linux and other desktops providing xdg-utils
            subprocess.Popen(['xdg-open', abs_video_path])

        print("Default player command executed.")

        # Wait for the duration of the video
        print(f"Waiting for {duration} seconds...")
        time.sleep(duration + 2)  # Add a small buffer

        print("Playback duration completed.")

    except Exception as e:
        print(f"An error occurred while playing: {e}")

    print("Function play_youtube_video completed.")

In [None]:
import os
import tempfile
import subprocess

def trim_and_stitch_youtube_video(video_path, time_segments, output_path, temp_dir=None):
    """Cut several time ranges out of a video and join them into one file.

    Each segment is stream-copied with ffmpeg into ``temp_dir`` and the pieces
    are then joined with ffmpeg's concat demuxer. Existing segment files and an
    existing ``output_path`` are overwritten.

    Args:
        video_path: Path of the source video.
        time_segments: Iterable of ``(start_time, end_time)`` pairs in seconds;
            each end must be greater than its start.
        output_path: Path of the stitched result.
        temp_dir: Working directory for the intermediate files. When falsy, a
            new temporary directory is created (the caller is responsible for
            removing it).

    Returns:
        ``(temp_dir, output_path)`` on success, ``(None, None)`` on any failure
        (the error is printed).
    """
    try:
        if not video_path or not os.path.isfile(video_path):
            raise FileNotFoundError(f"ERROR: {video_path} doesn't exist")

        segments = list(time_segments)
        if not segments:
            raise ValueError("time_segments must contain at least one (start, end) pair")

        if not temp_dir:
            temp_dir = tempfile.mkdtemp()
            print(f"Temporary directory created: {temp_dir}")

        # List to store paths of trimmed segments
        trimmed_segments = []

        # Trim each segment
        for i, (start_time, end_time) in enumerate(segments):
            duration = end_time - start_time
            if duration <= 0:
                raise ValueError(
                    f"Segment {i+1}: end_time ({end_time}) must be greater than start_time ({start_time})"
                )

            # Absolute path: the concat demuxer resolves relative entries against
            # the list file's directory, not the current working directory.
            segment_path = os.path.abspath(os.path.join(temp_dir, f'segment_{i}.mp4'))
            # Argument list (no shell): no quoting problems, no shell injection.
            ffmpeg_command = [
                'ffmpeg', '-y',
                '-i', str(video_path),
                '-ss', str(start_time),
                '-t', str(duration),
                '-c', 'copy',
                segment_path,
            ]

            print(f"Trimming segment {i+1}/{len(segments)}...")
            print(f"FFmpeg command: {' '.join(ffmpeg_command)}")
            result = subprocess.run(ffmpeg_command, capture_output=True, text=True)

            if result.returncode != 0:
                print(f"FFmpeg error: {result.stderr}")
                raise Exception(f"FFmpeg failed to trim segment {i+1}")

            trimmed_segments.append(segment_path)

        # Create a file list for concatenation
        list_file_path = os.path.join(temp_dir, 'file_list.txt')
        with open(list_file_path, 'w', encoding='utf-8') as list_file:
            for segment in trimmed_segments:
                # Inside the single-quoted entry a literal quote is written as '\''.
                escaped = segment.replace("'", "'\\''")
                list_file.write(f"file '{escaped}'\n")

        # Concatenate all segments
        concat_command = [
            'ffmpeg', '-y',
            '-f', 'concat',
            '-safe', '0',
            '-i', list_file_path,
            '-c', 'copy',
            str(output_path),
        ]
        print("Concatenating segments...")
        print(f"FFmpeg command: {' '.join(concat_command)}")
        result = subprocess.run(concat_command, capture_output=True, text=True)

        if result.returncode != 0:
            print(f"FFmpeg error: {result.stderr}")
            raise Exception("FFmpeg failed to concatenate the segments")

        print(f"Final video saved to: {output_path}")

        if not os.path.exists(output_path):
            raise FileNotFoundError(f"Final video file not found: {output_path}")

        return temp_dir, output_path

    except Exception as e:
        print(f"An error occurred while trimming and stitching: {e}")
        return None, None

# Example usage:
# NOTE(review): the two paths below are placeholders. Unless they are replaced
# with real files, running this cell is expected to end in the
# "Video processing failed." branch - confirm whether this should stay executable.
video_path = "path/to/your/video.mp4"
time_segments = [(10, 20), (30, 40), (50, 60)]  # List of (start_time, end_time) tuples
output_path = "path/to/output/final_video.mp4"

# No temp_dir is passed, so the function creates its own working directory.
temp_dir, final_video_path = trim_and_stitch_youtube_video(video_path, time_segments, output_path)

# The function returns (None, None) on failure, so both values are truthy only on success.
if temp_dir and final_video_path:
    print("Video processing completed successfully!")
    # Don't forget to clean up the temp_dir when you're done
else:
    print("Video processing failed.")

In [None]:
# AMD at Computex 2024: AMD AI and High-Performance Computing with Dr. Lisa Su
youtube_url = 'https://www.youtube.com/watch?v=MCi8jgALPYA'

start_time = 45  # Start time in seconds
end_time = 67    # End time in seconds

# download_youtube_video takes a video ID, not a URL, so extract the ID first.
video_id = extract_video_id(youtube_url)

if video_id:
    # A failed download may be reported as None or as (None, None); normalize
    # it before unpacking so the cell never dies with a TypeError.
    download_result = download_youtube_video(video_id, './')
    temp_dir, video_path = download_result or (None, None)

    if video_path:
        temp_dir, video_path = trim_youtube_video(video_path, start_time, end_time, './')

    if video_path:
        # Wait for the length of the trimmed clip rather than a fixed number of seconds.
        play_youtube_video(video_path, end_time - start_time)
    else:
        print("Failed to download and trim the video segment.")
else:
    print("Could not extract a valid YouTube video ID from the provided URL.")

In [None]:

import shutil

# AMD at Computex 2024: AMD AI and High-Performance Computing with Dr. Lisa Su
youtube_url = 'https://www.youtube.com/watch?v=MCi8jgALPYA'

start_time = 45  # Start time in seconds
end_time = 67    # End time in seconds

video_id = extract_video_id(youtube_url)

if video_id:
    print(f"Extracted Video ID: {video_id}")
    transcript = fetch_transcript(video_id)

    if transcript:
        # Save the structured transcript (with timestamps)
        save_transcript(transcript, 'transcript_structured.json')
        save_formatted_transcript(transcript, 'transcript_formatted.txt')
        print_transcript(transcript)

        # Download into a fresh temporary directory (no temp_dir passed). A failed
        # download may be reported as None or as (None, None); normalize it
        # before unpacking.
        download_result = download_youtube_video(video_id)
        temp_dir, video_path = download_result or (None, None)

        if temp_dir and video_path:
            try:
                # Trim inside the same temporary directory so one cleanup removes everything.
                trim_dir, clip_path = trim_youtube_video(video_path, start_time, end_time, temp_dir)
                if clip_path:
                    play_youtube_video(clip_path, end_time - start_time)
                else:
                    print("Failed to download and trim the video segment.")
            finally:
                # Clean up: remove the temporary directory and its contents.
                # Best effort - the player may still hold the file open.
                print(f"Cleaning up temporary directory: {temp_dir}")
                shutil.rmtree(temp_dir, ignore_errors=True)
        else:
            print("Failed to download and trim the video segment.")
    else:
        print("Could not fetch a transcript for the video.")
else:
    print("Could not extract a valid YouTube video ID from the provided URL.")


In [None]:
# NOTE(review): `play_youtube_segment` is not defined in any visible cell of this
# notebook (the helpers defined here are download_youtube_video, trim_youtube_video
# and play_youtube_video), so this call is expected to raise NameError on a fresh
# kernel - confirm whether the cell should be removed or rewritten.
play_youtube_segment(video_id, start_time, end_time)