In [397]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [398]:
from dotenv import load_dotenv
load_dotenv(override=True)

True

In [399]:
import asyncio
import os
import sys

from typing import Annotated
from openai import AsyncAzureOpenAI

if sys.version_info >= (3, 12):
    from typing import override  # pragma: no cover
else:
    from typing_extensions import override  # pragma: no cover


from semantic_kernel.kernel import Kernel
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion, AzureChatCompletion
from semantic_kernel.agents import ChatCompletionAgent, AgentGroupChat, Agent
from semantic_kernel.agents.strategies import (
    KernelFunctionSelectionStrategy,
    KernelFunctionTerminationStrategy,
    DefaultTerminationStrategy
)

from semantic_kernel.functions import KernelFunctionFromPrompt
from semantic_kernel.agents.strategies.selection.selection_strategy import SelectionStrategy

from semantic_kernel.agents.open_ai import OpenAIAssistantAgent
from semantic_kernel.contents import AuthorRole, ChatMessageContent
from semantic_kernel.functions import kernel_function

In [400]:
import logging

logging.basicConfig(level=logging.INFO)

In [401]:
from azure.identity import DefaultAzureCredential, get_bearer_token_provider

credential = DefaultAzureCredential()
token_provider = get_bearer_token_provider(
    credential, "https://cognitiveservices.azure.com/.default"
)


def _create_kernel(service_id: str = "default") -> Kernel:
    kernel = Kernel()

    client = AsyncAzureOpenAI(
        azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
        azure_deployment=os.getenv("AZURE_OPENAI_MODEL"),
        azure_ad_token_provider=token_provider,
        api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
    )

    kernel.add_service(
        AzureChatCompletion(
            deployment_name=os.getenv("AZURE_OPENAI_MODEL"),
            async_client=client,
            service_id=service_id,
        )
    )

    return kernel

INFO:azure.identity._credentials.environment:No environment configuration found.
INFO:azure.identity._credentials.managed_identity:ManagedIdentityCredential will use IMDS
INFO:azure.identity._credentials.managed_identity:ManagedIdentityCredential will use IMDS


In [402]:
from typing import ClassVar
from semantic_kernel.contents.history_reducer.chat_history_reducer import (
    ChatHistoryReducer,
)
from semantic_kernel.functions.kernel_arguments import KernelArguments
from semantic_kernel.kernel_pydantic import KernelBaseModel


class AgentChoiceResponse(KernelBaseModel):
    agent_id: Annotated[
        str,
        "Agent ID selected by the orchestrator. Must be a valid agent_id from the list of available agents.",
    ]
    reason: Annotated[str, "Reasoning behind the agent_id selection."]

prompt = """
You are a team orchestrator that uses a chat history to determine the next best speaker in the conversation.

Your task is to return the agent_id of the speaker that is best suited to proceed based on the context provided in the chat history and the description of the agents, in JSON format as shown in the example output section
- You MUST return agent_id value from the list of available agents.
- The names are case-sensitive and should not be abbreviated or changed.
- DO NOT change the structure of the output, only the values.
- You MUST provide a reason for the agent_id selection.
- DO NOT output any additional formatting or text.
- When a user input is expected, you MUST select an agent capable of handling the user input.
- When provided, you can also take a decision based on tools available to each agent
- When provided, you can also take a decision based on the allowed transitions between agents.


### Example Output
{{"agent_id": "agent_1", "reason": "Agent 1 is the best speaker for the next turn."}}


### Agents

{agents}


### Chat History

{history}


BE SURE TO READ AGAIN THE INSTUCTIONS ABOVE BEFORE PROCEEDING.       
        """

class SpeakerElectionStrategy(SelectionStrategy):

    kernel: Kernel
    history_reducer: ChatHistoryReducer | None = None
    include_tools_descriptions: bool = False,
    allowed_transitions: dict["Agent", list["Agent"]] | None = None,

    @override
    async def select_agent(
        self, agents: list["Agent"], history: list[ChatMessageContent]
    ) -> "Agent":

        if self.history_reducer is not None:
            self.history_reducer.messages = history
            reduced_history = await self.history_reducer.reduce()
            if reduced_history is not None:
                history = reduced_history.messages

        # Flatten the history
        messages = [
            {"role": str(message.role), "content": message.content, "name": message.name or "user"}
            for message in history
        ]

        agents_info = self._generate_agents_info(agents)

        # Invoke the function
        arguments = KernelArguments()
        arguments["agents"] = agents_info
        arguments["history"] = messages
        
        execution_settings =  {}
        # https://devblogs.microsoft.com/semantic-kernel/using-json-schema-for-structured-output-in-python-for-openai-models/
        execution_settings["response_format"] = AgentChoiceResponse

        input_prompt = prompt.format(agents=agents_info, history=messages)
        # logging.info(f"SpeakerElectionStrategy: {input_prompt}")
        function = KernelFunctionFromPrompt(function_name="SpeakerElection", prompt=input_prompt)
        result = await function.invoke(
            kernel=self.kernel,
            arguments=arguments,
            execution_settings=execution_settings,
        )
        logging.info(f"SpeakerElectionStrategy: {result}")
        parsed_result = AgentChoiceResponse.model_validate_json(result.value[0].content)

        return next(agent for agent in agents if agent.id == parsed_result.agent_id)

    def _generate_agents_info(self, agents: list["Agent"]) -> str:
        logging.info(f"msg={agents}")
        agents_info = []
        for agent in agents:
            tools = []
            if self.include_tools_descriptions:
                for tool in agent.kernel.get_full_list_of_function_metadata():
                    tool_name = tool.name
                    tool_description = tool.description
                    tools.append(f"    - tool '{tool_name}': {tool_description}")
            tools_str = "\n".join(tools)

            transitions = []
            if self.allowed_transitions and agent in self.allowed_transitions:
                transitions = [
                    f"    - can transition to: {next_agent.id}"
                    for next_agent in self.allowed_transitions[agent]
                ]
            transitions_str = "\n".join(transitions)

            agent_info = f"- agent_id: {agent.id}\n    - description: {agent.description}\n{tools_str}\n{transitions_str}\n\n"
            agents_info.append(agent_info)
            
        return "\n".join(agents_info)

In [403]:
from semantic_kernel.agents.strategies.termination.termination_strategy import TerminationStrategy

class UserInputRequiredTerminationStrategy(TerminationStrategy):
    stop_agents: list["Agent"]
        
    async def should_agent_terminate(self, agent: "Agent", history: list["ChatMessageContent"]) -> bool:
        return agent in self.stop_agents

In [404]:
kernel = _create_kernel() 

In [405]:
sales_agent = ChatCompletionAgent(
    description="A sales agent that can answer sales questions",
    id="sales",
    kernel=kernel,
    instructions="""
You are a sales person that responds to customer inquiries.
    
    You have access to pricing and product details in the PRODUCTS sections below. Please note field starting with "_" are not to be shared with the Customer.
    
    Your tasks are:
    - provide the Customer with the information they need. Try to be specific and provide the customer only options that fit their needs.
    
    IMPORTANT NOTES:
    - DO act politely and professionally
    - NEVER provide false information
    
    ### PRODUCTS
    - Mobile Internet
        - Description: Mobile WiFi for you to take anywhere, supports up to 10 devices.
        - Price: €10/month
        - Details: 10GB data included, €1/GB after that.
        - _SKU: INET_MOBILE
    - All-in-One Bundle
        - Description: Mobile internet and home internet in one package.
        - Price: €45/month
        - Details: 10GB mobile data, €1/GB after that. Home internet included.
        - _SKU: INET_BUNDLE
    - Home Internet
        - Description: High-speed internet for your home.
        - Price: €30/month
        - Details: Unlimited data at 1Gbps.
        - _SKU: INET_HOME""",
)

In [406]:
from semantic_kernel.functions import kernel_function


class TechnicalAgentPlugin:
    @kernel_function
    def get_service_status(
        service_sku: Annotated[str, "The SKU of the service to check status for"]
    ) -> Annotated[str, "Status of the specified service"]:

        return "Service degraded"

    @kernel_function
    def check_customer_telemetry(
        service_sku: Annotated[
            str,
            "The SKU of the service to check status for, values can be INET_MOBILE, INET_BUNDLE, INET_HOME",
        ],
        customerCode: Annotated[str, "The customer code to check telemetry for"],
    ) -> Annotated[str, "Telemetry summary for the specified customer"]:

        return "No issues detected"


technical_agent_kernel = _create_kernel()

technical_agent_kernel.add_plugin(TechnicalAgentPlugin, plugin_name="TechnicalAgent")


technical_agent = ChatCompletionAgent(
    description="A technical support agent that can answer technical questions",
    id="technical",
    kernel=technical_agent_kernel,
    instructions="""You are a technical support agent that responds to customer inquiries.
    
    Your task are:
    - Assess the technical issue the customer is facing.
    - Verify if there any known issues with the service the customer is using.
    - Check remote telemetry data to identify potential issues with customer's device. Be sure to ask customer code first.
    - Provide the customer with possible solutions to the issue. See the list of common issues below.
    - When the service status is OK, reply the customer and suggest to restart the device.
    - When the service status is DEGRADED, apologize to the customer and kindly ask them to wait for the issue to be resolved.
    - Open an internal ticket if the issue cannot be resolved immediately.
    
    Make sure to act politely and professionally.    
    
    ### Common issues and solutions:

    - Home Internet:
        - Issue: No internet connection.
        - Solutions: 
            - Check the router's power supply and cables.
            - Restart the router.
            - Check the internet connection status LED on the router.
    - Mobile Internet:
        - Issue: Slow internet connection or no connection.
        - Solutions:
            - Check the signal strength on the device.
            - Restart the device.
            - Check the data usage on the device.
            - Suggest the customer to purchase additional data when the limit is reached.
    - All-in-One Bundle:
        USE a combination of the solutions for Home Internet and Mobile Internet.
    
    """,
)

In [407]:
user_agent = ChatCompletionAgent(
    id="user_agent",
    kernel=kernel,
    description="A human user that interacts with the system. Can provide input to the chat",
    instructions="Always responde PAUSE"
)

In [408]:
team = AgentGroupChat(
        agents=[user_agent, sales_agent, technical_agent],
        termination_strategy=UserInputRequiredTerminationStrategy(stop_agents=[user_agent]),
        selection_strategy=SpeakerElectionStrategy(kernel=kernel),
    )

In [409]:
await team.add_chat_message(
    ChatMessageContent(
        role=AuthorRole.USER,
        content="Mobile internet is not working",
    )
)

chat= []
async for result in team.invoke():
    chat.append(f"{result.name}: '{result.content}'")

INFO:semantic_kernel.agents.group_chat.agent_chat:Adding `1` agent chat messages
INFO:root:msg=[ChatCompletionAgent(id='user_agent', description='A human user that interacts with the system. Can provide input to the chat', name='agent_GgfYKKagsIiEqJaa', instructions='Always responde PAUSE', kernel=Kernel(retry_mechanism=PassThroughWithoutRetry(), services={'default': AzureChatCompletion(ai_model_id='gpt-4o', service_id='default', instruction_role='system', client=<openai.lib.azure.AsyncAzureOpenAI object at 0x000001F265AAEE90>, ai_model_type=<OpenAIModelTypes.CHAT: 'chat'>, prompt_tokens=0, completion_tokens=0, total_tokens=0)}, ai_service_selector=<semantic_kernel.services.ai_service_selector.AIServiceSelector object at 0x000001F263D4E8A0>, plugins={}, function_invocation_filters=[], prompt_rendering_filters=[], auto_function_invocation_filters=[]), arguments=None, prompt_template=None, service_id='default'), ChatCompletionAgent(id='sales', description='A sales agent that can answer s

In [410]:
chat

["agent_XdfiFURTDXBukMXe: 'I’m sorry to hear you’re experiencing issues with your mobile internet. I’ll do my best to assist you. \n\nLet’s start by troubleshooting:\n\n1. Could you please check the signal strength on your device? Do you see any bars or network indicators?  \n2. Have you tried restarting your device? This often resolves basic connectivity issues.  \n3. Have you checked if your data usage limit has been reached or if your signal is appearing in a roaming area?\n\nIf you’re still having issues, I can check our remote telemetry data for specific factors affecting your device. For that, I’ll need a customer code or any ID associated with your account. Could you please provide that?'",
 "agent_GgfYKKagsIiEqJaa: 'PAUSE'"]