In [1]:
from pathlib import Path
from dotenv import load_dotenv
from openai import OpenAI
from composio import Composio  # type: ignore
import os
import json

In [3]:
# Load environment variables from a .env file located four directories above
# the current working directory (the path is relative, so it depends on where
# the kernel was started).
# NOTE(review): presumably this supplies the API keys used by the clients below - confirm.
load_dotenv(Path("../../../../.env"))

True

In [4]:
# Example tool function
def get_weather(location: str) -> str:
    """Return a canned temperature report for ``location``.

    This is a placeholder tool: no weather lookup is performed and the
    reported temperature is always 72°F.

    Args:
        location: Free-form place name inserted verbatim into the report.

    Returns:
        A one-sentence temperature report mentioning ``location``.
    """
    # A real implementation would query a weather service here.
    report = "The current temperature in {} is 72°F.".format(location)
    return report


# Function-tool schema passed to the model: a single `get_weather` tool that
# takes one required string argument, `location`, and allows no extra keys.
tools = [
    dict(
        type="function",
        name="get_weather",
        description="Get current temperature for a given location.",
        parameters=dict(
            type="object",
            properties=dict(
                location=dict(
                    type="string",
                    description="City and country e.g. Bogotá, Colombia",
                ),
            ),
            required=["location"],
            additionalProperties=False,
        ),
        strict=True,
    ),
]

In [5]:
# Developer prompt for the chatbot: answer directly from the model's own
# knowledge, or call tools for live information and always pair a tool call
# with explanatory text. ChatService.init_chat_services() adds it to the chat
# history under the "developer" role.
CHATBOT_PROMPT = """
You are an expert AI assistant.


You take in a prompt from a user and respond to it directly with your personal knowledge.
Or you use tools given to you to get live information and generate a response using the results of the tools.

CORE PRINCIPLE: Be direct and concise. Minimize follow-up questions.
You MUST effectively communicate the topic in a clear and engaging manner.

DEFAULT ASSUMPTIONS FOR REQUESTS:
- Prompts can be about anything
- The prompt might be vague, a single word, or have unclear intent
- The request might be very specific or clear


IMMEDIATE PLANNING APPROACH:
**WORKFLOW:**
1. Respond directly if you have personal knowledge about the topic
2. Use tools to get live information on the topic and then generate a response
3. If you require one or several tool calls, ALWAYS RESPOND WITH TEXT ALONG WITH A TOOL CALL. This will let the user know what is going on.
4. Use tools APPROPRIATELY

SAMPLE RESPONSE FOR ANSWERING A PROMPT WITH PERSONAL KNOWLEDGE (NOT LIMITED TO ONLY THESE STEPS)
Get user message
Simply respond directly


SAMPLE RESPONSE FOR ANSWERING A PROMPT WITH TOOLS (NOT LIMITED TO ONLY THESE STEPS)
Get user message
IF message contains words like today, recent, current, live or other words with similar meaning then use tools to get live information on the message and then generate a response
Call tool or several tools
Use tool results to generate a response



TOOL CALLING STRATEGY:
- YOU MUST ALWAYS RESPOND WITH TEXT ALONG WITH A TOOL CALL
- THIS WILL LET THE USER KNOW WHAT IS GOING ON
- FAILURE TO RESPOND WITH A TEXT AND A TOOL CALL WILL RESULT IN FAILURE
- AVOID repetitive tool calls
- ONLY USE TOOLS IF message contains words like today, recent, current, live or other words with similar meaning

MINIMAL QUESTIONS STRATEGY:
- For vague requests such as single words: generate an interesting topic e.g.: star wars -> star wars impact on modern culture, then plan and create tasks
- For detailed requests: Create multiple tasks


Generate text response and tool calls (if applicable) without asking follow-up questions unless absolutely necessary.
"""


In [10]:
class ChatService:
    """Streaming chat helper built on the OpenAI Responses API with local function tools.

    The service keeps the running conversation in ``chat_history`` (the list
    passed as ``input`` on every request) and exposes ``process_message``,
    a generator that yields the assistant's text as it streams in. When the
    model requests a function tool, the matching callable from
    ``tool_functions`` is executed, its result is fed back to the model, and
    the follow-up answer is streamed as well.
    """

    def __init__(self):
        # Conversation sent to the model on every request. Holds role/content
        # messages plus function_call / function_call_output items.
        self.chat_history: list[dict] = []
        # OpenAI client. Left unset here and created on first use by
        # _ensure_client(); callers may also assign their own client.
        self.llm: OpenAI = None
        self.tools = tools
        # Tool name -> local callable executed when the model calls that tool.
        self.tool_functions = {"get_weather": get_weather}
        self.model_name: str = "gpt-4.1-mini"
        self.composio = Composio()
        self.user_id = "0000-1111-2222"
        self.previous_task_results: list[dict] = [
            {
                "task_id": "0",
                "task": "first task, no previous task yet",
                "results": "first task, no results yet",
            }
        ]

    def init_chat_services(self):
        """Seed the conversation with the developer prompt (``CHATBOT_PROMPT``)."""
        self.add_chat_history(role="developer", message=CHATBOT_PROMPT)

    def add_chat_history(self, role: str, message: str):
        """Append a ``{"role": role, "content": message}`` entry to the history."""
        self.chat_history.append({"role": role, "content": message})

    def call_function(self, name, args):
        """Run the registered tool ``name`` with keyword arguments ``args``.

        Args:
            name: Tool name as it appears in ``self.tool_functions``.
            args: Mapping of argument names to values for the tool.

        Returns:
            The tool's return value, or ``None`` when no tool is registered
            under ``name``.
        """
        handler = self.tool_functions.get(name)
        if handler is None:
            return None
        return handler(**args)

    def process_message(self, message):
        """Send ``message`` to the model and yield the reply text chunk by chunk.

        The user message is appended to the history and a streaming request
        is made with the configured tools. Text deltas are yielded as they
        arrive. If the model requested function tools, each one is executed
        and recorded in the history as a ``function_call`` /
        ``function_call_output`` pair, then a second streaming request (with
        tool calling disabled) produces the final answer, which is yielded
        too. Assistant text is stored in the history once its stream ends.

        Args:
            message: The user's message text.

        Yields:
            str: Successive fragments of assistant text.
        """
        self.add_chat_history(role="user", message=message)
        client = self._ensure_client()

        first_stream = client.responses.create(
            model=self.model_name,
            input=self.chat_history,
            tools=self.tools,
            tool_choice="auto",
            stream=True,
        )
        requested_calls = []
        preamble = yield from self._relay_stream(first_stream, requested_calls)

        if not requested_calls:
            # Plain answer: store it and stop.
            self.add_chat_history("assistant", preamble)
            return

        # Keep any text the model sent alongside its tool call(s) so the
        # history mirrors what the user saw.
        if preamble:
            self.add_chat_history("assistant", preamble)

        for call in requested_calls:
            self._record_tool_call(call)

        # Second pass: the history now contains the tool output(s).
        # tool_choice="none" keeps this pass a text answer rather than
        # another round of tool calls.
        follow_up = client.responses.create(
            model=self.model_name,
            input=self.chat_history,
            tools=self.tools,
            tool_choice="none",
            stream=True,
        )
        answer = yield from self._relay_stream(follow_up, [])
        self.add_chat_history("assistant", answer)

    def _ensure_client(self):
        """Return the OpenAI client, creating it on first use."""
        if self.llm is None:
            self.llm = OpenAI()
        return self.llm

    def _relay_stream(self, stream, function_calls):
        """Yield text deltas from ``stream``; collect completed function calls.

        Only ``response.output_text.delta`` events are treated as text. Other
        events may also carry a ``delta`` attribute (for example function-call
        argument fragments) and must not be shown to the user. Completed
        ``function_call`` output items are appended to ``function_calls``.

        Returns (as the generator's return value) the concatenated text.
        """
        pieces = []
        for event in stream:
            kind = getattr(event, "type", None)
            if kind == "response.output_text.delta":
                chunk = event.delta
                if isinstance(chunk, str) and chunk:
                    pieces.append(chunk)
                    yield chunk
            elif kind == "response.output_item.done":
                item = getattr(event, "item", None)
                if getattr(item, "type", None) == "function_call":
                    function_calls.append(item)
        return "".join(pieces)

    def _record_tool_call(self, call):
        """Execute one function call and append the call and its output to the history.

        Problems the model can react to (unparseable arguments, unknown tool)
        are reported as the tool output instead of being raised.
        """
        name = call.name
        raw_arguments = call.arguments or "{}"
        try:
            arguments = json.loads(raw_arguments)
        except json.JSONDecodeError as exc:
            output = f"Error: could not parse arguments for tool {name}: {exc}"
        else:
            if not isinstance(arguments, dict):
                output = f"Error: arguments for tool {name} must be a JSON object"
            elif name not in self.tool_functions:
                output = f"Error: tool {name} is not available"
            else:
                output = str(self.call_function(name, arguments))

        # The Responses API pairs a tool result with its request via call_id.
        self.chat_history.append(
            {
                "type": "function_call",
                "call_id": call.call_id,
                "name": name,
                "arguments": raw_arguments,
            }
        )
        self.chat_history.append(
            {
                "type": "function_call_output",
                "call_id": call.call_id,
                "output": output,
            }
        )

In [22]:
# Module-level service instance; generate_response() streams through it.
chat_bot = ChatService()

In [23]:
def _sse_event(text):
    """Frame ``text`` as one ``data:`` event, one ``data:`` line per line of text."""
    body = "\n".join(f"data: {line}" for line in str(text).split("\n"))
    return f"{body}\n\n"


def generate_response(message):
    """Stream the chatbot's reply to ``message`` as ``data:``-framed events.

    Each text chunk produced by ``chat_bot.process_message`` becomes one
    event terminated by a blank line. A chunk containing newlines is split
    into several ``data:`` lines within the same event, so a line break in
    the chunk cannot end the event early. If streaming fails, a final
    ``[Error] ...`` event is emitted instead of raising.

    Args:
        message: The user's message text.

    Yields:
        str: One framed event per chunk.
    """
    try:
        for chunk in chat_bot.process_message(message):
            # flush=True replaces the old sys.stdout.flush() call, which
            # raised NameError because `sys` is not imported.
            print(f"Yielding chunk: {chunk}", flush=True)  # DEBUG
            yield _sse_event(chunk)
    except Exception as e:
        print(f"Exception in generate_response: {e}")
        yield _sse_event(f"[Error] {str(e)}")

In [25]:
res = generate_response('What is the weather in San Francisco')

In [32]:
dir(res)

['__class__',
 '__class_getitem__',
 '__del__',
 '__delattr__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__iter__',
 '__le__',
 '__lt__',
 '__name__',
 '__ne__',
 '__new__',
 '__next__',
 '__qualname__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 'close',
 'gi_code',
 'gi_frame',
 'gi_running',
 'gi_suspended',
 'gi_yieldfrom',
 'send',
 'throw']

In [21]:
f_res

NameError: name 'f_res' is not defined

In [33]:
        # Scratch copy of the first streaming loop from ChatService.process_message.
        # It still refers to `self`, so it cannot run as a standalone cell
        # (the recorded output below is "NameError: name 'self' is not defined").
        stream = self.llm.responses.create(
            model=self.model_name,
            input=self.chat_history,
            tools=self.tools,
            tool_choice="auto",
            stream=True,
        )

        assistant_text = ""
        tool_call = None

        print("Starting LLM stream")
        for event in stream:
            print(f"Received event: {event}")

            # Check if event has a direct 'delta' attribute (like ResponseTextDeltaEvent)
            if hasattr(event, "delta"):
                # 'delta' could be a string chunk or dict; handle both
                delta_content = event.delta
                if isinstance(delta_content, str):
                    content = delta_content
                elif isinstance(delta_content, dict):
                    # If dict, extract 'content' field safely
                    content = delta_content.get("content", "")
                else:
                    content = ""

                assistant_text += content
                yield content
                continue

NameError: name 'self' is not defined

In [34]:
from openai import OpenAI
client = OpenAI()

In [56]:
chat_history = [{'role': "developer", 'content': 'do whatever the user says'}]

In [None]:
# The prompt text itself contains single quotes, so the literal is delimited
# with double quotes (nesting single quotes made this line a SyntaxError).
chat_history.append({'role': 'user', 'content': "Say 'double bubble bath' ten times fast."})

In [57]:
# Exploration cell: request a streamed response for the current chat_history
# and dump every event to see what the stream yields.
stream = client.responses.create(
    model="gpt-4.1-mini",
    input=chat_history,
    stream=True,
)

for event in stream:
    # Printed per event: its repr, its attribute names, and its class.
    # Note the second line is also labelled "EVENT:" although it shows dir(event).
    print(f'EVENT: {event}')
    print(f'EVENT: {dir(event)}')
    print(f'EVENT TYPE: {type(event)}')

EVENT: ResponseCreatedEvent(response=Response(id='resp_689bb5b8294c81908f8c703cf87f515a0adbdc3bb28298f2', created_at=1755035064.0, error=None, incomplete_details=None, instructions=None, metadata={}, model='gpt-4.1-mini-2025-04-14', object='response', output=[], parallel_tool_calls=True, temperature=1.0, tool_choice='auto', tools=[], top_p=1.0, background=False, max_output_tokens=None, max_tool_calls=None, previous_response_id=None, prompt=None, prompt_cache_key=None, reasoning=Reasoning(effort=None, generate_summary=None, summary=None), safety_identifier=None, service_tier='auto', status='in_progress', text=ResponseTextConfig(format=ResponseFormatText(type='text'), verbosity='medium'), top_logprobs=0, truncation='disabled', usage=None, user=None, store=True), sequence_number=0, type='response.created')
EVENT: ['__abstractmethods__', '__annotations__', '__class__', '__class_getitem__', '__class_vars__', '__copy__', '__deepcopy__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq_

In [51]:
dir(stream.response)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__setstate__',
 '__sizeof__',
 '__static_attributes__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_decoder',
 '_elapsed',
 '_get_content_decoder',
 '_num_bytes_downloaded',
 '_prepare',
 '_request',
 'aclose',
 'aiter_bytes',
 'aiter_lines',
 'aiter_raw',
 'aiter_text',
 'aread',
 'charset_encoding',
 'close',
 'content',
 'cookies',
 'default_encoding',
 'elapsed',
 'encoding',
 'extensions',
 'has_redirect_location',
 'headers',
 'history',
 'http_version',
 'is_client_error',
 'is_closed',
 'is_error',
 'is_informational',
 'is_redirect',
 'is_server_error',
 'is_stream_consumed',
 'is_success',
 'iter_bytes',
 'iter_lines',
 'iter_raw',
 'iter_text',
 'j

In [108]:
chat_history = [{'role': "developer", 'content': 'You are an assistant tasked with helping the user'}]

In [109]:
chat_history.append({'role': "user", 'content': 'what is the current weather in san francisco'})

NameError: name 'chat_historya' is not defined

In [61]:
from openai import OpenAI
client = OpenAI()

# Stream a reply and accumulate only the assistant's visible text.
stream = client.responses.create(
    model="gpt-5",
    input=[
        {
            "role": "user",
            "content": "Say 'double bubble bath' ten times fast.",
        },
    ],
    stream=True,
)
full_text = ''
for event in stream:
    # Filter on the event type rather than on the mere presence of `delta`:
    # other stream events (e.g. response.function_call_arguments.delta) also
    # carry a `delta` and would otherwise be mixed into the text.
    if event.type == "response.output_text.delta":
        print(event.delta)
        full_text += event.delta

double
 bubble
 bath
 double
 bubble
 bath
 double
 bubble
 bath
 double
 bubble
 bath
 double
 bubble
 bath
 double
 bubble
 bath
 double
 bubble
 bath
 double
 bubble
 bath
 double
 bubble
 bath
 double
 bubble
 bath


In [62]:
full_text

'double bubble bath double bubble bath double bubble bath double bubble bath double bubble bath double bubble bath double bubble bath double bubble bath double bubble bath double bubble bath'

In [66]:
chat_history.append({'role': 'user', 'content': 'what is the weather in sf'})

In [71]:
from pydantic import BaseModel, Field

In [73]:
# Pydantic model with three described fields: the reasoning about the request
# (`thought`), whether more information is needed (`more_info`), and the
# answer text (`response`).
# Documented with comments rather than a class docstring on purpose: pydantic
# would add a docstring to the generated JSON schema as its description.
# NOTE(review): no cell in view uses this model yet - presumably intended as a
# structured-output schema; confirm.
class InitResponse(BaseModel):
    thought: str = Field(
        description="Given the user input what needs to be done and how",
    )
    more_info: bool = Field(description='If we need more information to answer something')
    response: str = Field(description='The response if we dont need more information')

In [107]:
chat_history

[]

In [80]:
from typing import List

from openai import OpenAI
from pydantic import BaseModel

In [None]:
# Pydantic model holding three lists of strings: attributes, colors, animals.
# NOTE(review): this cell was never executed (In [None]) and nothing in view
# references the model - confirm it is still needed.
class EntitiesModel(BaseModel):
    attributes: List[str]
    colors: List[str]
    animals: List[str]

In [98]:
# Tool name -> local callable. process_and_execute_tool() looks up the handler
# for a model-requested tool here; the key matches the name declared in `tools`.
TOOL_HANDLERS = {
    "get_weather": get_weather,
}

In [103]:
def process_and_execute_tool(output_index):
    """Run the tool call collected at ``output_index`` and stream the model's final answer.

    Intended to be called once ``final_tool_calls[output_index]['done']`` is
    True. The function:

    1. resolves the tool name from the entry's ``"name"`` or, failing that,
       from the stored output item (``entry["item"].name``), and as a last
       resort from ``tools`` when exactly one tool is declared;
    2. parses the accumulated JSON arguments (empty dict on parse failure);
    3. runs the handler registered in ``TOOL_HANDLERS``;
    4. appends the call and its result to ``chat_history`` as a
       ``function_call`` / ``function_call_output`` pair when the item
       carries a ``call_id`` (otherwise as a plain assistant message);
    5. re-calls the model with the updated history and prints the streamed
       answer.

    Args:
        output_index: Key into ``final_tool_calls`` identifying the call.

    Returns:
        None. Results are printed and ``chat_history`` is mutated.
    """
    entry = final_tool_calls.get(output_index)
    if not entry:
        print(f"[DEBUG] No entry found for output_index={output_index}")
        return

    # The output item recorded on "response.output_item.added" carries the
    # function name and call_id even when no separate name event was seen.
    item = entry.get("item")
    tool_name = entry.get("name") or getattr(item, "name", None)
    call_id = getattr(item, "call_id", None)
    args_str = entry.get("arguments", "")
    print(f"[DEBUG] Processing tool call at index={output_index}: name={tool_name}, raw_args={args_str}")

    # Fallback to single tool if name wasn't sent
    if not tool_name:
        if len(tools) == 1 and "name" in tools[0]:
            tool_name = tools[0]["name"]
            print(f"[DEBUG] No tool name in stream; falling back to single provided tool: {tool_name}")
        else:
            print("[DEBUG] No tool name and multiple/no tools available — cannot execute safely.")
            return

    try:
        parsed_args = json.loads(args_str or "{}")
    except json.JSONDecodeError:
        parsed_args = {}
        print("[DEBUG] Failed to parse JSON args; using empty dict")

    print(f"[DEBUG] Parsed args for {tool_name}: {parsed_args}")

    handler = TOOL_HANDLERS.get(tool_name)
    if not handler:
        print(f"[DEBUG] No local handler for tool '{tool_name}'")
        return

    # Execute the tool. An argument mismatch is reported back to the model as
    # the tool output rather than retried with guessed arguments, so the
    # handler never runs twice for one call.
    if isinstance(parsed_args, dict):
        try:
            tool_result = handler(**parsed_args)
        except TypeError as exc:
            tool_result = f"Error calling {tool_name}: {exc}"
    else:
        tool_result = handler(parsed_args)

    print(f"[DEBUG] Tool result: {tool_result}")

    # Append tool output for the model to consume
    if call_id:
        # The Responses API pairs a tool result with its request via call_id.
        chat_history.append({
            "type": "function_call",
            "call_id": call_id,
            "name": tool_name,
            "arguments": json.dumps(parsed_args),
        })
        chat_history.append({
            "type": "function_call_output",
            "call_id": call_id,
            "output": str(tool_result),
        })
    else:
        # Without a call_id the result cannot be paired with a request, so
        # fall back to a plain-text message.
        chat_history.append({
            "role": "assistant",
            "content": f'TOOL_NAME: {tool_name}, RESULT: {tool_result}'
        })

    # Re-call the model to get its final answer (streamed)
    print("[DEBUG] Re-calling model to get final answer after tool execution...")
    final_stream = client.responses.create(
        model="gpt-4.1-mini",
        input=chat_history,
        tools=tools,
        stream=True
    )

    print("Assistant (final): ", end="", flush=True)
    for ev in final_stream:
        print(f"\n[DEBUG EVENT FINAL] type={ev.type}, delta={getattr(ev, 'delta', None)}")
        if ev.type == "response.output_text.delta":
            print(ev.delta, end="", flush=True)
        elif ev.type == "response.output_text.done":
            print()  # newline

In [104]:
# Stream a response for the current chat_history with tools enabled, collect
# any function calls the model makes, and execute each one as soon as its
# arguments are complete.
stream = client.responses.create(
    model="gpt-4.1-mini",
    input=chat_history,
    tools=tools,
    stream=True)
final_tool_calls = {}  # output_index -> {"item": ..., "name": ..., "arguments": "", "done": False}
# Main streaming loop: aggregate and trigger processing when arguments are done
for event in stream:
    # show the raw event for debugging
    print(f"\n[DEBUG EVENT] type={event.type}, output_index={getattr(event, 'output_index', None)}, delta={getattr(event, 'delta', None)}")

    # Normal text streaming
    if event.type == "response.output_text.delta":
        print(event.delta, end="", flush=True)

    elif event.type == "response.output_text.done":
        print()

    # When an output item is added, initialize storage for it
    elif event.type == "response.output_item.added":
        idx = getattr(event, "output_index", 0)
        item = getattr(event, "item", None)
        final_tool_calls[idx] = {
            "item": item,
            # Take the function name from the item itself when it has one
            # (items without a `name` attribute leave this as None).
            "name": getattr(item, "name", None),
            "arguments": "",
            "done": False
        }
        print(f"[DEBUG] output_item.added -> initialized final_tool_calls[{idx}]")

    # Some streams send name under function_call.delta or tool_call.delta
    # NOTE(review): these event types are handled defensively — confirm the API actually emits them.
    elif event.type in ("response.function_call.delta", "response.tool_call.delta"):
        idx = getattr(event, "output_index", 0)
        # ensure slot exists
        slot = final_tool_calls.setdefault(
            idx, {"item": None, "name": None, "arguments": "", "done": False}
        )
        # sometimes delta is a dict with "name"
        delta = getattr(event, "delta", None)
        if isinstance(delta, dict) and "name" in delta:
            slot["name"] = delta["name"]
            print(f"[DEBUG] Captured function/tool name for index={idx}: {delta['name']}")

    # Arguments come token-by-token as strings
    elif event.type == "response.function_call_arguments.delta":
        idx = getattr(event, "output_index", 0)
        slot = final_tool_calls.setdefault(
            idx, {"item": None, "name": None, "arguments": "", "done": False}
        )
        # delta may be a string fragment
        frag = event.delta if not isinstance(event.delta, dict) else json.dumps(event.delta)
        slot["arguments"] += frag
        print(f"[DEBUG] Appended arg fragment to index={idx}: {frag}")

    elif event.type == "response.function_call_arguments.done":
        idx = getattr(event, "output_index", 0)
        slot = final_tool_calls.setdefault(
            idx, {"item": None, "name": None, "arguments": "", "done": False}
        )
        slot["done"] = True
        print(f"[DEBUG] function_call_arguments.done for index={idx}. full_args={slot['arguments']}")

        # process this particular tool call immediately
        process_and_execute_tool(idx)

# after stream finished, show what we collected
print("\n[DEBUG] final_tool_calls dump:")
for idx, v in final_tool_calls.items():
    print(f"  index={idx}: name={v.get('name')}, done={v.get('done')}, args={v.get('arguments')}")

BadRequestError: Error code: 400 - {'error': {'message': "Invalid value: 'tool'. Supported values are: 'assistant', 'system', 'developer', and 'user'.", 'type': 'invalid_request_error', 'param': 'input[2]', 'code': 'invalid_value'}}

In [69]:
full_text

'Do you mean San Francisco, California? I can’t access live weather data here, but typical summer (Aug) conditions are:\n- Coast (Outer Sunset/Richmond): cool, foggy mornings, highs ~60–66°F (16–19°C)\n- Downtown/SOMA: partly sunny, highs ~64–70°F (18–21°C)\n- Mission/Potrero/Bernal: sunnier, highs ~68–74°F (20–23°C)\n- Afternoon westerly winds 10–20 mph; rain is rare\n\nIf you want the exact current conditions or a forecast, tell me the date and neighborhood/ZIP, and I’ll tailor it or guide you to a quick source.'

In [68]:
chat_history

[{'role': 'developer', 'content': 'do whatever the user says'},
 {'role': 'user', 'content': 'what is the weather in sf'}]