# Microsoft Build 2025, DEM567: Tips for fast vector and full-text search with Azure Cosmos DB

In [None]:
from dotenv import dotenv_values
import json
from azure.cosmos import CosmosClient, PartitionKey, exceptions
from azure.cosmos.aio import CosmosClient as AsyncClient
from azure.identity import DefaultAzureCredential
from openai import AzureOpenAI
import asyncio

import nest_asyncio
nest_asyncio.apply()

In [None]:
# Read settings from a local .env file. Start from the example.env template
# and change the file name below if your own .env file is named differently.
config = dotenv_values(".env")

# Azure Cosmos DB settings
cosmos_endpoint = config["cosmos_endpoint"]
cosmos_database = config["cosmos_database"]
cosmos_container = config["cosmos_container"]

# Azure OpenAI settings
openai_endpoint = config["openai_endpoint"]
openai_key = config["openai_key"]
openai_embeddings_deployment = config["openai_embeddings_deployment"]
openai_embeddings_dimensions = config["openai_embeddings_dimensions"]
openai_api_version = config["openai_api_version"]

# Use Entra ID authentication for Azure Cosmos DB
credential = DefaultAzureCredential()

In [None]:
def generate_embeddings(text):
    """Return the embedding vector for ``text``.

    Sends ``text`` to the embeddings endpoint of the module-level
    ``openai_client`` using the ``openai_embeddings_deployment`` model, and
    returns the ``embedding`` field of the first entry in the response's
    ``data`` list.
    """
    payload = openai_client.embeddings.create(
        model=openai_embeddings_deployment,
        input=text,
    ).model_dump()
    first_entry = payload['data'][0]
    return first_entry['embedding']

In [None]:
# Synchronous Azure Cosmos DB handles: client -> database -> container.
cosmos_client = CosmosClient(
    url=cosmos_endpoint,
    credential=credential,
)
db = cosmos_client.get_database_client(cosmos_database)
container = db.get_container_client(cosmos_container)

# Azure OpenAI client (API-key auth); generate_embeddings() calls it.
openai_client = AzureOpenAI(
    azure_endpoint=openai_endpoint,
    api_key=openai_key,
    api_version=openai_api_version,
)

## Set up Azure Cosmos DB with container and indexing policies for search

In [None]:
# Vector embedding policy: describes the vector stored at /embedding
# (float32 values, 3072 dimensions, compared with cosine distance).
vector_embedding_policy = {
    "vectorEmbeddings": [
        {
            "path": "/embedding",
            "dataType": "float32",
            "distanceFunction": "cosine",
            "dimensions": 3072,
        },
    ],
}

# Full-text policy: the document paths that hold searchable text, all en-US.
full_text_policy = {
    "defaultLanguage": "en-US",
    "fullTextPaths": [
        {"path": text_path, "language": "en-US"}
        for text_path in ("/description", "/summary_review")
    ],
}

In [None]:
# Indexing policy for the container:
#   - index every property by default ("/*"),
#   - exclude the system _etag property and the raw vector from the regular
#     index (the vector is served by the dedicated vector index instead),
#   - add a DiskANN vector index on /embedding,
#   - add full-text indexes on the two text paths.
indexing_policy = {
    "includedPaths": [
        {"path": "/*"},
    ],
    # Each excluded path needs its own object. Two "path" keys in a single
    # dict literal silently collapse to the last one, which would drop the
    # _etag exclusion.
    "excludedPaths": [
        {"path": "/\"_etag\"/?"},
        {"path": "/embedding/*"},
    ],
    "vectorIndexes": [
        {
            "path": "/embedding",
            "type": "diskANN",
        },
    ],
    "fullTextIndexes": [
        {"path": "/description"},
        {"path": "/summary_review"},
    ],
}

In [None]:
# Create the Azure Cosmos DB container (returns the existing one if present).
# The full-text policy is passed together with the indexing policy because the
# indexing policy declares fullTextIndexes on the paths that policy describes.
# NOTE(review): the id is hard-coded to 'Products', while the asyncio section
# queries the container named by `cosmos_container` -- confirm they match.
container = db.create_container_if_not_exists(
    id='Products',
    partition_key=PartitionKey(path='/id'),
    indexing_policy=indexing_policy,
    vector_embedding_policy=vector_embedding_policy,
    full_text_policy=full_text_policy,
)

## Insert data for search examples

In [None]:
# Load sample data. The encoding is given explicitly so the JSON file is
# decoded the same way regardless of the platform's default locale encoding.
with open('data/e-retail-data-3072D.json', 'r', encoding='utf-8') as file:
    data = json.load(file)

In [None]:
# Insert data into Azure Cosmos DB. upsert_item is used instead of create_item
# so that re-running this cell replaces existing items rather than failing
# with a conflict on ids that were already inserted.
for item in data:
    container.upsert_item(body=item)

## Define search terms and generate embeddings

In [82]:
# Query text used by the search examples.
search_terms = "luxury bags"

# Whitespace-separated keywords for the full-text functions.
full_text = search_terms.split()

# Embedding of the whole phrase for the vector-search examples.
emb = generate_embeddings(search_terms)

## Vector Search

In [98]:
# Vector search: the 10 items whose /embedding is closest to the query vector.
# The vector is bound as the @query_vector parameter instead of being
# interpolated into the SQL text twice; this keeps the query text small and
# matches the parameterized query used in the asyncio section.
results = container.query_items(
    query='''
        SELECT TOP 10 c.id, c.product_name, c.description, VectorDistance(c.embedding, @query_vector) as Score
        FROM c
        ORDER BY VectorDistance(c.embedding, @query_vector, false, {"searchListSizeMultiplier": 5})
    ''',
    parameters=[{'name': '@query_vector', 'value': emb}],
    enable_cross_partition_query=True,
    populate_query_metrics=True,
)

# Iterate the result pager directly; there is no need to build a list first.
for r in results:
    print(json.dumps(r, indent=4))

{
    "id": "Product-60",
    "product_name": "Luxury Handbag Glam with Leather",
    "description": "Indulge in luxury with our handcrafted leather handbag. Perfect for any occasion, this elegant accessory will elevate your style effortlessly.",
    "Score": 0.5735780326825795
}
{
    "id": "Product-95",
    "product_name": "Luxe Elegant Leather Handbag",
    "description": "Crafted from premium leather, this handbag exudes luxury and sophistication. With ample space and elegant design, it is perfect for both day and evening events.",
    "Score": 0.571470577188964
}
{
    "id": "Product-25",
    "product_name": "Sapphire Luxe Handbag",
    "description": "This luxurious handbag is crafted from premium leather, with a spacious interior and elegant gold hardware. Perfect for both casual outings and special occasions.",
    "Score": 0.5698408357266865
}
{
    "id": "Product-20",
    "product_name": "Luxury Leather Handbag",
    "description": "This luxurious handbag is made from genuine

## A simple text search with `FullTextContainsAny`

In [None]:
# Keyword filter: keep items whose description contains any search keyword.
# The keywords are rendered as a comma-separated list of quoted SQL string
# literals, so a keyword containing a single quote would break the query.
quoted_terms = "', '".join(full_text)
results = container.query_items(
    query=f'''
        SELECT TOP 5 c.product_id, c.product_name, c.description
        FROM c
        WHERE FullTextContainsAny(c.description,'{quoted_terms}')
        ''',
    enable_cross_partition_query=True,
    populate_query_metrics=True,
)

for item in results:
    print(json.dumps(item, indent=4))

## Search and order by BM25 with `FullTextScore`

In [None]:
# Rank items by the FullTextScore of their description against the keywords.
# The keywords are rendered as a comma-separated list of quoted SQL string
# literals, so a keyword containing a single quote would break the query.
quoted_terms = "', '".join(full_text)
results = container.query_items(
    query=f'''
        SELECT TOP 5 c.product_id, c.product_name, c.description
        FROM c
        ORDER BY RANK FullTextScore(c.description,'{quoted_terms}')
        ''',
    enable_cross_partition_query=True,
    populate_query_metrics=True,
)

for item in results:
    print(json.dumps(item, indent=4))

In [None]:
# Hybrid search: rank with RRF over the full-text score of the description
# and the vector distance of the embedding. The keywords are rendered as a
# comma-separated list of quoted SQL string literals.
quoted_terms = "', '".join(full_text)
results = container.query_items(
    query=f'''
        SELECT TOP 5 c.product_id, c.product_name, c.description, VectorDistance(c.embedding, {emb}) as SimilarityScore
        FROM c
        ORDER BY RANK RRF(FullTextScore(c.description,'{quoted_terms}'),VectorDistance(c.embedding, {emb}))
        ''',
    enable_cross_partition_query=True,
    populate_query_metrics=True,
)

for item in results:
    print(json.dumps(item, indent=4))

# Search using the `asyncio` library for concurrency

In [None]:
async def fetch_query_results(container, query, pk_range):
    """Run ``query`` against one partition key range and collect its items.

    The module-level ``emb`` vector is bound to the ``@query_vector``
    parameter, and the query is restricted to the range whose id is
    ``pk_range['id']``. Returns a list of every item the query yields.
    """
    item_stream = container.query_items(
        query=query,
        parameters=[{'name': '@query_vector', 'value': emb}],
        partition_key_range_id=pk_range['id'],
        populate_query_metrics=True,
    )
    collected = []
    async for item in item_stream:
        collected.append(item)
    return collected

In [None]:
async def concurrent_query_sample(query):
    """Run ``query`` concurrently, once per partition key range.

    Opens an async Cosmos client for the configured endpoint, database and
    container, lists the container's partition key ranges, and awaits one
    ``fetch_query_results`` call per range with ``asyncio.gather``.

    Returns a tuple ``(results, headers)``: ``results`` holds one list of
    items per partition key range, and ``headers`` is the client
    connection's ``last_response_headers``.
    """
    async with AsyncClient(url=cosmos_endpoint, credential=credential) as client:
        async_container = (
            client
            .get_database_client(cosmos_database)
            .get_container_client(cosmos_container)
        )

        # Collect the partition key ranges of the container. This relies on
        # a private method of the SDK's client connection.
        connection = async_container.client_connection
        pk_ranges = []
        async for pk_range in connection._ReadPartitionKeyRanges(async_container.container_link):
            pk_ranges.append(pk_range)

        # One query coroutine per range, awaited together.
        per_range_results = await asyncio.gather(
            *(fetch_query_results(async_container, query, pk_range) for pk_range in pk_ranges)
        )
        return per_range_results, client.client_connection.last_response_headers

In [None]:
# Parameterized vector-search query for the concurrent example. The
# @query_vector placeholder is bound to the query embedding when the query is
# executed, and the same VectorDistance expression is used for Score and for
# the ordering.
query = '''
        SELECT TOP 10 c.id, c.product_name, c.description, VectorDistance(c.embedding, @query_vector) as Score
        FROM c
        ORDER BY VectorDistance(c.embedding, @query_vector)
    '''

In [None]:
# Run the query across all partitions
results, headers = await concurrent_query_sample(query)

# Merge and sort the results
flattened_results = [item for sublist in results for item in sublist]
sorted_results = sorted(flattened_results, key=lambda x: x["Score"], reverse=True)
print(json.dumps(sorted_results,indent=4))