<img width="60px" style="float: right;" src="https://xmks.s3.amazonaws.com/2020/X-Blue.png">
 
# XMPro MAGS via MQTT  
### Interaction Notebook (author: **Gavin Green** )
Code: https://xmpro.github.io/Blueprints-Accelerators-Patterns/

<img width="60px" src="https://img.shields.io/badge/License-MIT-green.svg">

This notebook sets up an MQTT client, connects to a broker, and allows for interaction with a MAGS Agent.

## Prerequisites

Before running this notebook, make sure you have the following dependencies installed:

In [1]:
! pip install --upgrade --quiet paho.mqtt ipywidgets pandas pytz python-dotenv

## Imports and Initial Setup

We use the read_env_file to read in the .env and load it into the OS variables.  Make sure you have a .env and the following inside it:

- MQTT_BROKER
- MQTT_PORT
- MQTT_USER
- MQTT_PASSWORD

In [13]:
# Imports and Initial Setup

from IPython.display import display, Markdown, HTML, clear_output, Javascript
import os
from paho.mqtt import client as mqtt_client
import time
import json
from collections import deque
import pandas as pd
import pytz
import ipywidgets as widgets
from datetime import datetime, timezone, timedelta
import re
import socket
import threading
import queue

def read_env_file(file_path='.env'):
    env_vars = {}
    try:
        with open(file_path, 'r') as file:
            for line in file:
                line = line.strip()
                if line and not line.startswith('#'):
                    key, value = line.split('=', 1)
                    env_vars[key.strip()] = value.strip()
    except FileNotFoundError:
        print(f"Warning: {file_path} not found. Using default values.")
    return env_vars

# Read the .env file
env_vars = read_env_file()

# Set environment variables
for key, value in env_vars.items():
    os.environ[key] = value

## Global Variables

Here we define all the global variable we are using, and also ask for the `TEAM_ID` and the `AGENT_ID` to be entered.  

We also ask for the `local_timezone` so the dates can be formatted appropriately.  The default if not provided is `American/Chicago`.

The MQTT topics are configured using these inputs.

In [None]:
# Global Variables

def validate_team_id(team_id):
    pattern = r'^[A-Z0-9]+-[A-Z0-9]+-[A-Z0-9]+-TEAM-\d{3}$'
    if not re.match(pattern, team_id):
        raise ValueError("Invalid TEAM_ID format. It should follow the pattern: SITE-AREA-FUNCTION-TEAM-VERSION (e.g., DALLAS-PROD-OPS-TEAM-001)")
    return team_id

def validate_agent_id(agent_id):
    pattern = r'^[A-Z0-9]+-[A-Z0-9]+-AGENT-\d{3}$'
    if not re.match(pattern, agent_id):
        raise ValueError("Invalid AGENT_ID format. It should follow the pattern: AREA-FUNCTION-AGENT-INSTANCE (e.g., WTR-QUAL-AGENT-001)")
    return agent_id

def get_user_input(prompt, default, validation_func=None):
    while True:
        user_input = input(f"{prompt} (default: {default}): ") or default
        if validation_func:
            try:
                return validation_func(user_input)
            except ValueError as e:
                print(f"Error: {e}")
        else:
            return user_input

# Get user input for key variables
TEAM_ID = get_user_input("Enter TEAM_ID", "", validate_team_id)
AGENT_ID = get_user_input("Enter AGENT_ID", "", validate_agent_id)
local_timezone  = get_user_input("Enter local_timezone (like: America/Chicago)", "America/Chicago")

broker_address  = os.environ.get('MQTT_BROKER', 'Not set')
port            = int(os.environ.get('MQTT_PORT', 'Not set'))
user            = os.environ.get('MQTT_USER', 'Not set')
password        = os.environ.get('MQTT_PASSWORD', 'Not set')
client          = None;

# Create a queue for received messages
message_queue       = queue.Queue()
processing_active   = True

# MQTT Topics
topic_receive = {
    'status': f"XMAGS/{TEAM_ID}/EVT/status/{AGENT_ID}",
    'observation_response': f"XMAGS/{TEAM_ID}/EVT/observation_result/{AGENT_ID}",
    'reflection_response': f"XMAGS/{TEAM_ID}/EVT/reflection_result/{AGENT_ID}",
    'plan_new': f"XMAGS/{TEAM_ID}/EVT/plan_new",
    'plan_update': f"XMAGS/{TEAM_ID}/EVT/plan_update",
    'plan_task': f"XMAGS/{TEAM_ID}/CMD/plan_task",
    'plan_action': f"XMAGS/{TEAM_ID}/CMD/plan_action",
    'conversation_response': f"XMAGS/{TEAM_ID}/EVT/chat_response/{AGENT_ID}"
}

topic_send = {
    'observation': f"XMAGS/{TEAM_ID}/DATA/observation/{AGENT_ID}",
    'conversation': f"XMAGS/{TEAM_ID}/DATA/chat/{AGENT_ID}",
}

# Global variables for data storage and visualization
connected = False
conversation_id = None
status_table = pd.DataFrame(columns=['Timestamp', 'State', 'Memories Processed', 'Plans Generated', 'Conversations Handled', 'Tool Usage', 'Last Reset Time', 'Error Message'])
max_table_rows = 10
table_output = widgets.Output()

# Global variable to store received messages
received_messages = []

def create_html_table(title, data):
    html = ""
    if title:
        html += f"<h4>{title}</h4>"
    
    html += "<table style='border-collapse: collapse; width: 100%;'>"
    for key, value in data.items():
        html += f"<tr><td style='border: 1px solid #ddd; padding: 8px; font-weight: bold; width: 18%'>{key}</td>"
        html += f"<td style='border: 1px solid #ddd; padding: 8px; text-align: left;'>{format_value(value)}</td></tr>"
    html += "</table>"
    return html

def format_value(value):
    if isinstance(value, (dict, list)):
        return f"<pre>{json.dumps(value, indent=2, default=str)}</pre>"
    elif isinstance(value, str):
        return value.replace('\n', '<br>')
    else:
        return str(value)
    
html_content = "<div style='font-family: Arial, sans-serif; max-width: 800px;'>"
html_content += create_html_table("Global Variables", {
    "TEAM_ID": TEAM_ID,
    "AGENT_ID": AGENT_ID,
    "broker_address": broker_address,
    "port": port,
    "user": user,
    "local_timezone": local_timezone
})
html_content += "</div><br /><hr />"

display(HTML(html_content))

## Function Definitions

Here we define all the various function we will need to connect and work with MQTT as well as present the data as a table.



In [15]:
# Function Definitions

def create_interactive_html():
    html = """
    <div id="mqtt-interface" style="font-family: Arial, sans-serif; margin: auto;">
        <h3>Agent Status</h3>
        <div id="table-output" style="margin-top: 20px; overflow-x: auto;"></div>
        <hr>
        <h3>Received Messages</h3>
        <div id="messages-output" style="max-height: 200px; overflow-y: auto;"></div>
    </div>
    """
    return HTML(html)

def update_display():
    table_html = status_table.to_html(index=False, classes='table table-striped table-bordered')
    messages_html = "<br>".join(reversed(received_messages[-10:]))  # Display last 10 messages, latest first
    
    display(Javascript(f"""
        document.getElementById('table-output').innerHTML = `{table_html}`;
        document.getElementById('messages-output').innerHTML = `{messages_html}`;
    """))

    # Force the display to update
    display(HTML("<script>IPython.notebook.kernel.execute('sys.stdout.flush()')</script>"))

def on_message(client, userdata, msg):
    try:
        payload = json.loads(msg.payload.decode())
        #print(f"Payload: {json.dumps(payload, indent=2)}")
        message_queue.put((msg.topic, payload))
    except json.JSONDecodeError:
        print(f"Raw payload: {msg.payload.decode()}")

def on_connect(client, userdata, flags, reason_code, properties):
    global connected
    if reason_code == 0:
        connected = True
    else:
        print(f"Failed to connect, return code {reason_code}")

def connect_and_subscribe(client):
    try:
        print(f"Attempting to connect to {broker_address}:{port}")
        client.connect(broker_address, port=port)
        client.loop_start()

        # Wait for the connection to be established
        timeout = 10
        start_time = time.time()
        while not connected and time.time() - start_time < timeout:
            time.sleep(0.1)

        if not connected:
            raise TimeoutError("Connection timed out")

        print("Connected to MQTT broker successfully")

        for topic in topic_receive.values():
            client.subscribe(topic)
            print(f"Subscribed to {topic}")

    except Exception as e:
        print(f"Error connecting to MQTT broker: {e}")
        if isinstance(e, socket.gaierror):
            print("This might be a DNS resolution issue. Check if the broker address is correct.")
        elif isinstance(e, ConnectionRefusedError):
            print("The connection was refused. Check if the broker is running and the port is correct.")
        elif isinstance(e, ssl.SSLError):
            print("SSL error occurred. Check your SSL/TLS settings.")
        raise  # Re-raise the exception for further handling

def publish_message(topic, message):
    result = client.publish(topic, json.dumps(message))
    status = result[0]
    if status == 0:
        print(f"Message sent successfully to topic {topic}")
    else:
        print(f"Failed to send message to topic {topic}, with status {status}")
        if status == 1:
            print("Connection refused: unacceptable protocol version")
        elif status == 2:
            print("Connection refused: identifier rejected")
        elif status == 3:
            print("Connection refused: server unavailable")
        elif status == 4:
            print("Connection refused: bad username or password")
        elif status == 5:
            print("Connection refused: not authorized")
    return status

def send_chat_message(content, conversation_id=None):
    chat_message = {
        "agent_id": AGENT_ID,
        "conversation_id": conversation_id or "",
        "content": content
    }
    publish_message(topic_send['conversation'], chat_message)

def send_observation(content):
    observation = {
        "agent_id": AGENT_ID,
        "content": content
    }
    publish_message(topic_send['observation'], observation)

def parse_timestamp(timestamp_str):
    if isinstance(timestamp_str, datetime):
        return timestamp_str.astimezone(pytz.timezone(local_timezone))
    
    timestamp_str = timestamp_str.rstrip('Z') + '+00:00'
    
    match = re.match(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})\.(\d+)(\+00:00)', timestamp_str)
    if match:
        main_time, fractional_seconds, tz = match.groups()
        microseconds = fractional_seconds[:6].ljust(6, '0')
        timestamp_str = f"{main_time}.{microseconds}{tz}"
        utc_time = datetime.fromisoformat(timestamp_str)
        local_tz = pytz.timezone(local_timezone)
        local_time = utc_time.astimezone(local_tz)
        return local_time
    else:
        raise ValueError(f"Unable to parse timestamp: {timestamp_str}")

def process_messages():
    global processing_active
    while processing_active:
        try:
            topic, payload = message_queue.get(timeout=1)
            handle_message(topic, payload)
        except queue.Empty:
            continue
        except Exception as e:
            print(f"Error processing message: {e}")

def handle_message(topic, payload):
    global received_messages, status_table
    
    try:
        if topic == topic_receive['status']:
            handle_status_message(payload)
        else:
            message = create_html_table(topic, payload)
            received_messages.append(message)

        update_display()
    except Exception as e:
        print(f"Error processing message for topic {topic}: {e}")
        print(f"Full payload: {payload}")

def handle_status_message(payload):
    global status_table
    
    try:
        timestamp = parse_timestamp(payload['timestamp'])
        formatted_timestamp = timestamp.strftime('%Y-%m-%d %H:%M:%S %Z')
        last_reset_time = parse_timestamp(payload['last_reset_time']).strftime('%Y-%m-%d %H:%M:%S %Z')
    except ValueError as e:
        print(f"Error parsing timestamp: {e}")
        timestamp = datetime.now(timezone.utc)
        formatted_timestamp = timestamp.strftime('%Y-%m-%d %H:%M:%S %Z')
        last_reset_time = "Unknown"
    
    new_row = pd.DataFrame({
        'Timestamp': [formatted_timestamp],
        'State': [payload['state']],
        'Memories Processed': [payload['memories_processed']],
        'Plans Generated': [payload['plans_generated']],
        'Conversations Handled': [payload['conversations_handled']],
        'Tool Usage': [payload['tool_usage']],
        'Last Reset Time': [last_reset_time],
        'Error Message': [payload['error_message'] if payload['error_message'] else 'None']
    })
    
    status_table = pd.concat([new_row, status_table], ignore_index=True).head(max_table_rows)


## MQTT Client Setup & Running

In [None]:
# MQTT Client Setup & Running

try:
    # Setup MQTT client
    client = mqtt_client.Client(client_id="JupyterNotebookClient", protocol=mqtt_client.MQTTv5, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2)
    client.on_message = on_message
    client.on_connect = on_connect
    client.username_pw_set(user, password=password)

    # Connect and subscribe in one step
    connect_and_subscribe(client)

    # Start the MQTT loop in a non-blocking way
    client.loop_start()

    # Start a separate thread for processing messages
    message_thread = threading.Thread(target=process_messages, daemon=True)
    message_thread.start()

    print("MQTT client is running in the background. You can now use the interaction cell.")

    # Display the initial interface
    display(create_interactive_html())

except KeyboardInterrupt:
    print("Interrupted by user, shutting down...")
except Exception as e:
    print(f"Error in MQTT setup or connection: {e}")

# Function to stop the client (can be called manually when needed)
def stop_client():
    global processing_active
    print("Cleaning up...")
    processing_active = False  # Stop the message processing loop
    message_thread.join(timeout=5)  # Wait for the message thread to finish
    client.loop_stop()
    client.disconnect()
    print("Disconnected from MQTT broker and stopped all threads.")

# Make client and stop_function globally accessible
globals()['mqtt_client'] = client
globals()['stop_mqtt_client'] = stop_client

## Interaction

In [None]:
# Publish a chat/conversation

conversation_id = ""
chat = ""

if chat and chat.strip():
    if not conversation_id or conversation_id.strip() == "":
        send_chat_message(chat)                                     # Start a new conversation
    else:
        send_chat_message(chat, conversation_id=conversation_id)    # Continue an existing conversation

In [None]:
# Publish an observation

observation = ""

if observation and observation.strip():
    send_observation(observation)

## Cleanup and Shutdown

In [None]:
# Run this cell to close down and disconnect

stop_mqtt_client()