# Step 1: Install libraries


In [20]:
! pip install -qU pymongo datasets langchain tiktoken sentence_transformers tqdm boto3

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.1/139.1 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.6/12.6 MB[0m [31m45.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m82.7/82.7 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[?25h

# Step 2: Setup prerequisites

Set the `MONGODB_URI` environment variable before running the cells below:

- `MONGODB_URI` must contain your **MongoDB connection string**


In [2]:
import os
from pymongo import MongoClient

In [3]:
# Read the connection string from the `MONGODB_URI` environment variable
# (a missing variable raises `KeyError` here)
MONGODB_URI = os.environ["MONGODB_URI"]
# Initialize a MongoDB Python client
mongodb_client = MongoClient(MONGODB_URI, appname="devrel.workshop.rag")
# Check the connection to the server by sending a `ping` command
mongodb_client.admin.command("ping")

{'ok': 1}

# Step 3: Load the dataset


In [4]:
import pandas as pd
from datasets import load_dataset

In [6]:
# Open the "train" split of the dataset in streaming mode
data = load_dataset("si3mshady/aws_whitepapers", split="train", streaming=True)
# Keep only the first 20 records of the stream
data_head = data.take(20)
# Convert the records to a list of dictionaries (one per document) via a DataFrame
docs = pd.DataFrame(data_head).to_dict("records")

In [7]:
# Check the number of documents in the dataset
len(docs)

20

In [8]:
# Preview a document
docs[0]

{'Title': 'Amazon_Aurora_Migration_Handbook',
 'Category': 'General'}

# Step 4: Chunk up the data


In [9]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import Dict, List

In [10]:
# Separators to split on, tried in the order listed
# The empty string matches any text, so it must be the LAST entry: the recursive
# splitter stops at the first separator that matches, and anything listed after ""
# would never be reached
# Heading markers go longest-first so "###" is not shadowed by "#"
separators = ["\n\n", "\n", " ", "###", "##", "#", ""]

📚 https://python.langchain.com/v0.1/docs/modules/data_connection/document_transformers/split_by_token/#tiktoken


In [11]:
# Build a `RecursiveCharacterTextSplitter` that measures length with the `cl100k_base` tiktoken encoding
# Each chunk holds at most 200 tokens (roughly 1-2 paragraphs of text)
# Neighbouring chunks overlap by 30 tokens, i.e. 15% of the chunk size
# The `separators` list defined above is passed as the `separators` argument
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    encoding_name="cl100k_base",
    chunk_size=200,
    chunk_overlap=30,
    separators=separators,
)

📚 https://api.python.langchain.com/en/latest/character/langchain_text_splitters.character.RecursiveCharacterTextSplitter.html

In [12]:
def get_chunks(doc: Dict, text_field: str) -> List[Dict]:
    """
    Split one document into several smaller documents.

    Each returned document is a shallow copy of `doc` in which the value of
    `text_field` is replaced by a single chunk of the original text; all other
    fields are carried over unchanged.

    Args:
        doc (Dict): Parent document to generate chunks from.
        text_field (str): Name of the field in `doc` that holds the text to chunk.

    Returns:
        List[Dict]: One document per chunk, in the order produced by the splitter.
    """

    def _clone_with(piece: str) -> Dict:
        # Shallow copy, so the parent document itself is never modified
        clone = doc.copy()
        clone[text_field] = piece
        return clone

    # `text_splitter` is the splitter instantiated earlier in the notebook;
    # its `split_text` method takes a string
    return [_clone_with(piece) for piece in text_splitter.split_text(doc[text_field])]

In [13]:
split_docs = []

In [14]:
# Chunk every document in `docs` on its "Content" field using `get_chunks`,
# and flatten the per-document chunk lists into a single list, `split_docs`
split_docs = [chunk for doc in docs for chunk in get_chunks(doc, "Content")]

In [15]:
# Check that the length of the list of chunked documents is greater than the length of `docs`
len(split_docs)

970

In [16]:
# Preview one of the items in split_docs- ensure that it is a Python dictionary
split_docs[0]

{'Title': 'Amazon_Aurora_Migration_Handbook',
 'Content': 'This paper has been archived For the latest Ama zon Aurora Migration content refer to: https://d1awsstaticcom/whitepapers/RDS/Migrating your databases to Amazon Aurorapdf Amazon Web Services Amazon Aurora Migration Handbook 1 Amazon Aurora Migration Handbook July 2020 This paper has been archived For the latest Ama zon Aurora Migration content refer to: https://d1awsstaticcom/whitepapers/RDS/Migrating your databases to Amazon Aurorapdf Amazon Web Services Amazon Aurora Migration Handbook 2 Notices Customers are responsible for making their own independent assessment of the information in this document This document: (a) is for informational purposes only (b) represents current AWS product offerings and practices which are subject to change withou t notice and (c) does not create any commitments or assurances from AWS and its affiliates suppliers or licensors AWS products or services are provided “as is” without warranties repre

# Step 5: Generate embeddings


In [17]:
from sentence_transformers import SentenceTransformer
from tqdm import tqdm

The cache for model files in Transformers v4.22.0 has been updated. Migrating your old cache. This is a one-time only operation. You can interrupt this and resume the migration later on by calling `transformers.utils.move_cache()`.


0it [00:00, ?it/s]

In [33]:
# Define a client for the Amazon Titan Text Embeddings V2 model served through Amazon Bedrock

import json
import boto3
from botocore.config import Config

# botocore configuration shared by the Bedrock runtime clients created in this notebook
my_config = Config(
    region_name="us-east-1",
    signature_version="v4",
    # Allow up to 10 attempts per call, using the "standard" retry mode
    retries={"max_attempts": 10, "mode": "standard"},
)

class TitanEmbeddings:
    """Callable wrapper that requests text embeddings from a model via the Bedrock runtime."""

    # Values sent as the `accept` and `contentType` arguments of `invoke_model`
    accept = "application/json"
    content_type = "application/json"

    def __init__(self, model_id="amazon.titan-embed-text-v2:0"):
        """
        Create the Bedrock runtime client used for embedding requests.

        Args:
            model_id (str): ID of the embedding model to invoke.
        """
        # Credentials come from the AWS_ACCESS_KEY / AWS_SECRET environment variables
        self.bedrock = boto3.client(
            service_name="bedrock-runtime",
            config=my_config,
            aws_access_key_id=os.environ["AWS_ACCESS_KEY"],
            aws_secret_access_key=os.environ["AWS_SECRET"],
        )
        self.model_id = model_id

    def __call__(self, text, dimensions, normalize=True):
        """
        Embed a piece of text.

        Args:
            text (str): Text to embed.
            dimensions (int): Number of output dimensions.
            normalize (bool): Whether to request a normalized embedding.

        Returns:
            List[float]: The `embedding` entry of the model's JSON response.
        """
        payload = {"inputText": text, "dimensions": dimensions, "normalize": normalize}
        response = self.bedrock.invoke_model(
            modelId=self.model_id,
            body=json.dumps(payload),
            accept=self.accept,
            contentType=self.content_type,
        )
        # The response body is a stream holding a JSON document
        raw_body = response.get("body").read()
        return json.loads(raw_body)["embedding"]

embedding_model = TitanEmbeddings(model_id="amazon.titan-embed-text-v2:0")

📚 https://huggingface.co/thenlper/gte-small

In [36]:
# Define a function that takes a piece of text (`text`) as input, embeds it using the `embedding_model` instantiated above and returns the embedding as a list
# The Titan embedding model already returns the embedding as a list, so no `tolist()` conversion is needed
def get_embedding(text: str) -> List[float]:
    """
    Embed a piece of text with the notebook-level `embedding_model`.

    The model is asked for a normalized, 1024-dimensional embedding.

    Args:
        text (str): Text to embed.

    Returns:
        List[float]: Embedding of the text as a list.
    """
    # Positional arguments are (text, dimensions, normalize)
    return embedding_model(text, 1024, True)

In [37]:
embedded_docs = []

In [38]:
# Build `embedded_docs` from scratch: one new dictionary per chunk in `split_docs`,
# carrying every original field plus an `embedding` field
# The `embedding` field holds the embedding of the chunk's `Content` field,
# generated with the `get_embedding` function defined above
# Copying instead of mutating leaves `split_docs` untouched, and rebuilding the list
# inside this cell keeps it idempotent -- re-running it no longer appends duplicates
embedded_docs = [
    {**doc, "embedding": get_embedding(doc["Content"])} for doc in tqdm(split_docs)
]

100%|██████████| 970/970 [04:07<00:00,  3.93it/s]


In [39]:
# Check that the length of `embedded_docs` is the same as that of `split_docs`
len(embedded_docs)

970

# Step 6: Ingest data into MongoDB


In [40]:
# Name of the database -- Change if needed or leave as is
DB_NAME = "mongodb_rag_lab"
# Name of the collection -- Change if needed or leave as is
COLLECTION_NAME = "knowledge_base"
# Name of the vector search index -- Change if needed or leave as is
ATLAS_VECTOR_SEARCH_INDEX_NAME = "vector_index"

📚 https://pymongo.readthedocs.io/en/stable/tutorial.html#getting-a-database

📚 https://pymongo.readthedocs.io/en/stable/tutorial.html#getting-a-collection

In [41]:
# Connect to the collection defined above using the `mongodb_client` defined in Step 2
collection = mongodb_client[DB_NAME][COLLECTION_NAME]

In [42]:
# Bulk delete all existing records from the collection defined above
collection.delete_many({})

DeleteResult({'n': 0, 'electionId': ObjectId('7fffffff000000000000001f'), 'opTime': {'ts': Timestamp(1727162815, 10), 't': 31}, 'ok': 1.0, '$clusterTime': {'clusterTime': Timestamp(1727162815, 10), 'signature': {'hash': b'\xdd\x82\x1f\xc3\x91s\xa9\xe5\xf7\xde4\xcd\xc2N\xc9g\x1by\xe4\xdc', 'keyId': 7384910504117927940}}, 'operationTime': Timestamp(1727162815, 10)}, acknowledged=True)

📚 https://pymongo.readthedocs.io/en/stable/examples/bulk.html#bulk-insert


In [43]:
# Bulk insert `embedded_docs` into the collection defined above -- should be a one-liner
collection.insert_many(embedded_docs)

print("Data ingestion into MongoDB completed")

Data ingestion into MongoDB completed


# Step 7: Create a vector search index

Follow the instructions in the documentation to create a Vector Search index in the Atlas UI.


# Step 8: Perform semantic search on your data


### Define a vector search function

📚 https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/#fields

📚 https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/#ann-examples (Refer to the "Basic Example")


In [49]:
# Define a function to retrieve relevant documents for a user query using vector search
def vector_search(user_query: str, limit: int = 5, num_candidates: int = 150) -> List[Dict]:
    """
    Retrieve relevant documents for a user query using vector search.

    Args:
        user_query (str): The user's query string.
        limit (int): Maximum number of documents to return. Defaults to 5.
        num_candidates (int): Number of nearest-neighbour candidates considered
            by the search. Defaults to 150.

    Returns:
        List[Dict]: Matching documents, each with a `Content` field and a
        `score` field holding the vector search score.

    Raises:
        ValueError: If `limit` is smaller than 1 or larger than `num_candidates`.
    """
    # Fail early on values the $vectorSearch stage would reject, before any
    # embedding request is made
    if limit < 1 or num_candidates < limit:
        raise ValueError(
            f"Expected 1 <= limit <= num_candidates, got limit={limit}, num_candidates={num_candidates}"
        )

    # Generate embedding for the `user_query` using the `get_embedding` function defined in Step 5
    query_embedding = get_embedding(user_query)

    # Aggregation pipeline: a $vectorSearch stage followed by a $project stage
    pipeline = [
        {
            "$vectorSearch": {
                "index": ATLAS_VECTOR_SEARCH_INDEX_NAME,
                "queryVector": query_embedding,
                # Field of the stored documents that holds their embedding
                "path": "embedding",
                "numCandidates": num_candidates,
                "limit": limit,
            }
        },
        {
            # Exclude `_id`; keep only the `Content` field and the search score
            "$project": {
                "_id": 0,
                "Content": 1,
                "score": {"$meta": "vectorSearchScore"},
            }
        },
    ]

    # Execute the aggregation `pipeline` and materialize the cursor as a list
    return list(collection.aggregate(pipeline))

### Run vector search queries


In [50]:
vector_search("How to migrate Aurora to other platform?")

[{'Content': 'For the latest Ama zon Aurora Migration content refer to: https://d1awsstaticcom/whitepapers/RDS/Migrating your databases to Amazon Aurorapdf Amazon Web Services Amazon Aurora Migration Handbook 31 Notes For the sake of simplicity this scenario assumes the following: 1 Migration commands are executed from a client instance running a Linux operating system 2 The source server is a self managed MySQL database (eg running on Amazon EC2 or on premises) that is configured to allow connections from the client instance 3 The target Aurora DB cluster already exists and is configured to allow connections from the client instance If you don’t yet have an Aurora DB cluster review the stepbystep cluster launch instructions in the Amazon RDS User Guide 17 4 Export from the source database is performed using a privileged super  user MySQL ac count For simplicity this scenario assumes that the user holds all permissions available in MySQL 5 Import into Amazon Aurora is performed using t

In [51]:
vector_search("What industry or development can AWS cloud benefit?")

[{'Content': 'a concentration of intellectual capital and innovative businesses and startups can be a strong indicator of economic development Cloud technology can help give new businesses a boost in their forecasting demand generation and innovation when bringing their products or services to market AWS accelerate s this process through AWS Activate  a program designed to provide startups with resourc es and credits to get started with the cloud; through access to tools like Amazon LightSail  which provides technology like virtual private servers to enterprises of all sizes for the cost of a cup of ArchivedAmazon Web Services  Inc – 5 Ways t he Cloud Can Drive Economi c Development Page 7 coffee ; and by encouraging public private partnerships and small business linkages namely through the strength of the AWS Partner Network (APN)  Additionally AWS CloudStart formed to encourage the growth of SMEs and economic development organizations by providing resources to educate train and help 

# 🦹‍♀️ Combine pre-filtering with vector search

📚 https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-type/#about-the-filter-type

📚 https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/#ann-examples (Refer to the "Filter Example")


# Step 9: Build the RAG application


### Instantiate a chat model


### Define a function to create the chat prompt

In [53]:
# Define a function to create the user prompt for our RAG application
def create_prompt(user_query: str) -> str:
    """
    Build the chat prompt for a query from the documents retrieved for it.

    Args:
        user_query (str): The user's query string.

    Returns:
        str: The prompt, containing the instructions, the retrieved context
        and the question.
    """
    # Most relevant documents for the query, from the `vector_search` function defined in Step 8
    retrieved = vector_search(user_query)
    # One string with the `Content` of every document, separated by two new lines
    context = "\n\n".join(match.get('Content') for match in retrieved)
    # Instructions, then the context, then the question itself
    return f"Answer the question based only on the following context. If the context is empty, say I DON'T KNOW\n\nContext:\n{context}\n\nQuestion:{user_query}"

### Define a function to answer user queries

📚 https://docs.fireworks.ai/guides/querying-text-models#chat-completions-api

In [56]:
# Define a function to answer user queries using the Amazon Bedrock Converse API
def generate_answer(user_query: str) -> None:
    """
    Answer a user query from retrieved context and print the model's reply.

    Args:
        user_query (str): The user's query string.
    """
    # Bedrock runtime client; credentials come from environment variables
    bedrock = boto3.client(
        service_name="bedrock-runtime",
        config=my_config,
        aws_access_key_id=os.environ["AWS_ACCESS_KEY"],
        aws_secret_access_key=os.environ["AWS_SECRET"],
    )

    # Single-turn conversation whose only message is the prompt built by `create_prompt`
    user_turn = {"role": "user", "content": [{"text": create_prompt(user_query)}]}

    # Send the conversation to the Titan Text Premier model
    reply = bedrock.converse(
        modelId="amazon.titan-text-premier-v1:0",
        messages=[user_turn],
        inferenceConfig={"topP": 0.9},
        additionalModelRequestFields={},
    )

    # Print the text of the first content block of the reply
    print(reply["output"]["message"]["content"][0]["text"])

### Query the RAG application


In [57]:
generate_answer("What industry or development can AWS cloud benefit?")

Cloud technology can help give new businesses a boost in their forecasting demand generation and innovation when bringing their products or services to market


In [58]:
generate_answer("What did I just ask you?")

I DON'T KNOW


# 🦹‍♀️ Re-rank retrieved results


In [59]:
from sentence_transformers import CrossEncoder

In [60]:
rerank_model = CrossEncoder("mixedbread-ai/mxbai-rerank-xsmall-v1")

config.json:   0%|          | 0.00/968 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/142M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/1.45k [00:00<?, ?B/s]

spm.model:   0%|          | 0.00/2.46M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/8.65M [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/23.0 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/970 [00:00<?, ?B/s]

📚 https://huggingface.co/mixedbread-ai/mxbai-rerank-xsmall-v1

In [61]:
# Add a re-ranking step to the following function
def create_prompt(user_query: str) -> str:
    """
    Create a chat prompt that includes the user query and re-ranked retrieved context.

    Args:
        user_query (str): The user's query string.

    Returns:
        str: The chat prompt string. The context section is empty when no
        document with text was retrieved.
    """
    # Retrieve the most relevant documents for the `user_query` using the `vector_search` function defined in Step 8
    context = vector_search(user_query)
    # Extract the "Content" field from each document, dropping documents where it is missing or empty
    # so that only real strings are handed to the re-ranker
    documents = [d["Content"] for d in context if d.get("Content")]

    if documents:
        # Use the `rerank_model` instantiated above to re-rank `documents`, keeping at most 5
        reranked_documents = rerank_model.rank(
            user_query, documents, return_documents=True, top_k=5
        )
        # Join the re-ranked documents into a single string, where each document is separated by two new lines ("\n\n")
        context = "\n\n".join([d.get("text", "") for d in reranked_documents])
    else:
        # Nothing to score: skip the re-ranker and leave the context empty so the
        # prompt's "context is empty" instruction applies
        context = ""

    # Prompt consisting of the question and relevant context to answer it
    prompt = f"Answer the question based only on the following context. If the context is empty, say I DON'T KNOW\n\nContext:\n{context}\n\nQuestion:{user_query}"
    return prompt

In [63]:
# Note the impact of re-ranking on the generated answer
generate_answer("What industry or development can AWS cloud benefit?")

innovative businesses and startups


# Step 10: Add memory to the RAG application


In [65]:
from datetime import datetime

In [66]:
history_collection = mongodb_client[DB_NAME]["chat_history"]

📚 https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.create_index


In [67]:
# Create an index on the key `session_id` for the `history_collection` collection
history_collection.create_index("session_id")

'session_id_1'

### Define a function to store chat messages in MongoDB

📚 https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.insert_one

In [68]:
def store_chat_message(session_id: str, role: str, content: str) -> None:
    """
    Store a chat message in a MongoDB collection.

    Args:
        session_id (str): Session ID of the message.
        role (str): Role for the message. One of `system`, `user` or `assistant`.
        content (str): Content of the message.
    """
    # Local import: the notebook only imports `datetime` from the `datetime` module
    from datetime import timezone

    # Create a message object with `session_id`, `role`, `content` and `timestamp` fields
    message = {
        "session_id": session_id,
        "role": role,
        "content": content,
        # Timezone-aware UTC timestamp. A naive `datetime.now()` is local time, which
        # PyMongo would store as if it were UTC and so shift it by the UTC offset.
        # NOTE(review): messages stored earlier with naive local timestamps may sort
        # differently relative to new ones -- clear old sessions if that matters.
        "timestamp": datetime.now(timezone.utc),
    }
    # Insert the `message` into the `history_collection` collection
    history_collection.insert_one(message)



### Define a function to retrieve chat history from MongoDB

📚 https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.find

📚 https://pymongo.readthedocs.io/en/stable/api/pymongo/cursor.html#pymongo.cursor.Cursor.sort

In [82]:
def retrieve_session_history(session_id: str) -> List:
    """
    Retrieve chat message history for a particular session.

    Args:
        session_id (str): Session ID to retrieve chat message history for.

    Returns:
        List: Chat messages, oldest first, each formatted as
        {"role": <role_value>, "content": [{"text": <content_value>}]}.
        Empty if the session has no stored messages.
    """
    # Query the `history_collection` collection for documents where the "session_id" field has the value of the input `session_id`
    # Sort by `timestamp` in increasing order; `_id` breaks ties so that messages whose
    # stored timestamps are equal still come back in insertion order
    cursor = history_collection.find({"session_id": session_id}).sort(
        [("timestamp", 1), ("_id", 1)]
    )

    # The truthiness of a cursor says nothing about whether it has results, so no
    # emptiness check is made: iterating a cursor with no matches yields an empty list
    return [
        {"role": msg["role"], "content": [{"text": msg["content"]}]} for msg in cursor
    ]

### Handle chat history in the `generate_answer` function

📚 https://docs.fireworks.ai/guides/querying-text-models#chat-completions-api


In [79]:
def generate_answer(session_id: str, user_query: str) -> None:
    """
    Generate an answer to the user's query taking chat history into account.

    The stored history of the session is sent to the model, followed by a prompt
    built from the query and its retrieved context. The query and the generated
    answer are then appended to the session history, and the answer is printed.

    Args:
        session_id (str): Session ID to retrieve chat history for.
        user_query (str): The user's query string.
    """
    # Use the `retrieve_session_history` function to retrieve message history from MongoDB
    # for the session ID `session_id`, and start the list of messages from it
    messages = list(retrieve_session_history(session_id))

    # `create_prompt` retrieves the relevant context itself, so no separate
    # `vector_search` call is made here (that would embed and search the query twice)
    prompt = create_prompt(user_query)

    # The role value for user messages must be "user"
    messages.append({"role": "user", "content": [{"text": prompt}]})

    # Client for the Bedrock runtime; credentials come from environment variables
    client = boto3.client(
        service_name="bedrock-runtime",
        config=my_config,
        aws_access_key_id=os.environ["AWS_ACCESS_KEY"],
        aws_secret_access_key=os.environ["AWS_SECRET"],
    )

    # Set the model ID, e.g., Titan Text Premier.
    model_id = "amazon.titan-text-premier-v1:0"

    # Send the history plus the new user message to the model
    response = client.converse(
        modelId=model_id,
        messages=messages,
        inferenceConfig={"topP": 0.9},
        additionalModelRequestFields={},
    )

    # Extract the answer from the API response
    answer = response["output"]["message"]["content"][0]["text"]

    # Store the raw query (not the context-augmented prompt) and the generated answer
    # The role value for user messages is "user", and "assistant" for the generated answer
    store_chat_message(session_id, "user", user_query)
    store_chat_message(session_id, "assistant", answer)

    print(answer)

In [84]:
generate_answer(
    session_id="1",
    user_query="What are triggers in MongoDB Atlas?",
)

I DON'T KNOW


In [85]:
generate_answer(
    session_id="1",
    user_query="What did I just ask you?",
)

What are triggers in MongoDB Atlas?


In [86]:
generate_answer(
    session_id="1",
    user_query="Tell me more about AWS EC2",
)

Amazon EC2 is a web service that provides resizable compute capacity in the cloud  billed by the hour or second (minimum of 60 seconds)  You can run virtual machines (EC2 instances) ranging in size from one vCPU and one GB memory  to 448 vCPU and 6six TB memory You have a choice of operating systems including Windows Server 2008/2012 /2016/2019  Oracle Linux Red Hat Enterprise Linux and SUSE Linux Elastic Load Balanc ing Elastic Load Balancing automatically distributes incoming application traffic across multiple Amazon EC2 instances For example  you can use Elastic Load Balancing with Amazon EC2 to create a web application that is highly available and scalable
