In [1]:

from dataclasses import dataclass
from pydantic import BaseModel, ConfigDict, SecretStr
from langchain_core.language_models import BaseLanguageModel

from configs.apis import APIS
from libs.printing import eprint


In [None]:
# Fallback provider ("api") and size tier ("size") for each model type.
# NOTE(review): the original inline notes suggest "api" indexes a provider
# table (`[api]['key']`) and "size" indexes that provider's model list
# (`['models']['llms'|'embedders'][size]`) — confirm against the table itself.
DEFAULT_MODELS = dict(
    llm=dict(api="openai", size="default"),
    embedder=dict(api="nvidia", size="default"),
)

class ModelConfig(BaseModel):
    """Selects a model (type, size, provider) and carries the provider credential.

    When no ``api_key`` is supplied, the key registered for the chosen
    provider in ``APIS`` is used instead.
    """

    # Model family; DEFAULT_MODELS declares "llm" and "embedder" entries.
    type: str | None = "llm"
    # Size tier to select for the provider.
    size: str | None = DEFAULT_MODELS["llm"]["size"]
    # Provider identifier; also the key used to look up credentials in APIS.
    api: str | None = DEFAULT_MODELS["llm"]["api"]
    # Provider credential; filled from APIS in __init__ when left as None.
    api_key: SecretStr | str | None = None

    def __init__(self, *args, **kwargs):
        """Validate the supplied fields, then resolve a missing API key.

        Args:
            *args: Accepted but ignored; fields are populated by keyword only.
            **kwargs: Any of ``type``, ``size``, ``api`` and ``api_key``.
                Omitted fields fall back to the class-level defaults.

        Raises:
            KeyError: If ``api`` has no entry (or no ``"key"``) in ``APIS``.
        """
        # pydantic has to initialise the instance before any field can be
        # assigned; assigning first fails because the model state is missing.
        super().__init__(**kwargs)

        if self.api_key is None and self.api is not None:
            # Store the value exactly as APIS holds it (presumably a SecretStr
            # — TODO confirm) instead of unwrapping it, so the secret stays
            # masked in reprs and consumers can call get_secret_value().
            self.api_key = APIS[self.api]["key"]



In [None]:

# @dataclass(slots=False)
class Model(BaseModel):
    """Resolved model name plus a lazily created LLM/embedder client.

    NOTE(review): a second class named ``Model`` is defined in a later cell
    and replaces this one once that cell runs — confirm which is intended.
    """

    # Allow field types pydantic cannot build a schema for; whether
    # BaseLanguageModel needs this depends on the installed versions.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Always supplied by __init__, so no class-level default instance is built.
    config: ModelConfig
    # Provider-specific model identifier resolved from MODELS.
    name: str | None = None
    # Stays None until instantiate_client() is called.
    client: BaseLanguageModel | None = None

    def __init__(self, model_config: ModelConfig | None = None):
        """Resolve the model name for ``model_config``.

        Args:
            model_config: Selection of provider, type and size. A fresh
                ``ModelConfig()`` is created when omitted.

        Raises:
            KeyError: If MODELS has no entry for the configured
                api / type / size combination.
        """
        # Build the default per call: a ``ModelConfig()`` default in the
        # signature is evaluated once at definition time and then shared.
        if model_config is None:
            model_config = ModelConfig()

        # NOTE(review): MODELS is not defined in the visible cells; it must be
        # in scope and indexable as MODELS[api][type][size] — confirm layout.
        super().__init__(
            config=model_config,
            name=MODELS[model_config.api][model_config.type][model_config.size],
            client=None,
        )

    def instantiate_client(self, agent_config):
        """Create the provider client for this model and store it on ``client``.

        Args:
            agent_config: Object providing the generation settings read below
                (``temperature``, ``max_tokens``, and per provider
                ``max_retries``, ``timeout``, ``response_format`` or ``seed``).

        Raises:
            ValueError: If the configured provider is not one of
                "openai", "nvidia" or "anthropic".
        """
        api = self.config.api

        # The configured key may be a SecretStr or a plain string.
        key = self.config.api_key
        api_key = key.get_secret_value() if isinstance(key, SecretStr) else key

        # NOTE(review): ChatOpenAI, ChatNVIDIA and ChatAnthropic are not
        # imported in the visible cells; they must be in scope when this runs.
        if api == "openai":
            self.client = ChatOpenAI(
                model=self.name,
                openai_api_key=api_key,
                max_tokens=agent_config.max_tokens,
                temperature=agent_config.temperature,
                # top_p=agent_config.top_p,
                max_retries=agent_config.max_retries,
                timeout=agent_config.timeout,
                response_format=agent_config.response_format,
            )
        elif api == "nvidia":
            self.client = ChatNVIDIA(
                model=self.name,
                api_key=api_key,
                temperature=agent_config.temperature,
                # top_p=agent_config.top_p,
                seed=agent_config.seed,
                max_tokens=agent_config.max_tokens,
            )
        elif api == "anthropic":
            # NOTE(review): confirm that ChatAnthropic accepts `seed`.
            self.client = ChatAnthropic(
                model=self.name,
                api_key=api_key,
                temperature=agent_config.temperature,
                # top_p=agent_config.top_p,
                seed=agent_config.seed,
                max_tokens=agent_config.max_tokens,
            )
        else:
            # Fail loudly rather than leaving `client` unset.
            raise ValueError(f"Unsupported API provider: {api!r}")


In [None]:
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field

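# OpenAI completion-model wrapper from LangChain.
# NOTE(review): `langchain.llms` is the legacy import path; newer LangChain
# releases ship this class in `langchain_openai` — confirm against the
# installed version.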
from langchain.llms import OpenAI

In [None]:
class Task(BaseModel):
    """
    A named unit of work that an Agent turns into a prompt string.
    """
    name: str
    description: Optional[str] = None
    parameters: Optional[Dict[str, Any]] = None

    def execute(self, input_data: Optional[Dict[str, Any]] = None) -> str:
        """
        Build the prompt/command string for this task.

        The data embedded in the string is the first non-empty value among
        `input_data`, the task's own `parameters`, and an empty dict.
        """
        if input_data:
            payload = input_data
        elif self.parameters:
            payload = self.parameters
        else:
            payload = {}
        return f"Executing task '{self.name}' with data: {payload}"


class Model(BaseModel):
    """
    Thin wrapper around LangChain's OpenAI LLM.

    The underlying LLM instance is created on first use (see `predict`) from
    `openai_api_key` and `model_name`.
    """
    # pydantic v2 configuration:
    # - arbitrary_types_allowed: permit non-pydantic types such as the LLM.
    # - protected_namespaces=(): the `model_name` field starts with pydantic's
    #   protected "model_" prefix; clearing the list avoids the warning.
    model_config = ConfigDict(arbitrary_types_allowed=True, protected_namespaces=())

    name: str
    version: Optional[str] = "1.0"
    # repr=False keeps the credential out of printed/logged model reprs.
    openai_api_key: Optional[str] = Field(default=None, repr=False)
    model_name: Optional[str] = None

    # Private attribute holding the underlying LangChain LLM instance.
    _llm: Optional[OpenAI] = None

    def init_llm(self) -> None:
        """
        Create the LangChain OpenAI LLM from the configured key and model name.

        Raises:
            ValueError: If `openai_api_key` or `model_name` is missing or empty.
        """
        if not self.openai_api_key or not self.model_name:
            raise ValueError("Both 'openai_api_key' and 'model_name' must be provided to initialize the LLM.")
        self._llm = OpenAI(openai_api_key=self.openai_api_key, model_name=self.model_name)

    def predict(self, prompt: str, **kwargs) -> str:
        """
        Generate a response for `prompt`, initializing the LLM on first use.

        Extra keyword arguments are forwarded to the LLM call.
        """
        if self._llm is None:
            self.init_llm()
        # `invoke` is the supported entry point; calling the LLM object
        # directly is deprecated in LangChain.
        return self._llm.invoke(prompt, **kwargs)

In [None]:

# Scratch check: build a File from a relative path and display its `path`.
# NOTE(review): `File` is not defined or imported in the visible cells, so this
# cell raises NameError on a fresh kernel unless another cell provides it.
a = File("erik.hodges/yes/filename.csv")

# Bare expression: displayed as the cell's output in a notebook.
a.path

In [None]:


class Agent(BaseModel):
    """
    An observer that runs one Task through one Model.

    When attached to a Team, the agent reports each task result to it and
    receives the team's event notifications through `update`.
    """
    name: str
    task: Task
    model: Model
    team: Optional["Team"] = None  # "Team" is resolved after Team is defined

    def perform_task(self, input_data: Optional[Dict[str, Any]] = None) -> str:
        """
        Run the task and return the model's response.

        The Task builds the prompt and the Model answers it. If the agent
        belongs to a team, the response is also logged there under the
        "performed_task" action.
        """
        response = self.model.predict(self.task.execute(input_data))
        if not self.team:
            return response
        self.team.log_action(self.name, "performed_task", response)
        return response

    def set_team(self, team: "Team") -> None:
        """
        Attach this agent (observer) to `team` (subject).
        """
        self.team = team

    def update(self, event: Dict[str, str]) -> None:
        """
        Callback the Team invokes for every event; prints the event.
        """
        print(f"Agent '{self.name}' received update: {event}")


class Team(BaseModel):
    """
    Represents a Team of Agents working toward a shared Goal.

    This class follows the Observer design pattern:
      - **Subject:** The Team maintains a list of Agents (observers).
      - **Observers:** Each Agent registers with the Team and is notified of events.

    **Graph view:**
      `to_graph` returns a plain dict with one node per registered Agent and
      one edge per (broadcast sender, other agent) pair found in the log.
    """
    name: str
    goal: str
    agents: List[Agent] = Field(default_factory=list)
    log: List[Dict[str, str]] = Field(default_factory=list)

    def register_agent(self, agent: Agent) -> None:
        """
        Registers an Agent with the Team, logs the registration, and notifies all observers.
        """
        agent.set_team(self)
        self.agents.append(agent)
        self.log_action(agent.name, "registered", f"Agent '{agent.name}' registered to team '{self.name}'.")
        self.notify_observers({"event": "agent_registered", "agent": agent.name})

    def unregister_agent(self, agent: Agent) -> None:
        """
        Unregisters an Agent from the Team, logs the unregistration, and notifies observers.

        Every registered agent whose name matches `agent.name` is removed, and
        its back-reference to this team is cleared so it no longer logs here.
        """
        remaining = []
        for member in self.agents:
            if member.name != agent.name:
                remaining.append(member)
            elif member.team is self:
                # Detach removed members; otherwise their perform_task() would
                # keep writing to this team's log.
                member.team = None
        # The argument may be a different object than the registered member.
        if agent.team is self:
            agent.team = None
        self.agents = remaining
        self.log_action(agent.name, "unregistered", f"Agent '{agent.name}' unregistered from team '{self.name}'.")
        self.notify_observers({"event": "agent_unregistered", "agent": agent.name})

    def log_action(self, agent: str, action: str, response: str) -> None:
        """
        Appends an action record to the log and notifies all observers of it.
        """
        event = {"agent": agent, "action": action, "response": response}
        self.log.append(event)
        self.notify_observers(event)

    def broadcast(self, sender: Agent, message: str) -> None:
        """
        Broadcasts a message from one Agent to all others in the Team.

        The broadcast is first logged (which notifies every agent of the log
        record); the broadcast event itself is then delivered to every agent
        except the sender.
        """
        self.log_action(sender.name, "broadcast", message)
        event = {"event": "broadcast", "sender": sender.name, "message": message}
        self.notify_observers(event, exclude=sender.name)

    def notify_observers(self, event: Dict[str, str], exclude: Optional[str] = None) -> None:
        """
        Notifies registered Agents (observers) of a given event.

        Args:
            event: Payload passed to each agent's `update`.
            exclude: Optional agent name to skip; by default every agent is notified.
        """
        for agent in self.agents:
            if exclude is not None and agent.name == exclude:
                continue
            agent.update(event)

    def to_graph(self) -> Dict[str, Any]:
        """
        Returns a graph representation of the Team.

        - **Nodes:** The names of all registered Agents.
        - **Edges:** For every "broadcast" record in the log, one
          (sender, node) tuple per node other than the sender.

        Returns:
            A dict with keys "nodes" (list of names) and "edges" (list of tuples).
        """
        nodes = [agent.name for agent in self.agents]
        edges = []
        for entry in self.log:
            if entry.get("action") == "broadcast":
                sender = entry.get("agent")
                for node in nodes:
                    if node != sender:
                        edges.append((sender, node))
        return {"nodes": nodes, "edges": edges}

    def get_log(self) -> List[Dict[str, str]]:
        """
        Returns the log of all actions/events in the Team (the live list, not a copy).
        """
        return self.log


# Resolve the "Team" forward reference on Agent now that both classes exist.
# model_rebuild() is the pydantic v2 replacement for the deprecated
# update_forward_refs().
Agent.model_rebuild()
Team.model_rebuild()


# ==============================
# Example usage (for testing)
# ==============================
if __name__ == "__main__":
    # Local import: the key is read from the environment so a real credential
    # never has to be pasted into the notebook.
    import os

    # Create a team with a shared goal.
    team = Team(name="Beta", goal="Collaborative problem solving")

    # Create the Model the agents will share. Set OPENAI_API_KEY in the
    # environment; the placeholder fallback will not authenticate.
    model = Model(
        name="OpenAI-GPT",
        version="4.0",
        openai_api_key=os.environ.get("OPENAI_API_KEY", "your-api-key"),
        model_name="gpt-4"
    )

    # Create two Tasks.
    task1 = Task(
        name="Data Analysis",
        description="Analyze the provided dataset",
        parameters={"data": "dataset.csv"}
    )
    task2 = Task(
        name="Report Generation",
        description="Generate a report from analysis",
        parameters={"template": "summary"}
    )

    # Create two Agents, each with a different Task.
    agent1 = Agent(name="Agent 1", task=task1, model=model)
    agent2 = Agent(name="Agent 2", task=task2, model=model)

    # Register the Agents with the Team.
    team.register_agent(agent1)
    team.register_agent(agent2)

    # Agents perform their assigned tasks (each call goes through the Model).
    print("Agent 1 task result:")
    print(agent1.perform_task())
    print("\nAgent 2 task result:")
    print(agent2.perform_task())

    # One Agent broadcasts a message to the entire team.
    team.broadcast(sender=agent1, message="Data analysis complete. Proceeding to report generation.")

    # Display the graph representation of the Team.
    print("\nTeam Graph Representation:")
    print(team.to_graph())

    # Print the Team's log of actions/events.
    print("\nTeam Log:")
    for log_entry in team.get_log():
        print(log_entry)
