<a href="https://colab.research.google.com/github/Kallenhard1/3d-game-shaders-for-beginners/blob/master/multimodal_task_agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install --upgrade langchain langchain-community langchain-openai langgraph cdp_langchain dotenv

import os
import json
from dotenv import load_dotenv
import requests
import subprocess
from typing import Dict, Any, List
import openai
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from langchain.memory import ConversationBufferMemory
from langgraph.graph import MessagesState, StateGraph, START, END
from langchain.tools import tool
from langgraph.types import Command, interrupt
from cdp_langchain.utils import CdpAgentkitWrapper
from cdp_langchain.agent_toolkits import CdpToolkit
from langchain_core.tools import Tool
from typing import Literal, Optional
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver

# Sources:
#   python.langchain.com/docs/tutorials/chatbot/
#   js.langchain.com/docs/tutorials/chatbot/
#   medium.com/@tahreemrasul/how-to-build-your-own-chatbot-with-langchain-and-openai-f092822b6ba6
#   blog.jetbrains.com/pycharm/2024/08/how-to-build-chatbots-with-langchain/

# Read settings from a local .env file; with override=True its values take
# precedence over variables already present in the environment.
load_dotenv(override=True)
api_key = os.getenv("OPENAI_API_KEY")
# Directory holding tasks.md; "./tasks" unless OBSIDIAN_VAULT_PATH is set.
obsidian_vault_path = os.getenv("OBSIDIAN_VAULT_PATH", "./tasks")

# Node names the supervisor may route to. "human" is an interrupt point for
# extra user input rather than an agent.
# NOTE(review): no node named "data_agent" is registered in this notebook's
# graph (its add_node call is commented out) — confirm before routing there.
members = ["task_manager_agent", "data_agent", "human"]
# Every valid supervisor decision: one of the members, or "FINISH" to end.
options = [*members, "FINISH"]

# System prompt for the supervisor. The agent names used here must match the
# entries of `members`/`options`: the supervisor node uses the model's choice
# directly as the next graph node name.
SUPERVISOR_PROMPT = f"""
  You are the Supervisor responsible for managing user interactions and coordinating tasks between the following specialized agents: {members}.
  Your primary objective is to efficiently handle the user's requests to manage their tasks by delegating work to the appropriate agent. Based on the conversation context, decide which agent should execute the next step and craft a concise, clear instruction for that agent. Instruct agents to only use the specific tools they need to complete their task. Return your answer as a JSON object with two keys: "next_agent" (which must be one of {options}) and "instruction" (a short message in Markdown format for the chosen agent that contains only the necessary information).

  ## Agent Roles:
  **task_manager_agent**
  - **Objective:** Edits or updates the user's task details (e.g., task ID, task name, description, date).
  - **Tools:** add_task_to_obsidian (used to add or update a task in the Obsidian Markdown file).
  - **Output:** Returns a confirmation and the updated task details.

  **data_agent**
  - **Objective:** Retrieves the user's task data.
  - **Tools:** fetch_data (retrieves task data from the designated source), get_task_details (fetches detailed information on a task).
  - **Output:** Returns structured data containing task details; if no data is found, returns an empty list.

  ### Instructions:
  1. **Routing Based on User Intent:**
    - If the user asks to view their tasks (e.g., "Show my tasks for today"), route to **data_agent**.
    - If the user requests to add or modify a task (e.g., "Add a new task: 'Buy groceries at 6 PM'"), route to **task_manager_agent**.

  2. **Using Gathered Information:**
    - Only pass relevant details from previous outputs to inform the next step.
    - Do not include unnecessary data in your instructions.

  3. **Handling Human Interaction:**
    - The **human node is NOT an agent**; it is used only when additional user input is needed.
    - When further clarification or confirmation is required (e.g., selecting which task to update), address the user directly with a clear explanation and request for input in Markdown format.
    - If all necessary data has been retrieved and the request is fully satisfied, provide the final response with `FINISH` in Markdown format without routing to the human node.

  4. **Finalizing the Task:**
    - Once the user's request has been completely handled, return `FINISH`.

  ## Formatting Your Response:
  Return a structured JSON object with:
  - `"next_agent"`: The next agent to execute the task.
  - `"instruction"`: A precise command for the agent, in Markdown format, based on the gathered context.

  ## Example Outputs:
  Example 1: Listing Tasks
  **User Input:** "Show me my tasks for today."
  **Supervisor Output:** "next_agent": "data_agent", "instruction": "Retrieve all tasks scheduled for today. Return task details in JSON format."

  Example 2: Adding a New Task
  **User Input:** "Add a new task: Buy groceries at 6 PM."
  **Supervisor Output:** "next_agent": "task_manager_agent", "instruction": "Add a new task with the details: 'Buy groceries at 6 PM'. Return confirmation and task ID."

  Example 3: Requesting Human Input for Clarification
  **User Input:** "I need to update one of my tasks, but I'm not sure which one."
  **Supervisor Output:** "next_agent": "human", "instruction": "Here are your current tasks: [list tasks]. Please specify which task you want to update and what changes should be made."

  Example 4: Finalizing the Request
  **User Input:** "All my tasks for today are complete."
  **Supervisor Output:** "next_agent": "FINISH", "instruction": "Confirm that all tasks for today have been marked as complete. No further action is required."

"""

# Structured-output schema for the supervisor's routing decision.
# NOTE(review): the docstring is presumably used as the schema description by
# `with_structured_output`, so it is kept verbatim.
class Router(TypedDict):
    """Decides the next agent to call."""
    # Exactly one of `options` (a member node name or "FINISH"). Subscripting
    # Literal with a tuple is equivalent to listing its items, and unlike
    # `Literal[*options]` it also parses on Python < 3.11.
    next: Literal[tuple(options)]

# Graph state shared by every node: MessagesState's message list plus routing fields.
class State(MessagesState):
    next: str  # latest routing decision, written by the supervisor node
    next_agent: Optional[str]  # NOTE(review): no node in this notebook writes this field — confirm it is needed

# Richer supervisor decision: a routing target plus an instruction for the
# chosen agent. NOTE(review): not referenced by any code in this notebook.
class SupervisorOutput(TypedDict):
    # Derived from `options` so the allowed targets cannot drift from `members`
    # (previously a hand-copied Literal of the same four names).
    next_agent: Literal[tuple(options)]
    instruction: str

# Structured response schema handed to the task manager agent as `response_format`.
class AgentOutput(TypedDict):
    message: str  # answer to the query
    data: dict  # any data produced while handling the query

class ChatOllama:
    """Minimal wrapper that runs a local model through the ``ollama`` CLI.

    Despite the name, this is not LangChain's ``ChatOllama``: it only exposes
    ``predict(prompt) -> str``. NOTE(review): code that expects a LangChain
    chat model (``invoke``, ``with_structured_output``, ``bind_tools``) will not
    work with this class — confirm which model object is intended.

    Args:
        model: Model tag passed to ``ollama run``. NOTE(review): the default
            "gpt-4o-mini" is an OpenAI model name and presumably not available
            in Ollama; pass an explicit model.
        extra_params: Stored on the instance but not forwarded to the CLI.
    """

    def __init__(self, model: str = "gpt-4o-mini", extra_params: Optional[Dict[str, Any]] = None):
        self.model = model
        self.extra_params = extra_params or {}

    def predict(self, prompt: str, timeout: Optional[float] = None) -> str:
        """Run ``ollama run <model> <prompt>`` and return its stripped stdout.

        Args:
            prompt: Text sent to the model.
            timeout: Seconds to wait for the CLI; ``None`` waits indefinitely.

        Returns:
            The model output with surrounding whitespace removed.

        Raises:
            RuntimeError: If the CLI is not installed, times out, or exits
                with a non-zero status.
        """
        command = ["ollama", "run", self.model, prompt]
        try:
            result = subprocess.run(command, capture_output=True, text=True, timeout=timeout)
        except FileNotFoundError as err:
            # Raised by subprocess when the executable itself is missing.
            raise RuntimeError("Ollama call failed: the 'ollama' CLI was not found on PATH.") from err
        except subprocess.TimeoutExpired as err:
            raise RuntimeError(f"Ollama call failed: timed out after {timeout} seconds.") from err
        if result.returncode != 0:
            raise RuntimeError(f"Ollama call failed: {result.stderr}")
        return result.stdout.strip()

llm = ChatOllama(model="llama3.2")

# AI Agent for general command processing
class AIAgent:
    """Thin wrapper around ``ChatOpenAI`` that answers one free-form command."""

    def __init__(self, api_key):
        """Create the chat model, forwarding ``api_key`` as ``openai_api_key``."""
        self.llm = ChatOpenAI(openai_api_key=api_key)

    def process_command(self, command):
        """Send ``command`` to the model and return the reply text.

        Args:
            command: Raw user text.

        Returns:
            The ``content`` of the model's reply (normally a string).
        """
        prompt = f"User Command: {command}\nAI Action:"
        # `predict` is deprecated on LangChain chat models; `invoke` returns a
        # message object whose `content` carries the reply text.
        response = self.llm.invoke(prompt)
        return response.content

# -----------------------------
# Supervisor Node Logic
# -----------------------------

def supervisor_node(state: State) -> Command[Literal[*members, "__end__"]]:
    """Determines the next agent based on system logic."""
    messages = [{"role": "system", "content": SUPERVISOR_PROMPT}] + state["messages"]
    response = llm.with_structured_output(Router).invoke(messages)
    next_agent = response["next"]
    return Command(goto=next_agent if next_agent != "FINISH" else END, update={"next": next_agent})

def human_node(state: MessagesState, config=None) -> Command[Literal["supervisor"]]:
    """
    Pause the graph for user input, then hand control back to the supervisor.

    ``interrupt`` suspends the run. When the run is resumed, the resume value
    is returned here and recorded as a message named "human".

    Args:
        state: Current graph state (part of the node signature; not read here).
        config: Unused; accepted for LangGraph's node signature.

    Returns:
        Command routing to "supervisor" with the new human message as the update.
    """
    user_input = interrupt(value="Awaiting your input to continue: ")
    # The `messages` channel of MessagesState uses the add_messages reducer,
    # so returning just the new message appends it; re-sending the whole
    # history is unnecessary.
    return Command(
        update={"messages": [HumanMessage(content=user_input, name="human")]},
        goto="supervisor",
    )

#-------------------------------#
#            Tools              #
#-------------------------------#

def save_graph(graph, path="/app/outputs/agent_graph.png"):
    """Render ``graph`` as a Mermaid PNG and write it to ``path``.

    Args:
        graph: Compiled graph exposing ``get_graph().draw_mermaid_png()``.
        path: Destination file; missing parent directories are created.
    """
    graph_viz = graph.get_graph().draw_mermaid_png()

    # The target directory may not exist on this machine (e.g. in Colab),
    # and open() does not create directories.
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(path, "wb") as f:
        f.write(graph_viz)

    print(f"✅ Graph saved as '{path}'. Open it to view.")

def get_cdp_agent_tools(tool_names: List[str], notes_path: str = "/app/resources/notes_data.txt") -> List[Tool]:
    """
    Build the CDP AgentKit toolkit and return only the tools named in ``tool_names``.

    The notes file at ``notes_path`` is created empty if missing (along with
    its directory). Its text is passed to ``CdpAgentkitWrapper`` as
    ``cdp_tasks_data``.

    NOTE(review): ``CdpAgentkitWrapper`` requires CDP credentials (e.g. the
    ``CDP_API_KEY_NAME`` environment variable) and raises a validation error
    without them. It is also unclear whether it accepts a ``cdp_tasks_data``
    field — confirm.

    Args:
        tool_names: Names of toolkit tools to keep; names the toolkit does not
            provide are silently ignored.
        notes_path: Text file whose content is forwarded to the wrapper.

    Returns:
        List[Tool]: Toolkit tools whose ``name`` is in ``tool_names`` (possibly empty).
    """
    if not os.path.exists(notes_path):
        # open(..., "w") fails if the parent directory is missing, so create it first.
        parent = os.path.dirname(notes_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(notes_path, "w", encoding="utf-8"):
            pass
        print(f"✅ {os.path.basename(notes_path)} created.")

    with open(notes_path, "r", encoding="utf-8") as f:
        tasks_data = f.read()

    # Initialize CDP AgentKit wrapper and build its toolkit.
    cdp = CdpAgentkitWrapper(cdp_tasks_data=tasks_data)
    cdp_toolkit = CdpToolkit.from_cdp_agentkit_wrapper(cdp)

    wanted = set(tool_names)
    return [t for t in cdp_toolkit.get_tools() if t.name in wanted]

@tool
def extract_json(messages: List[Any]) -> List[Dict[str, Any]]:
    """
    Extracts task details (id, task_name, content, date) from agent messages.

    Each message may be a JSON string, optionally wrapped in a Markdown code
    fence, or a message object whose ``content`` is such a string. The JSON
    may be a list of task objects or a single task object.

    Args:
        messages (List[Any]): Messages received from the data agent.

    Returns:
        List[Dict[str, Any]]: Task objects with id, task_name, content and date.
        Messages that are not valid JSON are skipped, as are entries missing
        any of those fields.
    """
    fields = ("id", "task_name", "content", "date")
    task_details = []

    for message in messages:
        # Accept plain strings as well as message objects exposing `.content`.
        text = message if isinstance(message, str) else getattr(message, "content", None)
        if not isinstance(text, str):
            continue

        text = text.strip()
        # Remove a Markdown code fence: ```json ... ``` or plain ``` ... ```.
        if text.startswith("```"):
            text = text[3:]
            if text.startswith("json"):
                text = text[4:]
        if text.endswith("```"):
            text = text[:-3]

        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            continue  # Skip malformed messages

        # A single object is treated as a one-element list.
        entries = [parsed] if isinstance(parsed, dict) else parsed
        if not isinstance(entries, list):
            continue
        task_details.extend(
            {key: entry[key] for key in fields}
            for entry in entries
            if isinstance(entry, dict) and all(key in entry for key in fields)
        )

    return task_details

@tool
def add_task_to_obsidian(task_name: str, details: str) -> dict:
    """
    Add a task to the Obsidian vault.

    Appends a Markdown checklist line "- [ ] <task_name>: <details>" to
    tasks.md in the vault directory, creating the directory if needed.

    Args:
    task_name (str): Name of the task.
    details (str): Details of the task.

    Returns:
        A dict with success True, a confirmation message and the task_name on
        success, or success False and an error message if the file could not be written.
    """
    file_path = os.path.join(obsidian_vault_path, "tasks.md")
    task_entry = f"- [ ] {task_name}: {details}\n"
    try:
        # The default vault directory may not exist yet; appending would fail.
        if obsidian_vault_path:
            os.makedirs(obsidian_vault_path, exist_ok=True)
        with open(file_path, "a", encoding="utf-8") as file:
            file.write(task_entry)
    except OSError as e:
        # Report file-system failures to the calling agent instead of raising.
        return {"success": False, "error": f"Failed to add task to obsidian: {str(e)}"}

    print(f"Task '{task_name}' added to Obsidian!")
    return {
        "success": True,
        "message": f"Task '{task_name}' added to Obsidian!",
        "task_name": task_name,
    }


#-------------------------------#
#        Task Manager Agent     #
#-------------------------------#
# NOTE(review): this prebuilt agent is not added to the graph; the
# "task_manager_agent" node runs `task_manager_node` instead. It is also built
# on the module-level `llm`, a custom class that only defines `predict`.
# `create_react_agent` presumably needs a LangChain chat model that supports
# tool binding — confirm before relying on this agent.
task_manager_agent = create_react_agent(
    llm,
    # Use the locally defined @tool directly. Going through get_cdp_agent_tools
    # requires CDP credentials (a validation error is raised without
    # CDP_API_KEY_NAME), and that toolkit presumably has no tool by this name.
    tools=[add_task_to_obsidian],
    response_format=AgentOutput,
    prompt=""" You are the Task Manager Agent. Your job is to add or update the user's tasks.

        You have the following tool:
        - Use the 'add_task_to_obsidian' tool with a task_name and details to record a new or updated task.

        Return your answer as a JSON object with two keys: 'message': answer to the query and 'data': dictionary of data produced.
        """
)

def task_manager_node(state: MessagesState, config=None) -> Command[Literal["supervisor"]]:
    """
    Placeholder "edit task" step that records the latest message as an updated task.

    For demonstration, the content of the last message (suffixed with
    " (edited)") is appended to the Obsidian task file via the
    add_task_to_obsidian tool, and a status message is added to the
    conversation. A real implementation would parse and validate the task
    details first.

    Args:
        state: Graph state; only state["messages"] is read.
        config: Unused; accepted for LangGraph's node signature.

    Returns:
        Command routing back to "supervisor" with the status message.
    """
    messages = state["messages"]
    if messages:
        edited_task = f"{messages[-1].content} (edited)"
    else:
        edited_task = "No task provided."

    # Functions decorated with @tool become tool objects: they are run with
    # `.invoke` and a dict of named arguments, not called positionally.
    result = add_task_to_obsidian.invoke({"task_name": "Updated Task", "details": edited_task})
    if isinstance(result, dict) and result.get("success"):
        message = f"Task updated: {edited_task}"
    else:
        message = f"Task update failed: {result}"

    # Only the new message is returned; the messages reducer appends it.
    return Command(
        update={"messages": [AIMessage(content=message, name="task_manager_agent")]},
        goto="supervisor",
    )

# -----------------------------
# Code Execution (if needed)
# -----------------------------
class CodeExecutor:
    """Runs Python source in a separate interpreter process."""

    @staticmethod
    def execute_code(code, timeout=None):
        """Execute ``code`` in a child Python process and return its output.

        WARNING: this is NOT a sandbox. The code runs with the same permissions
        as the current process, so never pass untrusted (e.g. LLM-generated)
        code here without additional isolation.

        Args:
            code: Python source passed to ``python -c``.
            timeout: Seconds before the child is killed; ``None`` waits indefinitely.

        Returns:
            The child's stdout if it printed anything, otherwise its stderr.
            If launching or waiting fails (including a timeout), the exception text.
        """
        import subprocess
        import sys

        try:
            # sys.executable is the interpreter running this code; a bare
            # "python3" may be missing or point at a different environment.
            output = subprocess.run(
                [sys.executable, "-c", code],
                capture_output=True,
                text=True,
                timeout=timeout,
            )
        except Exception as e:
            return str(e)
        return output.stdout if output.stdout else output.stderr

# -----------------------------
# Graph wiring
# -----------------------------
builder = StateGraph(State)

# Nodes. NOTE(review): "data_agent" is a valid supervisor destination (it is in
# `members`), but no node is registered for it (a data node and a fetch-data
# node were left commented out) — routing there will presumably fail; confirm.
builder.add_node("supervisor", supervisor_node)
builder.add_node("task_manager_agent", task_manager_node)
builder.add_node("human", human_node)

# Entry edge; every later transition comes from the nodes' Command returns.
builder.add_edge(START, "supervisor")

# In-memory checkpointer keeps per-thread state, which interrupts need to resume.
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Write the graph diagram to disk.
save_graph(graph)

# -----------------------------
# Chatbot Interface
# -----------------------------
def chatbot():
    """Run an interactive console loop that feeds user messages into the graph.

    The graph's checkpointer keeps the conversation under a fixed thread id,
    so each turn sends only the new message. If the previous turn paused at an
    ``interrupt`` (the ``human`` node), the next input is delivered as the
    resume value instead of starting a new run.

    Type ``exit`` or ``quit`` (or send EOF) to stop.
    """
    # Thread config required by the checkpointed graph.
    thread_config = {"configurable": {"thread_id": "1"}}
    # Create the helper agent once rather than on every turn.
    ai_agent = AIAgent(api_key=api_key)

    print("Chatbot is ready. Type your message (or 'exit' to quit).")
    while True:
        try:
            user_input = input("User: ")
        except EOFError:
            break
        if user_input.lower() in ["exit", "quit"]:
            break

        # Optionally, process the command with a simple AI agent
        ai_response = ai_agent.process_command(user_input)
        print("AI Response:", ai_response)

        # Execute generated code if necessary.
        generated_code = "print('Hello from AI!')"
        execution_result = CodeExecutor.execute_code(generated_code)
        print("Execution Result:", execution_result)

        # A non-empty `next` means the last run on this thread is paused (e.g.
        # at an interrupt): resume it with the reply instead of starting over.
        if graph.get_state(thread_config).next:
            graph_input = Command(resume=user_input)
        else:
            graph_input = {"messages": [HumanMessage(content=user_input, name="user")]}

        # Invoke the LangGraph workflow.
        for update in graph.stream(
            graph_input,
            config=thread_config,
            stream_mode="updates",
            subgraphs=True,
        ):
            print(update)
            print("----\n")

# -----------------------------
# Example Usage
# -----------------------------
if __name__ == "__main__":
    chatbot()


Collecting langchain-openai
  Using cached langchain_openai-0.3.8-py3-none-any.whl.metadata (2.3 kB)
Collecting langgraph
  Using cached langgraph-0.3.11-py3-none-any.whl.metadata (7.5 kB)
Collecting langgraph-prebuilt<0.2,>=0.1.1 (from langgraph)
  Using cached langgraph_prebuilt-0.1.3-py3-none-any.whl.metadata (5.0 kB)
✅ notes_data.txt created.


ValidationError: 1 validation error for CdpAgentkitWrapper
  Value error, Did not find cdp_api_key_name, please add an environment variable `CDP_API_KEY_NAME` which contains it, or pass `cdp_api_key_name` as a named parameter. [type=value_error, input_value={'cdp_tasks_data': ''}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.10/v/value_error

In [None]:
import numpy as np
from math import exp

def sigmoid(x):
  """Logistic function 1 / (1 + e^-x), evaluated without overflow.

  math.exp(-x) raises OverflowError once -x exceeds about 709.78, i.e. for
  large negative x. For negative inputs, the algebraically equal form
  e^x / (1 + e^x) is used instead.
  """
  if x >= 0:
    return 1 / (1 + exp(-x))
  z = exp(x)
  return z / (1 + z)

class Neuron:
  """A single neuron computing sigmoid(weights · inputs + bias)."""

  def __init__(self, weights, bias):
    self.weights = weights
    self.bias = bias

  def feed_forward(self, inputs):
    """Return the sigmoid of the dot product of weights and inputs, plus bias."""
    total = np.dot(self.weights, inputs) + self.bias
    return sigmoid(total)

  # Backward-compatible alias for the original, misspelled method name.
  feed_foward = feed_forward

# Demo: weights · x + bias = 0*2 + 1*3 + 4 = 7, so this prints sigmoid(7).
x = np.array([2, 3])
weights = np.array([0, 1])
bias = 4
n = Neuron(weights, bias)
print(n.feed_foward(x))

0.9990889488055994
