# Agentic retrieval using Azure Blob Storage Knowledge Source

Use this notebook to create an agentic retrieval pipeline from documents stored in Azure Blob Storage.

In this walkthrough, you will:

+ Upload documents to Azure Blob Storage
+ Create a knowledge source from Blob Storage with automatic ingestion
+ Create a knowledge base for intelligent query planning
+ Create an MCP Tool Connection for authentication
+ Create a Foundry agent to determine when queries are needed
+ Start a chat with the agent

This notebook demonstrates how to automatically chunk and index documents from blob storage.

## Prerequisites

+ Azure AI Search, basic tier or higher, in [any region that supports semantic ranker](https://learn.microsoft.com/azure/search/search-region-support#azure-public-regions).

+ Azure Blob Storage account with a container for documents.

+ Azure OpenAI. To create a Foundry project, you also need an **Azure AI Developer** role assignment.

+ An [Azure AI agent and Foundry project](https://learn.microsoft.com/azure/ai-services/agents/quickstart?pivots=ai-foundry-portal), created in the Azure AI Foundry portal.

+ A deployment of a [supported model](https://learn.microsoft.com/azure/search/search-agentic-retrieval-how-to-create#supported-models) in your Foundry project. This notebook uses gpt-4o-mini and text-embedding-3-large.

We recommend creating a virtual environment to run this sample code.

## Install required packages

Install the Azure SDK packages needed for this notebook.

In [None]:
%pip install azure-search-documents==11.7.0b2
%pip install azure-identity
%pip install azure-ai-projects
%pip install azure-mgmt-cognitiveservices
%pip install azure-storage-blob
%pip install python-dotenv

## Load connections

Load the environment variables to set up connections and object names.

In [None]:
from dotenv import load_dotenv
from azure.identity import DefaultAzureCredential
from azure.core.credentials import AzureKeyCredential
from azure.mgmt.core.tools import parse_resource_id
import os

load_dotenv(override=True)

project_endpoint = os.environ["PROJECT_ENDPOINT"]
project_resource_id = os.environ["PROJECT_RESOURCE_ID"]
project_connection_name = os.getenv("PROJECT_CONNECTION_NAME", "blobknowledgeconnection")
agent_model = os.getenv("AGENT_MODEL", "gpt-4o-mini")
agent_name = os.getenv("AGENT_NAME", "blob-docs-agent")
endpoint = os.environ["AZURE_SEARCH_ENDPOINT"]
search_api_key = os.environ["AZURE_SEARCH_API_KEY"]
credential = DefaultAzureCredential()
knowledge_source_name = os.getenv("AZURE_SEARCH_KNOWLEDGE_SOURCE_NAME", "blob-docs-source")
base_name = os.getenv("AZURE_SEARCH_AGENT_NAME", "blob-docs-base")
azure_openai_endpoint = os.environ["AZURE_OPENAI_ENDPOINT"]
azure_openai_gpt_deployment = os.getenv("AZURE_OPENAI_GPT_DEPLOYMENT", "gpt-4o-mini")
azure_openai_gpt_model = os.getenv("AZURE_OPENAI_GPT_MODEL", "gpt-4o-mini")
azure_openai_embedding_deployment = os.getenv("AZURE_OPENAI_EMBEDDING_DEPLOYMENT", "text-embedding-3-large")
azure_openai_embedding_model = os.getenv("AZURE_OPENAI_EMBEDDING_MODEL", "text-embedding-3-large")
blob_connection_string = os.environ["AZURE_BLOB_CONNECTION_STRING"]
blob_container_name = os.getenv("AZURE_BLOB_CONTAINER_NAME", "documents")

parsed_resource_id = parse_resource_id(project_resource_id)
subscription_id = parsed_resource_id['subscription']
resource_group = parsed_resource_id['resource_group']
account_name = parsed_resource_id['name']
project_name = parsed_resource_id['child_name_1']
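
The four values above come from segments of the Azure resource ID stored in `PROJECT_RESOURCE_ID`. As a rough illustration, assuming the ID has the shape `/subscriptions/<id>/resourceGroups/<rg>/providers/Microsoft.CognitiveServices/accounts/<account>/projects/<project>`, the stdlib-only sketch below extracts the same segments. `split_project_resource_id` is a hypothetical helper written for this example, not part of the Azure SDK, and the sample ID is made up.

```python
def split_project_resource_id(resource_id: str) -> dict:
    """Split a Foundry project resource ID into the segments the notebook uses.

    Hypothetical stand-in for illustration only; the notebook itself relies on
    azure.mgmt.core.tools.parse_resource_id.
    """
    parts = [p for p in resource_id.split("/") if p]
    # Segments alternate between a key and its value:
    # subscriptions/<id>/resourceGroups/<rg>/providers/<namespace>/...
    pairs = {parts[i].lower(): parts[i + 1] for i in range(0, len(parts) - 1, 2)}

    required = ("subscriptions", "resourcegroups", "accounts", "projects")
    missing = [key for key in required if key not in pairs]
    if missing:
        raise ValueError(f"Resource ID is missing segments: {missing}")

    return {
        "subscription": pairs["subscriptions"],
        "resource_group": pairs["resourcegroups"],
        "account_name": pairs["accounts"],
        "project_name": pairs["projects"],
    }


# Made-up resource ID, used only to show the expected shape.
example_id = (
    "/subscriptions/00000000-0000-0000-0000-000000000000"
    "/resourceGroups/my-rg/providers/Microsoft.CognitiveServices"
    "/accounts/my-account/projects/my-project"
)
print(split_project_resource_id(example_id))
```

If the cell above raises a `KeyError` on `child_name_1`, a likely cause is that `PROJECT_RESOURCE_ID` points at the account rather than the project.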

## Upload sample documents to Blob Storage

Create and upload sample documents to your blob container.

In [None]:
from azure.storage.blob import BlobServiceClient
import os

# Create sample documents
os.makedirs("sample_docs", exist_ok=True)

with open("sample_docs/product_guide.txt", "w") as f:
    f.write("""
Product Guide - CloudMax Platform

CloudMax is a comprehensive cloud platform that provides scalable infrastructure,
managed services, and advanced analytics capabilities.

Key Features:
- Auto-scaling compute resources
- Managed database services (SQL, NoSQL)
- Built-in AI/ML capabilities
- Enterprise-grade security
- 99.99% uptime SLA

Getting Started:
1. Create a CloudMax account
2. Set up your first project
3. Deploy your application
4. Monitor and scale as needed
""")

with open("sample_docs/faq.txt", "w") as f:
    f.write("""
Frequently Asked Questions

Q: How do I reset my password?
A: Click on 'Forgot Password' on the login page and follow the instructions sent to your email.

Q: What payment methods are accepted?
A: We accept all major credit cards, PayPal, and wire transfers for enterprise customers.

Q: Is there a free trial available?
A: Yes, we offer a 30-day free trial with $200 in credits.

Q: How do I contact support?
A: You can reach our support team at support@cloudmax.com or via the chat widget in your dashboard.
""")

# Upload to blob storage
blob_service_client = BlobServiceClient.from_connection_string(blob_connection_string)
container_client = blob_service_client.get_container_client(blob_container_name)

# Create the container; ignore only the "already exists" error
from azure.core.exceptions import ResourceExistsError

try:
    container_client.create_container()
except ResourceExistsError:
    pass

# Upload files
for filename in os.listdir("sample_docs"):
    with open(os.path.join("sample_docs", filename), "rb") as data:
        blob_client = container_client.get_blob_client(filename)
        blob_client.upload_blob(data, overwrite=True)
        print(f"Uploaded {filename}")

print(f"Documents uploaded to container '{blob_container_name}'")
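
Before moving on, it can help to confirm that every local file actually landed in the container. The sketch below compares the local folder with the blob names returned by `container_client.list_blobs()`. `find_missing_blobs` is a hypothetical helper introduced here; it only assumes the client exposes `list_blobs()` and that each returned item has a `name` attribute.

```python
import os


def find_missing_blobs(container_client, local_dir: str) -> list:
    """Return local file names that are not present as blobs in the container."""
    # Collect the names of blobs currently in the container.
    uploaded = {blob.name for blob in container_client.list_blobs()}
    # Only consider regular files, in a stable order.
    local_files = sorted(
        name
        for name in os.listdir(local_dir)
        if os.path.isfile(os.path.join(local_dir, name))
    )
    return [name for name in local_files if name not in uploaded]


# Usage with the objects created in the cell above:
# missing = find_missing_blobs(container_client, "sample_docs")
# print(missing or "All sample documents are in the container")
```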

## Create a knowledge source from Blob Storage

Create a knowledge source that automatically ingests and chunks documents from blob storage.

In [None]:
from azure.search.documents.indexes.models import (
    AzureBlobKnowledgeSource,
    AzureBlobKnowledgeSourceParameters,
    IngestionParameters,
    AzureOpenAIVectorizerParameters
)
from azure.search.documents.indexes import SearchIndexClient

aoai_params_embedding = AzureOpenAIVectorizerParameters(
    resource_url=azure_openai_endpoint,
    deployment_name=azure_openai_embedding_deployment,
    model_name=azure_openai_embedding_model
)

aoai_params_chat = AzureOpenAIVectorizerParameters(
    resource_url=azure_openai_endpoint,
    deployment_name=azure_openai_gpt_deployment,
    model_name=azure_openai_gpt_model
)

ks = AzureBlobKnowledgeSource(
    name=knowledge_source_name,
    description="Product documentation from blob storage",
    azure_blob_parameters=AzureBlobKnowledgeSourceParameters(
        connection_string=blob_connection_string,
        container_name=blob_container_name,
        ingestion_parameters=IngestionParameters(
            embedding_model_parameters=aoai_params_embedding,
            chat_completion_model_parameters=aoai_params_chat
        )
    )
)

index_client = SearchIndexClient(endpoint=endpoint, credential=credential)
index_client.create_or_update_knowledge_source(knowledge_source=ks)
print(f"Knowledge source '{knowledge_source_name}' created or updated successfully.")
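
The service performs chunking for you, and this notebook does not describe the exact strategy it uses. Purely to illustrate what "chunking" means, the sketch below splits text into fixed-size character windows with some overlap. `chunk_text` and its default sizes are assumptions made for this example and are not the knowledge source's actual algorithm.

```python
def chunk_text(text: str, chunk_size: int = 200, overlap: int = 50) -> list:
    """Split text into overlapping fixed-size character chunks (illustrative only)."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be >= 0 and smaller than chunk_size")

    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once the current window reaches the end of the text.
        if start + chunk_size >= len(text):
            break
    return chunks


sample = "CloudMax is a comprehensive cloud platform that provides scalable infrastructure."
for i, chunk in enumerate(chunk_text(sample, chunk_size=40, overlap=10)):
    print(i, repr(chunk))
```

Overlap keeps a sentence that straddles a boundary visible in both neighboring chunks, which is one common reason to use it.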

## Monitor ingestion progress

Check the status of document ingestion.

In [None]:
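import time

# Poll until ingestion finishes, but stop after a fixed wait so the cell can't hang forever.
max_wait_seconds = 600  # assumed limit; raise it for larger containers
poll_interval_seconds = 10
deadline = time.monotonic() + max_wait_seconds

print("Waiting for ingestion to complete...")
while True:
    status = index_client.get_knowledge_source_status(knowledge_source_name)

    if status.status == "succeeded":
        print("Ingestion completed successfully!")
        break
    elif status.status == "failed":
        print("Ingestion failed!")
        print(f"Error: {status}")
        break
    elif time.monotonic() >= deadline:
        print(f"Timed out after {max_wait_seconds} seconds. Last status: {status.status}")
        break
    else:
        print(f"Status: {status.status}")
        time.sleep(poll_interval_seconds)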

## Create a knowledge base

Create a knowledge base that wraps the knowledge source.

In [None]:
from azure.search.documents.indexes.models import (
    KnowledgeBase,
    KnowledgeSourceReference,
    KnowledgeRetrievalOutputMode,
    KnowledgeRetrievalMinimalReasoningEffort
)

knowledge_base = KnowledgeBase(
    name=base_name,
    knowledge_sources=[
        KnowledgeSourceReference(name=knowledge_source_name)
    ],
    output_mode=KnowledgeRetrievalOutputMode.EXTRACTIVE_DATA,
    retrieval_reasoning_effort=KnowledgeRetrievalMinimalReasoningEffort()
)

index_client.create_or_update_knowledge_base(knowledge_base=knowledge_base)
print(f"Knowledge base '{base_name}' created or updated successfully")

# MCP endpoint of the knowledge base; the tool connection and the agent both use it
mcp_endpoint = f"{endpoint}/knowledgebases/{base_name}/mcp?api-version=2025-11-01-Preview"
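
The MCP endpoint is built by plain string concatenation, so a trailing slash in `AZURE_SEARCH_ENDPOINT` would produce a double slash in the URL. A minimal sketch of a more defensive version follows. `build_mcp_endpoint` is a hypothetical helper, the example host name is made up, and the default API version is copied from the cell above.

```python
def build_mcp_endpoint(
    search_endpoint: str,
    knowledge_base_name: str,
    api_version: str = "2025-11-01-Preview",
) -> str:
    """Compose the knowledge base MCP URL, tolerating a trailing slash."""
    base = search_endpoint.rstrip("/")
    return f"{base}/knowledgebases/{knowledge_base_name}/mcp?api-version={api_version}"


# Made-up endpoint, used only to show the resulting URL shape.
print(build_mcp_endpoint("https://example.search.windows.net/", "blob-docs-base"))
```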

## Create an MCP Tool Connection

Create a connection to authenticate to your knowledge base tool.

In [None]:
from azure.mgmt.cognitiveservices import CognitiveServicesManagementClient
from azure.mgmt.cognitiveservices.models import (
    ConnectionPropertiesV2BasicResource,
    CustomKeysConnectionProperties,
    CustomKeys
)

mgmt_client = CognitiveServicesManagementClient(credential, subscription_id)
resource = mgmt_client.project_connections.create(
    resource_group_name=resource_group,
    account_name=account_name,
    project_name=project_name,
    connection_name=project_connection_name,
    connection=ConnectionPropertiesV2BasicResource(
        properties=CustomKeysConnectionProperties(
            category="RemoteTool",
            target=mcp_endpoint,
            is_shared_to_all=True,
            metadata={"ApiType": "Azure"},
            credentials=CustomKeys(
                keys={"api-key": search_api_key}
            )
        )
    )
)

print(f"Connection '{resource.name}' created or updated successfully.")

## Create an Azure AI Agent

Create an agent that uses the knowledge base to answer questions about CloudMax.

In [None]:
from azure.ai.projects import AIProjectClient
from azure.ai.projects.models import PromptAgentDefinition, MCPTool

project_client = AIProjectClient(endpoint=project_endpoint, credential=credential)

instructions = """
A Q&A agent that can answer questions about CloudMax platform documentation.
Always provide references to the source documents used to answer the question.
If you do not have the answer, respond with "I don't know".
"""

mcp_kb_tool = MCPTool(
    server_label="knowledge-base",
    server_url=mcp_endpoint,
    require_approval="never",
    allowed_tools=["knowledge_base_retrieve"],
    project_connection_id=project_connection_name
)

agent = project_client.agents.create_version(
    agent_name=agent_name,
    definition=PromptAgentDefinition(
        model=agent_model,
        instructions=instructions,
        tools=[mcp_kb_tool]
    )
)

print(f"AI agent '{agent_name}' created or updated successfully")

## Start a chat with the agent

In [None]:
openai_client = project_client.get_openai_client()

conversation = openai_client.conversations.create()

response = openai_client.responses.create(
    conversation=conversation.id,
    input="What are the key features of CloudMax?",
    extra_body={"agent": {"name": agent.name, "type": "agent_reference"}},
)

print(f"Response: {response.output_text}")

## Ask follow-up questions

In [None]:
response = openai_client.responses.create(
    conversation=conversation.id,
    input="Is there a free trial available and what payment methods do you accept?",
    extra_body={"agent": {"name": agent.name, "type": "agent_reference"}},
)

print(f"Response: {response.output_text}")
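
The two chat cells differ only in the question text, so the call can be wrapped in a small function for further follow-ups. The sketch below is one way to do that. `ask_agent` is a hypothetical helper, and it assumes the client behaves as in the cells above: `responses.create` accepts `conversation`, `input`, and `extra_body`, and the result exposes `output_text`.

```python
def ask_agent(openai_client, conversation_id: str, agent_name: str, question: str) -> str:
    """Send one question to the agent within an existing conversation."""
    response = openai_client.responses.create(
        conversation=conversation_id,
        input=question,
        # Route the request to the agent created earlier in the notebook.
        extra_body={"agent": {"name": agent_name, "type": "agent_reference"}},
    )
    return response.output_text


# Usage with the objects created in the cells above:
# print(ask_agent(openai_client, conversation.id, agent.name, "How do I contact support?"))
```

Reusing the same conversation ID is what lets the agent see earlier turns when it answers a follow-up.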

## Clean up objects and resources

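Delete the agent, knowledge base, and knowledge source when you're done. The cells below don't remove the blob container, the uploaded documents, or the project connection.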

### Delete the agent

In [None]:
project_client.agents.delete_version(agent.name, agent.version)
print(f"AI agent '{agent.name}' version '{agent.version}' deleted successfully")

### Delete the knowledge base

In [None]:
index_client.delete_knowledge_base(base_name)
print(f"Knowledge base '{base_name}' deleted successfully")

### Delete the knowledge source

In [None]:
index_client.delete_knowledge_source(knowledge_source=knowledge_source_name)
print(f"Knowledge source '{knowledge_source_name}' deleted successfully.")
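
The upload step also created a local `sample_docs` folder, which the cells above leave in place. A minimal sketch for removing it, using only the standard library, is shown below. `remove_sample_docs` is a hypothetical helper added for this example.

```python
import shutil
from pathlib import Path


def remove_sample_docs(path: str = "sample_docs") -> bool:
    """Delete the local sample folder; return False if it doesn't exist."""
    folder = Path(path)
    if not folder.is_dir():
        return False
    shutil.rmtree(folder)
    return True


# Usage, once you no longer need the local copies:
# if remove_sample_docs():
#     print("Local sample_docs folder deleted")
```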