In [1]:
from transformers import AutoTokenizer, AutoModelForCausalLM
from datetime import datetime
from tools import TOOL_REGISTRY, TOOLS_SPEC
from typing import Any, Callable, Dict, List, Optional

from utils import execute_tool_call, extract_tool_calls, _decode_new_tokens
import torch
import json
import requests
import uuid
from uuid import uuid4
import re
import json

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
model_id = "Qwen/Qwen3-8B" #"meta-llama/Llama-3.1-8B-Instruct" #"deepseek-ai/DeepSeek-R1-Distill-Qwen-7B" #"Qwen/Qwen3-8B"

tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype=torch.bfloat16,
    device_map="cuda:0",
    trust_remote_code=True
)

`torch_dtype` is deprecated! Use `dtype` instead!
Loading checkpoint shards: 100%|██████████| 5/5 [00:02<00:00,  2.10it/s]


In [3]:
# Initialize this agent with a unique identifier and name
AGENT_NAME = "MainAgent"  # Change this to customize your agent name
AGENT_UUID = str(uuid4())
AGENT_ID = f"{AGENT_NAME}_{AGENT_UUID[:8]}"  # e.g., "MainAgent_a1b2c3d4"

print(f"Agent initialized: {AGENT_ID}")
print(f"Full UUID: {AGENT_UUID}")
print(f"Agent is ready to receive commands from 'agent_commands' channel")
print(f"Agent will post status/results to 'agent_sync' channel")

Agent initialized: MainAgent_1a1a9c5f
Full UUID: 1a1a9c5f-92b7-41f1-89d7-512536996fbe
Agent is ready to receive commands from 'agent_commands' channel
Agent will post status/results to 'agent_sync' channel


In [4]:
# SYSTEM_PROMPT = """You are a helpful assistant.
# You may call tools to compute intermediate results. For math questions, place all your answers inside \\boxed{}.
# When tasked with generating code, make sure your answers are in a codeblock ```language_name code here```.

# When you want to call a tool, output ONLY a tool call in this exact format:
# <tool_call>{"name": "...", "arguments": {...}}</tool_call>

# Once the final answer has been stated to the user:
# - Do not perform further reasoning
# - Do not call computation tools again
# - Immediately call the stop_loop tool in the same <tool_call> format
# """

SYSTEM_PROMPT = f"""You are a helpful AI agent with access to tools.
CURRENT_DATE: {datetime.today().strftime('%Y-%m-%d %H:%M:%S')}
AGENT_ID: {AGENT_ID}

TOOL USAGE:
- When you need to perform calculations, execute code, or access external data, use tools
- Output tool calls in this exact format: <tool_call>{{"name": "...", "arguments": {{...}}}}</tool_call>
- You can make multiple tool calls in one response
- After receiving tool results, incorporate them naturally into your answer

EXTERNAL COMMAND SYSTEM:
- At the start of each iteration, check "agent_commands" channel for external directives
- Commands should include a "priority" field: "high" (execute immediately), "normal" (queue), or "low" (skip if busy)
- When you receive a command, acknowledge it and execute it
- If command execution fails, use notify_external_system to report the error
- Always respond to "high" priority commands; you may defer "low" priority ones

MULTI-AGENT COORDINATION:
- Use read_ntfy_messages to check:
  * "agent_commands" - external user commands
  * "agent_{AGENT_NAME}_tasks" - tasks delegated to you by other agents
  * "agent_sync" - status updates from other agents
- Use post_ntfy_message to:
  * Post results to "agent_sync" after completing tasks
  * Post intermediate findings if a task is complex
- Use notify_external_system when you complete a task, encounter errors, or change status
- Always include your AGENT_ID when posting messages

EXECUTE_CODE AS A UNIVERSAL PROBLEM-SOLVER:
- The execute_code tool is your most flexible and powerful capability
- If existing tools are insufficient, limited, or don't quite fit the task, write a custom program with execute_code
- Use execute_code to:
  * Create specialized tools or utilities on-the-fly for unique tasks
  * Process, transform, or analyze data in ways other tools cannot
  * Implement algorithms, simulations, or complex logic
  * Interact with APIs, parse formats, or handle edge cases
  * Make requests to websites that have information you may need
  * Extend your capabilities dynamically when facing novel problems
- Think of execute_code as your workshop: when you need a tool that doesn't exist, build it
- Before claiming something cannot be done, consider if you can write code to accomplish it

IMPORTANT:
- When you have provided the final answer to the user, call notify_user and then stop_loop immediately
- Do not call stop_loop until the user's question is fully answered
- For code-related questions, use execute_code to run and verify your solutions
- Always cite your sources if you use external information. Provide the link to them
- Always notify the user with notify_user when you finish your task
- If you receive an error from a tool, report it via notify_external_system before retrying

Be concise, creative, and resourceful in solving problems.
"""

# -----------------------------
# Main agent loop
# -----------------------------
def answer_me(question: str, max_iterations: int = 8, temperature: float = 0.0, max_new_tokens=512, agent_id: str = None) -> str:
    """
    Runs a tool-using loop with external command support:
    - Checks for commands from external sources via ntfy channels
    - model produces response (may include <tool_call> blocks)
    - we execute tool calls and append tool results
    - we repeat until model produces no tool calls OR stop_loop returns True

    Args:
        question: The initial task/question for the agent
        max_iterations: Maximum number of reasoning iterations
        temperature: Model temperature (0 = deterministic, higher = more creative)
        max_new_tokens: Max tokens to generate per iteration
        agent_id: Optional agent identifier (defaults to AGENT_ID global variable)

    Returns:
        The final natural-language answer (last model response without tool calls).
    """
    if agent_id is None:
        agent_id = globals().get('AGENT_ID', 'unknown_agent')
    
    messages: List[Dict[str, Any]] = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": question},
    ]

    last_model_text: Optional[str] = None

    for it in range(max_iterations):
        print(f"\n--- Iteration {it + 1} ---")
        
        # Check for external commands at the start of each iteration
        print(f"[{agent_id}] Checking for external commands...")
        try:
            from tools import read_ntfy_messages, NTFY_COMMANDS_CHANNEL
            commands_str = read_ntfy_messages(NTFY_COMMANDS_CHANNEL, limit=3)
            if "No messages" not in commands_str:
                print(f"Found new commands:\n{commands_str}")
                messages.append({
                    "role": "user", 
                    "content": f"[COMMAND UPDATE] New external commands available:\n{commands_str}\nPlease check these and respond if needed."
                })
        except Exception as e:
            print(f"[{agent_id}] Command check failed (skipping): {e}")

        inputs = tokenizer.apply_chat_template(
            messages,
            tools=TOOLS_SPEC,
            add_generation_prompt=True,
            return_tensors="pt",
            tokenize=True,
        ).to(model.device)

        outputs = model.generate(
            inputs,
            max_new_tokens=max_new_tokens,
            temperature=temperature,
        )

        model_text = _decode_new_tokens(tokenizer, inputs, outputs)
        last_model_text = model_text
        print(f"Model response:\n{model_text}")

        tool_calls = extract_tool_calls(model_text)
        if not tool_calls:
            # No tool calls => treat as final answer
            print("No tool calls found. Conversation complete.")
            return model_text

        # Build assistant tool_calls payload in the OpenAI-style structure
        assistant_tool_calls: List[Dict[str, Any]] = []
        for idx, call in enumerate(tool_calls):
            name = call.get("name")
            args = call.get("arguments", {}) or {}
            call_id = f"call_{it}_{idx}"

            assistant_tool_calls.append(
                {
                    "id": call_id,
                    "type": "function",
                    "function": {"name": name, "arguments": json.dumps(args)},
                }
            )

        # Append assistant message (include raw text so the model "sees" what it said)
        messages.append(
            {"role": "assistant", "content": model_text, "tool_calls": assistant_tool_calls}
        )

        # Execute each tool call and append tool results
        for idx, call in enumerate(tool_calls):
            tool_name = call.get("name")
            tool_args = call.get("arguments", {}) or {}
            call_id = f"call_{it}_{idx}"

            print(f"\nExecuting tool: {tool_name} with args: {tool_args}")
            tool_result = execute_tool_call(tool_name, tool_args)
            print(f"Tool result: {tool_result}")

            messages.append(
                {
                    "role": "tool",
                    "tool_call_id": call_id,
                    "name": tool_name,
                    "content": str(tool_result),
                }
            )

            # If stop_loop was called successfully, end immediately (no extra generation pass)
            if tool_name == "stop_loop" and str(tool_result).strip().lower() in {"true", "1"}:
                print("\n*** stop_loop triggered — stopping ***")
                return last_model_text or ""

    # If we hit max iterations, return best effort
    print("\n[warn] Max iterations reached. Returning last model text.")
    return last_model_text or ""

In [None]:
DEMO_QUESTIONS =["""Implement a Caesar cipher with a shift of 3. Encrypt 'HELLO WORLD', then decrypt it to verify. Then write a brute-force decryptor that tries all 26 possible shifts on 'KHOOR ZRUOG'.""",

"""Write code to simulate 1000 games of rock-paper-scissors between two players  making random choices. What's the expected win rate for each player? Does it match the theoretical 33.3%?""",

"Search for the market price trends of the card 'Blue-Eyes White Dragon' over the last year. Has it increased or decreased? Calculate the percentage change.",

"""Search for Bitcoin prices from the last 7 days. Extract the daily closing prices, then write code to calculate the 7-day moving average and standard deviation. Is the price trending up or down?""",

"Find the current inflation rates in the US and UK. Which is higher? Calculate the difference and explain what this means for consumers in both countries.",

"""Find the current GDP of Nigeria and South Africa. Calculate which economy is  larger and by what percentage. Then search for their population sizes and calculate GDP per capita for both countries. Which has higher GDP per capita?"""]

In [6]:
answer = answer_me(question=DEMO_QUESTIONS[0], max_iterations=5, temperature=0.6, max_new_tokens=32768, agent_id=AGENT_ID)


--- Iteration 1 ---
[MainAgent_1a1a9c5f] Checking for external commands...


The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


Found new commands:
Messages from 'agent_commands' (latest 1):

1. [Time: 1769957513]
   Message: {"command":"research","query":"topic","priority":"high"}
   Parsed JSON: {
  "command": "research",
  "query": "topic",
  "priority": "high"
}


Model response:
<think>
Okay, let's see. The user wants me to implement a Caesar cipher with a shift of 3. First, I need to encrypt 'HELLO WORLD' and then decrypt it to verify. After that, create a brute-force decryptor that tries all 26 possible shifts on 'KHOOR ZRUOG'. 

Wait, the user also mentioned there's an external command to research a topic. But the main task is the Caesar cipher. Maybe the research command is unrelated. Let me focus on the main problem first.

Starting with the Caesar cipher. The encryption part: shifting each letter by 3 positions forward. For example, H becomes K, E becomes H, etc. But I need to handle wrapping around the alphabet. So after Z, it goes back to A. Similarly, decryption would shift backward by 3.

Let me 

In [13]:
answer = answer_me(question=DEMO_QUESTIONS[-1], max_iterations=5, temperature=0.6, max_new_tokens=32768, agent_id=AGENT_ID)
# print(answer)


--- Iteration 1 ---
[MainAgent_99b5f78d] Checking for external commands...
Found new commands:
Connection error reading from ntfy channel 'agent_commands'.
Model response:
<think>
Okay, let's tackle the user's query step by step. First, they want the current GDP of Nigeria and South Africa. I need to find the latest GDP figures for both countries. Since the user mentioned "current," I should look for the most recent data, probably from 2023 or 2024. I'll use the get_search_query function to search for each country's GDP.

Next, after getting the GDP numbers, I have to calculate which economy is larger and by what percentage. That means I'll subtract the smaller GDP from the larger one, divide by the smaller GDP, and multiply by 100 to get the percentage. But wait, I need to make sure which country has the higher GDP first.

Then, the user wants the population sizes for both countries. Again, I'll use the search function to find the latest population data. Once I have both GDP and popu