# Build a RAG chatbot with the Azure Cosmos DB NoSQL API

In this sample, we'll demonstrate how to build a RAG pattern application using a subset of the MovieLens dataset. This sample will use:
- **Azure Cosmos DB for NoSQL** with the Python SDK to perform vector search for RAG, store and retrieve chat history, and store the vectors of the chat history to use as a semantic cache. 
- **Azure OpenAI** to generate embeddings and completions.
- **Gradio**, an open source Python package, to create a simple user interface to allow users to type in questions and display responses generated by a GPT model or served from the cache. The responses will also display an elapsed time so you can see the impact caching has on performance versus generating a response.

References:
- [Azure Cosmos DB for NoSQL Python Quickstart](https://learn.microsoft.com/azure/cosmos-db/nosql/quickstart-python?pivots=devcontainer-codespace)
- [Azure Cosmos DB for NoSQL Vector Search](https://learn.microsoft.com/azure/cosmos-db/nosql/vector-search)


# Create the conda environment

Start a terminal from the CosmosDBLab2 folder and run the following commands **sequentially**. Respond `y` when asked:

```
conda create --name CosmosDB_env python=3.10

conda activate CosmosDB_env

pip install ipykernel

python -m ipykernel install --user --name CosmosDB_env --display-name "CosmosDB_env"

pip install -r requirements.txt
```

In this notebook, select the CosmosDB_env kernel. You may need to close and reopen the notebook before the kernel appears in the list.

# Set up a Cosmos DB account  

Create a Cosmos DB account as follows:  
- **Subscription**: *Select your subscription*  
- **Resource Group**: *Select/Create a resource group*  
- **Account name**: *Enter a unique name. The name can contain only lowercase letters, numbers and the "-" character*  
- **Availability Zones**: *Disable*
- **Location**: *Select a location*  
- **Capacity mode**: *Provisioned throughput*  
- **Apply Free Tier Discount**: *Do Not Apply*  
- **Limit total account throughput**: *Leave unchecked*  

Once the account is deployed, enable the Vector Search for NoSQL API feature. This is a preview feature: go to **Settings** > **Features** and enable it. Enabling it takes a couple of minutes, and a notification appears when it is done. 



# Set up Azure OpenAI resource and deploy models
If you don't already have an Azure OpenAI resource, create one.  
Then deploy:  
- An embeddings model. **This must be a `text-embedding-3-large` model.**
- A completions model, `gpt-35-turbo`.

# Initialize Your Client Connection
A sample `env_file.env` file is in this folder. Populate it with the appropriate credentials for Azure Cosmos DB and Azure OpenAI where marked with <>. Do not change any of the other values.  
Click the Save button.
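Before running the next cell, it can help to confirm that no placeholder was left behind. The helper below is a hypothetical sketch, not part of the lab: it assumes unfilled values in `env_file.env` are written as `<...>`, and it checks the keys that the next cell reads.

```python
# Hypothetical helper: report settings that are missing, empty, or still "<...>" placeholders.
REQUIRED_KEYS = [
    "cosmos_uri", "cosmos_key", "cosmos_database_name", "cosmos_collection_name",
    "cosmos_vector_property_name", "cosmos_cache_database_name",
    "cosmos_cache_collection_name", "openai_endpoint", "openai_key",
    "openai_api_version", "openai_embeddings_deployment",
    "openai_embeddings_dimensions", "openai_completions_deployment",
]

def find_unset_settings(config):
    """Return the required keys that are absent, empty, or wrapped in <>."""
    unset = []
    for key in REQUIRED_KEYS:
        # dotenv_values can return None for a key with no value
        value = (config.get(key) or "").strip()
        if not value or (value.startswith("<") and value.endswith(">")):
            unset.append(key)
    return unset
```

For example, `find_unset_settings(dotenv_values("env_file.env"))` should return an empty list once the file is complete.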

In [None]:
# Import the required libraries (unused imports removed)
import time
import json
import uuid
import zipfile
from dotenv import dotenv_values
from openai import AzureOpenAI
from azure.cosmos import CosmosClient, PartitionKey, exceptions
import gradio as gr

# Load configuration
env_name = "env_file.env"
config = dotenv_values(env_name)

cosmos_conn = config['cosmos_uri']
cosmos_key = config['cosmos_key']
cosmos_database = config['cosmos_database_name']
cosmos_collection = config['cosmos_collection_name']
cosmos_vector_property = config['cosmos_vector_property_name']
cosmos_cache_db = config['cosmos_cache_database_name']
cosmos_cache = config['cosmos_cache_collection_name']

# Create the Azure Cosmos DB for NoSQL client
cosmos_client = CosmosClient(url=cosmos_conn, credential=cosmos_key)

openai_endpoint = config['openai_endpoint']
openai_key = config['openai_key']
openai_api_version = config['openai_api_version']
openai_embeddings_deployment = config['openai_embeddings_deployment']
openai_embeddings_dimensions = int(config['openai_embeddings_dimensions'])
openai_completions_deployment = config['openai_completions_deployment']

# Movies file url
# storage_file_url = config['storage_file_url']

# Create the OpenAI client
openai_client = AzureOpenAI(azure_endpoint=openai_endpoint, api_key=openai_key, api_version=openai_api_version)

# Create a database and containers with vector policies

Next, we will create the Cosmos DB database, as well as the containers for the movie data and the semantic cache.  

When creating the containers, we specify the vector embedding and indexing policies, so we will create these first.
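Both containers use the `dotproduct` distance function. A minimal sketch of why that works, assuming the embeddings are unit-length (OpenAI documents that its embedding vectors are normalized to length 1): for unit vectors the dot product equals cosine similarity, and higher values mean more similar vectors. These helper functions are illustrative only.

```python
import math

def dot(a, b):
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))

def cosine_similarity(a, b):
    """Dot product divided by the product of the vector lengths."""
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))

def normalize(a):
    """Scale a vector to unit length."""
    length = math.sqrt(dot(a, a))
    return [x / length for x in a]

u = normalize([3.0, 4.0, 0.0])
v = normalize([1.0, 2.0, 2.0])
# For unit-length vectors both measures give the same similarity score
print(dot(u, v), cosine_similarity(u, v))
```

This is also why the search queries below keep results whose score is *greater* than a threshold.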

In [None]:
db = cosmos_client.create_database_if_not_exists(cosmos_database)

# Create the vector embedding policy to specify vector details
vector_embedding_policy = {
    "vectorEmbeddings": [ 
        { 
            "path":"/" + cosmos_vector_property,
            "dataType":"float32",
            "distanceFunction":"dotproduct",
            "dimensions":openai_embeddings_dimensions
        }, 
    ]
}

# Create the vector index policy to specify vector details
indexing_policy = {
    "vectorIndexes": [ 
        {
            "path": "/"+cosmos_vector_property, 
            "type": "quantizedFlat" 
        }
    ]
} 

# Create the data collection with vector index (note: this provisions the container with 10,000 RU/s to allow a fast data load)
try:
    movies_container = db.create_container_if_not_exists(id=cosmos_collection, 
                                                  partition_key=PartitionKey(path='/id'), 
                                                  indexing_policy=indexing_policy,
                                                  vector_embedding_policy=vector_embedding_policy,
                                                  offer_throughput=10000) 
    print('Container with id \'{0}\' created'.format(movies_container.id)) 

except exceptions.CosmosHttpResponseError: 
    raise 

# Create the cache collection with vector index
try:
    cache_container = db.create_container_if_not_exists(id=cosmos_cache, 
                                                  partition_key=PartitionKey(path='/id'), 
                                                  indexing_policy=indexing_policy,
                                                  vector_embedding_policy=vector_embedding_policy,
                                                  offer_throughput=1000) 
    print('Container with id \'{0}\' created'.format(cache_container.id)) 

except exceptions.CosmosHttpResponseError: 
    raise

# Generate embeddings from Azure OpenAI

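This function vectorizes the user input for the vector search. It calls the `text-embedding-3-large` deployment with `dimensions = 256` (the value of `openai_embeddings_dimensions`), which matches the containers' vector policy. The `tenacity` decorator retries failed calls with randomized exponential backoff, up to 20 attempts.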
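Because the vector policy declares `openai_embeddings_dimensions` dimensions, a quick check on the returned vector can catch a misconfigured deployment early. `check_embedding` is a hypothetical helper; its unit-length check assumes the model returns normalized vectors, so drop that check if the assumption does not hold for your deployment.

```python
import math

def check_embedding(vector, expected_dimensions, tolerance=1e-3):
    """Raise ValueError if the vector does not match the expected shape (hypothetical check)."""
    if len(vector) != expected_dimensions:
        raise ValueError(f"expected {expected_dimensions} dimensions, got {len(vector)}")
    # Assumption: embeddings are unit-length, so dot product behaves like cosine similarity
    length = math.sqrt(sum(x * x for x in vector))
    if abs(length - 1.0) > tolerance:
        raise ValueError(f"expected a unit-length vector, got length {length:.4f}")
    return vector
```

For example: `check_embedding(generate_embeddings("Toy Story"), openai_embeddings_dimensions)`.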

In [None]:
from tenacity import retry, stop_after_attempt, wait_random_exponential
@retry(wait=wait_random_exponential(min=2, max=300), stop=stop_after_attempt(20))
def generate_embeddings(text):
    
    response = openai_client.embeddings.create(
        input=text,
        model=openai_embeddings_deployment,
        dimensions=openai_embeddings_dimensions
    )
    
    embeddings = response.model_dump()
    return embeddings['data'][0]['embedding']

# Load data from the JSON file
Extract the MovieLens dataset from the zip file in the DataSet folder. The embeddings have already been generated in this dataset.
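The queries later in this notebook read `c.overview` and `c.vector`, and the container is partitioned on `/id`, which Cosmos DB requires to be a string. A hypothetical sanity check on the loaded documents, assuming only those three fields matter (any other fields in the dataset are not inspected):

```python
def validate_documents(docs, dimensions, vector_field="vector"):
    """Return the id (or list position) of each document missing a field the notebook relies on."""
    problems = []
    for position, doc in enumerate(docs):
        vector = doc.get(vector_field)
        ok = (
            isinstance(doc.get("id"), str)  # Cosmos DB ids are strings
            and "overview" in doc           # returned by the vector search query
            and isinstance(vector, list)
            and len(vector) == dimensions   # must match the vector embedding policy
        )
        if not ok:
            problems.append(doc.get("id", position))
    return problems
```

After the next cell runs, `validate_documents(data, openai_embeddings_dimensions)` should return an empty list.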

In [None]:
# Unzip the data file (the with block closes the archive automatically)
with zipfile.ZipFile("DataSet/MovieLens-4489-256D.zip", 'r') as zip_ref:
    zip_ref.extractall("DataSet")

# Load the data file
with open('DataSet/MovieLens-4489-256D.json', 'r') as d:
    data = json.load(d)
# View the number of documents in the data (4489)
len(data) 

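# Store data in Azure Cosmos DB
Insert the data into Azure Cosmos DB for NoSQL.
If you encounter throttling (HTTP 429 errors), run the cell again. Because `upsert_item` replaces any existing document with the same `id`, re-running the cell does not create duplicates. 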
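Instead of re-running the whole cell, throttled writes can also be retried in place. The sketch below is a hypothetical wrapper, not an SDK feature: it retries only errors whose `status_code` is 429 (the attribute that `CosmosHttpResponseError` carries), using exponential backoff with random jitter. The SDK already retries throttled requests a limited number of times on its own; this adds an outer layer for bulk loads.

```python
import random
import time

def upsert_with_retry(upsert, item, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call upsert(item), retrying with exponential backoff when the error has status 429."""
    for attempt in range(1, max_attempts + 1):
        try:
            return upsert(item)
        except Exception as error:
            # Re-raise anything that is not throttling, or when attempts are exhausted
            if getattr(error, "status_code", None) != 429 or attempt == max_attempts:
                raise
            # Delay doubles each attempt; jitter spreads out concurrent retries
            delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, base_delay)
            sleep(delay)
```

Inside `insert_data`, `upsert_item_sync` could call `upsert_with_retry(lambda body: movies_container.upsert_item(body=body), obj)`.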

In [None]:
import asyncio
import time

async def insert_data():
    start_time = time.time()  # Record the start time
    
    counter = 0
    tasks = []
    max_concurrency = 5  # Adjust this value to control the level of concurrency
    semaphore = asyncio.Semaphore(max_concurrency)
    print("Starting doc load, please wait...")
    
    def upsert_item_sync(obj):
        movies_container.upsert_item(body=obj)
    
    async def upsert_object(obj):
        nonlocal counter
        async with semaphore:
            await asyncio.get_running_loop().run_in_executor(None, upsert_item_sync, obj)
            # Progress reporting
            counter += 1
            if counter % 100 == 0:
                print(f"Sent {counter} documents for insertion into collection.")
    
    for obj in data:
        tasks.append(asyncio.create_task(upsert_object(obj)))
    
    # Run all upsert tasks concurrently within the limits set by the semaphore
    await asyncio.gather(*tasks)
    
    end_time = time.time()  # Record the end time
    duration = end_time - start_time  # Calculate the duration
    print(f"All {counter} documents inserted!")
    print(f"Time taken: {duration:.2f} seconds ({duration * 1000:.0f} milliseconds)")

# Run the async function
await insert_data()
 

Next, we will define several functions for our application to:
- Perform vector search
- Get recent chat history
- Generate chat completions
- Cache generated responses

# Perform Vector Search

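This function performs a vector search over a container; the application uses it on the movie data (the cache has its own lookup function, `get_cache`, defined later). It takes a container reference, the query embedding, an optional minimum similarity score (results at or below it are filtered out), and the maximum number of results to return. Because the containers use the dot-product distance function, higher scores mean more similar documents.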
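To make the query's semantics concrete, here is an in-memory sketch of what it asks Cosmos DB for, assuming exact dot-product scoring: keep documents whose score exceeds `similarity_score`, sort from most to least similar, and return at most `num_results`. The service's `quantizedFlat` index scores compressed vectors, so its scores can differ slightly from this exact calculation. `brute_force_search` is illustrative and is not used by the application.

```python
def brute_force_search(documents, query_vector, similarity_score=0.02, num_results=5, field="vector"):
    """In-memory equivalent of the TOP / WHERE / ORDER BY VectorDistance query (dot product)."""
    scored = []
    for doc in documents:
        score = sum(a * b for a, b in zip(doc[field], query_vector))
        if score > similarity_score:
            # Same result shape as vector_search: score plus the selected overview field
            scored.append({"SimilarityScore": score, "document": {"overview": doc["overview"]}})
    # Highest dot product = most similar, so sort descending
    scored.sort(key=lambda r: r["SimilarityScore"], reverse=True)
    return scored[:num_results]
```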

In [None]:
# Perform a vector search on the Cosmos DB container
def vector_search(container, vectors, similarity_score=0.02, num_results=5):
    # Execute the query
    results = container.query_items(
        query= '''
        SELECT TOP @num_results  c.overview, VectorDistance(c.vector, @embedding) as SimilarityScore 
        FROM c
        WHERE VectorDistance(c.vector,@embedding) > @similarity_score
        ORDER BY VectorDistance(c.vector,@embedding)
        ''',
        parameters=[
            {"name": "@embedding", "value": vectors},
            {"name": "@num_results", "value": num_results},
            {"name": "@similarity_score", "value": similarity_score}
        ],
        enable_cross_partition_query=True, populate_query_metrics=True)
    results = list(results)
    # Extract the necessary information from the results
    formatted_results = []
    for result in results:
        score = result.pop('SimilarityScore')
        formatted_result = {
            'SimilarityScore': score,
            'document': result
        }
        formatted_results.append(formatted_result)

    # #print(formatted_results)
    metrics_header = dict(container.client_connection.last_response_headers)
    #print(json.dumps(metrics_header,indent=4))
    return formatted_results

# Get recent chat history

This function returns the most recent prompt/completion pairs stored in the cache container (newest first). They are passed to the LLM as conversational context, so it can follow up on earlier questions.
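The query orders by `_ts DESC`, so the newest item comes first, while chat models read messages oldest first. A sketch of turning the cached documents into chronological user/assistant turns (the `prompt` and `completion` field names come from `cache_response` below; the function name is hypothetical):

```python
def history_to_messages(history_newest_first):
    """Turn cached prompt/completion documents into chronological chat messages."""
    messages = []
    # Reverse so the oldest exchange is sent first
    for item in reversed(history_newest_first):
        messages.append({"role": "user", "content": item["prompt"]})
        messages.append({"role": "assistant", "content": item["completion"]})
    return messages
```

Note that the cache container is shared, so this history is the most recent questions from anyone using the container, not a per-user session.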

In [None]:
def get_chat_history(container, completions=3):
    results = container.query_items(
        query= '''
        SELECT TOP @completions *
        FROM c
        ORDER BY c._ts DESC
        ''',
        parameters=[
            {"name": "@completions", "value": completions},
        ], enable_cross_partition_query=True)
    results = list(results)
    return results

# Chat Completion Functions
Define the functions that handle the chat completion process, including caching responses. 

- `generate_completion` assembles the system prompt, chat history, vector search results, and user prompt into a payload and sends it to the GPT model to generate a completion.

- `chat_completion` defines the pipeline for our RAG pattern application. When a user submits a question, it first checks the cache for a semantically near-identical question (similarity above 0.99) and, on a hit, returns the cached completion. Otherwise, it runs a vector search on the movies container, gathers recent chat history, and asks the LLM to generate a response, which is cached before being returned to the user.
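The cache check in `chat_completion` is a similarity threshold rather than an exact string match. A minimal in-memory sketch of that decision (a hypothetical helper, assuming dot-product scores on unit-length vectors):

```python
def semantic_cache_lookup(cache_entries, query_vector, threshold=0.99):
    """Return the cached completion of the most similar entry scoring above threshold, else None."""
    best_score, best_entry = threshold, None
    for entry in cache_entries:
        score = sum(a * b for a, b in zip(entry["vector"], query_vector))
        # Strictly greater, matching the "> @similarity_score" filter in the query
        if score > best_score:
            best_score, best_entry = score, entry
    return None if best_entry is None else best_entry["completion"]
```

A threshold of 0.99 only serves questions that are near-duplicates of earlier ones. Lowering it raises the hit rate but increases the risk of returning an answer to a different question.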

In [None]:
def generate_completion(user_prompt, vector_search_results, chat_history):
    
    system_prompt = '''
    You are an intelligent assistant for movies. You are designed to provide helpful answers to user questions about movies in your database.
    You are friendly, helpful, and informative and can be lighthearted. Be concise in your responses, but still friendly.
        - Only answer questions related to the information provided below. 
    '''

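    # Build the list of messages sent to the Chat Completions API

    # System prompt
    messages = [{'role': 'system', 'content': system_prompt}]

    # Chat history: the query returns newest first, so reverse it into chronological order
    # and send each pair as a user turn followed by an assistant turn
    for chat in reversed(chat_history):
        messages.append({'role': 'user', 'content': chat['prompt']})
        messages.append({'role': 'assistant', 'content': chat['completion']})

    # Vector search results as additional system context
    for result in vector_search_results:
        messages.append({'role': 'system', 'content': json.dumps(result['document'])})

    # User prompt last, so the model answers the current question
    messages.append({'role': 'user', 'content': user_prompt})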

    print("Messages going to openai", messages)
    # Create the completion
    response = openai_client.chat.completions.create(
        model = openai_completions_deployment,
        messages = messages,
        temperature = 0.1
    )    
    return response.model_dump()

def chat_completion(cache_container, movies_container, user_input):
    print("starting completion")
    # Generate embeddings from the user input
    user_embeddings = generate_embeddings(user_input)
    # Query the chat history cache first to see if this question has been asked before
    cache_results = get_cache(container = cache_container, vectors = user_embeddings, similarity_score=0.99, num_results=1)
    if len(cache_results) > 0:
        print("Cached Result\n")
        return cache_results[0]['completion'], True
        
    else:
        #perform vector search on the movie collection
        print("New result\n")
        search_results = vector_search(movies_container, user_embeddings)

        print("Getting Chat History\n")
        #chat history
        chat_history = get_chat_history(cache_container, 3)
        #generate the completion
        print("Generating completions \n")
        completions_results = generate_completion(user_input, search_results, chat_history)

        print("Caching response \n")
        #cache the response
        cache_response(cache_container, user_input, user_embeddings, completions_results)

        print("\n")
        # Return the generated LLM completion
        return completions_results['choices'][0]['message']['content'], False

# Cache Generated Responses
Save the user prompts and generated completions to the cache for faster future responses.
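Building the cache document in a separate function makes it testable without a live container. In this hypothetical variant, the token counts are kept as numbers (the cell below stores them as strings), which would let queries sum or compare them; the input is assumed to have the shape returned by `response.model_dump()`.

```python
import uuid

def build_cache_document(user_prompt, prompt_vector, response):
    """Build a cache item from a chat completion dict (hypothetical helper)."""
    usage = response["usage"]
    return {
        "id": str(uuid.uuid4()),
        "prompt": user_prompt,
        "completion": response["choices"][0]["message"]["content"],
        # Numeric token counts, so queries can aggregate them
        "completionTokens": usage["completion_tokens"],
        "promptTokens": usage["prompt_tokens"],
        "totalTokens": usage["total_tokens"],
        "model": response["model"],
        # The prompt embedding is what the semantic cache searches on
        "vector": prompt_vector,
    }
```

The result could then be written with `container.create_item(body=build_cache_document(...))`.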

In [None]:
def cache_response(container, user_prompt, prompt_vectors, response):
    # Create a dictionary representing the chat document
    chat_document = {
        'id':  str(uuid.uuid4()),  
        'prompt': user_prompt,
        'completion': response['choices'][0]['message']['content'],
        'completionTokens': str(response['usage']['completion_tokens']),
        'promptTokens': str(response['usage']['prompt_tokens']),
        'totalTokens': str(response['usage']['total_tokens']),
        'model': response['model'],
        'vector': prompt_vectors
    }
    # Insert the chat document into the Cosmos DB container
    container.create_item(body=chat_document)
    print("item inserted into cache.", chat_document)

In [None]:
# Search the cache container for semantically similar prompts
def get_cache(container, vectors, similarity_score=0.0, num_results=5):
    # Execute the query
    results = container.query_items(
        query= '''
        SELECT TOP @num_results *
        FROM c
        WHERE VectorDistance(c.vector,@embedding) > @similarity_score
        ORDER BY VectorDistance(c.vector,@embedding)
        ''',
        parameters=[
            {"name": "@embedding", "value": vectors},
            {"name": "@num_results", "value": num_results},
            {"name": "@similarity_score", "value": similarity_score},
        ],
        enable_cross_partition_query=True, populate_query_metrics=True)
    results = list(results)
    #print(results)
    return results

# Create a Simple Chat Bot UX in Gradio
Run the cell below to build a Gradio user interface for interacting with the AI application.  
Then open the public URL link to chat with the bot about movies.  
Ask questions such as "Who are the characters in Toy Story?"  
To shut down the bot, interrupt the kernel and run the final cell.
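The cell below measures latency with `time.time()`, which follows the system clock and can jump if the clock is adjusted. `time.perf_counter()` is monotonic and has higher resolution, so it is better suited to elapsed-time measurements. A hypothetical helper:

```python
import time

def timed_call(fn, *args, **kwargs):
    """Run fn and return (result, elapsed milliseconds) measured with a monotonic clock."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed_ms = round((time.perf_counter() - start) * 1000, 2)
    return result, elapsed_ms
```

In `user`, the timing could become `(response_payload, cached), elapsed_time = timed_call(chat_completion, cache_container, movies_container, user_message)`.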

In [None]:
chat_history = []
with gr.Blocks() as demo:
    chatbot = gr.Chatbot(label="Cosmic Movie Assistant")
    
    msg = gr.Textbox(label="Ask me about movies in the Cosmic Movie Database!")
    clear = gr.Button("Clear")

    def user(user_message, chat_history):
        # Create a timer to measure the time it takes to complete the request
        start_time = time.time()
        # Get LLM completion
        response_payload, cached = chat_completion(cache_container, movies_container, user_message)
        # Stop the timer
        end_time = time.time()
        elapsed_time = round((end_time - start_time) * 1000, 2)
        print(response_payload)
        # Append user message and response to chat history
        details = f"\n (Time: {elapsed_time}ms)"
        if cached:
            details += " (Cached)"
        chat_history.append([user_message, response_payload + details])
        
        return gr.update(value=""), chat_history
    
    msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False)

    clear.click(lambda: None, None, chatbot, queue=False)

# Launch the Gradio interface
demo.launch(debug=True,share=True)

In [None]:
# Be sure to run this cell to close or restart the Gradio demo
demo.close()

# Cleaning up - **IMPORTANT**
Delete the Cosmos DB resource.  
If you would like to keep it, **reduce the throughput** on both containers to 400 RU/s (under Scale and Settings).
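If you keep the account, the throughput change can also be scripted. A sketch using the SDK's `ContainerProxy.replace_throughput` (the helper name is hypothetical). To delete the whole database instead, call `cosmos_client.delete_database(cosmos_database)`.

```python
def scale_down_containers(containers, throughput=400):
    """Set manual throughput on each container and return the ids that were updated."""
    updated = []
    for container in containers:
        # ContainerProxy.replace_throughput updates the container's provisioned RU/s
        container.replace_throughput(throughput)
        updated.append(container.id)
    return updated
```

For example: `scale_down_containers([movies_container, cache_container])`.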