In [1]:
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------

"""
DESCRIPTION:
    This sample demonstrates how to build a RAG agent with the Azure AI Agents
    service by giving the agent a custom function tool that retrieves context from
    Microsoft Fabric Eventhouse (used as a vector database), using a synchronous client.


USAGE:
    Before running the sample:
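    Set these environment variables (for example in a .env file) with your own values:
    1) AZURE_AISTUDIO_PROJECT_CONN_STRING - The project connection string, as found in the overview page of your
       Azure AI Foundry project.
    2) AIFOUNDRY_GPT4O_DEPLOYMENT_NAME - The deployment name of the chat model, as found under the "Name" column in
       the "Models + endpoints" tab in your Azure AI Foundry project.
    3) AIFOUNDRY_GPT4O_ENDPOINT - The endpoint used for chat completions.
    4) AIFOUNDRY_ADA_ENDPOINT, AIFOUNDRY_ADA_DEPLOYMENT_NAME - The endpoint and deployment name of the embeddings model.
    5) AIFOUNDRY_API_KEY - The API key used by both the chat completions and the embeddings clients.
    6) ENTRA_TENANT_ID, KUSTO_CLUSTER, KUSTO_DATABASE, KUSTO_TABLE, KUSTO_MANAGED_IDENTITY_APP_ID,
       KUSTO_MANAGED_IDENTITY_SECRET - The tenant, cluster, database, table and app registration credentials
       used to query Fabric Eventhouse.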
"""
import os
from typing import Any, Callable, Set, Dict, List, Optional
from pathlib import Path
from dotenv import load_dotenv
import pandas as pd
from azure.ai.projects import AIProjectClient
from azure.ai.projects.models import FunctionTool, ToolSet
from azure.identity import DefaultAzureCredential
from azure.ai.inference import ChatCompletionsClient
from azure.ai.inference import EmbeddingsClient
from azure.ai.inference.models import SystemMessage, UserMessage
from azure.core.credentials import AzureKeyCredential

# Pull settings from a local .env file (if one exists) into the process environment.
load_dotenv()

# Model endpoints, API key and deployment names for the chat and embeddings models.
AIFOUNDRY_GPT4O_ENDPOINT = os.environ.get("AIFOUNDRY_GPT4O_ENDPOINT")
AIFOUNDRY_API_KEY = os.environ.get("AIFOUNDRY_API_KEY")
AIFOUNDRY_ADA_ENDPOINT = os.environ.get("AIFOUNDRY_ADA_ENDPOINT")
AIFOUNDRY_GPT4O_DEPLOYMENT_NAME = os.environ.get("AIFOUNDRY_GPT4O_DEPLOYMENT_NAME")
AIFOUNDRY_ADA_DEPLOYMENT_NAME = os.environ.get("AIFOUNDRY_ADA_DEPLOYMENT_NAME")

# Connection string of the Azure AI Foundry project that hosts the agent.
AZURE_AISTUDIO_PROJECT_CONN_STRING = os.environ.get("AZURE_AISTUDIO_PROJECT_CONN_STRING")

# Project client shared by the agent cells further down in the notebook.
project_client = AIProjectClient.from_connection_string(
    credential=DefaultAzureCredential(),
    conn_str=AZURE_AISTUDIO_PROJECT_CONN_STRING,
)


In [2]:
# Connect to Fabric Eventhouse DB
# Prerequisites:
#   1. Create an app registration in Microsoft Entra ID.
#   2. Add the corresponding service principal to the Fabric workspace with the reader role.
#   3. Make sure the Fabric tenant settings allow service principals to be assigned to workspaces.
# Read more here: https://learn.microsoft.com/en-us/fabric/admin/enable-service-principal-admin-apis

import os
from dotenv import load_dotenv

load_dotenv()
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

# Eventhouse (Kusto) connection settings, read from the environment.
ENTRA_TENANT_ID = os.environ.get("ENTRA_TENANT_ID")
KUSTO_CLUSTER = os.environ.get("KUSTO_CLUSTER")
KUSTO_DATABASE = os.environ.get("KUSTO_DATABASE")
KUSTO_TABLE = os.environ.get("KUSTO_TABLE")
KUSTO_MANAGED_IDENTITY_APP_ID = os.environ.get("KUSTO_MANAGED_IDENTITY_APP_ID")
KUSTO_MANAGED_IDENTITY_SECRET = os.environ.get("KUSTO_MANAGED_IDENTITY_SECRET")

# Authenticate against the cluster with the app registration (application id + secret).
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
    KUSTO_CLUSTER,
    KUSTO_MANAGED_IDENTITY_APP_ID,
    KUSTO_MANAGED_IDENTITY_SECRET,
    ENTRA_TENANT_ID,
)
client = KustoClient(kcsb)

# Smoke test: fetch 10 rows from the table to confirm the connection works.
kusto_query = KUSTO_TABLE + " | take 10"
response = client.execute(KUSTO_DATABASE, kusto_query)

# Convert the primary result table to a DataFrame and display it as the cell output.
df = dataframe_from_result_table(response.primary_results[0])
df

Unnamed: 0,document_name,content,embedding
0,moby dick book,"flight, and still at every billow that he brok...","[-0.03209448605775833, -0.017250102013349533, ..."
1,moby dick book,But no sooner did his harpooneer stand up for ...,"[-0.02721520885825157, -0.01564219407737255, 0..."
2,moby dick book,mad cougar! This puts me in mind of fastening ...,"[-0.0304492749273777, -0.0014969026669859886, ..."
3,moby dick book,gunwales of the bows were almost even with the...,"[-0.0052542597986757755, -0.014714685268700125..."
4,moby dick book,"ourselves stand up under; even here, above -gr...","[0.011199021711945534, 0.002442705212160945, -..."
5,moby dick book,to what? To three bits of board. Is this the c...,"[0.0005041902768425643, -0.0058479211293160915..."
6,moby dick book,"“Stand by, men; he stirs,” cried Starbuck, as ...","[-0.015052762813866138, -0.025596633553504944,..."
7,moby dick book,"whereby when wounded, the blood is in some deg...","[-0.018078479915857315, -0.0049111368134617805..."
8,moby dick book,"new made wound, which kept continually playing...","[-0.022315341979265213, -0.016487231478095055,..."
9,moby dick book,the solemn churches that preach unconditional ...,"[-0.001759579055942595, -0.01655816100537777, ..."


In [3]:
from tenacity import retry, wait_random_exponential, stop_after_attempt

# The tenacity decorator retries the embeddings call with randomized exponential
# backoff (capped at 20 seconds, at most 6 attempts) to ride out service throttling.
@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(6))
def calc_embeddings(text):
    """
    Request an embedding for a single piece of text from the embeddings deployment.

    The endpoint comes from the AIFOUNDRY_ADA_ENDPOINT setting instead of a URL
    hard-coded to one specific resource, so the notebook works against any
    deployment configured in the environment.
    NOTE(review): for an Azure OpenAI resource the endpoint presumably has to
    include the deployment path (".../openai/deployments/<deployment-name>"),
    as the previously hard-coded URL did - confirm against your resource.

    :param text: the text to embed.
    :return: the result object returned by EmbeddingsClient.embed for the
        one-element input list; the vector itself is expected at
        result.data[0].embedding.
    """
    model = EmbeddingsClient(
        endpoint=AIFOUNDRY_ADA_ENDPOINT,
        credential=AzureKeyCredential(AIFOUNDRY_API_KEY),
        model=AIFOUNDRY_ADA_DEPLOYMENT_NAME
    )

    # embed() takes a list of inputs; only one text is sent per call.
    response = model.embed(
        input=[text],
    )
    return response

def do_search(question, nr_of_answers=1):
    """
    Run a vector similarity search for a question against the Eventhouse table.

    The question is embedded with calc_embeddings, then a KQL query ranks the
    rows of KUSTO_TABLE by cosine similarity between that vector and each row's
    `embedding` column.

    :param question: the text to search for.
    :param nr_of_answers: how many of the most similar rows to return (coerced to int).
    :return: list with the `content` value of each returned row, most similar first.
    """
    # Connection settings are re-read here so the function does not depend on
    # the globals created by the connection-test cell.
    tenant_id = os.getenv("ENTRA_TENANT_ID")
    cluster = os.getenv("KUSTO_CLUSTER")
    database = os.getenv("KUSTO_DATABASE")
    table = os.getenv("KUSTO_TABLE")
    app_id = os.getenv("KUSTO_MANAGED_IDENTITY_APP_ID")
    app_secret = os.getenv("KUSTO_MANAGED_IDENTITY_SECRET")

    # Connect to adx using AAD app registration
    kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
        cluster, app_id, app_secret, tenant_id
    )
    client = KustoClient(kcsb)

    # calc_embeddings returns the whole embeddings result object. Stringifying
    # that object would put its metadata inside dynamic(...) instead of a numeric
    # array, so pull out the vector first. A plain sequence is accepted as well.
    embedding_result = calc_embeddings(question)
    embedding_items = getattr(embedding_result, "data", None)
    vector = embedding_items[0].embedding if embedding_items else embedding_result

    # Coerce to int so only a number can ever be spliced into the query text.
    top_n = int(nr_of_answers)

    kusto_query = (
        table
        + " | extend similarity = series_cosine_similarity(dynamic(" + str(list(vector)) + "), embedding)"
        + " | top " + str(top_n) + " by similarity desc "
    )
    response = client.execute(database, kusto_query)

    return [row['content'] for row in response.primary_results[0]]

def call_openAI_for_natural_language(question, answers):
    """
    Ask the chat model to answer a question using only the supplied information.

    :param question: the user's question.
    :param answers: the retrieved information; it is inserted into the prompt
        using its default string form.
    :return: the text content of the first completion choice.
    """
    chat_client = ChatCompletionsClient(
        endpoint=AIFOUNDRY_GPT4O_ENDPOINT,
        credential=AzureKeyCredential(AIFOUNDRY_API_KEY),
    )

    system_text = "You are a HELPFUL assistant answering users questions. Answer the question using the provided information and do not add anything else."
    # The user turn carries both the question and the retrieved context.
    user_text = f"Question: {question}\nInformation: {answers}"

    completion = chat_client.complete(
        messages=[
            SystemMessage(content=system_text),
            UserMessage(content=user_text),
        ],
        max_tokens=4096,
        temperature=1.0,
        top_p=1.0,
        model=AIFOUNDRY_GPT4O_DEPLOYMENT_NAME,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content

In [4]:
#Define a function to be called by the agent as a custom tool
import json

def query_knowledge_base(question: str) -> str:
    """
    Fetches data from our knowledge base in Fabric Eventhouse using it as a Vector DB.
    :param question: the information to search.
    :return: answer to the question.
    """
    # NOTE(review): the docstring above is presumably read by the agent tooling
    # to describe this function to the model, so its wording is left untouched.
    # Retrieve the 3 most similar passages, then let the chat model phrase the answer.
    top_passages = do_search(question, 3)
    return call_openAI_for_natural_language(question, top_passages)

# Callables that will be offered to the agent as custom tools.
user_functions: Set[Callable[..., Any]] = {query_knowledge_base}

In [5]:
# Initialize agent toolset with user functions:
# wrap the callables in a FunctionTool and register it in a ToolSet.
functions = FunctionTool(user_functions)
toolset = ToolSet()
toolset.add(functions)

# Create your agent with the toolset.
# The agent uses the same deployment name as the chat completions calls, and its
# instructions tell it to answer questions through the provided functions.
# `agent` is used by later cells to run threads and to delete the agent afterwards.
agent = project_client.agents.create_agent(
    model=AIFOUNDRY_GPT4O_DEPLOYMENT_NAME,
    name="rag-agent",
    instructions="You are an assistant retrieving information. Use the provided functions to answer questions.",
    toolset=toolset
)

In [6]:
# Ask the agent a single question in a fresh thread and print the conversation
def call_agent(question):
    """
    Send a question to the agent in a new thread and print the resulting messages.

    Creates a thread, posts the question as a user message, runs the agent
    (module-level `agent`) on that thread, then prints the raw message listing
    followed by every message in chronological order. If the run failed, its
    last error is printed and the messages are still listed.

    :param question: the user question to send to the agent.
    :return: None; all results are printed.
    """
    # Create thread for communication
    thread = project_client.agents.create_thread()
    print(f"Created thread, ID: {thread.id}")

    # Create message to thread
    message = project_client.agents.create_message(
        thread_id=thread.id,
        role="user",
        content=question
    )
    print(f"Created message, ID: {message.id}")

    # Create and process agent run in thread with tools
    run = project_client.agents.create_and_process_run(thread_id=thread.id, assistant_id=agent.id)
    print(f"Run finished with status: {run.status}")

    if run.status == "failed":
        print(f"Run failed: {run.last_error}")

    # Fetch and log all messages
    messages = project_client.agents.list_messages(thread_id=thread.id)
    print(f"Messages: {messages}")

    # Sort messages by creation time (ascending)
    sorted_messages = sorted(messages["data"], key=lambda x: x["created_at"])

    print("\n--- Thread Messages (sorted) ---")
    for msg in sorted_messages:
        role = msg["role"].upper()
        # 'content' is a list of blocks. Print every text block, not just the
        # first one, so text in later blocks (or after a non-text first block)
        # is not dropped.
        content_blocks = msg.get("content", []) or []
        text_value = "\n".join(
            block["text"]["value"]
            for block in content_blocks
            if block.get("type") == "text"
        )
        print(f"{role}: {text_value}")

In [None]:
# Sample question about the book stored in the knowledge base table.
user_prompt = "Why does the coffin prepared for Queequeg become Ishmael's life buoy once the Pequod sinks?"
# To exercise the retrieval tool on its own, without the agent, call query_knowledge_base(user_prompt) instead.
call_agent(user_prompt)

In [8]:
# Delete the agent when done.
# After this cell runs, `agent.id` no longer refers to an existing agent, so
# re-run the agent creation cell before calling call_agent again.
project_client.agents.delete_agent(agent.id)
print("Deleted agent")

Deleted agent
