## 📚 Prerequisites

Before running this notebook, ensure you have provisioned your Azure resources, configured your environment variables (Search endpoint, key, and index name), and set up a Conda environment to manage dependencies. Refer to [REQUIREMENTS.md](REQUIREMENTS.md) for full setup instructions.

## 📋 Table of Contents

This notebook walks through **two approaches for indexing policy documents stored in Azure Blob Storage**, using either the **Push SDK** or the **native Azure AI Search Indexer**.

#### 1. [**Indexing Policy Documents from Blob Storage Using the SDK (Push Model)**](#index-using-push-sdk)

In this approach, we build a fully controlled indexing pipeline using the Azure SDK for Python.

You will learn how to:

- Load PDF or text-based policy documents directly from Blob Storage
- Preprocess and chunk content for retrieval
- Generate embeddings using Azure OpenAI
- Upload documents, metadata, and vector embeddings into Azure AI Search using the `SearchClient`

This method gives you **maximum flexibility** and is ideal when you need to integrate your own preprocessing logic, custom chunking strategies, or real-time updates.

#### 2. [**Indexing Policy Documents Using Azure AI Search Indexers with Custom Skillsets**](#index-using-indexer)

This approach uses the built-in **Azure Search Indexer** to automatically crawl documents from Blob Storage and enrich them using a **custom skillset**.

You will learn how to:

- Connect Blob Storage as a data source
- Define a skillset that performs OCR, embedding, or text extraction
- Automatically map enriched fields into your Azure Search index

This method is best when you want to **automate the ingestion pipeline** with minimal custom code, leveraging Azure's low-code enrichment platform.

> By the end of this notebook, you’ll understand how to build both a **fully customized ingestion flow** using the SDK and a **scalable low-code pipeline** using native indexers with skillsets — both optimized for retrieving policy content stored in Blob Storage.


In [1]:
import os
from tenacity import retry, wait_random_exponential, stop_after_attempt
from dotenv import load_dotenv
import os
import json
import copy
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    ExhaustiveKnnAlgorithmConfiguration,
    ExhaustiveKnnParameters,
    SearchIndex,
    SearchField,
    SearchFieldDataType,
    SimpleField,
    SearchableField,
    SearchIndex,
    SemanticConfiguration,
    SemanticPrioritizedFields,
    SemanticField,
    SearchField,
    VectorSearch,
    SemanticSearch,
    HnswAlgorithmConfiguration,
    HnswParameters,
    VectorSearch,
    VectorSearchAlgorithmKind,
    VectorSearchProfile,
    SearchIndex,
    SearchField,
    SearchFieldDataType,
    SimpleField,
    SearchableField,
    VectorSearch,
    ExhaustiveKnnParameters,
    SearchIndex,
    SearchField,
    SearchFieldDataType,
    SimpleField,
    SearchableField,
    SearchIndex,
    SemanticConfiguration,
    SemanticField,
    SearchField,
    VectorSearch,
    HnswParameters,
    VectorSearch,
    VectorSearchAlgorithmKind,
    VectorSearchAlgorithmMetric,
    VectorSearchProfile,
)

# Load environment variables from .env file
load_dotenv()

# Define the target directory
target_directory = os.getcwd()  # Get the current working directory

# Move one directory back
parent_directory = os.path.dirname(target_directory)

# Check if the parent directory exists
if os.path.exists(parent_directory):
    # Change the current working directory to the parent directory
    os.chdir(parent_directory)
    print(f"Directory changed to {os.getcwd()}")
else:
    print(f"Parent directory {parent_directory} does not exist.")

Directory changed to c:\Users\pablosal\Desktop\aihlsignited-medindexer


### **Indexing Policy Documents from Blob Storage Using the SDK (Push Model)**

#### **1. Use Document Intelligence to parse and read the PDF into text (Markdown)**

In [2]:
from src.documentintelligence.document_intelligence_helper import AzureDocumentIntelligenceManager

text_extractor = AzureDocumentIntelligenceManager()

2025-03-30 21:08:04,990 - micro - MainProcess - INFO     Container 'pre-auth-policies' already exists. (blob_helper.py:_create_container_if_not_exists:89)


In [3]:
policy_raw_text_markdown = text_extractor.analyze_document(document_input="https://storageaeastusfactory.blob.core.windows.net/pre-auth-policies/policies_ocr/001.pdf", 
                                model_type="prebuilt-layout")

2025-03-30 21:08:05,077 - micro - MainProcess - INFO     Blob URL detected. Extracting content. (document_intelligence_helper.py:analyze_document:78)
2025-03-30 21:08:05,385 - micro - MainProcess - INFO     Downloaded blob 'policies_ocr/001.pdf' as bytes. (blob_helper.py:download_blob_to_bytes:311)


In [4]:
policy_raw_text_markdown.content[:100]

'<figure>\n\ncigna\nhealthcare\n\n</figure>\n\n\n# PRIOR AUTHORIZATION POLICY\n\nPOLICY:\n\nInflammatory Conditio'

#### **2. Extract Metadata**

- **Policy Name**: Extracted from document title or headers (e.g., "Antiseizure Medications – Epidiolex Prior Authorization Policy").
- **Payer Name**: Identifies which insurance company issued the policy (e.g., Cigna, UnitedHealthcare).
- **Drug Name(s)**: The medications referenced in the policy (e.g., Epidiolex, Dupixent).
- **Medical Specialties Involved**: Identifies if a policy is specific to Rheumatology, Neurology, Oncology, etc.
- **Indications & Diseases Covered**: Disease categories linked to a policy (e.g., Crohn’s disease, epilepsy)

In [11]:
import json
from typing import List, Dict, Optional
from pydantic import BaseModel
import openai
import os
import time
import requests
from azure.core.credentials import AzureKeyCredential
from utils.ml_logging import get_logger

logger = get_logger()

# --------------------------------------------------------------------------
# Configure the Azure OpenAI client using key-based authentication.
# --------------------------------------------------------------------------
client = openai.AzureOpenAI(
    api_version="2024-12-01-preview",
    azure_endpoint="https://pablo-m2areked-westeurope.cognitiveservices.azure.com",
    azure_deployment="gpt-4o-structured-outputs",
    api_key=os.getenv("AZURE_OPENAI_KEY_StructuredOutputs")
    )  # Ensure this env var is set.

model_name = "gpt-4o-structured-outputs"

In [12]:
# ----------------- DEFINE POLICY METADATA MODEL -----------------
class PolicyMetadata(BaseModel):
    policy_name: str
    payer_name: str
    drug_names: List[str]
    medical_specialties: List[str]
    indications_diseases: List[str]
    covered_diseases_icd_codes: Optional[List[str]]
    covered_drug_codes: Optional[List[str]]

    class Config:
        extra = "forbid"

# ----------------- ICD-10 LOOKUP FUNCTION -----------------
def lookup_icd_codes_for_disease(disease: str, max_retries: int = 2) -> List[str]:
    """Fetches up to three ICD-10 codes for a given disease with retries on failure."""

    if not isinstance(disease, str) or not disease.strip():
        logger.error("Invalid disease parameter provided.")
        return []

    url = "https://clinicaltables.nlm.nih.gov/api/icd10cm/v3/search"
    params = {"sf": "code,name", "terms": disease, "maxList": 3}

    for attempt in range(max_retries + 1):
        try:
            logger.info(f"[Attempt {attempt+1}] Fetching ICD-10 codes for disease: '{disease}' with params: {params}")
            resp = requests.get(url, params=params, timeout=10)
            resp.raise_for_status()

            if 'application/json' not in resp.headers.get('Content-Type', ''):
                logger.error(f"Unexpected content type: {resp.headers.get('Content-Type')}")
                return []

            data = resp.json()
            logger.debug(f"Full ICD-10 API Response: {data}")

            # ✅ Extract only the ICD-10 codes and ensure they are valid
            icd_codes = [item for item in data[1] if isinstance(item, str) and len(item) > 3]

            if icd_codes:
                logger.info(f"ICD-10 Codes Found for '{disease}': {icd_codes}")
                return icd_codes
            else:
                logger.warning(f"No valid ICD-10 codes found for '{disease}'.")
                return []

        except requests.RequestException as e:
            logger.error(f"ICD lookup failed for '{disease}' on attempt {attempt+1}: {e}")
            if attempt < max_retries:
                logger.info(f"Retrying ICD lookup for '{disease}'...")
                time.sleep(2)  # Wait before retrying

    logger.error(f"ICD lookup ultimately failed for '{disease}' after {max_retries+1} attempts.")
    return []


# ----------------- RXNORM LOOKUP FUNCTION -----------------
def lookup_drug_details(drug: str, max_retries: int = 2) -> List[str]:
    """Fetches RxNorm IDs for TAH-validated drugs with retries on failure."""
    
    if not isinstance(drug, str) or not drug.strip():
        logger.error("Invalid drug parameter provided.")
        return []

    url = "https://rxnav.nlm.nih.gov/REST/rxcui.json"
    params = {"name": drug}

    for attempt in range(max_retries + 1):
        try:
            logger.info(f"[Attempt {attempt+1}] Fetching RxNorm ID for drug: '{drug}' with params: {params}")
            resp = requests.get(url, params=params, timeout=10)
            resp.raise_for_status()

            if 'application/json' not in resp.headers.get('Content-Type', ''):
                logger.error(f"Unexpected content type: {resp.headers.get('Content-Type')}")
                return []

            data = resp.json()
            logger.debug(f"Full RxNorm API Response: {data}")

            # Extract only the RxNorm IDs
            rxnorm_ids = data.get("idGroup", {}).get("rxnormId", [])

            if rxnorm_ids:
                logger.info(f"RxNorm IDs Found for '{drug}': {rxnorm_ids}")
                return rxnorm_ids
            else:
                logger.warning(f"No RxNorm ID found for '{drug}'.")
                return []

        except requests.RequestException as e:
            logger.error(f"Drug lookup failed for '{drug}' on attempt {attempt+1}: {e}")
            if attempt < max_retries:
                logger.info(f"Retrying RxNorm lookup for '{drug}'...")
                time.sleep(2)  # Wait before retrying

    logger.error(f"RxNorm lookup ultimately failed for '{drug}' after {max_retries+1} attempts.")
    return []


# ----------------- FINAL ENRICHMENT FUNCTION -----------------
def enrich_metadata(metadata: PolicyMetadata) -> PolicyMetadata:
    """Enrich extracted metadata with ICD-10 and RxNorm codes, only for TAH-validated terms."""
    enriched = metadata.model_copy()
    enriched.covered_diseases_icd_codes = [
        code for disease in metadata.indications_diseases
        for code in lookup_icd_codes_for_disease(disease)
    ]
    enriched.covered_drug_codes = [
        code for drug in metadata.drug_names
        for code in lookup_drug_details(drug)
    ]
    return enriched
# --------------------------------------------------------------------------
# Extraction Phase: Use Azure OpenAI to parse policy text into the above schema.
# --------------------------------------------------------------------------
# Optimized function for metadata extraction
def extract_policy_metadata(policy_text: str) -> PolicyMetadata:
    messages = [
        {
            "role": "system",
            "content": (
                "You are an advanced AI system specializing in extracting **structured metadata** from clinical policy documents. "
                "Your goal is to achieve **100% accuracy** by leveraging **Tree of Thought reasoning, multi-step validation techniques, "
                "and automated normalization of payer names, drug names, and medical conditions**.\n\n"

                "### **1️⃣ Extract Policy Name (`policy_name`)** 📄\n"
                "- **Identify the official policy title** from document **headers, footers, or the first paragraph**.\n"
                "- **Ensure Standard Formatting:** Convert extracted titles into a **consistent naming format**.\n"
                "  - **Example Input (Raw OCR Extracted Text):**\n"
                "    - 'DUPIXENT (DUPILUMAB) - PRIOR AUTHORIZATION POLICY'\n"
                "    - 'Anthem BCBS Policy 2024: Prior Authorization - Dupixent'\n"
                "  - **Expected Standard Output:**\n"
                "    - 'Dupixent Prior Authorization Policy'\n\n"

                "2️⃣ **Payer Name (payer_name) - AUTOMATIC NORMALIZATION ENABLED:**\n"
                "   - Locate in **headers, footers, disclaimers, or embedded watermarks**.\n"
                "   - Normalize using the following standardized mapping:\n"
                "     - 'Cigna', 'Cigna Healthcare', 'Cigna Corp' → 'Cigna'\n"
                "     - 'Humana', 'Humana Inc', 'Humana Health Plan' → 'Humana'\n"
                "     - 'United Healthcare', 'United Health Care', 'UHC' → 'UnitedHealthcare'\n"
                "     - 'Anthem Blue Cross Blue Shield' → 'Anthem BCBS'\n"
                "     - 'Blue Cross Blue Shield' → 'BCBS'\n"
                "     - 'Kaiser Permanente' → 'Kaiser Permanente'\n"
                "     - 'Aetna', 'Aetna Health Inc', 'Aetna Insurance' → 'Aetna'\n"
                "     - 'WellCare Health Plans' → 'WellCare'\n"
                "     - 'Medicare Advantage' → 'Medicare'\n"
                "     - 'Medicaid' → 'Medicaid'\n"
                "     - 'MVP Health Care' → 'MVP HealthCare'\n"
                "     - 'HealthFirst' → 'HealthFirst'\n"
                "     - 'Molina Healthcare' → 'Molina Healthcare'\n"
                "     - 'Centene Corporation' → 'Centene'\n"
                "     - 'Blue Shield of California' → 'Blue Shield of California'\n"
                "     - 'Empire Blue Cross Blue Shield' → 'Empire BCBS'\n"
                "     - 'Horizon Blue Cross Blue Shield' → 'Horizon BCBS'\n"

                "3️⃣ **Drug Names (drug_names) - AUTOMATIC NORMALIZATION ENABLED:**\n"
                "   - Extract **all medications mentioned in the policy**.\n"
                "   - Include **both brand and generic names**.\n"
                "   - Normalize using **RxNorm drug database standards**.\n"

                "4️⃣ **Medical Specialties (medical_specialties):**\n"
                "   - Identify relevant **clinical specialties** (e.g., Neurology, Rheumatology, Oncology).\n"
                "   - Cross-check with **medical board designations** to avoid ambiguity.\n"

                "5️⃣ **Indications & Diseases (indications_diseases) - AUTOMATIC NORMALIZATION ENABLED:**\n"
                "   - Extract **every disease, condition, or indication mentioned**.\n"
                "   - Normalize to **ICD-10 classification**.\n"
                "   - Check **eligibility, exclusions, and coverage sections** for additional conditions.\n\n"

                "### **General Guidelines:**\n"
                "- If a field is missing, return an **empty string (`""`)** for text fields or an **empty list (`[]`)** for arrays.\n"
                "- Strictly **follow the JSON schema** without adding extra keys.\n"
                "- **Cross-validate extracted information** across multiple sections to prevent errors.\n"
            )
        },
        {
            "role": "user",
            "content": (
                "Extract and normalize structured metadata from the following insurance policy document:\n\n"
                f"{policy_text}\n\n"
                "### **Output Schema (Strict JSON Format):**\n"
                "{\n"
                '  "policy_name": "The official title of the prior authorization policy.",\n'
                '  "payer_name": "The name of the insurance company, automatically normalized.",\n'
                '  "drug_names": ["List of all referenced drugs, including brand and generic, automatically normalized."],\n'
                '  "medical_specialties": ["List of relevant clinical specialties (e.g., Neurology, Oncology)."],\n'
                '  "indications_diseases": ["List of all conditions covered under this policy, normalized to ICD-10 standards."]\n'
                "}"
            )
        }
    ]

    try:
        response = client.beta.chat.completions.parse(
            model=model_name,
            messages=messages,
            response_format=PolicyMetadata
        )
        metadata = response.choices[0].message.parsed

        # Enrich metadata with ICD-10 and RxNorm codes
        enriched_metadata = enrich_metadata(metadata)
        enriched = enriched_metadata.model_dump()

        return enriched
    
    except Exception as e:
        logger.error("Error during extraction: %s", e)
        raise

In [13]:
extract_policy_metadata = extract_policy_metadata(policy_raw_text_markdown.content)

2025-03-30 21:11:22,810 - micro - MainProcess - INFO     [Attempt 1] Fetching ICD-10 codes for disease: 'Ankylosing Spondylitis' with params: {'sf': 'code,name', 'terms': 'Ankylosing Spondylitis', 'maxList': 3} (1126985018.py:lookup_icd_codes_for_disease:27)
2025-03-30 21:11:23,190 - micro - MainProcess - INFO     ICD-10 Codes Found for 'Ankylosing Spondylitis': ['M08.1', 'M45.6', 'M45.2'] (1126985018.py:lookup_icd_codes_for_disease:42)
2025-03-30 21:11:23,193 - micro - MainProcess - INFO     [Attempt 1] Fetching ICD-10 codes for disease: 'Crohn's Disease' with params: {'sf': 'code,name', 'terms': "Crohn's Disease", 'maxList': 3} (1126985018.py:lookup_icd_codes_for_disease:27)
2025-03-30 21:11:23,351 - micro - MainProcess - INFO     ICD-10 Codes Found for 'Crohn's Disease': ['K50.90', 'K50.913', 'K50.914'] (1126985018.py:lookup_icd_codes_for_disease:42)
2025-03-30 21:11:23,351 - micro - MainProcess - INFO     [Attempt 1] Fetching ICD-10 codes for disease: 'Hidradenitis Suppurativa' wit

In [14]:
extract_policy_metadata

{'policy_name': 'Inflammatory Conditions - Adalimumab Products Prior Authorization Policy',
 'payer_name': 'Cigna',
 'drug_names': ['Abrilada',
  'Adalimumab-aacf',
  'Adalimumab-adaz',
  'Adalimumab-adbm',
  'Adalimumab-fkjp',
  'Adalimumab-ryvk',
  'Amjevita',
  'Cyltezo',
  'Hadlima',
  'Hulio',
  'Humira',
  'Hyrimoz',
  'Idacio',
  'Simlandi',
  'Yuflyma',
  'Yusimry'],
 'medical_specialties': ['Rheumatology',
  'Dermatology',
  'Gastroenterology',
  'Ophthalmology'],
 'indications_diseases': ['Ankylosing Spondylitis',
  "Crohn's Disease",
  'Hidradenitis Suppurativa',
  'Juvenile Idiopathic Arthritis',
  'Plaque Psoriasis',
  'Psoriatic Arthritis',
  'Rheumatoid Arthritis',
  'Ulcerative Colitis',
  'Uveitis',
  'Non-radiographic axial spondyloarthritis',
  'Reactive Arthritis',
  'Inflammatory Bowel Disease-associated arthritis'],
 'covered_diseases_icd_codes': ['M08.1',
  'M45.6',
  'M45.2',
  'K50.90',
  'K50.913',
  'K50.914',
  'L73.2',
  'L40.52',
  'M06.9',
  'M05.9',
  'M

### 3. **Chunk Documents**

In [21]:
from langchain_text_splitters import MarkdownHeaderTextSplitter

headers_to_split_on = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("###", "Header 3"),
    ("####", "Header 4")
]

In [22]:
splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
chunks = splitter.split_text(policy_raw_text_markdown.content)

### 4. **Vectorize, Add Metadata, Index**

In [16]:
search_client = SearchClient(
    endpoint=endpoint,
    index_name=os.environ["AZURE_SEARCH_INDEX_NAME_EMPLOYEES"],
    credential=AzureKeyCredential(os.environ["AZURE_SEARCH_ADMIN_KEY"]),
)

In [17]:
# Set the service endpoint and API key from the environment
# Create an SDK client
import openai
from openai import AzureOpenAI
from langchain.text_splitter import RecursiveCharacterTextSplitter

openai.api_key = os.environ["AZURE_AOAI_API_KEY"]
openai.api_base = os.environ["AZURE_AOAI_API_ENDPOINT"]
openai.api_type = "azure"
openai.api_version = "2023-05-15"

model = os.environ["AZURE_AOAI_EMBEDDING_DEPLOYMENT_ID"]

client = AzureOpenAI(
    api_version=openai.api_version,
    azure_endpoint=openai.api_base,
    api_key=openai.api_key,
)

# This is in characters and there is an avg of 4 chars / token
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=150,
)

In [18]:
n = 100  # max batch size (number of docs) to upload at a time
total_docs_uploaded = 0

In [19]:
# Split up a list into chunks - this is used to ensure a limited number of items sent to Azure AI Search
def divide_chunks(l, n):
    for i in range(0, len(l), n):
        yield l[i : i + n]


# Function to generate embeddings for content fields, also used for query embeddings
@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(6))
def generate_embeddings(text):
    response = client.embeddings.create(input=text, model=model)
    return json.loads(response.model_dump_json())["data"][0]["embedding"]


chunked_content_docs = []
sfc_counter = 0
for sfc_counter in range(len(final_data_for_indexing)):
    chunked_content = text_splitter.split_text(
        final_data_for_indexing[sfc_counter]["content"]
    )
    chunk_counter = 0
    for cc in chunked_content:
        json_data = copy.deepcopy(final_data_for_indexing[sfc_counter])
        json_data["content"] = chunked_content[chunk_counter]
        json_data["content_vector"] = generate_embeddings(json_data["content"])
        json_data["ParentId"] = f"{json_data['ParentId']}"
        json_data["id"] = f"{json_data['ParentId']}_{chunk_counter}"
        chunk_counter += 1
        chunked_content_docs.append(json_data)
    sfc_counter += 1

total_docs = len(chunked_content_docs)
total_docs_uploaded += total_docs
print(f"Total Documents to Upload: {total_docs}")

for documents_chunk in divide_chunks(chunked_content_docs, n):
    try:
        print(f"Uploading batch of {len(documents_chunk)} documents...")
        result = search_client.upload_documents(documents=documents_chunk)
        # Check if all documents in the batch were uploaded successfully
        if all(res.succeeded for res in result):
            print(f"Upload of batch of {len(documents_chunk)} documents succeeded.")
        else:
            print("Some documents in the batch were not uploaded successfully.")
    except Exception as ex:
        print("Error in multiple documents upload: ", ex)

Total Documents to Upload: 6
Uploading batch of 6 documents...
Upload of batch of 6 documents succeeded.


### Retrieval Data from Azure AI search

Before executing this notebook, please review the notebook `04-retrieval.ipynb` for a better understanding of the process. 

>%pip install azure-search-documents==11.4.0b10

In [1]:
import os
from dotenv import load_dotenv
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.models import RawVectorQuery

from src.aoai.azure_open_ai import AzureOpenAIManager

In [3]:
# Load environment variables from .env file
load_dotenv()

# Set up Azure Cognitive Search credentials
service_endpoint = os.getenv("AZURE_AI_SEARCH_SERVICE_ENDPOINT")
key = os.getenv("AZURE_SEARCH_ADMIN_KEY")
credential = AzureKeyCredential(key)

# Define the name of the Azure Search index
# This is the index where your data is stored in Azure Search
index_name = os.getenv("AZURE_SEARCH_INDEX_NAME_EMPLOYEES")

# Set up the Azure Search client with the specified index
# This prepares the client to interact with the Azure Search service
search_client = SearchClient(service_endpoint, index_name, credential=credential)

# Set up the Azure Search client with the specified index
# This prepares the client to interact with the Azure Search service
search_client = SearchClient(service_endpoint, index_name, credential=credential)

embedding_aoai_deployment_model = "foundational-canadaeast-ada"
aoai_client = AzureOpenAIManager(embedding_model_name=embedding_aoai_deployment_model)

search_query = "has showcased remarkable expertise and dedication"
search_vector = aoai_client.generate_embedding(search_query)

In [4]:
# hybrid retrieval + rerank
r = search_client.search(
    search_query,
    top=5,
    vector_queries=[
        RawVectorQuery(vector=search_vector, k=50, fields="content_vector")
    ],
    query_type="semantic",
    semantic_configuration_name="combined-index-fields-semantic-config",
    query_language="en-us",
)

for doc in r:
    content = doc["content"].replace("\n", " ")[:100]
    print(
        f"ID: {doc['id']}, Age: {doc['Age']}, Annual Salary: {doc['AnnualSalary']}, score: {doc['@search.score']}, reranker: {doc['@search.reranker_score']}. {content}"
    )

ID: E04105_Employee_Sample_Data_0, Age: 59, Annual Salary: 99975.0, score: 0.03333333507180214, reranker: 3.553462028503418. Theodore has showcased remarkable expertise and dedication in his role as Technical Architect. His c
ID: E02387_Employee_Sample_Data_0, Age: 55, Annual Salary: 141604.0, score: 0.032786883413791656, reranker: 1.8165210485458374. Emily has consistently demonstrated exceptional leadership and technical expertise. Her ability to d
ID: E02387_Employee_Sample_Data_1, Age: 55, Annual Salary: 141604.0, score: 0.03125763311982155, reranker: 1.536381721496582. on time and within budget. Emily's strengths lie in her strategic thinking, leadership, and technica
ID: E02387_Employee_Sample_Data_2, Age: 55, Annual Salary: 141604.0, score: 0.0314980149269104, reranker: 1.4992340803146362. executive leadership training and participating in conferences focusing on emerging technologies. Em
ID: E04105_Employee_Sample_Data_1, Age: 59, Annual Salary: 99975.0, score: 0.03225806355476

### Example 1 - Filtering Data Based on Age and Annual Salary


expression `Age gt 58 and AnnualSalary lt 100000` 

The expression uses the 'gt' (greater than) and 'lt' (less than) operators to filter documents based on the values of the 'Age' and 'AnnualSalary' fields.

"Age gt 58" returns documents where the 'Age' field is greater than 58.
"AnnualSalary lt 100000" returns documents where the 'AnnualSalary' field is less than 100000.

The 'and' operator is used to combine these two conditions, so the expression only returns documents where both conditions are true. That is, it returns documents where the 'Age' field is greater than 58 and the 'AnnualSalary' field is less than 100000.


In [5]:
r = search_client.search(
    search_query,
    top=5,
    vector_queries=[
        RawVectorQuery(vector=search_vector, k=50, fields="content_vector")
    ],
    query_type="semantic",
    semantic_configuration_name="combined-index-fields-semantic-config",
    query_language="en-us",
    filter="Age gt 58 and AnnualSalary lt 100000",  # Add filter expression
)

for doc in r:
    content = doc["content"].replace("\n", " ")[:1000000]
    print(
        f"ID: {doc['id']}, Age: {doc['Age']}, Annual Salary: {doc['AnnualSalary']}, score: {doc['@search.score']}, reranker: {doc['@search.reranker_score']}. {content}"
    )

ID: E04105_Employee_Sample_Data_0, Age: 59, Annual Salary: 99975.0, score: 0.03333333507180214, reranker: 3.553462028503418. Theodore has showcased remarkable expertise and dedication in his role as Technical Architect. His c
ID: E04105_Employee_Sample_Data_1, Age: 59, Annual Salary: 99975.0, score: 0.032786883413791656, reranker: 1.4954043626785278. architecture, resulting in a 20% improvement in system efficiency and a significant reduction in dow
ID: E04105_Employee_Sample_Data_2, Age: 59, Annual Salary: 99975.0, score: 0.032258063554763794, reranker: 1.4904258251190186. advanced certifications in emerging technologies relevant to manufacturing IT systems and attending 


## Indexing Content from Multiple Sources (Azure SQL DB and Blob Storage) Using Azure AI search Indexer Option

In [1]:
from src.indexers.sql_Indexing import AzureSQLManager

DATABASE = "dev-sql-server"
az_sql_client = AzureSQLManager(DATABASE)

In [2]:
table_name = "foodreview"

# Drop previous table of same name if one exists
az_sql_client.execute(f"DROP TABLE IF EXISTS {table_name};")
print(
    f"Finished dropping table '{table_name}' if it existed. This ensures we are starting with a clean slate."
)

# Create a table
az_sql_client.execute(
    f"""
               CREATE TABLE {table_name} 
               (Id int NOT NULL, 
               CONSTRAINT PK_{table_name}_Id PRIMARY KEY CLUSTERED (Id), 
               ProductId text, 
               UserId text, 
               ProfileName text, 
               HelpfulnessNumerator integer, 
               HelpfulnessDenominator integer, 
               Score integer, 
               Time bigint, 
               Summary text, 
               Recommnedation NVARCHAR(MAX));
               """
)
print(
    f"Finished creating table '{table_name}'. This table will store our food review data."
)

# Create an index
az_sql_client.execute(f"CREATE INDEX idx_Id ON {table_name}(Id);")
print(
    f"Finished creating index 'idx_Id' on table '{table_name}'. This index will improve the performance of queries that filter by the 'Id' field."
)

Finished dropping table 'foodreview' if it existed. This ensures we are starting with a clean slate.
Finished creating table 'foodreview'. This table will store our food review data.
Finished creating index 'idx_Id' on table 'foodreview'. This index will improve the performance of queries that filter by the 'Id' field.


### Enable change tracking

This allows us to automatically update the index when changes are made to the data.

In [3]:
try:
    # Commit any active transactions
    az_sql_client.cursor.commit()

    # Enable change tracking at the database level
    az_sql_client.execute(
        f"ALTER DATABASE [{DATABASE}] SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)"
    )
except Exception as e:
    print(e)

ERROR:root:Error executing query: ALTER DATABASE [dev-sql-server] SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)
Error Message: ('42000', "[42000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]Change tracking is already enabled for database 'dev-sql-server'. (5088) (SQLExecDirectW); [42000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]ALTER DATABASE statement failed. (5069)")
Traceback: Traceback (most recent call last):
  File "c:\Users\pablosal\Desktop\gbbai-azure-ai-search-indexing\src\indexers\sql_Indexing.py", line 78, in execute
    self.cursor.execute(query)
pyodbc.ProgrammingError: ('42000', "[42000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]Change tracking is already enabled for database 'dev-sql-server'. (5088) (SQLExecDirectW); [42000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]ALTER DATABASE statement failed. (5069)")



('42000', "[42000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]Change tracking is already enabled for database 'dev-sql-server'. (5088) (SQLExecDirectW); [42000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]ALTER DATABASE statement failed. (5069)")


In [4]:
try:
    # Enable change tracking on a specific table.
    # This is necessary to keep track of changes made to this specific table.
    # The TRACK_COLUMNS_UPDATED option is turned on, allowing the system to keep track of which columns were updated.
    az_sql_client.execute(
        f"ALTER TABLE {table_name} ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON)"
    )
except Exception as e:
    print(e)

In [5]:
import numpy as np
import pandas as pd

try:
    all_reviews = pd.read_csv("utils/data/Reviews_small.csv")
    sample_reviews = all_reviews[:1000].copy()
    product_text_reviews = sample_reviews[["ProductId", "Text"]].copy()
    sample_reviews.drop(columns=["Text"], inplace=True)
    product_text_reviews.to_csv("utils/data/Reviews_text.csv", index=False)
except FileNotFoundError:
    print("File not found.")
except Exception as e:
    print(f"Error: {e}")

### Uploading Data to SQL Server

In [6]:
batch_size = 30
batches = [
    sample_reviews[i : i + batch_size]
    for i in range(0, len(sample_reviews), batch_size)
]

# Iterate over each batch and insert or update the data in the database
for batch in batches:
    # Convert the batch dataframe to a list of tuples for bulk insertion
    rows = [tuple(row) for row in batch.itertuples(index=False)]

    # Define the SQL query for bulk insertion or update
    query = f"""
    MERGE INTO {table_name} AS Target
    USING (VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)) AS Source (Id, ProductId, UserId, ProfileName, HelpfulnessNumerator, HelpfulnessDenominator, Score, Time, Summary)
    ON CAST(Target.ProductId AS NVARCHAR(MAX)) = Source.ProductId
    WHEN MATCHED THEN 
        UPDATE SET UserId = Source.UserId, ProfileName = Source.ProfileName, HelpfulnessNumerator = Source.HelpfulnessNumerator, HelpfulnessDenominator = Source.HelpfulnessDenominator, Score = Source.Score, Time = Source.Time, Summary = Source.Summary
    WHEN NOT MATCHED THEN
        INSERT (Id, ProductId, UserId, ProfileName, HelpfulnessNumerator, HelpfulnessDenominator, Score, Time, Summary)
        VALUES (Source.Id, Source.ProductId, Source.UserId, Source.ProfileName, Source.HelpfulnessNumerator, Source.HelpfulnessDenominator, Source.Score, Source.Time, Source.Summary);
    """
    az_sql_client.cursor.executemany(query, rows)

In [7]:
# Execute the SELECT statement
try:
    data = az_sql_client.execute_and_fetch(f"SELECT count(Id) FROM {table_name};")
    print(data)
except Exception as e:
    print(f"Error executing SELECT statement: {e}")

[(29,)]


In [9]:
data = az_sql_client.execute_and_fetch(f"SELECT * FROM {table_name};")
data[:5]

[(1, 'B001E4KFG0', 'A3SGXH7AUHU8GW', 'delmartian', 1, 1, 5, 1303862400, 'Good Quality Dog Food', None),
 (2, 'B00813GRG4', 'A1D87F6ZCVE5NK', 'dll pa', 0, 0, 1, 1346976000, 'Not as Advertised', None),
 (3, 'B000LQOCH0', 'ABXLMWJIXXAIN', 'Natalia Corres "Natalia Corres"', 1, 1, 4, 1219017600, '"Delight" says it all', None),
 (4, 'B000UA0QIQ', 'A395BORC6FGVXV', 'Karl', 3, 3, 2, 1307923200, 'Cough Medicine', None),
 (5, 'B006K2ZZ7K', 'A3JRGQVEQN31IQ', 'Pamela G. Williams', 0, 0, 5, 1336003200, 'Wonderful, tasty taffy', None)]

### Indexing Data from Blob Storage

In [10]:
from src.extractors.blob_data_extractors import AzureBlobDataExtractor

blob_client = AzureBlobDataExtractor()

In [11]:
df_from_blob = blob_client.read_csv_from_blob(
    "Reviews_text.csv", "testretrieval", sep=",", encoding="ISO-8859-1", header=0
)
df_from_blob["ProductId"] = df_from_blob["ProductId"].astype(str)
df_from_blob["Recommnedation"] = df_from_blob["Text"].astype(str)

In [12]:
batch_size = 30
batches = [
    df_from_blob[i : i + batch_size] for i in range(0, len(df_from_blob), batch_size)
]

for batch in batches:
    # Convert the batch dataframe to a list of tuples for bulk update
    # Adjust the unpacking to match the actual structure of your DataFrame tuples
    rows = [(text, product_id) for _, product_id, text, *_ in batch.itertuples()]

    # Define the SQL query for bulk update
    query = f"UPDATE {table_name} SET Recommnedation = ? WHERE CAST(ProductId AS NVARCHAR(MAX)) = ?"
    az_sql_client.cursor.executemany(query, rows)

In [14]:
data = az_sql_client.execute_and_fetch(f"SELECT * FROM {table_name};")
data[:5]

[(1, 'B001E4KFG0', 'A3SGXH7AUHU8GW', 'delmartian', 1, 1, 5, 1303862400, 'Good Quality Dog Food', 'I have bought several of the Vitality canned dog food products and have found them all to be of good quality. The product looks more like a stew than a processed meat and it smells better. My Labrador is finicky and she appreciates this product better than  most.'),
 (2, 'B00813GRG4', 'A1D87F6ZCVE5NK', 'dll pa', 0, 0, 1, 1346976000, 'Not as Advertised', 'Product arrived labeled as Jumbo Salted Peanuts...the peanuts were actually small sized unsalted. Not sure if this was an error or if the vendor intended to represent the product as "Jumbo".'),
 (3, 'B000LQOCH0', 'ABXLMWJIXXAIN', 'Natalia Corres "Natalia Corres"', 1, 1, 4, 1219017600, '"Delight" says it all', 'This is a confection that has been around a few centuries.  It is a light, pillowy citrus gelatin with nuts - in this case Filberts. And it is cut into tiny squares and then liberally coated with powdered sugar.  And it is a tiny mou