# RAG With CosmosDB

In this sample, we'll demonstrate how to build a RAG (retrieval-augmented generation) application using a small sample dataset of Azure service descriptions. The sample uses the Python SDK for Azure Cosmos DB for NoSQL to perform vector search for RAG, store and retrieve chat history, and store the vectors of the chat history to use as a semantic cache. It uses Azure OpenAI to generate embeddings and LLM completions.

In [None]:
# Install requirements

%pip install -r requierements.txt

Load environment variables

In [21]:
from dotenv import dotenv_values

env_name = ".env"  # change to your own .env file name; follow the sample_env.env template
config = dotenv_values(env_name)

OPENAI_API_KEY = config['openai_key']
OPENAI_API_ENDPOINT = config['openai_endpoint']
OPENAI_API_VERSION = config['openai_api_version'] # at the time of authoring, the api version is 2024-02-01
COMPLETIONS_MODEL_DEPLOYMENT_NAME = config['openai_completions_deployment']
EMBEDDING_MODEL_DEPLOYMENT_NAME = config['openai_embeddings_model']
COSMOSDB_NOSQL_ACCOUNT_KEY = config['cosmos_key']
COSMOSDB_NOSQL_ACCOUNT_ENDPOINT = config['cosmos_uri']
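A key that is missing from the .env file only surfaces as a `KeyError` on the line that reads it. As a minimal sketch, a small helper could report every missing or empty setting at once. The name `find_missing_keys` is hypothetical and not part of the sample; the key names are the ones read in the cell above.

```python
# Hypothetical helper, not part of the original sample.
REQUIRED_KEYS = [
    "openai_key",
    "openai_endpoint",
    "openai_api_version",
    "openai_completions_deployment",
    "openai_embeddings_model",
    "cosmos_key",
    "cosmos_uri",
]

def find_missing_keys(settings, required=REQUIRED_KEYS):
    """Return the required keys that are absent or empty in settings."""
    return [key for key in required if not settings.get(key)]

# An in-memory dict stands in for dotenv_values(".env") here.
example_settings = {"openai_key": "abc", "cosmos_uri": ""}
print(find_missing_keys(example_settings))
```

In the notebook, calling `find_missing_keys(config)` right after `dotenv_values` would list everything that still needs to be filled in.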

# Instantiate the Azure OpenAI client

In [5]:
from openai import AzureOpenAI

AOAI_client = AzureOpenAI(
    api_key=OPENAI_API_KEY,
    azure_endpoint=OPENAI_API_ENDPOINT,
    api_version=OPENAI_API_VERSION,
)

# Generating embeddings

We'll use the deployed embeddings model to generate the embeddings.

In [6]:
import time

def generate_embeddings(text):
    '''
    Generate an embedding vector from a string of text.
    This is used to vectorize both the data and the user input.
    '''
    response = AOAI_client.embeddings.create(input=text, model=EMBEDDING_MODEL_DEPLOYMENT_NAME)
    embeddings = response.model_dump()
    time.sleep(0.5)  # short pause between calls to reduce the chance of being throttled
    return embeddings['data'][0]['embedding']
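A fixed pause reduces throttling but does not recover from a call that fails anyway. One common alternative is to retry with exponential backoff. The sketch below is a generic wrapper under that assumption; `with_retries` and `flaky_embed` are hypothetical names, and the stand-in function simulates failures so the example runs without an Azure OpenAI deployment.

```python
import time

def with_retries(func, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Wrap func so that failed calls are retried with exponential backoff."""
    def wrapper(*args, **kwargs):
        for attempt in range(1, max_attempts + 1):
            try:
                return func(*args, **kwargs)
            except Exception:
                if attempt == max_attempts:
                    raise  # out of attempts: surface the last error
                sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
    return wrapper

# Stand-in for an embeddings call that fails twice, then succeeds.
calls = {"count": 0}

def flaky_embed(text):
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("simulated throttling")
    return [0.0, 1.0]

delays = []  # record the requested delays instead of actually sleeping
safe_embed = with_retries(flaky_embed, sleep=delays.append)
result = safe_embed("hello")
print(result, delays)
```

In real use you would likely narrow the `except` clause to the throttling error your client raises, so that genuine bugs are not retried.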

# Load the data with embeddings or generate embeddings

We have a sample data file with embeddings but you can generate the embeddings afresh before uploading the data.

In [11]:
import json

# Load text-sample.json from the current directory. It has no pre-computed embeddings,
# so they are generated in the cells below. To skip generation, load the pre-computed
# ./text-sample_w_embeddings.json instead.
# Alternative location of the data file:
# data_file = open(file="../../DataSet/AzureServices/text-sample.json", mode="r")
with open(file="./text-sample.json", mode="r") as data_file:
    data = json.load(data_file)

In [12]:
# Take a peek at one data item
print(json.dumps(data[0], indent=2))

{
  "id": "1",
  "title": "Azure App Service",
  "content": "Azure App Service is a fully managed platform for building, deploying, and scaling web apps. You can host web apps, mobile app backends, and RESTful APIs. It supports a variety of programming languages and frameworks, such as .NET, Java, Node.js, Python, and PHP. The service offers built-in auto-scaling and load balancing capabilities. It also provides integration with other Azure services, such as Azure DevOps, GitHub, and Bitbucket.",
  "category": "Web"
}


In [None]:
# Generate embeddings for title and content fields
for n, item in enumerate(data, start=1):
    item['id'] = str(n)
    item['titleVector'] = generate_embeddings(item['title'])
    item['contentVector'] = generate_embeddings(item['content'])
    # '@search.action' is an Azure AI Search indexing field; Cosmos DB does not use it
    item['@search.action'] = 'upload'
    print("Creating embeddings for item:", n, "/", len(data), end='\r')

In [14]:
# Save the data with embeddings to text-sample_w_embeddings.json
with open("./text-sample_w_embeddings.json", "w") as f:
    json.dump(data, f)

# Create the Cosmos DB client

In [22]:
from azure.core.exceptions import AzureError
from azure.core.credentials import AzureKeyCredential

#Cosmos DB imports
from azure.cosmos import CosmosClient
from azure.cosmos.aio import CosmosClient as CosmosAsyncClient
from azure.cosmos import PartitionKey, exceptions

cosmos_client = CosmosClient(url=COSMOSDB_NOSQL_ACCOUNT_ENDPOINT, credential=COSMOSDB_NOSQL_ACCOUNT_KEY)

# Create a new database or use existing one

In [23]:
# Create the database if it does not already exist
DATABASE_NAME = "vector-nosql-db"
db = cosmos_client.create_database_if_not_exists(
    id=DATABASE_NAME
)
properties = db.read()
print(json.dumps(properties))

{"id": "vector-nosql-db", "_rid": "WZhzAA==", "_self": "dbs/WZhzAA==/", "_etag": "\"00006955-0000-0200-0000-678675650000\"", "_colls": "colls/", "_users": "users/", "_ts": 1736865125}


# Author the vector embedding policy

The vector embedding policy gives the container the information it needs to run vector search queries:

<ul>
  <li>“path”: the property that contains the vectors</li>
  <li>“dataType”: the type of the vector’s elements (default Float32)</li>
  <li>“dimensions”: the length of each vector in the path (default 1536)</li>
  <li>“distanceFunction”: the metric used to compute distance/similarity (default Cosine)</li>
</ul>
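To make the choice of distance function more concrete, the sketch below computes the three commonly offered metrics (cosine, dot product, Euclidean) in plain Python on toy two-dimensional vectors. It is an illustration of the formulas only, not of how Cosmos DB evaluates them, and the function names are hypothetical.

```python
import math

def dot_product(a, b):
    """Sum of element-wise products; grows with vector length as well as alignment."""
    return sum(x * y for x, y in zip(a, b))

def cosine_similarity(a, b):
    """Dot product of the normalized vectors; depends only on direction."""
    return dot_product(a, b) / (math.sqrt(dot_product(a, a)) * math.sqrt(dot_product(b, b)))

def euclidean_distance(a, b):
    """Straight-line distance between the two points."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

a = [1.0, 0.0]
b = [2.0, 0.0]  # same direction as a, twice the length
c = [0.0, 1.0]  # orthogonal to a

print(cosine_similarity(a, b), dot_product(a, b), euclidean_distance(a, b))
print(cosine_similarity(a, c), dot_product(a, c), euclidean_distance(a, c))
```

Cosine treats `a` and `b` as identical because they point the same way, while dot product and Euclidean distance are also sensitive to magnitude. For embeddings that are normalized to unit length, cosine and dot product produce the same ranking.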

In [24]:
vector_embedding_policy = {
    "vectorEmbeddings": [
        {
            "path":"/titleVector",
            "dataType":"float32",
            "distanceFunction":"dotproduct",
            "dimensions":1536
        },
        {
            "path":"/contentVector",
            "dataType":"float32",
            "distanceFunction":"cosine",
            "dimensions":1536
        }
    ]
}

In [25]:
indexing_policy = {
    "includedPaths": [
        {
            "path": "/*"
        }
    ],
    "excludedPaths": [
        {
            "path": "/\"_etag\"/?"
        },
        {
            "path": "/titleVector/*"
        },
        {
            "path": "/contentVector/*"
        }
    ],
    "vectorIndexes": [
        {"path": "/titleVector",
         "type": "quantizedFlat"
        },
        {"path": "/contentVector",
         "type": "quantizedFlat"
        }
    ]
}
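The two policies above have to agree with each other: each vector index path should have a matching entry in the vector embedding policy, and this sample also excludes each vector path from the regular index. The following is a minimal sketch of a consistency check that mirrors those two conventions. `check_vector_policies` is a hypothetical helper, and the small policies below are cut-down stand-ins so the block runs on its own.

```python
def check_vector_policies(embedding_policy, index_policy):
    """Return a list of human-readable problems; an empty list means consistent."""
    embedded = {e["path"] for e in embedding_policy.get("vectorEmbeddings", [])}
    indexed = {i["path"] for i in index_policy.get("vectorIndexes", [])}
    excluded = {p["path"] for p in index_policy.get("excludedPaths", [])}

    problems = []
    for path in sorted(indexed - embedded):
        problems.append(f"{path} has a vector index but no embedding definition")
    for path in sorted(embedded):
        if f"{path}/*" not in excluded:
            problems.append(f"{path}/* is not in excludedPaths")
    return problems

# Cut-down example policies (assumed shapes, matching the cells above).
example_embedding_policy = {
    "vectorEmbeddings": [{"path": "/contentVector", "dataType": "float32",
                          "distanceFunction": "cosine", "dimensions": 1536}]
}
example_index_policy = {
    "excludedPaths": [{"path": "/contentVector/*"}],
    "vectorIndexes": [{"path": "/contentVector", "type": "quantizedFlat"},
                      {"path": "/titleVector", "type": "quantizedFlat"}],
}
print(check_vector_policies(example_embedding_policy, example_index_policy))
```

In the notebook, `check_vector_policies(vector_embedding_policy, indexing_policy)` would be expected to return an empty list.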

# Create container with the embedding and indexing policy

In [None]:
CONTAINER_NAME = "vector-nosql-cont"
try:
    container = db.create_container_if_not_exists(
        id=CONTAINER_NAME,
        partition_key=PartitionKey(path='/id', kind='Hash'),
        indexing_policy=indexing_policy,
        vector_embedding_policy=vector_embedding_policy)

    # create_container_if_not_exists returns the existing container if there is one
    print('Container with id \'{0}\' is ready'.format(CONTAINER_NAME))

except exceptions.CosmosHttpResponseError as e:
    print('Could not create container \'{0}\': {1}'.format(CONTAINER_NAME, e))
    raise

In [30]:
container = db.get_container_client(CONTAINER_NAME)

# Upload data to the container

At the time of writing, the Azure Cosmos DB Python SDK does not support bulk inserts, so we insert the items sequentially.

In [None]:
with open('./text-sample_w_embeddings.json') as f:
    data = json.load(f)

container_client = db.get_container_client(CONTAINER_NAME)

for item in data:
    print("writing item ", item['id'])
    container_client.upsert_item(item)
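With sequential writes, a single failing item stops the whole loop. As a sketch of one way to handle that, the helper below keeps going and collects the failures for a later retry. `upsert_all` and `FakeContainer` are hypothetical; the fake exposes only `upsert_item` so the example runs without a Cosmos DB account.

```python
def upsert_all(container, items):
    """Upsert items one by one; return (number written, list of (id, error) failures)."""
    written, failed = 0, []
    for item in items:
        try:
            container.upsert_item(item)
            written += 1
        except Exception as exc:  # in the notebook, exceptions.CosmosHttpResponseError is narrower
            failed.append((item.get("id"), str(exc)))
    return written, failed

class FakeContainer:
    """In-memory stand-in that exposes only upsert_item, for illustration."""
    def __init__(self):
        self.store = {}

    def upsert_item(self, item):
        if "id" not in item:
            raise ValueError("item has no id")
        self.store[item["id"]] = item  # same id overwrites, as an upsert would

fake = FakeContainer()
written, failed = upsert_all(
    fake,
    [{"id": "1"}, {"title": "no id"}, {"id": "1", "title": "updated"}],
)
print(written, failed)
```

Because the operation is an upsert, re-running it over the failed items (or over the whole file) does not create duplicates.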

# Vector search in Azure Cosmos DB for NoSQL

In [32]:
# Simple function to assist with vector search
def vector_search(query, num_results=5):
    query_embedding = generate_embeddings(query)
    # ORDER BY VectorDistance returns the most similar items first
    results = container.query_items(
        query='SELECT TOP @num_results c.content, c.title, c.category, '
              'VectorDistance(c.contentVector, @embedding) AS SimilarityScore '
              'FROM c ORDER BY VectorDistance(c.contentVector, @embedding)',
        parameters=[
            {"name": "@embedding", "value": query_embedding},
            {"name": "@num_results", "value": num_results}
        ],
        enable_cross_partition_query=True)
    return results

In [None]:
query = "What are some NoSQL databases in Azure?"
# Alternative query to try: "What are the services for running ML models?"
results = vector_search(query)

for result in results:
    print(f"Similarity Score: {result['SimilarityScore']}")
    print(f"Title: {result['title']}")
    print(f"Content: {result['content']}")
    print(f"Category: {result['category']}\n")
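Conceptually, the query ranks items by how close their `contentVector` is to the query embedding and keeps the top few. The sketch below is a brute-force, in-memory version of that idea on toy two-dimensional vectors. It is not how Cosmos DB executes the query (the service uses the vector index defined on the container), and `brute_force_search` is a hypothetical name.

```python
import math

def cosine_similarity(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

def brute_force_search(items, query_vector, num_results=5):
    """Score every item against the query and return the most similar ones first."""
    scored = [
        {"title": item["title"],
         "SimilarityScore": cosine_similarity(item["contentVector"], query_vector)}
        for item in items
    ]
    scored.sort(key=lambda r: r["SimilarityScore"], reverse=True)
    return scored[:num_results]

# Toy items with made-up 2-dimensional vectors (real embeddings have 1536 dimensions).
toy_items = [
    {"title": "A", "contentVector": [1.0, 0.0]},
    {"title": "B", "contentVector": [0.0, 1.0]},
    {"title": "C", "contentVector": [1.0, 1.0]},
]
top = brute_force_search(toy_items, [1.0, 0.1], num_results=2)
for row in top:
    print(row["title"], round(row["SimilarityScore"], 3))
```

A brute-force scan like this costs one comparison per item, which is why a vector index matters once the container holds more than a handful of documents.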

# Q&A over the data with GPT

In [37]:
def generate_completion(vector_search_results, user_prompt):
    system_prompt = '''
    You are an intelligent assistant for Microsoft Azure services.
    You are designed to provide helpful answers to user questions about Azure services, using only the information provided below.
        - Only answer questions related to the information provided below, and provide at least 3 clear suggestions in a list format.
        - Write two lines of whitespace between each answer in the list.
        - If you're unsure of an answer, you can say "I don't know" or "I'm not sure" and recommend that users search themselves.
        - Only provide answers that have products that are part of Microsoft Azure and part of the following prompts.
    '''

    # Start with the instructions, add each retrieved document as context, then the user's question
    messages = [{"role": "system", "content": system_prompt}]
    for item in vector_search_results:
        messages.append({"role": "system", "content": item['content']})
    messages.append({"role": "user", "content": user_prompt})
    response = AOAI_client.chat.completions.create(
        model=COMPLETIONS_MODEL_DEPLOYMENT_NAME, messages=messages, temperature=0)

    return response.model_dump()
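The function above appends every retrieved document to the prompt, which can become long when the documents are large. As a minimal sketch, the message list could be built by a separate pure function that stops adding context once a character budget is reached. `build_messages` and `max_context_chars` are hypothetical names, and a character count is only a rough proxy for the model's token limit.

```python
def build_messages(system_prompt, context_chunks, user_prompt, max_context_chars=4000):
    """Assemble chat messages, adding context chunks until the budget is used up."""
    messages = [{"role": "system", "content": system_prompt}]
    used = 0
    for chunk in context_chunks:
        if used + len(chunk) > max_context_chars:
            break  # stop before exceeding the budget
        messages.append({"role": "system", "content": chunk})
        used += len(chunk)
    messages.append({"role": "user", "content": user_prompt})
    return messages

# Three 30-character chunks with a 70-character budget: only the first two fit.
messages = build_messages(
    "You answer questions about Azure.",
    ["a" * 30, "b" * 30, "c" * 30],
    "What is Cosmos DB?",
    max_context_chars=70,
)
print([m["role"] for m in messages])
```

Because the search results arrive ordered by similarity, cutting off at the end of the list drops the least relevant context first.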

In [None]:
user_input = "What are some NoSQL databases in Azure?"

search_results = vector_search(user_input)
completions_results = generate_completion(search_results, user_input)
print("\n")
print(completions_results['choices'][0]['message']['content'])    

In [None]:
user_input = "What are some benefits of Neo4j?"

search_results = vector_search(user_input)
completions_results = generate_completion(search_results, user_input)
print("\n")
print(completions_results['choices'][0]['message']['content'])    