# **Orchestrating a Retriever Pipeline with Azure AI Search**

Project Outline:
1. Setting Up Azure AI Search, Azure Blob Storage, and Azure OpenAI
2. Document Preprocessing 
3. Embedding & Uploading to Azure AI Search 
4. Retrieval Pipeline 
5. Orchestrate with LLM 
6. Evaluation (Relevancy Score)
-------

In [8]:
from dotenv import load_dotenv
load_dotenv(override=True)
from azure.identity import DefaultAzureCredential
from azure.core.credentials import AzureKeyCredential
import os 
## You need these to securely connect to Azure Services

In [28]:
endpoint = os.getenv("AZURE_SEARCH_ENDPOINT") # URL of your Azure AI Search service
credential = AzureKeyCredential(os.getenv("AZURE_SEARCH_KEY")) if os.getenv("AZURE_SEARCH_KEY") else DefaultAzureCredential() # Either the Admin Key or DefaultAzureCredential for authentication
index_name = os.getenv("AZURE_SEARCH_INDEX") # Name of your Azure Search Index
blob_connection_string = os.getenv("AZURE_BLOB_CONN_STR") # Connection string to your Azure Blob Storage
blob_container_name = os.getenv("AZURE_BLOB_CONTAINER") # Name of your Azure Blob Storage Container

azure_openai_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT") # URL of your Azure OpenAI Service
azure_openai_key = os.getenv("AZURE_OPENAI_KEY") # Key for your Azure OpenAI Service
azure_openai_embedding_deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT") # Name of your Azure OpenAI Embedding Deployment
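azure_openai_model_name = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT") # Passed below as model_name to the vectorizer and the embedding skill, so this variable must hold the embedding model's name (not a chat model)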
azure_openai_model_dimensions = int(os.getenv("AZURE_OPENAI_MODEL_DIMENSIONS", 1024)) # Dimensions of your Azure OpenAI Embedding Model

azure_ai_services_key = os.getenv("AZURE_AI_SERVICE") # Key for your Azure AI Service (Document Intelligence)
azure_ai_services_endpoint = os.getenv("AZURE_AI_SERVICE_ENDPOINT") # Endpoint for your Azure AI Service (Document Intelligence)
# Make sure you don't enable OCR or Markdown parsing at the same time, because Document Layout Skill is exclusive
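
Every value above comes from `os.getenv`, so a missing variable silently becomes `None` and only fails later inside an SDK call. The following is a minimal sketch of an early check, assuming the variable names used in this notebook. `missing_settings` and `REQUIRED_SETTINGS` are hypothetical helpers introduced here for illustration, not part of any Azure SDK.

```python
import os

# Variable names taken from the configuration cell above.
# AZURE_SEARCH_KEY is left out on purpose: the notebook falls back to
# DefaultAzureCredential when that key is not set.
REQUIRED_SETTINGS = [
    "AZURE_SEARCH_ENDPOINT",
    "AZURE_SEARCH_INDEX",
    "AZURE_BLOB_CONN_STR",
    "AZURE_BLOB_CONTAINER",
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_KEY",
    "AZURE_OPENAI_DEPLOYMENT",
]


def missing_settings(names, env=None):
    """Return the names that are unset or empty in `env` (defaults to os.environ)."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


missing = missing_settings(REQUIRED_SETTINGS)
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
```

Running this right after `load_dotenv` surfaces configuration gaps before any Azure resource is touched.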


## Connect to Blob Storage and Load Documents

In [10]:
from azure.storage.blob import BlobServiceClient
import glob

def upload_sample_documents(blob_connection_string, blob_container_name, documents_directory, use_user_identity=False):
    blob_service_client = BlobServiceClient.from_connection_string(
        logging_enable=True,
        conn_str=blob_connection_string,
        credential=DefaultAzureCredential() if use_user_identity else None
    )
    container_client = blob_service_client.get_container_client(blob_container_name)
    if not container_client.exists():
        container_client.create_container()
    
    files = glob.glob(documents_directory)
    for file in files:
        with open(file, "rb") as data:
            name = os.path.basename(file)
            if not container_client.get_blob_client(name).exists():
                container_client.upload_blob(name=name, data=data)
                print(f"Uploaded {name} to Blob Storage")

upload_sample_documents(
    blob_connection_string=blob_connection_string,
    blob_container_name=blob_container_name,
    documents_directory= "Documents/*.pdf")

Uploaded Leave Policy_Onsite.pdf to Blob Storage
Uploaded Maternity Leave and Paternity Leave policy-Offshore.pdf to Blob Storage
Uploaded Leave Policy-Offshore.pdf to Blob Storage
Uploaded Sabbatical_Long Leave Policy-Offshore.pdf to Blob Storage


## Create a blob data source connector on Azure AI Search

In [15]:
from azure.search.documents.indexes import SearchIndexerClient
from azure.search.documents.indexes.models import (
    SearchIndexerDataContainer,
    SearchIndexerDataSourceConnection,
    SoftDeleteColumnDeletionDetectionPolicy
)

indexer_client = SearchIndexerClient(endpoint=endpoint, credential=credential)

container = SearchIndexerDataContainer(name=blob_container_name)

delete_policy = SoftDeleteColumnDeletionDetectionPolicy(
    soft_delete_column_name="isDeleted",
    soft_delete_marker_value="true"
)

data_source_connection = SearchIndexerDataSourceConnection(
    name = f"{index_name}-blob",
    type="azureblob",
    connection_string=blob_connection_string,
    container=container,
    data_deletion_detection_policy=delete_policy
)

data_source = indexer_client.create_or_update_data_source_connection(data_source_connection)

print(f"Data Source {data_source.name} created or updated!!!!")


Data Source int-vec-blob created or updated!!!!


**This code does the following:**

- Connects to your Azure AI Search service.
- Tells the search service which Blob Storage container to read files from.
- Configures the data source and its soft-delete detection policy.
- Creates or updates the data source in Azure AI Search.

## Create a search index

In [None]:
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    SearchField, 
    SearchFieldDataType, 
    VectorSearch, 
    HnswAlgorithmConfiguration, 
    VectorSearchProfile, 
    AzureOpenAIVectorizer, 
    AzureOpenAIVectorizerParameters, 
    SemanticConfiguration, 
    SemanticSearch, 
    SemanticPrioritizedFields, 
    SemanticField, 
    SearchIndex,
)
## These are all the building blocks to define an index, vector search, semantic search and fields. 

index_client = SearchIndexClient(endpoint=endpoint, credential=credential)

fields = [  
    SearchField(name="parent_id", type=SearchFieldDataType.String, sortable=True, filterable=True, facetable=True),  
    SearchField(name="title", type=SearchFieldDataType.String),  
    SearchField(name="chunk_id", type=SearchFieldDataType.String, key=True, sortable=True, filterable=True, facetable=True, analyzer_name="keyword"),  
    SearchField(name="chunk", type=SearchFieldDataType.String, sortable=False, filterable=False, facetable=False),  
    SearchField(name="vector", type=SearchFieldDataType.Collection(SearchFieldDataType.Single), vector_search_dimensions=azure_openai_model_dimensions, vector_search_profile_name="myHnswProfile"),  
]

vector_search = VectorSearch(  
    algorithms=[  
        HnswAlgorithmConfiguration(name="myHnsw"),
    ],  
    profiles=[  
        VectorSearchProfile(  
            name="myHnswProfile",  
            algorithm_configuration_name="myHnsw",  
            vectorizer_name="myOpenAI",  
        )
    ],  
    vectorizers=[  
        AzureOpenAIVectorizer(  
            vectorizer_name="myOpenAI",  
            kind="azureOpenAI",  
            parameters=AzureOpenAIVectorizerParameters(  
                resource_url=azure_openai_endpoint,  
                deployment_name=azure_openai_embedding_deployment,
                model_name=azure_openai_model_name,
                api_key=azure_openai_key,
            ),
        ),  
    ],  
) 

semantic_config = SemanticConfiguration(  
    name="my-semantic-config",  
    prioritized_fields=SemanticPrioritizedFields(  
        content_fields=[SemanticField(field_name="chunk")],
        title_field=SemanticField(field_name="title")
    ),  
)
semantic_search = SemanticSearch(configurations=[semantic_config])

index = SearchIndex(
    name=index_name, 
    fields=fields, 
    vector_search=vector_search, 
    semantic_search=semantic_search
)
result = index_client.create_or_update_index(index)
print(f"{result.name} created")

int-vec created


**This code is setting up the blueprint (schema) for your search system in Azure.**

- Defines what data to store (`fields`).
- Configures vector search (`vector_search`): the HNSW algorithm, the profile, and the Azure OpenAI vectorizer used to embed queries.
- Configures semantic ranking (`semantic_search`).
- Creates the index in Azure AI Search.

## Create a skillset

In [None]:
from azure.search.documents.indexes.models import (
    SplitSkill,
    InputFieldMappingEntry,
    OutputFieldMappingEntry,
    AzureOpenAIEmbeddingSkill,
    SearchIndexerIndexProjection,
    SearchIndexerIndexProjectionSelector,
    SearchIndexerIndexProjectionsParameters,
    IndexProjectionMode,
    SearchIndexerSkillset
)

skillset_name = f"{index_name}-skillset"

def create_skillset():
    split_skill = SplitSkill(  
        description="Split skill to chunk documents",  
        text_split_mode="pages",  
        context="/document",  
        maximum_page_length=2000,  
        page_overlap_length=500,  
        inputs=[  
            InputFieldMappingEntry(name="text", source="/document/content"),  
        ],  
        outputs=[  
            OutputFieldMappingEntry(name="textItems", target_name="pages")  
        ]
    )

    embedding_skill = AzureOpenAIEmbeddingSkill(  
        description="Skill to generate embeddings via Azure OpenAI",  
        context="/document/pages/*",  
        resource_url=azure_openai_endpoint,  
        deployment_name=azure_openai_embedding_deployment,  
        model_name=azure_openai_model_name,
        dimensions=azure_openai_model_dimensions,
        api_key=azure_openai_key,  
        inputs=[  
            InputFieldMappingEntry(name="text", source="/document/pages/*"),  
        ],  
        outputs=[
            OutputFieldMappingEntry(name="embedding", target_name="vector")  
        ]
    )

    index_projections = SearchIndexerIndexProjection(  
        selectors=[  
            SearchIndexerIndexProjectionSelector(  
                target_index_name=index_name,  
                parent_key_field_name="parent_id",  
                source_context="/document/pages/*",  
                mappings=[
                    InputFieldMappingEntry(name="chunk", source="/document/pages/*"),  
                    InputFieldMappingEntry(name="vector", source="/document/pages/*/vector"),
                    InputFieldMappingEntry(name="title", source="/document/metadata_storage_name")
                ]
            )
        ],  
        parameters=SearchIndexerIndexProjectionsParameters(  
            projection_mode=IndexProjectionMode.SKIP_INDEXING_PARENT_DOCUMENTS  
        )  
    )

    skills = [split_skill, embedding_skill]

    return SearchIndexerSkillset(  
        name=skillset_name,  
        description="Skillset to chunk documents and generate embeddings",
        skills=skills,  
        index_projection=index_projections
    )

skillset = create_skillset()
client = SearchIndexerClient(endpoint, credential)  
client.create_or_update_skillset(skillset)  
print(f"{skillset.name} created")  

int-vec-skillset created
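
The `SplitSkill` above is configured with `maximum_page_length=2000` and `page_overlap_length=500`, so each chunk repeats the tail of the previous one. The sketch below illustrates that overlapping-window idea on plain characters. It is a simplification under stated assumptions: the real skill also tries to break at sentence boundaries, and `chunk_text` is a hypothetical helper, not the service's implementation.

```python
def chunk_text(text, max_len=2000, overlap=500):
    """Split `text` into windows of at most `max_len` characters,
    where each window starts `overlap` characters before the previous one ended."""
    if overlap >= max_len:
        raise ValueError("overlap must be smaller than max_len")
    step = max_len - overlap
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + max_len])
        # Stop once this window has reached the end of the text.
        if start + max_len >= len(text):
            break
        start += step
    return chunks


sample = "".join(chr(97 + i % 26) for i in range(5000))
pages = chunk_text(sample)
print(f"{len(pages)} chunks, lengths: {[len(p) for p in pages]}")
```

A larger overlap reduces the chance that a sentence relevant to a query is cut in half at a chunk boundary, at the cost of storing and embedding more duplicated text.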


## Create and run an indexer

In [25]:
from azure.search.documents.indexes.models import (
    SearchIndexer
)

indexer_name = f"{index_name}-indexer"  

indexer_parameters = None

indexer = SearchIndexer(  
    name=indexer_name,  
    description="Indexer to index documents and generate embeddings",  
    skillset_name=skillset_name,  
    target_index_name=index_name,  
    data_source_name=data_source.name,
    parameters=indexer_parameters
)  

indexer_client = SearchIndexerClient(endpoint, credential)  
indexer_result = indexer_client.create_or_update_indexer(indexer)  
  
indexer_client.run_indexer(indexer_name)  
print(f' {indexer_name} is created and running. If queries return no results, please wait a bit and try again.')  

 int-vec-indexer is created and running. If queries return no results, please wait a bit and try again.
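
Since the indexer runs asynchronously, queries issued right away may return nothing. One way to wait is to poll the indexer status, sketched below. It assumes the client exposes `get_indexer_status(name)` returning an object whose `last_result.status` is `"inProgress"` while a run is active, as `SearchIndexerClient` does. `wait_for_indexer` is a hypothetical helper, and the default intervals are arbitrary.

```python
import time


def wait_for_indexer(client, name, poll_seconds=5.0, timeout_seconds=300.0, sleep=time.sleep):
    """Poll until the indexer's last run is no longer in progress and return its status."""
    waited = 0.0
    while True:
        last_result = client.get_indexer_status(name).last_result
        # last_result is None until a first run has been recorded.
        if last_result is not None and last_result.status != "inProgress":
            return last_result.status
        if waited >= timeout_seconds:
            raise TimeoutError(f"Indexer {name} did not finish within {timeout_seconds} seconds")
        sleep(poll_seconds)
        waited += poll_seconds
```

In this notebook the call would look like `wait_for_indexer(indexer_client, indexer_name)`. A returned status other than `"success"` suggests inspecting the run's errors before querying.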


## Perform a vector similarity search

In [36]:
from azure.search.documents import SearchClient
from azure.search.documents.models import VectorizableTextQuery

query = "What are the eligibility criteria defined in Affine’s Sabbatical Leave Policy?"
  
search_client = SearchClient(endpoint, index_name, credential=credential)
vector_query = VectorizableTextQuery(text=query, k_nearest_neighbors=1, fields="vector", exhaustive=True)

  
results = search_client.search(  
    search_text=None,  
    vector_queries= [vector_query],
    top=1
)  
  
for result in results:  
    print(f"parent_id: {result['parent_id']}")  
    print(f"chunk_id: {result['chunk_id']}")  
    print(f"Score: {result['@search.score']}")  
    print(f"Content: {result['chunk']}")   

parent_id: aHR0cHM6Ly9ocnBvbGljeWRvY3VtZW50cy5ibG9iLmNvcmUud2luZG93cy5uZXQvaW50LXZlYy9TYWJiYXRpY2FsX0xvbmclMjBMZWF2ZSUyMFBvbGljeS1PZmZzaG9yZS5wZGY1
chunk_id: dd87115b449e_aHR0cHM6Ly9ocnBvbGljeWRvY3VtZW50cy5ibG9iLmNvcmUud2luZG93cy5uZXQvaW50LXZlYy9TYWJiYXRpY2FsX0xvbmclMjBMZWF2ZSUyMFBvbGljeS1PZmZzaG9yZS5wZGY1_pages_1
Score: 0.8023779
Content: • Full-Time employees may undertake a maximum of 2 sabbaticals during their tenure at Affine 

• An employee will not be eligible to take sabbatical twice in a year 

• Each sabbatical will be for a period of up to 3 months only. In case of longer duration, the same  

needs to be approved by VP/CEO 

• There should be a gap of at least one year between two subsequent sabbatical leaves 

• Sabbatical leave will be an unpaid leave with no entitlements like health insurance, PF, Gratuityetc.  

Eligible leave will not be adjusted against Sabbatical leave 

• Employee will not be eligible for any allowance/ reimbursements / Earned Leave will be paid 

/
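
The `Score` printed above is not the raw cosine similarity. For vector queries that use the cosine metric, Azure AI Search reports `1 / (1 + cosine_distance)`, where `cosine_distance = 1 - cosine_similarity`. The sketch below inverts that relationship, assuming the index uses the default cosine metric (the HNSW configuration above does not override it). The function names are hypothetical.

```python
def score_to_cosine_similarity(score):
    """Invert score = 1 / (1 + cosine_distance), with cosine_distance = 1 - cosine_similarity."""
    cosine_distance = (1.0 - score) / score
    return 1.0 - cosine_distance


def cosine_similarity_to_score(similarity):
    """Forward mapping from cosine similarity to the reported search score."""
    return 1.0 / (1.0 + (1.0 - similarity))


similarity = score_to_cosine_similarity(0.8023779)  # score printed in the output above
print(f"Approximate cosine similarity: {similarity:.4f}")
```

Under this mapping the score can only range from about 0.333 (opposite vectors) to 1.0 (identical direction), which is worth keeping in mind when choosing a relevance threshold.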

## Perform a Hybrid Search

In [37]:
query = "What are the eligibility criteria defined in Affine’s Sabbatical Leave Policy?"

search_client = SearchClient(endpoint, index_name, credential=credential)
vector_query = VectorizableTextQuery(text=query, k_nearest_neighbors=1, fields="vector", exhaustive=True)
  
results = search_client.search(  
    search_text=query,  
    vector_queries= [vector_query],
    select=["parent_id", "chunk_id", "chunk"],
    top=1
)  
  
for result in results:  
    print(f"parent_id: {result['parent_id']}")  
    print(f"chunk_id: {result['chunk_id']}")  
    print(f"Score: {result['@search.score']}")  
    print(f"Content: {result['chunk']}")  

parent_id: aHR0cHM6Ly9ocnBvbGljeWRvY3VtZW50cy5ibG9iLmNvcmUud2luZG93cy5uZXQvaW50LXZlYy9TYWJiYXRpY2FsX0xvbmclMjBMZWF2ZSUyMFBvbGljeS1PZmZzaG9yZS5wZGY1
chunk_id: dd87115b449e_aHR0cHM6Ly9ocnBvbGljeWRvY3VtZW50cy5ibG9iLmNvcmUud2luZG93cy5uZXQvaW50LXZlYy9TYWJiYXRpY2FsX0xvbmclMjBMZWF2ZSUyMFBvbGljeS1PZmZzaG9yZS5wZGY1_pages_1
Score: 0.030000001192092896
Content: • Full-Time employees may undertake a maximum of 2 sabbaticals during their tenure at Affine 

• An employee will not be eligible to take sabbatical twice in a year 

• Each sabbatical will be for a period of up to 3 months only. In case of longer duration, the same  

needs to be approved by VP/CEO 

• There should be a gap of at least one year between two subsequent sabbatical leaves 

• Sabbatical leave will be an unpaid leave with no entitlements like health insurance, PF, Gratuityetc.  

Eligible leave will not be adjusted against Sabbatical leave 

• Employee will not be eligible for any allowance/ reimbursements / Earned Leave will 
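
The hybrid score is much smaller than the vector-only score because it is on a different scale: Azure AI Search merges the keyword and vector result lists with Reciprocal Rank Fusion (RRF), which scores documents by their rank in each list rather than by similarity. The sketch below shows the general RRF formula. The constant `k=60`, the 1-based ranks, and the chunk names are assumptions for illustration, so it is not expected to reproduce the exact value printed above.

```python
from collections import defaultdict


def reciprocal_rank_fusion(rankings, k=60):
    """Fuse several ranked lists of document ids into one list of (id, score) pairs.

    Each document receives 1 / (k + rank) from every list it appears in.
    """
    scores = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


# Hypothetical rankings from a keyword query and a vector query.
keyword_ranking = ["chunk_b", "chunk_a", "chunk_c"]
vector_ranking = ["chunk_a", "chunk_d"]

for doc_id, score in reciprocal_rank_fusion([keyword_ranking, vector_ranking]):
    print(f"{doc_id}: {score:.4f}")
```

A document that ranks well in both lists accumulates two contributions, which is why hybrid search tends to favor chunks that match both the wording and the meaning of the query.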

## Perform a Hybrid Search + Semantic Reranking

In [38]:
from azure.search.documents.models import (
    QueryType,
    QueryCaptionType,
    QueryAnswerType
)

query = "What are the eligibility criteria defined in Affine’s Sabbatical Leave Policy?"

search_client = SearchClient(endpoint, index_name, credential)
vector_query = VectorizableTextQuery(text=query, k_nearest_neighbors=1, fields="vector", exhaustive=True)

results = search_client.search(  
    search_text=query,
    vector_queries=[vector_query],
    select=["parent_id", "chunk_id", "chunk"],
    query_type=QueryType.SEMANTIC,
    semantic_configuration_name='my-semantic-config',
    query_caption=QueryCaptionType.EXTRACTIVE,
    query_answer=QueryAnswerType.EXTRACTIVE,
    top=1
)

semantic_answers = results.get_answers()
if semantic_answers:
    for answer in semantic_answers:
        if answer.highlights:
            print(f"Semantic Answer: {answer.highlights}")
        else:
            print(f"Semantic Answer: {answer.text}")
        print(f"Semantic Answer Score: {answer.score}\n")

for result in results:
    print(f"parent_id: {result['parent_id']}")  
    print(f"chunk_id: {result['chunk_id']}")  
    print(f"Reranker Score: {result['@search.reranker_score']}")
    print(f"Content: {result['chunk']}")  

    captions = result["@search.captions"]
    if captions:
        caption = captions[0]
        if caption.highlights:
            print(f"Caption: {caption.highlights}\n")
        else:
            print(f"Caption: {caption.text}\n")

Semantic Answer: Eligibility   •<em> Full time employees with a minimum tenure of one year at Affine </em>  •<em> Should have a valid reason or need for the leave, for example: </em>  •<em> Pursue higher education or Medical emergency for self or family </em> •<em> Pursue their area of interest   Process </em>  • The employee should discuss and take a written approval from the Reporting Manager an...
Semantic Answer Score: 0.9980000257492065

parent_id: aHR0cHM6Ly9ocnBvbGljeWRvY3VtZW50cy5ibG9iLmNvcmUud2luZG93cy5uZXQvaW50LXZlYy9TYWJiYXRpY2FsX0xvbmclMjBMZWF2ZSUyMFBvbGljeS1PZmZzaG9yZS5wZGY1
chunk_id: dd87115b449e_aHR0cHM6Ly9ocnBvbGljeWRvY3VtZW50cy5ibG9iLmNvcmUud2luZG93cy5uZXQvaW50LXZlYy9TYWJiYXRpY2FsX0xvbmclMjBMZWF2ZSUyMFBvbGljeS1PZmZzaG9yZS5wZGY1_pages_0
Reranker Score: 3.361161470413208
Content: Affine Analytics Private Limited 
 
453, BBMP No. 351/453, 4th sector, HSR Layout, Bangalore, KA, IN – 560102.                                                                                    
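
The `parent_id` values in these outputs look opaque, but they appear to be URL-safe Base64 tokens of the source blob URL. The sketch below decodes one, assuming the trailing digit records how many `=` padding characters were stripped from the token. `decode_parent_id` is a hypothetical helper for inspection only.

```python
import base64
from urllib.parse import unquote


def decode_parent_id(token):
    """Decode a parent_id token back to the blob URL it was derived from.

    Assumption: the last character is the number of stripped '=' padding characters.
    """
    padding = int(token[-1])
    body = token[:-1] + "=" * padding
    return base64.urlsafe_b64decode(body).decode("utf-8")


# parent_id copied from the search output above.
parent_id = (
    "aHR0cHM6Ly9ocnBvbGljeWRvY3VtZW50cy5ibG9iLmNvcmUud2luZG93cy5uZXQvaW50LXZlYy9T"
    "YWJiYXRpY2FsX0xvbmclMjBMZWF2ZSUyMFBvbGljeS1PZmZzaG9yZS5wZGY1"
)
url = decode_parent_id(parent_id)
print(unquote(url.rsplit("/", 1)[-1]))  # file name of the source document
```

Decoding the identifier is a quick way to confirm which PDF a retrieved chunk came from when the `title` field is not selected in the query.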

## **Conclusion**

The Semantic Answer Score (0.998) is very high, which indicates that the extracted answer, covering both the eligibility criteria and the process details, is a highly relevant and comprehensive match for the query.
