<a href="https://colab.research.google.com/github/dannesbitt/GAIA-Agent/blob/main/GAIA_Agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
%pip install -q langgraph langchain_openai langchain_huggingface google-cloud-speech beautifulsoup4

In [16]:
import os
import re
import requests
import json
import base64
from typing import TypedDict, List
from openai import OpenAI
from langgraph.graph import Graph, END
from google.colab import userdata
from urllib.parse import urlparse, parse_qs
from google.cloud import speech_v1p1beta1 as speech
from bs4 import BeautifulSoup

# Set up OpenAI client
OPEN_API_KEY = userdata.get('OPENAI_API_KEY')
client = OpenAI(api_key=OPEN_API_KEY)

# Set up Google API key and Custom Search Engine ID for web search
GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')
GOOGLE_CSE_ID = userdata.get('GOOGLE_CSE_ID')  # Custom Search Engine ID

fetch_full_content = True  # Set to True to fetch full content of search results

# Define the state structure
class State(TypedDict):
    messages: List[dict]
    tool_calls: List[dict]
    final_response: str
    needs_tool_call: bool

# Define available tools, including the new "web_search" tool
tools = [
    # Other tools...
    {
        "type": "function",
        "function": {
            "name": "web_search",
            "description": "Perform a web search to find information. Can optionally fetch full content of the top 3 results.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The search query"
                    },
                    "fetch_full_content": {
                        "type": "boolean",
                        "description": "Whether to fetch and include full content of the top 3 search results. Defaults to False."
                    }
                },
                "required": ["query"]
            }
        }
    }
    # Other tools...
]

def execute_tool(tool_call):
    """Executes the specified tool and returns the result."""
    print("execute_tool : ", {tool_call.function.name})
    if tool_call.function.name == "get_weather":
        return "It's sunny today."
    elif tool_call.function.name == "web_search":
        try:
            args = json.loads(tool_call.function.arguments)
            query = args["query"]
#            fetch_full_content = args.get("fetch_full_content", False)  # Default to False if not provided
            print("execute_tool query: ", query)
            api_key = GOOGLE_API_KEY
            cse_id = GOOGLE_CSE_ID
            if not api_key or not cse_id:
                return "API key or Search Engine ID not set."
            search_url = f"https://www.googleapis.com/customsearch/v1?key={api_key}&cx={cse_id}&q={query}"
            response = requests.get(search_url)
            response.raise_for_status()
            search_results = response.json()
            results = []
            for item in search_results.get('items', [])[:3]:  # Limit to top 3 results
                title = item.get('title', 'No title')
                snippet = item.get('snippet', 'No snippet')
                if fetch_full_content:
                    url = item.get('link')
                    try:
                        page_response = requests.get(url, timeout=5)  # 5-second timeout
                        page_response.raise_for_status()
                        # Check if the content is HTML
                        if page_response.headers.get('Content-Type', '').startswith('text/html'):
                            soup = BeautifulSoup(page_response.text, 'html.parser')
                            # Remove scripts and styles
                            for script in soup(["script", "style"]):
                                script.decompose()
                            full_content = soup.get_text(separator=' ', strip=True)
                        else:
                            full_content = "Full content not available (non-HTML content)"
                    except Exception as e:
                        full_content = f"Full content not available: {str(e)}"
                    results.append(f"Title: {title}\nSnippet: {snippet}\nFull Content: {full_content}\n")
                else:
                    results.append(f"Title: {title}\nSnippet: {snippet}\n")
            return "\n---\n".join(results) if results else "No results found."
        except (json.JSONDecodeError, KeyError) as e:
            return f"Error in tool call arguments: {e}"
        except Exception as e:
            return f"Error performing web search: {e}"
    return "Tool not found."

# Modified helper function to fetch and process a specific file
def fetch_specific_file(task_id, file_name):
    """Fetches the content of a specific file. Returns content or path based on file type."""
    # Define image extensions
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp')
    url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
    try:
        response = requests.get(url)
        response.raise_for_status()
        if file_name.lower().endswith('.mp3'):
            # Save the audio content to a temporary file
            audio_path = '/content/temp_audio.mp3'
            with open(audio_path, 'wb') as f:
                f.write(response.content)
            # Transcribe the audio
            transcript = transcribe_audio(audio_path)
            return transcript if transcript else "Transcription failed.", None
        elif file_name.lower().endswith(IMAGE_EXTENSIONS):
            # Save the image to a temporary file
            image_path = f"/content/temp_image{os.path.splitext(file_name)[1]}"
            with open(image_path, 'wb') as f:
                f.write(response.content)
            return None, image_path
        else:
            # For text files, return the content as text
            return response.text, None
    except Exception as e:
        print(f"Error fetching file {file_name} for task_id {task_id}: {e}")
        return None, None

# Helper function to encode image to base64
def encode_image_to_base64(image_path):
    try:
        with open(image_path, "rb") as image_file:
            encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
        return encoded_string
    except Exception as e:
        print(f"Error encoding image at {image_path}: {e}")
        return None

# Helper function to extract YouTube video ID from a URL (unchanged)
def extract_youtube_id(url):
    parsed = urlparse(url)
    if parsed.netloc == 'www.youtube.com' and parsed.path == '/watch':
        query = parse_qs(parsed.query)
        return query.get('v', [None])[0]
    elif parsed.netloc == 'youtu.be':
        return parsed.path[1:] if parsed.path else None
    return None

# Helper function to parse SBV caption format (unchanged)
def parse_sbv(sbv_text):
    blocks = sbv_text.strip().split('\n\n')
    transcript = []
    for block in blocks:
        lines = block.split('\n')
        if len(lines) > 1:
            transcript.append(' '.join(lines[1:]))
    return ' '.join(transcript)

# Helper function to fetch YouTube transcript using YouTube Data API v3 (unchanged)
def fetch_youtube_transcript(video_id, api_key):
    print("Fetching transcript for video ID:", video_id)
    if not api_key:
        print("YouTube API key not set, cannot fetch transcript.")
        return None
    try:
        list_url = f'https://www.googleapis.com/youtube/v3/captions?part=snippet&videoId={video_id}&key={api_key}'
        list_response = requests.get(list_url).json()
        if 'items' not in list_response or not list_response['items']:
            return None
        caption_id = list_response['items'][0]['id']
        download_url = f'https://www.googleapis.com/youtube/v3/captions/{caption_id}?tfmt=sbv&key={api_key}'
        sbv_text = requests.get(download_url).text
        transcript = parse_sbv(sbv_text)
        return transcript
    except Exception as e:
        print(f"Error fetching transcript for video {video_id}: {e}")
        return None

# Helper function to download audio file (unchanged)
def download_audio(url, save_path):
    try:
        response = requests.get(url, stream=True)
        response.raise_for_status()
        with open(save_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
    except Exception as e:
        print(f"Error downloading audio from {url}: {e}")

# Helper function to transcribe audio using Google Cloud Speech-to-Text (unchanged)
def transcribe_audio(audio_path):
    try:
        client = speech.SpeechClient()
        with open(audio_path, 'rb') as audio_file:
            content = audio_file.read()
        audio = speech.RecognitionAudio(content=content)
        config = speech.RecognitionConfig(
            encoding=speech.RecognitionConfig.AudioEncoding.MP3,
            sample_rate_hertz=16000,  # Adjust if needed
            language_code='en-US',
        )
        response = client.recognize(config=config, audio=audio)
        transcript = ' '.join([result.alternatives[0].transcript for result in response.results])
        return transcript
    except Exception as e:
        print(f"Error transcribing audio at {audio_path}: {e}")
        return None

# Define the nodes
def input_node(state: State) -> State:
    """Fetches a question from the API, processes any additional data (files, YouTube links, audio), and constructs the initial user message."""
    if not state['messages']:
        # Add system message to inform the LLM about available tools
        state['messages'].append({
            "role": "system",
            "content": (
                "You are an assistant that can use tools to answer questions and process images. "
                "Available tools include 'web_search' for searching the web and 'get_weather' for weather information. "
                "When searching, be aware of synonyms for key terms: for example, 'equine veterinarian' may also be referred to as 'horse doctor', 'veterinary surgeon', 'animal doctor', or 'vet'."
            )
        })
        try:
            response = requests.get('https://agents-course-unit4-scoring.hf.space/random-question')
            response.raise_for_status()
            data = response.json()
            print("Full response body:\n", json.dumps(data, indent=4))
            question = data['question']
            print("Question:", question)
            file_name = data.get('file_name', None)
            task_id = data.get('task_id', None)
            print("File Name:", file_name)
            print("Task ID:", task_id)
        except Exception as e:
            print(f"Error fetching question: {e}")
            question = "What is the meaning of life?"
            file_name = None
            task_id = None

        # Construct the user message as a list for multi-part content
        user_message_content = [{"type": "text", "text": f"Question: {question}\n\n"}]
        if file_name and task_id:
            content, image_path = fetch_specific_file(task_id, file_name)
            if content:
                if file_name.lower().endswith('.mp3'):
                    user_message_content.append({"type": "text", "text": f"Transcript of {file_name}:\n{content}\n\n"})
                else:
                    user_message_content.append({"type": "text", "text": f"Content of {file_name}:\n{content}\n\n"})
            elif image_path:
                # Encode the image and include it in the prompt
                encoded_image = encode_image_to_base64(image_path)
                if encoded_image:
                    user_message_content.append({
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/{os.path.splitext(file_name)[1][1:]};base64,{encoded_image}"
                        }
                    })
                else:
                    user_message_content.append({"type": "text", "text": f"Failed to encode image {file_name}\n\n"})
            else:
                user_message_content.append({"type": "text", "text": f"Failed to fetch or process content of {file_name}\n\n"})

        # Check for YouTube links in the question
        youtube_urls = [url for url in re.findall(r'https?://\S+', question) if 'youtube.com' in url or 'youtu.be' in url]
        if youtube_urls:
            video_id = extract_youtube_id(youtube_urls[0])
            if video_id:
                transcript = fetch_youtube_transcript(video_id, GOOGLE_API_KEY)
                if transcript:
                    print("YouTube transcript: ", transcript)
                    user_message_content.append({"type": "text", "text": f"\n\nTranscript of the YouTube video: {transcript}"})
                else:
                    user_message_content.append({"type": "text", "text": "\n\nTranscript of the YouTube video: Not available"})

        # Check for audio file URLs (e.g., MP3) in the question
        audio_urls = [url for url in re.findall(r'https?://\S+', question) if url.lower().endswith('.mp3')]
        if audio_urls:
            audio_url = audio_urls[0]  # Process the first audio URL only
            audio_path = '/content/audio.mp3'
            download_audio(audio_url, audio_path)
            transcript = transcribe_audio(audio_path)
            if transcript:
                user_message_content.append({"type": "text", "text": f"\n\nTranscript of the audio file: {transcript}"})
            else:
                user_message_content.append({"type": "text", "text": "\n\nTranscript of the audio file: Not available"})

        state['messages'].append({"role": "user", "content": user_message_content})
    state['needs_tool_call'] = False
    return state

def llm_node(state: State) -> State:
    """Calls the OpenAI LLM with the current messages and processes the response."""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=state['messages'],
        tools=tools,
        tool_choice="auto",
    )
    assistant_message = response.choices[0].message
    state['messages'].append(assistant_message)

    if assistant_message.tool_calls:
        state['needs_tool_call'] = True
    else:
        state['final_response'] = assistant_message.content
        state['needs_tool_call'] = False
    return state

def tool_node(state: State) -> State:
    """Executes tool calls and appends the results to the messages."""
    assistant_message = state['messages'][-1]
    for tool_call in assistant_message.tool_calls:
        result = execute_tool(tool_call)
        print("Tool call result:", result)
        state['messages'].append({
            "role": "tool",
            "content": result,
            "tool_call_id": tool_call.id,
        })
    return state

def output_node(state: State) -> State:
    """Prints the LLM's final response."""
    print("Response:", state['final_response'])
    return state

# Create the graph
graph = Graph()

# Add nodes
graph.add_node("input", input_node)
graph.add_node("llm", llm_node)
graph.add_node("tool", tool_node)
graph.add_node("output", output_node)

# Define edges
graph.add_edge("input", "llm")
graph.add_conditional_edges(
    "llm",
    lambda state: "tool" if state['needs_tool_call'] else "output",
    {"tool": "tool", "output": "output"}
)
graph.add_edge("tool", "input")
graph.add_edge("output", END)

# Set entry point
graph.set_entry_point("input")

# Compile the graph
app = graph.compile()

# Run the workflow
initial_state = {
    "messages": [],
    "tool_calls": [],
    "final_response": "",
    "needs_tool_call": False
}

# Define the config with recursion_limit
config = {"recursion_limit": 75}  # Adjust the value as needed

# Run the workflow with the config
result = app.invoke(initial_state, config=config)
print("Final State:", result)


Full response body:
 {
    "task_id": "a0c07678-e491-4bbc-8f0b-07405144218f",
    "question": "Who are the pitchers with the number before and after Taish\u014d Tamai's number as of July 2023? Give them to me in the form Pitcher Before, Pitcher After, use their last names only, in Roman characters.",
    "Level": "1",
    "file_name": ""
}
Question: Who are the pitchers with the number before and after Taishō Tamai's number as of July 2023? Give them to me in the form Pitcher Before, Pitcher After, use their last names only, in Roman characters.
File Name: 
Task ID: a0c07678-e491-4bbc-8f0b-07405144218f
execute_tool :  {'web_search'}
execute_tool query:  Taishō Tamai baseball card number July 2023
Tool call result: Title: andrewrreed/agents-benchmark-eval-results · Datasets at Hugging ...
Snippet: The players with numbers before and after Taishō Tamai's number in Japanese professional baseball as of July 2023 (last names only, in Roman characters) are ...
Full Content: andrewrreed/agent

BadRequestError: Error code: 400 - {'error': {'message': "This model's maximum context length is 128000 tokens. However, your messages resulted in 186557 tokens (186480 in the messages, 77 in the functions). Please reduce the length of the messages or functions.", 'type': 'invalid_request_error', 'param': 'messages', 'code': 'context_length_exceeded'}}