In [None]:
# Install required packages (if not already installed)
%pip install prance openai semantic-kernel nest_asyncio asyncio python-dotenv requests azure-identity azure-mgmt-resource -U

In [None]:

# Import necessary libraries
import os
import asyncio
import logging
import requests
import tempfile
import re

from azure.identity import DefaultAzureCredential

from functools import reduce
from dotenv import load_dotenv, find_dotenv
from prance import ResolvingParser
from prance.util import resolver
from semantic_kernel.agents import ChatCompletionAgent
from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceBehavior
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.contents.chat_history import ChatHistory
from semantic_kernel.contents.utils.author_role import AuthorRole
from semantic_kernel.kernel import Kernel
from semantic_kernel.connectors.openapi_plugin.openapi_parser import OpenApiParser
from semantic_kernel.connectors.openapi_plugin.openapi_function_execution_parameters import (
        OpenAPIFunctionExecutionParameters,
    )

# Load environment variables from .env.local first, then from .env.
# Both calls always run (this is not an either/or fallback).
# NOTE(review): python-dotenv does not override already-set variables by default,
# so values from .env.local should win and .env only fills in what is still
# missing -- confirm against the installed python-dotenv version.
load_dotenv('.env.local')
load_dotenv()

In [299]:
# To toggle streaming or non-streaming mode, change the following boolean.
# invoke_agent() reads this flag to choose between invoke_stream() and invoke().
streaming = True

# Define the agent name and instructions.
# AGENT_ID doubles as the service_id under which the chat completion service is
# registered in main(); AGENT_NAME is the display name given to the agent.
AGENT_ID = "agent1"
AGENT_NAME = "agent1"

# Instructions passed to the agent in main(). Adjacent string literals are
# concatenated into a single newline-separated block of text.
AGENT_INSTRUCTIONS = (
    "Instructions:\n"
    " - Break the task into steps, and output the result of each step as you perform it.\n"
    " - You are an AI assistant that helps with calling APIs to generate useful information based on a user's question.\n"
    " - Use the proper function calls to get information that will be useful to the user.\n"
    " - If one function call depends on the output of another, make sure to call them in order and use the outputs appropriately.\n"
    " - Include what you are thinking, working on, and next steps in your responses, and ask for more information if needed.\n"
    " - If you don't know something, and are not able to ask the user for more information, or can't call an API, you can say 'I don't know'.\n"
    " - Always format an email as HTML. Ensure the content is well organized and use bullet lists or tables where necessary.\n"
    " - For Weather API's, if a query parameter is required, read the description as the query will need to be converted into latitude and longitude and not a city and state.\n"
)

# Azure API Management (APIM) settings, read from the environment.
# Each lookup uses os.environ[...], so a missing variable raises KeyError here.
# The first four values are interpolated into the management.azure.com URLs
# built by fetch_apis_by_product() and fetch_openapi_spec().
subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"]
resource_group = os.environ["AZURE_RESOURCE_GROUP"]
service_name = os.environ["AZURE_APIM_SERVICE_NAME"]
api_version = os.environ["AZURE_APIM_SERVICE_API_VERSION"]
# Product whose APIs main() passes to add_apim_apis_by_product().
product_id = os.environ["AZURE_APIM_SERVICE_PRODUCT_ID"]
# NOTE(review): this variable is not referenced elsewhere in the visible code; the
# auth callback in add_openapi_plugin() reads the same environment variable directly.
subscription_key = os.environ["AZURE_APIM_SERVICE_SUBSCRIPTION_KEY"]

In [300]:
async def invoke_agent(agent: ChatCompletionAgent, input: str, chat: ChatHistory):
    """Send one user message to the agent, print the reply, and record it in ``chat``.

    The module-level ``streaming`` flag selects the mode:

    * streaming: every chunk from ``agent.invoke_stream(chat)`` is collected, the
      chunks are combined with ``+`` into one message, and that message is printed
      and added to ``chat``.
    * non-streaming: each message from ``agent.invoke(chat)`` is printed and added
      to ``chat`` as it arrives.

    Args:
        agent: Agent to invoke.
        input: User message text; added to ``chat`` before the agent is invoked.
        chat: Conversation history; mutated in place.

    Returns:
        None.
    """
    chat.add_user_message(input)

    print(f"# {AuthorRole.USER}: '{input}'")

    if streaming:
        chunks = [chunk async for chunk in agent.invoke_stream(chat)]
        if not chunks:
            # reduce() without an initial value raises TypeError on an empty
            # sequence, and there would be no chunk to take the role/name from.
            print("# (agent returned no content)")
            return

        # Role and name are taken from the final chunk of the stream.
        last_chunk = chunks[-1]
        streaming_chat_message = reduce(lambda first, second: first + second, chunks)
        print(f"# {last_chunk.role} - {last_chunk.name or '*'}: '{streaming_chat_message}'")
        chat.add_message(streaming_chat_message)
    else:
        async for content in agent.invoke(chat):
            print(f"# {content.role} - {content.name or '*'}: '{content.content}'")
            chat.add_message(content)

In [301]:
def sanitize_plugin_name(name):
    """Turn an arbitrary string into a plugin name made of letters, digits and underscores.

    Steps:
      1. Every character outside ``[0-9A-Za-z_]`` becomes an underscore.
      2. Any leading run of non-letters (digits/underscores at this point) is dropped,
         so the result starts with a letter.
      3. If nothing is left, the literal name ``'plugin'`` is returned.

    Args:
        name: Raw name to sanitize.

    Returns:
        The sanitized name, never empty.
    """
    underscored = re.sub(r'[^0-9A-Za-z_]', '_', name)
    trimmed = re.sub(r'^[^A-Za-z]+', '', underscored)
    # An empty string is falsy, so this falls back to the default name.
    return trimmed or 'plugin'

In [302]:
async def add_openapi_plugin(kernel: Kernel, plugin_name:str, openapi_spec: str):
    """Parse an OpenAPI document and register it on the kernel as a plugin.

    Args:
        kernel: Kernel that receives the plugin.
        plugin_name: Name the plugin is registered under.
        openapi_spec: OpenAPI document, given as a string.
    """
    print(f"    > Adding OpenAPI plugin '{plugin_name}'")

    # Parse the document non-strictly, with a recursion limit of 10.
    spec_parser = ResolvingParser(
        spec_string=openapi_spec,
        resolve_types = resolver.RESOLVE_FILES,
        strict=False,
        recursion_limit=10,
    )
    resolved_spec = spec_parser.specification

    async def apim_auth_headers(**kwargs):
        """Build the headers handed over via ``auth_callback``; the key is read from the environment on every call."""
        apim_key = os.environ["AZURE_APIM_SERVICE_SUBSCRIPTION_KEY"]
        return {"Ocp-Apim-Subscription-Key": apim_key, "Content-Type": "application/json"}

    # enable_payload_namespacing determines whether payload parameter names are
    # augmented with namespaces. Namespaces prevent naming conflicts by adding the
    # parent parameter name as a prefix, separated by dots.
    plugin_settings = OpenAPIFunctionExecutionParameters(
        auth_callback=apim_auth_headers,
        enable_payload_namespacing=True,
    )

    kernel.add_plugin_from_openapi(
        plugin_name=plugin_name,
        openapi_parsed_spec=resolved_spec,
        execution_settings=plugin_settings,
    )


In [303]:
# Authenticate and get an access token
def get_access_token():
    """Return an access token string for the Azure management endpoint.

    A ``DefaultAzureCredential`` is created on every call and asked for a token
    with the ``https://management.azure.com/.default`` scope; only the token
    string itself is returned.
    """
    management_scope = "https://management.azure.com/.default"
    return DefaultAzureCredential().get_token(management_scope).token

In [304]:
# Function to fetch APIs that belong to a certain product
def fetch_apis_by_product(access_token, product_id, timeout=30):
    """List the APIs attached to an API Management product.

    The request URL is built from the module-level ``subscription_id``,
    ``resource_group``, ``service_name`` and ``api_version`` values.

    Args:
        access_token: Bearer token sent in the ``Authorization`` header.
        product_id: Identifier of the product whose APIs are listed.
        timeout: Seconds to wait for each HTTP request before giving up, so a
            stalled connection cannot block the notebook indefinitely.

    Returns:
        A list with the items of every page's ``value`` array.

    Raises:
        requests.HTTPError: If any page request returns an error status.
        requests.Timeout: If a request exceeds ``timeout``.
    """
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json'
    }
    url = f'https://management.azure.com/subscriptions/{subscription_id}/resourceGroups/{resource_group}/providers/Microsoft.ApiManagement/service/{service_name}/products/{product_id}/apis?api-version={api_version}'

    apis = []
    while url:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        page = response.json()
        apis.extend(page['value'])
        # List responses are paged: when more results exist the body carries a
        # 'nextLink' URL for the following page; it is absent on the last page.
        url = page.get('nextLink')
    return apis

# Get the OpenAPI document from the export endpoint
def fetch_openapi_spec(api_id, access_token, timeout=30):
    """Export one API's OpenAPI document from API Management.

    The request URL is built from the module-level ``subscription_id``,
    ``resource_group``, ``service_name`` and ``api_version`` values and asks for
    ``export=true&format=openapi``.

    Args:
        api_id: Identifier of the API to export.
        access_token: Bearer token sent in the ``Authorization`` header.
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot block the notebook indefinitely.

    Returns:
        The raw response body as text (it is not parsed here).

    Raises:
        requests.HTTPError: If the request returns an error status.
        requests.Timeout: If the request exceeds ``timeout``.
    """
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json'
    }
    url = f'https://management.azure.com/subscriptions/{subscription_id}/resourceGroups/{resource_group}/providers/Microsoft.ApiManagement/service/{service_name}/apis/{api_id}?export=true&format=openapi&api-version={api_version}'
    response = requests.get(url, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response.text

In [305]:
async def add_apim_apis_by_product(kernel, product_id):
    """Register every API of an API Management product as an OpenAPI plugin.

    A single access token is obtained up front and reused for listing the
    product's APIs and for exporting each API's OpenAPI document.

    Args:
        kernel: Kernel that receives the plugins.
        product_id: Identifier of the product whose APIs are added.
    """
    token = get_access_token()
    product_apis = fetch_apis_by_product(token, product_id)

    # Names already handed out, so two APIs never share a plugin name.
    taken_names = set()
    for api in product_apis:
        api_name = api['name']
        spec_text = fetch_openapi_spec(api_name, token)

        base_name = sanitize_plugin_name(api_name)

        # On a collision, try base_1, base_2, ... until a free name is found.
        unique_name = base_name
        suffix = 0
        while unique_name in taken_names:
            suffix += 1
            unique_name = f"{base_name}_{suffix}"
        taken_names.add(unique_name)

        await add_openapi_plugin(kernel, unique_name, spec_text)

In [None]:
async def main():
    """Build the kernel and agent, load the product's APIs as plugins, and run one prompt.

    Sequence: create the kernel, register the Azure OpenAI chat service under
    AGENT_ID, add one plugin per API of ``product_id``, enable automatic function
    calling, turn on DEBUG logging, create the agent, and send a single user
    request through invoke_agent().
    """
    kernel = Kernel()

    # Azure OpenAI chat completion service; connection details come from the environment.
    azure_chat_service = AzureChatCompletion(
        deployment_name=os.environ["AZURE_OPENAI_MODEL"],
        api_key=os.environ["AZURE_OPENAI_KEY"],
        endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
        service_id=AGENT_ID,
    )
    kernel.add_service(azure_chat_service)

    # One plugin per API that belongs to the configured product.
    await add_apim_apis_by_product(kernel, product_id)

    # Start from the settings of the registered service and let the model
    # invoke kernel functions automatically.
    execution_settings = kernel.get_prompt_execution_settings_from_service_id(service_id=AGENT_ID)
    execution_settings.function_choice_behavior = FunctionChoiceBehavior.Auto()

    # NOTE: This is all that is required to enable logging
    logging.basicConfig(level=logging.DEBUG)

    agent = ChatCompletionAgent(
        service_id=AGENT_ID,
        kernel=kernel,
        name=AGENT_NAME,
        instructions=AGENT_INSTRUCTIONS,
        execution_settings=execution_settings,
    )

    # Fresh conversation history for this run.
    history = ChatHistory()

    user_request = "What is the weather in Orlando, FL? What product's would work for me with the current temperature? Once you have it, send an email to adam.hockemeyer@microsoft.com with the details."
    await invoke_agent(agent, user_request, history)


# Run the main function
# Top-level `await` is not valid in a plain Python script.
# NOTE(review): this presumably relies on the notebook kernel (IPython) awaiting
# cell code -- confirm before running this file outside Jupyter.
if __name__ == "__main__":
    await main()