# Running RAG experiments using S3 vectors and MLflow

## 1. Introduction

The following notebook illustrates the process of experimenting with different chunking, retrieval, and generation configurations when building a RAG (retrieval augmented generation) solution. In this case, we are going to use S3 vectors to store the embedding vectors, and MLflow to run evaluations and to store parameters from the experiments.

![Architecture](s3-vectors-buckets-arch.png)

## 2. Global parameters for the experiments

Let us set up some global parameters that we are going to use in this round of experiments. These parameters can be, for example, the model ID and the version of the SageMaker JumpStart models we are going to use, and the chunking parameters for processing the documents.

In [None]:
EMBEDDING_MODEL_ID = "huggingface-textembedding-gte-qwen2-7b-instruct"
EMBEDDING_MODEL_VERSION = "1.0.10"
NUM_DIMENSIONS = 3584 # Number of dimensions in the embedding model

TEXT_GENERATION_MODEL_ID = "deepseek-llm-r1-distill-qwen-7b"
TEXT_GENERATION_MODEL_VERSION = "2.0.5"

CHUNK_SIZE = 1000
CHUNK_OVERLAP = 250
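
To make the two chunking parameters concrete, the following minimal sketch shows how a chunk size and an overlap translate into chunk boundaries for a plain fixed-size splitter. It is an illustration only: `fixed_size_chunks` is a hypothetical helper, not the `process_pdf` implementation from `utils.py`, and it assumes that sizes are measured in characters.

```python
from typing import List

def fixed_size_chunks(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Split text into windows of chunk_size characters that overlap by chunk_overlap."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    stride = chunk_size - chunk_overlap  # each new chunk starts this many characters later
    chunks = []
    for start in range(0, len(text), stride):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the current window already reaches the end of the text
    return chunks

# Placeholder text; with the values used in this notebook the stride is 750 characters
sample = "x" * 2500
chunks = fixed_size_chunks(sample, chunk_size=1000, chunk_overlap=250)
print(len(chunks), [len(c) for c in chunks])
```

A larger overlap repeats more context between neighboring chunks, at the cost of producing more chunks (and therefore more embedding calls) for the same document.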

## 3. Install required libraries

In [None]:
!pip install boto3 botocore mlflow sagemaker-mlflow langchain-text-splitters langgraph pypdf --quiet

## 4. Deploy embedding model and text generation model

Retrieval augmented generation (RAG) requires two types of models:

- Embedding model: To map text chunks into an embeddings space to perform similarity search.
- Text generation model: To generate responses based on the text chunks retrieved.

The following cells deploy the models from SageMaker JumpStart and store the endpoint names so we can use them later. To keep the code concise, several helper functions are defined in the `utils.py` file and imported in this notebook. The `deploy_jumpstart_model` function, defined in the next cell, deploys a SageMaker JumpStart model to a real-time endpoint.

This cell can take ~20 minutes.

In [None]:
from sagemaker.jumpstart.model import JumpStartModel
import sagemaker

def deploy_jumpstart_model(model_id: str,
                 instance_type: str,
                 endpoint_name_base: str,
                 model_version: str = "*") -> str:
    """
    Deploys a SageMaker JumpStart model.

    Args:
        model_id (str): The JumpStart model ID.
        instance_type (str): The SageMaker instance type.
        endpoint_name_base (str): Prefix for the endpoint name.
        model_version (str): The version of the model to deploy.

    Returns:
        str: Name of the deployed real-time endpoint
    """
    endpoint_name = sagemaker.utils.unique_name_from_base(endpoint_name_base)
    model = JumpStartModel(
        model_id=model_id,
        model_version=model_version,
        instance_type=instance_type
    )
    model.deploy(
        accept_eula=True,
        initial_instance_count=1,
        instance_type=instance_type,
        endpoint_name=endpoint_name
    )

    return endpoint_name

In [None]:
%%time

EMBEDDING_ENDPOINT_NAME = deploy_jumpstart_model(
    model_id=EMBEDDING_MODEL_ID,
    instance_type="ml.g5.xlarge",
    endpoint_name_base="s3-vectors-demo-embedding-model",
    model_version=EMBEDDING_MODEL_VERSION
    )
print(f"EMBEDDING_ENDPOINT_NAME:: {EMBEDDING_ENDPOINT_NAME}")

TEXT_GENERATION_ENDPOINT_NAME = deploy_jumpstart_model(
    model_id=TEXT_GENERATION_MODEL_ID,
    instance_type="ml.g6.12xlarge",
    endpoint_name_base="s3-vectors-demo-generation-model",
    model_version=TEXT_GENERATION_MODEL_VERSION
)

print(f"TEXT_GENERATION_ENDPOINT_NAME:: {TEXT_GENERATION_ENDPOINT_NAME}")

## 5. Create vectors bucket and vector indexes

We are going to create an S3 vectors bucket to store the vectors, and inside it, we are going to have two vector indexes, one for fixed-size chunking and one for recursive chunking. A vector index is a logical group for vector data, and it is used for all write and read requests.

In [None]:
import boto3
import sagemaker
from sagemaker.utils import unique_name_from_base

VECTORS_BUCKET_NAME = unique_name_from_base("s3-vectors-bucket")
FIXED_CHUNKING_INDEX_NAME = "fixed-chunking"
RECURSIVE_CHUNKING_INDEX_NAME = "recursive-chunking"

print(f"VECTORS_BUCKET_NAME:: {VECTORS_BUCKET_NAME}")

In [None]:
s3vectors_client = boto3.client("s3vectors")

s3vectors_client.create_vector_bucket(
    vectorBucketName=VECTORS_BUCKET_NAME
    )

for index_name in [FIXED_CHUNKING_INDEX_NAME, RECURSIVE_CHUNKING_INDEX_NAME]:
    s3vectors_client.create_index(
        vectorBucketName=VECTORS_BUCKET_NAME,
        indexName=index_name,
        dimension=NUM_DIMENSIONS,
        distanceMetric="euclidean",
        dataType="float32",
        metadataConfiguration={"nonFilterableMetadataKeys": ["chunk"]}
    )
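
The indexes above are created with the `euclidean` distance metric. As a rough illustration of why the choice of metric matters, the sketch below compares Euclidean distance with a common definition of cosine distance (one minus cosine similarity) on made-up two-dimensional vectors. The helper names and vectors are assumptions for this example, and the service's exact scoring is not shown here.

```python
import math
from typing import Sequence

def euclidean_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Straight-line distance between two vectors of equal length."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """One minus the cosine of the angle between two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

query = [1.0, 0.0]
same_direction = [3.0, 0.0]   # same direction as the query, larger magnitude
orthogonal = [0.0, 1.0]       # unrelated direction, similar magnitude

for name, vec in [("same_direction", same_direction), ("orthogonal", orthogonal)]:
    print(name, euclidean_distance(query, vec), cosine_distance(query, vec))
```

In this toy case the two metrics rank the candidates differently: Euclidean distance is sensitive to vector magnitude, while cosine distance only looks at direction. This is one reason the distance metric is worth tracking as an experiment parameter.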

## 6. Sample datasets

The dataset is ingested based on a JSON file that lists the source URL of each document and its associated metadata. The files are downloaded and stored locally programmatically, using the JSON file as reference. To use your own documents as the data source, update the JSON file with their source URLs and metadata.

In the sample dataset, we are using publicly available financial data from Amazon.com. There are two versions of the JSON file: `source_files_short_version.json` references 3 documents, whereas `source_files_long_version.json` references 9 documents. The cell below uses the short version by default (`short_version = True`); set the flag to `False` to use the long version.

In [None]:
from utils import download_pdfs, process_pdf
import json

short_version = True

file_path = "source_files_short_version.json" if short_version else "source_files_long_version.json"
with open(file_path, 'r') as file:
    json_data = json.load(file)

urls, filenames, metadata = (json_data.get(k, []) for k in ('url', 'document', 'metadata'))

download_pdfs(urls, filenames)
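
The bundled JSON files are not reproduced in this notebook, so the sketch below shows one plausible shape for them, inferred from the keys read in the cell above (`url`, `document`, `metadata`) and from the `domain` and `year` fields used during ingestion. The URLs, file names, and metadata values are placeholders, not the real dataset.

```python
import json

# Hypothetical manifest with three parallel lists, one entry per document
manifest_text = """
{
  "url": ["https://example.com/report-a.pdf", "https://example.com/report-b.pdf"],
  "document": ["report-a.pdf", "report-b.pdf"],
  "metadata": [
    {"domain": "Example Domain A", "year": 2024},
    {"domain": "Example Domain B", "year": 2025}
  ]
}
"""

manifest = json.loads(manifest_text)
urls, filenames, metadata = (manifest.get(k, []) for k in ("url", "document", "metadata"))

# The lists are consumed in parallel, so a length mismatch would silently drop documents
if not (len(urls) == len(filenames) == len(metadata)):
    raise ValueError("'url', 'document' and 'metadata' must have the same number of entries")

records = list(zip(urls, filenames, metadata))
for url, filename, meta in records:
    print(filename, meta["domain"], meta["year"], url)
```

Checking the list lengths up front is a cheap safeguard when you edit the file by hand, because `zip` stops at the shortest list without raising an error.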

## 7. Calculate embedding vectors

We are now going to define a function that calculates the embedding vector for each text chunk using the embedding model. Both the vectors and the text chunks are stored in an S3 vector bucket, and metadata is added to each vector using the `domain` and `year` keys to indicate its origin.

In [None]:
import boto3
import json
import uuid
from botocore.exceptions import ClientError
from typing import List, Dict
from sagemaker.predictor import Predictor

def calculate_embedding_vectors(
    chunks: List[Dict[str, str]],
    embeddings_model_endpoint: Predictor,
    vector_bucket_name: str,
    index_name: str,
    domain_name: str,
    year: int
):
    """
    Calculate embedding vectors and store them in the vector bucket.

    Args:
        chunks (List[Dict[str, str]]): A list of dictionaries, where each dictionary contains a chunk of text.
        embeddings_model_endpoint (Predictor): A SageMaker Predictor instance used to calculate embeddings.
        vector_bucket_name (str): The name of the vector bucket to store vector data.
        index_name (str): The vector index within the vector bucket.
        domain_name (str): The domain metadata to be added to the vectors.
        year (int): The year of the document
    """
    s3vectors_client = boto3.client("s3vectors")

    for chunk in chunks:
        unique_id = str(uuid.uuid4())
        key = f"{unique_id}"

        embedding_response = embeddings_model_endpoint.predict({'inputs': chunk["chunk"]})

        s3vectors_client.put_vectors(
            vectorBucketName=vector_bucket_name,
            indexName=index_name,
            vectors=[
                {
                    "key": key,
                    "data": {
                        "float32": embedding_response[0]
                    },
                    "metadata": {
                        "domain": domain_name,
                        "year": year,
                        "chunk": chunk["chunk"]
                    }
                }
            ]
        )
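
The function above issues one `put_vectors` request per chunk. Because the `vectors` argument is a list, several records can be sent in one request, which may reduce ingestion time. The sketch below shows only the batching logic on placeholder records; `batched` is a hypothetical helper, and the maximum number of vectors allowed per request is an assumption you should check in the service documentation.

```python
from typing import Any, Dict, Iterator, List

def batched(items: List[Dict[str, Any]], batch_size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of items with at most batch_size elements each."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

# Placeholder records shaped like the entries passed to put_vectors above
vectors = [
    {"key": f"chunk-{i}", "data": {"float32": [0.0, 0.0]}, "metadata": {"chunk": f"text {i}"}}
    for i in range(7)
]

batch_sizes = [len(batch) for batch in batched(vectors, batch_size=3)]
print(batch_sizes)
```

Each yielded batch could then be passed as `vectors=batch` in a single `put_vectors` call instead of wrapping every record in its own one-element list.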

We now define a SageMaker Predictor based on the endpoint defined above for the embedding model.

In [None]:
from sagemaker.serializers import JSONSerializer, IdentitySerializer
from sagemaker.deserializers import JSONDeserializer
from sagemaker.predictor import Predictor

embedding_model_predictor = Predictor(
    endpoint_name=EMBEDDING_ENDPOINT_NAME,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer()
)

## 8. Storing embedding vectors for fixed-size and recursive chunking

Now we are going to iterate through the files, process the content from the PDFs, and load the vectors as `records` in each vector index. We will keep separate vector indexes for recursive chunking and fixed-size chunking, so we can compare their performance in RAG solutions later.

The next cells can take approximately 10 minutes each.

In [None]:
import time

start_time = time.time()
num_chunks = 0

# Iterate through the files to extract chunks, and store them in the S3 vectors bucket
for filename, meta in zip(filenames, metadata):
    extracted_chunks = process_pdf(
        filename, chunking="recursive",
        chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP
    )
    num_chunks += len(extracted_chunks)
    print(f"filename:: {filename}")

    calculate_embedding_vectors(
        chunks=extracted_chunks,
        embeddings_model_endpoint=embedding_model_predictor,
        vector_bucket_name=VECTORS_BUCKET_NAME,
        index_name=RECURSIVE_CHUNKING_INDEX_NAME,
        domain_name=meta["domain"],
        year=meta["year"]
    )

end_time = time.time()

CHUNKING_TIME_RECURSIVE = end_time-start_time
print(f"Elapsed time: {CHUNKING_TIME_RECURSIVE:.0f} seconds")
NUM_CHUNKS_RECURSIVE = num_chunks
print(f"Number of text chunks: {NUM_CHUNKS_RECURSIVE}")

In [None]:
import time

start_time = time.time()
num_chunks = 0

# Iterate through the files to extract chunks, and store them in the S3 vectors bucket
for filename, meta in zip(filenames, metadata):
    extracted_chunks = process_pdf(
        filename, chunking="fixed",
        chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP
    )
    num_chunks += len(extracted_chunks)
    print(f"filename:: {filename}")

    calculate_embedding_vectors(
        chunks=extracted_chunks,
        embeddings_model_endpoint=embedding_model_predictor,
        vector_bucket_name=VECTORS_BUCKET_NAME,
        index_name=FIXED_CHUNKING_INDEX_NAME,
        domain_name=meta["domain"],
        year=meta["year"]
    )

end_time = time.time()

CHUNKING_TIME_FIXED = end_time-start_time
print(f"Elapsed time: {CHUNKING_TIME_FIXED:.0f} seconds")
NUM_CHUNKS_FIXED = num_chunks
print(f"Number of text chunks: {NUM_CHUNKS_FIXED}")
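
The two ingestion loops above differ only in the chunking strategy. To give an intuition for what "recursive" means, here is a simplified sketch that splits on the coarsest separator first and only falls back to finer separators for pieces that are still too long. It ignores overlap and is not the `process_pdf` implementation from `utils.py`; `recursive_split` and the sample text are assumptions for this example.

```python
from typing import List, Sequence

def recursive_split(text: str, chunk_size: int,
                    separators: Sequence[str] = ("\n\n", "\n", " ")) -> List[str]:
    """Split on the coarsest separator first, recursing into pieces that are too long."""
    if len(text) <= chunk_size:
        return [text] if text else []
    if not separators:
        # No separators left: fall back to hard cuts
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    sep, rest = separators[0], separators[1:]
    pieces: List[str] = []
    for part in text.split(sep):
        pieces.extend(recursive_split(part, chunk_size, rest))
    # Greedily merge neighboring pieces back together while they still fit
    merged: List[str] = []
    for piece in pieces:
        if merged and len(merged[-1]) + len(sep) + len(piece) <= chunk_size:
            merged[-1] = merged[-1] + sep + piece
        else:
            merged.append(piece)
    return merged

sample = "Intro paragraph.\n\nSecond paragraph with more words in it.\n\nEnd."
chunks = recursive_split(sample, chunk_size=30)
for chunk in chunks:
    print(repr(chunk))
```

Unlike a fixed-size window, this approach tends to keep paragraphs and sentences together, which is why the two strategies can produce a different number of chunks and different retrieval quality for the same documents.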

## 9. Run simple semantic search

We are first going to test only the **retrieval** part by running semantic search within a vector index. When running a query with a string, the steps will be:

- Use the embedding model to compute the query vector

- Query the corresponding vector index using the query vector. The `metadata_filter` argument is optional; use it to restrict the semantic search to a subset of documents, for example those that come from a particular `domain`

- Extract the text chunks from the metadata of the returned records

In [None]:
import boto3
import json
from typing import List, Dict, Any, Optional, Tuple

def query_vectors_with_text(
    text: str,
    embeddings_model_endpoint: Predictor,
    vector_bucket_name: str,
    index_name: str,
    top_K: int = 5,
    metadata_filter: Optional[Dict[str, Any]] = None,
) -> Tuple[List[Any], List[Any]]:
    """
    Queries a vector index using a text input and returns the nearest neighbors.

    Args:
        text (str): The input text to query.
        embeddings_model_endpoint (Predictor): The endpoint for generating embeddings.
        vector_bucket_name (str): The name of the vector bucket.
        index_name (str): The name of the vector index to query.
        top_K (int): Number of neighbors to fetch.
        metadata_filter: The filters to apply.

    Returns:
        Tuple[List[Any], List[Any]]: The vector data of the nearest neighbors and their corresponding text chunks.
    """
    query_vector = embeddings_model_endpoint.predict({'inputs': text})[0]

    s3vectors_client = boto3.client("s3vectors")

    query_args = {
        "vectorBucketName": vector_bucket_name,
        "indexName": index_name,
        "queryVector": {"float32": query_vector},
        "topK": top_K,
        "returnDistance": True,
        "returnMetadata": True
    }

    if metadata_filter is not None:
        query_args["filter"] = metadata_filter

    vectors = s3vectors_client.query_vectors(**query_args).get("vectors", [])
    neighbors = [v.get("data", {}).get("float32") for v in vectors]
    chunks = [v.get("metadata", {}).get("chunk") for v in vectors]

    return neighbors, chunks
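
Conceptually, the query step ranks the stored vectors by their distance to the query vector and returns the closest ones together with their metadata. The sketch below mimics that with an exhaustive in-memory search over a toy index. The vectors, keys, and the `top_k_neighbors` helper are made up for illustration, and this says nothing about how the service implements search internally.

```python
import math
from typing import Any, Dict, List, Tuple

def top_k_neighbors(query: List[float], index: Dict[str, Dict[str, Any]],
                    k: int) -> List[Tuple[str, float]]:
    """Return the k keys closest to the query by Euclidean distance, nearest first."""
    scored = [(key, math.dist(query, record["data"])) for key, record in index.items()]
    return sorted(scored, key=lambda pair: pair[1])[:k]

# Toy index: each record holds a vector and the text chunk in its metadata
toy_index = {
    "a": {"data": [0.0, 0.0], "metadata": {"chunk": "chunk about revenue"}},
    "b": {"data": [1.0, 1.0], "metadata": {"chunk": "chunk about officers"}},
    "c": {"data": [5.0, 5.0], "metadata": {"chunk": "chunk about risks"}},
}

neighbors = top_k_neighbors([0.9, 1.2], toy_index, k=2)
# Extract the text chunks from the metadata of the returned records
chunks = [toy_index[key]["metadata"]["chunk"] for key, _ in neighbors]
print(neighbors)
print(chunks)
```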

Now we can run a sample semantic query. We are going to inspect only Amazon financial documents using the `metadata_filter` parameter.

In [None]:
%%time

import random

_, chunks = query_vectors_with_text(
    text="How old is Jeff Bezos?",
    embeddings_model_endpoint=embedding_model_predictor,
    vector_bucket_name=VECTORS_BUCKET_NAME,
    index_name=RECURSIVE_CHUNKING_INDEX_NAME,
    top_K = 1,
    metadata_filter={
        "$and": [
            {"domain": {"$eq": "Amazon Financial Report"}},
            {"year": {"$eq": 2025}}
            ]
        }
    )

print(f"extracted_chunks count:: {len(chunks)}")
retrieved_chunk = random.choice(chunks)
print(f"Sample chunk:\n {retrieved_chunk}\n")

The sampled chunk should show some semantic similarity to the query string, probably by mentioning Amazon's board of directors in some form, if not by directly mentioning its members. Now let's try a different query without the metadata filter. This query will inspect more candidate text chunks.

In [None]:
%%time

import random

_, chunks = query_vectors_with_text(
    text="Who are the executive officers mentioned in Amazon financial report?",
    embeddings_model_endpoint=embedding_model_predictor,
    vector_bucket_name=VECTORS_BUCKET_NAME,
    index_name=RECURSIVE_CHUNKING_INDEX_NAME
    )

print(f"extracted_chunks count:: {len(chunks)}")
retrieved_chunk = random.choice(chunks)
print(f"Sample chunk:\n {retrieved_chunk}\n")

Let's now inspect a different domain by focusing on the annual report.

In [None]:
%%time

import random

_, chunks = query_vectors_with_text(
    text="What are the business and industry risk called out in Amazon's 2024 annual report?",
    embeddings_model_endpoint=embedding_model_predictor,
    vector_bucket_name=VECTORS_BUCKET_NAME,
    index_name=RECURSIVE_CHUNKING_INDEX_NAME,
    metadata_filter={"domain": {"$eq": "Amazon Annual Report"}}
    )

print(f"extracted_chunks count:: {len(chunks)}")
retrieved_chunk = random.choice(chunks)
print(f"Sample chunk:\n {retrieved_chunk}\n")
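
The queries above use metadata filters built from the `$and` and `$eq` operators. To reason about which records a filter would keep, it can help to evaluate it locally against a metadata dictionary. The sketch below handles only those two operators; `matches` is a hypothetical helper, the real filtering happens on the service side, and other operators the service may support are deliberately left out.

```python
from typing import Any, Dict

def matches(metadata: Dict[str, Any], flt: Dict[str, Any]) -> bool:
    """Evaluate a filter that uses only the $and and $eq operators."""
    for key, condition in flt.items():
        if key == "$and":
            # Every sub-filter in the list must match
            if not all(matches(metadata, sub) for sub in condition):
                return False
        elif isinstance(condition, dict) and set(condition) == {"$eq"}:
            if metadata.get(key) != condition["$eq"]:
                return False
        else:
            raise ValueError(f"Unsupported filter clause: {key!r}")
    return True

metadata_filter = {
    "$and": [
        {"domain": {"$eq": "Amazon Financial Report"}},
        {"year": {"$eq": 2025}},
    ]
}

kept = matches({"domain": "Amazon Financial Report", "year": 2025}, metadata_filter)
dropped = matches({"domain": "Amazon Financial Report", "year": 2024}, metadata_filter)
print(kept, dropped)
```

If a filter excludes every record in the index, the query returns no chunks, so it is worth confirming that the `domain` and `year` values in a filter match the metadata that was actually ingested.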

## 10. Retrieval augmented generation

Now we are going to put the semantic search in conjunction with a generation step to have a complete retrieval augmented generation call. This requires two steps, so we are going to define a `StateGraph` from `LangGraph` to define the flow between the `retrieve` and `generate` steps.

In [None]:
from sagemaker.serializers import JSONSerializer, IdentitySerializer
from sagemaker.deserializers import JSONDeserializer
from sagemaker.predictor import Predictor

# Load the text generation model as a SageMaker Predictor
text_generation_predictor = Predictor(
    endpoint_name=TEXT_GENERATION_ENDPOINT_NAME,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer()
)

In [None]:
from langchain import hub
from langchain_core.documents import Document
from langchain_core.messages import BaseMessage
from langchain.schema import HumanMessage, AIMessage, SystemMessage
from langgraph.graph import START, StateGraph
from typing import List, Dict, Union, Any
from typing_extensions import TypedDict

from utils import langchain_to_openai_messages

# Load RAG prompt from LangChain Hub
prompt = hub.pull("rlm/rag-prompt")

# Define application state structure
class State(TypedDict):
    question: str
    context: List[Document]
    answer: str

def retrieve(state: State, chunking_strategy: str = "recursive") -> Dict[str, List[Document]]:
    """
    Retrieves relevant documents for a given question using vector search.
    
    Args:
        state (State): The current state containing the question.
        chunking_strategy (str): Either "recursive" or "fixed" to control the vector index used.

    Returns:
        Dict[str, List[Document]]: A dictionary with retrieved context documents.
    """
    try:
        index_name = (
            RECURSIVE_CHUNKING_INDEX_NAME if chunking_strategy == "recursive"
            else FIXED_CHUNKING_INDEX_NAME
        )

        _, chunks = query_vectors_with_text(
            text=state["question"],
            embeddings_model_endpoint=embedding_model_predictor,
            vector_bucket_name=VECTORS_BUCKET_NAME,
            index_name=index_name,
        )
        document_chunks = [Document(page_content=chunk) for chunk in chunks]
        return {"context": document_chunks}
    except Exception as e:
        raise RuntimeError(f"Document retrieval failed: {e}")


def generate(state: State) -> Dict[str, str]:
    """
    Generates an answer using retrieved context and user question.

    Args:
        state (State): The current state containing question and context documents.

    Returns:
        Dict[str, str]: A dictionary with the generated answer.

    Raises:
        RuntimeError: If text generation fails or returns an unexpected response.
    """
    try:
        docs_content = "\n\n".join(doc.page_content for doc in state["context"])

        # Generate LangChain messages and convert to OpenAI format
        lc_messages = prompt.invoke({
            "question": state["question"],
            "context": docs_content
        }).to_messages()
        openai_messages = langchain_to_openai_messages(lc_messages)

        request = {
            "messages": openai_messages,
            "temperature": 0.2,
            "max_new_tokens": 512,
        }

        response = text_generation_predictor.predict(request)

        # Validate response structure
        if ("choices" not in response 
                or not response["choices"] 
                or "message" not in response["choices"][0]
                or "content" not in response["choices"][0]["message"]):
            raise ValueError("Unexpected response format from text generation predictor.")

        return {"answer": response["choices"][0]["message"]["content"]}

    except Exception as e:
        raise RuntimeError(f"Text generation failed: {e}")
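
The `generate` step expects an OpenAI-style response and reads the answer from `choices[0].message.content`. The sketch below isolates that parsing in a small helper and, as an optional extra, removes a `<think>...</think>` block from the answer. Reasoning-tuned models often emit such a block, but whether this endpoint does is an assumption; `extract_answer` is a hypothetical helper, not part of `utils.py`.

```python
import re
from typing import Any, Dict

# Matches a complete reasoning block, including newlines inside it
THINK_BLOCK = re.compile(r"<think>.*?</think>", flags=re.DOTALL)

def extract_answer(response: Dict[str, Any]) -> str:
    """Return the message content of the first choice, without any reasoning block."""
    try:
        content = response["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as exc:
        raise ValueError("Unexpected response format from text generation predictor.") from exc
    return THINK_BLOCK.sub("", content).strip()

# Placeholder response in the shape that generate() validates
sample_response = {
    "choices": [
        {"message": {"content": "<think>Look at the context first.</think>\nThe answer is in the report."}}
    ]
}
answer = extract_answer(sample_response)
print(answer)
```

Stripping the reasoning text before evaluation keeps the LLM judge focused on the final answer. Note that an unterminated block (for example, when generation stops at the token limit) is left untouched by this pattern.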

The following cell assembles the functions defined above into a LangGraph `StateGraph` with two nodes: `retrieve` and `generate`.

In [None]:
from functools import partial
from langgraph.graph.state import StateGraph

def build_graph(chunking_strategy: str) -> StateGraph:
    """
    Build LangGraph graph for RAG

    Args:
        chunking_strategy (str): Define the preferred chunking strategy.

    Returns:
        graph (StateGraph): LangGraph StateGraph.
    """
    # Build graph explicitly adding the node with a name
    graph_builder = StateGraph(State)
    graph_builder.add_node("retrieve", partial(retrieve, chunking_strategy=chunking_strategy))
    graph_builder.add_node("generate", generate)
    # Define edges
    graph_builder.set_entry_point("retrieve")
    graph_builder.add_edge("retrieve", "generate")

    return graph_builder
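
Each node in the graph receives the current state and returns a partial update (for example `{"context": ...}` or `{"answer": ...}`) that is merged into the state before the next node runs. The plain-Python sketch below imitates that behavior for a strictly sequential flow. It is a stand-in to illustrate the data flow, not LangGraph itself, and the toy nodes are assumptions.

```python
from typing import Any, Callable, Dict, List

State = Dict[str, Any]
Node = Callable[[State], Dict[str, Any]]

def run_sequence(nodes: List[Node], initial_state: State) -> State:
    """Run nodes in order, merging each node's partial update into the state."""
    state = dict(initial_state)
    for node in nodes:
        state.update(node(state))
    return state

def toy_retrieve(state: State) -> Dict[str, Any]:
    # Stand-in for the vector search: returns one fake passage
    return {"context": [f"passage related to: {state['question']}"]}

def toy_generate(state: State) -> Dict[str, Any]:
    # Stand-in for the text generation model
    return {"answer": f"Answer based on {len(state['context'])} passage(s)."}

final_state = run_sequence([toy_retrieve, toy_generate], {"question": "What is RAG?"})
print(final_state)
```

This is why invoking the compiled graph with only a `question` yields a result that also contains `context` and `answer`.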

We are going to build and compile two different `StateGraph` objects: one that queries the vector index built with `recursive` chunking, and one that queries the index built with `fixed`-size chunking.

In [None]:
# Compile the graphs
recursive_graph_builder = build_graph(chunking_strategy="recursive")
recursive_graph = recursive_graph_builder.compile()

fixed_graph_builder = build_graph(chunking_strategy="fixed")
fixed_graph = fixed_graph_builder.compile()

Let's test the graphs by invoking them with a simple question. This time, rather than simply retrieving text chunks that are semantically similar, the graph will also use these chunks when composing a response.

In [None]:
%%time

response = recursive_graph.invoke(
    {
        "question": "What are the names of the people in Amazon's board of directors?"
    }
    )
print(response["answer"])

In [None]:
%%time

response = fixed_graph.invoke(
    {
        "question": "What are the names of the people in Amazon's board of directors?"
    }
    )
print(response["answer"])

In this case, both chunking strategies should succeed in generating accurate responses to the question.

## 11. Evaluating the RAG solutions using MLflow

Now that we have a `LangGraph` graph to run RAG, we need to measure how well it performs. This is critical in the experimentation phase, because we can use this information to adjust the chunking method, chunk size, and models for better performance.

We are going to use a ground truth data set in the form of questions and answers in a JSONL file.

In [None]:
import pandas as pd

eval_df = pd.read_json("amazon_10k_eval_dataset.jsonl", lines=True)
eval_df.head()

For the evaluation, we are going to use [MLflow LLM metrics](https://mlflow.org/docs/latest/llms/llm-evaluate/), which require the definition of `model` functions. We are going to create a different function to invoke each `StateGraph` graph.

In [None]:
def recursive_model(input_df: pd.DataFrame) -> List[str]:
    """
    Takes a pandas DataFrame with a 'question' column, passes each question 
    to a LangGraph model for inference, and returns a list of string answers.

    This function is intended for use with MLflow evaluation workflows.
    """
    answer = []
    for index, row in input_df.iterrows():
        response = recursive_graph.invoke({
            "question": row["question"]
        })
        answer.append(response["answer"])

    return answer

def fixed_model(input_df: pd.DataFrame) -> List[str]:
    """
    Takes a pandas DataFrame with a 'question' column, passes each question 
    to a LangGraph model for inference, and returns a list of string answers.

    This function is intended for use with MLflow evaluation workflows.
    """
    answer = []
    for index, row in input_df.iterrows():
        response = fixed_graph.invoke({
            "question": row["question"]
        })
        answer.append(response["answer"])

    return answer
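
The two model functions above are identical except for the graph they invoke. If you add more configurations, a small factory can avoid the duplication. The sketch below is one possible way to write it; `make_model` and the `EchoGraph` stand-in are hypothetical names used only for this example, and the factory assumes the input exposes a `question` column, as the functions above do.

```python
from typing import Any, Callable, List

def make_model(graph: Any) -> Callable[[Any], List[str]]:
    """Build a model function that answers every question with the given graph."""
    def model(input_df: Any) -> List[str]:
        # Iterating over the 'question' column yields one question string per row
        return [graph.invoke({"question": q})["answer"] for q in input_df["question"]]
    return model

class EchoGraph:
    """Stand-in for a compiled graph; only mimics the invoke() call used above."""
    def invoke(self, state: dict) -> dict:
        return {"answer": f"echo: {state['question']}"}

toy_model = make_model(EchoGraph())
# A dict of lists is enough to exercise the function without pandas
answers = toy_model({"question": ["q1", "q2"]})
print(answers)
```

With the real graphs, `make_model(recursive_graph)` and `make_model(fixed_graph)` would play the role of `recursive_model` and `fixed_model`.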

Let's define some environment variables for MLflow to invoke AWS models. **The model evaluation will fail without these AWS credentials.**

In [None]:
import boto3
import os

# Set AWS environment variables for MLflow
os.environ["AWS_REGION"] = boto3.session.Session().region_name
os.environ["AWS_ACCESS_KEY_ID"] = "" # Insert AWS access key here
os.environ["AWS_SECRET_ACCESS_KEY"] = "" # Insert AWS secret access key here

We are going to connect to a managed MLflow tracking server to store the results of the experiments.

**Make sure that you already have an MLflow tracking server running in your SageMaker domain.**

In [None]:
tracking_servers = [s['TrackingServerArn'] for s 
                    in boto3.client("sagemaker").list_mlflow_tracking_servers()['TrackingServerSummaries']
                    if s['IsActive'] == 'Active']

if len(tracking_servers) < 1:
    print("You don't have any active MLflow servers. Trying to find a server in the status 'Creating'...")

    r = boto3.client("sagemaker").list_mlflow_tracking_servers(
        TrackingServerStatus='Creating',
    )['TrackingServerSummaries']

    if len(r) < 1:
        print("You don't have any MLflow server in the status 'Creating'. Run the next code cell to create a new one.")
        mlflow_arn = None
        mlflow_name = None
    else:
        mlflow_arn = r[0]['TrackingServerArn']
        mlflow_name = r[0]['TrackingServerName']
        print(f"You have an MLflow server {mlflow_arn} in the status 'Creating', going to use this one")
else:
    mlflow_arn = tracking_servers[0]
    mlflow_name = tracking_servers[0].split('/')[1]
    print(f"You have {len(tracking_servers)} running MLflow server(s). Get the first server ARN:{mlflow_arn}")

Now we are going to run the evaluation using `mlflow.evaluate` and store the results in our tracking server. In this case, we are going to track both the latency in the evaluations and the correctness of the answer as evaluated using Claude 3 Sonnet (LLM as a judge). We are also going to add metrics from the chunking and embedding stages.

In [None]:
import mlflow
from time import gmtime, strftime, sleep

mlflow.set_tracking_uri(mlflow_arn)
experiment_suffix = strftime('%d-%H-%M-%S', gmtime())
experiment_name = f"recursive-chunking-exp-{experiment_suffix}"
experiment = mlflow.set_experiment(experiment_name=experiment_name)

In [None]:
# Run evaluation
with mlflow.start_run():
    results = mlflow.evaluate(
        model=recursive_model,
        data=eval_df,
        targets="answer",
        extra_metrics=[
            mlflow.metrics.genai.answer_correctness(
                model="bedrock:/anthropic.claude-3-sonnet-20240229-v1:0",
                parameters={
                    "temperature": 0.2,
                    "max_tokens": 256,
                    "anthropic_version": "bedrock-2023-05-31",
                },
            ),
            mlflow.metrics.latency(),
        ],
        evaluator_config={
          "col_mapping": {
              "inputs": "question",
          }
        }
    )

    # Store metrics from the evaluation
    mlflow.log_metrics(results.metrics)
    # Store other parameters
    mlflow.log_param("Chunking strategy", "recursive")
    mlflow.log_param("Chunk size", CHUNK_SIZE)
    mlflow.log_param("Chunk overlap", CHUNK_OVERLAP)
    mlflow.log_param("Number of chunks", NUM_CHUNKS_RECURSIVE)
    mlflow.log_param("Embedding model ID", EMBEDDING_MODEL_ID)
    mlflow.log_param("Embedding model version", EMBEDDING_MODEL_VERSION)
    mlflow.log_param("Text generation model ID", TEXT_GENERATION_MODEL_ID)
    mlflow.log_param("Text generation model version", TEXT_GENERATION_MODEL_VERSION)
    mlflow.log_param("Chunking time seconds", CHUNKING_TIME_RECURSIVE)

In [None]:
import mlflow
from time import gmtime, strftime, sleep

mlflow.set_tracking_uri(mlflow_arn)
experiment_suffix = strftime('%d-%H-%M-%S', gmtime())
experiment_name = f"fixed-chunking-exp-{experiment_suffix}"
experiment = mlflow.set_experiment(experiment_name=experiment_name)

In [None]:
# Run evaluation
with mlflow.start_run():
    results = mlflow.evaluate(
        model=fixed_model,
        data=eval_df,
        targets="answer",
        extra_metrics=[
            mlflow.metrics.genai.answer_correctness(
                model="bedrock:/anthropic.claude-3-sonnet-20240229-v1:0",
                parameters={
                    "temperature": 0.2,
                    "max_tokens": 256,
                    "anthropic_version": "bedrock-2023-05-31",
                },
            ),
            mlflow.metrics.latency(),
        ],
        evaluator_config={
          "col_mapping": {
              "inputs": "question",
          }
        }
    )

    # Store metrics from the evaluation
    mlflow.log_metrics(results.metrics)
    # Store other parameters
    mlflow.log_param("Chunking strategy", "fixed")
    mlflow.log_param("Chunk size", CHUNK_SIZE)
    mlflow.log_param("Chunk overlap", CHUNK_OVERLAP)
    mlflow.log_param("Number of chunks", NUM_CHUNKS_FIXED)
    mlflow.log_param("Embedding model ID", EMBEDDING_MODEL_ID)
    mlflow.log_param("Embedding model version", EMBEDDING_MODEL_VERSION)
    mlflow.log_param("Text generation model ID", TEXT_GENERATION_MODEL_ID)
    mlflow.log_param("Text generation model version", TEXT_GENERATION_MODEL_VERSION)
    mlflow.log_param("Chunking time seconds", CHUNKING_TIME_FIXED)

Results from the experiments have been loaded into the MLflow tracking server. Machine learning engineers can use MLflow experiments to compare different RAG configurations and see the effect of changing:

- Prompt
- Embedding model
- Chunk size
- Chunk overlap
- Chunking strategy
- Distance metric

and others, and use this information to pick highly performant RAG solutions. Metrics can be viewed and compared in the MLflow tracking server:

![MLflow Experiment Run](mlflow-experiment-run.png)
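
When several of these parameters are varied at once, it helps to enumerate the combinations up front so that each one can be logged as its own MLflow run. The sketch below builds such a grid for the chunking parameters. The candidate values are illustrative assumptions, not recommendations, and each configuration would still require re-running the ingestion and evaluation steps of this notebook.

```python
from itertools import product

# Illustrative candidate values for a small experiment grid
strategies = ["fixed", "recursive"]
chunk_sizes = [500, 1000]
chunk_overlaps = [0, 250]

configs = [
    {"chunking_strategy": strategy, "chunk_size": size, "chunk_overlap": overlap}
    for strategy, size, overlap in product(strategies, chunk_sizes, chunk_overlaps)
    if overlap < size  # an overlap as large as the chunk itself makes no sense
]

for config in configs:
    print(config)
```

Parameters that affect the stored vectors, such as the embedding model or the distance metric, would additionally need a separate vector index per configuration.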

## 12. Clean up

Let's delete the model endpoints, the S3 vectors bucket with its indexes, and the locally downloaded data.

In [None]:
embedding_model_predictor.delete_endpoint()
text_generation_predictor.delete_endpoint()

In [None]:
# Delete S3 vectors bucket and indexes
import boto3

s3vectors = boto3.client("s3vectors")

s3vectors.delete_index(
    vectorBucketName=VECTORS_BUCKET_NAME,
    indexName=FIXED_CHUNKING_INDEX_NAME
)
s3vectors.delete_index(
    vectorBucketName=VECTORS_BUCKET_NAME,
    indexName=RECURSIVE_CHUNKING_INDEX_NAME
)
s3vectors.delete_vector_bucket(
    vectorBucketName=VECTORS_BUCKET_NAME
)

In [None]:
!rm -rf data/