In [None]:
import os
from typing import Annotated, Any

from semantic_kernel import Kernel
from semantic_kernel.agents import ChatCompletionAgent

from semantic_kernel.connectors.ai import FunctionChoiceBehavior
from semantic_kernel.connectors.ai.open_ai import (
    AzureChatCompletion,
    AzureChatPromptExecutionSettings,
    AzureTextEmbedding,
    OpenAIEmbeddingPromptExecutionSettings
    )
from semantic_kernel.connectors.memory.azure_ai_search import (
    AzureAISearchCollection,
    AzureAISearchSettings, 
    AzureAISearchStore, )
from semantic_kernel.data.text_search import TextSearchResult, SearchOptions

from semantic_kernel.data import (
    VectorStoreRecordDataField,
    VectorStoreRecordKeyField,
    VectorStoreRecordVectorField,
    vectorstoremodel,
)
from semantic_kernel.functions import (
    KernelArguments,
    KernelParameterMetadata,
)

from azure.search.documents.indexes.aio import SearchIndexClient
from azure.core.credentials import AzureKeyCredential

from pydantic import BaseModel
from dotenv import load_dotenv

# Load key/value pairs from a local .env file (if present) into the process
# environment so the os.getenv(...) lookups in the following cells can read them.
load_dotenv()

In [None]:
def _require_env(name: str) -> str:
    """Return the value of environment variable ``name``.

    Raises:
        RuntimeError: If the variable is unset or empty. The message names the
            missing variable so a misconfigured .env file is easy to diagnose.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Required environment variable {name!r} is not set.")
    return value


kernel = Kernel()

# Register the Azure OpenAI chat model under the service id "azure-openai".
# These values are passed through unchanged (possibly None), exactly as before.
kernel.add_service(AzureChatCompletion(
    service_id="azure-openai",
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    deployment_name=os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME"),
    api_version=os.getenv("AZURE_OPENAI_API_VERSION")
))

# Fail fast with a descriptive message instead of handing None to the Azure SDK,
# where a missing key/endpoint would surface as an error that does not say
# which variable is absent.
credential = AzureKeyCredential(_require_env("AZURE_AI_SEARCH_API_KEY"))

# Create a SearchIndexClient for index management
index_client = SearchIndexClient(endpoint=_require_env("AZURE_AI_SEARCH_ENDPOINT"),
                                 credential=credential)

# Prompt execution settings registered for the chat service.
# NOTE(review): no later cell in this notebook reads `settings`; kept for compatibility.
settings = kernel.get_prompt_execution_settings_from_service_id(service_id="azure-openai")


In [None]:
# Name of the search index; passed as `collection_name` when the collection is created below.
index_name = "ler-embedded-index"

@vectorstoremodel
class LERResponseClass(BaseModel):
    """Record schema for one LER document in the search index.

    Text fields declared with ``has_embedding=True`` point, through
    ``embedding_property_name``, at the vector field that stores their embedding.

    The three vector fields default to ``None``. Under pydantic v2 an
    ``X | None`` annotation without a default is still a *required* field, so
    without the default a record that carries no vector values could not be
    validated into this model.
    """

    ler_number: Annotated[str, VectorStoreRecordKeyField()]
    report_date: Annotated[str, VectorStoreRecordDataField()]
    event_date: Annotated[str, VectorStoreRecordDataField()]
    facility_name: Annotated[str, VectorStoreRecordDataField()]
    title: Annotated[str, VectorStoreRecordDataField(has_embedding=True, embedding_property_name="titleVector")]
    cfr_requirements: Annotated[list[str], VectorStoreRecordDataField()]
    abstract: Annotated[str, VectorStoreRecordDataField(has_embedding=True, embedding_property_name="abstractVector")]
    narrative: Annotated[str, VectorStoreRecordDataField(has_embedding=True, embedding_property_name="narrativeVector")]

    # Vector field names are kept exactly as they were (camelCase);
    # presumably they mirror the field names of the existing index - TODO confirm.
    titleVector: Annotated[
        list[float] | None,
        VectorStoreRecordVectorField(
            dimensions=1536,
            local_embedding=True,
            embedding_settings={"embedding": OpenAIEmbeddingPromptExecutionSettings(dimensions=1536)},
        ),
    ] = None
    abstractVector: Annotated[
        list[float] | None,
        VectorStoreRecordVectorField(
            dimensions=1536,
            local_embedding=True,
            embedding_settings={"embedding": OpenAIEmbeddingPromptExecutionSettings(dimensions=1536)},
        ),
    ] = None
    narrativeVector: Annotated[
        list[float] | None,
        VectorStoreRecordVectorField(
            dimensions=1536,
            local_embedding=True,
            embedding_settings={"embedding": OpenAIEmbeddingPromptExecutionSettings(dimensions=1536)},
        ),
    ] = None

# Collection type keyed by `str` and holding LERResponseClass records.
LERCollection = AzureAISearchCollection[str, LERResponseClass]

# Bind the collection to the index named by `index_name`, reusing the shared index client.
collection = LERCollection(
    search_index_client=index_client,
    data_model_type=LERResponseClass,
    collection_name=index_name,
)

def response_map(result: LERResponseClass) -> TextSearchResult:
    """Convert one LER record into a TextSearchResult.

    The record's title becomes the result name, its abstract the value, and
    its LER number the link.
    """
    mapped_fields = {
        "name": result.title,
        "value": result.abstract,
        "link": result.ler_number,
    }
    return TextSearchResult(**mapped_fields)

# Wrap the collection in a text-search object; each hit is converted with `response_map`.
text_search = collection.create_text_search_from_vector_text_search(text_search_results_mapper=response_map)


In [None]:
# Embedding service: shares the Azure OpenAI key, endpoint and API version with the
# chat service but targets its own deployment (AZURE_EMBEDDING_DEPLOYMENT_NAME).
embeddings = AzureTextEmbedding(
    service_id="azure_embedding",
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    deployment_name=os.getenv("AZURE_EMBEDDING_DEPLOYMENT_NAME"),
    api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
)

# Only add the service when nothing is registered under this id yet,
# so the cell can be re-run against the same kernel.
registered_embedding_service = kernel.services.get("azure_embedding")
if not registered_embedding_service:
    kernel.add_service(embeddings)

Smoke test: confirm that a basic query against the search index works at all.

In [None]:
results = await collection.vectorizable_text_search(vectorizable_text="Turbine trip")

async for result in results.results:
    print(result)

In [None]:
# Parameters the search function exposes. Only `query` and `top` are declared here.
query_parameter = KernelParameterMetadata(
    name="query", description="What to search for.", type="str", is_required=True, type_object=str
)
top_parameter = KernelParameterMetadata(
    name="top",
    description="Number of results to return.",
    type="int",
    default_value=3,
    type_object=int,
)

# `create_search` builds a kernel function from the text search object.
# Its description and parameters are what get serialized for the LLM's tool call,
# so their wording is part of the prompt design.
# No extra options or filters are passed here.
search_function = text_search.create_search(
    description="""
            Search for relevant LER reports that may relate to the issue you are investigating. You do not need to
            know the LER report number, you can search by keywords across the title, abstract, or narrative.
            The narrative is focused on the event and the root cause, while the abstract is a summary of the
            entire report.""",
    parameters=[query_parameter, top_parameter],
)

# Register the search function with the kernel under the "azure_ai_search" plugin.
plugin = kernel.add_functions(
    plugin_name="azure_ai_search",
    functions=[search_function],
)


In [None]:
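# NOTE: disabled prompt-based chat function. While this stays commented out, nothing in
# this notebook registers a "ChatBot" plugin, so the `excluded_plugins` filter used when
# building the execution settings has nothing to exclude.
# chat_function = kernel.add_function(
#     prompt="{{$chat_history}}{{$user_input}}",
#     plugin_name="ChatBot",
#     function_name="Chat",
# )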

In [None]:
# Automatic function choice, with any plugin named "ChatBot" excluded from the callable set.
function_choice = FunctionChoiceBehavior.Auto(filters={"excluded_plugins": ["ChatBot"]})

# Sampling and length limits for the "azure-openai" chat service.
execution_settings = AzureChatPromptExecutionSettings(
    service_id="azure-openai",
    function_choice_behavior=function_choice,
    temperature=0.7,
    top_p=0.8,
    max_tokens=2000,
)

In [None]:
# Arguments handed to the agent: they carry the execution settings defined earlier.
agent_arguments = KernelArguments(settings=execution_settings)

# Chat agent bound to the kernel; the instructions restrict it to answering
# from retrieved LER report information.
agent = ChatCompletionAgent(
    kernel=kernel,
    name="ChatCompletionAgent",
    instructions="""
        You are a chat bot. Your name is Mosscap and
        you have one goal: help find the most relevant
        LER report information for the user based on
        their request. You should be polite, helpful,
        but also concise. Always focus your answers around
        what the user is asking for, and what relevant
        information you were able to retrieve.

        If no results come back from search, or nothing
        relevant is available, just let the user know.
        Do not draw on your own knowledge, or make up information.
        """,
    arguments=agent_arguments,
)

In [None]:
from semantic_kernel.contents import AuthorRole, ChatMessageContent, FunctionCallContent, FunctionResultContent

# Define a list to hold callback message content
# (module-level so that later cells can inspect what was captured)
intermediate_steps: list[ChatMessageContent] = []

# Define an async method to handle the `on_intermediate_message` callback
async def handle_intermediate_steps(message: ChatMessageContent) -> None:
    """Record one intermediate message and echo it to the cell output.

    Args:
        message: The message delivered to the callback; it is appended to the
            module-level ``intermediate_steps`` list and then printed.
    """
    intermediate_steps.append(message)
    print(message)

In [None]:
user_inputs = [
    "So, what is it you do exactly?", 
    "A turbine tripped. Has this happened before?", 
    "Thank you",
]

thread = None

# Generate the agent response(s)
for user_input in user_inputs:
    print(f"# {AuthorRole.USER}: '{user_input}'")
    async for response in agent.invoke(
        messages=user_input,
        thread=thread,
        on_intermediate_message=handle_intermediate_steps,
    ):
        thread = response.thread
        print(f"# {response.name}: {response.content}")

In [None]:
# One summary line per message captured by the intermediate-message callback.
for step in intermediate_steps:
    step_summary = (
        f"role:{step.role}, content:{step.content}, "
        f"type:{step.content_type}, inner_content: {step.inner_content}"
    )
    print(step_summary)

In [None]:
async for thr in thread.get_messages():
    print(thr)

In [None]:
# Delete the thread when it is no longer needed
await thread.delete() if thread else None

# Print the intermediate steps
print("\nIntermediate Steps:")
for msg in intermediate_steps:
    if any(isinstance(item, FunctionResultContent) for item in msg.items):
        for fr in msg.items:
            if isinstance(fr, FunctionResultContent):
                print(f"Function Result:> {fr.result} for function: {fr.name}")
    elif any(isinstance(item, FunctionCallContent) for item in msg.items):
        for fcc in msg.items:
            if isinstance(fcc, FunctionCallContent):
                print(f"Function Call:> {fcc.name} with arguments: {fcc.arguments}")
    else:
        print(f"{msg.role}: {msg.content}")