In [2]:
from typing import List, Tuple
from llama_index.core import Document, VectorStoreIndex, Settings
from llama_index.core.agent import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.readers.youtube_transcript import YoutubeTranscriptReader
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import CouldNotRetrieveTranscript
import urllib.request
import re, os
import numpy as np
import pandas as pd
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.llms.ollama import Ollama
from llama_index.core.llms import ChatMessage
from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding

In [3]:
from dotenv import load_dotenv
load_dotenv()

True

In [4]:
local_llm = Ollama(
    model="mistral", 
    request_timeout=60.0
)

local_embed_model = OllamaEmbedding(
    model_name="mxbai-embed-large",
    base_url="http://localhost:11434",
    ollama_additional_kwargs={"mirostat": 0},
)

llm = AzureOpenAI(
    engine = "gpt-35-turbo",
    model = "gpt-35-turbo",
    api_key = os.getenv('AZURE_OPENAI_API_KEY'),  
    api_version = "2024-02-01",
    azure_endpoint = os.getenv('AZURE_OPENAI_ENDPOINT')
)

embed_model = AzureOpenAIEmbedding(
    model = "text-embedding-3-large",
    deployment_name = "text-embedding-3-large",
    api_key = os.getenv('AZURE_OPENAI_API_KEY'),
    api_version = "2024-02-01",
    azure_endpoint = os.getenv('AZURE_OPENAI_ENDPOINT')
)

Settings.llm = llm
Settings.embed_model = embed_model

In [5]:
def get_youtube_videos(search_keyword: str) -> list:
    html = urllib.request.urlopen("https://www.youtube.com/results?search_query=" + search_keyword.replace(' ','+'))
    video_ids = re.findall(r"watch\?v=(\S{11})", html.read().decode())
    unique_ids = []
    for id in video_ids:
        if len(unique_ids) == 10:
            break

        if id not in unique_ids:
            try:
                YouTubeTranscriptApi.get_transcript(id)
                unique_ids.append(id)
            except CouldNotRetrieveTranscript:
                pass
            
    return ["https://www.youtube.com/watch?v="+i for i in unique_ids]

get_youtube_videos_tool = FunctionTool.from_defaults(
    fn=get_youtube_videos,
    description="Returns list of 10 youtube video links for given search keyword."
)

In [6]:
def summarize_text(text: str) -> str:
    prompt = f"Summarize the following text in a concise way:\n\n{text}"
    
    response = local_llm.chat([
        ChatMessage(role="user",content=prompt)
    ])
    
    return response.content

def youtube_links_to_summary(youtube_links: List[str]) -> bool:
    loader = YoutubeTranscriptReader()
    documents = loader.load_data(ytlinks=youtube_links)
    pd.DataFrame([{'doc_id':doc.doc_id,'text':summarize_text(doc.text)} for doc in documents]).to_csv("top_10_summaries.csv",index=False, sep="|")
    return True

summarize_youtube_video_tool = FunctionTool.from_defaults(
    fn=youtube_links_to_summary,
    description="Returns True if summaries are stored in file."
)

In [7]:
def cosine_similarity(vec1, vec2):
    vec1 = np.array(vec1)
    vec2 = np.array(vec2)
    dot_product = np.dot(vec1, vec2)
    norm_vec1 = np.linalg.norm(vec1)
    norm_vec2 = np.linalg.norm(vec2)
    return dot_product / (norm_vec1 * norm_vec2)

def get_top_n_similar_videos(top_n: int) -> List[dict]:
    df = pd.read_csv("top_10_summaries.csv",sep="|")
    documents = []
    for t in df.itertuples():
        documents.append(
            Document(
                doc_id=t.doc_id,
                text=t.text,
                metadata={'video_id':t.doc_id},
                embedding=local_embed_model.get_text_embedding(t.text)
            )
        )

    sim_matrix = np.zeros(shape=(10,10))
    for i in range(len(documents)):
        for j in range(i+1,len(documents)):
            sim_matrix[i][j] = cosine_similarity(documents[i].embedding,documents[j].embedding)
    
    flat_sim_matrix = sim_matrix.flatten()
    indices = np.argpartition(flat_sim_matrix, -top_n)[-top_n:]
    indices = np.flip(indices[np.argsort(flat_sim_matrix[indices])])

    most_similar_pairs = []
    for idx in indices:
        max_idx = np.unravel_index(idx,sim_matrix.shape)
        most_similar_pairs.append({
            'video1': "https://www.youtube.com/watch?v="+documents[max_idx[0]].doc_id,
            'video2': "https://www.youtube.com/watch?v="+documents[max_idx[1]].doc_id,
            'similarity': sim_matrix[max_idx[0]][max_idx[1]]
        })
    
    return most_similar_pairs

most_similar_videos_tool = FunctionTool.from_defaults(
    fn=get_top_n_similar_videos,
    description="Returns list of pairs of youtube links with most similarity."
)

In [8]:
agent = ReActAgent.from_tools(
    tools=[get_youtube_videos_tool,summarize_youtube_video_tool,most_similar_videos_tool],
    llm=llm,
    verbose=True,
    context="Purpose: The primary role of this agent is to search youtube for given keyword and find out which videos are most similar to each other."
)

In [9]:
agent.query("Find two most similar videos for Iphone 16 pro max review.")

> Running step e0d1724c-22f4-4c64-972c-cfadc4c99b9a. Step input: Find two most similar videos for Iphone 16 pro max review.
[1;3;38;5;200mThought: The current language of the user is: English. I need to use a tool to help me answer the question.
Action: get_youtube_videos
Action Input: {'search_keyword': 'Iphone 16 pro max review'}
[0m[1;3;34mObservation: ['https://www.youtube.com/watch?v=Lqc87JD1xQ4', 'https://www.youtube.com/watch?v=xuaf3anGj64', 'https://www.youtube.com/watch?v=xQwfnYh2dmY', 'https://www.youtube.com/watch?v=1yfANDX8Q-g', 'https://www.youtube.com/watch?v=sLEhSwk6EZM', 'https://www.youtube.com/watch?v=fRJQ-I0hArU', 'https://www.youtube.com/watch?v=XHqOfJ8jTzU', 'https://www.youtube.com/watch?v=Yl_19rCQQB8', 'https://www.youtube.com/watch?v=KoaOxIq-HGU', 'https://www.youtube.com/watch?v=tYJWcs-qnAc']
[0m> Running step b0611f63-b765-49d1-82ac-4cf017354d6d. Step input: None
[1;3;38;5;200mThought: Now that I have the YouTube links, I can use the "get_top_n_similar_vi

Response(response='The two most similar videos for "Iphone 16 pro max review" are:\n1. [https://www.youtube.com/watch?v=5_yMa3hzCLs]\n2. [https://www.youtube.com/watch?v=2CvO99eGMLk]', source_nodes=[], metadata=None)

In [10]:
agent.query("Find 3 pairs most similar videos for Iphone 16 pro max review.")

> Running step 8b9bc7e6-be67-432b-b811-6e5d20de385f. Step input: Find 3 pairs most similar videos for Iphone 16 pro max review.
[1;3;38;5;200mThought: The user wants to find the three pairs of the most similar videos for the search query "Iphone 16 pro max review" on YouTube.
Action: get_youtube_videos
Action Input: {'search_keyword': 'Iphone 16 pro max review'}
[0m[1;3;34mObservation: ['https://www.youtube.com/watch?v=Lqc87JD1xQ4', 'https://www.youtube.com/watch?v=chwb1IYx-bQ', 'https://www.youtube.com/watch?v=YiYTPI9TWeI', 'https://www.youtube.com/watch?v=2wojW0VES9c', 'https://www.youtube.com/watch?v=rjKTx4epl0Y', 'https://www.youtube.com/watch?v=ukM5QGM4bZ8', 'https://www.youtube.com/watch?v=x6O_p6Rm_Mw', 'https://www.youtube.com/watch?v=6-ISYKo48qo', 'https://www.youtube.com/watch?v=U08urz1w87c', 'https://www.youtube.com/watch?v=utwniUUnSUc']
[0m> Running step df71bfdd-0b63-4e63-a0c8-8f9f27d7377f. Step input: None
[1;3;38;5;200mThought: I have retrieved the YouTube links for 

Response(response='The three pairs of the most similar videos for "Iphone 16 pro max review" are:\n1. Video 1: [https://www.youtube.com/watch?v=5_yMa3hzCLs], Video 2: [https://www.youtube.com/watch?v=2CvO99eGMLk], Similarity: 0.821\n2. Video 1: [https://www.youtube.com/watch?v=2CvO99eGMLk], Video 2: [https://www.youtube.com/watch?v=IA62EZx_HFE], Similarity: 0.817\n3. Video 1: [https://www.youtube.com/watch?v=K-JGaqfIOmI], Video 2: [https://www.youtube.com/watch?v=5_yMa3hzCLs], Similarity: 0.812', source_nodes=[], metadata=None)