https://microsoftlearning.github.io/mslearn-ai-agents/

https://github.com/MicrosoftLearning/mslearn-ai-agents

https://microsoft.github.io/build-your-first-agent-with-azure-ai-agent-service-workshop/getting-started/

In [None]:
##Install the required dependencies
!pip install python-dotenv azure-identity azure-ai-projects azure-ai-agents uvicorn
!pip install --upgrade azure-ai-agents --pre

In [1]:
## Log in to your personal Azure account and check the subscription ID (it should be the subscription with the free credit)
!az login

[
  {
    "cloudName": "AzureCloud",
    "homeTenantId": "1e05f3ac-96d5-4115-bcd6-2c4470c80b8e",
    "id": "f58341f3-c5c4-412f-9310-371c124471bb",
    "isDefault": true,
    "managedByTenants": [],
    "name": "Azure subscription 1",
    "state": "Enabled",
    "tenantDefaultDomain": "ajayhazratraining261outlook.onmicrosoft.com",
    "tenantDisplayName": "Default Directory",
    "tenantId": "1e05f3ac-96d5-4115-bcd6-2c4470c80b8e",
    "user": {
      "name": "ajay-hazra-training261@outlook.com",
      "type": "user"
    }
  }
]




In [None]:
############################################
# Determining the priority, efforts and team assignment for support tickets using connected agents solution 
# WITHOUT MCP
############################################

import os
from dotenv import load_dotenv

from azure.ai.agents import AgentsClient
from azure.ai.agents.models import ConnectedAgentTool, MessageRole, ListSortOrder
from azure.identity import DefaultAzureCredential

# Clear the console
# (runs `cls` when os.name is 'nt', otherwise `clear`)
os.system('cls' if os.name == 'nt' else 'clear')

# Load environment variables
# load_dotenv() runs first; the two settings are then read with os.getenv,
# which returns None for a variable that is not set.
load_dotenv()
project_endpoint = os.getenv("PROJECT_ENDPOINT")
model_deployment = os.getenv("MODEL_DEPLOYMENT_NAME")

# Connect to the agents client
# The default credential chain is built with the environment-variable and
# managed-identity credentials excluded (presumably so the `az login` session
# from the earlier cell is used — confirm).
agents_client = AgentsClient(
    endpoint=project_endpoint,
    credential=DefaultAzureCredential(
        exclude_environment_credential=True,
        exclude_managed_identity_credential=True
    ),
)

##Agent Definitions:
# Each agent is described by a name and an instruction prompt. The three
# specialist agents below are later exposed as connected tools of the triage agent.
####################
# priority_agent
####################
# Rates a ticket as High / Medium / Low.
priority_agent_name = "priority_agent"
priority_agent_instructions = """
    Assess how urgent a ticket is based on its description.

    Respond with one of the following levels:
    - High: User-facing or blocking issues
    - Medium: Time-sensitive but not breaking anything
    - Low: Cosmetic or non-urgent tasks

    Only output the urgency level and a very brief explanation.
  """
####################
# team_agent
####################
# Picks the owning team from a fixed list of four teams.
team_agent_name = "team_agent"
team_agent_instructions = """
    Decide which team should own each ticket.

    Choose from the following teams:
    - Frontend
    - Backend
    - Infrastructure
    - Marketing

    Base your answer on the content of the ticket. Respond with the team name and a very brief explanation.
    """

####################
# effort_agent
####################

# Estimates the work as Small / Medium / Large.
effort_agent_name = "effort_agent"
effort_agent_instructions = """
    Estimate how much work each ticket will require.

    Use the following scale:
    - Small: Can be completed in a day
    - Medium: 2-3 days of work
    - Large: Multi-day or cross-team effort

    Base your estimate on the complexity implied by the ticket. Respond with the effort level and a brief justification.
    """
####################
# triage-agent
####################

# Orchestrator prompt: tells the triage agent to use the connected tools for
# priority, team assignment and effort.
triage_agent_name = "triage-agent"
triage_agent_instructions = """
Triage the given ticket. Use the connected tools to determine the ticket's priority, 
which team it should be assigned to, and how much effort it may take.
"""

with agents_client:

    ####################
    # Creation of Agents
    ####################
    # Each specialist agent is created exactly once, from the name/instruction
    # constants defined above. (A previous revision re-created effort_agent and
    # the triage agent a second time with generic prompts, which left orphaned
    # agents in the project and discarded the detailed instructions.)

    # Create an agent to prioritize support tickets
    priority_agent = agents_client.create_agent(
        model=model_deployment,
        name=priority_agent_name,
        instructions=priority_agent_instructions,
        description="Assess the priority of a ticket",
    )
    print(f"[INFO] Created agent: {priority_agent.name} (id={priority_agent.id})")

    # Create an agent to assign tickets to the appropriate team
    team_agent = agents_client.create_agent(
        model=model_deployment,
        name=team_agent_name,
        instructions=team_agent_instructions,
        description="Determines which team should take the ticket",
    )
    print(f"[INFO] Created agent: {team_agent.name} (id={team_agent.id})")

    # Create an agent to estimate effort for a support ticket
    effort_agent = agents_client.create_agent(
        model=model_deployment,
        name=effort_agent_name,
        instructions=effort_agent_instructions,
        description="Determines the effort required to complete the ticket",
    )
    print(f"[INFO] Created agent: {effort_agent.name} (id={effort_agent.id})")

    ####################
    # Connected agent tools
    ####################
    # Wrap each specialist agent as a tool the triage agent can call.
    priority_agent_tool = ConnectedAgentTool(
        id=priority_agent.id,
        name=priority_agent_name,
        description="Assess the priority of a ticket",
    )

    team_agent_tool = ConnectedAgentTool(
        id=team_agent.id,
        name=team_agent_name,
        description="Determines which team should take the ticket",
    )

    effort_agent_tool = ConnectedAgentTool(
        id=effort_agent.id,
        name=effort_agent_name,
        description="Determines the effort required to complete the ticket",
    )
    print("[INFO] Connected agent tools created.")

    ####################
    # Triage agent (orchestrator)
    ####################
    triage_agent = agents_client.create_agent(
        model=model_deployment,
        name=triage_agent_name,
        instructions=triage_agent_instructions,
        tools=[
            priority_agent_tool.definitions[0],
            team_agent_tool.definitions[0],
            effort_agent_tool.definitions[0],
        ],
    )
    print(f"[INFO] Created triage agent: {triage_agent.name} (id={triage_agent.id})")

    # Create thread
    thread = agents_client.threads.create()
    print(f"[INFO] Thread created: id={thread.id}")

    # User prompt
    prompt = input("\nWhat's the support problem you need to resolve?: ")
    message = agents_client.messages.create(
        thread_id=thread.id,
        role=MessageRole.USER,
        content=prompt,
    )
    print(f"[INFO] User message sent: {message.id} content='{prompt}'")

    # Run triage agent
    print("[INFO] Running triage agent...")
    run = agents_client.runs.create_and_process(thread_id=thread.id, agent_id=triage_agent.id)
    print(f"[INFO] Run status: {run.status}")
    if run.status == "failed":
        print(f"[INFO] Run failed: {run.last_error}")

    # Print all messages in order (oldest first)
    print("\n[INFO] Conversation transcript:")
    messages = agents_client.messages.list(thread_id=thread.id, order=ListSortOrder.ASCENDING)
    for msg in messages:
        if msg.text_messages:
            for text_msg in msg.text_messages:
                print(f"{msg.role}: {text_msg.text.value}")

    print("\n[INFO] End of interaction.")


In [6]:
# Multi-Agent Ticket Triage Workflow
# User
#  ↓
# Triage Agent
#  ├─ Priority Agent
#  ├─ Team Agent
#  ├─ Effort Agent
#  └─ MS Learn Agent (only if docs/QnA needed)
#         └─ MCP (learn.microsoft.com)

import os
from dotenv import load_dotenv

from azure.identity import DefaultAzureCredential
from azure.ai.agents import AgentsClient
from azure.ai.agents.models import (
    ConnectedAgentTool,
    ToolSet,
    McpTool,
    MessageRole,
    ListSortOrder
)

# ------------------------------------------------------------
# Load config
# ------------------------------------------------------------
# Both settings are read with os.getenv, so either is None when not set.
load_dotenv()

PROJECT_ENDPOINT = os.getenv("PROJECT_ENDPOINT")
MODEL_DEPLOYMENT = os.getenv("MODEL_DEPLOYMENT_NAME")

# ------------------------------------------------------------
# Client
# ------------------------------------------------------------
# Credential chain with the environment-variable and managed-identity
# credentials excluded.
agents_client = AgentsClient(
    endpoint=PROJECT_ENDPOINT,
    credential=DefaultAzureCredential(
        exclude_environment_credential=True,
        exclude_managed_identity_credential=True
    )
)

# ------------------------------------------------------------
# Instructions
# ------------------------------------------------------------
# One prompt per agent; these are shorter variants of the prompts used in the
# non-MCP cell earlier in this notebook.
PRIORITY_INSTRUCTIONS = """
Assess ticket urgency.
Return: High, Medium, or Low with a short reason.
"""

TEAM_INSTRUCTIONS = """
Assign the ticket to:
Frontend, Backend, Infrastructure, or Marketing.
Return team + reason.
"""

EFFORT_INSTRUCTIONS = """
Estimate effort:
Small (1 day), Medium (2–3 days), Large (multi-day).
Return effort + reason.
"""

# Prompt for the documentation agent; it tells the model when the MCP tool
# should be used.
MSLEARN_AGENT_INSTRUCTIONS = """
You answer questions using Microsoft Learn documentation.

Use the MCP tool to search learn.microsoft.com
ONLY when the question is about:
- Azure services
- Microsoft products
- Official documentation
- How-to or conceptual learning questions

Return a concise factual answer with references if possible.
"""

# Orchestrator prompt: always run the three triage steps, and call the
# Microsoft Learn agent only for documentation-style questions.
TRIAGE_INSTRUCTIONS = """
You are a triage orchestrator.

Always:
1. Determine priority
2. Assign the team
3. Estimate effort

ONLY IF the ticket contains:
- a how-to question
- documentation clarification
- Azure or Microsoft Learn related QnA

THEN call the Microsoft Learn agent.

Otherwise, do NOT call it.

Return a structured final response.
"""

# ------------------------------------------------------------
# MCP Tool (used ONLY by MS Learn agent)
# ------------------------------------------------------------
# NOTE(review): the heading says this tool is used only by the MS Learn agent,
# but in this cell the toolset built here is passed to the triage agent's run
# and is not referenced when mslearn_agent is created — confirm the wiring.
mcp_tool = McpTool(
    server_label="mslearn",
    server_url="https://learn.microsoft.com/api/mcp"
)
# Approval mode "never": presumably tool calls run without a manual approval
# step — confirm against the SDK documentation.
mcp_tool.set_approval_mode("never")

# The toolset contains only the MCP tool.
toolset = ToolSet()
toolset.add(mcp_tool)

# ------------------------------------------------------------
# Create agents
# ------------------------------------------------------------
with agents_client:
    # Creates the three specialist agents plus the documentation agent, wires
    # them into a triage agent as connected tools, runs one user ticket through
    # the triage agent and prints the thread transcript.
    print("\n[INFO] Creating agents...")

    priority_agent = agents_client.create_agent(
        model=MODEL_DEPLOYMENT,
        name="priority_agent",
        instructions=PRIORITY_INSTRUCTIONS
    )

    team_agent = agents_client.create_agent(
        model=MODEL_DEPLOYMENT,
        name="team_agent",
        instructions=TEAM_INSTRUCTIONS
    )

    effort_agent = agents_client.create_agent(
        model=MODEL_DEPLOYMENT,
        name="effort_agent",
        instructions=EFFORT_INSTRUCTIONS
    )

    # --------------------------------------------
    # MS Learn Agent (HAS MCP)
    # --------------------------------------------
    # NOTE(review): despite the heading, no `tools=` argument is passed here,
    # so this agent is created without the MCP tool attached — confirm whether
    # the MCP tool definitions should be supplied at creation time.
    mslearn_agent = agents_client.create_agent(
        model=MODEL_DEPLOYMENT,
        name="mslearn_agent",
        instructions=MSLEARN_AGENT_INSTRUCTIONS
    )

    print("[INFO] Base agents created")

    # --------------------------------------------------------
    # Connected agent tools
    # --------------------------------------------------------
    # One ConnectedAgentTool per sub-agent, keyed by the sub-agent's id.
    priority_tool = ConnectedAgentTool(
        id=priority_agent.id,
        name="priority_agent",
        description="Assess ticket priority"
    )

    team_tool = ConnectedAgentTool(
        id=team_agent.id,
        name="team_agent",
        description="Assign owning team"
    )

    effort_tool = ConnectedAgentTool(
        id=effort_agent.id,
        name="effort_agent",
        description="Estimate effort"
    )

    mslearn_tool = ConnectedAgentTool(
        id=mslearn_agent.id,
        name="mslearn_agent",
        description="Answer Microsoft Learn / Azure documentation questions"
    )

    # --------------------------------------------------------
    # TRIAGE AGENT (Orchestrator)
    # --------------------------------------------------------
    # The triage agent is given the first definition of each connected tool.
    triage_agent = agents_client.create_agent(
        model=MODEL_DEPLOYMENT,
        name="triage_agent",
        instructions=TRIAGE_INSTRUCTIONS,
        tools=[
            priority_tool.definitions[0],
            team_tool.definitions[0],
            effort_tool.definitions[0],
            mslearn_tool.definitions[0]
        ]
    )

    print(f"[INFO] Triage agent created: {triage_agent.id}")

    # --------------------------------------------------------
    # Thread + user input
    # --------------------------------------------------------
    thread = agents_client.threads.create()

    user_prompt = input("\nDescribe the support ticket:\n> ")

    agents_client.messages.create(
        thread_id=thread.id,
        role=MessageRole.USER,
        content=user_prompt
    )

    # --------------------------------------------------------
    # Run (IMPORTANT: MCP toolset only here)
    # --------------------------------------------------------
    # NOTE(review): `toolset` holds only the MCP tool and is applied to the
    # TRIAGE agent's run. The recorded output of this cell reports a team
    # ("Cloud Infrastructure Team") and an effort ("Moderate") that are not in
    # the sub-agents' allowed labels, which suggests the connected agents may
    # not have been invoked for this run. Possibly the run-level toolset
    # replaces the agent's own tools — verify against the SDK documentation.
    run = agents_client.runs.create_and_process(
        thread_id=thread.id,
        agent_id=triage_agent.id,
        toolset=toolset
    )

    print(f"\n[INFO] Run status: {run.status}")
    if run.status == "failed":
        print(run.last_error)

    # --------------------------------------------------------
    # Output
    # --------------------------------------------------------
    print("\n========== FINAL RESULT ==========")

    # Messages are listed oldest-first so the transcript reads top to bottom.
    messages = agents_client.messages.list(
        thread_id=thread.id,
        order=ListSortOrder.ASCENDING
    )

    for msg in messages:
        if msg.text_messages:
            for t in msg.text_messages:
                print(f"{msg.role.upper()}: {t.text.value}")

    print("=================================\n")
    print("\n[INFO] Interaction complete.")



[INFO] Creating agents...
[INFO] Base agents created
[INFO] Triage agent created: asst_7JCWO50HSsGSuKVEI3ZRTMPI

[INFO] Run status: RunStatus.COMPLETED

USER: How can i create Azure VM using CLI and what are the latest VM types available
ASSISTANT: Priority: High - Creating Azure VMs and understanding the latest VM types are common tasks that are often needed promptly for development or production environments.

Team Assigned: Cloud Infrastructure Team - They specialize in Azure resources, virtual machines, and CLI operations.

Estimated Effort: Moderate - Requires providing step-by-step Azure CLI commands for VM creation and summarizing the latest VM types. 

---
How to create Azure VM using CLI:

1. Log in to Azure CLI:
```
az login
```

2. Create a resource group (replace 'myResourceGroup' and 'eastus' with your values):
```
az group create --name myResourceGroup --location eastus
```

3. Create a VM (replace variables accordingly). Example for Windows VM:
```
vmname="myVM"
usernam

In [None]:
##Clean up
# Deletes EVERY agent in the project referenced by PROJECT_ENDPOINT.

# `os` is used below; import it here so the cell also works when run on its
# own (e.g. after a kernel restart).
import os

from dotenv import load_dotenv

from azure.ai.agents import AgentsClient
from azure.identity import DefaultAzureCredential

# Clear the console
os.system('cls' if os.name == 'nt' else 'clear')

# Load environment variables from .env file
load_dotenv()
project_endpoint = os.getenv("PROJECT_ENDPOINT")

# Connect to the agents client
agents_client = AgentsClient(
    endpoint=project_endpoint,
    credential=DefaultAzureCredential(
        exclude_environment_credential=True,
        exclude_managed_identity_credential=True
    ),
)

with agents_client:
    # list_agents() returns a lazy, single-pass pager. Materialize it once so
    # the listing loop does not exhaust it before the delete loop runs
    # (otherwise the second loop would iterate over nothing).
    agents = list(agents_client.list_agents())

    # List all agents in the project
    print("Agents found in project:")
    for agent in agents:
        print(f"- {agent.name} (id={agent.id})")

    # Delete all agents
    for agent in agents:
        print(f"Deleting agent: {agent.name} (id={agent.id})")
        agents_client.delete_agent(agent.id)

    print("All agents deleted successfully.")

# Azure AI Agents using A2A

In [None]:
!pip install httpx fastapi  uvicorn starlette sse-starlette a2a-sdk --force-reinstall

In [None]:
import os
import asyncio
import uvicorn
from azure.ai.agents import AgentsClient
from azure.identity import DefaultAzureCredential
from azure.ai.agents.models import ListSortOrder, MessageRole
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill, Part, TaskState
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import PlainTextResponse
from starlette.routing import Route
from a2a.utils import new_agent_text_message
from a2a.server.tasks import TaskUpdater
from a2a.server.events.event_queue import EventQueue


# Define agent creation function
async def create_agent():
    """Create the blog-title agent in the Azure AI Foundry project and return it.

    Reads PROJECT_ENDPOINT and MODEL_DEPLOYMENT_NAME from the environment
    (KeyError if either is missing).

    NOTE: ``AgentsClient`` imported from ``azure.ai.agents`` is the synchronous
    client, so its methods return plain objects and must not be awaited; the
    call below blocks the event loop while it runs.
    """
    # The context manager closes the client's transport once the agent exists.
    with AgentsClient(
        endpoint=os.environ["PROJECT_ENDPOINT"],
        credential=DefaultAzureCredential(exclude_environment_credential=True, exclude_managed_identity_credential=True)
    ) as client:
        return client.create_agent(
            model=os.environ["MODEL_DEPLOYMENT_NAME"],
            name="title-agent",
            instructions="Given a topic, generate a catchy blog post title."
        )


# Define agent execution logic
async def execute_agent(user_message: str):
    """Run ``user_message`` through a title agent and return its text replies.

    Args:
        user_message: The topic text sent to the agent as the user message.

    Returns:
        A list of reply strings, newest message first; ``['Error: ...']`` when
        the run failed; ``['No response received']`` when the agent produced no
        text.

    NOTE(review): a new agent is created on every call and never deleted —
    consider caching or cleaning up if this is used beyond a demo.
    """
    with AgentsClient(
        endpoint=os.environ["PROJECT_ENDPOINT"],
        credential=DefaultAzureCredential(exclude_environment_credential=True, exclude_managed_identity_credential=True)
    ) as client:
        agent = await create_agent()

        # Synchronous SDK calls: no ``await`` (see create_agent).
        thread = client.threads.create()  # Create a thread for the chat session
        client.messages.create(thread_id=thread.id, role=MessageRole.USER, content=user_message)
        run = client.runs.create_and_process(thread_id=thread.id, agent_id=agent.id)

        if run.status == 'failed':
            print(f'Title Agent: Run failed - {run.last_error}')
            return [f'Error: {run.last_error}']

        messages = client.messages.list(thread_id=thread.id, order=ListSortOrder.DESCENDING)
        # Text is read through ``text_messages`` (the same access path used by
        # the other cells in this notebook), one entry per text part.
        responses = [
            text_msg.text.value
            for msg in messages
            if msg.role == MessageRole.AGENT
            for text_msg in msg.text_messages
        ]

    return responses or ['No response received']


# Function to process requests
async def process_request(message_parts: list[Part], context_id: str, task_updater: TaskUpdater):
    """Run the first message part through the title agent and report the result.

    Args:
        message_parts: A2A message parts; the text of the first part is used.
        context_id: Context id attached to every status message.
        task_updater: Updater used to publish "working", "completed" or
            "failed" for the task.

    Any exception (including an empty ``message_parts``) is printed and
    reported to the caller as a failed task rather than raised.
    """
    try:
        # Extract user message
        user_message = message_parts[0].root.text

        # Report "working" BEFORE doing the work, and wrap the text in a
        # Message object (as the RCA executor in this notebook does) instead
        # of passing a bare string.
        await task_updater.update_status(
            TaskState.working,
            message=new_agent_text_message("Processing...", context_id=context_id),
        )

        responses = await execute_agent(user_message)

        # Complete the task with the newest response from the agent
        await task_updater.complete(
            message=new_agent_text_message(responses[0], context_id=context_id)
        )

    except Exception as e:
        print(f'Error processing request: {e}')
        await task_updater.failed(
            message=new_agent_text_message("Title Agent failed to process the request.", context_id=context_id)
        )


# Function to execute the agent's task (like an executor)
async def execute_agent_task(context: "RequestContext", event_queue: EventQueue):
    """Drive one A2A task: submit it, mark it started, then process the message.

    Args:
        context: Request context supplying ``task_id``, ``context_id`` and the
            incoming ``message``. The annotation is a string because
            ``RequestContext`` is not imported in this cell; an unquoted
            annotation would raise NameError when the function is defined.
        event_queue: Queue the TaskUpdater publishes task events to.
    """
    updater = TaskUpdater(event_queue, context.task_id, context.context_id)
    await updater.submit()

    # Start working on the task
    await updater.start_work()

    # Process the request using the provided message parts
    await process_request(context.message.parts, context.context_id, updater)


# Setup the Starlette server and routes
async def health_check(request: Request) -> PlainTextResponse:
    """Liveness endpoint: always answers with a fixed plain-text status line."""
    status_line = 'AI Foundry Title Agent is running!'
    return PlainTextResponse(status_line)


# Define the agent card
agent_card = AgentCard(
    name='AI Foundry Title Agent',
    description='Generates catchy titles for blog posts.',
    url='http://127.0.0.1:8000/',
    version='1.0.0',
    default_input_modes=['text'],
    default_output_modes=['text'],
    capabilities=AgentCapabilities(),
    skills=[AgentSkill(id="generate_blog_title", name="Generate Blog Title", description="Generates a catchy title.", tags=["title"])]
)


# The request handler invokes ``agent_executor.execute(...)`` and
# ``agent_executor.cancel(...)``, so it needs an AgentExecutor object rather
# than a bare coroutine function. This adapter exposes execute_agent_task
# through that interface.
from a2a.server.agent_execution import AgentExecutor


class _FunctionAgentExecutor(AgentExecutor):
    """AgentExecutor that delegates execution to ``execute_agent_task``."""

    async def execute(self, context, event_queue):
        await execute_agent_task(context, event_queue)

    async def cancel(self, context, event_queue):
        # Mirrors the RCA executor in this notebook: report the cancelled task
        # as failed with an explanatory message.
        updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        await updater.failed(
            message=new_agent_text_message('Task cancelled by user', context_id=context.context_id)
        )


# Create request handler with the adapter executor
request_handler = DefaultRequestHandler(
    agent_executor=_FunctionAgentExecutor(),
    task_store=InMemoryTaskStore()
)


# Create the A2A Starlette application
a2a_app = A2AStarletteApplication(agent_card=agent_card, http_handler=request_handler)


# Define routes: the health check first, followed by the A2A routes
routes = [Route("/health", health_check)]
routes.extend(a2a_app.routes())

# Create the Starlette app
app = Starlette(routes=routes)

# Run the server using Uvicorn
def run_uvicorn():
    """Start the uvicorn server for ``app`` as a background task.

    The server is scheduled on the current event loop (the notebook's loop)
    instead of blocking the cell.

    Returns:
        The asyncio task running ``server.serve()``. The caller should keep a
        reference to it: the event loop only holds weak references to tasks,
        so an unreferenced task may be garbage-collected mid-execution.
    """
    config = uvicorn.Config(app, host="127.0.0.1", port=8000)
    server = uvicorn.Server(config)
    loop = asyncio.get_event_loop()
    return loop.create_task(server.serve())

# Now we run the server in an asynchronous environment; the task is stored in a
# module-level name so it stays alive for the lifetime of the kernel.
server_task = run_uvicorn()


INFO:     Started server process [10616]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


In [None]:
import asyncio
import uuid
import httpx
from a2a.client import A2AClient
from a2a.types import AgentCard, SendMessageRequest, SendMessageResponse

# Function to send a message to the remote agent and get a response
async def send_message_to_agent(agent_url: str, agent_card: AgentCard, user_message: str) -> str:
    """Send one text message to a remote A2A agent and describe the outcome.

    Args:
        agent_url: Base URL of the remote agent.
        agent_card: Agent card describing the remote agent.
        user_message: Text sent as a single "text" part with role "user".

    Returns:
        ``"Agent Response: <result>"`` on success, a fixed failure message when
        the agent answered with something other than a success response, or
        ``"Error during agent interaction: <exc>"`` if any exception occurred.
    """
    # Imported locally because the surrounding cell does not import this type.
    from a2a.types import SendMessageSuccessResponse

    try:
        # Initialize HTTP client and A2A client
        async with httpx.AsyncClient(timeout=30) as client:
            agent_client = A2AClient(client, agent_card, url=agent_url)

            # The same UUID is used for the JSON-RPC request id and the message id
            message_id = str(uuid.uuid4())

            message_request = SendMessageRequest(
                id=message_id,
                params={
                    "message": {
                        "messageId": message_id,
                        "role": "user",
                        "parts": [{"kind": "text", "text": user_message}],
                    }
                },
            )

            # Send the message and await response
            response = await agent_client.send_message(message_request)

            # ``response`` is the SendMessageResponse wrapper; its ``root`` is
            # the actual success or error payload, so the wrapped value must be
            # compared against the success type (comparing it against the
            # wrapper type never matches).
            if isinstance(response.root, SendMessageSuccessResponse):
                return f"Agent Response: {response.root.result}"
            return "Failed to get a valid response from the agent."
    except Exception as e:
        return f"Error during agent interaction: {e}"

# Function to run the test with a sample message
async def run_test():
    """Fetch the remote agent's card, send it a sample message and print the result.

    If the agent card cannot be retrieved the error is printed and the
    function returns without sending a message (there is no card to use).
    """
    from a2a.client import A2ACardResolver

    agent_url = "http://127.0.0.1:8000"
    async with httpx.AsyncClient(timeout=30) as client:
        card_resolver = A2ACardResolver(client, agent_url)
        try:
            card = await card_resolver.get_agent_card()
        except Exception as e:
            print(f"Error retrieving agent card: {e}")
            # Without a card the send below would fail on an unbound name.
            return
        print(card)

        # Send a message to the agent and print the result
        result = await send_message_to_agent(agent_url, card, "Test message for triage agent.")
        print(result)

# Run the test inside the Jupyter Notebook cell.
# Top-level `await` is not valid in a plain Python script; this line relies on
# the notebook executing the cell inside an event loop.
await run_test()


In [1]:
!az login

[
  {
    "cloudName": "AzureCloud",
    "homeTenantId": "1e05f3ac-96d5-4115-bcd6-2c4470c80b8e",
    "id": "f58341f3-c5c4-412f-9310-371c124471bb",
    "isDefault": true,
    "managedByTenants": [],
    "name": "Azure subscription 1",
    "state": "Enabled",
    "tenantDefaultDomain": "ajayhazratraining261outlook.onmicrosoft.com",
    "tenantDisplayName": "Default Directory",
    "tenantId": "1e05f3ac-96d5-4115-bcd6-2c4470c80b8e",
    "user": {
      "name": "ajay-hazra-training261@outlook.com",
      "type": "user"
    }
  }
]




In [None]:
#### Root Cause Analysis Agent ####
""" Azure AI Foundry Agent that analyzes system logs to identify root causes of issues. """

import os

from azure.ai.agents import AgentsClient
from azure.ai.agents.models import Agent, MessageRole, ListSortOrder
from azure.identity import DefaultAzureCredential

class RootCauseAnalysisAgent:
    """Thin wrapper around an Azure AI Foundry agent that analyzes system logs.

    The agent itself is created lazily by ``create_agent`` and cached on the
    instance; ``run_conversation`` sends one message and returns the reply text.
    """

    def __init__(self):
        # PROJECT_ENDPOINT is required (KeyError when missing).
        endpoint = os.environ['PROJECT_ENDPOINT']
        credential = DefaultAzureCredential(
            exclude_environment_credential=True,
            exclude_managed_identity_credential=True,
        )
        self.client = AgentsClient(endpoint=endpoint, credential=credential)

        # Populated on the first create_agent() call.
        self.agent: Agent | None = None

    async def create_agent(self) -> Agent:
        """Return the cached agent, creating the root-cause-analysis agent first if needed."""
        if not self.agent:
            self.agent = self.client.create_agent(
                model=os.environ['MODEL_DEPLOYMENT_NAME'],
                name='root-cause-analysis-agent',
                instructions="""
            You are a system log analysis assistant.
            Given system logs, identify potential root causes of issues. Share the root cause analysis in a structured format.
            Always provide clear and concise explanations.
            """,
            )
        return self.agent

    async def run_conversation(self, user_message: str) -> list[str]:
        """Send ``user_message`` in a fresh thread and return the newest assistant reply.

        Returns the text parts of the most recent assistant message,
        ``['Error: ...']`` when the run failed, or ``['No response received']``
        when no assistant text was found.
        """
        if not self.agent:
            await self.create_agent()

        # Every conversation gets its own thread.
        thread = self.client.threads.create()
        self.client.messages.create(thread_id=thread.id, role=MessageRole.USER, content=user_message)

        run = self.client.runs.create_and_process(thread_id=thread.id, agent_id=self.agent.id)
        if run.status == 'failed':
            print(f'Root Cause Analysis Agent: Run failed - {run.last_error}')
            return [f'Error: {run.last_error}']

        # Newest-first listing: the first assistant message that carries text
        # is the latest reply.
        history = self.client.messages.list(thread_id=thread.id, order=ListSortOrder.DESCENDING)
        latest_reply = next(
            (entry for entry in history if entry.role == 'assistant' and entry.text_messages),
            None,
        )
        if latest_reply is None:
            return ['No response received']

        replies = [chunk.text.value for chunk in latest_reply.text_messages]
        return replies or ['No response received']

async def create_foundry_RCA_agent() -> RootCauseAnalysisAgent:
    """Build a RootCauseAnalysisAgent and make sure its Foundry agent exists before returning it."""
    rca_agent = RootCauseAnalysisAgent()
    await rca_agent.create_agent()
    return rca_agent



#===============================================
# Execute the Root Cause Analysis Agent
#===============================================



""" A2A agent executor that runs the Azure AI Foundry root cause analysis agent """

from a2a.server.agent_execution import AgentExecutor
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import AgentCard, Part, TaskState
from a2a.utils.message import new_agent_text_message
#from outline_agent.agent import OutlineAgent, create_foundry_outline_agent

# An AgentExecutor that runs Azure AI Foundry-based agents. Adapted from the ADK agent executor pattern.
class RootCauseAnalysisAgentExecutor(AgentExecutor):
    """AgentExecutor that answers A2A requests with the root cause analysis agent.

    The underlying RootCauseAnalysisAgent is created on first use and reused
    for later requests.
    """

    def __init__(self, card: AgentCard):
        self._card = card
        # Created lazily by _get_or_create_agent().
        self._foundry_agent: RootCauseAnalysisAgent | None = None

    async def _get_or_create_agent(self) -> RootCauseAnalysisAgent:
        """Return the cached RCA agent, creating it on the first call."""
        if not self._foundry_agent:
            self._foundry_agent = await create_foundry_RCA_agent()
        return self._foundry_agent

    async def _process_request(self, message_parts: list[Part], context_id: str, task_updater: TaskUpdater) -> None:
        """Process a user request through the Foundry agent.

        Args:
            message_parts: A2A message parts; the text of the first part is used.
            context_id: Context id attached to every status message.
            task_updater: Updater used to publish progress and the final state.

        Any exception is logged and reported as a failed task instead of being
        raised to the caller.
        """
        try:
            # Retrieve message from A2A parts
            user_message = message_parts[0].root.text

            # Get the RCA agent
            agent = await self._get_or_create_agent()

            # Update the task status
            await task_updater.update_status(
                TaskState.working,
                message=new_agent_text_message('RCA Agent is processing your request...', context_id=context_id)
            )

            # Run the conversation
            responses = await agent.run_conversation(user_message)

            # Publish every response as a progress update
            for response in responses:
                await task_updater.update_status(
                    TaskState.working,
                    message=new_agent_text_message(response, context_id=context_id)
                )

            # Mark the task as complete with the last response
            final_message = responses[-1] if responses else 'Task completed.'
            await task_updater.complete(
                message=new_agent_text_message(final_message, context_id=context_id)
            )

        except Exception as e:
            # Log the cause before reporting failure; without this the reason
            # for a failed task is lost entirely.
            print(f'RCA Agent: Error processing request - {e}')
            await task_updater.failed(
                message=new_agent_text_message('RCA Agent failed to process the request.',
                context_id=context_id)
            )

    async def execute(self, context: RequestContext, event_queue: EventQueue):
        """Submit the task, mark it as started and process the incoming message."""
        # Create task updater
        updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        await updater.submit()

        # Start working
        await updater.start_work()

        # Process the request
        await self._process_request(context.message.parts, context.context_id, updater)

    async def cancel(self, context: RequestContext, event_queue: EventQueue):
        """Report a cancelled task to the client.

        NOTE(review): the task is marked as *failed* with a cancellation
        message; confirm whether the installed a2a-sdk offers a dedicated
        cancelled state that should be used instead.
        """
        print(f'RCA Agent: Cancelling execution for context {context.context_id}')

        updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        await updater.failed(
            message=new_agent_text_message('Task cancelled by user', context_id=context.context_id)
        )

def create_foundry_agent_executor(card: AgentCard) -> RootCauseAnalysisAgentExecutor:
    """Factory: build a RootCauseAnalysisAgentExecutor for the given agent card."""
    executor = RootCauseAnalysisAgentExecutor(card)
    return executor

##############################################
#Server Code to host the RCA Agent
###############################################
import os
import uvicorn
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
from dotenv import load_dotenv
#from outline_agent.agent_executor import create_foundry_agent_executor
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import PlainTextResponse
from starlette.routing import Route
import asyncio

load_dotenv()

# Address advertised in the agent card. It must match the address the server
# actually listens on: run_uvicorn below binds 127.0.0.1:8000 and the client
# cell connects to http://127.0.0.1:8000, so the card uses the same port
# (it previously advertised 8001, which nothing listened on).
#host = os.environ["SERVER_URL"]
#port = os.environ["TITLE_AGENT_PORT"]
host = "127.0.0.1"
port = "8000"

# Define agent skills
skills = [
    AgentSkill(
        id='Generate RCA',
        name='Generate RCA',
        description='Generates a root cause analysis based on a topic',
        tags=['rca'],
        examples=[
            'Can you analyze these system logs and provide a root cause analysis?',
        ],
    ),
]

# Create agent card
agent_card = AgentCard(
    name='AI Foundry RCA Agent',
    description='An intelligent root cause analysis agent powered by Azure AI Foundry. '
    'I can help you analyze system logs and provide root cause analysis.',
    url=f'http://{host}:{port}/',
    version='1.0.0',
    default_input_modes=['text'],
    default_output_modes=['text'],
    capabilities=AgentCapabilities(streaming=True),
    skills=skills,
)

# Create agent executor
agent_executor = RootCauseAnalysisAgentExecutor(agent_card)

# Create request handler (task state is kept in memory only)
request_handler = DefaultRequestHandler(
    agent_executor=agent_executor, task_store=InMemoryTaskStore()
)

# Create A2A application
a2a_app = A2AStarletteApplication(
    agent_card=agent_card, http_handler=request_handler
)

# Get routes
routes = a2a_app.routes()

# Add health check endpoint
async def health_check(request: Request) -> PlainTextResponse:
    """Liveness endpoint: always answers with a fixed plain-text status line."""
    status_line = 'AI Foundry RCA Agent is running!'
    return PlainTextResponse(status_line)

# Register the GET /health route after the A2A routes
health_route = Route(path='/health', methods=['GET'], endpoint=health_check)
routes.append(health_route)

# Build the Starlette app from the combined route list
app = Starlette(routes=routes)

# Run the server using Uvicorn
def run_uvicorn(app, host="127.0.0.1", port=8000):
    """Start a uvicorn server for ``app`` as a background task.

    The server is scheduled on the current event loop (the notebook's loop)
    instead of blocking the cell.

    Args:
        app: The ASGI application to serve.
        host: Interface to bind; defaults to the previously hard-coded value.
        port: TCP port to bind; defaults to the previously hard-coded value.

    Returns:
        The asyncio task running ``server.serve()``. The caller should keep a
        reference to it: the event loop only holds weak references to tasks,
        so an unreferenced task may be garbage-collected mid-execution.
    """
    config = uvicorn.Config(app, host=host, port=port)
    server = uvicorn.Server(config)
    loop = asyncio.get_event_loop()
    return loop.create_task(server.serve())

# Now we run the server in an asynchronous environment; the task is stored in a
# module-level name so it stays alive for the lifetime of the kernel.
rca_server_task = run_uvicorn(app)


INFO:     Started server process [22424]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


In [37]:
import warnings

warnings.filterwarnings(
    "ignore",
    category=DeprecationWarning,
    message=".*A2AClient is deprecated.*"
)
    
    
    
    
from a2a.client import A2AClient
    
import httpx
#from a2a.client import A2AClient
from a2a.client import A2ACardResolver
import uuid
from a2a.types import (
    SendMessageRequest,
    
)
user_query = input("Enter the system logs for root cause analysis:\n")
agent_url = 'http://127.0.0.1:8000'
async with httpx.AsyncClient(timeout=30) as client:
                card_resolver = A2ACardResolver(client, agent_url)
                card = await card_resolver.get_agent_card()

                agent_client = A2AClient(httpx.AsyncClient(timeout=30), card, url=agent_url)
                message_id = str(uuid.uuid4())
                message_request = SendMessageRequest(
                    id=message_id,
                    params={
                        "message": {
                            "role": "user",
                            "parts": [{"kind": "text", "text": "Test message for RCA agent."}],
                            "messageId": message_id,
                        }
                    }
                )
                response = await agent_client.send_message(message_request)
                #print(response)
                #print(type(response))

                
                texts = []

                # Loop through all messages in history
                for message in response.root.result.history:
                    if hasattr(message, "parts") and message.parts:
                        for part in message.parts:
                            if hasattr(part.root, "text") and part.root.text:
                                texts.append(part.root.text)

                # Print all extracted texts
                for t in texts:
                    print(t)


Test message for RCA agent.
RCA Agent is processing your request...
Root Cause Analysis Report

Issue: Test message received for RCA agent

Summary:
- The input is a test message, not an actual system log or error report.

Root Cause:
- No issue detected as the message serves as a system functionality test.

Recommendations:
- Provide actual system log data or incident details for accurate root cause analysis.
- Ensure logs include timestamps, error codes, and relevant system components if available for best results.
