In [None]:
%pip install semantic-kernel azure-ai-projects azure-identity

In [None]:
import os
import time
import uuid
import re
import json
from typing import Any, Dict
from pprint import pprint

from openai import OpenAI
from openai._models import FinalRequestOptions
from openai._types import Omit
from openai._utils import is_given
from synapse.ml.mlflow import get_mlflow_env_config

from azure.identity import ClientSecretCredential
from azure.ai.projects import AIProjectClient
from azure.ai.projects.models import AzureAISearchTool, FunctionTool, RequiredFunctionToolCall, SubmitToolOutputsAction, ToolOutput

from semantic_kernel.agents import AzureAIAgent, AzureAIAgentSettings
from semantic_kernel.contents.chat_message_content import ChatMessageContent
from semantic_kernel.contents.utils.author_role import AuthorRole
from semantic_kernel.functions import kernel_function

# Azure AI Project credentials.
# Every value is read from an environment variable first so that real secrets
# never have to be typed into (and saved with) the notebook. The original
# placeholder strings are kept as fallbacks, so replacing them in place still works.
credential = ClientSecretCredential(
    tenant_id=os.environ.get("AZURE_TENANT_ID", "YOUR-TENANT-ID"),
    client_id=os.environ.get("AZURE_CLIENT_ID", "YOUR-CLIENT-ID"),
    client_secret=os.environ.get("AZURE_CLIENT_SECRET", "YOUR-CLIENT-SECRET"),
)

# Connection string of the Azure AI project and the model deployment that
# agents use when no explicit model is passed to create_agent().
project_connection_string = os.environ.get(
    "AZURE_AI_AGENT_PROJECT_CONNECTION_STRING",
    "YOUR-PROJECT-CONNECTION-STRING",
)
model_deployment_name = os.environ.get("AZURE_AI_AGENT_MODEL_DEPLOYMENT_NAME", "gpt-4o")

# Synchronous project client; used later in the notebook to list the project's connections.
project_client = AIProjectClient.from_connection_string(
    credential=credential,
    conn_str=project_connection_string
)

In [None]:
async def create_agent(
    client,
    name,
    instructions,
    tools=None,
    tool_resources=None,
    plugins=None,
    model=None,
):
    """Create an agent definition in the project and wrap it in an ``AzureAIAgent``.

    Args:
        client: Client whose ``agents.create_agent`` coroutine registers the agent.
        name: Name given to the new agent.
        instructions: System instructions for the agent.
        tools: Optional tool definitions forwarded to the service.
        tool_resources: Optional tool resources forwarded to the service.
        plugins: Optional plugins handed to the ``AzureAIAgent`` wrapper.
        model: Optional model override; when falsy, the notebook-level
            ``model_deployment_name`` is used instead.

    Returns:
        An ``AzureAIAgent`` bound to ``client`` and the newly created definition.
    """
    # Fall back to the notebook-wide deployment unless the caller overrides it.
    deployment = model if model else model_deployment_name

    creation_kwargs = dict(
        model=deployment,
        name=name,
        instructions=instructions,
        tools=tools,
        tool_resources=tool_resources,
        # Extra header sent with the creation request.
        headers={"x-ms-enable-preview": "true"},
    )
    definition = await client.agents.create_agent(**creation_kwargs)

    return AzureAIAgent(client=client, definition=definition, plugins=plugins)

In [None]:
class FabricFunctionAISkill:
    """Semantic Kernel plugin that forwards a question to a published Fabric AI Skill."""

    # The description is what the model reads when deciding whether to call the
    # tool, so it has to describe what the function really does.
    @kernel_function(
        description=(
            "Answers questions about customers and their data by sending the "
            "question, in natural language, to the internal Fabric AI Skill."
        )
    )
    def call_fabric_ai_skill(self, question: str) -> str:
        """Ask the Fabric AI Skill ``question`` and return its answer as a JSON string.

        The call goes through the OpenAI Assistants-style endpoint exposed at
        ``base_url``: an assistant and a thread are created, the question is
        posted, a run is started and polled until it leaves the
        ``queued``/``in_progress`` states, and the first assistant message is read.

        Args:
            question: Natural-language question to forward.

        Returns:
            ``json.dumps({"answer": ...})``; the answer falls back to
            ``"No response from Fabric AI Skill."`` when no assistant message
            with content is found.
        """
        configs = get_mlflow_env_config()
        base_url = "YOUR-FABRIC-BASE-URL-FROM-PUBLISH"
        poll_interval_seconds = 2

        class FabricOpenAI(OpenAI):
            """OpenAI client pointed at the Fabric endpoint and authenticated with the AAD token."""

            def __init__(self, api_version: str = "2024-05-01-preview", **kwargs: Any) -> None:
                self.api_version = api_version
                # Every request carries the api-version as a query parameter.
                default_query = kwargs.pop("default_query", {})
                default_query["api-version"] = self.api_version
                super().__init__(api_key="", base_url=base_url, default_query=default_query, **kwargs)

            def _prepare_options(self, options: FinalRequestOptions) -> None:
                # Add the bearer token (taken from the MLflow env config) plus
                # default Accept / ActivityId headers to each outgoing request.
                headers: Dict[str, str | Omit] = {**options.headers} if is_given(options.headers) else {}
                headers["Authorization"] = f"Bearer {configs.driver_aad_token}"
                headers.setdefault("Accept", "application/json")
                headers.setdefault("ActivityId", str(uuid.uuid4()))
                options.headers = headers
                return super()._prepare_options(options)

        fabric_client = FabricOpenAI()
        assistant = fabric_client.beta.assistants.create(model="not used")
        thread = fabric_client.beta.threads.create()

        try:
            fabric_client.beta.threads.messages.create(thread_id=thread.id, role="user", content=question)
            run = fabric_client.beta.threads.runs.create(thread_id=thread.id, assistant_id=assistant.id)

            # Sleep *before* each status check so no extra delay is paid once
            # the run has already left the pending states.
            while run.status in ("queued", "in_progress"):
                time.sleep(poll_interval_seconds)
                run = fabric_client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)

            response = fabric_client.beta.threads.messages.list(thread_id=thread.id, order="asc")
            # First assistant message that actually has content; messages with
            # an empty content list are skipped instead of raising IndexError.
            answer = next(
                (m.content[0].text.value for m in response if m.role == "assistant" and m.content),
                None,
            )
        finally:
            # Always remove the remote thread, even when a call above failed.
            fabric_client.beta.threads.delete(thread_id=thread.id)

        return json.dumps({"answer": answer if answer else "No response from Fabric AI Skill."})

In [None]:
AISKill_plugin= FabricFunctionAISkill()
functions = FunctionTool(functions={FabricFunctionAISkill.call_fabric_ai_skill})

sk_client = AzureAIAgent.create_client(
    credential=credential,
    conn_str=project_connection_string
)

thread = await sk_client.agents.create_thread()

# Coordinator agents
router_instructions = (
    "You are a routing coordinator. Your only task is to route the user query to the correct agents by generating a JSON object. "
    "The JSON should map agent names to sub-queries. "
    "The valid agents are: "
    "- CustomerInfoAgent: Handles questions related to data and customer information. "
    "- MarketingAgent: Expert in marketing campaigns and slogans. "
    "- LoyaltyProgramsAgent: Expert in Power Platform licensing. "
    "- ChitChatAgent: Handles informal, social, or general-purpose questions. "
    "IMPORTANT: Do not answer the user's question yourself. ONLY return a valid JSON object mapping the agents to sub-queries. "
    "Example: {\"MarketingAgent\": \"Suggest a slogan for...\", \"CustomerInfoAgent\": \"What data sources are available?\"} "
    "If none of the agents are a good match, route the full question to ChitChatAgent as fallback. "
    "Never include explanations, comments, or assistant-like messages."
)

routing_agent = await create_agent(
    sk_client,
    "RoutingAgent",
    router_instructions
)

synthesis_agent = await create_agent(
    sk_client,
    "SynthesisAgent",
    "You are a synthesis agent. Combine responses from other agents into a clear, friendly answer. Don't include any JSON."
)

# Define FunctionTool for Fabric AI Skill
AISKill_plugin = FabricFunctionAISkill()
functions = FunctionTool(functions={FabricFunctionAISkill.call_fabric_ai_skill})

# Creating the Agent
CustomerAgent = await create_agent(
    sk_client,
    "CustomerInfoAgent",
    "You are responsible for answering customer-related questions using an internal AI Skill. Please, try to use your tool any time your receive a question.",
    tools=functions.definitions,
    plugins=[AISKill_plugin]
)

# Create the agent settings
agent_settings = AzureAIAgentSettings.create(
    model_deployment_name="gpt-4o"
)

# MarketingAgent (creative slogans)
marketing_agent = await create_agent(
    sk_client,
    "MarketingAgent",
    "You are a creative marketer. Generate catchy slogans and campaign messages."
)

# LoyaltyProgramsAgent (Azure AI Search)
search_index_name = "YOUR-AI-SEARCH-INDEX-NAME"
conn_list = project_client.connections._list_connections()["value"]
search_connection_id = ""

for conn in conn_list:
    metadata = conn["properties"].get("metadata", {})
    if metadata.get("type", "").upper() == "AZURE_AI_SEARCH":
        search_connection_id = conn["id"]
        break

ai_search_tool = AzureAISearchTool(
    index_connection_id=search_connection_id,
    index_name=search_index_name
)

loyalty_agent = await create_agent(
    sk_client,
    "LoyaltyProgramsAgent",
    "You are an expert in loyalty programs. Use Azure AI Search to retrieve relevant information.",
    tools=ai_search_tool.definitions,
    tool_resources=ai_search_tool.resources
)

# ChitChatAgent (default fallback)
chitchat_agent = await create_agent(
    sk_client,
    "ChitChatAgent",
    "You are a friendly assistant. Handle general, informal, or social questions not related to business-specific topics."
)

In [None]:
# Sample question for the multi-agent pipeline; it deliberately touches two
# domains (customer data and licensing) so the router has to split it.
user_message = (
    "Could you give a me a list of 5 customers? "
    "And give me too details about power platform licensing."
)

In [None]:
# Explicitally send the User Message  to Routing Agent
user_chat_message = ChatMessageContent(role=AuthorRole.USER, content=user_message)

routing_response = ""
async for chunk in routing_agent.invoke(thread_id=thread.id, messages=[user_chat_message]):
    if hasattr(chunk, "content"):
        routing_response += str(chunk.content)

print("Routing agent response:")
print(routing_response)

# Clean the Json
cleaned = re.sub(r"^```json\n|```$", "", routing_response.strip(), flags=re.MULTILINE)
routing_dict = json.loads(cleaned)
agent_outputs = {}

# Generates subqueries to domains expert agents
for agent_name, subquery in routing_dict.items():
    agent = {
        "CustomerInfoAgent": CustomerAgent,
        "MarketingAgent": marketing_agent,
        "LoyaltyProgramsAgent": loyalty_agent,
        "ChitChatAgent": chitchat_agent
    }.get(agent_name)

    if agent:
        agent_thread = await sk_client.agents.create_thread()

        # Generates threaths to each domains expert agents
        user_subquery_message = ChatMessageContent(role=AuthorRole.USER, content=subquery)

        response_text = ""
        async for chunk in agent.invoke(thread_id=agent_thread.id, messages=[user_subquery_message]):
            if hasattr(chunk, "content"):
                response_text += str(chunk.content)

        # Print the individual responses of the agents
        print(f"\n[{agent_name}] response:")
        print(response_text)

        agent_outputs[agent_name] = response_text

        # Add the domain axpert agents response to the main thread.id
        await synthesis_agent.add_chat_message(
            thread_id=thread.id,
            message=ChatMessageContent(role=AuthorRole.ASSISTANT, content=response_text)
        )

# Generates the question to the syntethizer agent.
synthesis_input = json.dumps({"user_query": user_message, "agent_responses": agent_outputs}, indent=2)
synthesis_chat_message = ChatMessageContent(role=AuthorRole.USER, content=synthesis_input)

synth_response = ""
async for chunk in synthesis_agent.invoke(thread_id=thread.id, messages=[synthesis_chat_message]):
    if hasattr(chunk, "content"):
        synth_response += str(chunk.content)

print("\nFinal synthesized response:")
print(synth_response)

In [None]:
# Obtener listado completo de agentes
agents_response = await sk_client.agents.list_agents()

# Borrar todos los agentes existentes
for agent in agents_response['data']:
    agent_id = agent['id']
    await sk_client.agents.delete_agent(agent_id)
    print(f"Deleted agent: {agent['name']} (ID: {agent_id})")