# Azure AI Search Document Indexing Pipeline

In [1]:
# import modules

import logging
import os
import shutil
import tempfile
from typing import Any

from dotenv import load_dotenv
from azure.storage.blob import BlobServiceClient
from azure.core.credentials import AzureKeyCredential
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient, SearchIndexerClient

from langchain_openai import AzureChatOpenAI
from langchain.text_splitter import NLTKTextSplitter
from langchain_openai import AzureOpenAIEmbeddings
from langchain.vectorstores.azuresearch import AzureSearch

In [2]:
# config
# Every setting is read with os.environ[...] so that a missing variable fails
# right here with a KeyError naming the variable, instead of surfacing later
# as an obscure client error.

load_dotenv()

# Azure AI Search + Blob Storage
AZURE_AI_SEARCH_ENDPOINT = os.environ["AZURE_AI_SEARCH_ENDPOINT"]
AZURE_AI_SEARCH_API_KEY = os.environ["AZURE_AI_SEARCH_API_KEY"]
AZURE_STORAGE_ACC_CONNECTION_STRING = os.environ["AZURE_STORAGE_ACC_CONNECTION_STRING"]
BLOB_CONTAINER_NAME = os.environ["BLOB_CONTAINER_NAME"]


# Azure OpenAI
AZURE_OPENAI_API_KEY = os.environ["AZURE_OPENAI_API_KEY"]
AZURE_OPENAI_ENDPOINT = os.environ["AZURE_OPENAI_ENDPOINT"]
AZURE_OPENAI_VERSION = os.environ["AZURE_OPENAI_VERSION"]
AZURE_OPENAI_DEPLOYMENT_NAME = os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"]
AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME = os.environ["AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME"]

# Azure AI Document Intelligence
DOC_INTELLIGENCE_ENDPOINT = os.environ["DOC_INTELLIGENCE_ENDPOINT"]
DOC_INTELLIGENCE_KEY = os.environ["DOC_INTELLIGENCE_KEY"]

# Name of the index the chunks are uploaded to. Previously read with
# os.getenv, which silently yielded None when the variable was unset.
SEARCH_TARGET_INDEX_NAME = os.environ["AZURE_AI_FISHING_INDEX"]

In [3]:
# Configure logging
# force=True replaces any handlers already attached to the root logger.
# Without it basicConfig() is a silent no-op once the root logger has handlers,
# so re-running this cell (e.g. after changing the level) would have no effect.
logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s :: %(levelname)s :: %(message)s',
    force=True,
)

## Initialize clients

#### Blob Storage

In [4]:
# Build the storage-account client from the connection string, then scope a
# client to the configured container.
blob_service_client = BlobServiceClient.from_connection_string(
    conn_str=AZURE_STORAGE_ACC_CONNECTION_STRING
)
container_client = blob_service_client.get_container_client(
    container=BLOB_CONTAINER_NAME
)
print(f"Access to Blob Storage container | SUCCESS: {container_client.container_name}")

Access to Blob Storage container | SUCCESS: data


#### OpenAI

In [5]:
# Chat model used for answer generation.
# The API version comes from the config cell (AZURE_OPENAI_VERSION) instead of
# a string literal duplicated per cell, so it is set in exactly one place.
# temperature=0 keeps the answers as repeatable as the service allows.
oai_llm = AzureChatOpenAI(
    openai_api_version=AZURE_OPENAI_VERSION,
    azure_deployment=AZURE_OPENAI_DEPLOYMENT_NAME,
    temperature=0,
)
print(f"Access to OpenAI generation model | SUCCESS: {oai_llm.deployment_name}")

Access to OpenAI generation model | SUCCESS: gpt4-32k-test-instance


In [6]:
# Embedding model used to vectorise chunks and queries.
# The API version comes from the config cell (AZURE_OPENAI_VERSION) instead of
# a string literal duplicated per cell, so it is set in exactly one place.
oai_emb_model = AzureOpenAIEmbeddings(
    azure_deployment=AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME,
    openai_api_version=AZURE_OPENAI_VERSION,
)
print(f"Access to OpenAI embedding model | SUCCESS: {oai_emb_model.model}")

Access to OpenAI embedding model | SUCCESS: text-embedding-ada-002


#### Search

In [7]:
# Client bound to the target index (document-level operations).
search_client = SearchClient(
    endpoint=AZURE_AI_SEARCH_ENDPOINT,
    index_name=SEARCH_TARGET_INDEX_NAME,
    credential=AzureKeyCredential(AZURE_AI_SEARCH_API_KEY),
)
# Report the configured index name rather than reading the client's private
# `_index_name` attribute, which is not part of the SDK's public interface.
print(f"Access to AI Search client | SUCCESS: {SEARCH_TARGET_INDEX_NAME}")

Access to AI Search client | SUCCESS: fishing-index-d


In [8]:
# Index-management client; listing the index names doubles as a connectivity check.
search_index_client = SearchIndexClient(
    endpoint=AZURE_AI_SEARCH_ENDPOINT,
    credential=AzureKeyCredential(AZURE_AI_SEARCH_API_KEY),
)
indexes = list(search_index_client.list_index_names())
print(f"Access to AI Search index client | SUCCESS: {indexes}")

Access to AI Search index client | SUCCESS: ['fishing-index-d', 'rag-vector-store-d']


In [9]:
# Indexer-management client; fetching the indexer names doubles as a connectivity check.
search_indexer_client = SearchIndexerClient(
    endpoint=AZURE_AI_SEARCH_ENDPOINT,
    credential=AzureKeyCredential(AZURE_AI_SEARCH_API_KEY),
)
indexers = search_indexer_client.get_indexer_names()
print(f"Access to AI Search indexer client | SUCCESS: {indexers}")

Access to AI Search indexer client | SUCCESS: ['fishing-indexer-d', 'rag-vector-store-d-indexer']


#### Document Intelligence

In [10]:
# Wrap the raw key in a credential object and build the Document Intelligence client.
DI_KEY = AzureKeyCredential(key=DOC_INTELLIGENCE_KEY)
di_client = DocumentIntelligenceClient(
    endpoint=DOC_INTELLIGENCE_ENDPOINT,
    credential=DI_KEY,
)
print(f"Access to Azure Document Intelligence | SUCCESS: {di_client}")

Access to Azure Document Intelligence | SUCCESS: <azure.ai.documentintelligence._patch.DocumentIntelligenceClient object at 0x00000229C8648290>


## Download & process documents from blob storage

In [11]:
# (option 1) w/ temporary pdf generation locally

from langchain_community.document_loaders import AzureAIDocumentIntelligenceLoader

def load_documents_from_blob(container_client, doc_intelligence_key, doc_intelligence_endpoint):
    """Download every PDF blob of a container and parse it with Azure AI Document Intelligence.

    Each PDF is written to a uniquely named temporary file, parsed with the
    "prebuilt-layout" model, and the temporary file is removed again even when
    the download or the parsing fails.

    Args:
        container_client: Blob container client; must provide ``list_blobs()``
            and ``get_blob_client(blob)``.
        doc_intelligence_key: API key handed to the Document Intelligence loader.
        doc_intelligence_endpoint: Endpoint handed to the Document Intelligence loader.

    Returns:
        A list with one entry per successfully parsed PDF blob. Each entry is
        the list of documents returned by the loader for that blob, so the
        result is a list of lists.

    Raises:
        Any exception raised while downloading or parsing a blob is propagated.
    """
    docs = []

    for blob in container_client.list_blobs():
        if not blob.name.lower().endswith('.pdf'):  # filter data types
            continue

        blob_client = container_client.get_blob_client(blob)

        # A unique temp file avoids clobbering an unrelated "temp.pdf" in the
        # working directory. mkstemp (rather than NamedTemporaryFile) is used so
        # the file is closed before the loader re-opens it by path.
        temp_fd, temp_path = tempfile.mkstemp(suffix=".pdf")
        try:
            with os.fdopen(temp_fd, "wb") as download_file:
                download_file.write(blob_client.download_blob().readall())

            # process the downloaded PDF with Azure AI Document Intelligence
            doc = AzureAIDocumentIntelligenceLoader(
                file_path=temp_path,
                api_key=doc_intelligence_key,
                api_endpoint=doc_intelligence_endpoint,
                api_model="prebuilt-layout"
            ).load()
        finally:
            # always clean up, including when download or parsing raised
            os.remove(temp_path)

        if not doc:
            # previously this case crashed with IndexError on doc[0]
            logging.warning("No content extracted from blob '%s'; skipping it.", blob.name)
            continue

        # add source metafields to every document produced for this blob
        for parsed_doc in doc:
            parsed_doc.metadata["url"] = str(blob_client.url)
            parsed_doc.metadata["name"] = str(blob.name)
            parsed_doc.metadata["container"] = str(blob_client.container_name)

        docs.append(doc)

    print(f"Successfully loaded {len(docs)} documents!")
    return docs

In [12]:
# Pull every PDF from the container and parse it (one entry per blob).
loaded_docs = load_documents_from_blob(
    container_client=container_client,
    doc_intelligence_key=DOC_INTELLIGENCE_KEY,
    doc_intelligence_endpoint=DOC_INTELLIGENCE_ENDPOINT,
)

Successfully loaded 2 documents!


In [13]:
# sentence-based chunking (NLTK sentence tokenizer)

# Flatten the per-blob lists into a single flat list of documents.
loaded_docs_list = []
for _docs_of_blob in loaded_docs:
    loaded_docs_list.extend(_docs_of_blob)

def split_documents(docs_list, chunk_size=500, chunk_overlap=None):
    """Split documents into chunks with ``NLTKTextSplitter``.

    Args:
        docs_list: Flat list of documents to split.
        chunk_size: Target chunk size handed to the splitter. Defaults to 500,
            the value that used to be hard-coded.
        chunk_overlap: Optional overlap handed to the splitter. When ``None``
            (default) the argument is not passed and the splitter's own
            default applies, exactly as before.

    Returns:
        The list of chunks produced by the splitter.
    """
    splitter_kwargs = {"chunk_size": chunk_size}
    if chunk_overlap is not None:
        splitter_kwargs["chunk_overlap"] = chunk_overlap

    text_splitter = NLTKTextSplitter(**splitter_kwargs)
    doc_chunks = text_splitter.split_documents(docs_list)
    print("Total no. of chunks: ", len(doc_chunks))
    return doc_chunks

# Chunks stay LangChain Documents; this is the form uploaded through the vector store.
doc_chunks = split_documents(docs_list=loaded_docs_list)
# Plain-dict copy of every chunk.
# NOTE(review): not used by any later cell visible here - confirm it is still needed.
doc_dict_chunks = list(map(dict, doc_chunks))

Total no. of chunks:  13


## Add content & metadata to Azure AI Search vector store

In [14]:
def print_index_fields(search_index_client, index_name):
    """Print name, type and searchable flag of every field of a search index.

    Args:
        search_index_client: Client exposing ``get_index(name)``; the returned
            object must have a ``fields`` iterable.
        index_name: Name of the index to describe.

    Returns:
        None. Any exception is caught and reported on stdout instead of being
        raised, so this helper never interrupts the notebook.
    """
    try:
        index_definition = search_index_client.get_index(index_name)
        print(f"Fields in the index '{index_name}':")
        for index_field in index_definition.fields:
            field_name = index_field.name
            field_type = index_field.type
            is_searchable = index_field.searchable
            print(f"Field Name: {field_name}, Type: {field_type}, Searchable: {is_searchable}")
    except Exception as error:
        print(f"Failed to retrieve index fields: {error}")

# Show the schema of the target index before uploading anything to it.
print_index_fields(
    search_index_client=search_index_client,
    index_name=SEARCH_TARGET_INDEX_NAME,
)

Fields in the index 'fishing-index-d':
Field Name: id, Type: Edm.String, Searchable: False
Field Name: content, Type: Edm.String, Searchable: True
Field Name: metadata, Type: Edm.String, Searchable: True
Field Name: content_vector, Type: Collection(Edm.Single), Searchable: True


In [15]:
# Vector store bound to the target Azure AI Search index; embeddings are
# computed with the Azure OpenAI embedding model's embed_query method.
vector_store: AzureSearch = AzureSearch(
    azure_search_endpoint=AZURE_AI_SEARCH_ENDPOINT,
    azure_search_key=AZURE_AI_SEARCH_API_KEY,
    index_name=SEARCH_TARGET_INDEX_NAME,
    embedding_function=oai_emb_model.embed_query,
)

In [16]:
# Push the chunks into the index; the cell output is the list of assigned ids.
# NOTE(review): the returned ids look like base64-encoded UUIDs, so re-running
# this cell presumably inserts duplicate documents - confirm before a "Run All".

vector_store.add_documents(doc_chunks)

['ZDBlZmE4MjYtZGM5Mi00Yzk1LTliNDItNGY1NmNjMTJlMDUx',
 'YTU3MjEwYTAtNDQ1OS00YWNkLTgwNDctOGJiNDA5MWI2ZDFh',
 'ZTIzNzY1ZDYtY2QyNC00Yjc4LWI1ODMtNjFlMDMxNDllMzIw',
 'Yzg5MTIzYjYtZmQ5MC00NjA3LWJmNDAtYjI1NGIxYjlkNjAy',
 'NzcxYThlYWQtNzJlYS00MjE3LWI1ODktYTRkNzc4NmJiNGY1',
 'NjcxODk1NDktNzQ0My00OThhLWE2NmUtMzYzOWQzZjVmNzM5',
 'MmE5ZTA1ZjYtZDU3OC00MDFlLTkwNjUtMDBkZTQ0NDFiNzkz',
 'ZDYyMTUyY2YtMDRjZC00MTcwLWJiMmMtZTU4YzU0YWNiOTg5',
 'M2E4YmU3NGItMTIyZi00ZjkyLWE1ZTEtYTA0MDkzMjI2MDhl',
 'OTgzMTY5ZjctZWYyNS00OTMwLTk3NzAtYThmODkzZTI5MDc4',
 'YmIxYzZiZWYtMWIyMS00MmI0LWIxYjctZDlmMjZmZDU0NGJl',
 'MDUxMjcwYmMtZmJkNi00ZWNlLWFiNDUtZGVhODhlOWU0M2Ni',
 'ODlhNTNlNzAtNjdiZi00YTk3LThmNzUtZThhYzYwYWIzN2Zk']

In [19]:
from langchain.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# Prompt: the model is told to answer only from the retrieved context.
system_prompt_template = """ 
    You are the most powerful and skillfull expert in querying documents to find answers to user's questions.
    Your main task is to answer the USER QUERY based only on the provided CONTEXT.
    # CONTEXT
    {context}
    # USER QUERY
    {query}
"""

system_prompt = ChatPromptTemplate.from_template(system_prompt_template)

# Similarity-search retriever on top of the Azure AI Search vector store.
azure_retriever = vector_store.as_retriever(search_type="similarity")

# The incoming question is fed to the retriever (-> "context") and passed
# through unchanged (-> "query"); both fill the prompt placeholders.
chain_inputs = {"context": azure_retriever, "query": RunnablePassthrough()}

# retrieve -> prompt -> chat model -> plain string
chain = chain_inputs | system_prompt | oai_llm | StrOutputParser()

In [21]:
# Example question answered from the indexed documents.
user_query = "How to reduce injury and handling time for the fish?"

chain.invoke(input=user_query)

"To reduce injury and handling time for the fish, use barbless or circle hooks and needlenose pliers or forceps. Also, land the fish as quickly as possible to minimize the fish's fighting time. When handling a fish, use wet hands and minimize the time out of water to 20 to 30 seconds."