# Document Indexing Workflow

## Important Note
The document indexing process should be implemented using the **indexer feature** and executed in **multiple steps** for optimal performance and maintainability. 

This notebook demonstrates the step-by-step approach to document indexing, breaking down the process into manageable stages that can be:
- Monitored individually
- Debugged more easily
- Rerun selectively if needed
- Scaled appropriately based on document volume

Each step in this notebook represents a distinct phase of the indexing pipeline, ensuring a structured and systematic approach to document processing.

In [19]:
from dotenv import load_dotenv
from azure.core.credentials import AzureKeyCredential
from azure.ai.textanalytics.aio import TextAnalyticsClient
from azure.ai.textanalytics._models import DetectLanguageInput, DocumentError
from openai import AsyncAzureOpenAI
from typing import List, Dict
from services.base_embedding_service import BaseEmbeddingService
from factory.embedding_factory import EmbeddingFactory
from models.document import Document
from services.base_embedding_service import BaseEmbeddingService
from azure.ai.inference.aio import EmbeddingsClient
from azure.core.credentials import AzureKeyCredential
from azure.ai.translation.text.aio import TextTranslationClient
from azure.ai.translation.text import TranslatorCredential
from azure.ai.translation.text.models import InputTextItem
from typing import Dict, List
from models.document import Document
import os
import pandas as pd
import json
import aiohttp
import asyncio
import uuid
import cohere
import numpy as np
import sys

In [15]:
# Pull settings from the local .env file; override=True lets the file win over
# variables already present in the process environment.
load_dotenv(override=True)

# Azure AI Language (language detection)
language_endpoint = os.environ.get("LANGUAGE_ENDPOINT")
language_api_key = os.environ.get("LANGUAGE_KEY")

# Azure OpenAI
openai_endpoint = os.environ.get("OPENAI_ENDPOINT")
openai_key = os.environ.get("OPENAI_KEY")
openai_embedding_deployment = os.environ.get("EMBEDDING_OPENAI_DEPLOYMENT")
region = "westus"

# Translation Service
translation_endpoint = os.environ.get("TRANSLATION_ENDPOINT")
translation_key = os.environ.get("TRANSLATION_KEY")
translation_region = os.environ.get("TRANSLATION_REGION")

# Cohere embedding model; the list of available models is documented at
# https://docs.cohere.com/docs/cohere-embed
cohere_key = os.environ.get("COHERE_KEY")
cohere_model = os.environ.get("COHERE_MODEL")
cohere_endpoint = os.environ.get("COHERE_ENDPOINT")

### Load supported languages

We use [Cohere](https://docs.cohere.com/docs/cohere-embed) here since it supports embeddings for multiple languages.

We load the JSON file that lists all the supported languages.

In [None]:
# Location of the list of languages officially supported by Cohere
path = os.path.join("cohere", "supported_languages.json")

with open(path, mode="r", encoding="utf-8") as f:
    supported_languages = json.loads(f.read())

print(supported_languages)

# Lookup table keyed by language code, valued by its description
language_dict = dict(
    (lang["code"], lang["description"]) for lang in supported_languages
)

Create two custom functions to validate whether the language is supported before
creating the embedding

In [4]:
# Membership test against the language table loaded from the JSON file
def is_language_supported(language_code: str) -> bool:
    """Return True when ``language_code`` is a key of ``language_dict``."""
    known_codes = language_dict.keys()
    return language_code in known_codes

# Description lookup for a language code
def get_language_description(language_code: str) -> str:
    """Return the description stored for ``language_code``.

    Returns None when the code is not a key of ``language_dict``.
    """
    if language_code in language_dict:
        return language_dict[language_code]
    return None

You want to send multiple documents to the Text Language service to detect their language.  This service has certain limits, and the following function provides the threshold check

In [5]:
def reached_size_limit(docs:list, max_documents:int = 950, max_bytes:int = 700 * 1024) -> bool:
    """Tell whether a pending batch is large enough that it should be sent now.

    The thresholds are deliberately below the limits this notebook targets
    (1000 documents and 1 MB per request) so the batch is flushed with some
    headroom left.

    Unlike a bounded range check, any batch at or beyond a threshold reports
    True. A batch that has already overshot the hard limit must still be
    flagged, otherwise the caller would keep growing it.

    Args:
        docs: JSON-serialisable list of documents queued for one request.
        max_documents: Document count at which the batch is considered full.
        max_bytes: UTF-8 size of the JSON payload above which the batch is
            considered full.

    Returns:
        True when either threshold is reached, False otherwise.
    """
    # Cheap check first: no need to serialise when the count alone decides.
    if len(docs) >= max_documents:
        return True

    # Measure the real payload size; ensure_ascii=False keeps multi-byte
    # characters unescaped so the UTF-8 length is measured on the actual text.
    payload = json.dumps(docs, ensure_ascii=False)
    payload_size_bytes = len(payload.encode('utf-8'))

    return payload_size_bytes > max_bytes

Create the Text Analytics client to extract the language code of each document, and the Translator client used for the translation step

In [20]:
# Async client used for language detection
text_analytics_client = TextAnalyticsClient(
    endpoint=language_endpoint,
    credential=AzureKeyCredential(language_api_key),
)

# Async client used for translation; the credential carries key and region
credential = TranslatorCredential(translation_key, translation_region)
text_translation_client = TextTranslationClient(
    endpoint=translation_endpoint,
    credential=credential,
)

In [7]:
# Embedding model name and the async client that talks to its endpoint
model_name = cohere_model
client = EmbeddingsClient(endpoint=cohere_endpoint, credential=AzureKeyCredential(cohere_key))

#### Convert file to JSON

We convert the XLSX file to JSON. By doing so, we can load the JSON into a dictionary and add new columns so the documents can be loaded into the index

In [None]:
def csv_to_json_array(csv_file:str, output_file:str):
    """Convert a CSV or Excel file to a JSON array with snake_case field names.

    The input is read with pandas, missing values are replaced by empty
    strings, column names are lower-cased with spaces turned into
    underscores, and the records are written to ``output_file`` as a UTF-8
    JSON array. A short summary is printed along the way.

    Args:
        csv_file: Path of the source file. A name ending in ``.xlsx`` or
            ``.xls`` (any letter case) is read as Excel, anything else as CSV.
        output_file: Path of the JSON file to write.
    """
    # Case-insensitive so that e.g. "DATA.XLSX" is not misread as CSV
    if csv_file.lower().endswith(('.xlsx', '.xls')):
        df = pd.read_excel(csv_file)
    else:
        df = pd.read_csv(csv_file)

    # Replace NaN values with empty strings
    df = df.fillna('')

    def to_snake_case(name):
        # str() keeps non-string headers (e.g. numeric column titles) from raising
        return str(name).replace(' ', '_').lower()

    # Rename all columns to snake_case
    df.columns = [to_snake_case(col) for col in df.columns]

    # Convert DataFrame to list of dictionaries (JSON objects)
    data = df.to_dict(orient='records')

    # Print the result
    print(f"Converted {len(data)} records from {csv_file} to JSON array")
    print(f"Converted column names: {list(df.columns)}")
    # A file with a header but no rows has no first record to preview
    if data:
        print("\nFirst record example:")
        print(json.dumps(data[0], indent=2))

    # Save JSON array to file
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    print(f"\nJSON array saved to: {output_file}")

# Convert the source workbook; both paths are relative, so the JSON file is
# written to the current working directory.
csv_to_json_array(csv_file="car_problems_multilingual.xlsx",output_file="car_problems_multilingual.json")

In [None]:
# Read the converted JSON array back into memory
with open("car_problems_multilingual.json", mode="r", encoding="utf-8") as f:
    documents = json.loads(f.read())

print("{} loaded".format(len(documents)))

In [21]:
async def get_language_documents(docs:list):
    """Detect the language of each document and return one summary dict per result.

    Sends ``docs`` to ``text_analytics_client.detect_language``. Each returned
    dict always carries ``id``. A failed result also carries ``is_error=True``
    and ``error``; a successful one carries ``language_code``.
    """
    detection_results = await text_analytics_client.detect_language(docs)

    def summarize(result):
        entry = {"id": result.id}

        if result.is_error:
            entry['is_error'] = True
            entry['error'] = result.error
            return entry

        # The service reports "zh_chs" where the Cohere language list uses "zh"
        if result.primary_language.iso6391_name == "zh_chs":
            entry['language_code'] = "zh"
        else:
            entry['language_code'] = result.primary_language.iso6391_name
        return entry

    return [summarize(result) for result in detection_results]
    

Here you could use the Batch Translation service to translate the complete documents, which would be more performant, but in this case we only want to translate the fault field into English.  For high volumes it is better to use the batch service to avoid being throttled.  **Note:** the Python SDK is still in beta; for production, use the REST API.

In [47]:
async def translate_documents_english(documents:list):
    """Translate the ``fault`` text of every non-English document to English.

    Only ``fault`` is translated because it is the only field searched on.
    Searching other fields (e.g. a text search) would require translating
    those fields in the index as well.

    Documents are updated in place: ``document['fault']`` is overwritten with
    the English text. Documents already in English, or whose ``fault`` is
    empty, are passed through without calling the service. A document for
    which the service returns no translation is reported and left out of the
    result.

    Args:
        documents: List of dicts with a ``fault`` text and, normally, a
            ``language_code``.

    Returns:
        The list of documents that are now in English.
    """
    processed_documents = []

    for document in documents:

        # NOTE(review): when the code is missing, None is forwarded as
        # from_parameter, which should let the service auto-detect the source
        # language instead of failing with a KeyError here. Confirm against
        # the SDK version in use.
        from_language = document.get('language_code')

        if from_language == 'en':
            processed_documents.append(document)
            continue

        text = document['fault']

        # Nothing to translate, so skip the round trip to the service
        if not text:
            processed_documents.append(document)
            continue

        response = await text_translation_client.translate(
            content=[{"text": text}],
            from_parameter=from_language,
            to=["en"]
        )

        # Check for an empty response before indexing into it
        translations = response[0].translations if response else None

        if not translations:
            print(f"No translation returned for document {document.get('id')}, skipped")
            continue

        document['fault'] = translations[0].text
        processed_documents.append(document)

    return processed_documents

            

In [None]:
# Translate the `fault` text of the loaded documents to English.
# NOTE(review): translate_documents_english reads each document's
# 'language_code'; that key appears to be set by the language-detection cell
# further down, so confirm the cells are run in that order.
translated_documents = await translate_documents_english(documents)

In [None]:
# Sanity check: number of documents returned by the translation step
print(len(translated_documents))

In [50]:
# Persist the translated documents as an indented UTF-8 JSON array
with open("car_problems_english_translation.json", mode="w", encoding="utf-8") as f:
    f.write(json.dumps(translated_documents, indent=2, ensure_ascii=False))

In [None]:
docs = []
processed_documents = []

for document in documents:

    # Detect the language of the documents, here the maximum is 1000 documents with a size of 1 MB
    docs.append({
        "id": document["id"],
        "text": document['fault']
    })

    if reached_size_limit(docs):
        print(f"Reached size limits {len(docs)}")
        # Add something here
        break

if len(docs) > 0:
    results = await get_language_documents(docs)
    # Map the language results back to the original documents
    for result in results:
        # Find the matching document by id
        matching_doc = next((d for d in documents if d['id'] == result['id']), None)
        if matching_doc and not result.get('is_error'):
            matching_doc['language_code'] = result['language_code']
            processed_documents.append(matching_doc)

print("all documents processed")       


In [None]:
# Dump the documents that received a language code, for inspection
print(json.dumps(processed_documents,indent=4))

Loop over each document and validate whether its language is supported by the embedding model; if not, you will need to add a translation step

In [None]:
# Split the detected documents into those the embedding model can handle and
# those it cannot.
documents_not_supported = []
documents_to_embeds = []

for doc in processed_documents:

    if is_language_supported(doc['language_code']):
        # Supported documents are collected so the embedding can run as a
        # batch job, which performs better for the first indexing pass.
        documents_to_embeds.append({
            "id": doc['id'],
            "language_code": doc['language_code'],
            "text": doc['fault']  # Important: this is the text the embedding is created from
        })
    else:
        documents_not_supported.append({
            "id": doc['id'],
            "language_code": doc['language_code'],
            "language_description": get_language_description(doc['language_code'])
        })

print("Not supported documents {}".format(len(documents_not_supported)))
print("Documents supported {}".format(len(documents_to_embeds)))
   

In [None]:
# Inspect the documents whose language code was not in the supported list
print(json.dumps(documents_not_supported,indent=4))

Save the file to process for the batch

In [27]:
# Write one JSON object per line (JSONL) for the embedding batch
with open("documents_to_embed.jsonl", mode="w", encoding="utf-8") as f:
    for doc in documents_to_embeds:
        print(json.dumps(doc, ensure_ascii=False), file=f)

Load the generated documents into a pydantic class, easier to manipulate

In [52]:
def load_documents_from_jsonl(file_path: str) -> List[Document]:
    """
    Read a JSONL file and build one Document per non-blank line.

    Args:
        file_path: Path to the JSONL file

    Returns:
        List of Document objects, in file order
    """
    with open(file_path, mode='r', encoding='utf-8') as handle:
        # Whitespace-only lines are skipped; every other line is parsed as
        # JSON and expanded into Document keyword arguments.
        return [
            Document(**json.loads(raw_line))
            for raw_line in handle
            if raw_line.strip()
        ]

In [None]:
# Reload the JSONL batch as Document objects. Note this rebinds `documents`,
# which earlier cells used for the raw dict records.
documents = load_documents_from_jsonl("documents_to_embed.jsonl")

# Bare expression: the notebook displays the number of loaded documents
len(documents)

In [54]:
async def create_embeddings(documents:List[str]) -> List[List[float]]:
    """Create one embedding vector per input text.

    Calls ``client.embed`` with the model named by ``cohere_model``.

    Args:
        documents: Texts to embed.

    Returns:
        The ``embedding`` of each item in the response data, in response
        order. An empty input returns an empty list without calling the
        service.
    """
    # Nothing to embed, so avoid sending an empty request
    if not documents:
        return []

    response = await client.embed(input=documents,
                                  model=cohere_model)

    return [data['embedding'] for data in response.data]

In [None]:
idx = 0       
number_of_documents = len(documents) - 1
documents_to_embed:List[str] = []

doc_test = [ documents[0],documents[1],documents[2] ]

idx_document = 0

print(len(doc_test))

while idx < len(documents):
            
    idx+=1   
    #print(idx)                     
    documents_to_embed.append(documents[idx-1].text)

    if idx % 10 == 0:
        vectors = await create_embeddings(documents_to_embed)

        for v in vectors:
            documents[idx_document].vector = v
            idx_document+=1            

        documents_to_embed.clear()

if len(documents_to_embed) > 0:
    vectors = await create_embeddings(documents_to_embed)

    for v in vectors:
        documents[idx_document].vector = v
        idx_document+=1    

print(f"Documents embedded : {len(documents_to_embed)}")        

In [53]:
# Persist the embedded documents as an indented UTF-8 JSON array
with open("documents_with_vectors.json", mode="w", encoding="utf-8") as f:
    # Each Pydantic model is turned into a plain dictionary first
    json_data = [record.model_dump() for record in documents]
    f.write(json.dumps(json_data, indent=2, ensure_ascii=False))

Now add the vector to the original document

In [None]:
# vector_fix
with open("documents_with_vectors.json","r",encoding="utf-8") as f:
    doc_with_vectors = json.load(f)

with open("car_problems_multilingual.json","r",encoding="utf-8") as f:
    cars = json.load(f)    

# Create a dictionary for fast lookup: {id: vector}
vector_dict = {doc["id"]: doc["vector"] for doc in doc_with_vectors}

# Add vectors to cars documents
for car in cars:
    car_id = car["id"]
    if car_id in vector_dict:
        car["vector"] = vector_dict[car_id]

print(f"Added vectors to {sum(1 for car in cars if 'vector' in car)} out of {len(cars)} documents")    

In [55]:
# Write the merged records, one JSON object per line
with open("car_problems_multilingual_with_vectors.json", mode="w", encoding="utf-8") as f:
    for car in cars:
        print(json.dumps(car, ensure_ascii=False), file=f)

Now upload the documents to the index

In [60]:
from dotenv import load_dotenv
from azure.search.documents.aio import SearchClient
from azure.core.credentials import AzureKeyCredential
import os

# Re-read the .env file (values in the file override the environment), then
# pick up the search service settings.
load_dotenv(override=True)

search_endpoint = os.environ.get("SEARCH_ENDPOINT")
search_api_key = os.environ.get("SEARCH_API_KEY")

In [61]:
# Target index and the key credential used to reach it
index_name = "cartroubleshooting"
credential = AzureKeyCredential(search_api_key)

# Async client bound to that index
search_client = SearchClient(search_endpoint, index_name, credential)

In [None]:
try:
    result = await search_client.upload_documents(cars)
    print("Upload of new document succeeded: {}".format(result[0].succeeded))
except Exception as ex:
    print(ex)
  