# RSS Feed Agent

1. Write up the tools necessary to:
   1. Search websites for new info
   2. Write an organized report to a file
2. Write up the agents and setup their communication
3. Test the pipeline

In [1]:
import os
import getpass

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"var: ")

_set_env("OPENAI_API_KEY")

In [1]:
import requests
from bs4 import BeautifulSoup
import time


def google_search(query: str, num_results: int = 2, max_chars: int = 500) -> list:
    import requests
    from bs4 import BeautifulSoup
    import time
    
    url = "https://customsearch.googleapis.com/customsearch/v1"
    params = {
        "key": os.environ["GOOGLE_API_KEY"],
        "cx": os.environ["GOOGLE_CSE_ID"],
        "q": query,
        "num": num_results
    }
    
    response = requests.get(url, params=params)
    
    if response.status_code != 200:
        raise Exception(f"Error in API request: {response.status_code}")
        
    results = response.json().get("items", [])
    
    def get_page_content(url: str) -> str:
        try:
            response = requests.get(url, timeout=10)
            soup = BeautifulSoup(response.content, "html.parser")
            text = soup.get_text(separator=" ", strip=True)
            return text[:max_chars]
        except Exception as e:
            print(f"Error fetching {url}: {str(e)}")
            return ""
            
    enriched_results = []
    for item in results:
        body = get_page_content(item["link"])
        enriched_results.append({
            "title": item["title"],
            "link": item["link"], 
            "snippet": item["snippet"],
            "body": body
        })
        time.sleep(1)
        
    return enriched_results

In [3]:
import os

output = google_search("hacker news AI")

Exception: Error in API request: 400

In [4]:
for result in output:
    print(result)

NameError: name 'output' is not defined

In [5]:
def write_report(summary_insights: str, report_file: str):
    """
    Write a summary of the insights to a file
    """
    with open(report_file, "w") as f:
        f.write(summary_insights)

write_report("AI is the future", "report.txt")

In [6]:
from autogen_core.tools import FunctionTool

search_tool = FunctionTool(
    func=google_search,
    name="search_tool",
    description="Search the web for information",
)

write_report_tool = FunctionTool(
    func=write_report,
    name="write_report_tool",
    description="Write a summary of the insights to a file",
)


# Let's build out Agents!

We'll have 3 agents:
1. To Search the web
2. The extract insights from web results
3. Write those insights to file

In [7]:
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_core import CancellationToken

In [8]:
search_agent = AssistantAgent(
    name="search_agent",
    model_client=OpenAIChatCompletionClient(
        model="gpt-4o",
    ),
    system_message="You research the web for information and you communicate your findings to the insights_agent",
    tools=[search_tool],
)

model_client_structured = OpenAIChatCompletionClient(
    model="gpt-4o",
    response_format={"type": "json_object"},
)

insights_agent = AssistantAgent(
    name="insights_agent",
    model_client=model_client_structured,
    system_message="You 10 extract insights from the web results and you communicate your findings to the write_report_agent using a json structure.",
)

write_report_agent = AssistantAgent(
    name="write_report_agent",
    model_client=OpenAIChatCompletionClient(
        model="gpt-4o",
    ),
    system_message="You write a report from the insights provided by the insights_agent using the write_report_tool. You always output\
        the word 'DONE' when you are finished.",
    tools=[write_report_tool],
)

In [9]:
cancellation_token = CancellationToken()

In [10]:
from autogen_agentchat.conditions import TextMentionTermination

In [None]:
from autogen_agentchat.teams import RoundRobinGroupChat

do_team = RoundRobinGroupChat(
    [search_agent, insights_agent, write_report_agent],
    termination_condition=TextMentionTermination(text="DONE"),
)

response = await do_team.run(task="Search hacker news for the latest open source LLMs and extact the best insights about the latest models into a file and call it rss-feed-do.md")

print(response)