In [None]:
import dotenv
import os
from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI
import nest_asyncio
import uuid
import asyncio
import aiohttp
import yt_dlp
import tempfile
from pathlib import Path
from llama_index.core.agent.workflow import AgentWorkflow, ToolCallResult, AgentStream
from typing import List

# Patch asyncio so that nested event-loop use works inside the notebook
# (needed for the async tool calls made in later cells).
nest_asyncio.apply()

# Load environment variables from a local .env file, if one is present.
dotenv.load_dotenv()

# Hugging Face access token, read from the HF_API_KEY environment variable.
# os.getenv returns None when the variable is not set.
HF_TOKEN = os.getenv("HF_API_KEY")
# LLM served through the Hugging Face Inference API; passed to the agent workflow below.
llm = HuggingFaceInferenceAPI(model_name="Qwen/Qwen2.5-Coder-32B-Instruct", token=HF_TOKEN)

# Hugging Face Inference API endpoint for the openai/whisper-large-v3 model.
# transcribe_audio POSTs raw audio bytes to this URL.
WHISPER_HF_API_URL = "https://api-inference.huggingface.co/models/openai/whisper-large-v3"

## Define tools for transcription and video fetching

In [None]:
async def download_youtube_audio(url: str) -> str:
    """
    Fetch the audio track of a YouTube video and convert it to MP3 in the system temp directory.

    The file is stored under a random UUID-based name. On success the returned string has the
    form "Audio file: <path>.mp3, Title: <video title>". If the download raises, the text of
    the exception is returned instead.
    """
    # Random, extension-less base path inside the system temp directory
    base_path = str(Path(tempfile.gettempdir()) / str(uuid.uuid4()))

    # Post-processing step that converts the downloaded stream to 192 kbps MP3
    mp3_postprocessor = {
        'key': 'FFmpegExtractAudio',
        'preferredcodec': 'mp3',
        'preferredquality': '192',
    }
    options = {
        'format': 'bestaudio/best',
        'outtmpl': base_path,
        'postprocessors': [mp3_postprocessor],
        'quiet': True,
    }

    event_loop = asyncio.get_running_loop()
    with yt_dlp.YoutubeDL(options) as downloader:
        try:
            # extract_info is a blocking call, so hand it to the default executor
            metadata = await event_loop.run_in_executor(None, downloader.extract_info, url)
            title = metadata.get('title', 'Unknown Title')
        except Exception as error:
            return str(error)

    return f"Audio file: {base_path}.mp3, Title: {title}"

async def transcribe_audio(audio_file: str) -> str:
    """
    Asynchronously transcribes an MP3 audio file into text using the Hugging Face Inference API.

    Args:
        audio_file: Path to a local audio file whose name ends with ".mp3".

    Returns:
        The transcription text on success. Otherwise a human-readable message:
        the format complaint for non-MP3 paths, the error reported by the API,
        or a generic failure notice that names the file.

    Raises:
        OSError: If the audio file cannot be opened or read.
    """
    # Only the file name is validated here, not the file contents.
    if not audio_file.endswith('.mp3'):
        return "Audio file must be in MP3 format."

    # The token is read from the environment on every call.
    hf_token = os.getenv("HF_API_KEY")
    headers = {"Authorization": f"Bearer {hf_token}"}

    # Read the audio before opening the HTTP session so an unreadable file
    # fails without setting up any network resources.
    with open(audio_file, "rb") as f:
        audio_data = f.read()

    # Using aiohttp for non-blocking HTTP requests
    async with aiohttp.ClientSession() as session:
        # Send the raw audio bytes for transcription
        async with session.post(WHISPER_HF_API_URL, headers=headers, data=audio_data) as response:
            result = await response.json()

    failure_message = (
        f"Transcription failed for {audio_file}. Either no word was detected "
        "or there is something wrong with the audio file."
    )

    # Anything other than a JSON object cannot carry a "text" field.
    if not isinstance(result, dict):
        return failure_message

    # Surface the API's own error text (e.g. model loading, bad token) when there is no transcription.
    if "text" not in result and result.get("error"):
        return f"Transcription failed for {audio_file}: {result['error']}"

    # Return transcription text or the generic failure message
    return result.get("text", failure_message)

async def video_information_response(summaries: List[str], titles: List[str], topics: List[str], urls: List[str]) -> str:
    """
    Render per-video details as one multi-line string.

    Each video contributes a numbered header followed by its title, URL, topic and summary
    (in that order) and a trailing blank line. The four lists are matched up by position;
    if their lengths differ, an error message is returned instead.
    """
    # All four lists must describe the same number of videos
    lengths = {len(summaries), len(titles), len(topics), len(urls)}
    if len(lengths) != 1:
        return "Error: Input lists must have the same length."

    lines = []
    rows = zip(summaries, titles, topics, urls)
    for number, (summary, title, topic, url) in enumerate(rows, start=1):
        lines.extend([
            f"Video {number}:",
            f"  Title: {title}",
            f"  URL: {url}",
            f"  Topic: {topic}",
            f"  Summary: {summary}",
            "",  # Blank separator after each video
        ])

    # Collapse the collected lines into the final text
    return "\n".join(lines)

## Build an Agent workflow and test reasoning

In [None]:
from llama_index.core.workflow import Context

# Build the agent from the three tool functions defined above, the shared `llm`,
# and a system prompt that restricts it to YouTube transcription/summary tasks.
agent = AgentWorkflow.from_tools_or_functions(
    tools_or_functions=[transcribe_audio, download_youtube_audio, video_information_response],
    llm=llm,
    # The adjacent string literals below are concatenated into one prompt string.
    system_prompt="You are an AI assistant that can use tools to transcribe audio from YouTube videos, create summaries of the video's content's, provide contextual information of the videos and answer related questions "
                  "related to the videos. You will NEVER respond to tasks unrelated to your role, instead alert the user that you are unable to perform the task. "
                  "ALWAYS remember to provide summaries, titles, video topics, and video url's for all videos that the user asks for if you successfully transcribe the audio. "
                  "If some video failed to be transcribed, reply only with the succesfull results and a detailed explanation of the errors for the unsuccessful videos. "
                  "Always use the available tools to format the response.",
)


# Context object bound to this agent and passed to the run below
# (presumably carries workflow state between runs - confirm against the llama_index docs).
ctx = Context(agent)

# Start the run with a request covering two videos; `handler` is both
# streamable (events) and awaitable (final result).
handler = agent.run(
    "I want summaries for the following videos: "
    "1. https://www.youtube.com/watch?v=rEDzUT3ymw4 "
    "2. https://www.youtube.com/watch?v=epVW0_iVBX8",
    ctx=ctx
    )
# Stream events while the agent works: log each completed tool call on its own
# line and echo streamed agent output without inserting newlines.
async for ev in handler.stream_events():
    if isinstance(ev, ToolCallResult):
        print("")
        print("Called tool: ", ev.tool_name, ev.tool_kwargs, "=>", ev.tool_output)
    elif isinstance(ev, AgentStream):
        print(ev.delta, end="", flush=True)

# Await the handler to obtain the final result; it is printed in the next cell.
resp = await handler


In [None]:
# Print a banner, then the text of the last block of the agent's final response.
# NOTE(review): indexing blocks[-1] assumes the response has at least one block - confirm.
print("\n*******************Agent response********************")
print(resp.response.blocks[-1].text)