# Azure AI Search Vector Index Compression
This notebook demonstrates different compression configurations for vector search indexes and their impact on storage size and search quality.

## Prerequisites

Before running the notebook, ensure you have the following: 

- [Fork](https://github.com/microsoft/rag-time/fork) the repository and clone your fork to your local machine with the following commands:

    ```bash
    git clone https://github.com/your-org/rag-time.git
    cd rag-time
    ```

- An [Azure account](https://portal.azure.com) with proper permissions to access the following services:
    - An **Azure OpenAI** service with an active deployment of the **text-embedding-3-large** embedding model, which is used to embed the search query in step 8.
    - An **Azure AI Search** service. The notebook creates and populates its own test indexes; to also index the documents in the [data](./../../data/) folder, follow the instructions in the [Quickstart](https://learn.microsoft.com/en-us/azure/search/search-get-started-portal-import-vectors?tabs=sample-data-storage%2Cmodel-aoai%2Cconnect-data-storage).
- Python 3.8 or later, available from [python.org](https://python.org).

## Steps to Use the Notebook

### 1. Set Up Environment Variables

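To keep settings out of the notebook code, rename the `.env.sample` file to `.env` in the same directory as the notebook and update the following variables. `AZURE_OPENAI_EMBED_DEPLOYMENT` must name a deployment of `text-embedding-3-large` so that query vectors match the 3,072-dimensional vectors in the sample dataset. `AZURE_SEARCH_ADMIN_KEY` can stay empty because the code authenticates with `DefaultAzureCredential`: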

```bash
AZURE_SEARCH_SERVICE_ENDPOINT="https://SEARCHSERVICE.search.windows.net"
AZURE_SEARCH_ADMIN_KEY=
AZURE_OPENAI_SERVICE_ENDPOINT="https://OPENAISERVICE.openai.azure.com/"
AZURE_OPENAI_EMBED_DEPLOYMENT="text-embedding-3-large"
```

### 2. Install Required Libraries

Run the first code cell to install the required Python libraries:

In [None]:
%pip install azure-search-documents==11.6.0b8
%pip install azure-identity
%pip install datasets
%pip install tabulate
%pip install python-dotenv
%pip install openai

### 3. Import Dependencies

Run the following cell to import the required dependencies (the `.env` file is loaded in the next step):

In [None]:
import json
import os

from azure.core.exceptions import ResourceExistsError
from azure.core.credentials import AzureKeyCredential
from azure.identity import DefaultAzureCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    BinaryQuantizationCompression,
    HnswAlgorithmConfiguration,
    HnswParameters,
    ScalarQuantizationCompression,
    ScalarQuantizationParameters,
    SearchField,
    SearchFieldDataType,
    SearchIndex,
    SimpleField,
    VectorSearch,
    VectorSearchAlgorithmKind,
    VectorSearchProfile,
    VectorSearchCompressionRescoreStorageMethod,
    RescoringOptions
)
from azure.search.documents.models import VectorizedQuery
from dotenv import load_dotenv
from tabulate import tabulate

### 4. Set Configuration Constants

In [None]:
# Load environment variables from .env file
load_dotenv(override=True)

# Azure Search Configuration
SERVICE_ENDPOINT = os.environ["AZURE_SEARCH_SERVICE_ENDPOINT"]
credential = DefaultAzureCredential()
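
Missing settings otherwise surface later as a `KeyError` deep inside another cell. The following minimal sketch, run after `load_dotenv(override=True)`, fails fast instead. It is not part of the original notebook: `missing_env_vars` and `REQUIRED_VARS` are hypothetical names, and the list assumes only the three variables the notebook reads through `os.environ[...]` are required.

```python
import os
from typing import Iterable, List, Mapping, Optional

# Variables the notebook reads with os.environ[...]; AZURE_SEARCH_ADMIN_KEY is
# omitted because the code authenticates with DefaultAzureCredential instead.
REQUIRED_VARS = (
    "AZURE_SEARCH_SERVICE_ENDPOINT",
    "AZURE_OPENAI_SERVICE_ENDPOINT",
    "AZURE_OPENAI_EMBED_DEPLOYMENT",
)


def missing_env_vars(
    required: Iterable[str] = REQUIRED_VARS,
    env: Optional[Mapping[str, str]] = None,
) -> List[str]:
    """Return the names of required variables that are unset or blank."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name, "").strip()]


missing = missing_env_vars()
if missing:
    print("Missing environment variables:", ", ".join(missing))
```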


### 5. Example Search Index Configuration

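Define a minimal example index with a 3-dimensional vector field. It uses binary quantization with rescoring disabled, discards the original full-precision vectors, and sets `stored=False` so the vectors are not kept in a form that can be returned in search results: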
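
Binary quantization keeps roughly one bit per dimension instead of a 32-bit float. The following sketch illustrates the general idea in plain Python; it is a simplified illustration, not Azure AI Search's internal implementation. It assumes a component maps to 1 when it is greater than zero and compares packed vectors with Hamming distance. `binarize` and `hamming_distance` are hypothetical helper names.

```python
from typing import Sequence


def binarize(vector: Sequence[float]) -> bytes:
    """Pack one bit per dimension (1 if the component is > 0, else 0), most significant bit first."""
    packed = bytearray((len(vector) + 7) // 8)
    for i, value in enumerate(vector):
        if value > 0:
            packed[i // 8] |= 1 << (7 - i % 8)
    return bytes(packed)


def hamming_distance(a: bytes, b: bytes) -> int:
    """Count the bits that differ between two packed binary vectors of equal length."""
    return sum(bin(x ^ y).count("1") for x, y in zip(a, b))


# The 3-dimensional example vectors fit in a single byte each
doc = binarize([0.2, -0.5, 0.9])      # bits 1, 0, 1
query = binarize([-0.1, -0.3, 0.4])   # bits 0, 0, 1
print(len(binarize([0.0] * 3072)), "bytes per 3,072-dimensional vector")
print("Hamming distance:", hamming_distance(doc, query))
```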

In [None]:
index_client = SearchIndexClient(endpoint=SERVICE_ENDPOINT, credential=credential)

index = SearchIndex(
    name="tinyindex",
    fields=[
        SimpleField(name="id", type=SearchFieldDataType.String, key=True),
        SearchField(
            name="embedding",
            type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
            searchable=True,
            vector_search_dimensions=3,
            vector_search_profile_name="embedding_profile",
            stored=False
        ),
    ],
    vector_search=VectorSearch(
        algorithms=[
            HnswAlgorithmConfiguration(
                name="hnsw_config",
                kind=VectorSearchAlgorithmKind.HNSW,
                parameters=HnswParameters(metric="cosine"),
            )
        ],
        profiles=[
            VectorSearchProfile(
                name="embedding_profile", algorithm_configuration_name="hnsw_config", compression_name="binary_compression"
            )
        ],
        compressions=[
            BinaryQuantizationCompression(
                compression_name="binary_compression",
                rerank_with_original_vectors=None,
                default_oversampling=None,
                rescoring_options=RescoringOptions(
                    enable_rescoring=False,
                    rescore_storage_method=VectorSearchCompressionRescoreStorageMethod.DISCARD_ORIGINALS,
                ),
            )
        ],
    ),
)

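# create_or_update_index lets the cell be re-run without failing because the index already exists
index_client.create_or_update_index(index)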

### 6. Define Test Scenarios 

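For an explanation of the `SearchField` parameters used in these scenarios (such as `stored`), see the [SearchField reference](https://learn.microsoft.com/python/api/azure-search-documents/azure.search.documents.indexes.models.searchfield?view=azure-python). Each scenario combines a compression type (`scalar`, `binary`, or none), an optional truncation dimension, whether the full-precision originals are discarded, and whether the vector field is stored: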
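
The `truncate_dims` scenarios rely on `truncation_dimension`, which is meant for embeddings trained with Matryoshka Representation Learning, such as `text-embedding-3-large`, where the leading dimensions carry most of the information. The sketch below only illustrates what truncation does to a cosine comparison, using hypothetical helper names: it keeps the first `dims` components of each vector and compares the prefixes. Because cosine similarity is scale-invariant, no renormalization is needed. The toy vectors are contrived so the effect is obvious; real MRL embeddings are trained so that prefixes preserve most of the similarity structure.

```python
import math
from typing import Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity of two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def truncated_cosine(a: Sequence[float], b: Sequence[float], dims: int) -> float:
    """Cosine similarity computed on the first `dims` components only."""
    return cosine(a[:dims], b[:dims])


a = [1.0, 0.0, 5.0]
b = [1.0, 0.0, -5.0]
print(cosine(a, b))               # full vectors point in very different directions
print(truncated_cosine(a, b, 2))  # the 2-dimensional prefixes are identical
```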


In [None]:
INDEX_PREFIX = "compression-test"

scenarios = [
    {
        "name": "baseline",
        "compression_type": None,
        "truncate_dims": None,
        "discard_originals": False,
        "stored_embedding": True,
        "description": "Baseline configuration without compression"
    },
    {
        "name": "baseline-s",
        "compression_type": None,
        "truncate_dims": None,
        "discard_originals": False,
        "stored_embedding": False,
        "description": "Baseline configuration without compression, with stored=False "
    },
    {
        "name": "scalar-full",
        "compression_type": "scalar",
        "truncate_dims": None,
        "discard_originals": False,
        "stored_embedding": False,
        "description": "Scalar quantization (int8) with full dimensions, preserved originals"
    },
    {
        "name": "scalar-truncated-1024",
        "compression_type": "scalar",
        "truncate_dims": 1024,
        "discard_originals": False,
        "stored_embedding": False,
        "description": "Scalar quantization (int8) with 1024 dimensions, preserved originals"
    },
    {
        "name": "scalar-truncated-1024-discard",
        "compression_type": "scalar",
        "truncate_dims": 1024,
        "discard_originals": True,
        "stored_embedding": False,
        "description": "Scalar quantization (int8) with 1024 dimensions, discarded originals"
    },
    {
        "name": "binary-full",
        "compression_type": "binary",
        "truncate_dims": None,
        "discard_originals": False,
        "stored_embedding": False,
        "description": "Binary quantization with full dimensions, preserved originals"
    },
    {
        "name": "binary-truncated-1024",
        "compression_type": "binary",
        "truncate_dims": 1024,
        "discard_originals": False,
        "stored_embedding": False,
        "description": "Binary quantization with 1024 dimensions, preserved originals"
    },
    {
        "name": "binary-truncated-1024-discard",
        "compression_type": "binary",
        "truncate_dims": 1024,
        "discard_originals": True,
        "stored_embedding": False,
        "description": "Binary quantization with 1024 dimensions, discarded originals"
    }
]
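
As a rough guide to what the scenarios should save, the following sketch computes the raw size of one vector under each configuration. It is back-of-the-envelope arithmetic, not a model of the service's storage accounting: it assumes 4-byte `float32` components, 1 byte per dimension for `int8` scalar quantization, and 1 bit per dimension for binary quantization, and it ignores HNSW graph overhead and any preserved full-precision originals. `bytes_per_vector` is a hypothetical helper.

```python
from typing import Optional

FLOAT32_BYTES = 4  # Collection(Edm.Single) holds 32-bit floats


def bytes_per_vector(dims: int, compression_type: Optional[str], truncate_dims: Optional[int] = None) -> int:
    """Approximate raw bytes for one vector, excluding index and graph overhead."""
    if compression_type is None:
        return dims * FLOAT32_BYTES
    effective_dims = truncate_dims or dims
    if compression_type == "scalar":
        return effective_dims  # int8: one byte per dimension
    if compression_type == "binary":
        return (effective_dims + 7) // 8  # one bit per dimension, rounded up to whole bytes
    raise ValueError(f"Unknown compression type: {compression_type}")


for label, ctype, trunc in [
    ("baseline", None, None),
    ("scalar-full", "scalar", None),
    ("scalar-truncated-1024", "scalar", 1024),
    ("binary-full", "binary", None),
    ("binary-truncated-1024", "binary", 1024),
]:
    print(f"{label:<24} {bytes_per_vector(3072, ctype, trunc):>6} bytes")
```

In the notebook, the same helper could be applied to each entry of `scenarios` using `scenario["compression_type"]` and `scenario["truncate_dims"]`.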

### 7. Execute Index Creation

Run the provided code to create vector search indexes with different compression settings. This allows comparing storage efficiency and search quality.
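
Scalar quantization maps each float component to a narrower integer type, here `int8`. The sketch below shows the general min/max idea with hypothetical helpers `scalar_quantize` and `dequantize`. The value range is passed in explicitly as an assumption; the exact scheme Azure AI Search uses internally, including how it chooses the range, may differ.

```python
from typing import List, Sequence


def scalar_quantize(vector: Sequence[float], lo: float, hi: float) -> List[int]:
    """Linearly map values in [lo, hi] onto the int8 range [-128, 127], clipping outliers."""
    if hi <= lo:
        raise ValueError("hi must be greater than lo")
    scale = 255 / (hi - lo)
    codes = []
    for value in vector:
        clipped = min(max(value, lo), hi)
        codes.append(round((clipped - lo) * scale) - 128)
    return codes


def dequantize(codes: Sequence[int], lo: float, hi: float) -> List[float]:
    """Approximately reconstruct floats from int8 codes."""
    step = (hi - lo) / 255
    return [(code + 128) * step + lo for code in codes]


original = [-1.0, -0.25, 0.0, 0.5, 1.0]
codes = scalar_quantize(original, lo=-1.0, hi=1.0)
restored = dequantize(codes, lo=-1.0, hi=1.0)
print(codes)
# Reconstruction error stays within half a quantization step for in-range values
print([round(abs(a - b), 4) for a, b in zip(original, restored)])
```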

In [None]:
class AzureSearchIndexManager:
    def __init__(self, service_endpoint: str, credential, index_name_prefix: str, vector_dimensions: int):
        self.client = SearchIndexClient(endpoint=service_endpoint, credential=credential)
        self.index_name_prefix = index_name_prefix
        self.vector_dimensions = vector_dimensions

    def _create_base_fields(self, stored_embedding=True):
        return [
            SimpleField(name="id", type=SearchFieldDataType.String, key=True),
            SearchField(name="title", type=SearchFieldDataType.String, searchable=True),
            SearchField(name="content", type=SearchFieldDataType.String, searchable=True),
            SearchField(
                name="embedding",
                type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
                searchable=True,
                vector_search_dimensions=self.vector_dimensions,
                vector_search_profile_name="default-profile",
                stored=stored_embedding,
            ),
        ]

    def _create_compression_config(
        self,
        config_type: str,
        truncate_dims: int = None,
        discard_originals: bool = False,
        oversample_ratio: int = 10,
    ):
        """
        Creates a compression configuration based on the scenario.
        """
        compression_name = f"{config_type}-compression"

        # Determine the storage method based on whether originals are discarded
        rescore_storage_method = (
            VectorSearchCompressionRescoreStorageMethod.DISCARD_ORIGINALS
            if discard_originals
            else VectorSearchCompressionRescoreStorageMethod.PRESERVE_ORIGINALS
        )

        # Enable rescoring only if originals are preserved
        enable_rescoring = not discard_originals

        # Configure rescoring options
        rescoring_options = RescoringOptions(
            enable_rescoring=enable_rescoring,
            default_oversampling=oversample_ratio if enable_rescoring else None,
            rescore_storage_method=rescore_storage_method,
        )

        # Base parameters for compression
        base_params = {
            "compression_name": compression_name,
            "rescoring_options": rescoring_options,
            # Explicitly set deprecated parameters to None
            "rerank_with_original_vectors": None,
            "default_oversampling": None,
        }

        # Add truncation dimension if specified
        if truncate_dims:
            base_params["truncation_dimension"] = truncate_dims

        # Create the appropriate compression object
        if config_type == "scalar":
            compression = ScalarQuantizationCompression(
                parameters=ScalarQuantizationParameters(quantized_data_type="int8"),
                **base_params,
            )
        elif config_type == "binary":
            compression = BinaryQuantizationCompression(
                **base_params,
            )
        else:
            compression = None

        return compression

    def _create_vector_search_config(self, compression_config=None):
        """
        Creates the VectorSearch configuration, including algorithm and compression settings.
        """
        # Define the HNSW algorithm configuration
        algorithm_config = HnswAlgorithmConfiguration(
            name="hnsw-config",
            kind="hnsw",
            parameters=HnswParameters(
                m=4,
                ef_construction=400,
                ef_search=500,
                metric="cosine"
            )
        )

        # Define the VectorSearchProfile
        profiles = [
            VectorSearchProfile(
                name="default-profile",
                algorithm_configuration_name=algorithm_config.name,
                compression_name=compression_config.compression_name if compression_config else None,
            )
        ]

        # Assemble the VectorSearch configuration
        vector_search = VectorSearch(
            profiles=profiles,
            algorithms=[algorithm_config],
            compressions=[compression_config] if compression_config else None,
        )

        return vector_search

    def create_index(self, scenario: dict):
        """
        Creates or updates an index based on the provided scenario.
        """
        index_name = f"{self.index_name_prefix}-{scenario['name']}"

        # Use the 'stored_embedding' value from the scenario
        stored_embedding = scenario.get('stored_embedding', True)

        # Create base fields with the stored_embedding flag
        fields = self._create_base_fields(stored_embedding=stored_embedding)

        # Create compression configuration if specified
        compression_config = None
        if scenario["compression_type"]:
            compression_config = self._create_compression_config(
                config_type=scenario["compression_type"],
                truncate_dims=scenario.get("truncate_dims"),
                discard_originals=scenario.get("discard_originals", False),
            )

        # Create vector search configuration
        vector_search = self._create_vector_search_config(compression_config)

        # Define the SearchIndex
        index = SearchIndex(
            name=index_name,
            fields=fields,
            vector_search=vector_search,
        )

        # Create or update the index
        try:
            self.client.create_or_update_index(index)
        except ResourceExistsError:
            print(f"Index {index_name} already exists.")
        except Exception as e:
            # Not every exception has a .message attribute, so fall back to str(e)
            message = getattr(e, "message", None) or str(e)
            if "already exists" in message:
                print(f"Index {index_name} already exists.")
            else:
                # Re-raise so the calling loop records the failed scenario
                raise
    
        # Return the index name
        return index_name

manager = AzureSearchIndexManager(
    service_endpoint=SERVICE_ENDPOINT,
    credential=credential,
    index_name_prefix=INDEX_PREFIX,
    vector_dimensions=3072)

# Create all index configurations
created_indexes = []
for scenario in scenarios:
    try:
        index_name = manager.create_index(scenario)
        created_indexes.append({
            "index_name": index_name,
            "configuration": scenario["description"]
        })
    except Exception as e:
        print(f"Error creating index for scenario {scenario['name']}: {str(e)}")

# Display created indexes; display() is needed because Jupyter only renders a cell's last top-level expression
if len(created_indexes) > 0:
    print("\nCreated Indexes:")
    display(tabulate(created_indexes, headers="keys", tablefmt="html"))
else:
    print("\nNo indexes were created successfully.")

In [None]:
from datasets import load_dataset
import pyarrow.parquet as pq
import pyarrow as pa

# Load dataset in streaming mode and select only needed columns
print("Loading dataset in streaming mode...")
ds = load_dataset(
    "Qdrant/dbpedia-entities-openai3-text-embedding-3-large-3072-1M",
    streaming=True,
    split='train'
)

# Select only needed columns
ds = ds.select_columns(["_id", "title", "text", "text-embedding-3-large-3072-embedding"])

# Take first 100K examples
print("Taking first 100K examples...")
data = []
for i, example in enumerate(ds):
    if i >= 100000:
        break
    data.append(example)

# Convert to Arrow table and save to parquet
print("Converting to parquet...")
output_file = "dbpedia_100k.parquet"
table = pa.Table.from_pylist(data)  # Build an in-memory Arrow table from the list of row dicts
pq.write_table(
    table,
    output_file,
    compression='snappy'  # Good balance of compression and speed
)

print(f"Dataset saved to {output_file}")

# Verify the saved file
table = pq.read_table(output_file)
print(f"\nSaved dataset shape: {table.num_rows} rows × {table.num_columns} columns")
print("Columns:", table.column_names)

In [None]:
# Read the parquet file
table = pq.read_table("dbpedia_100k.parquet")

# Convert first row to the exact format we need
first_doc = {
    "id": str(table['_id'][0].as_py()),
    "title": table['title'][0].as_py(),
    "content": table['text'][0].as_py(),
    "embedding": table['text-embedding-3-large-3072-embedding'][0].as_py()
}

# Print info about the dataset
print("Dataset Information:")
print(f"Total number of rows: {table.num_rows}")
print(f"Columns: {table.column_names}")
print("\nFirst document structure:")
print(json.dumps(first_doc, indent=2, default=str))

# Print some basic stats about the data
print("\nData Statistics:")
print(f"Embedding dimension: {len(first_doc['embedding'])}")
print(f"Average title length: {sum(len(str(title)) for title in table['title']) / table.num_rows:.1f} characters")
print(f"Average text length: {sum(len(str(text)) for text in table['text']) / table.num_rows:.1f} characters")

# Verify there are no null values
print("\nChecking for null values:")
for column in table.column_names:
    null_count = table[column].null_count
    print(f"{column}: {null_count} null values")

In [None]:
import base64
import time
from datetime import datetime
from typing import List, Dict, Any
import json

import pyarrow.parquet as pq


def chunk_list(lst: List[Any], chunk_size: int) -> List[List[Any]]:
    """Split a list into chunks of specified size."""
    return [lst[i : i + chunk_size] for i in range(0, len(lst), chunk_size)]


def encode_key(key: str) -> str:
    """Encode key to be Azure Search compatible using URL-safe base64."""
    return base64.urlsafe_b64encode(key.encode()).decode()


def prepare_documents(table) -> List[Dict]:
    """
    Convert Arrow table to list of documents with base64 encoded IDs.
    """
    documents = []
    total_rows = table.num_rows

    print(f"Converting {total_rows} rows to documents...")
    for i in range(total_rows):
        # Encode the ID to make it Azure Search compatible
        original_id = str(table["_id"][i].as_py())
        encoded_id = encode_key(original_id)

        document = {
            "id": encoded_id,
            "title": table["title"][i].as_py(),
            "content": table["text"][i].as_py(),
            "embedding": table["text-embedding-3-large-3072-embedding"][i].as_py(),
        }
        documents.append(document)

        if i % 1000 == 0:  # Progress indicator
            print(f"Processed {i}/{total_rows} documents...")

    print("Document conversion complete")

    # Print the first document as a sample, showing both its original and encoded ID
    print("\nSample document format:")
    sample_doc = documents[0].copy()
    print("Original ID:", str(table["_id"][0].as_py()))
    print("Encoded ID:", sample_doc["id"])
    print(json.dumps(sample_doc, indent=2, default=str))

    return documents


def upload_to_search(
    documents: List[Dict],
    endpoint: str,
    index_name: str,
    credential,
    batch_size: int = 100,
) -> None:
    """
    Upload documents to Azure Search index using manual batching.
    """
    # Create search client
    search_client = SearchClient(
        endpoint=endpoint, index_name=index_name, credential=credential
    )

    total_docs = len(documents)
    print(f"\nStarting upload to index: {index_name} at {datetime.now()}")
    print(f"Total documents to upload: {total_docs}")
    start_time = datetime.now()

    # Split documents into batches
    batches = chunk_list(documents, batch_size)
    total_batches = len(batches)

    successful_docs = 0
    failed_docs = 0

    try:
        for batch_num, batch in enumerate(batches, 1):
            max_retries = 3
            retry_count = 0

            while retry_count < max_retries:
                try:
                    results = search_client.upload_documents(documents=batch)

                    # Count successes and failures
                    for result in results:
                        if result.succeeded:
                            successful_docs += 1
                        else:
                            failed_docs += 1
                            print(
                                f"Failed to upload document {result.key}: {result.error_message}"
                            )

                    elapsed_time = datetime.now() - start_time
                    print(
                        f"Index {index_name}: Processed batch {batch_num}/{total_batches} "
                        f"({successful_docs}/{total_docs} docs). "
                        f"Elapsed time: {elapsed_time}"
                    )

                    # Short pause between batches to prevent throttling
                    time.sleep(0.25)
                    break  # Success - exit retry loop

                except Exception as e:
                    retry_count += 1
                    if retry_count == max_retries:
                        print(
                            f"Failed to upload batch after {max_retries} attempts: {str(e)}"
                        )
                        failed_docs += len(batch)
                    else:
                        print(
                            f"Retry {retry_count}/{max_retries} after error: {str(e)}"
                        )
                        time.sleep(2**retry_count)  # Exponential backoff

        total_time = datetime.now() - start_time
        print(f"\nUpload to {index_name} completed:")
        print(f"Successfully uploaded: {successful_docs} documents")
        print(f"Failed to upload: {failed_docs} documents")
        print(f"Total time: {total_time}")

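        # Verify final count; get_count() returns None unless include_total_count=True,
        # and the count can lag briefly behind recent uploads
        result = search_client.search("*", top=0, include_total_count=True)
        final_count = result.get_count()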
        print(f"Final document count in index: {final_count}")

    except Exception as e:
        print(f"Fatal error during upload to {index_name}: {str(e)}")
        raise


def upload_to_all_indexes(
    documents: List[Dict],
    endpoint: str,
    index_prefix: str,
    scenarios: list,
    credential,
    batch_size: int = 100,
) -> None:
    """
    Upload documents to all indexes sequentially.
    """
    total_start_time = datetime.now()

    for i, scenario in enumerate(scenarios, 1):
        index_name = f"{index_prefix}-{scenario['name']}"
        print(f"\nProcessing index {i} of {len(scenarios)}: {index_name}")
        upload_to_search(
            documents=documents,
            endpoint=endpoint,
            index_name=index_name,
            credential=credential,
            batch_size=batch_size,
        )

    total_time = datetime.now() - total_start_time
    print(f"\nCompleted all uploads. Total time: {total_time}")


# Usage:
print("Loading parquet file...")
table = pq.read_table("dbpedia_100k.parquet")
print(f"Loaded {table.num_rows} rows")

# Prepare documents once
documents = prepare_documents(table)

# Upload the prepared documents to every scenario's index
upload_to_all_indexes(
    documents=documents,
    endpoint=SERVICE_ENDPOINT,
    index_prefix=INDEX_PREFIX,
    scenarios=scenarios,
    credential=credential,
    batch_size=100,
)
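
The upload code Base64-encodes document IDs because Azure AI Search document keys may contain only letters, digits, underscores (`_`), dashes (`-`), and equal signs (`=`). URL-safe Base64 output uses exactly that alphabet, and the encoding is reversible. The sketch below checks both properties; the sample ID and the `decode_key` helper are hypothetical.

```python
import base64
import re

# Characters allowed in an Azure AI Search document key
VALID_KEY = re.compile(r"^[A-Za-z0-9_\-=]+$")


def encode_key(key: str) -> str:
    """URL-safe Base64 encoding, matching the notebook's encode_key."""
    return base64.urlsafe_b64encode(key.encode("utf-8")).decode("ascii")


def decode_key(encoded: str) -> str:
    """Recover the original ID from an encoded key."""
    return base64.urlsafe_b64decode(encoded.encode("ascii")).decode("utf-8")


# Hypothetical ID containing characters that are not allowed in keys
original_id = "<dbpedia:Example_(disambiguation)>"
encoded = encode_key(original_id)
print(encoded, bool(VALID_KEY.match(encoded)))
print(decode_key(encoded) == original_id)
```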

In [None]:
from azure.search.documents.indexes import SearchIndexClient
from tabulate import tabulate 

def bytes_to_mb(num_bytes):
    """Convert bytes to megabytes, rounded to 4 decimal places."""
    return round(num_bytes / (1024 * 1024), 4)

def get_index_sizes(
    endpoint: str,
    index_prefix: str,
    scenarios: list,
    credential,
    retry_attempts: int = 3
):
    """
    Return an HTML table of storage sizes for all indexes, retrying to allow for eventual consistency.
    """
    # Create search index client
    search_index_client = SearchIndexClient(endpoint=endpoint, credential=credential)
    
    print("\nGathering index statistics...")
    print("Note: index statistics are not real-time and can lag behind document uploads,")
    print("so sizes may be zero or incomplete right after uploading.\n")
    
    # Collect all index sizes with retries
    index_data = []
    for scenario in scenarios:
        index_name = f"{index_prefix}-{scenario['name']}"
        
        for attempt in range(retry_attempts):
            try:
                stats = search_index_client.get_index_statistics(index_name)
                storage_size = bytes_to_mb(stats["storage_size"])
                vector_size = bytes_to_mb(stats["vector_index_size"])
                total_size = storage_size + vector_size
                index_data.append({
                    'Index Name': index_name,
                    'Scenario': scenario['name'],
                    'Storage Size (MB)': storage_size,
                    'Vector Size (MB)': vector_size,
                    'Total Size (MB)': total_size,  # Added for sorting
                })
                break
            except Exception as e:
                if attempt == retry_attempts - 1:
                    print(f"Failed to get statistics for {index_name} after {retry_attempts} attempts: {str(e)}")
                else:
                    print(f"Retry {attempt + 1}/{retry_attempts} for {index_name}")
                    time.sleep(2 ** attempt)  # Exponential backoff
    
    # Find baseline storage and vector sizes
    baseline_entry = next((entry for entry in index_data if entry['Scenario'] == 'baseline'), None)
    if not baseline_entry:
        print("Baseline scenario not found.")
        return
    baseline_storage_size = baseline_entry['Storage Size (MB)']
    baseline_vector_size = baseline_entry['Vector Size (MB)']
    
    # Compute reduction percentages compared to baseline
    def reduction_pct(baseline: float, value: float) -> str:
        # Statistics can still read 0 right after upload, so avoid dividing by zero
        return f"{(baseline - value) / baseline * 100:.2f}" if baseline > 0 else "n/a"

    for entry in index_data:
        entry['Storage Reduction (%)'] = reduction_pct(baseline_storage_size, entry['Storage Size (MB)'])
        entry['Vector Reduction (%)'] = reduction_pct(baseline_vector_size, entry['Vector Size (MB)'])
    
    # Sort by total size
    index_data.sort(key=lambda x: x['Total Size (MB)'], reverse=True)
    
    # Prepare table headers and rows
    headers = [
        'Index Name', 'Scenario', 'Storage Size (MB)', 'Storage Reduction (%)',
        'Vector Size (MB)', 'Vector Reduction (%)'
    ]
    table_rows = [
        [
            entry['Index Name'],
            entry['Scenario'],
            f"{entry['Storage Size (MB)']:.4f}",
            entry['Storage Reduction (%)'],
            f"{entry['Vector Size (MB)']:.4f}",
            entry['Vector Reduction (%)']
        ]
        for entry in index_data
    ]
    
    # Return the HTML table; as the cell's last expression, Jupyter renders it
    return tabulate(table_rows, headers=headers, tablefmt="html")
    

# Get and print index sizes
get_index_sizes(
    endpoint=SERVICE_ENDPOINT,
    index_prefix=INDEX_PREFIX,
    scenarios=scenarios,
    credential=credential
)


### 8. Search quality

Let's test the search quality of the compressed indexes.
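
A single query gives only a qualitative impression. To put a number on the comparison, one option is to measure how many of the baseline's top titles each compressed index also returns. The sketch below is an illustration, not a benchmark: `overlap_at_k` is a hypothetical helper, the example lists are made-up placeholders, and it treats the uncompressed `baseline` index (itself an approximate HNSW search) as the reference.

```python
from typing import Sequence


def overlap_at_k(reference: Sequence[str], candidate: Sequence[str], k: int = 5) -> float:
    """Fraction of the reference's top-k results that also appear in the candidate's top-k."""
    expected = set(reference[:k])
    if not expected:
        return 0.0
    return len(expected & set(candidate[:k])) / len(expected)


# Placeholder titles, not real search results
print(overlap_at_k(["A", "B", "C"], ["B", "A", "D"], k=3))
```

After the search cell below has filled `results`, it could be applied as `{name: overlap_at_k(results["baseline"], titles) for name, titles in results.items()}`.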

In [None]:
import os

import azure.identity
import openai


token_provider = azure.identity.get_bearer_token_provider(credential, "https://cognitiveservices.azure.com/.default")
openai_client = openai.AzureOpenAI(
    api_version="2023-07-01-preview",
    azure_endpoint=os.environ["AZURE_OPENAI_SERVICE_ENDPOINT"],
    azure_ad_token_provider=token_provider)

def get_embedding(text):
    get_embeddings_response = openai_client.embeddings.create(model=os.environ["AZURE_OPENAI_EMBED_DEPLOYMENT"], input=text)
    return get_embeddings_response.data[0].embedding

# Embed the query once and reuse the vector for every index
search_query = "first avian dinosaur in the fossil record"
search_vector = get_embedding(search_query)

results = {}
for scenario in scenarios:
    results[scenario['name']] = []
    index_name = f"{INDEX_PREFIX}-{scenario['name']}"
    search_client = SearchClient(SERVICE_ENDPOINT, index_name, credential=credential)
    r = search_client.search(
        search_text=None,
        top=5,
        vector_queries=[
            VectorizedQuery(vector=search_vector, k_nearest_neighbors=50, fields="embedding")
        ],
    )
    for doc in r:
        results[scenario['name']].append(doc["title"])

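# One column per index; pad with "" if an index returned fewer than 5 results
rows = []
headers = [scenario['name'] for scenario in scenarios]
for i in range(5):
    rows.append([
        results[scenario['name']][i] if i < len(results[scenario['name']]) else ""
        for scenario in scenarios
    ])
tabulate(rows, headers=headers, tablefmt="html")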

## Troubleshooting

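- **Environment Variables Not Loaded:** Make sure the `.env` file exists in the notebook's directory and contains the variables from step 1, or export them in your terminal before starting the notebook.
- **Authentication Issues:** `DefaultAzureCredential` picks up credentials such as your Azure CLI login or a managed identity. Make sure that identity has the required role assignments, for example **Search Service Contributor** and **Search Index Data Contributor** on the search service and **Cognitive Services OpenAI User** on the Azure OpenAI resource, and that role-based access control is enabled on the search service.
- **Dataset Not Found:** Make sure the dataset was downloaded and saved as `dbpedia_100k.parquet` in the notebook's directory.
- **Index Creation Errors:** Check that your Azure AI Search service tier has enough storage and vector index quota for eight indexes of 100,000 documents with 3,072-dimensional vectors.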

## Summary

This notebook demonstrates different vector search compression techniques in Azure AI Search, including scalar and binary quantization. It evaluates the impact of compression on storage efficiency and retrieval quality, helping optimize large-scale AI search applications.

