In [1]:
import json
from copy import deepcopy

# from pydantic.dataclasses import dataclass
from dataclasses import asdict, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type, TypeVar, Union, get_args

import logfire
import nest_asyncio
from dotenv import load_dotenv
from IPython.display import Markdown
from pydantic import BaseModel, ConfigDict, Field, GetCoreSchemaHandler, field_validator
from pydantic.dataclasses import dataclass

# from dataclasses import dataclass
from pydantic_ai import Agent
from pydantic_ai.agent import EndStrategy
from pydantic_ai.models import KnownModelName
from pydantic_ai.result import ResultData
from pydantic_ai.settings import ModelSettings
from pydantic_ai.tools import AgentDeps, Tool, ToolDefinition, ToolFuncContext, ToolFuncEither
from pydantic_core import ValidationError, core_schema
from rich import print

# from agentgenius.builtin_tools import get_datetime, get_installed_packages, get_location_by_ip
load_dotenv()

nest_asyncio.apply()
logfire.configure(send_to_logfire="if-token-present")

<logfire._internal.main.Logfire at 0x7092c1f020f0>

In [2]:
def get_datetime(fromat: str = "%Y-%m-%d %H:%M:%S") -> str:
    """Get the current datetime as a string in the specified python format."""
    from datetime import datetime

    return datetime.now().strftime(fromat)

In [3]:
ToolType = Union[str, Callable[[], str]]


@dataclass
class ToolDef:
    function: ToolType = field(default="", repr=True)

    _function: Optional[ToolType] = field(default=None, repr=False, init=False)
    model_config = ConfigDict(arbitrary_types_allowed=True)

    def __init__(self, function: str | Callable, namespace: dict = globals()):
        self.function = function
        if isinstance(self.function, str):
            self.function = function
            self._function = self._get_callable(globals())
        elif isinstance(self.function, Callable):
            self._function = self.function
            self.function = self.function.__name__
        self.__qualname__ = f"ToolDef.{self.function}"

    def _get_callable(self, namespace: dict) -> Callable:
        namespace = globals() | namespace
        return namespace.get(self.function, None)

    # def __repr__(self):
    #     return f"ToolDef(function={self.function})"

    @property
    def __name__(self) -> str:
        return str(self.function)

    def __call__(self):
        return self._function

In [4]:
t_def = ToolDef(get_datetime)
print(t_def)
t_def()

In [5]:
from pydantic.dataclasses import dataclass


@dataclass
class MyClass:
    a: int
    b: int  # Replace 'something' with the appropriate type

    def __post_init__(self):
        if self.a:
            self._internal = self.a * self.b  # Initialize the internal variable

    def get_internal(self):
        return getattr(self, "_internal", None)  # Safely access the internal variable


# Usage
my_instance = MyClass(a=5, b=10)
print(my_instance.get_internal())  # Output: 50

In [6]:
from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessage, ModelResponse
from pydantic_ai.models.function import AgentInfo, FunctionModel

agent = Agent()


@agent.tool_plain
def foobar(a: int, b: str, c: dict[str, list[float]]) -> str:
    """Get me foobar.

    Args:
        a: apple pie
        b: banana cake
        c: carrot smoothie
    """
    return f"{a} {b} {c}"


def print_schema(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse:
    tool = info.function_tools[0]
    print(tool.description)
    # > Get me foobar.
    print(tool.parameters_json_schema)
    """
    {
        'properties': {
            'a': {'description': 'apple pie', 'title': 'A', 'type': 'integer'},
            'b': {'description': 'banana cake', 'title': 'B', 'type': 'string'},
            'c': {
                'additionalProperties': {'items': {'type': 'number'}, 'type': 'array'},
                'description': 'carrot smoothie',
                'title': 'C',
                'type': 'object',
            },
        },
        'required': ['a', 'b', 'c'],
        'type': 'object',
        'additionalProperties': False,
    }
    """
    return ModelResponse.from_text(content="foobar")


agent.run_sync("hello", model=FunctionModel(print_schema))

21:29:49.555 agent run prompt=hello
21:29:49.556   preparing model and tools run_step=1
21:29:49.556   model request


21:29:49.755   handle model response


RunResult(_all_messages=[ModelRequest(parts=[UserPromptPart(content='hello', timestamp=datetime.datetime(2025, 1, 13, 21, 29, 49, 556040, tzinfo=datetime.timezone.utc), part_kind='user-prompt')], kind='request'), ModelResponse(parts=[TextPart(content='foobar', part_kind='text')], timestamp=datetime.datetime(2025, 1, 13, 21, 29, 49, 560769, tzinfo=datetime.timezone.utc), kind='response')], _new_message_index=0, data='foobar', _result_tool_name=None, _usage=Usage(requests=1, request_tokens=51, response_tokens=1, total_tokens=52, details=None))

In [7]:
@dataclass
class ToolDef:
    name: str = field(default="", repr=True)
    # _function: Optional[Tool] = field(default=None, repr=False, init=False)

    model_config = ConfigDict(arbitrary_types_allowed=True)

    def __post_init__(self):
        self._function = self._get_callable(namespace=globals())
        self.__qualname__ = f"ToolDef.{self.name}"

    def _get_func(self):
        return getattr(self, "_function", None)

    def _get_callable(self, *, namespace: dict) -> Callable:
        result = namespace.get(self.name, None)
        if result:
            self._function = result
            return result
        else:
            raise ValueError(f"Tool {self.name} not found in namespace")

    def __call__(self):
        return self._function()

    # def __repr__(self):
    #     return f'ToolDef(name="{self.name}")'

    # def __getitem__(self, item):
    #     return self.getattr(self, "_function")

    @property
    def __name__(self) -> str:
        return self.name

    # def model_dump(self, *args, **kwargs):
    #     kwargs["exclude"] = {"_function", *kwargs.get("exclude", set())}
    #     return super().model_dump(*args, **kwargs)


# TypeVar("ToolList", bound=List[str])
# ToolList = TypeVar("ToolList", bound=List[ToolDef])

In [8]:
tool = ToolDef("get_datetime")
print(tool)
tool()

'2025-01-13 22:29:49'

In [9]:
type(tool)

__main__.ToolDef

In [10]:
ToolType = Union[str, Callable, Sequence[Union[str, Callable]]]


@dataclass(init=False)
class ToolSet:
    tools: List[ToolDef] = field(init=True, default_factory=list, repr=True)

    @field_validator("tools", mode="plain")
    @classmethod
    def accept_other(cls, v):
        if isinstance(v, (list, str, Callable)):
            return v
        else:
            raise ValueError(f"Tool must be a callable or a string, not {type(v)}")

    def __post_init__(self):
        tools = deepcopy(self.tools)
        self.tools = []
        for tool in tools:
            self.add(tool)

    def add(self, tool: ToolType) -> None:
        if isinstance(tool, Callable):
            t = ToolDef(name=tool.__name__)
            self.tools.append(t)
        elif isinstance(tool, str):
            t = ToolDef(name=tool)
            self.tools.append(t)
        elif isinstance(tool, list):
            for t in tool:
                print(t)
                self.add(t)
        else:
            raise ValueError(f"Tool must be a callable or a string, not {type(tool)}")

    def remove(self, name: str) -> Tool:
        tool = self.get(name)
        if tool:
            self.tools.remove(tool)
        return tool

    def get(self, name: str, default=None, *, namespace: dict = globals()):
        return next((tool._get_callable(namespace=globals()) for tool in self.tools if tool.name == name), default)

    def __iter__(self):
        return iter(self.tools)

    # def __getitem__(self, item):
    #     return self.get(item)
    def __getitem__(self, item):
        return self.tools[item]._get_callable(namespace=globals())

    def __len__(self):
        return len(self.tools)

    def __contains__(self, value):
        if isinstance(value, str):
            return self.get(value) is not None
        elif isinstance(value, Callable):
            return value in self.tools
        else:
            raise ValueError("value must be a type of string or callable")

    def __str__(self):
        return str(self.tools)

    def all(self):
        return [tool.name for tool in self.tools]

    def init(self, *, namespace: dict = globals()):
        # return [(tool._get_callable(namespace=namespace).__name__) for tool in self.tools]
        for tool in self.tools:
            tool._get_callable(namespace=namespace)

In [11]:
# ToolSet = list[ToolDefinition]


def test(_: int = 0):
    """Test tool."""
    return "1 passed"


def test2():
    """Test tool."""
    return "2 passed"


tset = ToolSet()
print(tset)

# try:
#     tset = ToolSet([Tool(test)])
# except ValueError as e:
#     print("Error:", e)

tset.add(test)
print(tset)

tset = ToolSet(["test"])
print(tset)

tset = ToolSet([test])
print(tset)

tset = ToolSet([test, test2])
print(tset)
print(tset.all())
t = tset.get("test")
print(t())

print(tset[0]())
# print(tset["test"]())
print(t.__name__)
# print(f"{json.dumps(t)=}", end="\n\n")

In [12]:
Toolset = List[Tool]

In [13]:
# @dataclass
# class AgentParams:
#     params: dict[str, Any]
#     # result_type: type[ResultData] = str,
#     # deps_type: type[AgentDeps] = NoneType,
#     # model_settings: ModelSettings | None = None,
#     # retries: int = 1,
#     # result_tool_name: str = 'final_result',
#     # result_tool_description: str | None = None,
#     # result_retries: int | None = None,
#     # tools: Sequence[Tool[AgentDeps] | ToolFuncEither[AgentDeps, ...]] = (),
#     # defer_model_check: bool = False,
#     # end_strategy: EndStrategy = 'early',


AgentParams = Dict[str, Any]


@dataclass
class AgentDef:
    model: KnownModelName
    name: str
    system_prompt: str
    params: AgentParams = field(default_factory=dict, repr=True)

In [14]:
a1 = AgentDef(
    model="openai:gpt-4o-mini",
    name="planner",
    system_prompt="You are a planner. your goal is to make a step by step plan for other agents. Do not answer the user questions. Just make a plan how to do this.",
    params={
        "result_type": "TaskDef",
        "retries": "3",
    },
)
a2 = AgentDef(
    model="openai:gpt-4o-mini",
    name="date agent",
    system_prompt="You are date agent.",
)
print(a1)
print(a2)

In [15]:
def avilable_agents() -> list[AgentDef]:
    a1 = AgentDef(
        model="openai:gpt-4o-mini",
        name="planner",
        system_prompt="You are a planner. your goal is to make a step by step plan for other agents. Do not answer the user questions. Just make a plan how to do this.",
        params={
            "result_type": "TaskDef",
            "retries": "3",
        },
    )
    a2 = AgentDef(
        model="openai:gpt-4o-mini",
        name="date agent",
        system_prompt="You are date agent.",
    )
    return [a1, a2]


print(f"{avilable_agents()=}", end="\n\n")

In [16]:
@dataclass
class TaskDef:
    name: str
    question: str
    priority: int
    agent: Optional[AgentDef] = field(default=None, init=True, repr=False)
    toolset: Optional[ToolSet] = field(default=None, init=True, repr=False)

    def __lt__(self, other):
        return self.priority < other.priority

In [17]:
t = TaskDef(name="get_datetime", question="make a plan on how to get the current datetime", priority=1)
print(f"{t=}", end="\n\n")
t2 = TaskDef(name="get_datetime", question="make a plan on how to get the current datetime", priority=2)
print(f"{t2=}", end="\n\n")
print(f"{t < t2 = }", end="\n\n")
print(f"{t2 < t = }", end="\n\n")

In [18]:
@dataclass
class Task:
    agent: Agent = field(init=False, repr=False)
    task: TaskDef
    agent_def: AgentDef
    toolset: ToolSet = field(default_factory=list, init=True, repr=True)

    def __post_init__(self):
        self.agent = Agent(
            model=self.agent_def.model,
            name=self.agent_def.name,
            system_prompt=self.agent_def.system_prompt,
            tools=self.toolset,
            **self.agent_def.params if self.agent_def.params else {},
        )

    async def run(self, *args, **kwargs):
        if self.task.question and args:
            query = f"{self.task.question}: {args[0]}"
        else:
            query = self.task.question
        return await self.agent.run(query)

    def run_sync(self, *args, **kwargs):
        if self.task.question and args:
            query = f"{self.task.question}: {args[0]}"
        else:
            query = self.task.question
        return self.agent.run_sync(query)

In [19]:
# print(f"{get_datetime()=}", end="\n\n")
resp = Tuple[TaskDef, AgentDef]
t = Task(
    task=TaskDef(name="get_datetime", question="make a plan on how to get the current datetime", priority=1),
    agent_def=AgentDef(
        model="openai:gpt-4o-mini",
        name="planner",
        system_prompt="You are a planner. your goal is to make a step by step plan for other agents. Get the list of available agents by calling 'agents_available' or create a new agent . Do not answer the user questions. Just make a plan how to do this.",
        params={
            "result_type": list[str],
            "retries": 3,
        },
    ),
    toolset=ToolSet([get_datetime]),
)


@t.agent.tool_plain
def tools_available():
    """get list of available tools"""
    return ["get_datetime"]


@t.agent.tool_plain
def agents_available():
    "get list of available agents"
    return [
        AgentDef(
            model="openai:gpt-4o-mini",
            name="date agent",
            system_prompt="You are a date agent.",
        ),
        AgentDef(
            model="openai:gpt-4o-mini",
            name="planner",
            system_prompt="You are a planner. your goal is to make a step by step plan for other agents. Do not answer the user questions. Just make a plan how to do this.",
        ),
    ]


# print(t)
# result = t.run_sync()
# print(result.data)
# td = Task(result)
# print(td())

In [22]:
def get_datetime(format: str = "%Y-%m-%d %H:%M:%S") -> str:
    """Get the current datetime as a string in the specified python format."""
    from datetime import datetime

    return datetime.now().strftime(format)


def get_user_ip_and_location() -> str:
    """Get the public IP address and location of the current machine using an external service."""
    import requests

    try:
        response = requests.get("https://ipinfo.io")
        return response.text
    except requests.RequestException as e:
        return f"Error: {str(e)}"


def get_installed_packages() -> str:
    """Get a list of all installed Python packages."""
    import pkg_resources

    installed_packages = pkg_resources.working_set
    sorted_packages = sorted([f"{i.key}" for i in installed_packages])
    return ", ".join(sorted_packages)

In [23]:
planner = Task(
    task=TaskDef(name="planner", question="make a short plan how to archive this task", priority=1),
    agent_def=AgentDef(
        model="openai:gpt-4o",
        name="planner",
        system_prompt="""You are a planner. your goal is to make a step by step plan for other agents. 
        Do not answer the user questions. Just make a very short plan how to do this. 
        AlWAYS MAKE SURE TO ADD APPROPRIATE TOOLS TO THE PLAN. You can get the list of available tools by calling 'get_available_tools'.
        Efficiently is a priority, so don't waste time on things that are not necessary.
        LESS STEPS IS BETTER (up to 3 steps), so make it as short as possible.""",
        params={
            "result_type": list[Task],
            "retries": 3,
        },
    ),
    # toolset=ToolSet(["get_datetime", "get_user_ip_and_location", "get_installed_packages"]),
)


@planner.agent.system_prompt
def get_available_tools():
    """Return a list of available tools. Do not use these tools.
    Just let the other agents to use them."""
    tools = ["get_datetime", "get_user_ip_and_location", "get_installed_packages"]
    return f"Available tools: {', '.join(tools)}"


result = planner.run_sync("what time is it?")
# result = planner.run_sync("how to get my location by IP?")
print(result.data)
plan = result.data[0].run_sync()
print(plan.data)

21:30:07.972 planner run prompt=make a short plan how to archive this task: what time is it?
21:30:07.973   preparing model and tools run_step=1
21:30:07.973   model request
21:30:09.640   handle model response


21:30:09.644 get_time_agent run prompt=What time is it?
21:30:09.645   preparing model and tools run_step=1
21:30:09.645   model request
21:30:10.649   handle model response
21:30:10.650     running tools=['get_datetime']
21:30:10.651   preparing model and tools run_step=2
21:30:10.651   model request
21:30:11.241   handle model response


In [24]:
select_agents = Task(
    task=TaskDef(name="select_agents", question="select agents to work on this task or create a new agent", priority=1),
    agent_def=AgentDef(
        model="openai:gpt-4o-mini",
        name="agent selector",
        system_prompt="""Your goal is to select agents to work on this task. Get the list of available agents by calling 'agents_available'.
        IF NO APPLICABLE AGENTS ARE AVAILABLE, CREATE A NEW AGENT.
        Do not answer the user questions, just select agent to work on this task.""",
        params={
            "result_type": AgentDef,
            "retries": 3,
        },
    ),
)


# @select_agents.agent.tool_plain
# def agents_available():
#     "get list of available agents"
#     return [
#         AgentDef(
#             model="openai:gpt-4o-mini",
#             name="date agent",
#             system_prompt="You are a date agent.",
#         ),
#         AgentDef(
#             model="openai:gpt-4o-mini",
#             name="planner",
#             system_prompt="You are a planner. your goal is to make a step by step plan for other agents. Do not answer the user questions. Just make a plan how to do this.",
#         ),
#     ]


agents = []
for task in plan:
    result = select_agents.run_sync(task.question)
    agents.append(result.data)

print(agents)

result = select_agents.run_sync("what time is today?")
print(result.data)

TypeError: 'RunResult' object is not iterable

In [None]:
@dataclass
class TaskList:
    tasks: list[TaskDef]

    def __iter__(self):
        return iter(self.tasks)

    def __getitem__(self, item):
        return self.tasks[item]

    def append(self, task: TaskDef):
        self.tasks.append(task)

    # @classmethod
    # def __get_pydantic_core_schema__(cls, source: Any, handler: GetCoreSchemaHandler) -> core_schema.CoreSchema:
    #     instance_schema = core_schema.is_instance_schema(cls)

    #     args = get_args(source)
    #     if args:
    #         # replace the type and rely on Pydantic to generate the right schema
    #         # for `Sequence`
    #         sequence_t_schema = handler.generate_schema(list[args[0]])
    #     else:
    #         sequence_t_schema = handler.generate_schema(list)

    #     non_instance_schema = core_schema.no_info_after_validator_function(TaskList, sequence_t_schema)
    #     return core_schema.union_schema([instance_schema, non_instance_schema])


# TaskList = Dict[int,TaskDef]

tl = TaskList(
    [
        TaskDef(
            name="test3",
            description="test 3",
            priority=3,
            agent=AgentDef(name="test3", model="openai:gpt-4o-mini", system_prompt="You are agent 3"),
        ),
        TaskDef(name="test1", question="test 1", priority=1),
        TaskDef(name="test2", question="test 2", priority=2),
    ]
)

print(f"{tl=}")
print(f"{sorted(tl)=}")

In [None]:
toolset = ToolSet(["get_datetime"])
# tollset = None
toolset = [get_datetime]
print(toolset)

agent = AgentDef(
    model="openai:gpt-4o-mini",
    name="planner",
    system_prompt="You are a planner. your goal is to make a step by step plan for other agents. Do not answer the user questions. Just make a plan how to do this.",
    params={"result_type": TaskList, "retries": 3},
)
print(agent)

task_def = TaskDef(name="get_datetime", description="make a plan on how to get the current datetime", priority=1)
print(task_def)


task = Task(task=task_def, agent_def=agent, toolset=toolset)
print(task)

# @task.agent.tool_plain
# def tools_available():
#     """get list of available tools"""
#     return Toolset([get_datetime])


# @task.agent.tool_plain
# def agents_available():
#     """get list of available agents"""
#     return [
#         AgentDef(
#             model="openai:gpt-4o-mini",
#             name="date agent",
#             system_prompt="You are a date agent.",
#         ),
#         AgentDef(
#             model="openai:gpt-4o-mini",
#             name="planner",
#             system_prompt="""You are a planner. your goal is to make a step by step plan for other agents.
#             Get the list of available agents by calling 'agents_available' or create a new agent.
#             You must assign an agent and toolset to every task.
#             Do not answer the user questions. Just make a plan how to do this.""",
#         ),
#     ]

t
result = task.run_sync()

print(f"{result=}", end="\n\n")
print(f"{result.data=}", end="\n\n")

t = result.data[0]

test_task = Task(task=t, agent_def=t.agent, toolset=t.toolset)
test_result = test_task.run_sync()
Markdown(test_result.data)

In [None]:
def foo():
    pass


f = foo
print(f)
print(f.__name__)
print(f.__qualname__)

from agentgenius.builtin_tools import get_datetime

print(get_datetime)
print(get_datetime.__name__)
print(get_datetime.__qualname__)