In [None]:
# Multi-Agent System with Azure AI and Microsoft Fabric
# PRODUCTION-SAFE VERSION - Uses environment variables for all sensitive data


#### Install required libraries
Install necessary python packages for the multi-agent system

In [None]:
# NOTE(review): versions are unpinned. Later cells call `AIProjectClient.from_connection_string`,
# which presumably exists only in some azure-ai-projects releases — pin a known-good set (pkg==x.y.z).
%pip install semantic-kernel azure-ai-projects azure-identity azure-mgmt-resource

#### Import dependencies
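The next cells explore the Fabric file-system utilities (`notebookutils.fs`); the modules for Azure AI, Fabric, and Semantic Kernel are imported in the credentials cell further below.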

In [None]:
# Fabric utilities
# Exploratory call; presumably prints the help text of the `notebookutils.fs` helpers.
# NOTE(review): `notebookutils` is never imported in this notebook — presumably the
# Fabric runtime injects it; confirm before running elsewhere.
notebookutils.fs.help()

In [None]:
# Fetch the listing for "/./" first, then print it behind the "HJ: " prefix.
root_entries = notebookutils.fs.ls("/./")
print("HJ: ", root_entries)

In [None]:
# Bare last expression: the cell displays whatever `mounts()` returns
# (presumably the current mount points — confirm against the Fabric documentation).
notebookutils.fs.mounts()

### Sets up Azure credentials and initializes the AI project client.
**SECURITY NOTE**: This cell uses environment variables for sensitive information.
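Set these environment variables before running:
- AZURE_TENANT_ID (required)
- AZURE_CLIENT_ID (required)
- AZURE_CLIENT_SECRET (required)
- AZURE_PROJECT_CONNECTION_STRING (required)
- AZURE_MODEL_DEPLOYMENT_NAME (optional; defaults to `hjfab-gpt-4o`)
- FABRIC_BASE_URL (required)
- ML_DATA_PATH (required by the churn-prediction tool)
- ML_MODEL_NAME (optional; defaults to `hj_lgbm_sm`)

Optional variables read by later cells: AZURE_SEARCH_CONNECTION_NAME (default `AzureAISearch`), AZURE_SEARCH_INDEX_NAME (default `hjazureblob-index`), and ML_MODEL_VERSION (default `1`).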

In [None]:
from azure.mgmt.resource import SubscriptionClient

import os
import time
import uuid
import re
import json
from typing import Any, Dict
from pprint import pprint

from openai import OpenAI
from openai._models import FinalRequestOptions
from openai._types import Omit
from openai._utils import is_given
from synapse.ml.mlflow import get_mlflow_env_config

from azure.identity import ClientSecretCredential
from azure.ai.projects import AIProjectClient
from azure.ai.projects.models import AzureAISearchTool, FunctionTool, RequiredFunctionToolCall, SubmitToolOutputsAction, ToolOutput

from semantic_kernel.agents import AzureAIAgent, AzureAIAgentSettings
from semantic_kernel.contents.chat_message_content import ChatMessageContent
from semantic_kernel.contents.utils.author_role import AuthorRole
from semantic_kernel.functions import kernel_function

# Validate required environment variables BEFORE building any client.
# Constructing ClientSecretCredential with a missing tenant/client/secret fails on its own
# (presumably with a less helpful ValueError), which previously made this check unreachable
# for those three variables.
required_vars = [
    "AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET",
    "AZURE_PROJECT_CONNECTION_STRING", "FABRIC_BASE_URL"
]

missing_vars = [var for var in required_vars if not os.environ.get(var)]
if missing_vars:
    raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")

# Load credentials from environment variables for security
# (safe to index directly: presence was verified above)
credential = ClientSecretCredential(
    tenant_id=os.environ["AZURE_TENANT_ID"],
    client_id=os.environ["AZURE_CLIENT_ID"],
    client_secret=os.environ["AZURE_CLIENT_SECRET"],
)

# Load configuration from environment variables
project_connection_string = os.environ["AZURE_PROJECT_CONNECTION_STRING"]
# Optional: falls back to the demo deployment name when unset
model_deployment_name = os.environ.get("AZURE_MODEL_DEPLOYMENT_NAME", "hjfab-gpt-4o")

project_client = AIProjectClient.from_connection_string(
    credential=credential,
    conn_str=project_connection_string
)

# Smoke-test the credential by listing the subscriptions it can see.
# Best-effort: a failure is reported but does not stop the notebook.
print("Testing Azure credentials...")
try:
    subscription_client = SubscriptionClient(credential)
    subscriptions = list(subscription_client.subscriptions.list())
    print("✅ Successfully authenticated. Available subscriptions:")
    for sub in subscriptions:
        print(f"- {sub.subscription_id}: {sub.display_name}")
except Exception as e:
    print(f"❌ Authentication failed: {e}")

### Returning all the connections in the project in AI Foundry

In [None]:
# Fetch the project's connections and print each one as returned by the client.
# NOTE(review): review this output before sharing the notebook — connection objects
# may contain endpoint details.
connections = project_client.connections.list()
for connection in connections:
    print(connection)

### Returning just the AI Search connection in the project in AI Foundry

In [None]:
from azure.ai.projects.models import Evaluation, Dataset, EvaluatorConfiguration, ConnectionType

# Ask the project only for its Azure AI Search connections.
connections = project_client.connections.list(connection_type=ConnectionType.AZURE_AI_SEARCH)

# Empty string means "not found"; the target name can be overridden via the environment.
search_connection_id = ""
search_connection_name = os.environ.get("AZURE_SEARCH_CONNECTION_NAME", "AzureAISearch")

# Walk the connections, echoing each name, and stop at the first one matching the target.
for connection in connections:
    print("The connection name: ", connection.name)
    if connection.name != search_connection_name:
        continue
    search_connection_id = connection.id
    print(f"✅ Found search connection: {connection.name}")
    break

if search_connection_id:
    pass
else:
    print(f"⚠️ Search connection '{search_connection_name}' not found")

print("Connection ID for Search: ", search_connection_id)

#### Create an AI agent
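Defines a helper that registers an agent definition in the Azure AI project (with optional tools, tool resources, and a model override) and wraps it in an `AzureAIAgent`, attaching any plugins that are passed in.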

In [None]:
async def create_agent(
    client,
    name,
    instructions,
    tools=None,
    tool_resources=None,
    plugins=None,
    model=None,
):
    """Register an agent definition through ``client`` and wrap it in an ``AzureAIAgent``.

    :param client: Object exposing ``agents.create_agent`` as an awaitable.
    :param name: Name given to the agent definition.
    :param instructions: Instructions text for the agent.
    :param tools: Optional tool definitions forwarded unchanged.
    :param tool_resources: Optional tool resources forwarded unchanged.
    :param plugins: Optional plugins handed to the ``AzureAIAgent`` wrapper.
    :param model: Optional model override; any falsy value selects the
        notebook-level ``model_deployment_name``.
    :return: The ``AzureAIAgent`` built from the created definition.
    """
    # A truthy override wins; otherwise use the deployment configured for the notebook.
    deployment = model if model else model_deployment_name

    creation_request = dict(
        model=deployment,
        name=name,
        instructions=instructions,
        tools=tools,
        tool_resources=tool_resources,
        headers={"x-ms-enable-preview": "true"},
    )
    definition = await client.agents.create_agent(**creation_request)

    wrapped_agent = AzureAIAgent(client=client, definition=definition, plugins=plugins)
    return wrapped_agent

### This class is used as a utility to connect to the Data Agent with a query
**SECURITY NOTE**: The base_url is loaded from environment variables.
FABRIC_BASE_URL environment variable must be set with your Fabric workspace URL.

In [None]:
import requests
import typing as t
from openai import OpenAI
from openai._exceptions import APIStatusError
from sempy.fabric._token_provider import SynapseTokenProvider

# Fabric endpoint comes from the environment; stop immediately when it is unset or empty
base_url = os.environ.get("FABRIC_BASE_URL")
if base_url is None or base_url == "":
    raise ValueError("FABRIC_BASE_URL environment variable is required")

# Prompt used by the connection smoke test
question = "What datasources do you have access to?"
# MLflow environment config; its `driver_aad_token` is what the client sends as bearer token
configs = get_mlflow_env_config()

# Create OpenAI Client for Fabric
class HJFabricOpenAI(OpenAI):
    """OpenAI client subclass that targets the Fabric endpoint held in ``base_url``.

    Every request carries an ``api-version`` query parameter and a bearer token
    read from the module-level ``configs.driver_aad_token``.
    """

    def __init__(
        self,
        api_version: str = "2024-05-01-preview",
        **kwargs: t.Any,
    ) -> None:
        """Build the client.

        :param api_version: Value sent as the ``api-version`` query parameter.
        :param kwargs: Extra keyword arguments forwarded to ``OpenAI.__init__``;
            a ``default_query`` mapping, when given, is merged with ``api-version``.
        """
        self.api_version = api_version
        # Copy the mapping so the caller's dict is never mutated; `or {}` also
        # tolerates an explicit `default_query=None`.
        default_query = dict(kwargs.pop("default_query", None) or {})
        default_query["api-version"] = self.api_version
        super().__init__(
            # The API key is left empty: authentication is done via the
            # Authorization header added in `_prepare_options`.
            api_key="",
            base_url=base_url,
            default_query=default_query,
            **kwargs,
        )

    def _prepare_options(self, options: FinalRequestOptions) -> None:
        """Inject auth and tracing headers before delegating to the parent hook."""
        headers: dict[str, str | Omit] = (
            {**options.headers} if is_given(options.headers) else {}
        )
        # The token is read on every request from the `configs` object created at cell run time.
        # NOTE(review): if that token expires, `configs` presumably has to be refreshed — confirm.
        headers["Authorization"] = f"Bearer {configs.driver_aad_token}"
        # Only fill these in when the caller did not set them explicitly.
        headers.setdefault("Accept", "application/json")
        headers.setdefault("ActivityId", str(uuid.uuid4()))
        options.headers = headers

        return super()._prepare_options(options)

# Pretty printing helper
def pretty_print(messages):
    """Print a conversation as ``role: text`` lines under a header.

    For each message the first content part that carries text is shown. Messages
    with no content, or whose parts carry no text, are printed with a
    ``<no text content>`` placeholder instead of raising.

    :param messages: Iterable of objects with ``role`` and ``content`` attributes,
        where ``content`` is a sequence of parts that may expose ``text.value``.
    """
    print("---Conversation---")
    for m in messages:
        text = next(
            (
                part.text.value
                for part in (m.content or [])
                if getattr(part, "text", None) is not None
            ),
            None,
        )
        print(f"{m.role}: {text if text is not None else '<no text content>'}")
    print()

print("Testing Fabric Data Agent connection...")

# Polling parameters for the smoke test
RUN_POLL_SECONDS = 2
RUN_TIMEOUT_SECONDS = 300

# Pre-declare so the cleanup in `finally` can tell what was actually created
fabric_client = None
thread = None
try:
    fabric_client = HJFabricOpenAI()
    assistant = fabric_client.beta.assistants.create(model="not used")
    thread = fabric_client.beta.threads.create()
    message = fabric_client.beta.threads.messages.create(thread_id=thread.id, role="user", content=question)
    run = fabric_client.beta.threads.runs.create(thread_id=thread.id, assistant_id=assistant.id)

    # Wait for completion, but never forever: a stuck run would otherwise hang the notebook.
    deadline = time.monotonic() + RUN_TIMEOUT_SECONDS
    while run.status in ["queued", "in_progress"]:
        if time.monotonic() > deadline:
            raise TimeoutError(f"Fabric run still '{run.status}' after {RUN_TIMEOUT_SECONDS}s")
        # Sleep first, then refresh, so no time is wasted once a terminal status is seen
        time.sleep(RUN_POLL_SECONDS)
        run = fabric_client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)

    response = fabric_client.beta.threads.messages.list(thread_id=thread.id, order="asc")
    pretty_print(response)

    # Leaving the loop only means the run is no longer pending; report success
    # only when it actually completed.
    if run.status != "completed":
        raise RuntimeError(f"Fabric run ended with status '{run.status}'")
    print("✅ Fabric Data Agent connection successful")
except Exception as e:
    print(f"⚠️ Fabric Data Agent connection failed: {e}")
finally:
    # Always remove the thread, including on failure; cleanup itself is best-effort.
    if fabric_client is not None and thread is not None:
        try:
            fabric_client.beta.threads.delete(thread_id=thread.id)
        except Exception as cleanup_error:
            print(f"⚠️ Could not delete Fabric thread: {cleanup_error}")

### Implements a custom Data Agent for retrieving customer information using Fabric AI.
This class is designed to be a Function Tool, which interacts with Fabric to retrieve customer-related information.
**SECURITY NOTE**: This class loads the Fabric base URL from environment variables for security.

In [None]:
class FabricFunctionAISkill:
    """Plugin exposing the Fabric Data Agent as a callable tool."""

    @kernel_function(description="Provides information such as age and tenure about clients.")
    def call_fabric_ai_skill(self, question: str) -> str:
        """Provides information such as age and tenure about clients via the Fabric Data Agent.

        Sends ``question`` to the endpoint in ``FABRIC_BASE_URL`` on a temporary
        thread, waits for the run to finish, and returns the first assistant message.

        :param question: Natural-language question forwarded to the Data Agent.
        :return: JSON string ``{"answer": ...}``; the answer is the agent's text, a
            "no response" notice, or an ``Error: ...`` description.
        :raises ValueError: If ``FABRIC_BASE_URL`` is not set.
        """
        print("Calling Fabric AI Skill with question:", question)
        configs = get_mlflow_env_config()

        # Load base URL from environment variable
        base_url = os.environ.get("FABRIC_BASE_URL")
        if not base_url:
            raise ValueError("FABRIC_BASE_URL environment variable is required")

        # Polling parameters: bounded so a stuck run cannot block the calling agent forever
        poll_seconds = 2
        timeout_seconds = 300

        class FabricOpenAI(OpenAI):
            """OpenAI client bound to this call's ``base_url`` and token config."""

            def __init__(self, api_version: str = "2024-05-01-preview", **kwargs: Any) -> None:
                self.api_version = api_version
                # Copy so a caller-supplied mapping is never mutated
                default_query = dict(kwargs.pop("default_query", None) or {})
                default_query["api-version"] = self.api_version
                super().__init__(api_key="", base_url=base_url, default_query=default_query, **kwargs)

            def _prepare_options(self, options: FinalRequestOptions) -> None:
                headers: Dict[str, str | Omit] = {**options.headers} if is_given(options.headers) else {}
                headers["Authorization"] = f"Bearer {configs.driver_aad_token}"
                headers.setdefault("Accept", "application/json")
                headers.setdefault("ActivityId", str(uuid.uuid4()))
                options.headers = headers
                return super()._prepare_options(options)

        fabric_client = None
        thread = None
        try:
            fabric_client = FabricOpenAI()
            assistant = fabric_client.beta.assistants.create(model="not used")
            thread = fabric_client.beta.threads.create()
            fabric_client.beta.threads.messages.create(thread_id=thread.id, role="user", content=question)
            run = fabric_client.beta.threads.runs.create(thread_id=thread.id, assistant_id=assistant.id)

            deadline = time.monotonic() + timeout_seconds
            while run.status in ["queued", "in_progress"]:
                if time.monotonic() > deadline:
                    raise TimeoutError(f"Fabric run still '{run.status}' after {timeout_seconds}s")
                # Sleep before refreshing so a finished run is returned without extra delay
                time.sleep(poll_seconds)
                run = fabric_client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)

            response = fabric_client.beta.threads.messages.list(thread_id=thread.id, order="asc")
            # First assistant message that actually carries text; messages with empty
            # or non-text content are skipped instead of raising.
            answer = next(
                (
                    m.content[0].text.value
                    for m in response
                    if m.role == "assistant"
                    and m.content
                    and getattr(m.content[0], "text", None) is not None
                ),
                None,
            )

            if answer:
                return json.dumps({"answer": answer})
            if run.status != "completed":
                # Distinguish a failed/cancelled run from a completed run with no reply
                return json.dumps({"answer": f"Error: Fabric run ended with status '{run.status}'"})
            return json.dumps({"answer": "No response from Fabric Data Agent."})
        except Exception as e:
            print(f"Error in Fabric AI Skill: {e}")
            return json.dumps({"answer": f"Error: {str(e)}"})
        finally:
            # Always remove the temporary thread, including on failure; best-effort.
            if fabric_client is not None and thread is not None:
                try:
                    fabric_client.beta.threads.delete(thread_id=thread.id)
                except Exception as cleanup_error:
                    print(f"Error deleting Fabric thread: {cleanup_error}")

### Implements a custom Data Agent for predicting customer churn using MLFlow.
This class loads a registered MLflow model with `mlflow.pyfunc`, scores a Delta table read through Spark, and returns the churn predictions as JSON.
**SECURITY NOTE**: Data paths and model names are configurable via environment variables.

In [None]:
import json
import time
import uuid
from typing import Any, Dict
from pyspark.sql import SparkSession
from mlflow import pyfunc

class FabricFunctionML:
    """Plugin exposing a registered MLflow churn model as a callable tool."""

    @kernel_function(description="Predicts which customers will become a churn")
    def call_ML_model(self, question: str) -> str:
        """Predicts which customers will become a churn using a registered MLflow model.

        Reads the Delta table at ``ML_DATA_PATH``, loads model
        ``ML_MODEL_NAME``/``ML_MODEL_VERSION`` (defaults ``hj_lgbm_sm``/``1``) and
        scores every row of the table.

        :param question: The user's question. Currently unused: predictions are
            produced for the whole table, not filtered by the question.
        :return: JSON string ``{"answer": ...}`` where the answer is the JSON-encoded
            list of predictions, a "no response" notice when there are none, or an
            ``Error: ...`` description.
        :raises ValueError: If ``ML_DATA_PATH`` is not set.
        """
        print("Loading ML model for churn prediction...")

        # Load configuration from environment variables
        data_path = os.environ.get("ML_DATA_PATH")
        model_name = os.environ.get("ML_MODEL_NAME", "hj_lgbm_sm")
        model_version = os.environ.get("ML_MODEL_VERSION", "1")

        if not data_path:
            raise ValueError("ML_DATA_PATH environment variable is required")

        try:
            print(f"Loading data from: {data_path}")
            # NOTE(review): `spark` is not defined in this notebook; presumably the
            # Fabric runtime provides the session as a global — confirm.
            df_test = spark.read.format("delta").load(data_path)

            print(f"Loading model: {model_name} version {model_version}")
            model = pyfunc.load_model(f"models:/{model_name}/{model_version}")

            print("Making predictions...")
            # The whole table is collected to the driver for scoring
            predictions = model.predict(df_test.toPandas())

            # Normalise the model output to a plain list; the container type
            # returned by `predict` depends on the model flavour.
            if hasattr(predictions, "tolist"):
                values = predictions.tolist()
            elif hasattr(predictions, "values"):
                values = predictions.values.tolist()
            else:
                values = list(predictions)

            # Check the list itself: the JSON text "[]" is truthy, so testing the
            # encoded string could never trigger this fallback.
            if not values:
                return json.dumps({"answer": "No response from ML."})

            # Convert predictions to JSON (kept as a JSON string inside "answer")
            answer = json.dumps(values)
            print("✅ ML prediction completed successfully")

            return json.dumps({"answer": answer})

        except Exception as e:
            print(f"❌ Error in ML model: {e}")
            return json.dumps({"answer": f"Error: {str(e)}"})

### Sets up various agents and defines their roles.
Creates routing agent, synthesis agent, customer info agent, churn prediction agent, loyalty programs agent, and chit-chat agent.
**SECURITY NOTE**: All connection strings and sensitive configuration loaded from environment variables.

In [None]:
# Initialize plugins
AISKill_plugin = FabricFunctionAISkill()
FabricML_plugin = FabricFunctionML()

print("Creating Azure AI Agent client...")
sk_client = AzureAIAgent.create_client(
    credential=credential,
    conn_str=project_connection_string
)

print("Creating thread for agents...")
thread = await sk_client.agents.create_thread()

# Coordinator agents
router_instructions = (
    "You are a routing coordinator. Your only task is to route the user query to the correct agents by generating a JSON object. "
    "The JSON should map agent names to sub-queries. "
    "The valid agents are: "
    "- CustomerInfoAgent: Handles questions related to data and customer information. "
    "- LoyaltyProgramsAgent: Expert in Loyalty Programs. "
    "- ChitChatAgent: Handles informal, social, or general-purpose questions. "
    "- ChurnPredAgent: Predicts customer churn. "
    "IMPORTANT: Do not answer the user's question yourself. ONLY return a valid JSON object mapping the agents to sub-queries. "
    "Example: {\"CustomerInfoAgent\": \"What data sources are available?\", \"ChurnPredAgent\": \"Predict churn for customer 123\"} "
    "If none of the agents are a good match, route the full question to ChitChatAgent as fallback. "
    "Never include explanations, comments, or assistant-like messages."
)

print("Creating routing agent...")
routing_agent = await create_agent(
    sk_client,
    "RoutingAgent",
    router_instructions
)

print("Creating synthesis agent...")
synthesis_agent = await create_agent(
    sk_client,
    "SynthesisAgent",
    "You are a synthesis agent. Combine responses from other agents into a clear, friendly answer. Don't include any JSON."
)

# Customer Info Agent with Fabric AI Skill
print("Creating customer info agent...")
functions = FunctionTool(functions={FabricFunctionAISkill.call_fabric_ai_skill})
CustomerAgent = await create_agent(
    sk_client,
    "CustomerInfoAgent",
    "You are responsible for answering customer-related questions using an internal Data Agent. Please, try to use your tool any time you receive a question.",
    tools=functions.definitions,
    plugins=[AISKill_plugin]
)

# Churn Prediction Agent with ML Model
print("Creating churn prediction agent...")
ml_functions = FunctionTool(functions={FabricFunctionML.call_ML_model})
ChurnPredAgent = await create_agent(
    sk_client,
    "ChurnPredAgent",
    "You are responsible for predicting which customers will become churn using ML models.",
    tools=ml_functions.definitions,
    plugins=[FabricML_plugin]
)

# Loyalty Programs Agent with Azure AI Search
print("Creating loyalty programs agent...")
search_index_name = os.environ.get("AZURE_SEARCH_INDEX_NAME", "hjazureblob-index")

if search_connection_id:
    ai_search_tool = AzureAISearchTool(
        index_connection_id=search_connection_id,
        index_name=search_index_name
    )
    
    loyalty_agent = await create_agent(
        sk_client,
        "LoyaltyProgramsAgent",
        "You are an expert in loyalty programs. Use Azure AI Search to retrieve relevant information.",
        tools=ai_search_tool.definitions,
        tool_resources=ai_search_tool.resources
    )
    print("✅ Loyalty programs agent created with Azure AI Search")
else:
    loyalty_agent = await create_agent(
        sk_client,
        "LoyaltyProgramsAgent",
        "You are an expert in loyalty programs. Provide general loyalty program information."
    )
    print("⚠️ Loyalty programs agent created without Azure AI Search (connection not found)")

# ChitChat Agent (fallback)
print("Creating chitchat agent...")
chitchat_agent = await create_agent(
    sk_client,
    "ChitChatAgent",
    "You are a friendly assistant. Handle general, informal, or social questions not related to business-specific topics."
)

print("✅ All agents created successfully!")

### Demonstrates how to handle user queries and route them to the appropriate agents.
This cell handles a user query, routing it to the appropriate agents, and synthesizing the responses.

In [None]:
# Example user messages - you can modify these
user_message = (
    "Hello, how are you doing? How is everything going? "
    "I am a bit worried about customer ID 15701354. "
    "Could you tell me if they are likely to become a churn? "
    "Give me more information about this customer, "
    "and find the best loyalty program to recommend them?"
)

# Alternative shorter message
# user_message = (
#     "Hello.. "
#     "I am a bit worried about customer ID 15647311. "
#     "Could you tell me if they are likely to become a churn?"
# )

print("Processing user query:", user_message)
print("\n" + "="*50)

user_chat_message = ChatMessageContent(role=AuthorRole.USER, content=user_message)

# Route the query
print("🔄 Routing query to appropriate agents...")
routing_response = ""
async for chunk in routing_agent.invoke(thread_id=thread.id, messages=[user_chat_message]):
    if hasattr(chunk, "content"):
        routing_response += str(chunk.content)

print("📋 Routing agent response:")
print(routing_response)

# Clean and parse the routing JSON
try:
    cleaned = re.sub(r"^```json\n|```$", "", routing_response.strip(), flags=re.MULTILINE)
    routing_dict = json.loads(cleaned)
    print("✅ Successfully parsed routing instructions")
except json.JSONDecodeError as e:
    print(f"❌ Error parsing routing JSON: {e}")
    # Fallback routing
    routing_dict = {"ChitChatAgent": user_message}

agent_outputs = {}

# Map agents
agent_map = {
    "CustomerInfoAgent": CustomerAgent,
    "LoyaltyProgramsAgent": loyalty_agent,
    "ChitChatAgent": chitchat_agent,
    "ChurnPredAgent": ChurnPredAgent
}

# Process each routed query
print("\n🤖 Processing queries with specialist agents...")
for agent_name, subquery in routing_dict.items():
    agent = agent_map.get(agent_name)
    
    if agent:
        print(f"\n📤 Sending to {agent_name}: {subquery}")
        
        agent_thread = await sk_client.agents.create_thread()
        user_subquery_message = ChatMessageContent(role=AuthorRole.USER, content=subquery)

        response_text = ""
        async for chunk in agent.invoke(thread_id=agent_thread.id, messages=[user_subquery_message]):
            if hasattr(chunk, "content"):
                response_text += str(chunk.content)

        print(f"📥 [{agent_name}] response:")
        print(response_text)
        print("-" * 40)

        agent_outputs[agent_name] = response_text

        # Add to synthesis thread
        await synthesis_agent.add_chat_message(
            thread_id=thread.id,
            message=ChatMessageContent(role=AuthorRole.ASSISTANT, content=response_text)
        )
    else:
        print(f"⚠️ Unknown agent: {agent_name}")

# Synthesize final response
print("\n🔗 Synthesizing final response...")
synthesis_input = json.dumps({
    "user_query": user_message, 
    "agent_responses": agent_outputs
}, indent=2)

synthesis_chat_message = ChatMessageContent(role=AuthorRole.USER, content=synthesis_input)

synth_response = ""
async for chunk in synthesis_agent.invoke(thread_id=thread.id, messages=[synthesis_chat_message]):
    if hasattr(chunk, "content"):
        synth_response += str(chunk.content)

print("\n" + "="*50)
print("🎯 FINAL SYNTHESIZED RESPONSE:")
print("="*50)
print(synth_response)
print("="*50)

### Deletes all existing agents from the system
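Clean up agents to prevent accumulation in your Azure AI project. **Warning**: this cell deletes every agent returned by `list_agents()` for the project, not only the agents created by this notebook.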

In [None]:
# Get the complete list of agents
print("🧹 Cleaning up agents...")

try:
    agents_response = await sk_client.agents.list_agents()

    # Delete all existing agents
    for agent in agents_response['data']:
        agent_id = agent['id']
        await sk_client.agents.delete_agent(agent_id)
        print(f"🗑️ Deleted agent: {agent['name']} (ID: {agent_id})")
    
    print("✅ All agents cleaned up successfully!")
    
except Exception as e:
    print(f"❌ Error during cleanup: {e}")