# Agent Router - Guide

This notebook explains all the Azure AI Agent components and architecture used in the **Agent Router** application. The Agent Router is an agentic routing system that uses Microsoft Purview to discover data assets and routes user queries to the appropriate specialized agents for processing.

## ⚠️ Prerequisites

Before running this notebook, you need the following Azure services deployed and configured:

### Required Azure Services
- **Azure AI Agent Service** - For creating and managing connected agents
- **Microsoft Purview** - Data catalog with configured data assets
- **Azure OpenAI** - Language model deployment
- **Bing Search** - For web search capabilities
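- **Databricks with Genie** - For natural language to SQL conversion
- **Microsoft Fabric** (optional) - For the Fabric data agent, enabled with `ENABLE_FABRIC_AGENT=true`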

### Configuration
Copy `.env.example` to `.env` and configure all required endpoints and connection IDs. See the example file for detailed configuration requirements.
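Before running the cells below, it can help to check which settings are still missing. The sketch below groups the environment variable names that this notebook reads by the component that needs them. The grouping and the `missing_settings` helper are hypothetical and not part of the Agent Router code. `MODEL_DEPLOYMENT_NAME` is left out because the notebook falls back to a default, and the Fabric settings are left out because that agent is optional.

```python
import os
from typing import Dict, List, Mapping

# Hypothetical grouping of the variables this notebook reads, keyed by component.
# Fabric (optional) and MODEL_DEPLOYMENT_NAME (has a default) are deliberately omitted.
REQUIRED_SETTINGS: Dict[str, List[str]] = {
    "Azure AI Agent Service": ["AZURE_AI_AGENT_ENDPOINT"],
    "Microsoft Purview": ["PURVIEW_ENDPOINT"],
    "Bing Search": ["BING_CONNECTION_ID"],
    "Databricks Genie": ["DATABRICKS_INSTANCE", "GENIE_SPACE_ID", "DATABRICKS_AUTH_TOKEN"],
}


def missing_settings(env: Mapping[str, str]) -> Dict[str, List[str]]:
    """Return, per component, the required variables that are unset or blank."""
    missing: Dict[str, List[str]] = {}
    for component, names in REQUIRED_SETTINGS.items():
        # Treat empty or quoted-empty values such as "" or '' as missing
        absent = [name for name in names if not env.get(name, "").strip().strip("\"'")]
        if absent:
            missing[component] = absent
    return missing


if __name__ == "__main__":
    for component, names in missing_settings(os.environ).items():
        print(f"⚠️ {component}: missing {', '.join(names)}")
```

Passing a plain `dict` instead of `os.environ` makes the check easy to try without touching your real environment.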

## Architecture Overview

```mermaid
graph TB
    User[👤 User Query] --> Router{🎯 Routing Agent}
    
    Router --> Catalog[📋 Purview Catalog Search]
    Catalog --> CatalogResults{Assets Found?}
    
    CatalogResults -->|Yes - Genie Agent| GenieFunc[🧞 Genie Function Tool]
    CatalogResults -->|Yes - RAG Agent| RAGAgent[📚 RAG Connected Agent]
    CatalogResults -->|Yes - Fabric Agent| FabricAgent[🏢 Fabric Connected Agent]
    CatalogResults -->|No Assets Found| WebAgent[🌐 Web Connected Agent]
    
    GenieFunc --> Databricks[(🧱 Databricks<br/>Genie API)]
    RAGAgent --> VectorStore[(🔍 Vector Store<br/>Documents)]
    WebAgent --> BingSearch[(🔎 Bing<br/>Search API)]
    
    Databricks --> Results[📊 Formatted Results]
    VectorStore --> Results
    BingSearch --> Results
    
    Results --> Response[💬 User Response]
    
    subgraph "Azure AI Agent Service"
        Router
        RAGAgent
        WebAgent
        VectorStore
        BingSearch
    end
    
    subgraph "Function Tools"
        Catalog
        GenieFunc
    end
    
    subgraph "External Services"
        Databricks
    end
    
```

The system consists of several key components:

1. **Routing Agent** - Orchestrates query routing based on catalog search results (via Connected Agents in Azure AI Agent Service)
2. **Microsoft Purview Integration** - Searches data catalogs to find relevant data assets (REST API)
3. **Databricks Genie Agent** - Handles natural language to SQL conversion and data analysis (called as a function tool)
4. **Microsoft Fabric Agent** - Queries lakehouses and warehouses in Microsoft Fabric (optional)
5. **RAG Agent** - Handles document search and knowledge retrieval
6. **Web Search Agent** - Provides internet search capabilities via Bing
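In the application, the routing agent makes its decision through its instructions and tool calls. The deterministic sketch below restates the same rule from the diagram so the flow is easy to follow: use the agent named on a matching catalog asset, otherwise fall back to web search. `choose_route` is a hypothetical helper. It assumes input shaped like the `search_purview_catalog` result from section 7. Falling back to web search when the catalog call fails is an assumption that the diagram does not show.

```python
from typing import Any, Dict, List, Optional

# Agent tags that may appear on Purview assets (see parse_agent_from_description in section 7)
KNOWN_AGENTS = {"genie_agent", "rag_agent", "fabric_agent"}


def choose_route(catalog_result: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """Pick a handler from a search_purview_catalog()-style result.

    Falls back to the web agent when the search failed, found no assets,
    or none of the assets names a known connected agent (assumed behavior).
    """
    if catalog_result.get("status") != "success":
        return {"agent": "web_agent", "asset": None}
    results: List[Dict[str, Any]] = catalog_result.get("results", [])
    for asset in results:
        # Tags can be parsed case-insensitively, so normalize before comparing
        agent = (asset.get("connected_agent") or "").lower()
        if agent in KNOWN_AGENTS:
            return {"agent": agent, "asset": asset.get("name")}
    return {"agent": "web_agent", "asset": None}
```

Because the first tagged asset wins, the order in which Purview ranks results also determines which agent is chosen when several assets carry different tags.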


Let's explore each component in detail with working examples.

## 1. Import Required Libraries and Setup Environment

First, let's import all the necessary libraries and configure our environment for working with Azure AI Agents.

In [100]:
# Core Python libraries
import os
import json
import time
import logging
from typing import Dict, Any, Optional
from pathlib import Path

# Azure AI and Identity libraries
from azure.identity import DefaultAzureCredential, AzureCliCredential
from azure.ai.projects import AIProjectClient

# Azure AI Agents models and tools
from azure.ai.agents.models import (
    ConnectedAgentTool, FunctionTool, FabricTool, BingGroundingTool,
    FileSearchTool, FilePurpose
)

# HTTP requests for API calls
import requests

# Environment configuration
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

print("✅ All libraries imported successfully!")
print("🔧 Environment variables loaded")
print("📋 Logging configured")

✅ All libraries imported successfully!
🔧 Environment variables loaded
📋 Logging configured


## 2. Configure Azure AI Project Client

The Azure AI Project Client is the foundation for all agent interactions. It manages authentication, agent lifecycle, and provides the execution environment for our connected agents.

In [101]:
# Initialize Azure AI Project Client
try:
    # The client uses DefaultAzureCredential for authentication
    project_client = AIProjectClient(
        endpoint=os.environ.get("AZURE_AI_AGENT_ENDPOINT"),
        credential=DefaultAzureCredential()
    )
    
    print("✅ Azure AI Project Client initialized successfully!")
    
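    # Verify the connection by listing agents left over from previous runs (if any)
    # Note: In a demo environment, this might show agents from previous runs
    print("\n🤖 Checking existing agents...")
    for existing_agent in project_client.agents.list_agents():
        print(f"   • {existing_agent.name} ({existing_agent.id})")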
    
except Exception as e:
    print(f"❌ Failed to initialize Azure AI Project Client: {str(e)}")
    print("💡 Make sure AZURE_AI_AGENT_ENDPOINT is set in your environment variables")

# Helper function to demonstrate agent lifecycle
def demo_agent_info(agent):
    """Display information about an agent"""
    print(f"   • Name: {agent.name}")
    print(f"   • Model: {agent.model}")
    print(f"   • Tools: {len(agent.tools) if agent.tools else 0} configured")

# Re-read .env so values in the file override variables already set in the shell
# (the project client above was created from the values loaded in step 1)
env_path = Path('.') / '.env'
if env_path.exists():
    load_dotenv(dotenv_path=env_path, override=True)
    print("✅ .env file loaded successfully")

# Quiet verbose Azure SDK loggers for subsequent cells; setting the parent
# 'azure.identity' logger also covers its credential sub-loggers
logging.getLogger('azure.core.pipeline.policies.http_logging_policy').setLevel(logging.WARNING)
logging.getLogger('azure.identity').setLevel(logging.WARNING)


2025-09-13 09:10:27,964 - azure.identity._credentials.environment - INFO - No environment configuration found.
2025-09-13 09:10:27,966 - azure.identity._credentials.managed_identity - INFO - ManagedIdentityCredential will use IMDS
2025-09-13 09:10:27,966 - azure.identity._credentials.managed_identity - INFO - ManagedIdentityCredential will use IMDS


✅ Azure AI Project Client initialized successfully!

🤖 Checking existing agents...
✅ .env file loaded successfully


## 3. Create Microsoft Fabric Agent

The Microsoft Fabric Agent enables direct querying of structured data sources within Microsoft Fabric environments. It uses the FabricTool to access data warehouses, lakehouses, and other Fabric resources.
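The Fabric agent is only created when both a feature flag and a connection ID are present. The notebook accepts only `true` (case-insensitive) for `ENABLE_FABRIC_AGENT`. The hypothetical helper below isolates that gate so it can be checked on its own. Accepting `1`, `yes`, and `on` as well is an assumption added for convenience.

```python
from typing import Mapping

# Assumed set of accepted "on" values; the notebook itself only accepts "true"
TRUTHY = {"true", "1", "yes", "on"}


def fabric_agent_enabled(env: Mapping[str, str]) -> bool:
    """Return True only when the flag is on AND a Fabric connection ID is set."""
    flag = env.get("ENABLE_FABRIC_AGENT", "false").strip().strip("\"'").lower()
    connection_id = env.get("FABRIC_CONNECTION_ID", "").strip().strip("\"'")
    return flag in TRUTHY and bool(connection_id)
```

With the settings shown in the output below (`ENABLE_FABRIC_AGENT=false` and a connection ID set), this returns `False`, which matches the notebook skipping agent creation.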

In [102]:
def create_fabric_agent():
    """
    Creates a Microsoft Fabric agent for querying structured data sources.
    
    The Fabric agent can:
    - Query data warehouses and lakehouses
    - Perform data analysis on structured datasets
    - Generate insights from business data
    """
    
    # Check if Fabric connection is configured
    fabric_connection_id = os.environ.get('FABRIC_CONNECTION_ID')
    fabric_enabled = os.getenv('ENABLE_FABRIC_AGENT', 'false').lower() == 'true'
    
    if not fabric_enabled or not fabric_connection_id:
        print("⚠️ Fabric Agent is disabled or not configured")
        print(f"   ENABLE_FABRIC_AGENT: {os.getenv('ENABLE_FABRIC_AGENT', 'false')}")
        print(f"   FABRIC_CONNECTION_ID: {'Set' if fabric_connection_id else 'Not set'}")
        return None
    
    try:
        # Create Fabric tool with connection
        fabric_tool = FabricTool(connection_id=fabric_connection_id)
        
        # Create the Fabric agent
        fabric_agent = project_client.agents.create_agent(
            model=os.environ.get("MODEL_DEPLOYMENT_NAME", "gpt-4"),
            name="fabric-agent",
            instructions="""You are a data analysis agent with access to Microsoft Fabric data sources.
            
            Your capabilities include:
            - Querying data warehouses and lakehouses
            - Performing data analysis and aggregations
            - Generating business insights from structured data
            - Creating visualizations and reports
            
            Always provide clear, actionable insights based on the data you analyze.""",
            tools=fabric_tool.definitions
        )
        
        print("✅ Microsoft Fabric Agent created successfully!")
        demo_agent_info(fabric_agent)
        
        return fabric_agent
        
    except Exception as e:
        print(f"❌ Failed to create Fabric Agent: {str(e)}")
        return None

def query_fabric_demo(agent, query: str) -> str:
    """Demonstrate querying Fabric data with the agent"""
    if not agent:
        return "Fabric agent not available"
    
    try:
        print(f"🔍 Querying Fabric with: {query}")
        
        # Create a thread for the conversation
        thread = project_client.agents.threads.create()
        
        # Add user message
        project_client.agents.messages.create(
            thread_id=thread.id, 
            role="user", 
            content=query
        )
        
        # Execute the query
        run = project_client.agents.runs.create_and_process(
            thread_id=thread.id, 
            agent_id=agent.id
        )
        
        if run.status == "completed":
            messages = list(project_client.agents.messages.list(thread_id=thread.id))
            response = messages[0].content[0].text.value if messages[0].content else "No response"
            print("✅ Query completed successfully!")
            return response
        else:
            print(f"❌ Query failed with status: {run.status}")
            return f"Query failed: {run.status}"
            
    except Exception as e:
        print(f"💥 Error during Fabric query: {str(e)}")
        return f"Error: {str(e)}"

# Create the Fabric agent
fabric_agent = create_fabric_agent()

# Example query (only runs if agent is available)
if fabric_agent:
    sample_query = "Can you provide an overview of the available data sources and their schemas?"
    print(f"\n📝 Sample Query: {sample_query}")
    # Runs a live query against the configured Fabric data source
    response = query_fabric_demo(fabric_agent, sample_query)
    print(f"📊 Response: {response}")
else:
    print("\n💡 To enable the Fabric Agent, set ENABLE_FABRIC_AGENT=true and configure FABRIC_CONNECTION_ID")



⚠️ Fabric Agent is disabled or not configured
   ENABLE_FABRIC_AGENT: false
   FABRIC_CONNECTION_ID: Set

💡 To enable the Fabric Agent, set ENABLE_FABRIC_AGENT=true and configure FABRIC_CONNECTION_ID


## 4. Build Web Search Agent with Bing

The Web Search Agent provides internet search capabilities using Bing Search. It's used for queries that don't match any data assets in the Purview catalog, such as current events, weather, or general knowledge questions.
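The Fabric, web, and RAG demo functions all follow the same steps: create a thread, post the user message, run the agent, and read the newest message. The sketch below factors that into one hypothetical helper, `run_agent_once`. It calls only the operations the demo functions already use. It keeps their assumption that `messages.list` returns the newest message first. `fake_agents` is an offline stand-in with the same attribute layout, so the helper can be tried without Azure.

```python
from types import SimpleNamespace
from typing import Any


def run_agent_once(agents: Any, agent_id: str, query: str) -> str:
    """Post one user message to a fresh thread, run the agent, and return the reply text.

    `agents` must expose the operations the demo functions already call:
    threads.create, messages.create, runs.create_and_process and messages.list.
    """
    thread = agents.threads.create()
    agents.messages.create(thread_id=thread.id, role="user", content=query)
    run = agents.runs.create_and_process(thread_id=thread.id, agent_id=agent_id)
    if run.status != "completed":
        return f"Run failed: {run.status}"
    # Same assumption as the demo functions: the newest message is listed first
    messages = list(agents.messages.list(thread_id=thread.id))
    if not messages or not messages[0].content:
        return "No response"
    return messages[0].content[0].text.value


def fake_agents(status: str, reply: str) -> SimpleNamespace:
    """Hypothetical offline stand-in mimicking the attribute layout used above."""
    message = SimpleNamespace(content=[SimpleNamespace(text=SimpleNamespace(value=reply))])
    return SimpleNamespace(
        threads=SimpleNamespace(create=lambda: SimpleNamespace(id="thread-1")),
        messages=SimpleNamespace(
            create=lambda **kwargs: None,
            list=lambda **kwargs: [message],
        ),
        runs=SimpleNamespace(create_and_process=lambda **kwargs: SimpleNamespace(status=status)),
    )


print(run_agent_once(fake_agents("completed", "Hello from the stub"), "agent-1", "Hi"))
# prints "Hello from the stub"
```

Against the real service, the call would look like `run_agent_once(project_client.agents, web_agent.id, query)`.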

In [110]:
def create_web_search_agent():
    """
    Creates a web search agent using Bing Search for internet queries.
    
    The Web Search agent can:
    - Search the internet for current information
    - Answer general knowledge questions
    - Provide current events and news
    - Handle queries not covered by data assets
    """
    
    bing_connection_id = os.environ.get('BING_CONNECTION_ID')
    
    if not bing_connection_id:
        print("❌ Web Search Agent cannot be created - BING_CONNECTION_ID not configured")
        return None
    
    try:
        # Create Bing search tool
        bing_tool = BingGroundingTool(connection_id=bing_connection_id)
        
        # Create the web search agent
        web_agent = project_client.agents.create_agent(
            model=os.environ.get("MODEL_DEPLOYMENT_NAME", "gpt-4"),
            name="web-agent",
            instructions="""You are a web search agent that finds current information from the internet using Bing Search.
            
            Your capabilities include:
            - Searching for current events and news
            - Finding general knowledge information
            - Answering questions about weather, sports, entertainment
            - Providing real-time information not available in local data sources
            
            Always provide accurate, up-to-date information with proper citations from your search results.""",
            tools=bing_tool.definitions
        )
        
        print("✅ Web Search Agent created successfully!")
        demo_agent_info(web_agent)
        
        return web_agent
        
    except Exception as e:
        print(f"❌ Failed to create Web Search Agent: {str(e)}")
        return None

def search_web_demo(agent, query: str) -> str:
    """Demonstrate web searching with the agent"""
    if not agent:
        return "Web search agent not available"
    
    try:
        print(f"🌐 Searching web for: {query}")
        
        # Create a thread for the conversation
        thread = project_client.agents.threads.create()
        
        # Add user message
        project_client.agents.messages.create(
            thread_id=thread.id,
            role="user",
            content=query
        )
        
        # Execute the search
        run = project_client.agents.runs.create_and_process(
            thread_id=thread.id,
            agent_id=agent.id
        )
        
        if run.status == "completed":
            messages = list(project_client.agents.messages.list(thread_id=thread.id))
            response = messages[0].content[0].text.value if messages[0].content else "No response"
            print("✅ Web search completed successfully!")
            return response
        else:
            print(f"❌ Web search failed with status: {run.status}")
            return f"Search failed: {run.status}"
            
    except Exception as e:
        print(f"💥 Error during web search: {str(e)}")
        return f"Error: {str(e)}"

# Create the web search agent
web_agent = create_web_search_agent()

# Example queries
if web_agent:
    sample_queries = [
        "What is the current weather in Seattle?"
    ]
    
    print(f"\n📝 Sample Queries:")
    for i, query in enumerate(sample_queries, 1):
        print(f"   {i}. {query}")
    
    selected_query = sample_queries[0]
    print(f"\n🔍 Running search: {selected_query}")
    response = search_web_demo(web_agent, selected_query)
    print(f"🌐 Search Result:\n{response}")
else:
    print("\n💡 To enable the Web Search Agent, configure BING_CONNECTION_ID in your environment variables")

2025-09-13 09:36:14,222 - azure.identity._credentials.default - INFO - DefaultAzureCredential acquired a token from AzureCliCredential


✅ Web Search Agent created successfully!
   • Name: web-agent
   • Model: gpt-4.1-mini
   • Tools: 1 configured

📝 Sample Queries:
   1. What is the current weather in Seattle?

🔍 Running search: What is the current weather in Seattle?
🌐 Searching web for: What is the current weather in Seattle?
✅ Web search completed successfully!
🌐 Search Result:
The current weather in Seattle is mostly cloudy with a temperature around 75°F (about 24°C) as of early evening. The wind is light from the North-Northwest at about 2 mph, and the air quality is currently poor, which means sensitive groups should limit time outside if feeling symptoms like difficulty breathing or throat irritation. Tonight, the sky will be clear to partly cloudy with low temperatures around 63°F. Tomorrow, it is expected to be mostly sunny and pleasant with a high near 78°F【3:0†source】【3:1†source】【3:4†source】.
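The reply above contains inline annotation markers such as `【3:0†source】`. When you show the answer to users, you may want to strip these markers or collect them separately. The hypothetical helper below handles display only, and its regex is based solely on the marker format visible in this output. The message annotations returned by the SDK remain the authoritative link between markers and their source URLs.

```python
import re
from typing import List, Tuple

# Matches markers of the form 【3:0†source】 as seen in the output above (assumed format)
CITATION_MARKER = re.compile(r"【(\d+):(\d+)†([^】]*)】")


def split_citations(text: str) -> Tuple[str, List[str]]:
    """Return the text with markers removed plus the 'message:index' labels found."""
    labels = [f"{m.group(1)}:{m.group(2)}" for m in CITATION_MARKER.finditer(text)]
    cleaned = CITATION_MARKER.sub("", text).rstrip()
    return cleaned, labels
```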


## 5. Implement RAG Agent for Document Search

The Retrieval-Augmented Generation (RAG) Agent enables semantic search across uploaded documents. It creates vector embeddings of documents and can answer questions by retrieving relevant content and generating contextualized responses.
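The file search tool handles chunking, embedding, and ranking inside the service, and its internals may differ from this example. The toy sketch below shows only the core retrieval idea: rank document chunks by cosine similarity to the query embedding, then keep the top `k`. The three-dimensional vectors are made up for illustration. Real embeddings come from an embedding model.

```python
import math
from typing import Dict, List, Tuple


def cosine(a: List[float], b: List[float]) -> float:
    """Cosine similarity between two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query_vec: List[float], chunks: Dict[str, List[float]], k: int = 2) -> List[Tuple[str, float]]:
    """Rank chunks by similarity to the query embedding and keep the best k."""
    scored = [(chunk_id, cosine(query_vec, vec)) for chunk_id, vec in chunks.items()]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]


# Made-up 3-dimensional "embeddings" for three chunks of a product guide
toy_chunks = {
    "pricing": [0.9, 0.1, 0.0],
    "installation": [0.1, 0.9, 0.1],
    "features": [0.4, 0.4, 0.8],
}
print(top_k([1.0, 0.0, 0.0], toy_chunks, k=1))
```

The retrieved chunks are then passed to the model as context, which is the "augmented generation" half of RAG.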

In [104]:
def create_rag_agent():
    """
    Creates a RAG (Retrieval-Augmented Generation) agent for document search and analysis.
    
    The RAG agent can:
    - Search through uploaded documents using semantic similarity
    - Answer questions based on document content
    - Provide citations and references to source documents
    - Handle complex document queries with context
    """
    
    try:
        # Download a sample document for demonstration
        doc_url = "https://download.microsoft.com/documents/uk/athome/SM_Learn_5MinEncarta_F.pdf"
        data_dir = Path("./data")
        data_dir.mkdir(exist_ok=True)
        file_path = data_dir / "encarta_guide.pdf"
        
        if not file_path.exists():
            print("📥 Downloading sample document...")
            response = requests.get(doc_url, timeout=60)
            response.raise_for_status()  # avoid saving an HTML error page as the PDF
            with open(file_path, 'wb') as f:
                f.write(response.content)
            print(f"✅ Document downloaded: {file_path}")
        else:
            print(f"📄 Using existing document: {file_path}")
        
        # Upload file to Azure AI
        print("☁️ Uploading file to Azure AI...")
        file = project_client.agents.files.upload_and_poll(
            file_path=str(file_path), 
            purpose=FilePurpose.AGENTS
        )
        print(f"✅ File uploaded")
        
        # Create vector store for semantic search
        print("🔍 Creating vector store...")
        vector_store = project_client.agents.vector_stores.create_and_poll(
            file_ids=[file.id], 
            name="rag_vectorstore"
        )
        print(f"✅ Vector store created")
        
        # Create file search tool
        file_search = FileSearchTool(vector_store_ids=[vector_store.id])
        
        # Create the RAG agent
        rag_agent = project_client.agents.create_agent(
            model=os.environ.get("MODEL_DEPLOYMENT_NAME", "gpt-4"),
            name="rag-agent",
            instructions="""You are a RAG (Retrieval-Augmented Generation) agent that searches documents to answer questions.
            
            Your capabilities include:
            - Semantic search through uploaded documents
            - Providing accurate answers based on document content
            - Including citations and references to source material
            - Handling complex queries that require multiple document sections
            
            Always cite your sources and indicate when information is not found in the documents.""",
            tools=file_search.definitions,
            tool_resources=file_search.resources,
        )
        
        print("✅ RAG Agent created successfully!")
        demo_agent_info(rag_agent)
        
        return rag_agent, {"vector_store": vector_store, "file": file}
        
    except Exception as e:
        print(f"❌ Failed to create RAG Agent: {str(e)}")
        return None, {}

def query_documents_demo(agent, query: str) -> str:
    """Demonstrate document querying with the RAG agent"""
    if not agent:
        return "RAG agent not available"
    
    try:
        print(f"📚 Querying documents with: {query}")
        
        # Create a thread for the conversation
        thread = project_client.agents.threads.create()
        
        # Add user message
        project_client.agents.messages.create(
            thread_id=thread.id,
            role="user",
            content=query
        )
        
        # Execute the query
        run = project_client.agents.runs.create_and_process(
            thread_id=thread.id,
            agent_id=agent.id
        )
        
        if run.status == "completed":
            messages = list(project_client.agents.messages.list(thread_id=thread.id))
            response = messages[0].content[0].text.value if messages[0].content else "No response"
            print("✅ Document query completed successfully!")
            return response
        else:
            print(f"❌ Document query failed with status: {run.status}")
            return f"Query failed: {run.status}"
            
    except Exception as e:
        print(f"💥 Error during document query: {str(e)}")
        return f"Error: {str(e)}"

# Create the RAG agent
rag_agent, rag_cleanup_resources = create_rag_agent()

# Example queries
if rag_agent:
    sample_queries = [
        "What is the price of Encarta?"
    ]
    
    print(f"\n📝 Sample Document Queries:")
    for i, query in enumerate(sample_queries, 1):
        print(f"   {i}. {query}")
    
    selected_query = sample_queries[0]
    print(f"\n🔍 Running document query: {selected_query}")
    response = query_documents_demo(rag_agent, selected_query)
    print(f"📚 Document Response:\n{response}")
else:
    print("\n💡 RAG Agent creation failed - check your Azure AI configuration")

📄 Using existing document: data/encarta_guide.pdf
☁️ Uploading file to Azure AI...
✅ File uploaded
🔍 Creating vector store...
✅ Vector store created
✅ RAG Agent created successfully!
   • Name: rag-agent
   • Model: gpt-4.1-mini
   • Tools: 1 configured

📝 Sample Document Queries:
   1. What is the price of Encarta?

🔍 Running document query: What is the price of Encarta?
📚 Querying documents with: What is the price of Encarta?
✅ Document query completed successfully!
📚 Document Response:
The price of Microsoft Encarta ranges from £29.99 to £69.99, depending on the version or package you choose. There are several versions available, from Stand

## 6. Setup Databricks Genie Integration

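The Databricks Genie integration provides natural language to SQL conversion and data analysis. Genie translates a question into SQL, runs the query against the tables in a Genie space, and returns a text answer together with the generated SQL. The `GenieService` class below calls the Genie conversation API directly and is exposed to the routing agent as a function tool rather than a connected agent.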
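`query_genie` polls the message status every 5 seconds until Genie reports a terminal state. The sketch below pulls that loop into a reusable, hypothetical `poll_until_done` helper. Unlike the notebook, it checks the status once before sleeping, and it bounds the wait by elapsed time instead of by attempt count. The `sleep` parameter is injectable, so the loop can be tested without waiting.

```python
import time
from typing import Callable, Dict, Iterable


def poll_until_done(
    fetch_status: Callable[[], Dict],
    terminal: Iterable[str] = ("COMPLETED", "FAILED", "CANCELLED"),
    interval_s: float = 5.0,
    max_wait_s: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict:
    """Call fetch_status() until it reports a terminal status or max_wait_s elapses.

    Returns the last payload seen; the caller inspects payload["status"].
    """
    terminal_states = set(terminal)
    waited = 0.0
    payload = fetch_status()
    while payload.get("status") not in terminal_states and waited < max_wait_s:
        sleep(interval_s)
        waited += interval_s
        payload = fetch_status()
    return payload
```

Inside `query_genie`, the status fetch could be passed as `lambda: requests.get(status_url, headers=headers, timeout=30).json()`.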

In [105]:
class GenieService:
    """
    Service class for integrating with Databricks Genie API for natural language data analysis.
    
    Genie capabilities:
    - Natural language to SQL conversion
    - Automatic query execution
    - Data visualization generation
    - Results formatting and presentation
    """
    
    def __init__(self):
        # Strip quotes from environment variables if present
        self.databricks_instance = os.getenv('DATABRICKS_INSTANCE', '').strip('"').strip("'")
        self.genie_space_id = os.getenv('GENIE_SPACE_ID', '').strip('"').strip("'")
        self.auth_token = os.getenv('DATABRICKS_AUTH_TOKEN', '').strip('"').strip("'")
        
    def is_configured(self) -> bool:
        """Check if Genie service is properly configured."""
        return all([self.databricks_instance, self.genie_space_id, self.auth_token])
    
    def query_genie(self, query: str) -> dict:
        """
        Query Databricks Genie API for natural language data analysis.
        
        Process:
        1. Start a conversation with Genie
        2. Poll for completion
        3. Extract generated SQL and results
        4. Return the answer text, generated SQL, and row count
        """
        
        print(f"🧞 Querying Genie with: {query}")
        
        if not self.is_configured():
            return {
                "status": "error",
                "message": "Missing Genie configuration. Check DATABRICKS_INSTANCE, GENIE_SPACE_ID, and DATABRICKS_AUTH_TOKEN."
            }
        
        try:
            # Start conversation with Genie
            start_url = f"https://{self.databricks_instance}/api/2.0/genie/spaces/{self.genie_space_id}/start-conversation"
            headers = {
                'Authorization': f'Bearer {self.auth_token}',
                'Content-Type': 'application/json'
            }
            
            print("📞 Starting Genie conversation...")
            
            start_response = requests.post(start_url, headers=headers, json={"content": query}, timeout=30)
            start_response.raise_for_status()
            start_data = start_response.json()
            
            conversation_id = start_data['conversation']['id']
            message_id = start_data['message']['id']
            
            # Poll for completion
            status_url = f"https://{self.databricks_instance}/api/2.0/genie/spaces/{self.genie_space_id}/conversations/{conversation_id}/messages/{message_id}"
            
            for attempt in range(12):  # Poll for up to 1 minute (5s * 12)
                print(f"⏰ Polling attempt {attempt + 1}/12...")
                time.sleep(5)
                
                status_response = requests.get(status_url, headers=headers, timeout=30)
                status_response.raise_for_status()
                status_data = status_response.json()
                
                current_status = status_data.get('status', 'UNKNOWN')
                print(f"📊 Current status: {current_status}")
                
                if current_status in ['COMPLETED', 'FAILED', 'CANCELLED']:
                    break
            
            # Process results
            if status_data.get('status') == 'COMPLETED':
                return self._process_genie_results(status_data, conversation_id, message_id)
            else:
                return {
                    "status": "error",
                    "message": f"Genie query failed with status: {status_data.get('status')}",
                    "details": status_data
                }
                
        except Exception as e:
            return {
                "status": "error",
                "message": f"Genie API error: {str(e)}",
                "exception_type": type(e).__name__
            }
    
    def _process_genie_results(self, status_data: dict, conversation_id: str, message_id: str) -> dict:
        """Process and format Genie results - return only the response text"""
        attachments = status_data.get('attachments', [])
        print(f"📎 Found {len(attachments)} attachments")
        
        text_response = "No response generated"
        generated_query = None
        query_description = None
        row_count = None
        
        # Extract response and query information
        for attachment in attachments:
            # Extract text response
            if 'text' in attachment and attachment['text']:
                text_content = attachment['text'].get('content', '')
                if text_content:
                    text_response = text_content
            
            # Extract query information
            if 'query' in attachment and attachment['query']:
                query_info = attachment['query']
                generated_query = query_info.get('query', '')
                query_description = query_info.get('description', '')
                query_result_metadata = query_info.get('query_result_metadata', {})
                row_count = query_result_metadata.get('row_count')
        
        # Format response with query details if available
        if generated_query and text_response == "No response generated":
            if query_description:
                text_response = f"{query_description}\n\nGenerated SQL:\n{generated_query}"
            else:
                text_response = f"Generated SQL query:\n{generated_query}"
            
            if row_count is not None:
                text_response += f"\n\nTotal rows: {row_count}"
        
        return {
            "status": "success",
            "response": text_response,
            "generated_query": generated_query,
            "query_description": query_description,
            "row_count": row_count,
            "attachments_count": len(attachments)
        }

# Initialize Genie service
genie_service = GenieService()

# Check configuration
print("🧞 Databricks Genie Service Configuration:")
print(f"   Service Ready: {'✅ Yes' if genie_service.is_configured() else '❌ No'}")

# Example queries for Genie
if genie_service.is_configured():
    sample_queries = [
        "Give me the single longest taxi rides in New York City.",
    ]
    
    print(f"\n📝 Sample Genie Queries:")
    for i, query in enumerate(sample_queries, 1):
        print(f"   {i}. {query}")
    
    # Test the connection first
    selected_query = sample_queries[0]
    print(f"\n🔍 Running Genie query: {selected_query}")
    result = genie_service.query_genie(selected_query)
    print(f"🧞 Genie Result:\n{json.dumps(result, indent=2)}")
else:
    print("\n💡 To enable Genie integration, configure DATABRICKS_INSTANCE, GENIE_SPACE_ID, and DATABRICKS_AUTH_TOKEN")

🧞 Databricks Genie Service Configuration:
   Service Ready: ✅ Yes

📝 Sample Genie Queries:
   1. Give me the single longest taxi rides in New York City.

🔍 Running Genie query: Give me the single longest taxi rides in New York City.
🧞 Querying Genie with: Give me the single longest taxi rides in New York City.
📞 Starting Genie conversation...
⏰ Polling attempt 1/12...
📊 Current status: COMPLETED
📎 Found 1 attachments
🧞 Genie Result:
{
  "status": "success",
  "response": "The user wants to find the single longest taxi ride in New York City based on the distance traveled.\n\nGenerated SQL:\nSELECT * FROM `samples`.`nyctaxi`.`trips` WHERE `trip_distance` IS NOT NULL ORDER BY `trip_distance` DESC LIMIT 1\n\nTotal rows: 1",
  "generated_query": "SELECT * FROM `samples`.`nyctaxi`.`trips` WHERE `trip_distance` IS NOT NULL ORDER BY `trip_distance` DESC LIMIT 1",
  "query_description": "The user wants to find the single longest taxi ride in New York City based on the 

## 7. Create Purview Catalog Search Function

The Purview Catalog Search is the core routing mechanism that discovers data assets and determines which agents should handle specific queries. It searches Microsoft Purview for relevant data sources and extracts metadata about available agents and contacts.
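`search_purview_catalog` returns a flat list of assets, each with an optional `connected_agent` parsed from its description. When several assets match, it can help to see which agents they point to. The hypothetical helper below groups asset names by agent and collects untagged assets under `unassigned`. The sample results are made up, apart from the `nyctaxi` name, which appears in the Genie output above.

```python
from collections import defaultdict
from typing import Any, Dict, List


def assets_by_agent(results: List[Dict[str, Any]]) -> Dict[str, List[str]]:
    """Group asset names by connected_agent; untagged assets go under 'unassigned'."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for asset in results:
        agent = asset.get("connected_agent") or "unassigned"
        groups[agent].append(asset.get("name", "<unnamed>"))
    return dict(groups)


# Hypothetical results shaped like the "results" list returned by search_purview_catalog
sample_results = [
    {"name": "nyctaxi", "connected_agent": "genie_agent"},
    {"name": "sales_lakehouse", "connected_agent": "fabric_agent"},
    {"name": "raw_storage", "connected_agent": None},
]
print(assets_by_agent(sample_results))
```

A large `unassigned` group is a hint that asset descriptions in Purview are missing their `agent: ...` tag.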

In [106]:
def parse_agent_from_description(description: str) -> Optional[str]:
    """Extract agent name from asset description using regex pattern"""
    import re
    if not description:
        return None
    
    # Look for patterns like "agent: genie_agent" or "agent:genie_agent" 
    match = re.search(r'agent:\s*(\w+)', description, re.IGNORECASE)
    if match:
        return match.group(1)
    
    # Also check for patterns without colon like "genie_agent" at end of description
    match = re.search(r'\b(genie_agent|fabric_agent|rag_agent|web_agent)\b', description, re.IGNORECASE)
    if match:
        return match.group(1)
    
    return None

def search_purview_catalog(query: str) -> dict:
    """
    Search Microsoft Purview catalog for data assets and extract agent/contact information.
    
    The search process:
    1. Authenticate with Azure CLI credentials
    2. Query Purview catalog API with filters for relevant asset types
    3. Extract asset metadata, descriptions, and associated agents/contacts
    4. Return structured results for routing decisions
    """
    
    purview_endpoint = os.getenv('PURVIEW_ENDPOINT')
    if not purview_endpoint:
        return {
            "status": "error",
            "message": "PURVIEW_ENDPOINT not configured",
            "assets_found": 0,
            "results": []
        }
    
    # Static mappings for demonstration (in production, these would come from a database)
    contacts = {
        "c53c736b-8469-409c-9dcc-b3a61953d4dd": "Aymen Furter (aymen.furter@microsoft.com)"
    }
    
    try:
        # Get authentication token
        token = AzureCliCredential().get_token("https://purview.azure.net/.default")
        
        # Prepare search request
        search_url = f"{purview_endpoint}/datamap/api/search/query?api-version=2023-09-01"
        search_payload = {
            "limit": 10,
            "keywords": query,
            "filter": {
                "or": [
                    {"entityType": "azure_storage_account"},
                    {"entityType": "fabric_lakehouse"},
                    {"entityType": "databricks_schema"}
                ]
            }
        }
        headers = {"Authorization": f"Bearer {token.token}"}
        
        # Execute search
        search_response = requests.post(search_url, json=search_payload, headers=headers, timeout=30)
        search_response.raise_for_status()
        
        search_data = search_response.json()
        assets = search_data.get('value', [])
        
        # Process results
        results = []
        for asset in assets:
            description = asset.get('userDescription', 'No description')
            print(f"📝 Asset: {asset['displayText']} - Description: '{description}'")
            
            agent_name = parse_agent_from_description(description)
                
            contact_id = asset.get('contact', [{}])[0].get('id') if asset.get('contact') else None
            contact_info = contacts.get(contact_id) if contact_id else None
            
            results.append({
                "name": asset['displayText'],
                "description": description,
                "asset_id": asset['id'],
                "entity_type": asset.get('entityType', 'Unknown'),
                "connected_agent": agent_name,
                "contact": contact_info
            })
        
        print(f"✅ Found {len(results)} assets in Purview catalog")
        
        return {
            "status": "success",
            "assets_found": len(results),
            "results": results,
            "search_query": query
        }
        
    except Exception as e:
        print(f"❌ Error searching Purview catalog: {str(e)}")
        return {
            "status": "error",
            "message": f"Failed to search catalog: {str(e)}",
            "assets_found": 0,
            "results": []
        }

def demo_catalog_search():
    """Demonstrate catalog search functionality"""
    
    print("📋 Purview Catalog Search Demonstration\n")
    
    # Test queries representing different types of data requests
    test_queries = [
        "taxi",           # Should find transportation data
        "software",       # Should find software-related assets
        "weather"         # Should find no relevant assets (triggers web search)
    ]
    
    for query in test_queries:
        print(f"🔍 Testing query: '{query}'")
        results = search_purview_catalog(query)
        
        print(f"   Status: {results['status']}")
        print(f"   Assets found: {results['assets_found']}")
        
        if results['assets_found'] > 0:
            for i, asset in enumerate(results['results'][:3], 1):  # Show first 3 results
                print(f"   {i}. {asset['name']}")
                print(f"      Type: {asset['entity_type']}")
                print(f"      Agent: {asset['connected_agent'] or 'None'}")
                print(f"      Contact: {asset['contact'] or 'None'}")
        
        print("   " + "-" * 50)
        print()

# Run the demonstration
demo_catalog_search()

📋 Purview Catalog Search Demonstration

🔍 Testing query: 'taxi'
📝 Asset: Taxi Data (Seattle) - Description: ''
📝 Asset: Taxi Data (New York City) - Description: 'agent: genie_agent'
✅ Found 2 assets in Purview catalog
   Status: success
   Assets found: 2
   1. Taxi Data (Seattle)
      Type: databricks_schema
      Agent: None
      Contact: Aymen Furter (aymen.furter@microsoft.com)
   2. Taxi Data (New York City)
      Type: databricks_schema
      Agent: genie_agent
      Contact: None
   --------------------------------------------------

🔍 Testing query: 'software'
📝 Asset: Taxi Data (Seattle) - Description: ''
📝 Asset: Taxi Data (New York City) - Description: 'agent: genie_agent'
✅ Found 2 assets in Purview catalog
   Status: success
   Assets found: 2
   1. Taxi Data (Seattle)
      Type: databricks_schema
      Agent: None
      Contact: Aymen Furter (aymen.furter@microsoft.com)
   2. Taxi Data (New York City)
      Type: databricks_schema
      Agent: genie_agent
      Contact
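The result-shaping half of `search_purview_catalog` can be exercised without a Purview account. The sketch below feeds a hypothetical response through the same agent-tag regex and contact lookup. The response contains only the fields the function already reads (`value`, `displayText`, `id`, `entityType`, `userDescription`, `contact`), and a real API response carries many more. `SAMPLE_RESPONSE`, `CONTACTS`, `parse_agent`, and `shape_results` are illustrative names, not part of the notebook. Unlike `parse_agent_from_description`, this version lower-cases the matched tag so that a description written as `agent: RAG_AGENT` still maps to `rag_agent`.

```python
import re
from typing import List, Optional

# Hypothetical response in the shape search_purview_catalog reads.
SAMPLE_RESPONSE = {
    "value": [
        {"displayText": "Taxi Data (New York City)", "id": "asset-1",
         "entityType": "databricks_schema", "userDescription": "agent: genie_agent"},
        {"displayText": "Taxi Data (Seattle)", "id": "asset-2",
         "entityType": "databricks_schema", "userDescription": "",
         "contact": [{"id": "contact-1"}]},
    ]
}

# Hypothetical contact directory standing in for the static `contacts` mapping.
CONTACTS = {"contact-1": "Data Owner (owner@example.com)"}

AGENT_TAG = re.compile(r"\b(genie_agent|fabric_agent|rag_agent|web_agent)\b", re.IGNORECASE)


def parse_agent(description: Optional[str]) -> Optional[str]:
    """Return the first agent tag in a description, lower-cased, or None."""
    match = AGENT_TAG.search(description or "")
    return match.group(1).lower() if match else None


def shape_results(search_data: dict, contacts: dict) -> List[dict]:
    """Turn a Purview search response into the records used for routing."""
    results = []
    for asset in search_data.get("value", []):
        description = asset.get("userDescription", "No description")
        contact_list = asset.get("contact") or []
        contact_id = contact_list[0].get("id") if contact_list else None
        results.append({
            "name": asset["displayText"],
            "asset_id": asset["id"],
            "entity_type": asset.get("entityType", "Unknown"),
            "connected_agent": parse_agent(description),
            "contact": contacts.get(contact_id) if contact_id else None,
        })
    return results


records = shape_results(SAMPLE_RESPONSE, CONTACTS)
for record in records:
    print(record["name"], "->", record["connected_agent"], "|", record["contact"])
```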

## 8. Build Main Routing Agent with Function Tools

The Main Routing Agent orchestrates the system by combining function tools (catalog search, Genie queries) with connected agents (Fabric, Web, RAG). Its instructions tell it to search the Purview catalog first, then route on the `agent:` tag found in asset descriptions, and fall back to the web agent when no relevant asset is found.
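To make the routing rules concrete, here is a deterministic approximation of them in plain Python. It is a sketch under stated assumptions, not what the agent executes: the real decision is made by the model from its instructions, and the model also judges whether each asset is relevant to the question. This sketch treats every returned asset as relevant. For the Encarta query in the test section, which returned assets tagged both `rag_agent` and `genie_agent`, it would therefore ask a clarifying question rather than pick the RAG agent. `TAG_TO_TOOL` and `choose_route` are hypothetical names, and the final fallback to `web_agent` when assets exist but carry neither a usable tag nor a contact is an assumption.

```python
from typing import Dict, List, Optional, Set

# Hypothetical mapping from the agent tag in an asset description to the
# tool the routing instructions name for it.
TAG_TO_TOOL: Dict[str, str] = {
    "genie_agent": "query_genie_wrapper",  # function tool executed by the notebook
    "rag_agent": "rag_agent",              # connected agent
    "fabric_agent": "fabric_agent",        # connected agent
}


def choose_route(results: List[dict], connected: Set[str]) -> dict:
    """Deterministic approximation of the routing rules for catalog records.

    `results` uses the record shape returned by search_purview_catalog;
    `connected` holds the names of connected agents that actually exist.
    """
    if not results:
        return {"route": "web_agent"}  # nothing in the catalog: general web query

    routes: List[str] = []
    for record in results:
        tool: Optional[str] = TAG_TO_TOOL.get(record.get("connected_agent") or "")
        usable = tool == "query_genie_wrapper" or tool in connected
        if tool and usable and tool not in routes:
            routes.append(tool)

    if len(routes) == 1:
        return {"route": routes[0]}
    if len(routes) > 1:
        return {"route": "ask_clarifying_question", "candidates": routes}

    contacts = [r["contact"] for r in results if r.get("contact")]
    if contacts:
        return {"route": "provide_contact", "contacts": contacts}
    return {"route": "web_agent"}


print(choose_route([], {"web_agent", "rag_agent"}))
print(choose_route([{"connected_agent": "genie_agent"}], {"web_agent"}))
print(choose_route([{"connected_agent": "fabric_agent", "contact": None}], {"web_agent"}))
```

The third call shows why the `connected` set matters: when no Fabric agent was created, as in the run below where only `web_agent` and `rag_agent` are connected, a `fabric_agent` tag cannot be honoured.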

In [107]:
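# Create function tools for catalog search and Genie queries.
# Tool outputs are returned to the agent as strings, so both wrappers serialize to JSON.
def search_catalog_wrapper(query: str) -> str:
    """Search the Purview catalog for data assets matching the query.

    :param query: Keywords describing the data the user is asking about.
    """
    return json.dumps(search_purview_catalog(query))

def query_genie_wrapper(query: str) -> str:
    """Ask Databricks Genie a natural-language question about catalog data.

    :param query: The user's data question in natural language.
    """
    return json.dumps(genie_service.query_genie(query))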

def create_main_routing_agent():
    """
    Create the main routing agent that orchestrates query handling across all specialized agents.
    
    The routing agent:
    1. Always starts with Purview catalog search
    2. Analyzes catalog results to determine routing strategy
    3. Calls appropriate function tools (Genie) or connected agents (Fabric, Web, RAG)
    4. Handles edge cases and provides fallback options
    """
    
    # Collect all available connected agents
    connected_agents = {}
    if fabric_agent:
        connected_agents["fabric_agent"] = fabric_agent
    if web_agent:
        connected_agents["web_agent"] = web_agent
    if rag_agent:
        connected_agents["rag_agent"] = rag_agent
    
    if not connected_agents:
        print("⚠️ No connected agents available for routing")
        return None, {}
    
    try:

        # Create function tool with both search and Genie capabilities
        function_tool = FunctionTool(functions={search_catalog_wrapper, query_genie_wrapper})
        
        # Create connected agent tools
        connected_tools = []
        for agent_name, agent in connected_agents.items():
            connected_tool = ConnectedAgentTool(
                id=agent.id,
                name=agent_name,
                description=f"Delegate to {agent_name} based on the query and catalog results"
            )
            connected_tools.append(connected_tool)
        
        # Combine all tools
        all_tools = function_tool.definitions + [tool.definitions[0] for tool in connected_tools]
        
        # Create the main routing agent with comprehensive instructions
        routing_instructions = """You are a routing agent for Microsoft Purview. 
        Your role is to help users find and access the right data sources and agents.

        WORKFLOW:
        1. ALWAYS start by using the search_catalog_wrapper function to find relevant data assets via Purview
        2. Analyze the catalog results to determine the best routing approach:
           - If genie agent is mentioned in asset descriptions → use query_genie_wrapper function
           - If RAG agent is associated with relevant assets → call rag_agent
           - If Fabric agent is associated with relevant assets → call fabric_agent
           - If no relevant data assets found → use web_agent for general queries
           - If only contact info is available → provide contact details

        ROUTING DECISION LOGIC:
        - Data analysis queries with "genie" in catalog results → query_genie_wrapper
        - Document/knowledge queries with RAG assets → rag_agent
        - Structured data queries with Fabric assets → fabric_agent
        - Weather, current events, general knowledge → web_agent
        - Multiple matching assets → ask clarifying questions

        CRITICAL DATA PRESENTATION RULES:
        - When query_genie_wrapper returns successful results, include ALL response text
        - Present complete data tables, SQL queries, and data samples
        - Never summarize or filter data results from query_genie_wrapper
        - Always provide complete responses from connected agents

        IMPORTANT:
        - NEVER answer questions yourself - always route to functions or agents
        - Always search the catalog first to guide routing decisions
        - Provide contact information when agents aren't available
        - Present all data results directly to users without modification"""
        
        main_agent = project_client.agents.create_agent(
            model=os.environ.get("MODEL_DEPLOYMENT_NAME", "gpt-4"),
            name="purview_routing_agent",
            instructions=routing_instructions,
            tools=all_tools
        )
        
        print("✅ Main Routing Agent created successfully!")
        demo_agent_info(main_agent)
        print(f"   Connected Agents: {list(connected_agents.keys())}")
        print(f"   Function Tools: search_catalog_wrapper, query_genie_wrapper")
        
        return main_agent, connected_agents
        
    except Exception as e:
        print(f"❌ Failed to create Main Routing Agent: {str(e)}")
        return None, {}

# Create the main routing agent
main_agent, connected_agents_dict = create_main_routing_agent()

✅ Main Routing Agent created successfully!
   • Name: purview_routing_agent
   • Model: gpt-4.1-mini
   • Tools: 4 configured
   Connected Agents: ['web_agent', 'rag_agent']
   Function Tools: search_catalog_wrapper, query_genie_wrapper
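Both wrappers exist only to turn a dict into the JSON string the agent receives as tool output. A small decorator can do that generically. The sketch assumes, without having been checked against the SDK source, that `FunctionTool` builds each tool's schema from the function's name, parameters, and docstring. `functools.wraps` preserves the name and docstring and sets `__wrapped__`, which `inspect.signature` follows to report the original parameters. `json_tool` and `search_catalog_demo` are hypothetical names. Before relying on this, confirm that `function_tool.definitions` lists the expected parameters.

```python
import functools
import inspect
import json
from typing import Any, Callable, Dict


def json_tool(func: Callable[..., Dict[str, Any]]) -> Callable[..., str]:
    """Wrap a dict-returning function so the agent receives a JSON string.

    functools.wraps copies __name__ and __doc__ and sets __wrapped__, which
    inspect.signature follows to report the original parameters.
    """

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> str:
        # default=str keeps non-JSON values (e.g. datetimes) from raising
        return json.dumps(func(*args, **kwargs), default=str)

    return wrapper


@json_tool
def search_catalog_demo(query: str) -> Dict[str, Any]:
    """Hypothetical stand-in for search_purview_catalog.

    :param query: Keywords describing the data the user is asking about.
    """
    return {"status": "success", "assets_found": 0, "results": [], "search_query": query}


print(search_catalog_demo.__name__)
print(list(inspect.signature(search_catalog_demo).parameters))
print(search_catalog_demo("taxi"))
```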


## 9. Test Agent Interactions and Routing Logic

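Now let's test how queries are processed through the complete system. The helper below creates a thread, posts the user query, runs the routing agent, executes any requested function tools locally, and then inspects the run steps to see which connected agents were called.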
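`test_routing_system` polls the run every two seconds for as long as it stays `queued`, `in_progress`, or `requires_action`, with no upper bound, so a stuck run blocks the cell indefinitely. A minimal sketch of the same loop with a deadline follows. `get_status` and `on_action` are hypothetical callables; in the notebook they would wrap `project_client.agents.runs.get` and the tool-output submission. `"timed_out"` is a local marker, not a run status returned by the service.

```python
import time
from typing import Callable

# Run states in which the notebook's loop keeps polling.
ACTIVE_STATES = {"queued", "in_progress", "requires_action"}


def poll_until_done(
    get_status: Callable[[], str],
    on_action: Callable[[], None],
    interval: float = 2.0,
    timeout: float = 120.0,
) -> str:
    """Poll until the run leaves the active states or the deadline passes."""
    deadline = time.monotonic() + timeout
    status = get_status()
    while status in ACTIVE_STATES:
        if status == "requires_action":
            on_action()  # e.g. run function tools and submit their outputs
        if time.monotonic() >= deadline:
            return "timed_out"  # local marker, not a service run status
        time.sleep(interval)
        status = get_status()
    return status


# Usage with a scripted sequence of statuses instead of a live run.
statuses = iter(["queued", "requires_action", "in_progress", "completed"])
actions = []
final = poll_until_done(lambda: next(statuses), lambda: actions.append("submitted"), interval=0.01)
print(final, actions)
```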

In [108]:
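def test_routing_system(agent, query: str) -> dict:
    """Run one query through the routing agent, executing function tools locally and recording which tools and connected agents were used."""
    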
    if not agent:
        return {"error": "Main routing agent not available"}
    
    try:
        print(f"🔍 Processing query through routing system: '{query}'")
        
        # Create a conversation thread
        thread = project_client.agents.threads.create()
        
        # Add user message
        project_client.agents.messages.create(
            thread_id=thread.id,
            role="user", 
            content=query
        )
        
        # Start execution
        run = project_client.agents.runs.create(
            thread_id=thread.id,
            agent_id=agent.id
        )
        
        # Track execution details
        tools_called = []
        agents_called = []
        
        # Monitor execution and handle tool calls
        while run.status in ["queued", "in_progress", "requires_action"]:
            time.sleep(2)
            run = project_client.agents.runs.get(thread_id=thread.id, run_id=run.id)
            
            if run.status == "requires_action":
                tool_outputs = []
                
                for tool_call in run.required_action.submit_tool_outputs.tool_calls:
                    if hasattr(tool_call, 'function'):
                        func_name = tool_call.function.name
                        args = json.loads(tool_call.function.arguments)
                        
                        if func_name == "search_catalog_wrapper":
                            search_query = args.get("query", "")
                            output = search_catalog_wrapper(search_query)
                            tools_called.append(f"search_catalog({search_query})")
                        
                        elif func_name == "query_genie_wrapper":
                            genie_query = args.get("query", "")
                            output = query_genie_wrapper(genie_query)
                            tools_called.append(f"query_genie({genie_query})")
                        
                        else:
                            output = json.dumps({"error": f"Unknown function: {func_name}"})
                            tools_called.append(f"unknown_function({func_name})")
                        
                        tool_outputs.append({
                            "tool_call_id": tool_call.id,
                            "output": output
                        })
                
                if tool_outputs:
                    project_client.agents.runs.submit_tool_outputs(
                        thread_id=thread.id,
                        run_id=run.id,
                        tool_outputs=tool_outputs
                    )
        
        # Extract connected agent calls from run steps
        if run.status == "completed":
            run_steps = list(project_client.agents.run_steps.list(thread_id=thread.id, run_id=run.id))
            
            for step in run_steps:
                if hasattr(step, 'step_details') and hasattr(step.step_details, 'tool_calls'):
                    for tool_call in step.step_details.tool_calls:
                        if hasattr(tool_call, 'type') and tool_call.type == 'connected_agent':
                            if hasattr(tool_call, 'connected_agent'):
                                agent_name = tool_call.connected_agent.get('name', 'unknown')
                                agents_called.append(agent_name)
        
        # Get final response
        response_text = "No response generated"
        if run.status == "completed":
            messages = list(project_client.agents.messages.list(thread_id=thread.id))
            for message in messages:
                if message.role == "assistant":
                    response_text = message.content[0].text.value if message.content else "No response"
                    break
        
        return {
            "status": "success",
            "query": query,
            "response": response_text,
            "run_status": run.status,
            "tools_called": tools_called,
            "agents_called": agents_called,
            "thread_id": thread.id
        }
        
    except Exception as e:
        return {
            "status": "error",
            "query": query,
            "error": str(e),
            "response": f"Error processing query: {str(e)}"
        }

def run_test():
    if not main_agent:
        print("❌ Cannot run tests - main routing agent not available")
        return
    
    test_queries = [
        {
            "query": "What is the cost of the software Microsoft Encarta?",
            "expected_routing": "Catalog search → RAG agent",
        },
        {
            "query": "What are the top taxi trip distances?",
            "expected_routing": "Catalog search → Genie",
        },
        {
            "query": "What's the current weather in New York?",
            "expected_routing": "Catalog search → Web agent",
        }
    ]
    
    results = []
    
    for i, test_case in enumerate(test_queries, 1):
        print(f"   Query: {test_case['query']}")
        print(f"   Expected: {test_case['expected_routing']}")
        
        # Run the query through the routing agent (requires all services to be configured)
        result = test_routing_system(main_agent, test_case['query'])
        results.append(result)
        
        print(f"   Status: {result['status']}")
        print(f"   Tools Called: {result.get('tools_called', [])}")
        print(f"   Agents Called: {result.get('agents_called', [])}")
        print(f"   Response Preview: {result['response'][:100]}...")
        
        print("   " + "-" * 60)
        print()
    
    print("⚠️ Note: Tests require proper configuration of all services (Purview, Genie, Bing, etc.)")

run_test()

   Query: What is the cost of the software Microsoft Encarta?
   Expected: Catalog search → RAG o
🔍 Processing query through routing system: 'What is the cost of the software Microsoft Encarta?'
📝 Asset: Software Product Information (Vector Store) - Description: 'Information about software products such as encarta. agent: rag_agent'
📝 Asset: Stock Information - Description: 'No description'
📝 Asset: Taxi Data (Seattle) - Description: ''
📝 Asset: ContosoSalesLakehouse - Description: 'Contains up to date sales information'
📝 Asset: Billing Information (Vector Store) - Description: 'agent: rag_agent'
📝 Asset: Taxi Data (New York City) - Description: 'agent: genie_agent'
📝 Asset: tpch - Description: 'No description'
✅ Found 7 assets in Purview catalog
📝 Asset: Software Product Information (Vector Store) - Description: 'Information about software products such as encarta. agent: rag_agent'
📝 Asset: Stock Information - Description: 'No description'
📝 Asset: Taxi Data (Seattle) - Description:
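The `requires_action` branch of `test_routing_system` dispatches function calls through an `if`/`elif` chain. A table-driven version keeps the name-to-function mapping in one place, so adding a tool means adding one dictionary entry. This is a hypothetical refactoring sketch: `dispatch_tool_calls` is not part of the notebook, and the fake tool calls built with `SimpleNamespace` only mimic the attributes the notebook reads (`id`, `function.name`, `function.arguments`). It also assumes every function tool takes a single `query` argument, as both wrappers do. In the notebook, `handlers` would map `"search_catalog_wrapper"` and `"query_genie_wrapper"` to the wrappers defined earlier.

```python
import json
from types import SimpleNamespace
from typing import Callable, Dict, List


def dispatch_tool_calls(
    tool_calls, handlers: Dict[str, Callable[[str], str]], log: List[str]
) -> List[dict]:
    """Build the tool_outputs list for one requires_action step."""
    outputs = []
    for call in tool_calls:
        function = getattr(call, "function", None)
        if function is None:
            continue  # not a function tool call
        args = json.loads(function.arguments or "{}")
        handler = handlers.get(function.name)
        if handler is None:
            output = json.dumps({"error": f"Unknown function: {function.name}"})
            log.append(f"unknown_function({function.name})")
        else:
            query = args.get("query", "")
            output = handler(query)
            log.append(f"{function.name}({query})")
        outputs.append({"tool_call_id": call.id, "output": output})
    return outputs


# Fake tool calls that mimic only the attributes the notebook reads.
fake_calls = [
    SimpleNamespace(id="call_1", function=SimpleNamespace(
        name="search_catalog_wrapper", arguments='{"query": "taxi"}')),
    SimpleNamespace(id="call_2", function=SimpleNamespace(
        name="drop_tables", arguments="{}")),
]
handlers = {"search_catalog_wrapper": lambda q: json.dumps({"status": "success", "search_query": q})}
log: List[str] = []
outputs = dispatch_tool_calls(fake_calls, handlers, log)
print(log)
```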

## Cleanup

This notebook has demonstrated all the key components. The cell below deletes the Azure resources it created: the routing, Fabric, Web, and RAG agents, plus the RAG vector store and uploaded file.

In [109]:
def cleanup_azure_resources():
    print("🧹 Cleaning up Azure resources...\n")
    
    cleanup_tasks = []
    
    try:
        # Clean up agents
        agents_to_cleanup = []
        if main_agent:
            agents_to_cleanup.append(("Main Routing Agent", main_agent.id))
        if fabric_agent:
            agents_to_cleanup.append(("Fabric Agent", fabric_agent.id))
        if web_agent:
            agents_to_cleanup.append(("Web Agent", web_agent.id))
        if rag_agent:
            agents_to_cleanup.append(("RAG Agent", rag_agent.id))
        
        for agent_name, agent_id in agents_to_cleanup:
            try:
                project_client.agents.delete_agent(agent_id)
                cleanup_tasks.append(f"✅ Deleted {agent_name}")
            except Exception as e:
                cleanup_tasks.append(f"❌ Failed to delete {agent_name}: {str(e)}")
        
        # Clean up RAG resources
        if rag_cleanup_resources:
            if rag_cleanup_resources.get("vector_store"):
                try:
                    project_client.agents.vector_stores.delete(rag_cleanup_resources["vector_store"].id)
                    cleanup_tasks.append("✅ Deleted vector store")
                except Exception as e:
                    cleanup_tasks.append(f"❌ Failed to delete vector store: {str(e)}")
            
            if rag_cleanup_resources.get("file"):
                try:
                    project_client.agents.files.delete(rag_cleanup_resources["file"].id)
                    cleanup_tasks.append("✅ Deleted uploaded file")
                except Exception as e:
                    cleanup_tasks.append(f"❌ Failed to delete file: {str(e)}")
    
    except Exception as e:
        cleanup_tasks.append(f"❌ General cleanup error: {str(e)}")
    
    # Print cleanup results
    for task in cleanup_tasks:
        print(task)
    
    print("\n🎯 Cleanup completed!")
    print("💡 Note: Some resources may have been created outside this notebook")

# Delete the resources created in this notebook (comment out this line to keep them)
cleanup_azure_resources()

🧹 Cleaning up Azure resources...\n
✅ Deleted Main Routing Agent
✅ Deleted Web Agent
✅ Deleted RAG Agent
✅ Deleted vector store
✅ Deleted uploaded file
\n🎯 Cleanup completed!
💡 Note: Some resources may have been created outside this notebook
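`cleanup_azure_resources` reconstructs the list of resources from module-level variables after the fact, so anything created but not stored in one of those variables is missed. An alternative is to register a delete callback at the moment each resource is created and run the callbacks in reverse creation order. Agents are then deleted before the vector store they use, and the vector store before its file. `CleanupRegistry` is a hypothetical helper. In the notebook, a registration might look like `registry.register("RAG Agent", lambda: project_client.agents.delete_agent(rag_agent.id))`, reusing the delete call the cleanup cell already makes.

```python
from typing import Callable, List, Tuple


class CleanupRegistry:
    """Collect delete callbacks as resources are created; run them in reverse."""

    def __init__(self) -> None:
        self._tasks: List[Tuple[str, Callable[[], None]]] = []

    def register(self, label: str, delete: Callable[[], None]) -> None:
        """Remember how to delete a resource right after creating it."""
        self._tasks.append((label, delete))

    def run(self) -> List[str]:
        """Delete everything, last created first, and return one status line per resource."""
        report: List[str] = []
        while self._tasks:
            label, delete = self._tasks.pop()
            try:
                delete()
                report.append(f"✅ Deleted {label}")
            except Exception as exc:  # keep going so one failure does not block the rest
                report.append(f"❌ Failed to delete {label}: {exc}")
        return report


# Usage with fake delete functions standing in for SDK calls.
deleted: List[str] = []


def delete_missing_agent() -> None:
    raise RuntimeError("agent not found")


registry = CleanupRegistry()
registry.register("uploaded file", lambda: deleted.append("file"))
registry.register("vector store", lambda: deleted.append("vector_store"))
registry.register("Fabric Agent", delete_missing_agent)

report = registry.run()
for line in report:
    print(line)
```

Registering at creation time also means a resource is tracked even if a later cell fails before the variable holding it is used again.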
