In [None]:
!pip install openai colorama streamlit beautifulsoup4

# Real World Project: Hacker News ChatBot

<img src="https://drive.google.com/uc?id=1NxdOPhZ09Td4BwOTz5mKtiZX9xD5cxj5" alt="Alt text" width="500"/>

---

In this final lab, we are going to implement the architecture you can see above. It's a ReAct Agent that will choose between three tools (**StoriesTool**, **CommentsTool** and **ContentTool**) to solve the user task.

In addition, we'll implement a Streamlit interface that will enable us to chat with our agent.

## Building the `scratch_agent` library

We are going to define the `scratch_agent` library. This library contains all the code implemented in the previous lessons. This code will be used by the Streamlit application.

### Creating parent folder

In [None]:
!mkdir -p scratch_agent

### Creating `__init__`

In [None]:
%%writefile scratch_agent/__init__.py

### Creating `utils.py`

In [None]:
# @title
%%writefile scratch_agent/utils.py

"""
This is a collection of helper functions and methods we are going to use in
the Agent implementation. You don't need to know the specific implementation
of these to follow the Agent code. But, if you are curious, feel free to check
them out.
"""

import re
import time

from colorama import Fore
from colorama import Style

from dataclasses import dataclass


def completions_create(client, messages: list, model: str) -> str:
    """
    Ask the chat model for a completion and return its text.

    Args:
        client (OpenAI): Client object exposing `chat.completions.create`.
        messages (list[dict]): The chat history sent to the model.
        model (str): Name of the model that should answer.

    Returns:
        str: The content of the first choice, converted with `str()`.
    """
    completion = client.chat.completions.create(messages=messages, model=model)
    first_choice = completion.choices[0]
    return str(first_choice.message.content)


def build_prompt_structure(prompt: str, role: str, tag: str = "") -> dict:
    """
    Create a chat message dictionary, optionally wrapping the text in a tag.

    Args:
        prompt (str): The text of the message.
        role (str): The role of the speaker (e.g., user, assistant).
        tag (str): When non-empty, the text is enclosed as `<tag>text</tag>`.

    Returns:
        dict: A message with the keys "role" and "content".
    """
    content = f"<{tag}>{prompt}</{tag}>" if tag else prompt
    return {"role": role, "content": content}

def update_chat_history(history: list, msg: str, role: str):
    """
    Append a new message to the chat history.

    Args:
        history (list): The chat history to extend (modified in place).
        msg (str): The text of the message.
        role (str): The role type (e.g. 'user', 'assistant', 'system')
    """
    entry = build_prompt_structure(prompt=msg, role=role)
    history.append(entry)


class ChatHistory(list):
    def __init__(self, messages: list | None = None, total_length: int = -1):
        """Initialise the queue with a fixed total length.

        Args:
            messages (list | None): A list of initial messages
            total_length (int): The maximum number of messages the chat history can hold.
        """
        if messages is None:
            messages = []

        super().__init__(messages)
        self.total_length = total_length

    def append(self, msg: str):
        """Add a message to the queue.

        Args:
            msg (str): The message to be added to the queue
        """
        if len(self) == self.total_length:
            self.pop(0)
        super().append(msg)



class FixedFirstChatHistory(ChatHistory):
    def __init__(self, messages: list | None = None, total_length: int = -1):
        """Initialise the queue with a fixed total length.

        Args:
            messages (list | None): A list of initial messages
            total_length (int): The maximum number of messages the chat history can hold.
        """
        super().__init__(messages, total_length)

    def append(self, msg: str):
        """Add a message to the queue. The first messaage will always stay fixed.

        Args:
            msg (str): The message to be added to the queue
        """
        if len(self) == self.total_length:
            self.pop(1)
        super().append(msg)

def fancy_print(message: str) -> None:
    """
    Print a message framed by two coloured separator lines, then pause briefly.

    Args:
        message (str): The message to display.
    """
    separator = "=" * 50
    frame_style = Style.BRIGHT + Fore.CYAN

    print(frame_style + f"\n{separator}")
    print(Fore.MAGENTA + f"{message}")
    print(frame_style + f"{separator}\n")
    # Short pause after the banner has been printed.
    time.sleep(0.5)


def fancy_step_tracker(step: int, total_steps: int) -> None:
    """
    Announce the current iteration of a loop using `fancy_print`.

    Args:
        step (int): The zero-based index of the current step.
        total_steps (int): The total number of steps in the loop.
    """
    # Steps are counted from zero internally but shown to the reader from one.
    current = step + 1
    fancy_print(f"STEP {current}/{total_steps}")


@dataclass
class TagContentResult:
    """
    A data class to represent the result of extracting tag content.

    Attributes:
        content (list[str]): A list of strings containing the content found between the specified tags.
        found (bool): A flag indicating whether any content was found for the given tag.
    """

    # One entry per matched tag pair, in the order the matches were found.
    content: list[str]
    # True when at least one tag pair matched.
    found: bool


def extract_tag_content(text: str, tag: str) -> TagContentResult:
    """
    Extracts all content enclosed by specified tags (e.g., <thought>, <response>, etc.).

    Parameters:
        text (str): The input string containing multiple potential tags.
        tag (str): The name of the tag to search for (e.g., 'thought', 'response').
            It is matched literally, never interpreted as a regular expression.

    Returns:
        TagContentResult: An object with two attributes:
            - content (list[str]): The stripped text found between each pair of tags.
            - found (bool): Whether at least one pair of tags was found.
    """
    # Escape the tag so characters such as "." or "+" cannot change the pattern.
    escaped_tag = re.escape(tag)
    # Non-greedy group so that several occurrences are captured separately.
    tag_pattern = rf"<{escaped_tag}>(.*?)</{escaped_tag}>"

    # DOTALL lets the content span multiple lines.
    matched_contents = re.findall(tag_pattern, text, re.DOTALL)

    return TagContentResult(
        content=[content.strip() for content in matched_contents],
        found=bool(matched_contents),
    )


### Creating `tools.py`

In [None]:
# @title
%%writefile scratch_agent/tools.py

import os
import json
import re
from dataclasses import dataclass
from typing import Callable
from openai import OpenAI
from google.colab import userdata


def get_fn_signature(fn: Callable) -> dict:
    """
    Generates the signature for a given function.

    Args:
        fn (Callable): The function whose signature needs to be extracted.

    Returns:
        dict: A dictionary containing the function's name, description
              (its docstring) and the type name of every annotated parameter.
              The return annotation is not included.
    """
    properties = {
        # Some annotations have no `__name__` (for example string annotations).
        # Fall back to their text instead of raising AttributeError.
        param: {"type": getattr(annotation, "__name__", str(annotation))}
        for param, annotation in fn.__annotations__.items()
        if param != "return"
    }
    return {
        "name": fn.__name__,
        "description": fn.__doc__,
        "parameters": {"properties": properties},
    }


def validate_arguments(tool_call: dict, tool_signature: dict) -> dict:
    """
    Validates and converts arguments in the input dictionary to match the expected types.

    Arguments that are not part of the signature, or whose declared type is not
    one of the supported basic types, are left untouched instead of raising
    KeyError. The tool itself then decides how to handle them.

    Args:
        tool_call (dict): A dictionary containing the arguments passed to the tool.
        tool_signature (dict): The expected function signature and parameter types.

    Returns:
        dict: The same tool call dictionary, with the arguments converted in
              place to the correct types where necessary.

    Raises:
        ValueError: If a value cannot be converted (e.g. "abc" for an int).
    """
    properties = tool_signature["parameters"]["properties"]

    # TODO: This is overly simplified but enough for simple Tools.
    type_mapping = {
        "int": int,
        "str": str,
        "bool": bool,
        "float": float,
    }
    # Textual values that mean False; plain `bool("false")` would be True.
    false_strings = {"", "0", "false", "no", "off"}

    arguments = tool_call["arguments"]
    # Only existing keys are reassigned, so iterating over items() is safe.
    for arg_name, arg_value in arguments.items():
        expected_type = properties.get(arg_name, {}).get("type")
        converter = type_mapping.get(expected_type)

        if converter is None or isinstance(arg_value, converter):
            continue

        if converter is bool and isinstance(arg_value, str):
            arguments[arg_name] = arg_value.strip().lower() not in false_strings
        else:
            arguments[arg_name] = converter(arg_value)

    return tool_call


class Tool:
    """
    Pairs a callable with the metadata needed to present it to the model.

    Attributes:
        name (str): The name of the tool (function).
        fn (Callable): The function that the tool represents.
        fn_signature (str): JSON string representation of the function's signature.
    """

    def __init__(self, name: str, fn: Callable, fn_signature: str):
        self.name, self.fn, self.fn_signature = name, fn, fn_signature

    def __str__(self):
        # A tool is rendered as its JSON signature.
        return self.fn_signature

    def run(self, **kwargs):
        """
        Call the wrapped function.

        Args:
            **kwargs: Keyword arguments forwarded unchanged to the function.

        Returns:
            Whatever the wrapped function returns.
        """
        target = self.fn
        return target(**kwargs)


def tool(fn: Callable):
    """
    Decorator that turns a plain function into a Tool object.

    The decorated name is bound to the Tool itself, not to a function.

    Args:
        fn (Callable): The function to be wrapped.

    Returns:
        Tool: A Tool object containing the function, its name, and its
              signature serialised as JSON.
    """
    signature = get_fn_signature(fn)
    return Tool(
        name=signature.get("name"),
        fn=fn,
        fn_signature=json.dumps(signature),
    )

### Creating `react.py`

In [None]:
# @title
%%writefile scratch_agent/react.py

import json
import re

from colorama import Fore
from openai import OpenAI

from scratch_agent.tools import tool, Tool, validate_arguments
from scratch_agent.utils import ChatHistory, completions_create, extract_tag_content, update_chat_history, build_prompt_structure

# Default value of ReactAgent's `system_prompt` argument: no extra instructions.
BASE_SYSTEM_PROMPT = ""


# Instructions describing the Thought / Action / Observation loop.
# The single `%s` placeholder is filled with the tool signatures through the
# `%` operator in ReactAgent.run, so any other literal percent sign added to
# this template would have to be written as `%%`.
REACT_SYSTEM_PROMPT = """
You operate by running a loop with the following steps: Thought, Action, Observation.
You are provided with function signatures within <tools></tools> XML tags.
You may call one or more functions to assist with the user query. Don' make assumptions about what values to plug
into functions. Pay special attention to the properties 'types'. You should use those types as in a Python dict.

For each function call return a json object with function name and arguments within <tool_call></tool_call> XML tags as follows:

<tool_call>
{"name": <function-name>,"arguments": <args-dict>, "id": <monotonically-increasing-id>}
</tool_call>

Here are the available tools / actions:

<tools>
%s
</tools>

Example session:

<question>What's the current temperature in Madrid?</question>
<thought>I need to get the current weather in Madrid</thought>
<tool_call>{"name": "get_current_weather","arguments": {"location": "Madrid", "unit": "celsius"}, "id": 0}</tool_call>

You will be called again with this:

<observation>{0: {"temperature": 25, "unit": "celsius"}}</observation>

You then output:

<response>The current temperature in Madrid is 25 degrees Celsius</response>

Additional constraints:

- If the user asks you something unrelated to any of the tools above, answer freely enclosing your answer with <response></response> tags.
"""


class ReactAgent:
    """
    A class that represents an agent using the ReAct logic that interacts with tools to process
    user inputs, make decisions, and execute tool calls. The agent can run interactive sessions,
    collect tool signatures, and process multiple tool calls in a given round of interaction.

    Attributes:
        client (OpenAI): The OpenAI client used to handle model-based completions.
        model (str): The name of the model used for generating responses. Default is "gpt-4o".
        system_prompt (str): The caller-supplied system prompt. `run` never modifies it.
        tools (list[Tool]): A list of Tool instances available for execution.
        tools_dict (dict): A dictionary mapping tool names to their corresponding Tool instances.
    """

    def __init__(
        self,
        tools: Tool | list[Tool],
        model: str = "gpt-4o",
        system_prompt: str = BASE_SYSTEM_PROMPT,
        api_key: str = ""
    ) -> None:
        self.client = OpenAI(
            api_key=api_key
        )
        self.model = model
        self.system_prompt = system_prompt
        # Accept a single Tool as well as a list of them.
        self.tools = tools if isinstance(tools, list) else [tools]
        self.tools_dict = {t.name: t for t in self.tools}

    def add_tool_signatures(self) -> str:
        """
        Collects the function signatures of all available tools.

        Returns:
            str: A concatenated string of all tool function signatures in JSON format.
        """
        return "".join([t.fn_signature for t in self.tools])

    def process_tool_calls(self, tool_calls_content: list) -> dict:
        """
        Processes each tool call, validates arguments, executes the tools, and collects results.

        A tool call that is not valid JSON, lacks a "name", or names an unknown
        tool does not abort the run. An explanatory message is stored as its
        observation so the model can correct itself in the next round.

        Args:
            tool_calls_content (list): List of strings, each representing a tool call in JSON format.

        Returns:
            dict: A dictionary where the keys are tool call IDs and values are the results from the tools.
        """
        observations = {}
        for index, tool_call_str in enumerate(tool_calls_content):
            try:
                tool_call = json.loads(tool_call_str)
                tool_name = tool_call["name"]
            except (json.JSONDecodeError, KeyError, TypeError) as error:
                observations[f"invalid_tool_call_{index}"] = (
                    f"Invalid tool call ({error!r}). Expected a JSON object "
                    "with 'name', 'arguments' and 'id'."
                )
                continue

            # Fall back to the position of the call when the model omitted the id.
            call_id = tool_call.get("id", index)

            selected_tool = self.tools_dict.get(tool_name)
            if selected_tool is None:
                observations[call_id] = (
                    f"Unknown tool '{tool_name}'. "
                    f"Available tools: {sorted(self.tools_dict)}"
                )
                continue

            print(Fore.GREEN + f"\nUsing Tool: {tool_name}")

            # Validate and execute the tool call
            validated_tool_call = validate_arguments(
                tool_call, json.loads(selected_tool.fn_signature)
            )
            print(Fore.GREEN + f"\nTool call dict: \n{validated_tool_call}")

            result = selected_tool.run(**validated_tool_call["arguments"])
            print(Fore.GREEN + f"\nTool result: \n{result}")

            # Store the result using the tool call ID
            observations[call_id] = result

        return observations

    def run(
        self,
        user_msg: str,
        max_rounds: int = 10,
    ) -> str:
        """
        Executes a user interaction session, where the agent processes user input, generates responses,
        handles tool calls, and updates chat history until a final response is ready or the maximum
        number of rounds is reached.

        Args:
            user_msg (str): The user's input message to start the interaction.
            max_rounds (int, optional): Maximum number of interaction rounds the agent should perform. Default is 10.

        Returns:
            str: The final response generated by the agent after processing user input and any tool calls.
        """
        user_prompt = build_prompt_structure(
            prompt=user_msg, role="user", tag="question"
        )

        # Build the prompt in a local variable. Appending to self.system_prompt
        # would add the ReAct instructions again on every call to run().
        system_prompt = self.system_prompt
        if self.tools:
            system_prompt += (
                "\n" + REACT_SYSTEM_PROMPT % self.add_tool_signatures()
            )

        chat_history = ChatHistory(
            [
                build_prompt_structure(
                    prompt=system_prompt,
                    role="system",
                ),
                user_prompt,
            ]
        )

        if self.tools:
            # Run the ReAct loop for max_rounds
            for _ in range(max_rounds):

                completion = completions_create(self.client, chat_history, self.model)

                response = extract_tag_content(str(completion), "response")
                if response.found:
                    return response.content[0]

                thought = extract_tag_content(str(completion), "thought")
                tool_calls = extract_tag_content(str(completion), "tool_call")

                update_chat_history(chat_history, completion, "assistant")

                # The model may skip the <thought> tag; only print it when present.
                if thought.found:
                    print(Fore.MAGENTA + f"\nThought: {thought.content[0]}")

                if tool_calls.found:
                    observations = self.process_tool_calls(tool_calls.content)
                    print(Fore.BLUE + f"\nObservations: {observations}")
                    # Wrap the result in <observation> tags, which is the format
                    # REACT_SYSTEM_PROMPT tells the model to expect.
                    chat_history.append(
                        build_prompt_structure(
                            prompt=f"{observations}",
                            role="user",
                            tag="observation",
                        )
                    )

        # No tools, or the rounds ran out: ask for one last completion and
        # unwrap it when the model did use the <response> tag.
        final_completion = completions_create(self.client, chat_history, self.model)
        final_response = extract_tag_content(str(final_completion), "response")
        if final_response.found:
            return final_response.content[0]
        return final_completion


After running the code above, you should see a `scratch_agent` folder in the filesystem, containing all the code implemented in the previous lessons.

## Defining the Tools

First of all, let's define the three Tools. Before we do, we need to import the Tool abstractions we implemented in Module 3 (`scratch_agent/tools.py`).

We'll write this file to the parent folder (`./tools.py`) so that we can use it from Streamlit.

In [None]:
%%writefile tools.py

import json
import requests
from bs4 import BeautifulSoup
from scratch_agent.tools import tool

# Root of the Hacker News API; the fetch helpers in this file append endpoint paths to it.
BASE_URL = "https://hacker-news.firebaseio.com/v0"

def fetch_item(item_id: int, timeout: float = 10.0):
    """
    Fetches the details of a Hacker News item (story, comment, ...) by its ID.

    Args:
        item_id (int): The ID of the item to fetch.
        timeout (float): Seconds to wait for the server before giving up.

    Returns:
        dict: The decoded JSON body describing the item. This may be None if
        the API answers with JSON `null`, so callers should check for it.

    Raises:
        requests.RequestException: On network errors, timeouts or HTTP error statuses.
    """
    url = f"{BASE_URL}/item/{item_id}.json"
    # requests applies no timeout by default, so a stalled connection would
    # otherwise block the agent forever.
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.json()

def fetch_story_ids(story_type: str = "top", limit: int | None = None, timeout: float = 10.0):
    """
    Fetches the story IDs of the requested story list.

    Args:
        story_type: The story type. Defaults to top (`topstories.json`)
        limit: The limit of stories to be fetched. None or 0 means no limit.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        List[int]: A list of story IDs, cut to `limit` entries when a limit is given.

    Raises:
        requests.RequestException: On network errors, timeouts or HTTP error statuses.
    """
    url = f"{BASE_URL}/{story_type}stories.json"
    # requests applies no timeout by default; fail fast instead of hanging.
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    story_ids = response.json()

    if limit:
        story_ids = story_ids[:limit]

    return story_ids

def fetch_text(url: str, timeout: float = 10.0):
    """
    Fetches the text from a URL (if there's text to be fetched). If it fails,
    it will return an informative message to the LLM.

    Args:
        url: The story URL
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A string with either the text of the page or an informative error message.
    """
    try:
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            # The attribute is `status_code`. The previous `response.status`
            # raised AttributeError and hid the real HTTP status.
            return f"Unable to fetch content from {url}. Status code: {response.status_code}"

        soup = BeautifulSoup(response.content, 'html.parser')
        return soup.get_text()
    except Exception as e:
        # Deliberately broad: any failure is reported back to the LLM as text
        # instead of interrupting the agent.
        return f"An error occurred: {e}"

# The docstring below is sent to the LLM as the tool description (it is copied
# from fn.__doc__ by get_fn_signature), so it must only mention real parameters.
@tool
def get_hn_stories(limit: int = 5, story_type: str = "top"):
    """
    Fetches Hacker News stories based on the provided parameters.

    Args:
        limit (int): The number of stories to retrieve. Default is 5.
        story_type (str): The story type: "top", "new", "best", "ask", "show" or "job". Default is "top".

    Returns:
        list[Dict[str, Union[str, int]]]: A list of dictionaries containing
        'story_id', 'title', 'url', and 'score' of the stories.
    """
    # A non-positive limit asks for no stories; answer without any request.
    if limit is not None and limit <= 0:
        return []

    story_ids = fetch_story_ids(story_type, limit)

    formatted_stories = []
    for story_id in story_ids:
        story = fetch_item(story_id)
        # Skip items for which the API returned nothing.
        if not story:
            continue
        formatted_stories.append(
            {
                "title": story.get("title"),
                "url": story.get("url"),
                "score": story.get("score"),
                "story_id": story.get("id"),
            }
        )

    return formatted_stories


# The docstring below is sent to the LLM as the tool description (it is copied
# from fn.__doc__ by get_fn_signature).
@tool
def get_relevant_comments(story_id: int, limit: int = 10):
    """
    Get the most relevant comments for a Hacker News item.

    Args:
        story_id: The ID of the Hacker News item.
        limit: The number of comments to retrieve (default is 10).

    Returns:
        A JSON list with the text of each comment, or a message saying that the item has no comments.
    """
    story = fetch_item(story_id)

    if not story or not story.get('kids'):
        return "This item doesn't have comments."

    # The Hacker News API lists `kids` in ranked display order and comment
    # items carry no score, so the first usable children are the most relevant
    # ones. Stopping at `limit` avoids one HTTP request per remaining comment.
    relevant_comments = []
    for comment_id in story['kids']:
        if len(relevant_comments) >= limit:
            break
        comment = fetch_item(comment_id)
        # Deleted or missing comments have no text; skip them instead of failing.
        if comment and comment.get("text"):
            relevant_comments.append(comment["text"])

    return json.dumps(relevant_comments)

# The docstring below doubles as the tool description: get_fn_signature copies
# fn.__doc__ into the signature that is shown to the LLM, so keep it accurate.
@tool
def get_story_content(story_url: str):
    """
    Gets the content of the story.

    Args:
        story_url: A string representing the story URL

    Returns:
        The content of the story
    """
    # Delegates to fetch_text, which returns either the page text or an
    # error message as a string.
    return fetch_text(story_url)


# Creating the ReAct Agent

Now that we have all the relevant code from previous modules imported, it's time to define the Agent.

In [None]:
%%writefile hn_bot.py

import os
import re
import math
import json
from google.colab import userdata

from openai import OpenAI

from scratch_agent.react import ReactAgent
from tools import get_hn_stories, get_relevant_comments, get_story_content

def get_hn_bot(api_key: str):
  """Create the Hacker News ReAct agent.

  Args:
      api_key (str): The OpenAI API key handed to the agent.

  Returns:
      ReactAgent: An agent configured with the Hacker News persona prompt and
      the stories, comments and content tools.
  """
  hn_tools = [get_hn_stories, get_relevant_comments, get_story_content]

  persona_prompt = """You are the Singularity Incarnation of Hacker News.
  The human will ask you for information about Hacker News.
  If you can't find any information  about the question asked
  or the result is incomplete, apologise to the human and ask him if
  you can help him with something else.
  If the human asks you to show him stories, do it using a markdown table.
  The markdown table has the following format:

  story_id | title | url | score"""

  return ReactAgent(
      system_prompt=persona_prompt,
      tools=hn_tools,
      api_key=api_key
  )

# Streamlit Application

Finally, let's create the Streamlit Application. Don't forget to copy your OpenAI API Key, as you'll need it to interact with the app.

In [None]:
%%writefile app.py

import os
import asyncio

from PIL import Image
import streamlit as st

from hn_bot import get_hn_bot

# Set Streamlit page config
st.set_page_config(page_title="Analytics Vidhya HN Bot 🤖📰")
st.title("Analytics Vidhya HN Bot 🤖📰")


# Sidebar - API Key input
with st.sidebar:
    st.markdown("""
    # **Greetings, Digital Explorer!**

    Are you fatigued from navigating the expansive digital realm in search of your daily tech tales
    and hacker happenings? Fear not, for your cyber-savvy companion has descended upon the scene –
    behold the extraordinary **Analytics Vidhya HN Bot**!
    """)
    api_key = st.text_input("Enter your OpenAI API Key:", type="password")

# Streamlit re-executes this script on every interaction, so the agent is
# rebuilt on each run. It is only built once a key has been entered; without
# a key the first request to OpenAI would fail with an authentication error.
if api_key:
    st.session_state["agent"] = get_hn_bot(api_key=api_key)
elif "agent" in st.session_state:
    del st.session_state["agent"]

# Initialize session state
if "messages" not in st.session_state:
    st.session_state["messages"] = []


def generate_response(question):
    """Generate response while passing conversation history as context.

    Both sides of every previous exchange are included, so the agent can
    resolve follow-up questions that refer to something the user said earlier.
    """
    context = "\n".join(
        f"Human: {msg['user']}\nAI: {msg['bot']}"
        for msg in st.session_state["messages"]
    )
    response = st.session_state["agent"].run(f"Context: {context} Question: {question}")
    return response

# Display chat history
for msg in st.session_state["messages"]:
    st.chat_message("human").write(msg["user"])
    st.chat_message("ai").write(msg["bot"])

# Chat input handling
if prompt := st.chat_input():
    st.chat_message("human").write(prompt)
    if "agent" not in st.session_state:
        # No key yet: explain what is missing instead of calling the API.
        st.warning("Please enter your OpenAI API Key in the sidebar to start chatting.")
    else:
        with st.spinner("Thinking ..."):
            response = generate_response(prompt)
            st.chat_message("ai").write(response)

        # Store conversation in history
        st.session_state["messages"].append({"user": prompt, "bot": response})

In [None]:
!npm install localtunnel

In [None]:
!streamlit run app.py --server.address=localhost &>/content/logs.txt &


In [None]:
# `import urllib` alone does not load the `urllib.request` submodule, so the
# call below only worked when another module had already imported it.
import urllib.request

# The public IP of this machine is what localtunnel asks for as its password.
with urllib.request.urlopen('https://ipv4.icanhazip.com', timeout=10) as reply:
    public_ip = reply.read().decode('utf8').strip("\n")

print("Password/Enpoint IP for localtunnel is:", public_ip)

In [None]:
!npx localtunnel --port 8501