In [None]:
import logging
from dotenv import load_dotenv

load_dotenv()

# Get the root logger
logger = logging.getLogger()
# Create a handler to output log messages to the console
handler = logging.StreamHandler()
# Create a formatter to format log messages
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

In [None]:
import requests
from jsonpath_ng import parse

SEARCHXNG_URL = "http://searxng.cloud.internal/search"
CRAWL4AI_URL = "http://crawl.cloud.internal/md"

# "Passive" functions (external systems)
def find_web_sources(query:str, max_search_urls=1):
    """Find web pages related to a search query"""
    url = SEARCHXNG_URL
    url_query = {'q': query, 'format': 'json', 'language': 'en-US'}
    response = requests.get(url, params=url_query)
    search_results = response.json().get('results', [])
    # Sort by score
    search_results_sorted = sorted(search_results, key=lambda x: x['score'], reverse=True)
    jsonpath_expression = parse('$.[*].url')
    urls = jsonpath_expression.find(search_results_sorted[:max_search_urls])
    urls = [url.value for url in urls]
    return urls

def visit_web_page(url:str):
    """Get the contents of a web page"""
    try:
        response = requests.post(
            CRAWL4AI_URL,
            json={
                "url": url,
                "f": "fit",
                "q": None,
                "c": "0"
            }
        )
        response.raise_for_status()  # Raise an exception for bad status codes
        data = response.json()
        return data
    except requests.RequestException as e:
        return {"url": url, "markdown": str(e), "is_fail":True}

In [None]:
from local_llms import Llama4Maverick
from lmflux.flow import (
    new_toolbox,
    create_agent,
    tool
)
from lmflux.agents.sessions import Session
from lmflux.agents.structure import Agent
from lmflux.core.components import (SystemPrompt, TemplatedPrompt, Message)

# == Agent Definitions == 
# -- 1. "Expert" maker agent --
expert_maker_llm = Llama4Maverick(SystemPrompt('general.user_preference'))

# Act function
def expert_maker_act_function(agent:Agent, session:Session):
    agent.reset_agent_state()
    message = TemplatedPrompt("expert_maker.instruct", "user").get_message(
        session.context_as_dict()
    )
    response = agent.conversate(message, session)
    session.context.set("persona", response.content)
    agent.log_agent_step(session, "has defined the persona", [response])
        

agent_expert_maker = (
    create_agent(expert_maker_llm, "expert_maker")
        .with_act(expert_maker_act_function)
        .build()
)

In [4]:
# -- 2. "Expert" plan maker agent -- 
planner_llm = Llama4Maverick(SystemPrompt('general.role_play'))

# Act function
def expert_planner_act_function(agent:Agent, session:Session):
    agent.reset_agent_state()
    message = TemplatedPrompt("expert_planner.instruct", "user").get_message(
        session.context_as_dict()
    )
    response = agent.conversate(message, session)
    session.context.set("research_plan", response.content)
    agent.log_agent_step(session, "has defined the plan", [response])

agent_planner = (
    create_agent(planner_llm, "planner")
        .with_act(expert_planner_act_function)
        .build()
)

In [5]:
# -- 3a. "Text Compressor" agent
system_prompt = SystemPrompt('text_compressor.system')
reader_llm = Llama4Maverick(system_prompt)

# Act function
def text_compressor_act_function(agent:Agent, session:Session):
    agent.reset_agent_state()
    message = TemplatedPrompt("text_compressor.instruct", "user").get_message(
        session.context_as_dict()
    )
    response = agent.conversate(message, session)
    session.context.set("compressed_text", response.content)
    agent.log_agent_step(session, "has compressed a text", [response], print_full_message=False)

agent_text_compressor = (
    create_agent(reader_llm, "text_compressor")
        .with_act(text_compressor_act_function)
        .build()
)

# Auxiliary call (the agent is defined "outside the scope" of the other guys)
def llm_text_compress(text: str, source:str, attention_instructions:str) -> dict:
    transient_session = Session()
    transient_session.context.set("web_page_result", text+f'\nSOURCE:{source}')
    transient_session.context.set("important_details", attention_instructions)
    agent_text_compressor.act(transient_session)
    compressed_text = transient_session.context.get('compressed_text')
    return {
        "compressed_text": compressed_text,
        "source": source
    }


In [None]:

# -- 3b. "Researcher" agent
@tool
def get_information(search_queries: list[str], attention_instructions:str):
    """
    This tool calls for a downstream executor to fetch relevant information.
        - search_queries: Use this parameter to specify all the queries that need to be responded to execute a plan.call_id
        - attention_instructions: Use these instructions to refine what you wish to respond with this query. 
    """
    
    if type(search_queries) != list:
        raise ValueError("Not able to run, incorrect type")
    
    logger.info(f"Search algorithm called with {search_queries}")
    results = {}
    for query in search_queries:
        logger.info(f"Search algorithm searching for: {query}")
        web_pages = find_web_sources(query)
        first_page = web_pages[-1]
        data = visit_web_page(first_page)
        llm_summary = llm_text_compress(data.get("markdown"), first_page, attention_instructions)
        results[query] = {"llm_summary": llm_summary, "url": data.get("url")}
    return results

def tool_agg(agent: Agent, tool_call, result, session: Session):
    # Add to the "researched_information" pile
    agent.log_agent_step(session, "Agent got refined data back", [])
    session.context.set_cumulative("researched_information", result)
    session.context.set("researcher__tool_called", True)

def researcher_act_function(agent:Agent, session:Session):
    first_query = True
    agent.reset_agent_state()
    refine_upper_bound = session.context.get("refine_upper_bound", 3)
    rounds = 0
    # Maybe we need to use a thinking agent here (Lets test)
    while (True):
        session.context.set("researcher__tool_called", False)
        if first_query:
            agent.log_agent_step(session, "Started research process", [], print_full_message=False)
            message = TemplatedPrompt("researcher.query", "user").get_message(
                session.context.get_context()
            )
            first_query = False
        else:
            agent.log_agent_step(session, "Started reflecting on the information", [], print_full_message=False)
            message = TemplatedPrompt("researcher.reflect", "user").get_message(
                session.context.get_context()
            )       
        agent.conversate(message, session)
        rounds += 1 
        if not session.context.get("researcher__tool_called") or rounds >= refine_upper_bound:
            break

system_prompt = SystemPrompt('researcher.system')
researcher_llm = Llama4Maverick(system_prompt)
toolbox = new_toolbox()
toolbox.add_to_toolbox(get_information)
agent_researcher = (
    create_agent(researcher_llm, "researcher")
        .with_act(researcher_act_function)
        .with_tool_callback(tool_agg)
        .with_tools(toolbox)
        .build()
)

In [7]:
def report_act_function(agent:Agent, session:Session):
    agent.reset_agent_state()
    # Compile summary
    researched_information = session.context.get_cumulative("researched_information")
    dict_to_list = lambda k, v: dict(v, **{"term": k})
    researched_information = [dict_to_list(k, item) for sublist in researched_information for k, item in sublist.items()]
    summaries = [
        f"<TERM>{data.get('term')}</TERM>\n<SOURCE>{data.get('source')}</SOURCE>\n<CONTENT>\n{data.get('llm_summary').get('compressed_text')}</CONTENT>\n" 
        for data in researched_information
    ]
    session.context.set("summary", '\n'.join(summaries))
    message = TemplatedPrompt("report_generator.instruct", "user").get_message(
        session.context.get_context()
    )
    result = agent.conversate(message, session)
    session.context.set("report", result.content)
    agent.log_agent_step(session, "Wrote a report", [result])

def post_act_function(agent:Agent, session:Session):
    request_uuid = session.session_id
    report = session.context.get("report")
    with open(f"results/{request_uuid}.md", 'w') as f:
        f.write(report)
    agent.log_agent_step(session, "Saved the report to a file", [])
    
    
system_prompt = SystemPrompt('report_generator.system')
report_llm = Llama4Maverick(system_prompt)
agent_reporter = (
    create_agent(report_llm, "report_generator")
        .with_act(report_act_function)
        .with_post_act(post_act_function)
        .build()
)

In [8]:
def perform_deep_research(query, refine_upper_bound=8):
    session = Session()
    session.context.set("user_query", query)
    session.context.set("refine_upper_bound", refine_upper_bound)

    # Agent "chaining"
    agent_expert_maker.act(session)
    agent_planner.act(session)
    agent_researcher.act(session)
    agent_reporter.act(session)

In [None]:
query = "What defines a Microsservice? I need to be able to discern between `microsservice` and `nanoservices`"
perform_deep_research(query, refine_upper_bound=1)

2025-08-04 18:07:30,345 - pipelines_logger - INFO - [92m(expert_maker) has defined the persona[0m
2025-08-04 18:07:32,599 - pipelines_logger - INFO - [92m(planner) has defined the plan[0m
2025-08-04 18:07:32,599 - pipelines_logger - INFO - [92m(researcher) Started research process[0m


The agent called the search algorithm with ['definition of microservice from reputable sources', 'characteristics and principles of microservices architecture', 'definition and characteristics of nanoservices', 'comparison between microservices and nanoservices', 'expert opinions and case studies on microservices and nanoservices']
The agent is searching with definition of microservice from reputable sources


2025-08-04 18:07:43,960 - pipelines_logger - INFO - [92m(text_compressor) has compressed a text[0m


The agent is searching with characteristics and principles of microservices architecture


2025-08-04 18:08:03,137 - pipelines_logger - INFO - [92m(text_compressor) has compressed a text[0m


The agent is searching with definition and characteristics of nanoservices


2025-08-04 18:08:20,967 - pipelines_logger - INFO - [92m(text_compressor) has compressed a text[0m


The agent is searching with comparison between microservices and nanoservices


2025-08-04 18:08:33,901 - pipelines_logger - INFO - [92m(text_compressor) has compressed a text[0m


The agent is searching with expert opinions and case studies on microservices and nanoservices


2025-08-04 18:08:41,215 - pipelines_logger - INFO - [92m(text_compressor) has compressed a text[0m
2025-08-04 18:08:41,215 - pipelines_logger - INFO - [92m(researcher) Agent got refined data back[0m
2025-08-04 18:08:46,251 - pipelines_logger - INFO - [92m(report_generator) Wrote a report[0m
2025-08-04 18:08:46,253 - pipelines_logger - INFO - [92m(report_generator) Saved the report to a file[0m
