In [1]:
import json
from databricks.sdk import WorkspaceClient
from azure.ai.projects import AIProjectClient
from azure.ai.agents.models import ToolSet, FunctionTool
from azure.identity import DefaultAzureCredential
from databricks.sdk.service.dashboards import GenieAPI
from azure.ai.agents.models import (FunctionTool, ToolSet)
from typing import Any, Callable, Set
import json
import os

In [None]:
os.environ["DATABRICKS_SDK_UPSTREAM"] = "AzureAIFoundry"
os.environ["DATABRICKS_SDK_UPSTREAM_VERSION"] = "1.0.0"

DATABRICKS_ENTRA_ID_AUDIENCE_SCOPE = "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default" 
# Well known Entra ID audience for Azure Databricks - https://learn.microsoft.com/en-us/azure/databricks/dev-tools/auth/user-aad-token

FOUNDRY_PROJECT_ENDPOINT = ""
FOUNDRY_DATABRICKS_CONNECTION_NAME = ""

GENIE_QUESTION_1 = "What is the average transaction value?"

GENIE_QUESTION_2 = "How many transactions were made that were above that value?"

In [16]:
import json
import requests
from typing import Dict, Any, Callable

from azure.identity import DefaultAzureCredential
from azure.mgmt.logic import LogicManagementClient


class AzureLogicAppTool:
    """
    A service that manages multiple Logic Apps by retrieving and storing their callback URLs,
    and then invoking them with an appropriate payload.
    """

    def __init__(self, subscription_id: str, resource_group: str, credential=None):
        if credential is None:
            credential = DefaultAzureCredential(exclude_environment_credential=True,
    exclude_interactive_browser_credential=False)
        self.subscription_id = subscription_id
        self.resource_group = resource_group
        self.logic_client = LogicManagementClient(credential, subscription_id)

        self.callback_urls: Dict[str, str] = {}

    def register_logic_app(self, logic_app_name: str, trigger_name: str) -> None:
        """
        Retrieves and stores a callback URL for a specific Logic App + trigger.
        Raises a ValueError if the callback URL is missing.
        """
        callback = self.logic_client.workflow_triggers.list_callback_url(
            resource_group_name=self.resource_group,
            workflow_name=logic_app_name,
            trigger_name=trigger_name,
        )

        if callback.value is None:
            raise ValueError(f"No callback URL returned for Logic App '{logic_app_name}'.")

        self.callback_urls[logic_app_name] = callback.value

    def invoke_logic_app(self, logic_app_name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """
        Invokes the registered Logic App (by name) with the given JSON payload.
        Returns a dictionary summarizing success/failure.
        """
        if logic_app_name not in self.callback_urls:
            raise ValueError(f"Logic App '{logic_app_name}' has not been registered.")

        url = self.callback_urls[logic_app_name]
        response = requests.post(url=url, json=payload)

        if response.ok:
            return {"result": f"Successfully invoked {logic_app_name}."}
        else:
            return {"error": (f"Error invoking {logic_app_name} " f"({response.status_code}): {response.text}")}


def create_send_email_function(service: AzureLogicAppTool, logic_app_name: str) -> Callable[[str, str, str], str]:
    """
    Returns a function that sends an email by invoking the specified Logic App in LogicAppService.
    This keeps the LogicAppService instance out of global scope by capturing it in a closure.
    """

    def send_email_via_logic_app(recipient: str, subject: str, body: str) -> str:
        """
        Sends an email by invoking the specified Logic App with the given recipient, subject, and body.

        :param recipient: The email address of the recipient.
        :param subject: The subject of the email.
        :param body: The body of the email.
        :return: A JSON string summarizing the result of the operation.
        """
        payload = {
            "email_to": recipient,
            "email_subject": subject,
            "email_body": body,
        }
        result = service.invoke_logic_app(logic_app_name, payload)
        return json.dumps(result)

    return send_email_via_logic_app

def create_get_weather_function(service: AzureLogicAppTool, logic_app_name: str) -> Callable[[str], str]:
    """
    Returns a function that retrieves weather information by invoking the specified Logic App in LogicAppService.
    This keeps the LogicAppService instance out of global scope by capturing it in a closure.
    """

    def get_weather_via_logic_app(location: str) -> str:
        """
        Retrieves weather information for a given location by invoking the specified Logic App.

        :param location: The location for which to retrieve weather information.
        :return: A JSON string summarizing the result of the operation.
        """
        payload = {
            "location": location,
        }
        result = service.invoke_logic_app(logic_app_name, payload)
        return json.dumps(result)

    return get_weather_via_logic_app

In [17]:
def ask_genie(question: str, conversation_id: str = None) -> str:
    """
    Ask Genie a question and return the response as JSON.
    The response JSON will contain the conversation ID and either the message content or a table of results.
    Reuse the conversation ID in future calls to continue the conversation and maintain context.
    
    param question: The question to ask Genie.
    param conversation_id: The ID of the conversation to continue. If None, a new conversation will be started.
    """
    try:
        if conversation_id is None:
            message = genie_api.start_conversation_and_wait(genie_space_id, question)
            conversation_id = message.conversation_id
        else:
            message = genie_api.create_message_and_wait(genie_space_id, conversation_id, question)

        query_result = None
        if message.query_result:
            query_result = genie_api.get_message_query_result(
                genie_space_id, message.conversation_id, message.id
            )

        message_content = genie_api.get_message(genie_space_id, message.conversation_id, message.id)

        # Try to parse structured data if available
        if query_result and query_result.statement_response:
            statement_id = query_result.statement_response.statement_id
            results = databricks_workspace_client.statement_execution.get_statement(statement_id)
            columns = results.manifest.schema.columns
            data = results.result.data_array
            headers = [col.name for col in columns]
            rows = []
            for row in data:
                formatted_row = []
                for value, col in zip(row, columns):
                    if value is None:
                        formatted_value = "NULL"
                    elif col.type_name in ["DECIMAL", "DOUBLE", "FLOAT"]:
                        formatted_value = f"{float(value):,.2f}"
                    elif col.type_name in ["INT", "BIGINT", "LONG"]:
                        formatted_value = f"{int(value):,}"
                    else:
                        formatted_value = str(value)
                    formatted_row.append(formatted_value)
                rows.append(formatted_row)
            return json.dumps({
                "conversation_id": conversation_id,
                "table": {
                    "columns": headers,
                    "rows": rows
                }
            })

        # Fallback to plain message text
        if message_content.attachments:
            for attachment in message_content.attachments:
                if attachment.text and attachment.text.content:
                    return json.dumps({
                        "conversation_id": conversation_id,
                        "message": attachment.text.content
                    })

        return json.dumps({
            "conversation_id": conversation_id,
            "message": message_content.content or "No content returned."
        })

    except Exception as e:
        return json.dumps({
            "error": "An error occurred while talking to Genie.",
            "details": str(e)
        })


In [18]:
credential = DefaultAzureCredential(
    exclude_environment_credential=True,
    exclude_interactive_browser_credential=False
)


project_client = AIProjectClient(
    FOUNDRY_PROJECT_ENDPOINT,
    credential
)
print(f"AI Project client created for project endpoint: {FOUNDRY_PROJECT_ENDPOINT}")

AI Project client created for project endpoint: https://gk-aifoundry.services.ai.azure.com/api/projects/supplychainAgents


In [19]:
connection = project_client.connections.get(FOUNDRY_DATABRICKS_CONNECTION_NAME)
print(f"Retrieved connection '{FOUNDRY_DATABRICKS_CONNECTION_NAME}' from AI project")

if connection.metadata['azure_databricks_connection_type'] == 'genie':
    genie_space_id = connection.metadata['genie_space_id']
    print(f"Connection is of type 'genie', retrieved genie space ID: {genie_space_id}")
else:
    raise ValueError("Connection is not of type 'genie', please check the connection type.")

Retrieved connection 'gkvadbgenie01f046c445d312de8f180e3af59131f2' from AI project
Connection is of type 'genie', retrieved genie space ID: 01f046c445d312de8f180e3af59131f2


In [None]:
# Extract subscription and resource group from the project scope
subscription_id = ""
resource_group = ""

# Logic App details which are already defined in Azure
logic_app_name = "sendEmail"
trigger_name = "When_a_HTTP_request_is_received"

# Create and initialize AzureLogicAppTool utility
logic_app_tool = AzureLogicAppTool(subscription_id, resource_group)
logic_app_tool.register_logic_app(logic_app_name, trigger_name)

In [None]:
#  Logic App details which are already defined in Azure
logic_app_name_weather = "getweather"
trigger_name_weather = "When_a_HTTP_request_is_received"

# Create and initialize AzureLogicAppTool utility
logic_app_tool = AzureLogicAppTool(subscription_id, resource_group)
logic_app_tool.register_logic_app(logic_app_name_weather, trigger_name_weather)

In [22]:
# Create the specialized "send_email_via_logic_app" function for your agent tools
send_email_func = create_send_email_function(logic_app_tool, logic_app_name)
get_weather_func = create_get_weather_function(logic_app_tool, logic_app_name_weather)

In [23]:
databricks_workspace_client = WorkspaceClient(
    host=connection.target,
    token = credential.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token,
)
print(f"Databricks workspace client created for host: {connection.target}")

genie_api = GenieAPI(databricks_workspace_client.api_client)

toolset = ToolSet()
user_functions: Set[Callable[..., Any]] = { ask_genie, send_email_func, get_weather_func }
functions = FunctionTool(functions=user_functions)
toolset.add(functions)

Databricks workspace client created for host: https://adb-3707350321347716.16.azuredatabricks.net


In [24]:
with project_client:
    # Create an agent and run user's request with ask_genie function
    project_client.agents.enable_auto_function_calls(toolset)

    agent = project_client.agents.create_agent(
        model='gpt-4.1',
        name="AI Foundry Databricks Agent",
        instructions="You're an helpful assistant, use Databricks Genie to answer questions. " \
        "Use the conversation_id returned by the first call to the ask_genie function to continue the conversation in Genie.",
        toolset=toolset,
    )

    print(f"Agent '{agent.name}' created with model '{agent.model}'")

    thread = project_client.agents.threads.create()
    print(f"Created thread, ID: {thread.id}")

    agent_message_1 = project_client.agents.messages.create(
        thread_id=thread.id,
        role="user",
        content=GENIE_QUESTION_1,
    )
    print(f"Created message 1 with question: {GENIE_QUESTION_1}, ID: {agent_message_1.id}")

    print("Creating and processing Run 1")

    run_1 = project_client.agents.runs.create_and_process(
        thread_id=thread.id,
        agent_id=agent.id  
    )

    print(f"Run 1 completed with status: {run_1.status}")

    agent_message_2 = project_client.agents.messages.create(
        thread_id=thread.id,
        role="user",
        content=GENIE_QUESTION_2,
    )
    print(f"Created message 1 with question: {GENIE_QUESTION_2}, ID: {agent_message_2.id}")

    print("Creating and processing Run 2")

    run_2 = project_client.agents.runs.create_and_process(
        thread_id=thread.id,
        agent_id=agent.id  
    )

    print(f"Run 2 completed with status: {run_2.status}")

    # # Delete the agent when done
    # project_client.agents.delete_agent(agent.id)
    # print("Deleted agent")

    # Fetch and log all messages
    messages = project_client.agents.messages.list(thread_id=thread.id)
    for message in messages:
        print(f"Message ID: {message.id}, Role: {message.role}, Content: {message.content}")

    # Fetch and log all run steps
    print("\n\nSteps in Run 1:")
    steps = project_client.agents.run_steps.list(thread_id=thread.id, run_id=run_1.id)
    for step in steps:
        print(json.dumps(step.as_dict(), indent=2))

    print("\n\nSteps in Run 2:")
    steps = project_client.agents.run_steps.list(thread_id=thread.id, run_id=run_2.id)
    for step in steps:
        print(json.dumps(step.as_dict(), indent=2))

Agent 'AI Foundry Databricks Agent' created with model 'gpt-4.1'
Created thread, ID: thread_nNRLDl4j1oZVkIxk2S5JiIg3
Created message 1 with question: What is the average transaction value?, ID: msg_RSzzg6rJwq9fLR0O7sOf5Jb1
Creating and processing Run 1
Run 1 completed with status: RunStatus.COMPLETED
Created message 1 with question: How many transactions were made that were above that value?, ID: msg_oo7IEaTp89ca4a6B9an9BkJT
Creating and processing Run 2
Run 2 completed with status: RunStatus.COMPLETED
Message ID: msg_sHS26ddeJC4UIrWQdOHroZPH, Role: MessageRole.AGENT, Content: [{'type': 'text', 'text': {'value': "There were 6,981 transactions made that were above the average transaction value. If you'd like more analysis or information, feel free to ask!", 'annotations': []}}]
Message ID: msg_oo7IEaTp89ca4a6B9an9BkJT, Role: MessageRole.USER, Content: [{'type': 'text', 'text': {'value': 'How many transactions were made that were above that value?', 'annotations': []}}]
Message ID: msg_m