In [1]:
!git clone https://github.com/Mohit-coder-droid/13-Pathway-Ideas

Cloning into '13-Pathway-Ideas'...
remote: Enumerating objects: 9, done.[K
remote: Counting objects: 100% (9/9), done.[K
remote: Compressing objects: 100% (6/6), done.[K
remote: Total 9 (delta 0), reused 0 (delta 0), pack-reused 0 (from 0)[K
Receiving objects: 100% (9/9), done.


In [2]:
import google.generativeai as genai
import os
import json

In [14]:
from google.colab import userdata
os.environ["GEMINI_API_KEY"] = userdata.get('GEMINI_API_KEY')
genai.configure(api_key=os.environ["GEMINI_API_KEY"])

## Checking how to incorporate google gemini model on swarm

In [4]:
# Standard library imports
import copy
import json
from collections import defaultdict
from typing import List, Callable, Union

# Package/library imports
from typing import List, Callable, Union, Optional

# Third-party imports
from pydantic import BaseModel
AgentFunction = Callable[[], Union[str, "Agent", dict]]

In [6]:
import sys
sys.path.insert(1, '/content/13-Pathway-Ideas')

In [7]:
# Local imports
from util import function_to_json, debug_print, merge_chunk

In [8]:
# A function which takes nothing but returns either string, Agent, or dict
AgentFunction = Callable[[], Union[str, "Agent", dict]]

In [9]:
class Agent(BaseModel):
    name: str = "Agent"
    model: str = "gemini-1.5-flash"
    instructions: Union[str, Callable[[], str]] = "You are a helpful agent."
    functions: List[AgentFunction] = []
    tool_choice: str = None
    parallel_tool_calls: bool = True


class Response(BaseModel):
    messages: List = []
    agent: Optional[Agent] = None
    context_variables: dict = {}

class Result(BaseModel):
    """
    Encapsulates the possible return values for an agent function.

    Attributes:
        value (str): The result value as a string.
        agent (Agent): The agent instance, if applicable.
        context_variables (dict): A dictionary of context variables.
    """

    value: str = ""
    agent: Optional[Agent] = None
    context_variables: dict = {}

In [None]:
# In this code I am not dealing with the context variables to keep things simple and because of lack of time
# I am also not tackling the stream part
# In my code the model gets only one chance to say something but in the original one, it takes multiple iteration and functions calling to get to the right result
__CTX_VARS_NAME__ = "context_variables"

## genai.GenerativeModel()

In [10]:
class Swarm:
    def __init__(self):
        self.client = genai.GenerativeModel()

    def get_chat_completion(
            self,
            agent:Agent,
            history:List,
            context_variables: dict,
            model_override: str,
            stream: bool,
            debug: bool,
            max_history:int = 10,  # max amount of history that the model can see
    ):
        context_variables = defaultdict(str, context_variables)

        instructions = (
            agent.instructions(context_variables)
            if callable(agent.instructions)
            else agent.instructions
        )

        messages = [{"role": "system", "content": instructions}]

        if history[-1]:
            if "role" in history[-1] and history[-1]['role'] == "model":
                messages.append(f"In the following conversation your last response was {history[-1]}. So, consider this and following conversation and tell your response. ")

        if len(history)>max_history:
            messages.append(history[-max_history:])
        else:
            messages.append( history)
        debug_print(debug, "Getting chat completion for...:", messages)


        # I have to do something to include context_variables here



        # In google generative ai, we just have to pass the name of the function
        create_params = {
            "model_name": model_override or agent.model,
            "tools": agent.functions,
        }

        self.client = genai.GenerativeModel(**create_params)
        return self.client.generate_content(str(messages))
        # return self.client.generate_content("Can you tell me what things can you do")

    def run(
        self,
        agent: Agent,
        messages: List,
        context_variables: dict = {},
        model_override: str = None,
        stream: bool = False,
        debug: bool = False,
        max_turns: int = float("inf"),
        max_history:int = 10,  # max amount of history that the model can see
        execute_tools: bool = True,
    ):
        active_agent = agent
        context_variables = copy.deepcopy(context_variables)
        history = copy.deepcopy(messages)
        init_len = len(messages)

        n = 1

        # An agent can be called multiple times after implementing a function
        # while len(history) - init_len < max_turns and active_agent:

        # get completion with current history, agent
        completion = self.get_chat_completion(
            agent=active_agent,
            history=history,
            context_variables=context_variables,
            model_override=model_override,
            stream=stream,
            debug=debug,
            max_history=max_history
        )


        # For now I am making it so that it's able to call only one function
        message = completion.to_dict()['candidates'][0]['content']
        message['sender'] = active_agent.name
        debug_print(debug, "Received completion:", message)

        # if not message.function_call or not execute_tools:
        history.append(message)

        # if there is no function calling then break
        if "function_call" not in message['parts'][0] or not execute_tools:
            debug_print(debug, "Ending turn.")
            return Response(
                messages=history[init_len:],
                # messages=[message],
                agent=active_agent,
                context_variables=context_variables,
            )


        # handle function calls, updating context_variables, and switching agents
        partial_response,history = self.handle_function_calls(
            message['parts'], functions = active_agent.functions,context_variables = context_variables, debug=debug,history=history
        )
        history.extend(partial_response.messages)

        context_variables.update(partial_response.context_variables)
        if partial_response.agent:
            active_agent = partial_response.agent

        return Response(
            messages=history[init_len:],
            # messages=[message],
            agent=active_agent,
            context_variables=context_variables,
        )

    # Handle the function result
    def handle_function_result(self, result, debug) -> Result:
        match result:
            case Result() as result:
                return result

            case Agent() as agent:
                return Result(
                    value=json.dumps({"assistant": agent.name}),
                    agent=agent,
                )
            case _:
                try:
                    return Result(value=str(result))
                except Exception as e:
                    error_message = f"Failed to cast response to string: {result}. Make sure agent functions return a string or Result object. Error: {str(e)}"
                    debug_print(debug, error_message)
                    raise TypeError(error_message)


    # For now implementing only for single function call, also check for context_variables



    def handle_function_calls(
        self,
        function_call,
        functions,
        context_variables:dict,
        debug:bool,
        history
    ) -> Response:
        function_map = {f.__name__: f for f in functions}
        partial_response = Response(
            messages=[], agent=None, context_variables={})

        for func in function_call:
            name = func['function_call']['name']
            args = func['function_call']['args']

            # If our model has done mistake while calling the function_name
            if name not in function_map:
                debug_print(debug, f"Tool {name} not found in function map.")
                partial_response.messages.append(
                    {
                        "role": "tool",
                        "tool_name": name,
                        "content": f"Error: Tool {name} not found.",
                    }
                )

                continue

        debug_print(
                debug, f"Processing tool call: {name} with arguments {args}")

        func = function_map[name]

        # If there is something which the user hadn't mentioned thinking that the model already know that then it must be included in context_var
        # pass context_variables to agent functions
        # if __CTX_VARS_NAME__ in func.__code__.co_varnames:
        #     raw_result = functions[function_name](__CTX_VARS_NAME__,**function_args)
        # else:

        raw_result = func(**args)
        # print("No Problem")
        result: Result = self.handle_function_result(raw_result, debug)

        # Also check what it tool_id
        partial_response.messages.append(
            {
                "role": "tool",
                "tool_name": name,
                "content": result.value,
            }
        )

        partial_response.context_variables.update(result.context_variables)
        if result.agent:
            partial_response.agent = result.agent

            create_params = {
                "model_name": result.agent.model,
            }

            self.client = genai.GenerativeModel(**create_params)
            message =  self.client.generate_content("Introduce yourself to user in just one line strictly. " + result.agent.instructions).to_dict()['candidates'][0]['content']

            # For now I am making it so that it's able to call only one function
            message['sender'] = result.agent.name

            # if not message.function_call or not execute_tools:
            history.append(message)


        return partial_response,history


In [11]:
def run_demo_loop(
    starting_agent, context_variables=None, stream=False, debug=False
) -> None:
    client = Swarm()
    print("Starting Swarm CLI 🐝")

    messages = []
    agent = starting_agent

    while True:
        user_input = input("User Input: ")
        if user_input.lower() == "/exit":
          print("Exiting the loop. Goodbye!")
          break  # Exit the loop
        messages.append({"role": "user", "content": user_input})

        response = client.run(
            agent=agent,
            messages=messages,
            context_variables=context_variables or {},
            stream=stream,
            debug=debug,
        )

        # return response
        # print(response)

        pretty_print_messages(response.messages)

        messages.extend(response.messages)
        agent = response.agent



def pretty_print_messages(messages) -> None:
    for message in messages:

        if ('role' in message and (message['role']=='model' or message['role']=='assisstant')):
            # print agent name in blue
            print(f"\033[94m{message['sender']}\033[0m:", end=" ")

            if 'parts' in message:
                for msg in message['parts']:
                    if 'text' in msg:
                        print(msg['text'])

                    if 'function_call' in msg:
                        print(f"{msg['function_call']['name']}()")


## Cricket App:-

### Functions:-

It's not working properly maybe because of it's complexity the model is giving wrong outputs. so I didn't tested it fully

This is just the architecture of my swarm agents

In [30]:
def player_detail(name:str,team:str)->dict:
    """
        Returns a brief overview of about any player in the form of json, what is his performance since last month, what is his overall performance, performance in this tournamnt

        Args:
            name: Name of the player
            team: Team which player belongs to
    """

    player_detail = {
        name:"Mahendra Singh Dhoni",
        team:"Chennai Super King",
        overall_performance:[90,100,110,23],
        total_sixes:230,
        total_fours:500,
        performance_ipl_wise:{"2023":"Avergage 230 runs in all matches"}
    }

    return player_detail

def player_comparision(player1:str,player2:str)->str:
    """
        Returns a comparision between two players
    """

    return "Virat Kohli is better than Rohit Sharma"

def probability_winning(team1:dict,team2:dict)->str:
    """Calculates the probability of winning of a match between two teams"""

    return "CSK will win against Mumbai Indians by 130 runs"

def get_weather(location:str,time:str="now"):
    """Returns weather Forecasting of any location for any time."""

    return "It will be a sunny day"

def live_match(match:str)->dict:
    """Gives live detail about a match"""

    return {"batting team":{
        "run":200,
        "wickets":3,
        "batters":["Virat Kohli","Suryakumar"],
        "remaining balls":10
    }}

def get_news():
    """Returns latest news of cricket"""

    return "CSK won the yesterday match"

def book_ticket(match:str,day:str,location:str):
    """Book tickets for match that is going to happen"""

    return "Your ticket has been booked"

def cancel_ticket(booking_id:int):
    """Cancels the ticket which already has been booked"""

    return "Your ticket has been cancelled"

### Transfer functions:-

In [39]:
def transfer_to_player():
    """Call this function when the player_agent() => [gives information about player] seems suitable for handling the task"""
    return player_agent

def transfer_to_analytic():
    """Call this function when the analytic_agent() => [Do some analysis] seems suitable for handling the task"""
    return analytic_agent

def transfer_to_live():
    """Call this function when the live_agent() => [Gives live information about match] seems suitable for handling the task"""
    return live_agent

def transfer_to_book():
    """Call this function when the book_agent() => [Book and cancel ticket for matches] seems suitable for handling the task"""
    return book_agent

def transfer_to_triage():
    """Call this function when a user needs to be transferred to a differnt agent and a different policy.
    For instance, if a user is asking about a topic that is not handled by the current agent, call this function.
    """
    return triage_agent

Start_prompt = """
    You are an intelligent and empathetic customer support representative for cricket app. Your task is to call the function that can perform the task asked by user very well. Don't hesitate to ask the user again if you think that sufficient information is not given about the task.
    Strictly don't assume anything from your side and don't hallucinate.

"""

end_prompt = """
    NOTE: If the user requests for something that you can't tackle then call 'transfer_to_triage' function strictly
"""

triage_agent_instruc = """
    You are an expert triaging agent for cricket app.
    You have to transfer the task to the right agent"""

### Agents:-

In [40]:
player_agent = Agent(
    name = "Player Agent",
    instructions =Start_prompt +  "Your task is to give information about some player, his performance in past matches, his upcoming matches schedule" + end_prompt,
    tools = [player_detail,transfer_to_triage]
)

analytic_agent = Agent(
    name = "Analytics Agent",
    instructions = Start_prompt + "You can perform any kind of analytical task, like finding out probability which team will win, which player is likely to perform well in the upcoming matches" + end_prompt,
    tools = [player_comparision,probability_winning,transfer_to_triage]
)

live_agent = Agent(
    name = "Live Agent",
    instructions = Start_prompt + "Your task is to give live details about any match. " + end_prompt,
    tools = [live_match,get_weather,get_news,transfer_to_triage]

)

book_agent = Agent(
    name="Book Agent",
    instructions = Start_prompt + "Your task is to book ticket for matches. " + end_prompt,
    tools = [book_ticket,cancel_ticket,transfer_to_triage]
)

triage_agent = Agent(
    name = "Triage Agent",
    instructions = triage_agent_instruc,
    tools = [transfer_to_player,transfer_to_analytic,transfer_to_live,transfer_to_book]
)

In [41]:
run_demo_loop(triage_agent,debug=True)  # here it should have returned role as model but now it's giving role as system, before this run it return role as assisstant. So, it's not working very properly here

Starting Swarm CLI 🐝
User Input: Hello there
[97m[[90m2024-10-21 14:06:15[97m][90m Getting chat completion for...: [{'role': 'system', 'content': 'You are a model and You have to transfer the task to the right agent'}, [{'role': 'user', 'content': 'Hello there'}]][0m
[97m[[90m2024-10-21 14:06:18[97m][90m Received completion: {'parts': [{'text': '```json\n[\n  {\n    "role": "system",\n    "content": "This is a simple greeting. No specific task is required."\n  }\n]\n```'}], 'role': 'model', 'sender': 'Triage Agent'}[0m
[97m[[90m2024-10-21 14:06:18[97m][90m Ending turn.[0m
[94mTriage Agent[0m: ```json
[
  {
    "role": "system",
    "content": "This is a simple greeting. No specific task is required."
  }
]
```


KeyboardInterrupt: Interrupted by user

# For checking the Working of Gemini-Swarm:

## Weather Agent:-

In [19]:
def get_weather(location:str, time:str="now"):
    """Get the current weather in a given location.

        Args:
            location: City or state
    """
    print(f"Weather at {location} will be sunny today")


def send_email(recipient:str, subject:str, body:str):
    """
        For sending email to recipient
        Args:
            recipient: Receiver of email
            subject: subject of email
            body: body of email
    """
    print("Sending email...")
    print(f"To: {recipient}")
    print(f"Subject: {subject}")
    print(f"Body: {body}")
    return "Sent!"


weather_agent = Agent(
    name="Weather Agent",
    instructions="You are a helpful agent. Ask questions from user if you think that there is some missing information in the query instead of directly calling the function",
    model="gemini-1.5-flash",
    functions=[get_weather, send_email],
)

In [20]:
run_demo_loop(weather_agent,debug=True)

Starting Swarm CLI 🐝
User Input: Hi there
[97m[[90m2024-10-21 14:30:32[97m][90m Getting chat completion for...: [{'role': 'system', 'content': 'You are a helpful agent. Ask questions from user if you think that there is some missing information in the query instead of directly calling the function'}, [{'role': 'user', 'content': 'Hi there'}]][0m
[97m[[90m2024-10-21 14:30:36[97m][90m Received completion: {'parts': [{'text': 'What can I help you with today? \n'}], 'role': 'model', 'sender': 'Weather Agent'}[0m
[97m[[90m2024-10-21 14:30:36[97m][90m Ending turn.[0m
[94mWeather Agent[0m: What can I help you with today? 

User Input: I want to know the weather at Patna
[97m[[90m2024-10-21 14:30:44[97m][90m Getting chat completion for...: [{'role': 'system', 'content': 'You are a helpful agent. Ask questions from user if you think that there is some missing information in the query instead of directly calling the function'}, [{'role': 'user', 'content': 'Hi there'}, {'par

## Triage Agent

In [21]:

def process_refund(item_id:int, reason:str="NOT SPECIFIED"):
    """Refund an item. Refund an item. Make sure you have the item_id of the form item_... Ask for user confirmation before processing the refund."""
    print(f"[mock] Refunding item {item_id} because {reason}...")
    return "Success!"


def apply_discount():
    """Apply a discount to the user's cart."""
    print("[mock] Applying discount...")
    return "Applied discount of 11%"


triage_agent = Agent(
    name="Triage Agent",
    instructions="Determine which agent is best suited to handle the user's request, and transfer the conversation to that agent.",
)
sales_agent = Agent(
    name="Sales Agent",
    instructions="Be super enthusiastic about selling bees.",
)
refunds_agent = Agent(
    name="Refunds Agent",
    instructions="Help the user with a refund. If the reason is that it was too expensive, offer the user a refund code. If they insist, then process the refund.",
    functions=[process_refund, apply_discount],
)


def transfer_back_to_triage():
    """Call this function if a user is asking about a topic that is not handled by the current agent."""
    return triage_agent


def transfer_to_sales():
    return sales_agent


def transfer_to_refunds():
    return refunds_agent


triage_agent.functions = [transfer_to_sales, transfer_to_refunds]
sales_agent.functions.append(transfer_back_to_triage)
refunds_agent.functions.append(transfer_back_to_triage)

In [24]:
run_demo_loop(triage_agent,debug=True)

Starting Swarm CLI 🐝
User Input: I want my refund
[97m[[90m2024-10-21 14:32:47[97m][90m Getting chat completion for...: [{'role': 'system', 'content': "Determine which agent is best suited to handle the user's request, and transfer the conversation to that agent."}, [{'role': 'user', 'content': 'I want my refund'}]][0m
[97m[[90m2024-10-21 14:32:52[97m][90m Received completion: {'parts': [{'function_call': {'name': 'transfer_to_refunds', 'args': {}}}], 'role': 'model', 'sender': 'Triage Agent'}[0m
[97m[[90m2024-10-21 14:32:52[97m][90m Processing tool call: transfer_to_refunds with arguments {}[0m
[94mTriage Agent[0m: transfer_to_refunds()
[94mRefunds Agent[0m: Hello, I can help you with a refund - what's the reason for your request? 

User Input: I want my refund for honey whose id=123
[97m[[90m2024-10-21 14:33:02[97m][90m Getting chat completion for...: [{'role': 'system', 'content': 'Help the user with a refund. If the reason is that it was too expensive, offer 