# AGiXT Python SDK Tests

## Register a user

In [None]:
import random
import string
import time
import openai
from agixtsdk import AGiXTSDK
import requests
import os
import re


def display_content(content):
    """Print *content*, rendering AGiXT output media inline when IPython is available.

    Links under ``http://localhost:8091/outputs/`` are rewritten to the
    ``http://localhost:7437/outputs/`` base. Every link under that base that
    ends in ``.jpg``/``.png``, ``.mp4`` or ``.wav`` is shown with the matching
    IPython widget and removed from the text; whatever text remains is printed.
    If IPython cannot be imported, *content* is printed unchanged.

    Args:
        content: Response text that may contain links to generated files.
    """
    outputs_url = "http://localhost:7437/outputs/"
    legacy_outputs_url = "http://localhost:8091/outputs/"
    os.makedirs("outputs", exist_ok=True)
    try:
        from IPython.display import Audio, display, Image, Video
    except ImportError:
        # Not running under IPython/Jupyter: plain text is the best we can do.
        print(content)
        return
    content = content.replace(legacy_outputs_url, outputs_url)
    # Stop each match at whitespace, a quote, or a closing parenthesis so that
    # text following a link (or the ")" of a markdown image) is never glued
    # onto the file name. An empty result simply skips the loop.
    url_pattern = re.escape(outputs_url) + r"[^\"'\s)]+"
    for match in re.findall(url_pattern, content):
        file_name = match.split("/")[-1]
        url = f"{outputs_url}{file_name}"
        # The widgets reference the media by URL, so nothing is downloaded here.
        if url.endswith((".jpg", ".png")):
            widget = Image(url=url)
        elif url.endswith(".mp4"):
            widget = Video(url=url, autoplay=True)
        elif url.endswith(".wav"):
            widget = Audio(url=url, autoplay=True)
        else:
            continue
        content = content.replace(url, "")
        display(widget)
    print(content)


# Register a throwaway user under a random 10-character address.
failures = 0
random_string = "".join(
    random.choices(string.ascii_uppercase + string.digits, k=10)
).lower()
test_email = f"{random_string}@test.com"
agixt = AGiXTSDK(base_uri="http://localhost:7437", verbose=True)

# Retry registration up to 100 times, pausing 5 seconds after each failure
# (presumably to give a freshly started server time to come up).
while failures < 100:
    try:
        otp_uri = agixt.register_user(
            email=test_email, first_name="Test", last_name="User"
        )
        # Point the OpenAI client at the same server, reusing the SDK's
        # Authorization header value as the API key.
        openai.base_url = "http://localhost:7437/v1/"
        openai.api_key = agixt.headers["Authorization"]
        openai.api_type = "openai"
        break
    except Exception as e:
        print(e)
        failures += 1
        time.sleep(5)
else:
    # Without this, exhausting the retries falls through silently and the
    # notebook only fails later with an unrelated NameError on `otp_uri`.
    raise RuntimeError(
        f"Could not register {test_email} after {failures} attempts"
    )

# Show QR code for MFA setup
import qrcode
from IPython.display import Image

# Encode the OTP URI returned by registration as a QR code.
qr = qrcode.QRCode()
qr.add_data(otp_uri)
qr.make(fit=True)
# python-qrcode reads the foreground colour from `fill_color`; the previous
# `fill=` keyword is not one it documents, so the colour was never applied.
img = qr.make_image(fill_color="black", back_color="white")
img.save("qr.png")
# Last expression of the cell, so the notebook renders the saved image.
Image(filename="qr.png")

## Confirm user exists


In [None]:
# Check that the address registered in the first cell is known to the server.
user_exists = agixt.user_exists(email=test_email)

## Update User's Name

In [None]:
# Rename the user to "Super Man"; the SDK's reply is kept in `update_user`.
update_user = agixt.update_user(first_name="Super", last_name="Man")

## Get User Details

In [None]:
# Fetch the user's details; a later webhook cell reads `companies` from this shape.
user_details = agixt.get_user()


## Get a list of Providers

This will get a list of AI Providers available to use with AGiXT.


In [None]:
# Retrieve the provider list from the server.
providers = agixt.get_providers()

## Get a list of Providers for a service

- Service options are `llm`, `tts`, `image`, `embeddings`, `transcription`, and `translation`.


In [None]:
# Ask only for the providers of the "tts" service.
services = agixt.get_providers_by_service(service="tts")

## Get Default Provider Settings

Choose a provider from the list of AI providers and get the default settings for that provider.


In [None]:
# Look up the settings reported for the "ezlocalai" provider.
provider_name = "ezlocalai"
provider_settings = agixt.get_provider_settings(provider_name=provider_name)

## Get Embedding Providers

Embedding providers are used to embed information to vectors to store in the vector database to be searched for context injection.


In [None]:
# Retrieve the embedding providers.
embed_providers = agixt.get_embed_providers()

## Get Extension Settings

This is where we get all third party extension settings for the agent with defaults to fill in when there is nothing entered on the front end.


In [None]:
# Retrieve the extension settings; the reply is kept in `ext_settings_resp`.
ext_settings_resp = agixt.get_extension_settings()

## Webhook Tests

This section tests the webhook system endpoints including incoming and outgoing webhooks using the AGiXT SDK.

In [None]:
# Test creating an outgoing webhook using requests
from datetime import datetime
import json

# Payload for the new webhook; the description is stamped with the current
# local time so each run is distinguishable.
outgoing_webhook_data = {
    "name": "Test Webhook",
    "description": f"Test webhook created at {datetime.now().isoformat()}",
    "target_url": "https://webhook.site/test",
    "event_types": ["agent.created", "agent.deleted"],
    "active": True,
    "headers": {"Content-Type": "application/json"},
    "secret": "test-secret-123",
}

# Using requests to create webhook
response = requests.post(
    "http://localhost:7437/api/webhooks/outgoing",
    json=outgoing_webhook_data,
    headers=agixt.headers,
)
# Decode the body as JSON only on success: an error response is not
# guaranteed to be JSON, and decoding it unconditionally would replace the
# real status/text with a parse error. Later cells read `created_webhook`.
created_webhook = response.json() if response.status_code == 200 else None
print(
    f"Created outgoing webhook: {response.status_code} - "
    f"{created_webhook if response.status_code == 200 else response.text}"
)

In [None]:
# List the outgoing webhooks with a plain HTTP GET and print a one-line
# summary (name and target URL) for each entry.
response = requests.get(
    "http://localhost:7437/api/webhooks/outgoing",
    headers=agixt.headers,
)
print(f"Get webhooks response: {response.status_code}")
if response.status_code != 200:
    # Report the failure and leave an empty list behind.
    print(f"Error getting webhooks: {response.text}")
    webhooks = []
else:
    webhooks = response.json()
    print(f"Found {len(webhooks)} outgoing webhooks")
    for webhook in webhooks:
        shown_name = webhook.get("name", "Unnamed")
        shown_url = webhook.get("target_url", "No URL")
        print(f"  - {shown_name}: {shown_url}")

In [None]:
# Test creating an incoming webhook for an agent using requests
# NOTE(review): nothing earlier in this notebook creates an agent named
# "new_agent" (the name first appears via the rename cell further down) —
# confirm the server is expected to accept it at this point.
agent_name = "new_agent"
# The transform template is sent as a JSON string whose values are
# "{{ ... }}" placeholders.
incoming_webhook_data = {
    "name": "GitHub Webhook",
    "description": "Webhook for GitHub events",
    "agent_name": agent_name,
    "secret": "github-secret-123",
    "transform_template": json.dumps(
        {
            "action": "{{ action }}",
            "repository": "{{ repository.name }}",
            "sender": "{{ sender.login }}",
        }
    ),
}

response = requests.post(
    "http://localhost:7437/api/webhooks/incoming",
    json=incoming_webhook_data,
    headers=agixt.headers,
)
# Show the decoded JSON on success and the raw body otherwise.
print(
    f"Created incoming webhook: {response.status_code} - {response.json() if response.status_code == 200 else response.text}"
)
# Kept for the validation cell below; None when creation failed.
incoming_webhook = response.json() if response.status_code == 200 else None

In [None]:
# Test incoming webhook with agent_id parameter (alternative format)
print("--- Testing incoming webhook with agent_id parameter ---")

# Get the user details which contains the agent_id
user_details = agixt.get_user()
# Take the first agent of the first company. Any missing or empty level
# (no details, no companies, no agents, no "id") leads to the skip branch
# below instead of crashing on the lookup.
try:
    agent_id = user_details["companies"][0]["agents"][0]["id"]
except (KeyError, IndexError, TypeError):
    agent_id = None

if agent_id:
    print(f"Found user agent_id: {agent_id}")

    incoming_webhook_data_with_id = {
        "name": "Agent ID Test Webhook",
        "description": "Testing webhook creation with agent_id parameter",
        "agent_id": agent_id,  # Using agent_id instead of agent_name
        "secret": "agent-id-secret-456",
        "transform_template": json.dumps(
            {"event": "{{ event_type }}", "data": "{{ data }}"}
        ),
    }

    response = requests.post(
        "http://localhost:7437/api/webhooks/incoming",
        json=incoming_webhook_data_with_id,
        headers=agixt.headers,
    )

    print(f"Incoming webhook with agent_id: {response.status_code}")
    if response.status_code == 200:
        print("✅ Successfully created incoming webhook using agent_id parameter")
        agent_id_webhook = response.json()
        print(f"Webhook ID: {agent_id_webhook.get('id', 'Unknown')}")
    else:
        print(f"❌ Failed to create webhook with agent_id: {response.text}")
else:
    print("❌ Could not get agent_id from user details, skipping agent_id test")

print("--- Agent ID parameter test completed ---\n")

In [None]:
# Test webhook event emission
# This would trigger any configured webhooks for agent events
# (the outgoing webhook created above subscribes to "agent.created").
test_agent = "webhook_test_agent"

# Create an agent to trigger webhook events; the cleanup cell below deletes
# it again by this name.
agixt.add_agent(
    agent_name=test_agent,
    settings={
        "mode": "prompt",
        "prompt_category": "Default",
        "prompt_name": "Think About It",
        "persona": "",
    },
)
print(f"Created agent {test_agent}, webhook events should have been triggered")

In [None]:
# Query the webhook statistics and log endpoints with plain HTTP calls.
# Statistics first: pretty-print the JSON body, or report the error text.
stats_response = requests.get(
    "http://localhost:7437/api/webhooks/stats",
    headers=agixt.headers,
)
if stats_response.status_code != 200:
    print(f"Error getting webhook stats: {stats_response.text}")
else:
    webhook_stats = stats_response.json()
    stats_text = json.dumps(webhook_stats, indent=2)
    print(f"Webhook statistics: {stats_text}")

# Then up to ten log entries: print the count and the first entry, if any.
logs_response = requests.get(
    "http://localhost:7437/api/webhooks/logs?limit=10",
    headers=agixt.headers,
)
if logs_response.status_code != 200:
    print(f"Error getting webhook logs: {logs_response.text}")
else:
    webhook_logs = logs_response.json()
    print(f"Found {len(webhook_logs)} webhook log entries")
    if webhook_logs:
        first_entry = webhook_logs[0]
        print(f"Latest log: {first_entry}")

In [None]:
# Additional webhook validation tests
print("=== Running additional webhook validation tests ===")

# Test 1: Verify webhook creation response structure
# `created_webhook` comes from the outgoing-webhook creation cell and is None
# when that request failed.
if created_webhook and isinstance(created_webhook, dict):
    required_fields = ["id", "name", "target_url", "event_types", "active"]
    missing_fields = [
        field for field in required_fields if field not in created_webhook
    ]
    if missing_fields:
        print(f"❌ Created webhook missing required fields: {missing_fields}")
    else:
        print("✅ Created webhook has all required fields")

    # Verify field types. Only judge the ID's type when an ID is present: a
    # missing ID is already reported above and must not print a success line.
    if "id" in created_webhook:
        if isinstance(created_webhook["id"], str):
            print("✅ Webhook ID is properly formatted as string")
        else:
            print(f"❌ Webhook ID should be string, got {type(created_webhook['id'])}")
else:
    print("❌ Webhook creation failed - cannot validate response structure")

# Test 2: Test webhook with different event types
print("\n--- Testing webhook with different event types ---")
event_test_data = {
    "name": "Event Test Webhook",
    "description": "Testing different event types",
    "target_url": "https://httpbin.org/post",
    "event_types": ["conversation.started", "conversation.ended", "memory.added"],
    "active": True,
}

event_response = requests.post(
    "http://localhost:7437/api/webhooks/outgoing",
    json=event_test_data,
    headers=agixt.headers,
)
print(f"Event types test webhook: {event_response.status_code}")
if event_response.status_code == 200:
    print("✅ Webhook with different event types created successfully")
    event_webhook = event_response.json()
    print(f"Created webhook with events: {event_webhook.get('event_types', [])}")
else:
    print(f"❌ Failed to create event webhook: {event_response.text}")

# Test 3: Incoming webhook validation
# `incoming_webhook` comes from the incoming-webhook creation cell.
if incoming_webhook and isinstance(incoming_webhook, dict):
    incoming_required = ["id", "name", "webhook_id", "agent_id"]
    missing_incoming = [
        field for field in incoming_required if field not in incoming_webhook
    ]
    if missing_incoming:
        print(f"❌ Incoming webhook missing fields: {missing_incoming}")
    else:
        print("✅ Incoming webhook has all required fields")
else:
    print("❌ Incoming webhook creation failed - cannot validate")

print("=== Webhook validation tests completed ===\n")

In [None]:
# Check that the stats and logs endpoints answer with the expected shape.
print("--- Validating webhook stats and logs response structure ---")

# Stats: the body must contain the four counters listed below.
stats_response = requests.get(
    "http://localhost:7437/api/webhooks/stats",
    headers=agixt.headers,
)

if stats_response.status_code != 200:
    print(
        f"❌ Webhook stats endpoint failed: {stats_response.status_code} - {stats_response.text}"
    )
else:
    stats_data = stats_response.json()
    print("✅ Webhook stats endpoint accessible")

    expected_stats_fields = [
        "total_outgoing",
        "total_incoming",
        "active_outgoing",
        "active_incoming",
    ]
    missing_stats_fields = []
    for field in expected_stats_fields:
        if field not in stats_data:
            missing_stats_fields.append(field)

    if not missing_stats_fields:
        print("✅ Stats response has all expected fields")
        print(f"Stats summary: {stats_data}")
    else:
        print(f"⚠️ Stats response missing expected fields: {missing_stats_fields}")
        print(f"Available fields: {list(stats_data.keys())}")

# Logs: the body must be a list; when it has entries, the first one is
# checked for the expected keys.
logs_response = requests.get(
    "http://localhost:7437/api/webhooks/logs?limit=5",
    headers=agixt.headers,
)

if logs_response.status_code != 200:
    print(
        f"❌ Webhook logs endpoint failed: {logs_response.status_code} - {logs_response.text}"
    )
else:
    logs_data = logs_response.json()
    print("✅ Webhook logs endpoint accessible")

    if not isinstance(logs_data, list):
        print(f"❌ Logs response should be a list, got {type(logs_data)}")
    else:
        print(f"✅ Logs returned as list with {len(logs_data)} entries")

        if not logs_data:
            print("ℹ️ No webhook logs found (this is expected for new installations)")
        else:
            log_entry = logs_data[0]
            expected_log_fields = ["id", "webhook_id", "direction", "timestamp"]
            missing_log_fields = []
            for field in expected_log_fields:
                if field not in log_entry:
                    missing_log_fields.append(field)

            if not missing_log_fields:
                print("✅ Log entries have expected structure")
            else:
                print(f"⚠️ Log entry missing expected fields: {missing_log_fields}")
                print(f"Available fields: {list(log_entry.keys())}")

print("--- Endpoint structure validation completed ---\n")

In [None]:
# Cleanup: remove the webhooks named "Test Webhook" / "GitHub Webhook" and
# the agent created for the event test. Other webhooks are left untouched.

# Outgoing webhooks
response = requests.get(
    "http://localhost:7437/api/webhooks/outgoing",
    headers=agixt.headers,
)
if response.status_code == 200:
    webhooks = response.json()
    for webhook in webhooks:
        if webhook.get("name") != "Test Webhook":
            continue
        delete_response = requests.delete(
            f"http://localhost:7437/api/webhooks/outgoing/{webhook['id']}",
            headers=agixt.headers,
        )
        if delete_response.status_code != 200:
            print(f"Error deleting outgoing webhook: {delete_response.text}")
        else:
            print(f"Deleted outgoing webhook: {webhook['name']}")

# Incoming webhooks
response = requests.get(
    "http://localhost:7437/api/webhooks/incoming",
    headers=agixt.headers,
)
if response.status_code == 200:
    webhooks = response.json()
    for webhook in webhooks:
        if webhook.get("name") != "GitHub Webhook":
            continue
        delete_response = requests.delete(
            f"http://localhost:7437/api/webhooks/incoming/{webhook['id']}",
            headers=agixt.headers,
        )
        if delete_response.status_code != 200:
            print(f"Error deleting incoming webhook: {delete_response.text}")
        else:
            print(f"Deleted incoming webhook: {webhook['name']}")

# Finally drop the agent that was created to trigger webhook events
agixt.delete_agent(agent_name=test_agent)
print(f"Deleted test agent: {test_agent}")

In [None]:
# Test updating an outgoing webhook using requests
# The cleanup cell above deletes every outgoing webhook named "Test Webhook",
# which includes the one stored in `created_webhook`. Its ID is therefore
# checked against the server's current list before it is used.
webhook_id = None
created_id = created_webhook.get("id") if isinstance(created_webhook, dict) else None

response = requests.get(
    "http://localhost:7437/api/webhooks/outgoing",
    headers=agixt.headers,
)
listing_ok = response.status_code == 200
webhooks = response.json() if listing_ok else []
existing_ids = [hook["id"] for hook in webhooks]

if created_id is not None and (not listing_ok or created_id in existing_ids):
    # Either the webhook is still listed, or the list could not be fetched
    # and the stored ID is the only candidate available.
    webhook_id = created_id
    print(f"Using recently created webhook ID: {webhook_id}")
elif existing_ids:
    if created_id is None:
        print("No webhook from creation, checking for existing webhooks...")
    else:
        print(
            f"Webhook {created_id} is no longer listed, checking for existing webhooks..."
        )
    webhook_id = existing_ids[0]
    print(f"Using existing webhook ID: {webhook_id}")

if webhook_id:
    # Fields to overwrite on the chosen webhook.
    update_data = {
        "name": "Updated Test Webhook",
        "description": "Updated webhook description",
        "event_types": ["agent.created", "agent.deleted", "chat.completed"],
        "active": True,
    }

    response = requests.put(
        f"http://localhost:7437/api/webhooks/outgoing/{webhook_id}",
        json=update_data,
        headers=agixt.headers,
    )

    if response.status_code == 200:
        updated_webhook = response.json()
        print(f"Successfully updated webhook: {updated_webhook.get('name', 'Unknown')}")
        print(f"Description: {updated_webhook.get('description', 'No description')}")
        print(f"Event types: {updated_webhook.get('event_types', [])}")
        print(f"Webhook ID: {updated_webhook.get('id', 'Unknown')}")
    else:
        print(f"Error updating webhook: {response.status_code} - {response.text}")
else:
    print("No webhook available to update, skipping update test")
    print("This could mean webhook creation is failing or no webhooks exist")

## Get Extension Commands


In [None]:
# Retrieve the extensions from the server.
ext = agixt.get_extensions()

## Get command arguments


In [None]:
# Look up the arguments of the "Write to File" command.
command_args = agixt.get_command_args(command_name="Write to File")

## Create a new Agent

Creates a new agent with the `ezlocalai` provider.


In [None]:
agent_name = "test_agent"
# No provider settings are passed: only the prompt mode, the prompt template
# and an empty persona, so provider defaults are left to the server.
add_agent_resp = agixt.add_agent(
    agent_name=agent_name,
    settings={
        "mode": "prompt",
        "prompt_category": "Default",
        "prompt_name": "Think About It",
        "persona": "",
    },
)

## Get Extensions Available to Agent

This function will get a list of extensions available to the agent as well as the required settings keys and available commands per extension. If the agent does not have the settings keys for the specific extension, the list of commands will be empty.

In [None]:
# `agent_name` is still "test_agent" from the cell above.
agent_extensions = agixt.get_agent_extensions(agent_name=agent_name)

## Execute a Command


In [None]:
agent_name = "test_agent"
# Run "Write to File" directly for the agent; the file name deliberately
# contains a space.
command_execution = agixt.execute_command(
    agent_name=agent_name,
    command_name="Write to File",
    command_args={"filename": "test files.txt", "text": "This is just a test!"},
    conversation_name="Command execution for testing",
)

## Get a list of all current Agents

Any agents that you have created will be listed here. The `status` field indicates whether the agent is currently running a task.


In [None]:
# Retrieve the current list of agents.
agents = agixt.get_agents()

## Rename the test agent

We will just rename it to `new_agent`.


In [None]:
# Rename "test_agent" to "new_agent"; every later cell uses the new name.
agent_name = "test_agent"
new_agent_name = "new_agent"
rename_agent_resp = agixt.rename_agent(agent_name=agent_name, new_name=new_agent_name)

## Get the agent's settings

This will get the settings for the agent we just created. It will tell you all commands available to the agent as well as all of the provider settings for the agent.


In [None]:
# Fetch the renamed agent's configuration.
agent_name = "new_agent"
agent_config = agixt.get_agentconfig(agent_name=agent_name)

## Update the agent's settings

We'll just update the temperature from the default `0.7` to `0.8` to confirm that we can modify a setting.


In [None]:
agent_name = "new_agent"
# Read the current config, change one setting, and send the whole settings
# mapping back.
agent_config = agixt.get_agentconfig(agent_name=agent_name)
agent_settings = agent_config["settings"]
# We'll just change the AI_TEMPERATURE setting for the test
agent_settings["AI_TEMPERATURE"] = 0.8
update_agent_settings_resp = agixt.update_agent_settings(
    agent_name=agent_name, settings=agent_settings
)
print("Update agent settings response:", update_agent_settings_resp)
# Re-read the config so `agent_config` holds what the server reports after the update.
agent_config = agixt.get_agentconfig(agent_name=agent_name)

## Get a list of the agent's commands

This will get a list of all commands available to the agent.


In [None]:
# Retrieve the commands reported for "new_agent".
agent_name = "new_agent"
commands = agixt.get_commands(agent_name=agent_name)

## Toggle a Command for the Agent

We'll toggle the `Write to File` command to `true` to confirm that we can toggle a command.


In [None]:
agent_name = "new_agent"
# Enable the "Write to File" command for this agent (enable=True).
toggle_command_resp = agixt.toggle_command(
    agent_name=agent_name, command_name="Write to File", enable=True
)

## Update Agent Commands

In this example, we'll only change the `Convert Markdown to PDF` command to `False`, but we could change any (or all) of the commands with this API call.

In [None]:
agent_name = "new_agent"
agent_config = agixt.get_agentconfig(agent_name=agent_name)
# Start from the agent's current command map, or an empty one when the config
# reports none. `is None` is the identity test intended here.
agent_commands = agent_config["commands"]
if agent_commands is None:
    agent_commands = {}
# Switch off a single command and send the whole map back.
agent_commands["Convert Markdown to PDF"] = False
update_agent_commands_resp = agixt.update_agent_commands(
    agent_name=agent_name, commands=agent_commands
)
# Re-read the config so `agent_config` holds the server's view after the update.
agent_config = agixt.get_agentconfig(agent_name=agent_name)

## Create a new conversation


In [None]:
# Create the "Talk for Tests" conversation for "new_agent"; the instruct and
# prompt cells further down reuse this conversation name.
conversation = agixt.new_conversation(
    agent_name="new_agent", conversation_name="Talk for Tests"
)

## Get Conversations


In [None]:
# List the conversations for "new_agent".
conversations = agixt.get_conversations(agent_name="new_agent")

## Manual Conversation Message

In [None]:
# Append one user message and one agent message by hand. Note the
# conversation name is "AGiXT Conversation", not the "Talk for Tests"
# conversation created above.
agixt.new_conversation_message(
    role="USER",
    conversation_name="AGiXT Conversation",
    message="This is a test message from the user!",
)
agixt.new_conversation_message(
    role="new_agent",
    conversation_name="AGiXT Conversation",
    message="This is a test message from the agent!",
)

## Get Conversation Details


In [None]:
# Fetch the first page (up to 100 entries) of "AGiXT Conversation"; the fork
# cell below indexes into this result.
agent_name = "new_agent"
conversation_name = "AGiXT Conversation"
conversation = agixt.get_conversation(
    agent_name=agent_name, conversation_name=conversation_name, limit=100, page=1
)

## Fork a Conversation

In [None]:
# Add an extra interaction to the conversation so that there is more than just one
agixt.new_conversation_message(
    role="USER",
    conversation_name="AGiXT Conversation",
    message="This is a test message from the user!",
)
agixt.new_conversation_message(
    role="new_agent",
    conversation_name="AGiXT Conversation",
    message="This is a test message from the agent!",
)
# Fork at the second message of `conversation`, which was fetched in the
# previous cell (before the two messages above were added). `conversation_name`
# and `agent_name` also carry over from that cell.
forked_conversation = agixt.fork_conversation(
    conversation_name=conversation_name, message_id=conversation[1]["id"]
)
# The fork call's return value is passed on as the new conversation's name.
fork = agixt.get_conversation(
    agent_name=agent_name,
    conversation_name=forked_conversation,
)

## Delete Message from Conversation


In [None]:
agent_name = "new_agent"
conversation_name = "AGiXT Conversation"
# Re-fetch the conversation and delete its first entry, identified by its
# message text.
conversation = agixt.get_conversation(
    agent_name=agent_name, conversation_name=conversation_name, limit=100, page=1
)
delete_msg = conversation[0]["message"]
print(f"Deleting message: {delete_msg}")
delete_msg_resp = agixt.delete_conversation_message(
    agent_name=agent_name, conversation_name=conversation_name, message=delete_msg
)

## Delete a Conversation


In [None]:
agent_name = "new_agent"
conversation_name = "AGiXT Conversation"
# Delete the whole conversation. Note that `conversation` is rebound to the
# delete call's reply, replacing the message list fetched earlier.
conversation = agixt.delete_conversation(
    agent_name=agent_name, conversation_name=conversation_name
)

## Have the Agent Learn from specified Text


In [None]:
agent_name = "new_agent"
# Store one sentence about AGiXT in collection "0", paired with the question
# it answers.
text_learning = agixt.learn_text(
    agent_name=agent_name,
    user_input="What is AGiXT?",
    text="AGiXT is an open-source artificial intelligence automation platform.",
    collection_number="0",
)

## Have the Agent Learn from Files


### Zip

In [None]:
import base64

# Read the fixture (path relative to the working directory) as bytes and
# base64-encode it into a UTF-8 string for the SDK call.
learn_file_path = "test.zip"
with open(learn_file_path, "rb") as f:
    learn_file_content = base64.b64encode(f.read()).decode("utf-8")

# `agent_name` is not set in this cell; it carries over from an earlier cell.
file_learning = agixt.learn_file(
    agent_name=agent_name,
    file_name=learn_file_path,
    file_content=learn_file_content,
    collection_number="0",
)

### CSV

In [None]:
import base64

# Read the fixture (path relative to the working directory) as bytes and
# base64-encode it into a UTF-8 string for the SDK call.
learn_file_path = "test.csv"
with open(learn_file_path, "rb") as f:
    learn_file_content = base64.b64encode(f.read()).decode("utf-8")

# `agent_name` is not set in this cell; it carries over from an earlier cell.
file_learning = agixt.learn_file(
    agent_name=agent_name,
    file_name=learn_file_path,
    file_content=learn_file_content,
    collection_number="0",
)

### XLS/XLSX

In [None]:
import base64

# Read the fixture (path relative to the working directory) as bytes and
# base64-encode it into a UTF-8 string for the SDK call.
learn_file_path = "test.xlsx"
with open(learn_file_path, "rb") as f:
    learn_file_content = base64.b64encode(f.read()).decode("utf-8")

# `agent_name` is not set in this cell; it carries over from an earlier cell.
file_learning = agixt.learn_file(
    agent_name=agent_name,
    file_name=learn_file_path,
    file_content=learn_file_content,
    collection_number="0",
)

### DOC/DOCX

In [None]:
import base64

# Read the fixture (path relative to the working directory) as bytes and
# base64-encode it into a UTF-8 string for the SDK call.
learn_file_path = "test.docx"
with open(learn_file_path, "rb") as f:
    learn_file_content = base64.b64encode(f.read()).decode("utf-8")

# `agent_name` is not set in this cell; it carries over from an earlier cell.
file_learning = agixt.learn_file(
    agent_name=agent_name,
    file_name=learn_file_path,
    file_content=learn_file_content,
    collection_number="0",
)

### PPT/PPTX

In [None]:
import requests
import base64

# Download a sample deck, keep a copy in the working directory, and send the
# same bytes base64-encoded to the agent.
ppt_url = "https://getsamplefiles.com/download/pptx/sample-1.pptx"
# Bound the wait so a stalled download cannot hang the notebook.
response = requests.get(ppt_url, timeout=60)
# Fail here on an HTTP error rather than saving and learning an error page as
# if it were the presentation.
response.raise_for_status()
learn_file_path = os.path.join(os.getcwd(), "sample-1.pptx")
with open(learn_file_path, "wb") as f:
    f.write(response.content)
learn_file_content = base64.b64encode(response.content).decode("utf-8")

# `agent_name` is not set in this cell; it carries over from an earlier cell.
# Note that `file_name` is the absolute path here, unlike the fixture cells.
file_learning = agixt.learn_file(
    agent_name=agent_name,
    file_name=learn_file_path,
    file_content=learn_file_content,
    collection_number="0",
)

### PDF

In [None]:
import requests
import base64

# Download a sample PDF, keep a copy in the working directory, and send the
# same bytes base64-encoded to the agent.
pdf_url = "https://getsamplefiles.com/download/pdf/sample-1.pdf"
# Bound the wait so a stalled download cannot hang the notebook.
response = requests.get(pdf_url, timeout=60)
# Fail here on an HTTP error rather than saving and learning an error page as
# if it were the PDF.
response.raise_for_status()
learn_file_path = os.path.join(os.getcwd(), "sample-1.pdf")
with open(learn_file_path, "wb") as f:
    f.write(response.content)
learn_file_content = base64.b64encode(response.content).decode("utf-8")

# `agent_name` is not set in this cell; it carries over from an earlier cell.
# Note that `file_name` is the absolute path here, unlike the fixture cells.
file_learning = agixt.learn_file(
    agent_name=agent_name,
    file_name=learn_file_path,
    file_content=learn_file_content,
    collection_number="0",
)

### TXT

In [None]:
import base64

# Read the fixture (path relative to the working directory) as bytes and
# base64-encode it into a UTF-8 string for the SDK call.
learn_file_path = "test.txt"
with open(learn_file_path, "rb") as f:
    learn_file_content = base64.b64encode(f.read()).decode("utf-8")

# `agent_name` is not set in this cell; it carries over from an earlier cell.
file_learning = agixt.learn_file(
    agent_name=agent_name,
    file_name=learn_file_path,
    file_content=learn_file_content,
    collection_number="0",
)

## Have the Agent Learn from a URL


In [None]:
agent_name = "new_agent"
# Have the agent learn from the AGiXT documentation site into collection "0".
url_learning = agixt.learn_url(
    agent_name=agent_name,
    url="https://josh-xt.github.io/AGiXT",
    collection_number="0",
)

## Get the Agent's Memories

Get some relevant memories from the agent about AGiXT.


In [None]:
agent_name = "new_agent"
# Query collection "0" for up to 10 memories with a minimum relevance score of 0.2.
memories = agixt.get_agent_memories(
    agent_name=agent_name,
    user_input="What can you tell me about AGiXT?",
    limit=10,
    min_relevance_score=0.2,
    collection_number="0",
)

## Chat with the Agent

Chat about the learned information with the agent.


In [None]:
agent_name = "new_agent"
# Ask the agent about AGiXT in the "Tell me about AGiXT" conversation,
# requesting 6 context results.
agent_chat = agixt.chat(
    agent_name=agent_name,
    user_input="What can you tell me about AGiXT?",
    conversation="Tell me about AGiXT",
    context_results=6,
)

## Delete a Memory

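Delete a specific memory by Memory ID. (The delete call in the cell below is currently commented out, so the cell only fetches and prints the first matching memory.)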


In [None]:
agent_name = "new_agent"
# Fetch at most one memory matching the question.
memories = agixt.get_agent_memories(
    agent_name=agent_name,
    user_input="What can you tell me about AGiXT?",
    limit=1,
    min_relevance_score=0.2,
    collection_number="0",
)
# Take the first memory. Nothing is removed: the delete call below is
# commented out, so this cell only prints the memory.
memory = memories[0]
# memory_id = memory["id"]
print("Memory:", memory)
# print("Memory ID:", memory_id)
# delete_memory_resp = agixt.delete_agent_memory(
#    agent_name=agent_name, memory_id=memory_id, collection_number="0"
# )
# print("Delete memory response:", delete_memory_resp)

## Wipe the agent's memories

This is necessary if you want the agent to serve a different purpose than its original intent after it has learned things. It may inject unnecessary context into the conversation if you don't wipe its memory and try to give it a different purpose, even temporarily.


In [None]:
# Note: Use this function with caution as it will erase the agent's memory.
# Only collection "0" is named in this call.
agent_name = "new_agent"
wipe_mem_resp = agixt.wipe_agent_memories(agent_name=agent_name, collection_number="0")

## Instruct the Agent to do something

We'll keep it simple for this basic example: we'll instruct the agent to save a file called `france.txt` containing the capital of France.


In [None]:
agent_name = "new_agent"
# Instruct the agent, inside the "Talk for Tests" conversation, to save a file
# named 'france.txt' containing the capital of France.
instruct_resp = agixt.instruct(
    agent_name=agent_name,
    user_input="Save a file with the capital of France in it called 'france.txt'.",
    conversation="Talk for Tests",
)

## Prompt the Agent

Use a custom Prompt Template to prompt the agent. For our example, we'll use the "Write a Haiku" prompt template to have the agent write haiku for us about dragons.


In [None]:
agent_name = "new_agent"
prompt_name = "Write a Haiku"
user_input = "Show me 2."
# The template used here is "Write a Haiku"; "subject" is presumably its
# template variable — TODO confirm against the prompt's arguments.
# We'll ask the AI to write about dragons, with web search and context
# injection switched off.
prompt_args = {
    "user_input": user_input,
    "subject": "dragons",
    "websearch": False,
    "websearch_depth": 0,
    "context_results": 0,
    "shots": 1,
    "conversation_name": "Talk for Tests",
}

agent_prompt_resp = agixt.prompt_agent(
    agent_name=agent_name,
    prompt_name=prompt_name,
    prompt_args=prompt_args,
)

## Get a list of Chains available to use


In [None]:
# Retrieve the list of chains.
chains = agixt.get_chains()

## Create a new chain


In [None]:
# Create a chain under its initial name; the next cell renames it.
chain_name = "Write another Poem"
add_chain_resp = agixt.add_chain(chain_name=chain_name)

## Rename the chain


In [None]:
# Rename the chain; every later chain cell uses "Poem Writing Chain".
chain_name = "Write another Poem"
new_chain_name = "Poem Writing Chain"
rename_chain_resp = agixt.rename_chain(chain_name=chain_name, new_name=new_chain_name)

## Add Chain Steps


In [None]:
agent_name = "new_agent"
chain_name = "Poem Writing Chain"
# Test add_step()
# Step 1: "Write a Poem" prompt about Artificial Intelligence.
add_step_resp = agixt.add_step(
    chain_name=chain_name,
    step_number=1,
    agent_name=agent_name,
    prompt_type="Prompt",
    prompt={
        "prompt_name": "Write a Poem",
        "subject": "Artificial Intelligence",
    },
)
# Step 2: same prompt, different subject. `add_step_resp` is overwritten, so
# only this second reply is kept.
add_step_resp = agixt.add_step(
    chain_name=chain_name,
    step_number=2,
    agent_name=agent_name,
    prompt_type="Prompt",
    prompt={
        "prompt_name": "Write a Poem",
        "subject": "Quantum Computers",
    },
)

## Get the content of the chain


In [None]:
# Fetch the chain's content by name.
chain_name = "Poem Writing Chain"
chain = agixt.get_chain(chain_name=chain_name)

## Get Chain Arguments


In [None]:
chain_name = "Poem Writing Chain"
# The result is not assigned; as the cell's last expression it is shown as
# the cell output.
agixt.get_chain_args(chain_name=chain_name)

## Modify a chain step

Instead of the subject of the poem just being Artificial Intelligence, we'll change it to be Artificial General Intelligence.


In [None]:
chain_name = "Poem Writing Chain"
agent_name = "new_agent"
# Test update_step()
# Replace step 1 with the same prompt but a new subject.
update_step_resp = agixt.update_step(
    chain_name=chain_name,
    step_number=1,
    agent_name=agent_name,
    prompt_type="Prompt",
    prompt={
        "prompt_name": "Write a Poem",
        "subject": "Artificial General Intelligence",
    },
)

## Move the chain step

When you move a step, it will automatically reassign the order of the steps to match the new order. If there are only 2 steps like in our case, it will just swap them.


In [None]:
chain_name = "Poem Writing Chain"
# `agent_name` is assigned here but not used by the call below.
agent_name = "new_agent"
# Move step 1 to position 2.
move_step_resp = agixt.move_step(
    chain_name=chain_name, old_step_number=1, new_step_number=2
)

## Delete a step from the chain


In [None]:
chain_name = "Poem Writing Chain"
# Delete whatever is at position 2 after the move in the previous cell.
delete_step_resp = agixt.delete_step(chain_name=chain_name, step_number=2)

## Add a Command to the Chain

We'll write the result to a file for an example.


In [None]:
agent_name = "new_agent"
chain_name = "Poem Writing Chain"
# Add a "Command" step at position 2 that runs "Write to File". The file name
# and text contain "{user_input}" and "{STEP1}" placeholders, passed as
# literal text here.
add_step_resp = agixt.add_step(
    chain_name=chain_name,
    step_number=2,
    agent_name=agent_name,
    prompt_type="Command",
    prompt={
        "command_name": "Write to File",
        "filename": "{user_input}.txt",
        "text": "Poem:\n{STEP1}",
    },
)

## Run the chain


In [None]:
chain_name = "Poem Writing Chain"
# The user input for this chain will just be the name of the text file to write to
# (it fills the "{user_input}.txt" file name of the command step added above).
user_input = "Super Poems"
# Run the chain starting at step 1.
run_chain_resp = agixt.run_chain(
    chain_name=chain_name, user_input=user_input, from_step=1
)

## Delete the chain


In [None]:
# Remove the chain created for these tests.
delete_chain_resp = agixt.delete_chain(chain_name="Poem Writing Chain")

## Get a list of prompts available to use


In [None]:
# List the prompts in the "Default" category.
prompts = agixt.get_prompts(prompt_category="Default")

## Get the content of a prompt


In [None]:
# Fetch the "Write a Poem" prompt from the "Default" category.
get_prompt_resp = agixt.get_prompt(
    prompt_name="Write a Poem", prompt_category="Default"
)

## Create a new prompt

We'll make a basic prompt that asks the AI to tell us a short story about a subject. The subject is not yet defined; it would be defined in a chain. Using `{variable_name}` in a prompt will allow you to define the variable in a chain and have it be used in the prompt.


In [None]:
# Create the "Short Story" prompt; "{subject}" is left as a placeholder in
# the prompt text.
add_prompt_resp = agixt.add_prompt(
    prompt_name="Short Story",
    prompt="Tell me a short story about {subject}",
    prompt_category="Default",
)

## Get the prompt variables


In [None]:
# Ask for the arguments of the "Short Story" prompt created above.
get_prompt_args_resp = agixt.get_prompt_args(
    prompt_name="Short Story", prompt_category="Default"
)

## Update the prompt content

We'll ask it to `Add a dragon to the story somehow` in the prompt to make the short story more interesting.


In [None]:
# Replace the prompt text, keeping the "{subject}" placeholder and adding the
# dragon instruction.
update_prompt_resp = agixt.update_prompt(
    prompt_name="Short Story",
    prompt="Tell me a short story about {subject}. Add a dragon to the story somehow.",
    prompt_category="Default",
)

## Delete the prompt

If you don't want the prompt anymore, delete it.


In [None]:
# Remove the "Short Story" prompt created for these tests.
delete_prompt_resp = agixt.delete_prompt(
    prompt_name="Short Story", prompt_category="Default"
)

## Delete the Agent

If you are done with the agent and don't want or need it anymore, you can delete it along with everything associated with it, such as its memories, settings, and history. The Agent isn't just fired, it is dead.


In [None]:
# Delete "new_agent". `agent_name` keeps this value, and the OpenAI-style
# cells below still pass it as the model name.
agent_name = "new_agent"
delete_agent_resp = agixt.delete_agent(agent_name=agent_name)

## OpenAI Style Endpoint Tests

### Get Embeddings
[OpenAI API Reference](https://platform.openai.com/docs/api-reference/embeddings)

In [None]:
# Modify this prompt to generate different outputs
prompt = "Tacos are great."

# The agent name is passed as the model.
# NOTE(review): `agent_name` is still "new_agent", which was deleted in the
# cell above — confirm the endpoint is expected to work for it.
response = openai.embeddings.create(
    input=prompt,
    model=agent_name,
)
# Print the embedding returned for the single input.
print(response.data[0].embedding)

### Chat Completion Tests

[OpenAI API Reference](https://platform.openai.com/docs/api-reference/chat)


In [None]:
# Modify this prompt to generate different outputs
prompt = "Write a short poem about Pikachu with a picture."


# NOTE(review): `agent_name` is still "new_agent", which was deleted earlier
# in the notebook — confirm the endpoint is expected to work for it.
response = openai.chat.completions.create(
    model=agent_name,  # Model is Agent Name
    messages=[{"role": "user", "content": prompt}],
    stream=False,
    user="Pikachu Poem",  # User is Conversation Name
)
# Render the reply with the helper from the first cell, which shows linked
# media inline when IPython is available.
display_content(response.choices[0].message.content)

### Streaming Chat Completion Test

Test the new streaming functionality that allows real-time streaming of AI responses.

[OpenAI API Reference - Streaming](https://platform.openai.com/docs/api-reference/chat/streaming)

In [None]:
import time
import json

# Streaming chat completion test: print the reply as it arrives, then report
# how many chunks/characters were received and how long it took.
prompt = "Tell me a short story about a robot learning to paint. Make it creative and engaging."

print("🎬 Starting streaming test...")
print("=" * 60)
print("Response will appear in real-time:")
print("-" * 60)

# perf_counter() is monotonic, so the measured duration is not affected by
# wall-clock adjustments while the request is running.
start_time = time.perf_counter()

try:
    # Create streaming request
    stream = openai.chat.completions.create(
        model=agent_name,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        max_tokens=300,
        temperature=0.7,
        user="Streaming Test",
    )

    # Collect the pieces in a list and join once at the end instead of
    # growing a string with += for every chunk.
    response_parts = []
    chunk_count = 0

    for chunk in stream:
        chunk_count += 1

        # A chunk may carry an empty `choices` list (for example a trailing
        # usage-only chunk); indexing [0] on it would raise IndexError.
        if not chunk.choices:
            continue
        choice = chunk.choices[0]

        if choice.delta.content is not None:
            content = choice.delta.content
            response_parts.append(content)
            print(content, end="", flush=True)

        # Check if streaming is complete
        if choice.finish_reason == "stop":
            break

    end_time = time.perf_counter()
    full_response = "".join(response_parts)

    # Guard the rate calculation: a zero elapsed time would otherwise raise
    # ZeroDivisionError and be misreported as a streaming failure below.
    elapsed = end_time - start_time
    chars_per_second = len(full_response) / elapsed if elapsed > 0 else 0.0

    print("\n" + "-" * 60)
    print("✅ Streaming completed successfully!")
    print("📊 Statistics:")
    print(f"   • Total chunks received: {chunk_count}")
    print(f"   • Total characters: {len(full_response)}")
    print(f"   • Time taken: {elapsed:.2f} seconds")
    print(f"   • Average chars/second: {chars_per_second:.1f}")
    print("=" * 60)

except Exception as e:
    # Broad catch is deliberate: this cell reports the failure and lets the
    # rest of the notebook keep running.
    print(f"❌ Streaming test failed: {str(e)}")
    print(
        "This could indicate that streaming is not properly implemented or the agent is not available."
    )

### Streaming vs Non-Streaming Comparison

Compare the performance and user experience between streaming and non-streaming responses.

In [None]:
import time

# Run the same prompt once without and once with streaming, then compare the
# total time of the non-streaming call with the streaming time-to-first-content.
# Same prompt for both tests to ensure fair comparison
comparison_prompt = "Explain the concept of artificial intelligence in simple terms, covering machine learning, neural networks, and real-world applications."

print("🔄 Running comparison tests...")
print("=" * 80)

# Test 1: Non-streaming
print("1️⃣ Non-streaming test:")
print("-" * 40)

# perf_counter() is monotonic, which makes it the right clock for durations.
start_time = time.perf_counter()

non_streaming_response = openai.chat.completions.create(
    model=agent_name,
    messages=[{"role": "user", "content": comparison_prompt}],
    stream=False,
    max_tokens=250,
    temperature=0.7,
    user="Comparison Test - Non-Streaming",
)

end_time = time.perf_counter()
non_streaming_time = end_time - start_time
# `message.content` can be None; fall back to "" so slicing and len() below
# do not raise TypeError.
non_streaming_content = non_streaming_response.choices[0].message.content or ""

print(f"⏱️ Total time to first content: {non_streaming_time:.2f} seconds")
print(f"📝 Response: {non_streaming_content[:100]}...")
print(f"📊 Character count: {len(non_streaming_content)}")

print("\n" + "=" * 80)

# Test 2: Streaming
print("2️⃣ Streaming test:")
print("-" * 40)

start_time = time.perf_counter()
first_content_time = None  # stays None if the stream never yields content
streaming_parts = []

stream = openai.chat.completions.create(
    model=agent_name,
    messages=[{"role": "user", "content": comparison_prompt}],
    stream=True,
    max_tokens=250,
    temperature=0.7,
    user="Comparison Test - Streaming",
)

print("📡 Streaming response (live):")
print("💬 ", end="", flush=True)

for chunk in stream:
    # Skip chunks with an empty `choices` list instead of failing on [0].
    if not chunk.choices:
        continue
    choice = chunk.choices[0]

    if choice.delta.content is not None:
        content = choice.delta.content
        streaming_parts.append(content)

        # Record time to first content
        if first_content_time is None:
            first_content_time = time.perf_counter() - start_time

        print(content, end="", flush=True)

    if choice.finish_reason == "stop":
        break

end_time = time.perf_counter()
streaming_total_time = end_time - start_time
streaming_content = "".join(streaming_parts)

print("\n\n📊 Streaming Statistics:")
if first_content_time is None:
    # Formatting None with :.2f would raise TypeError, so report it explicitly.
    print("   • Time to first content: n/a (no content received)")
else:
    print(f"   • Time to first content: {first_content_time:.2f} seconds")
print(f"   • Total completion time: {streaming_total_time:.2f} seconds")
print(f"   • Character count: {len(streaming_content)}")

print("\n" + "=" * 80)
print("🎯 Comparison Results:")
print(f"   • Non-streaming total time: {non_streaming_time:.2f}s")
if first_content_time is None:
    print("   • Streaming first content: n/a (no content received)")
else:
    print(f"   • Streaming first content: {first_content_time:.2f}s")
print(f"   • Streaming total time: {streaming_total_time:.2f}s")
# The percentage needs a first-content timestamp and a non-zero baseline.
if first_content_time is not None and non_streaming_time > 0:
    improvement = (non_streaming_time - first_content_time) / non_streaming_time * 100
    print(f"   • Time to first content improvement: {improvement:.1f}%")
else:
    print("   • Time to first content improvement: n/a")
print("   • User Experience: Streaming provides immediate feedback! 🚀")

### Streaming Error Handling Test

Test how the streaming endpoint handles various error scenarios and edge cases.

In [None]:
# Exercise the streaming endpoint with three awkward inputs (unknown agent,
# empty message, tiny max_tokens) and report what comes back. Each scenario
# has its own try/except so one failure does not stop the others.
print("🧪 Testing streaming error handling scenarios...")
print("=" * 60)

# Test 1: Invalid model name
print("1️⃣ Testing with invalid model name:")
try:
    stream = openai.chat.completions.create(
        model="nonexistent_agent",
        messages=[{"role": "user", "content": "Hello"}],
        stream=True,
        max_tokens=50,
    )

    response_received = False
    for chunk in stream:
        response_received = True
        # A chunk with an empty `choices` list would raise IndexError on [0]
        # and be misreported by the handler below as an API exception.
        if not chunk.choices:
            continue
        choice = chunk.choices[0]
        if choice.delta.content:
            print(f"   📄 Chunk: {choice.delta.content[:50]}...")
        if choice.finish_reason:
            print(f"   🏁 Finish reason: {choice.finish_reason}")
            break

    if response_received:
        print("   ✅ Error handled gracefully in streaming format")
    else:
        print("   ❌ No response received")

except Exception as e:
    # Broad catch is deliberate: any failure mode is an acceptable outcome to
    # report for this scenario. The message is truncated to keep output short.
    print(f"   ⚠️ Exception caught: {str(e)[:100]}...")

print("\n" + "-" * 60)

# Test 2: Empty message
print("2️⃣ Testing with empty message:")
try:
    stream = openai.chat.completions.create(
        model=agent_name,
        messages=[{"role": "user", "content": ""}],
        stream=True,
        max_tokens=50,
    )

    response_received = False
    for chunk in stream:
        response_received = True
        # Skip chunks that carry no choices (see Test 1).
        if not chunk.choices:
            continue
        choice = chunk.choices[0]
        if choice.delta.content:
            print(f"   📄 Chunk: {choice.delta.content[:50]}...")
        if choice.finish_reason:
            print(f"   🏁 Finish reason: {choice.finish_reason}")
            break

    if response_received:
        print("   ✅ Empty message handled appropriately")
    else:
        print("   ❌ No response received")

except Exception as e:
    print(f"   ⚠️ Exception caught: {str(e)[:100]}...")

print("\n" + "-" * 60)

# Test 3: Very low max_tokens
print("3️⃣ Testing with very low max_tokens (5):")
try:
    stream = openai.chat.completions.create(
        model=agent_name,
        messages=[{"role": "user", "content": "Tell me about artificial intelligence"}],
        stream=True,
        max_tokens=5,  # Very low limit
    )

    chunk_count = 0
    content_parts = []

    for chunk in stream:
        chunk_count += 1
        # Count the chunk, but do not index into an empty `choices` list.
        if not chunk.choices:
            continue
        choice = chunk.choices[0]
        if choice.delta.content:
            content = choice.delta.content
            content_parts.append(content)
            print(content, end="", flush=True)
        if choice.finish_reason:
            print(f"\n   🏁 Finish reason: {choice.finish_reason}")
            break

    total_content = "".join(content_parts)
    print(f"   📊 Received {chunk_count} chunks, {len(total_content)} characters total")
    print("   ✅ Low max_tokens handled correctly")

except Exception as e:
    print(f"   ⚠️ Exception caught: {str(e)[:100]}...")

print("\n" + "=" * 60)
print("🎯 Error handling tests completed!")

### Vision Test
The model used for tests does not have vision, but this example is here to show how you would use the endpoint if you had a model that could process images.

In [None]:
# Send one user message whose content mixes a text part and an image URL
# part, then show the reply.
response = openai.chat.completions.create(
    user="Vision Test",
    model=agent_name,
    messages=[
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "Describe each stage of this image."},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": "https://www.visualwatermark.com/images/add-text-to-photos/add-text-to-image-3.webp"
                    },
                },
            ],
        },
    ],
)
display_content(response.choices[0].message.content)

### File Upload Test

In [None]:
import base64

# Download the hurricanes CSV into memory and embed it in the request as a
# base64 data URL (nothing is written to disk).
csv_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/hurricanes.csv"
# A timeout keeps the cell from hanging forever on an unresponsive host.
csv_response = requests.get(csv_url, timeout=30)
# Fail loudly on an HTTP error instead of uploading an error page as "CSV".
csv_response.raise_for_status()
base64_encoded_file = base64.b64encode(csv_response.content).decode("utf-8")
data_url = f"data:application/csv;base64,{base64_encoded_file}"

response = openai.chat.completions.create(
    model=agent_name,
    messages=[
        {
            "role": "user",
            # NOTE(review): "analyze_user_input" is not a standard OpenAI
            # message field; presumably a per-message option read by the
            # AGiXT server -- confirm.
            "analyze_user_input": "false",
            "content": [
                {
                    "type": "text",
                    "text": "Which month had the most hurricanes according to the data provided?",
                },
                {
                    "type": "file_url",
                    "file_url": {
                        "url": data_url,
                    },
                },
            ],
        },
    ],
    user="Data Analysis",
)
display_content(response.choices[0].message.content)

### Websearch Test

In [None]:
# Modify this prompt to generate different outputs
prompt = "What are the latest critical windows vulnerabilities that have recently been patched in the past week?"

# The message carries extra "websearch" / "websearch_depth" keys next to the
# usual role and content.
# NOTE(review): presumably these are per-message options read by the AGiXT
# server -- confirm.
response = openai.chat.completions.create(
    user="Windows Vulnerabilities",
    stream=False,
    model=agent_name,
    messages=[
        {
            "role": "user",
            "websearch": "true",
            "websearch_depth": "2",
            "content": prompt,
        }
    ],
)
display_content(response.choices[0].message.content)