In [15]:
! pip install -U openai haystack-ai duckduckgo-api-haystack jsonschema 

Collecting openai
  Downloading openai-1.60.2-py3-none-any.whl (456 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m456.1/456.1 kB[0m [31m34.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting haystack-ai
  Downloading haystack_ai-2.9.0-py3-none-any.whl (419 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m419.2/419.2 kB[0m [31m65.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting duckduckgo-api-haystack
  Downloading duckduckgo_api_haystack-0.1.14-py3-none-any.whl (9.8 kB)
Collecting sniffio
  Downloading sniffio-1.3.1-py3-none-any.whl (10 kB)
Collecting distro<2,>=1.7.0
  Downloading distro-1.9.0-py3-none-any.whl (20 kB)
Collecting jiter<1,>=0.4.0
  Downloading jiter-0.8.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (345 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m345.0/345.0 kB[0m [31m23.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting httpx<1,>=0.23.0
  Downloading httpx-0.28.1-py3-none-any.whl (73 kB)
[2K   

In [16]:
from getpass import getpass
import os


if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API key:")

In [17]:
from typing import Annotated, Callable, Tuple
from dataclasses import dataclass, field

import random, re

from typing import Annotated, Callable, Tuple
from dataclasses import dataclass, field

import random, re

from haystack.dataclasses import ChatMessage, ChatRole
from haystack.tools import create_tool_from_function
from haystack.components.tools import ToolInvoker
from haystack.components.generators.chat import OpenAIChatGenerator

## Agent Setup

Orchestrator Agent: Coordinate workflow, Answer general questions, manage inter-agent communication, and finalize outputs.

Research Agent: Investigate the industry/company, trends, competitors, and market dynamics.

Plus 2 More Agents

## Orchestrator Agent

### Agent Functions/Tools

In [19]:
#Handoff Template Defination
HANDOFF_TEMPLATE = "Transferred to: {agent_name}. Adopt persona immediately."
HANDOFF_PATTERN = r"Transferred to: (.*?)(?:\.|$)"


@dataclass
class SwarmAgent:
    name: str = "SwarmAgent"
    llm: object = OpenAIChatGenerator(model= "gpt-4o-mini")
    instructions: str = "You are a helpful Agent"
    functions: list[Callable] = field(default_factory=list)

    def __post_init__(self):
        self._system_message = ChatMessage.from_system(self.instructions)
        self.tools = [create_tool_from_function(fun) for fun in self.functions] if self.functions else None
        self._tool_invoker = ToolInvoker(tools=self.tools, raise_on_failure=False) if self.tools else None

    def run(self, messages: list[ChatMessage]) -> Tuple[str, list[ChatMessage]]:
        # generate response
        agent_message = self.llm.run(messages=[self._system_message] + messages, tools=self.tools)["replies"][0]
        new_messages = [agent_message]

        if agent_message.text:
            print(f"\n{self.name}: {agent_message.text}")

        if not agent_message.tool_calls:
            return self.name, new_messages

        # handle tool calls
        for tc in agent_message.tool_calls:
            # trick: Ollama do not produce IDs, but OpenAI and Anthropic require them.
            if tc.id is None:
                tc.id = str(random.randint(0, 1000000))
        tool_results = self._tool_invoker.run(messages=[agent_message])["tool_messages"]
        new_messages.extend(tool_results)

        # handoff
        last_result = tool_results[-1].tool_call_result.result
        match = re.search(HANDOFF_PATTERN, last_result)
        new_agent_name = match.group(1) if match else self.name

        return new_agent_name, new_messages


In [21]:
def escalate_to_human(summary: Annotated[str, "A summary"]):
    """Only call this if explicitly asked to."""
    print("Escalating to human agent...")
    print("\n=== Escalation Report ===")
    print(f"Summary: {summary}")
    print("=========================\n")
    exit()

def transfer_to_research_agent():
    """User for anything that requires investigating the industry/company, trends, competitors, market dynamics related."""
    return HANDOFF_TEMPLATE.format(agent_name="Research Agent")


def transfer_back_to_orchestrator():
    """Call this if the user brings up a topic outside of your purview,
    including escalating to human."""
    return HANDOFF_TEMPLATE.format(agent_name="Orchestrator Agent")

### Agent Definition

In [25]:
orch_agent = SwarmAgent(
    name="Orchestrator Agent",
    instructions=(
        """Parse the user’s input to determine the target industry/company, research scope, and desired outputs. 
        Break the query into subtasks for Research Agent, Use Case Generator, and Resource Collector.       """
        "Trigger agents in sequence: Research Agent → Use Case Generator → Resource Collector. Ensure each agent’s output is validated before passing it to the next agent. "
        "Extract structured outputs (e.g., JSON) from each agent and store them in a shared knowledge base. Pass only relevant data to downstream agents. "
        "Monitor agent tasks for failures (e.g., API errors, incomplete data). Retry up to 3 times. If unresolved, escalate to the user with a summary of the issue."
        "Combine outputs from all agents into a final report. Structure it as: Market Overview → Use Cases → Data Resources."
        "Make tool calls only if necessary and make sure to provide the right arguments."
    ),
    functions=[transfer_to_research_agent, escalate_to_human],
)



# Remains of Old Code

In [95]:
assist_pipe.connect("search.documents", "prompt_builder.documents")
assist_pipe.connect("prompt_builder.prompt", "llm.prompt")

<haystack.core.pipeline.pipeline.Pipeline object at 0x7ff3f05c77c0>
🚅 Components
  - search: DuckduckgoApiWebSearch
  - prompt_builder: PromptBuilder
  - llm: OpenAIGenerator
🛤️ Connections
  - search.documents -> prompt_builder.documents (List[Document])
  - prompt_builder.prompt -> llm.prompt (str)

In [97]:
from haystack_experimental.dataclasses import Tool
from typing import Annotated, Literal

In [105]:
def get_info(company_name: Annotated[str, "The item company name to search"]):
    """
    Search the web to get the information of a company associated with a domain
    """

    search_query = f"""What are all the technical domains and fronts that {company_name} is currently working in?
    Elaborate its ongoing and upcoming projects and recently released products.
    """
    question = f"""
    What is the presence of {company_name} in different technical domains? If the given documents do not contain the correct knowledge output:`ERROR: CANNOT FIND CORRECT DATA`, Else Respond with Multiple Entries with single entry being 30 words description & URL associated.
    Format:
    1:
    Description: <description>
    URL: <url>
    ...
    N:
    Description: <description>
    URL: <url>
    """

    data = {"search": {"query": search_query}, "prompt_builder": {"query": question}}

    return assist_pipe.run(data=data)["llm"]["replies"][0]

In [107]:
print(get_info("Zomato"))

  results = self.ddgs.text(**payload)
1:
Description: Zomato employs Java for building scalable services and Python for data analysis, machine learning tasks, and automation, showcasing its presence in software development and data science.
URL: https://www.technologywithvivek.com/2024/09/Top+technologies+and+programming+languages+used+in+zomato+app.html

2:
Description: The platform utilizes a variety of technologies, including HTML5 and Bootstrap, to enhance its user experience and functionality, indicating strong technical expertise in web development.
URL: https://www.crunchbase.com/organization/zomato/technology

3:
Description: Zomato's observability platform efficiently handled massive metrics during peak times, highlighting its capability in real-time data processing and technical operations at scale.
URL: https://blog.zomato.com/a-tale-of-scale-behind-the-scenes-at-zomato-tech-for-nye-2023

4:
Description: The company focuses on app development investments, ensuring a user-fri

In [109]:
get_info_tool = Tool.from_function(get_info)

In [113]:
def get_domain_growth(domain: Annotated[str, "The domain to be studied"]):
    """
    Search the web to get the information for the domain
    """

    search_query = f"{domain}: Growth Trends 2025. What are the growth opportunities in {domain} in 2025? Has {domain} grown in 2025 or remained static or declined."
    question = f"""
    What growth opportunities does {domain} has? Respond NA if none present. Respond AWESOME if it is more than previous years. Respond BAD if it is less than previous years. If AWESOME or BAD, Respond with Multiple Entries with single entry being 30 words description, along with URL.
    Format:
    NA/AWESOME/BAD
    1:
    Description: <description>
    URL: <url>
    ...
    N:
    Description: <description>
    URL: <url>
    """

    data = {"search": {"query": search_query}, "prompt_builder": {"query": question}}

    return assist_pipe.run(data=data)["llm"]["replies"][0]

In [115]:
print(get_domain_growth("Spacial Computing"))

  results = self.ddgs.text(**payload)
AWESOME  
1:  
Description: The spatial computing market is expected to grow at a CAGR of 20.4% from 2023 to 2030, driven by increasing demand for immersive experiences and technological advancements.  
URL: https://www.grandviewresearch.com/industry-analysis/spatial-computing-market-report  
2:  
Description: Global Spatial Computing Market is projected to reach US$ 511.55 billion by 2032, reflecting a significant growth opportunity fueled by advancements and adoption across multiple industries.  
URL: https://www.datamintelligence.com/research-report/spatial-computing-market  
3:  
Description: The convergence of spatial computing with AI and IoT unlocks transformative possibilities, enhancing collaborative efforts and innovative applications across various sectors.  
URL: https://blog.bccresearch.com/navigating-the-future-unveiling-the-dynamics-of-the-spatial-computing-industry  
4:  
Description: The North American region leads in market share 

In [117]:
get_domain_growth_tool = Tool.from_function(get_domain_growth)

In [119]:
from haystack_experimental.components.generators.chat import OpenAIChatGenerator
from haystack_experimental.components.tools.tool_invoker import ToolInvoker
from haystack_experimental.dataclasses import ChatMessage

In [121]:
tools = [get_info_tool, get_domain_growth_tool]

chat_generator = OpenAIChatGenerator(tools=tools)

tool_invoker = ToolInvoker(tools=tools)

messages = [
    ChatMessage.from_system(
        """You are TrendSage, a market analyst and buisness growth Engine. Always talk with  references. You are expected to talk with a client and tell them what to do.

            Ask the client for there company name and current domain, their business goals.

            Deduce list of potential domains of the company.

            Then try to find the growth opportunities of the company.

            Prepare a tool call if needed, otherwise use your knowledge to respond to the user.
            If the invocation of a tool requires the result of another tool, prepare only one call at a time.

            Each time you receive the result of a tool call, ask yourself: "Am I done with the task?".
            If not and you need to invoke another tool, prepare the next tool call.
            If you are done, respond with just the final result."""
    )
]

while True:
    user_input = input("\n\nwaiting for input (type 'exit' or 'quit' to stop)\n🧝: ")
    if user_input.lower() == "exit" or user_input.lower() == "quit":
        break
    messages.append(ChatMessage.from_user(user_input))

    while True:
        print("⌛ iterating...")

        replies = chat_generator.run(messages=messages)["replies"]
        messages.extend(replies)

        # Check for tool calls and handle them
        if not replies[0].tool_calls:
            break
        tool_calls = replies[0].tool_calls

        tool_messages = tool_invoker.run(messages=replies)["tool_messages"]
        messages.extend(tool_messages)

    # Print the final AI response after all tool calls are resolved
    print(f"🤖: {messages[-1].text}")

⌛ iterating...
🤖: Hello! How can I assist you today? If you're looking to discuss your business or explore growth opportunities, please share your company name, current domain, and your business goals!
⌛ iterating...
⌛ iterating...
🤖: I've gathered some information regarding Zepto and the potential growth in the 10-minute delivery domain. 

### Current Insights:
1. **Market Trend**: The trend of 10-minute deliveries is expected to significantly grow, especially as companies expand their offerings. Consumers in metropolitan areas are increasingly willing to pay extra for rapid service. [Read more here.](https://economictimes.indiatimes.com/industry/services/retail/10-minute-deliveries-cant-miss-profit-and-regulatory-hurdles-to-add-profit-in-big-basket/articleshow/116853691.cms)

2. **Projected Growth**: Quick commerce is projected to scale by 2025, with companies like Zepto experiencing 200% order increases. This hints at a substantial growing market alongside traditional e-commerce. [L

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=334056e4-dfa5-4173-a17f-b0fa1780ca52' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>