In [None]:
from typing import Annotated, Literal, Optional
from autogen import AssistantAgent, UserProxyAgent, config_list_from_json
import os
from autogen import ConversableAgent
import requests
import json

In [11]:


# Define the path or environment variable for the configuration list
config_path = os.getenv("OAI_CONFIG_LIST", "OAI_CONFIG_LIST")  # Ensure correct path or environment variable
if not config_path:
    raise ValueError("OAI_CONFIG_LIST environment variable is not set or empty.")

# Load configuration for the Gemini model
config_list_gemini = config_list_from_json(
    config_path,
    filter_dict={"model": ["gemini-2.0-flash"]},
)
config_list_openai = config_list_from_json(
    config_path,
    filter_dict={"model": ["gpt-4o-mini"]},
)


# MCP Server interaction functions
MCP_SERVER_URL = "http://127.0.0.1:5000/api/tickets"  # Base URL for MCP server

def get_tickets(status: Optional[str] = None, priority: Optional[str] = None):
    """
    Get tickets from the MCP server, optionally filtered by status and priority.
    Returns the JSON response from the server.
    """
    params = {}
    if status:
        params['status'] = status
    if priority:
        params['priority'] = priority
    try:
        response = requests.get(MCP_SERVER_URL, params=params)
        response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
        return response.json()
    except requests.exceptions.RequestException as e:
        return {"error": f"Failed to get tickets: {e}"}

def get_ticket_by_id(ticket_id: str):
    """
    Get a specific ticket by ID from the MCP server.
    Returns the JSON response from the server.
    """
    url = f"{MCP_SERVER_URL}/{ticket_id}"
    try:
        response = requests.get(url)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        return {"error": f"Failed to get ticket {ticket_id}: {e}"}

def create_ticket(title: str, description: str, status: str, priority: str, assigned_to: str):
    """
    Create a new ticket on the MCP server.
    Returns the JSON response from the server.
    """
    headers = {'Content-Type': 'application/json'}
    data = {
        "title": title,
        "description": description,
        "status": status,
        "priority": priority,
        "assigned_to": assigned_to
    }
    try:
        response = requests.post(MCP_SERVER_URL, headers=headers, data=json.dumps(data))
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        return {"error": f"Failed to create ticket: {e}"}

def update_ticket(ticket_id: str, updates: dict):
    """
    Update an existing ticket on the MCP server.
    'updates' should be a dictionary containing fields to update (e.g., {'status': 'closed'}).
    Returns the JSON response from the server.
    """
    url = f"{MCP_SERVER_URL}/{ticket_id}"
    headers = {'Content-Type': 'application/json'}
    try:
        response = requests.put(url, headers=headers, data=json.dumps(updates))
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        return {"error": f"Failed to update ticket {ticket_id}: {e}"}


# Define Assistant Agent
assistant = ConversableAgent(
    name="MCP_Assistant",
    system_message="""You are a helpful AI assistant designed to interact with the MCP (MyCoolProject) server API.
    You can perform actions like retrieving tickets, creating new tickets, and updating existing tickets.
    Use the provided tools to interact with the MCP server based on user requests.
    When you have successfully addressed the user's request and provided a response, or if you cannot fulfill the request, respond with 'TERMINATE' to end the conversation.
    If you encounter errors from the API, inform the user and suggest alternative actions or clarifications.
    """,
    llm_config={"config_list":config_list_openai}
)

# Define User Proxy Agent
user_proxy = UserProxyAgent(
    name="MCP_User_Proxy",
    llm_config=False,
    is_termination_msg=lambda msg: msg.get("content") is not None and "TERMINATE" in msg["content"],
    human_input_mode="NEVER",  # Set to "ALWAYS" if you want human intervention
    # max_auto_reply=10, # set max auto reply to a reasonable number
    code_execution_config={"use_docker": False}, # set to True or False based on your requirement
)

# Register tools for Assistant Agent (for planning and tool selection)
assistant.register_for_llm(name="get_tickets", description="Get a list of tickets from the MCP server. You can filter by status and priority.")(get_tickets)
assistant.register_for_llm(name="get_ticket_by_id", description="Get details of a specific ticket by its ID.")(get_ticket_by_id)
assistant.register_for_llm(name="create_ticket", description="Create a new ticket on the MCP server. Requires title, description, status, priority, and assigned_to.")(create_ticket)
assistant.register_for_llm(name="update_ticket", description="Update an existing ticket on the MCP server. Requires ticket ID and a dictionary of fields to update.")(update_ticket)

# Register tools for User Proxy Agent (for execution)
user_proxy.register_for_execution(name="get_tickets")(get_tickets)
user_proxy.register_for_execution(name="get_ticket_by_id")(get_ticket_by_id)
user_proxy.register_for_execution(name="create_ticket")(create_ticket)
user_proxy.register_for_execution(name="update_ticket")(update_ticket)


The return type of the function 'get_tickets' is not annotated. Although annotating it is optional, the function should return either a string, a subclass of 'pydantic.BaseModel'.
The return type of the function 'get_ticket_by_id' is not annotated. Although annotating it is optional, the function should return either a string, a subclass of 'pydantic.BaseModel'.
The return type of the function 'create_ticket' is not annotated. Although annotating it is optional, the function should return either a string, a subclass of 'pydantic.BaseModel'.
The return type of the function 'update_ticket' is not annotated. Although annotating it is optional, the function should return either a string, a subclass of 'pydantic.BaseModel'.


<function __main__.update_ticket(ticket_id: str, updates: dict)>

In [19]:

print("MCP Client started. Type 'exit' to quit.")

# user_input = "get list of tickets"
user_input = "get high priority tickets"
# if user_input.lower() == 'exit':
#     break

chat_result = user_proxy.initiate_chat(assistant, message=user_input)

# Process and print the last response from the assistant (which should contain the final output)
last_message = chat_result.chat_history[-1]
if last_message and last_message['role'] == 'assistant':
    print(f"Assistant: {last_message['content']}")
else:
    print("No response from assistant.")

MCP Client started. Type 'exit' to quit.
[33mMCP_User_Proxy[0m (to MCP_Assistant):

get high priority tickets

--------------------------------------------------------------------------------
[31m
>>>>>>>> USING AUTO REPLY...[0m
[33mMCP_Assistant[0m (to MCP_User_Proxy):

[32m***** Suggested tool call (call_o3csuKFEgaXKYZDPQtyVxrMM): get_tickets *****[0m
Arguments: 
{"priority":"high"}
[32m****************************************************************************[0m

--------------------------------------------------------------------------------
[35m
>>>>>>>> EXECUTING FUNCTION get_tickets...[0m
[33mMCP_User_Proxy[0m (to MCP_Assistant):

[33mMCP_User_Proxy[0m (to MCP_Assistant):

[32m***** Response from calling tool (call_o3csuKFEgaXKYZDPQtyVxrMM) *****[0m
{"tickets": [{"assigned_group": "Web Operations", "assigned_to": "tech_support_01", "change_request_id": "CHNG_0001", "closed_date": "2025-03-20 11:00:00", "closure_code": "Fixed", "cmdb_ci_id": "CMDBCI0001", "cre

In [23]:
from typing import Optional, List, Dict, Callable

# Assume these functions and classes are defined elsewhere in your code
def load_docs_from_db(db_name: str, table_name: str) -> List[dict]:
    """Loads documents from a SQLite database table."""
    # Replace this with your actual database loading logic
    import sqlite3
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()
    cursor.execute(f"SELECT * FROM {table_name}")
    columns = [column[0] for column in cursor.description]
    rows = cursor.fetchall()
    conn.close()
    return [dict(zip(columns, row)) for row in rows]

# Load documents from a SQLite database
ci_db = load_docs_from_db("snow.db", "configuration_items")
# ci_db

def update_ids(data):
    """
    Updates 'upstream_ids' and 'downstream_ids' in the data by replacing 'CMDBCl' with 'CMDBCI'.

    Args:
        data: A list of dictionaries representing the CMDB data.

    Returns:
        The updated list of dictionaries.
    """
    for item in data:
        # Update upstream_ids
        if 'upstream_ids' in item and item['upstream_ids']:
            upstream_ids = [uid.strip().replace('CMDBCl', 'CMDBCI') for uid in item['upstream_ids'].split(',')]
            item['upstream_ids'] = ', '.join(upstream_ids)

        # Update downstream_ids
        if 'downstream_ids' in item and item['downstream_ids']:
            downstream_ids = [did.strip().replace('CMDBCl', 'CMDBCI') for did in item['downstream_ids'].split(',')]
            item['downstream_ids'] = ', '.join(downstream_ids)
    return data

# ci_db = update_ids(ci_db)
ci_db

[{'id': 'CMDBCI0001',
  'name': 'Web Server 01',
  'description': 'Primary web server for customer portal',
  'host': 'web01.example.com',
  'owner': 'IT Operations',
  'upstream_ids': 'CMDBCI00003, CMDBCI00010',
  'downstream_ids': 'CMDBCI00002, CMDBCI00009',
  'created_date': '2024-07-15 10:00:00',
  'updated_date': '2024-07-15 10:00:00'},
 {'id': 'CMDBCI0002',
  'name': 'Database Server 02',
  'description': 'Secondary database server for application data',
  'host': 'db02.example.com',
  'owner': 'Database Admins',
  'upstream_ids': 'CMDBCI00001',
  'downstream_ids': 'CMDBCI00011, CMDBCI00013',
  'created_date': '2024-08-01 14:30:00',
  'updated_date': '2024-08-01 14:30:00'},
 {'id': 'CMDBCI0003',
  'name': 'Network Switch 03',
  'description': 'Core network switch in the data center',
  'host': 'switch03.example.com',
  'owner': 'Network Engineers',
  'upstream_ids': '',
  'downstream_ids': 'CMDBCI00001, CMDBCI00004, CMDBCI00006, CMDBCI00010',
  'created_date': '2024-09-10 09:15:0

In [24]:
def get_upstream_dependencies_recursive(data, cmdbci_id):
    """
    Recursively finds all upstream dependent IDs for a given CMDBCI ID.

    Args:
        data: A list of dictionaries representing the CMDB data.
        cmdbci_id: The ID for which to find upstream dependencies.

    Returns:
        A set of all upstream dependent IDs.
    """
    dependencies = set()
    item_map = {item['id']: item for item in data}

    def _find_upstream(current_id, visited):
        if current_id in visited:
            return set()
        visited.add(current_id)
        upstream_deps = set()
        
        item = item_map.get(current_id)
        if item:
            upstream_ids = item.get('upstream_ids', '').split(',')
            upstream_ids = [uid.strip() for uid in upstream_ids if uid.strip()]
            
            for upstream_id in upstream_ids:
                if upstream_id and upstream_id not in visited:
                    upstream_deps.add(upstream_id)
                    upstream_deps.update(_find_upstream(upstream_id, visited))
                    
        return upstream_deps

    return _find_upstream(cmdbci_id, set())

# Example usage:
target_id = 'CMDBCI00021'
upstream_ids = get_upstream_dependencies_recursive(ci_db, target_id)
print(f"Upstream dependencies for {target_id}: {upstream_ids}")



Upstream dependencies for CMDBCI00021: {'CMDBCI00022', 'CMDBCI00008', 'CMDBCI00023'}
