# Working with Message Stack payloads 
- there are currently three supported schemes: OpenAI, Google, and Anthropic
- these have slightly different payload structures, in particular for tool calls and tool responses
- generally, we want to ACK a tool call with an id and follow it with a response in the message stack
- Anthropic uses a tool block and Google uses a functionResponse, while OpenAI is simpler: just the typical message with a role and content
- we can test reading message stacks as instructions from the database in different contexts
    - for a user question; trivial 
    - for a tool request with tool stack
    - for agents that provide system prompts

In [3]:
import sys
sys.path.append('../')
import percolate as p8
from percolate.models.p8 import AIResponse
from percolate.services import PostgresService
from percolate.services.llm.LanguageModel import request_anthropic,request_google,request_openai
from pydantic import BaseModel, model_validator, Field
import typing
import uuid

# Database handle used below to read per-scheme message stacks and tool specs via p8.* SQL functions.
pg = PostgresService()

### Illustrate that, for each scheme, we read messages (and functions) that are already formatted for that scheme
- there is a corresponding database request_x that reads data in the same way

## The first test should take a single-turn example id
- this assumes you have already created a request with any scheme, e.g. using percolate_with_agent


In [11]:
# Session id of a previously created single-turn request; its messages are replayed below in each scheme.
test_session_id = '8c51c161-7ac8-db55-68ec-7255ea4983e0'

In [12]:
# Google scheme: load the session's message stack and the Google-dialect tool spec, then replay them.
goo_mm = list(pg.execute(f""" select * from p8.get_google_messages('{test_session_id}') """)[0]['messages'])
fns = list(pg.execute(f""" select * from p8.get_tools_by_name(ARRAY['get_pet_findByStatus'],'google') """)[0]['get_tools_by_name'])
request_google(goo_mm, fns).json()

In [13]:
# Anthropic scheme: load the session's message stack and the Anthropic-dialect tool spec, then replay them.
ant_mm = list(pg.execute(f""" select * from p8.get_anthropic_messages('{test_session_id}') """)[0]['messages'])
fns = list(pg.execute(f""" select * from p8.get_tools_by_name(ARRAY['get_pet_findByStatus'],'anthropic') """)[0]['get_tools_by_name'])
request_anthropic(ant_mm, fns).json()

In [14]:
# Canonical (OpenAI) scheme: the default tool dialect needs no scheme argument.
mm = list(pg.execute(f""" select * from p8.get_canonical_messages('{test_session_id}') """)[0]['messages'])
fns = list(pg.execute(f""" select * from p8.get_tools_by_name(ARRAY['get_pet_findByStatus']) """)[0]['get_tools_by_name'])
request_openai(mm, fns).json()

## Longer multi-turn tests
- make sure function calls and responses are paired properly
- test injecting new questions

# Streaming

In [1]:
import sys
sys.path.append('../')
import percolate as p8
from percolate.models.p8 import AIResponse
from percolate.services import PostgresService
from percolate.services.llm.LanguageModel import request_anthropic,request_google,request_openai, LanguageModel,CallingContext
from pydantic import BaseModel, model_validator, Field
import typing
import uuid

In [2]:
"""create a simple printer that we can pass down"""
def printer(text):
    """Streaming callback: echo *text* without a newline and flush so it shows up immediately."""
    print(text, flush=True, end="")
# Calling context that routes streamed output through `printer`; passing it as
# `context=` to LanguageModel.ask is what turns streaming on in the cells below.
context = CallingContext(streaming_callback=printer)

In [3]:
# OpenAI-style function spec used to exercise tool calling in every scheme.
fns = [
    {
        "name": "get_weather",
        "description": "Get the weather forecast for a specific city and date",
        "parameters": {
            "type": "object",
            "properties": {
                "city": {
                    "type": "string",
                    "description": "The city for which to get the weather forecast",
                },
                "date": {
                    "type": "string",
                    "description": "The date for the weather forecast (YYYY-MM-DD)",
                },
            },
            "required": ["city", "date"],
        },
    }
]

# Chat-completions "tools" are each function spec wrapped in a typed envelope.
tools = [dict(type="function", function=spec) for spec in fns]

# One model per scheme (OpenAI, Google, Anthropic), plus deepseek-chat: a second,
# non-OpenAI model that uses the same schema, as a consistency check.
models = ["gpt-4o-mini", "deepseek-chat", "claude-3-5-sonnet-20241022", "gemini-1.5-flash"]


In [4]:
# Same model list as the previous cell: one model per scheme, plus deepseek-chat
# as a second non-OpenAI model that shares the OpenAI schema.
models = ["gpt-4o-mini", "deepseek-chat", "claude-3-5-sonnet-20241022", "gemini-1.5-flash"]

# OpenAI model (models[0]); uncomment the scenario to exercise.
model = LanguageModel(models[0])
# 1. no stream, no tool
# model.ask("What is the capital of ireland", functions=fns)
# 2. stream, no tool -- streaming only needs a context
model.ask(
    "What is the capital of ireland",
    functions=fns,
    context=context,
    debug_response=False,
)
# 3. no stream, with tool
# model.ask("What is the weather in paris tomorrow", functions=fns)
# 4. stream, with tool
# model.ask("What is the weather in paris tomorrow", functions=fns, context=context)

In [9]:
# NOTE(review): `r` is only assigned further down (the raw Gemini request cell), so this
# cell relies on out-of-order notebook execution; it just inspects that response's type.
type(r)

In [15]:
# deepseek-chat (models[1]): OpenAI-compatible schema; uncomment the scenario to exercise.
model = LanguageModel(models[1])
# 1. no stream, no tool
# model.ask("What is the capital of ireland", functions=fns)
# 2. stream, no tool -- streaming only needs a context
# model.ask("What is the capital of ireland from your world knowledge", functions=fns, context=context, debug_response=False)
# 3. no stream, with tool
# model.ask("What is the weather in paris tomorrow", functions=fns)
# 4. stream, with tool
# model.ask("What is the weather in paris tomorrow", functions=fns, context=context)

In [4]:
# Claude (models[2]); scenario 4 (stream + tool) is active, the others are kept for toggling.
model = LanguageModel(models[2])
# 1. no stream, no tool
# model.ask("What is the capital of ireland", functions=fns)
# 2. stream, no tool -- streaming only needs a context
# model.ask("What is the capital of ireland from your world knowledge", functions=fns, context=context, debug_response=False)
# 3. no stream, with tool
# model.ask("What is the weather in paris tomorrow", functions=fns)
# 4. stream, with tool
model.ask(
    "What is the weather in paris tomorrow",
    functions=fns,
    context=context,
)

In [9]:
# Gemini (models[3]); the prompts pin the date explicitly. Scenario 4 (stream + tool) is active.
model = LanguageModel(models[3])
# 1. no stream, no tool
# model.ask("What is the capital of ireland based on world knowledge", functions=fns)
# 2. stream, no tool -- streaming only needs a context
# model.ask("What is the capital of ireland from your world knowledge", functions=fns, context=context, debug_response=False)
# 3. no stream, with tool
# model.ask("What is the weather in paris tomorrow if tomorrow is the 4th of feb 2024", functions=fns)
# 4. stream, with tool
model.ask(
    "What is the weather in paris tomorrow if tomorrow is the 4th of feb 2024",
    functions=fns,
    context=context,
)

In [None]:
### streaming

In [95]:
import requests
import json
import os

# Raw OpenAI chat-completions streaming request, bypassing LanguageModel.
url = "https://api.openai.com/v1/chat/completions"
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
}

data = {
    "model": "gpt-4o-mini",
    # Swap in this prompt to exercise a streamed tool call instead:
    # "messages": [{"role": "user", "content": "What's the weather in Paris tomorrow?"}],
    "messages": [{"role": "user", "content": "What's the capital of France?"}],
    "stream": True,
    "tools": tools,
    # Ask for token usage to be included in the stream.
    "stream_options": {"include_usage": True},
}

In [188]:
# Raw Gemini streaming request (alt=sse); the API key travels in the query string.
url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:streamGenerateContent?alt=sse&key={os.environ.get('GEMINI_API_KEY')}"
headers = {"Content-Type": "application/json"}
data = {
    # NOTE(review): "parts" is a single dict here, whereas stream_google_gemini_response sends a
    # list of parts -- confirm the API accepts both. "dulin" is presumably a typo for Dublin.
    "contents": [
        {"role": "user", "parts": {'text': "what is the weather for dulin on 2025-02-05 based on the get_weather function?"}}
    ],
    # Gemini groups function specs under "function_declarations".
    "tools": [{'function_declarations': fns}],
}
r = requests.post(url, headers=headers, data=json.dumps(data), stream=True)

import json

def stream_google_response(r, printer=None):
    """Consume a Gemini ``streamGenerateContent?alt=sse`` response and merge it.

    Each ``data:`` line carries one JSON chunk. Text fragments from every
    candidate's parts are concatenated, ``functionCall`` parts are collected,
    and reading stops after a chunk whose candidate reports
    ``finishReason == "STOP"``.

    Args:
        r: A streaming ``requests`` response (anything whose ``iter_lines()``
            yields ``bytes``).
        printer: Optional callback receiving each text fragment as it arrives;
            falls back to ``print`` when omitted.

    Returns:
        The last parsed chunk, with its first candidate's ``content.parts``
        replaced by the merged output (one text part holding every fragment,
        followed by any function-call parts). Returns ``None`` if no chunk was
        received.
    """
    emit = printer if printer is not None else print
    text_fragments = []
    function_call_parts = []
    last_chunk = None

    for raw in r.iter_lines():
        if not raw:
            continue
        line = raw.decode("utf-8").strip()
        if not line.startswith("data:"):
            continue
        last_chunk = json.loads(line[len("data:"):])

        finished = False
        # A chunk may carry no candidates, so the finish check stays inside
        # this loop rather than reading a leaked/undefined `candidate`.
        for candidate in last_chunk.get("candidates", []):
            for part in candidate.get("content", {}).get("parts", []):
                if "functionCall" in part:
                    function_call_parts.append(part)
                text = part.get("text")
                if text:
                    text_fragments.append(text)
                    emit(text)
            if candidate.get("finishReason") == "STOP":
                finished = True
        if finished:
            print("Streaming finished.")
            break

    if last_chunk is None:
        return None

    # Fragments are consecutive slices of one reply, so join without separators.
    merged_parts = [{"text": "".join(text_fragments)}] if text_fragments else []
    merged_parts.extend(function_call_parts)
    candidates = last_chunk.get("candidates")
    if candidates:
        candidates[0].setdefault("content", {})["parts"] = merged_parts
    return last_chunk


# Example usage: POST the Gemini request (url/headers/data from the cell above)
# and consume its SSE stream with stream_google_response.
response = requests.post(url, headers=headers, json=data, stream=True)
result = stream_google_response(response, printer=printer)
result



In [174]:
import json
def printer(text):
    """Streaming callback: write *text* straight to stdout (no newline) and flush."""
    print(text, sep="", end="", flush=True)

def stream_response(r, printer=None):
    """Consume an OpenAI-style chat-completions SSE stream and merge it into one dict.

    Args:
        r: A streaming ``requests`` response (anything whose ``iter_lines()``
            yields ``bytes``).
        printer: Optional callback invoked with each content fragment as it
            arrives, and once with the first ``tool_calls`` delta.

    Returns:
        The last JSON chunk received (``{}`` if none parsed), updated with
        ``content`` (all content deltas concatenated) and ``tool_calls`` (one
        entry per tool-call index with ``index``, ``id``, ``name`` and the
        concatenated ``arguments`` string).
    """
    content_parts = []
    calls = {}  # tool-call index -> {"index", "id", "name", "arguments"}
    observed_tool_call = False
    last_chunk = {}

    for raw in r.iter_lines():
        if not raw:
            continue
        line = raw.decode("utf-8").strip()
        # Strip only the SSE field prefix; a blanket replace would also
        # corrupt any "data: " text inside the JSON payload itself.
        if line.startswith("data:"):
            line = line[len("data:"):].strip()
        if not line or line == "[DONE]":
            continue
        try:
            chunk = json.loads(line)
        except json.JSONDecodeError:
            continue  # tolerate keep-alives or incomplete lines
        if not isinstance(chunk, dict):
            continue
        last_chunk = chunk

        choices = chunk.get("choices")
        if not choices:
            continue  # e.g. a usage-only chunk with an empty choices list
        delta = choices[0].get("delta") or {}

        content = delta.get("content")
        if content:
            content_parts.append(content)
            if printer:
                printer(content)

        tool_deltas = delta.get("tool_calls")
        if not tool_deltas:
            continue
        if not observed_tool_call:
            observed_tool_call = True
            if printer:
                printer(tool_deltas)
        for tool_delta in tool_deltas:
            if "index" not in tool_delta:
                continue
            index = tool_delta["index"]
            entry = calls.setdefault(
                index, {"index": index, "id": None, "name": None, "arguments": ""}
            )
            # id and name arrive on the first delta for a call; later deltas
            # carry only argument fragments.
            if tool_delta.get("id"):
                entry["id"] = tool_delta["id"]
            function = tool_delta.get("function") or {}
            if function.get("name"):
                entry["name"] = function["name"]
            entry["arguments"] += function.get("arguments") or ""

    last_chunk.update({
        "content": "".join(content_parts),
        "tool_calls": list(calls.values()),
    })
    return last_chunk


In [170]:
# Send the raw request and aggregate the streamed chunks with stream_response.
# NOTE(review): url/headers/data are whichever were assigned last in the kernel
# (both the OpenAI and the Gemini setup cells assign them); this expects the OpenAI ones.
response = requests.post(url, headers=headers, data=json.dumps(data), stream=True)


stream_response(response, printer=printer)

In [112]:
# Raw Anthropic Messages API endpoint and authentication/version headers.
url = "https://api.anthropic.com/v1/messages"
headers = {
    "Content-Type": "application/json",
    "x-api-key": os.environ.get('ANTHROPIC_API_KEY'),
    "anthropic-version": "2023-06-01",
}

def _adapt_tools_for_anthropic(functions):
    """slightly different dialect of function wrapper - rename parameters to input_schema"""
    def _rewrite(d):
        return {
            'name' : d['name'],
            'input_schema': d['parameters'],
            'description': d['description']
        }

    return [_rewrite(d) for d in functions or []]

# Raw Anthropic streaming request body; tool specs are converted to Anthropic's input_schema dialect.
data = {
    "model": "claude-3-5-sonnet-20241022",
    "max_tokens": 1024,
    # Swap in this prompt to test a plain text stream instead:
    # "messages": [{"role": "user", "content": "What's capital of France"}],
    "messages": [{"role": "user", "content": "What's the weather in Paris tomorrow?"}],
    "stream": True,
    "tools": _adapt_tools_for_anthropic(fns),
}
 

In [149]:
import json

import json

def stream_anthropic_response(r, printer=None):
    """Consume an Anthropic Messages API SSE stream and rebuild the final message.

    The stream alternates ``event:`` and ``data:`` lines. This function:
    - takes the message envelope from ``message_start``;
    - assembles each content block keyed by its ``index``: text deltas are
      concatenated, and ``input_json_delta`` fragments are concatenated and
      parsed into the tool block's ``input``;
    - applies the stop reason, stop sequence and output-token count from
      ``message_delta``.

    Args:
        r: A streaming ``requests`` response (anything whose ``iter_lines()``
            yields ``bytes``).
        printer: Optional callback receiving each text fragment, plus a single
            notice when the first tool-call argument fragment arrives.

    Returns:
        The ``message_start`` message dict (``{}`` if none was seen), updated as
        follows:
        - ``content`` holds the assembled blocks in stream order, with empty
          text blocks dropped;
        - ``stop_reason`` and ``stop_sequence`` come from the ``message_delta``
          ``delta``;
        - ``usage`` carries ``input_tokens`` and ``output_tokens``.
        A tool block whose argument JSON cannot be parsed (for example, a
        truncated stream) keeps the raw text under ``partial_json``.
    """
    message = None
    blocks = {}  # content-block index -> block being assembled (insertion = stream order)
    input_tokens = 0
    output_tokens = 0
    observed_tool_call = False

    for raw in r.iter_lines():
        if not raw:
            continue
        line = raw.decode("utf-8")
        if line.startswith("event:"):
            # The JSON payload repeats the event name in its "type" field.
            continue
        # Strip only the SSE prefix so "data: " inside payload text survives.
        if line.startswith("data:"):
            line = line[len("data:"):]
        line = line.strip()
        if not line or line == "[DONE]":
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # tolerate partial or non-JSON lines
        event_type = event.get("type")

        if event_type == "message_start":
            message = dict(event["message"])
            input_tokens = (message.get("usage") or {}).get("input_tokens", 0)

        elif event_type == "content_block_start":
            block = dict(event["content_block"])
            if block.get("type") == "tool_use":
                block["partial_json"] = ""
            blocks[event.get("index")] = block

        elif event_type == "content_block_delta":
            delta = event.get("delta") or {}
            index = event.get("index")
            delta_type = delta.get("type")
            if delta_type == "text_delta":
                text = delta.get("text", "")
                block = blocks.setdefault(index, {"type": "text", "text": ""})
                block["text"] = block.get("text", "") + text
                if printer:
                    printer(text)
            elif delta_type == "input_json_delta":
                fragment = delta.get("partial_json") or ""
                block = blocks.setdefault(index, {"type": "tool_use"})
                block["partial_json"] = block.get("partial_json", "") + fragment
                if fragment and not observed_tool_call:
                    observed_tool_call = True
                    if printer:
                        printer(f"Tool call for {block.get('name')}")

        elif event_type == "message_delta":
            if message is None:
                message = {}
            # stop_reason/stop_sequence live under "delta", not at the top level.
            delta = event.get("delta") or {}
            message["stop_reason"] = delta.get("stop_reason")
            message["stop_sequence"] = delta.get("stop_sequence")
            output_tokens = (event.get("usage") or {}).get("output_tokens", output_tokens)

    if message is None:
        message = {}

    content = []
    for block in blocks.values():
        if block.get("type") == "tool_use":
            raw_args = block.pop("partial_json", "")
            if raw_args:
                try:
                    block["input"] = json.loads(raw_args)
                except json.JSONDecodeError:
                    block["partial_json"] = raw_args
        elif block.get("type") == "text" and not block.get("text"):
            continue  # e.g. a tool-only turn
        content.append(block)

    usage = dict(message.get("usage") or {})
    usage["input_tokens"] = input_tokens
    usage["output_tokens"] = output_tokens
    message["usage"] = usage
    message["content"] = content
    return message


# Example usage: POST the Anthropic request built above and rebuild the streamed
# message with stream_anthropic_response.
response = requests.post(url, headers=headers, json=data, stream=True)
result = stream_anthropic_response(response, printer)

result

In [177]:
import requests
import json

def stream_google_gemini_response(prompt="Write a cute story about cats.", model="gemini-1.5-flash"):
    """Stream a Gemini completion for *prompt* and return the concatenated text.

    Each text fragment is printed as it arrives, and reading stops after a
    chunk whose candidate reports ``finishReason == "STOP"``.

    Args:
        prompt: User text to send (defaults to the original demo prompt).
        model: Gemini model name used in the endpoint path.

    Returns:
        The full reply text, or ``None`` if the HTTP status is not 200.
    """
    # No stray "$" before the interpolation: it would end up inside the key.
    url = (
        f"https://generativelanguage.googleapis.com/v1beta/models/{model}:streamGenerateContent"
        f"?alt=sse&key={os.environ.get('GEMINI_API_KEY')}"
    )
    headers = {
        "Content-Type": "application/json",
    }
    data = {
        "contents": [
            {
                "parts": [{"text": prompt}]
            }
        ]
    }

    # stream=True lets us read SSE lines as they arrive.
    with requests.post(url, json=data, headers=headers, stream=True) as response:
        if response.status_code != 200:
            print(f"Error: {response.status_code}")
            return None

        text_fragments = []
        for raw in response.iter_lines():
            if not raw:
                continue
            line = raw.decode("utf-8").strip()
            if not line.startswith("data: "):
                continue
            chunk = json.loads(line[len("data: "):])

            finished = False
            # Check finishReason per candidate; a chunk may have none.
            for candidate in chunk.get("candidates", []):
                for part in candidate.get("content", {}).get("parts", []):
                    text = part.get("text", "")
                    if text:
                        text_fragments.append(text)
                        print(text)
                if candidate.get("finishReason") == "STOP":
                    finished = True
            if finished:
                print("Streaming finished.")
                break

        # Fragments are consecutive slices of one reply: join without separators.
        return "".join(text_fragments)

# Example usage: stream a story from Gemini and display the returned text (None on an HTTP error).
final_story = stream_google_gemini_response()
final_story

