In [27]:
import json
import openai
from typing import List, Dict, Any, Optional
import requests
import os
from pydantic import BaseModel, Field
import openai
from dotenv import load_dotenv

# Load variables from a .env file (when one is present) into the process environment.
# This must run before the Settings class body below, which reads OPENAI_API_KEY and
# BACKEND_URL through os.getenv at class-definition time.
load_dotenv()

class RetrievalRequest(BaseModel):
    # Request model with the conversation and the number of matches wanted.
    # NOTE(review): nothing in this notebook instantiates it; ActionClient.retrieve_actions
    # builds its JSON body by hand and also sends a "threshold" key - confirm against the backend.

    # Chat messages as string-to-string mappings (required).
    chat_history: List[Dict[str, str]] = Field(...)
    # How many results to ask for; optional, defaults to 5.
    top_k: int = Field(default=5)

# Runtime configuration and the data models used by ActionClient.
class Settings:
    """Configuration values taken from environment variables.

    The lookups run once, when this class body executes; a variable that is
    not set yields ``None``.
    """

    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
    BACKEND_URL = os.environ.get("BACKEND_URL")


# Shared instance referenced by ActionClient.
settings = Settings()

class ActionCollectiveRequest(BaseModel):
    """Discover and execute actions to solve a task that may require computation or retrieval of information"""

    # This model is passed as `response_format` in ActionClient.produce_action_thought.
    # The docstring and the field descriptions are presumably surfaced to the model as
    # schema text, so treat their wording as prompt content rather than documentation.

    thought: str = Field(
        ...,
        description="Thoughts about the task that needs to be solved and how to resolve it.",
    )
    # produce_action_thought raises when this comes back False.
    is_action_needed: bool = Field(
        ...,
        description="Whether an action is needed to resolve the task.",
    )
    tool_description: str = Field(
        ...,
        description="Description of the tool that can be used to resolve the task.",
    )

class ActionDataGenerator(BaseModel):
    """Action data for general use of action"""

    # This model is passed as `response_format` in ActionClient.create_action_data, so
    # the docstring and field descriptions below act as prompt text for the model.
    # The embedded examples must therefore be valid themselves:
    #   * the JSON Schema example must parse with json.loads (no trailing commas),
    #     because create_action_data runs json.loads on the generated schema strings;
    #   * the test example must only use names it defines, and calls `action` with
    #     keyword arguments because execute_action invokes it as action(**params).

    # JSON Schema (as a string) for the keyword arguments of `action`.
    input_json_schema: str = Field(
        ...,
        description="""JSON Schema defining the expected input parameters for the action function.
Simple example of a JSON Schema for input_json_schema and output_json_schema:
```
{
    "type": "object",
    "description": "The description of this item",
    "properties": {
        "id": {
            "description": "The id of this inner item",
            "type": "integer"
        },
        "value": {
            "type": "array",
            "description": "The list of values of this inner item",
            "items": {
                "type": "string",
                "description": "The value of this inner item",
                "enum": ["a", "b"]
            }
        }
    },
    "required": ["value", "id"],
    "additionalProperties": false
}
```
MAKE SURE TO INCLUDE A DESCRIPTION FOR ALL PROPERTIES NOT ONLY THE ROOT BUT ALL NESTED PROPERTIES"""
    )
    # JSON Schema (as a string) for the value returned by `action`.
    output_json_schema: str = Field(
        ...,
        description="JSON Schema defining the expected return value from the action function. FOLLOW THE EXAMPLE DELIMITED BY TRIPLE BACKTICKS"
    )
    # Python source that defines the `action` function.
    code: str = Field(
        ...,
        description="""Python code implementing a function named 'action' that takes parameters matching the input_json_schema.
It must return a value matching the output_json_schema.
You only have access to the following libraries: numpy, requests. You must import them at the top of the file if you would like to use them.
Example delimited by triple backticks:
```
def action(input1: int, input2: int) -> dict:
    # sum of the two inputs

    return {"result": input1 + input2}
```"""
    )
    # Python source appended after `code` and executed to check the action.
    test: str = Field(
        ...,
        description="""This is code that will be appended to the previous code. It will be used to test the action function.
It must use assertions to validate the action function works correctly with static test inputs derived from the current task.
The test should NEVER be quantitative, it should only be qualitative. If it does need to be quantitative then it should be code not hard values.
Example delimited by triple backticks:
```
i1 = 7
i2 = 8
output = action(input1=i1, input2=i2)
assert isinstance(output["result"], int)
assert output["result"] == i1 + i2
```"""
    )

class ActionData(ActionDataGenerator):
    # A generated action (schemas, code, test) plus the conversation it belongs to.
    # ActionClient.create_action_data fills `chat_history` from the client's own history.

    # List of chat messages (required).
    chat_history: List[Dict[str, str]] = Field(...)

class ActionExecutionPayload(BaseModel):
    # Pairs an action with the concrete arguments to call it with.

    # The action whose `code` gets executed.
    action_data: ActionData = Field(...)
    # Keyword arguments; ActionClient.execute_action unpacks them into action(**params).
    params: dict = Field(...)

class ActionClient:
    def __init__(self):
        """Start with empty conversation buffers and an OpenAI client built from settings."""
        # Retry feedback appended by get_action_data; it is added to the model messages
        # in produce_action_thought / create_action_data but never sent to the backend.
        self.internal_chat_history: List[Dict[str, str]] = []
        # The conversation proper: sent to the backend and stored on new actions.
        self.chat_history: List[Dict[str, str]] = []
        self.client = openai.OpenAI(api_key=settings.OPENAI_API_KEY)

    def retrieve_actions(self, k: int = 5, threshold: float = 0.7) -> list[ActionData]:
        """Ask the backend for stored actions matching the current chat history.

        Args:
            k: Sent to the backend as ``top_k``.
            threshold: Sent to the backend as ``threshold``.

        Returns:
            One validated ``ActionData`` per item of the backend's JSON response.

        Raises:
            requests.HTTPError: If the backend answers with a 4xx/5xx status.
            pydantic.ValidationError: If an item does not validate as ``ActionData``.
        """
        response = requests.post(
            f"{settings.BACKEND_URL}/retrieve_actions",
            json={"chat_history": self.chat_history, "top_k": k, "threshold": threshold},
        )
        # Surface HTTP failures here. Without this, an error body would be iterated
        # below and reported as a confusing validation problem instead.
        response.raise_for_status()
        # NOTE(review): no timeout is set, so a stalled backend blocks this call - consider adding one.
        # TODO: can query with the action thought included in the chat history
        retrieved = response.json()
        print("\n\nretrieve_actions:\n", json.dumps(retrieved, indent=4))
        return [ActionData.model_validate(action) for action in retrieved]


    def produce_action_thought(self) -> ActionCollectiveRequest:
        """Have the model decide whether the conversation calls for an action.

        The messages sent are ``chat_history`` followed by ``internal_chat_history``.

        Returns:
            The parsed ``ActionCollectiveRequest`` when it reports that an action is needed.

        Raises:
            Exception: If nothing was parsed or ``is_action_needed`` is false.
        """
        conversation = self.chat_history + self.internal_chat_history
        completion = self.client.beta.chat.completions.parse(
            model="gpt-4o-mini",
            messages=conversation,
            response_format=ActionCollectiveRequest,
        )
        thought = completion.choices[0].message.parsed

        print("\n\nproduce_action_thought:\n", thought)

        # Only a parsed reply that asks for an action is a usable result.
        if thought and thought.is_action_needed:
            return thought

        raise Exception("No action thought created")
    
    def create_action_data(self) -> ActionData:
        """Ask the model to author a new action (schemas, code, test) for the conversation.

        The generated fields are normalised before being returned:
        ``additionalProperties: false`` is forced onto the root of both schemas, and
        Markdown code fences are stripped from ``code`` and ``test``.

        Returns:
            An ``ActionData`` built from the generated fields plus the current ``chat_history``.

        Raises:
            Exception: If the completion carries no parsed ``ActionDataGenerator``.
            json.JSONDecodeError: If a generated schema string is not valid JSON.
        """
        response = self.client.beta.chat.completions.parse(
            model="gpt-4o-mini",
            messages=self.chat_history + self.internal_chat_history,
            response_format=ActionDataGenerator,
        )

        action_data_generator = response.choices[0].message.parsed

        # Check before the debug print below, which dereferences the object; otherwise
        # a missing parse result shows up as an AttributeError instead of this error.
        if not action_data_generator:
            raise Exception("No action created")

        print("\n\ncreate_action_data:\n", action_data_generator.model_dump_json(indent=4))

        # MITIGATE COMMON ERROR: force additionalProperties=false on the root of both schemas
        for schema_field in ("input_json_schema", "output_json_schema"):
            schema = json.loads(getattr(action_data_generator, schema_field))
            setattr(action_data_generator, schema_field, json.dumps({**schema, "additionalProperties": False}))

        # The model sometimes wraps source in Markdown fences; keep only the code itself.
        for source_field in ("code", "test"):
            source = getattr(action_data_generator, source_field)
            setattr(action_data_generator, source_field, source.replace("```python", "").replace("```", "").strip())

        return ActionData.model_validate({**action_data_generator.model_dump(), "chat_history": self.chat_history})
    
    def upload_action(self, action_schema: ActionData) -> Any:
        """Submit an action to the backend and return the backend's JSON reply.

        Args:
            action_schema: The action to store; sent as ``action_schema.model_dump()``.

        Returns:
            The decoded JSON body of the response (whatever the backend sends back,
            hence ``Any`` rather than ``bool``).

        Raises:
            requests.HTTPError: If the backend answers with a 4xx/5xx status.
            Exception: If the decoded body is empty/falsy.
        """
        response = requests.post(
            f"{settings.BACKEND_URL}/submit_action",
            json=action_schema.model_dump(),
        )
        # Without this, a truthy error body would be returned as if the upload had worked.
        response.raise_for_status()
        updated_action = response.json()
        print("\n\nupdated_action:\n", updated_action)
        if not updated_action:
            raise Exception("Failed to upload action")
        return updated_action
    
    def validate_schema(self, schema: dict) -> None:
        try:    
            self.client.beta.chat.completions.parse(
                model="gpt-4o-mini",
                messages=[
                    {
                        "role": "user",
                        "content": "a"
                    }
                ],
                max_completion_tokens=1,
                response_format={
                    "type": "json_schema",
                    "json_schema": {
                        "name": "action_items",
                        "description": "The action items to be completed",
                        "strict": True,
                        "schema": schema,
                    },
                }
            )
            if not schema["description"]:
                raise Exception("ERROR: Description is required for all properties")
        except openai._exceptions.LengthFinishReasonError as e:
            print("\n\nPASSED schema validation")
            pass
        except Exception as e:
            print("\n\nFAILED schema validation", e)
            raise e


    def get_action_data(self, max_retries: int = 3) -> ActionData:
        # First, try to retrieve existing actions
        retrieval_response = self.retrieve_actions(k=5, threshold=0.7)

        # if action is retrieved
        if retrieval_response:
            return retrieval_response[0]

        # If suitable actions found, use them
        # TODO: Implement logic to determine if retrieved actions are suitable


        retries = 0 
        while retries < max_retries:
            action_schema = self.create_action_data()
            try:
                complete_test = action_schema.code + "\n" + action_schema.test
                loaded_schema = json.loads(action_schema.input_json_schema)
                self.validate_schema(loaded_schema)
                print("\n\nexecuting:\n\n", complete_test)
                exec(complete_test, {})
                print("\n\nPASSED")
                break
            except Exception as e:
                print(f"\n\nRETRY {retries} FAILED", e)
                
                self.internal_chat_history.append({"role": "assistant", "content": f"""Action data content
```
{action_schema.model_dump_json()}
```
RETRY {retries} FAILED: {e}
Make sure to maintain a simple JSON Schema as in the example."""})
                retries += 1
        else:
            raise Exception("Failed to create action data")

        self.upload_action(action_schema)
        return action_schema
    
    def build_action_execution_payload(self, action_data: ActionData) -> ActionExecutionPayload:
        """Have the model fill in arguments for ``action_data`` from the conversation.

        The action's input schema is used as a strict structured-output format, and the
        model sees ``chat_history`` without its last entry.

        Args:
            action_data: The action whose ``input_json_schema`` shapes the arguments.

        Returns:
            An ``ActionExecutionPayload`` holding the action and the decoded arguments.

        Raises:
            Exception: If the completion has no message content.
        """
        print("\n\nChat History Pre Params:\n", self.chat_history)

        params_format = {
            "type": "json_schema",
            "json_schema": {
                "name": "action_items",
                "description": "The action items to be completed",
                "strict": True,
                "schema": json.loads(action_data.input_json_schema),
            },
        }
        completion = self.client.beta.chat.completions.parse(
            model="gpt-4o-mini",
            messages=self.chat_history[:-1],
            response_format=params_format,
        )

        raw_params = completion.choices[0].message.content
        if not raw_params:
            raise Exception("Failed to get action params")

        params = json.loads(raw_params)
        print("\n\nparams:\n", params)

        return ActionExecutionPayload(action_data=action_data, params=params)
    
    def execute_action(self, action_execution_payload: ActionExecutionPayload):
        action_data = action_execution_payload.action_data
        params = action_execution_payload.params
        
        # Create a namespace for execution
        namespace = {}
        
        # Execute the action code to define the function in our namespace
        exec(action_data.code, namespace)
        
        # Execute the action function with unpacked parameters
        result = namespace['action'](**params)

        print("\n\nresult:\n", result)
        
        return result

    def pipe(self, user_message: str, max_retries: int = 3) -> Optional[str]:
        # Add user message to chat history
        self.chat_history.append({"role": "user", "content": user_message})

        action_thought = self.produce_action_thought()
        
        self.chat_history.append(
            {
                "role": "assistant",
                "content": f"""
                            THOUGHT:
                                {action_thought.thought}
                            TOOL DESCRIPTION:
                                {action_thought.tool_description}"""
            }
        )

        action_data = self.get_action_data(max_retries=max_retries)

        print("\n\naction_data:\n", action_data.model_dump_json(indent=4))

        action_execution_payload = self.build_action_execution_payload(action_data)
    

        action_result = self.execute_action(action_execution_payload)

        self.chat_history.append({"role": "assistant", "content": f"RESULT FROM ACTION: {action_result} \n\n Now I will summarize the result and return it."})

        summary_response = self.client.beta.chat.completions.parse(
            model="gpt-4o-mini",
            messages=self.chat_history,
        )

        summary = summary_response.choices[0].message.content

        self.chat_history.append({"role": "assistant", "content": summary})

        return summary


In [28]:
# End-to-end run of the pipeline on a single request: the model decides an action is
# needed, an action is retrieved or generated, executed, and its result summarised.
# This cell calls the OpenAI API and the backend configured through BACKEND_URL.
client = ActionClient()
response = client.pipe(
"""Please perform the matrix multiplication of A x B and return the result, here are the variables:
A = [[1, 2, 3, 4, 5],
        [6, 7, 7, 9, 10],
        [11, 12, 13, 14, 15],
        [16, 17, 7, 19, 20],
        [21, 22, 23, 24, 25]]
B = [[1, 2, 3, 4, 5],
        [6, 7, 8, 9, 10],
        [11, 12, 7, 14, 15],
        [16, 17, 18, 19, 20],
        [21, 22, 23, 24, 25]]"""
)
# pipe() returns the text of the final summary completion.
print(response)






produce_action_thought:
 thought='To perform the matrix multiplication of A and B, I need to follow the rules of matrix multiplication, which involves taking the dot product of rows from A with columns from B. I will calculate the resulting matrix step by step.' is_action_needed=True tool_description='I can use a computation tool or algorithm to perform matrix multiplication.'


retrieve_actions:
 [
    {
        "input_json_schema": "{\"type\": \"object\", \"description\": \"Matrices for multiplication\", \"properties\": {\"A\": {\"type\": \"array\", \"description\": \"The first matrix\", \"items\": {\"type\": \"array\", \"items\": {\"type\": \"integer\"}}}, \"B\": {\"type\": \"array\", \"description\": \"The second matrix\", \"items\": {\"type\": \"array\", \"items\": {\"type\": \"integer\"}}}}, \"required\": [\"A\", \"B\"], \"additionalProperties\": false}",
        "output_json_schema": "{\"type\": \"array\", \"description\": \"Resultant matrix after multiplying A and B\", \"item

In [26]:
import numpy as np

# Reference check: compute the matrix product A x B directly with NumPy, using the
# same operands that were given to ActionClient.pipe in the demo cell.
A = [
    [1, 2, 3, 4, 5],
    [6, 7, 7, 9, 10],
    [11, 12, 13, 14, 15],
    [16, 17, 7, 19, 20],
    [21, 22, 23, 24, 25],
]
B = [
    [1, 2, 3, 4, 5],
    [6, 7, 8, 9, 10],
    [11, 12, 7, 14, 15],
    [16, 17, 18, 19, 20],
    [21, 22, 23, 24, 25],
]

# Both operands are 2-D, so this is a matrix product; np.matmul states that directly
# and gives the same result np.dot gives for 2-D inputs.
product = np.matmul(A, B)
print(product)


[[ 215  230  227  260  275]
 [ 479  518  515  596  635]
 [ 765  830  817  960 1025]
 [ 919  998 1035 1156 1235]
 [1315 1430 1407 1660 1775]]
