<a href="https://colab.research.google.com/github/TechnophileOG/Agronaut/blob/main/AI_Agent_Workshop.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# AI Agent from Scratch

## What will we learn? How can it help you?

### Installing Libraries

In [None]:
%pip install openai
%pip install ping3
%pip install python-dotenv



### Chain
##### Sequence of Actions is Hard-Coded
![Chain](https://drive.google.com/uc?id=1svRaRr-7AgfD2AEiX9Ke9lxXRnToK3i8)

### Agent
![Agent](https://drive.google.com/uc?id=1dabLbC_eN-Xy2f8dYctAMtopVcC4y7Au)

## The Prompt

In [None]:
system_prompt_template = """
You run in a loop of Thought, Action, PAUSE, Action_Response.
At the end of the loop you output an Answer.

Use Thought to understand the question you have been asked.
Use Action to run one of the actions available to you - then return PAUSE.
Action_Response will be the result of running those actions.

Thought and Action should occur in the same turn.

If you have multiple actions to run, you can run them in consecutive turns.

Your available actions are:
{tool_descriptions}


# Example session:

Question: what is the response time category for something.com?
Thought: I should check the response time for the web page first.
Action:

{{
  "function_name": "get_response_time",
  "function_params": {{
    "url": "something.com"
  }}
}}

PAUSE

You will be called again with this:

Action_Response: 5

Thought: I should now output the response time ranking.

Action:

{{
  "function_name": "get_response_time_category",
  "function_params": {{
    "response_time": 5
  }}
}}

PAUSE

You will be called again with this:

Action_Response: Fast

You then output:

Answer: The response time category for something.com is Fast.
"""

## Tools

In [None]:
# Get Response Time
from ping3 import ping

def get_response_time(url):
    response_time = ping(url, unit='ms') # 's' for seconds and 'ms' for milliseconds
    if response_time is None:
        return -1
    else:
        return response_time

In [None]:
# Get Response Time Category
def get_response_time_category(response_time):
    if response_time <= 10:
        return "Fast"
    if response_time > 10:
        return "Slow"

In [None]:
tools = [
    {
        "function_name": "get_response_time",
        "function_call": get_response_time,
        "function_params": [
            {   "param_name": "url",
                "type": str
            }
        ],
        "example_input": "google.com",
        "return_type": int,
        "description": "Returns the response time of a website in ms, returns -1 if the website is unreachable"
    },

    {
        "function_name": "get_response_time_category",
        "function_call": get_response_time_category,
        "function_params": [
            {   "param_name": "response_time",
                "type": int
            }
        ],
        "example_input": "5",
        "return_type": str,
        "description": "Returns the category based upon the response time of a website"
    }
]

### Add Tools to the Prompt

In [None]:
def get_tool_descriptions(tools):

    tool_descriptions = ""

    for tool in tools:
      tool_descriptions += "\n"
      tool_descriptions += tool['function_name'] + ":"
      tool_descriptions += "\nDescription: " + tool["description"]
      tool_descriptions += "\nParameters:"
      for param in tool["function_params"]:
        tool_descriptions += "\n\t" + param["param_name"] + ": " + str(param["type"])
      tool_descriptions += "\n\tReturn type: " + str(tool["return_type"])
      tool_descriptions += "\ne.g. " + tool["function_name"] + ": " + tool["example_input"]
      tool_descriptions += "\n"

    return tool_descriptions

tool_descriptions=get_tool_descriptions(tools)
print(tool_descriptions)

system_prompt = system_prompt_template.format(tool_descriptions=tool_descriptions)

### Helper Function to extract JSON

In [None]:
import re
import json

def extract_json(text_response):
    pattern = r'\{.*?\}'
    matches = re.finditer(pattern, text_response, re.DOTALL)
    json_objects = []

    for match in matches:
        json_str = extend_search_new(text_response, match.span())
        try:
            json_obj = json.loads(json_str)
            json_objects.append(json_obj)
        except json.JSONDecodeError:
            continue

    return json_objects if json_objects else None

def extend_search_new(text, span):
    start, end = span
    nest_count = 1  # Starts with 1 since we know '{' is at the start position
    for i in range(end, len(text)):
        if text[i] == '{':
            nest_count += 1
        elif text[i] == '}':
            nest_count -= 1
            if nest_count == 0:
                return text[start:i+1]
    return text[start:end]

## Creating the Agent

In [None]:
from openai import OpenAI

from google.colab import userdata

client = OpenAI(api_key=userdata.get('OPENAI_API_KEY'))

# import os
# from dotenv import load_dotenv
# load_dotenv()
# client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))




In [None]:
def generate_text_with_conversation(messages, model = "gpt-3.5-turbo"):
    response = client.chat.completions.create(
        model=model,
        messages=messages
        )
    return response.choices[0].message.content

In [None]:
available_actions = {
    tool["function_name"]: tool["function_call"] for tool in tools
}

available_actions

In [None]:
user_prompt = "what is the response time category for youtube.com?"

messages = [
    {"role": "system", "content": system_prompt},
    {"role": "user", "content": user_prompt},
]

# Coversation Loop

In [None]:
turn_count = 1
max_turns = 10

while turn_count < max_turns:
    print (f"Loop: {turn_count}")
    print("-------------------------------------------------------------")
    turn_count += 1

    response = generate_text_with_conversation(messages, model="gpt-3.5-turbo") #
    messages.append({"role": "assistant", "content": response})
    print(response)
    json_function = extract_json(response)

    if json_function:
            function_name = json_function[0]['function_name']
            function_parms = json_function[0]['function_params']
            if function_name not in available_actions:
                raise Exception(f"Unknown action: {function_name}: {function_parms}")
            print(f" -- running {function_name} {function_parms}")
            action_function = available_actions[function_name]
            #call the function
            result = action_function(**function_parms)
            function_result_message = f"Action_Response: {result}"
            messages.append({"role": "user", "content": function_result_message})
            print(function_result_message)
    else:
        break