# Fine-Tune BGE Embedding Model Using Synthetic Data from Amazon Bedrock

## Introduction

Have you ever faced the challenge of obtaining high-quality data for fine-tuning your machine learning models? Generating synthetic data can provide a robust solution, especially when real-world data is scarce or sensitive. For instance, when developing a medical search engine, obtaining a large dataset of real user queries and relevant documents is often infeasible due to privacy concerns surrounding personal health information. However, synthetic data generation techniques can be employed to create realistic query-document pairs that resemble authentic user searches and relevant medical content, enabling the training of accurate retrieval models while preserving user privacy.

In this blog post, we'll demonstrate how to leverage [Amazon Bedrock](https://aws.amazon.com/bedrock/) to create synthetic data, fine-tune a [BAAI General Embeddings (BGE) model](https://github.com/FlagOpen/FlagEmbedding?tab=readme-ov-file#bge-embedding), and deploy it using [Amazon SageMaker](https://aws.amazon.com/sagemaker/).

Amazon Bedrock is a fully managed service that offers a choice of high-performing foundation models (FMs) from leading AI companies like AI21 Labs, Anthropic, Cohere, Meta, Stability AI, and Amazon via a single API, along with a broad set of capabilities you need to build generative AI applications with security, privacy, and responsible AI.

### What is the BGE Model?

BGE stands for Beijing Academy of Artificial Intelligence (BAAI) General Embeddings. It is a family of embedding models with a BERT-like architecture, designed to produce high-quality embeddings from text data. The BGE models come in three sizes:

- `bge-small-en-v1.5`: 0.13GB, 384 embedding dimension
- `bge-base-en-v1.5`: 0.44GB, 768 embedding dimension
- `bge-large-en-v1.5`: 1.34GB, 1024 embedding dimension

To compare two pieces of text, BGE is used as a bi-encoder: each text is encoded independently by the same model, and the two resulting embeddings are then compared, typically with cosine similarity.
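
A minimal sketch of the bi-encoder pattern follows. The keyword-count `toy_encode` function is a hypothetical stand-in for BGE; a real model would return a dense vector (768 dimensions for `bge-base-en-v1.5`).

```python
import math
from typing import Callable, List


def cosine_sim(a: List[float], b: List[float]) -> float:
    """Cosine similarity between two dense vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def bi_encoder_score(encode: Callable[[str], List[float]], text_a: str, text_b: str) -> float:
    # Each text goes through the SAME encoder on its own; the two texts never
    # see each other during encoding (unlike a cross-encoder).
    return cosine_sim(encode(text_a), encode(text_b))


# Hypothetical toy encoder standing in for BGE: counts of a few keywords.
VOCAB = ["revenue", "income", "assets", "dividend"]


def toy_encode(text: str) -> List[float]:
    words = text.lower().split()
    return [float(words.count(term)) for term in VOCAB]


score_same = bi_encoder_score(toy_encode, "revenue grew", "what was revenue")
score_diff = bi_encoder_score(toy_encode, "revenue grew", "dividend declared")
print(score_same, score_diff)
```

Because each text is encoded separately, passage embeddings can be computed once and cached, and only the query needs to be embedded at search time.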

### Why Use Synthetic Data?

Synthetic data can improve model performance by supplying ample training data without the constraints of traditional data collection. This blog post will guide you through generating synthetic data using Amazon Bedrock, fine-tuning a BGE model, evaluating its performance, and deploying it with Amazon SageMaker.

### Solution Overview

In this blog post, we will:

1. Set up a SageMaker Studio environment with the necessary IAM policies.
2. Create a Conda environment for dependencies.
3. Generate synthetic data using Meta Llama 3 on Amazon Bedrock.
4. Fine-tune the BGE embedding model with the generated data.
5. Evaluate and compare the fine-tuned model.
6. Deploy the model using Amazon SageMaker and [Hugging Face Text Embeddings Inference (TEI)](https://huggingface.co/docs/text-embeddings-inference/en/index).

## Step 1: Generate Synthetic Data Using Amazon Bedrock

We’ll start by adapting [LlamaIndex’s embedding model fine-tuning guide](https://docs.llamaindex.ai/en/stable/examples/finetuning/embeddings/finetune_embedding/) to use Amazon Bedrock to generate synthetic data for fine-tuning.

### Download the data

For our training and validation data, we'll use the same Uber and Lyft 2021 10-K filings that the referenced LlamaIndex guide uses.

Note that we're using "!" in Jupyter notebooks to execute shell commands directly from the notebook cells, which is convenient for performing system operations that might be more cumbersome or longer to implement directly in Python code.

First we’ll download the data to be used for training and evaluation:

```python
!mkdir -p 'data/10k/'
!wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/10k/uber_2021.pdf' -O 'data/10k/uber_2021.pdf'
!wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/10k/lyft_2021.pdf' -O 'data/10k/lyft_2021.pdf'
```

Next, we'll select PDFs from the corpus to train and evaluate on. You can replace these with your own data files.

```python
TRAIN_FILES = ["./data/10k/lyft_2021.pdf"]
VAL_FILES = ["./data/10k/uber_2021.pdf"]

TRAIN_CORPUS_FPATH = "./data/train_corpus.json"
VAL_CORPUS_FPATH = "./data/val_corpus.json"
```

Next, we build the corpus of text chunks by using LlamaIndex to load the PDFs and parse them into plain-text chunks.

```python
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter

def load_corpus(files, verbose=False, chunk_size=1024, chunk_overlap=20):
    if verbose:
        print(f"Loading files {files}")

    reader = SimpleDirectoryReader(input_files=files)
    docs = reader.load_data()
    if verbose:
        print(f"Loaded {len(docs)} docs")

    parser = SentenceSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    nodes = parser.get_nodes_from_documents(docs, show_progress=verbose)

    if verbose:
        print(f"Parsed {len(nodes)} nodes")

    return nodes

chunk_size = 1024
chunk_overlap = 20

train_nodes = load_corpus(TRAIN_FILES, verbose=True, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
val_nodes = load_corpus(VAL_FILES, verbose=True, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
```

After parsing the data, we'll run a couple of cleaning operations to discard chunks that aren't useful for training and validation.

```python
# Remove nodes where len(node.text) less than min_len
min_len = 20
train_nodes = [node for node in train_nodes if len(node.text) >= min_len]
val_nodes = [node for node in val_nodes if len(node.text) >= min_len]

print(len(train_nodes), len(val_nodes))

# Remove nodes where text is not valid UTF-8
def is_valid_utf8(string):
    """
    Checks if a string is valid UTF-8.

    :param string: String to be checked.
    :return: True if the string is valid UTF-8, otherwise False.
    """
    try:
        # Try encoding and decoding to check for UTF-8 validity
        string.encode('utf-8').decode('utf-8')
        return True
    except UnicodeEncodeError:
        return False

train_nodes = [node for node in train_nodes if is_valid_utf8(node.text)]
val_nodes = [node for node in val_nodes if is_valid_utf8(node.text)]

print(len(train_nodes), len(val_nodes))
```
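
In `is_valid_utf8`, only the encoding step can actually fail: bytes that Python has just encoded as UTF-8 always decode again. In practice the check catches strings containing lone surrogate code points (U+D800 to U+DFFF), which can come from malformed source text. A minimal standalone check of that behavior, using a hypothetical helper named `can_encode_utf8` so it doesn't shadow the function above:

```python
def can_encode_utf8(string: str) -> bool:
    """Equivalent to is_valid_utf8: True if the string encodes as UTF-8."""
    try:
        string.encode("utf-8")
        return True
    except UnicodeEncodeError:
        # Raised for lone surrogates, e.g. "\ud835", which UTF-8 cannot represent
        return False


clean = "Café revenue grew 10% year over year."
broken = "Revenue grew \ud835 year over year."  # contains a lone surrogate

print(can_encode_utf8(clean), can_encode_utf8(broken))
```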

### Set Up Your LLM

To generate synthetic data, we'll use a large language model (LLM) from Amazon Bedrock. In this case, we'll use Meta Llama 3 70B Instruct, which offers strong performance at a relatively low cost.

First, set up the LLM and define the prompt template for generating questions based on the context:


```python
from botocore.config import Config
from langchain_aws import ChatBedrock
from llama_index.llms.langchain import LangChainLLM

# Retry throttled Bedrock calls with adaptive backoff
config = Config(retries={"max_attempts": 10, "mode": "adaptive"})

model = ChatBedrock(model_id="meta.llama3-70b-instruct-v1:0", config=config)
# Wrap the LangChain chat model so LlamaIndex can call it
llm = LangChainLLM(llm=model)

qa_generate_prompt_tmpl = """
You are an AI assistant helping a teacher generate questions for an upcoming quiz. I will provide you with some context material in <context> tags. Your task is to generate {num_questions_per_chunk} question(s) based only on the given context. The questions should be diverse and cover different parts of the context. Do not rely on any outside knowledge.

Here are some important guidelines to follow:
- Number each question, starting from 1.
- Put each question on its own line, with no blank lines in between.
- Do not generate any multiple choice questions.
- Restrict the questions to information provided in the context only.
- Do not reference the text, just directly ask a question that can be answered with it.
    - For example, do not add phrases like, "as referenced in the context"
- Do not output anything other than the questions.

Here is the context to use for generating questions:
<context>
{context_str}
</context>

Please generate your numbered questions in the following format:

1. Question 1
2. Question 2
3. Question 3

Generate exactly {num_questions_per_chunk} question(s).

A: Here is a list of questions to ask from the context above, where each question is a single line:
"""
```

### Generate Synthetic QA Pairs

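Now, use the LLM to generate synthetic query-passage pairs: each generated question becomes a query, and the chunk it was generated from is recorded as its relevant document. Here is a function that generates these pairs in parallel for a given set of nodes: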

```python
import os
import re
import uuid
from typing import List
from tqdm import tqdm

from llama_index.core.llms import LLM
from llama_index.core.schema import MetadataMode, TextNode
from llama_index.finetuning.embeddings.common import DEFAULT_QA_GENERATE_PROMPT_TMPL
from llama_index.finetuning import EmbeddingQAFinetuneDataset

from concurrent.futures import ThreadPoolExecutor, as_completed

def generate_qa_embedding_pairs_parallel(
    nodes: List[TextNode],
    llm: LLM,
    qa_generate_prompt_tmpl: str = DEFAULT_QA_GENERATE_PROMPT_TMPL,
    num_questions_per_chunk: int = 2,
    max_workers: int = 8  # Number of parallel threads
) -> EmbeddingQAFinetuneDataset:
    """Generate examples given a set of nodes in parallel with a progress bar."""
    node_dict = {
        node.node_id: node.get_content(metadata_mode=MetadataMode.NONE)
        for node in nodes
    }

    queries = {}
    relevant_docs = {}

    def process_node(node_id, text):
        # Fill in the prompt template and ask the LLM for questions about this chunk
        query = qa_generate_prompt_tmpl.format(
            context_str=text, num_questions_per_chunk=num_questions_per_chunk
        )
        response = llm.complete(query)

        # One question per line; strip leading numbering such as "1." or "2)"
        result = str(response).strip().split("\n")
        questions = [
            re.sub(r"^\d+[\).\s]", "", question).strip() for question in result
        ]
        questions = [question for question in questions if len(question) > 0]

        # Give each question a unique ID that points back to its source chunk
        local_queries = {}
        local_relevant_docs = {}
        for question in questions:
            question_id = str(uuid.uuid4())
            local_queries[question_id] = question
            local_relevant_docs[question_id] = [node_id]

        return local_queries, local_relevant_docs

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all node processing jobs
        future_to_node = {executor.submit(process_node, node_id, text): node_id for node_id, text in node_dict.items()}

        # Process futures as they complete
        for future in tqdm(as_completed(future_to_node), total=len(future_to_node), desc="Processing nodes"):
            node_id = future_to_node[future]
            try:
                local_queries, local_relevant_docs = future.result()
                queries.update(local_queries)
                relevant_docs.update(local_relevant_docs)
            except Exception as exc:
                print(f'Node {node_id} generated an exception: {exc}')

    # Construct dataset
    return EmbeddingQAFinetuneDataset(
        queries=queries, corpus=node_dict, relevant_docs=relevant_docs
    )

# Usage
train_dataset = generate_qa_embedding_pairs_parallel(train_nodes, llm=llm, qa_generate_prompt_tmpl=qa_generate_prompt_tmpl)
val_dataset = generate_qa_embedding_pairs_parallel(val_nodes, llm=llm, qa_generate_prompt_tmpl=qa_generate_prompt_tmpl)

# Create the output directory if it doesn't exist
os.makedirs("./data", exist_ok=True)

# Save to the corpus paths defined earlier (TRAIN_CORPUS_FPATH and VAL_CORPUS_FPATH)
train_dataset.save_json(TRAIN_CORPUS_FPATH)
val_dataset.save_json(VAL_CORPUS_FPATH)
```
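
The response parsing inside `process_node` can be pulled out and exercised on a canned LLM response. A standalone sketch using the same regular expression (the `parse_numbered_questions` helper is hypothetical):

```python
import re
from typing import List


def parse_numbered_questions(response_text: str) -> List[str]:
    """Split an LLM response into questions, stripping '1.' / '2)' style numbering."""
    questions = []
    for line in response_text.strip().split("\n"):
        # Same pattern as process_node: leading digits followed by ')', '.', or whitespace
        question = re.sub(r"^\d+[\).\s]", "", line).strip()
        if question:  # drop blank lines
            questions.append(question)
    return questions


sample = "1. What was Lyft's revenue in 2021?\n2) How many riders did Lyft have?\n\n"
print(parse_numbered_questions(sample))
```

Two caveats of this simple approach: lines that are not questions (for example, a preamble the model adds despite the instructions) pass through unchanged, and a question that legitimately starts with a number, such as a year, loses that number. Spot-checking the generated queries is worthwhile.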

### Load the Data

Optionally, you can reload the saved datasets with the `EmbeddingQAFinetuneDataset` class, which is useful for verifying their contents:

```python
# [Optional] Load
train_dataset = EmbeddingQAFinetuneDataset.from_json(TRAIN_CORPUS_FPATH)
val_dataset = EmbeddingQAFinetuneDataset.from_json(VAL_CORPUS_FPATH)

# Check the first 20 query-docid pairs to see if the questions generated look reasonable
for p in train_dataset.query_docid_pairs[:20]:
    print(p)
```

### Save the Generated Data

After generating the synthetic data, save it in JSONL format for later use in fine-tuning the BGE model.

Note that instead of using LlamaIndex for fine-tuning, we'll use the [FlagEmbedding](https://github.com/FlagOpen/FlagEmbedding) library so that we can take advantage of additional features it provides, such as hard negative mining and model merging. FlagEmbedding expects training data as a JSON Lines file, where each line is a JSON object like this:

```json
{"query": str, "pos": List[str], "neg": List[str]}
```

We'll convert the generated query-chunk pairs to this format. For now, each record only has positives; negatives will be added by hard negative mining in a later step.

Here’s how you can write the positive samples to a JSONL file:

```python
import json
import random
from tqdm.auto import tqdm

def write_jsonl(dataset: EmbeddingQAFinetuneDataset, filepath: str):
    """Write dataset to jsonl file."""
    # Collect the data
    data = []
    for query, doc_id in tqdm(dataset.query_docid_pairs):
        assert len(doc_id) == 1
        text = dataset.corpus[doc_id[0]]
        data.append({"query": query, "text": text})
    # Shuffle data
    random.shuffle(data)
    # Write to jsonl file
    with open(filepath, "w") as f:
        for sample in data:
            query = sample["query"]
            text = sample["text"]
            f.write(json.dumps({"query": query, "pos": [text]}) + "\n")

# Define file paths
TRAIN_DATA_FILEPATH = "./data/train_data.jsonl"
VAL_DATA_FILEPATH = "./data/val_data.jsonl"

# Usage
write_jsonl(train_dataset, TRAIN_DATA_FILEPATH)
write_jsonl(val_dataset, VAL_DATA_FILEPATH)
```

This process ensures that you have a well-structured synthetic dataset ready for the next step of fine-tuning the BGE model.
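
Before moving on, it can be worth sanity-checking the files against the `{"query": str, "pos": List[str], "neg": List[str]}` shape. A small validator sketch (the `validate_flag_embedding_records` helper is hypothetical; FlagEmbedding does not ship it):

```python
import json
from typing import List


def validate_flag_embedding_records(lines: List[str]) -> List[dict]:
    """Parse JSONL lines and check each record has the expected shape."""
    records = []
    for line_no, line in enumerate(lines, start=1):
        record = json.loads(line)
        if not isinstance(record.get("query"), str):
            raise ValueError(f"line {line_no}: 'query' must be a string")
        pos = record.get("pos")
        if not isinstance(pos, list) or not pos or not all(isinstance(p, str) for p in pos):
            raise ValueError(f"line {line_no}: 'pos' must be a non-empty list of strings")
        # 'neg' is optional at this stage; hard negatives are mined later
        neg = record.get("neg", [])
        if not isinstance(neg, list) or not all(isinstance(n, str) for n in neg):
            raise ValueError(f"line {line_no}: 'neg' must be a list of strings")
        records.append(record)
    return records


good_lines = [json.dumps({"query": "What was the company's revenue?", "pos": ["Chunk text about revenue."]})]
records = validate_flag_embedding_records(good_lines)
print(len(records))
```

To check a real file, pass it the file's lines, for example `validate_flag_embedding_records(open(TRAIN_DATA_FILEPATH).read().splitlines())`.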

## Step 2: Fine-Tune the BGE Embedding Model

### Model Selection

To fine-tune the BGE model, we need to choose the appropriate model size based on our requirements and available resources. The BGE models come in three sizes:

- `bge-small-en-v1.5`: 0.13GB, 384 embedding dimension
- `bge-base-en-v1.5`: 0.44GB, 768 embedding dimension
- `bge-large-en-v1.5`: 1.34GB, 1024 embedding dimension

For this demonstration, we will use the `bge-base-en-v1.5` model as it offers a good balance between performance and resource requirements.

```python
from huggingface_hub import snapshot_download

model_name = "BAAI/bge-base-en-v1.5"
base_model_local_dir = model_name.split("/")[-1]

# We'll opt to download the model locally
downloaded_model_dir = snapshot_download(
    model_name,
    local_dir=base_model_local_dir,
)
```

### Defining Instructions

The BGE model can make use of instructions during the fine-tuning process. We can do this by prepending instructions to the passages and/or queries used for fine-tuning. In this example, we'll define a retrieval instruction for the query, and skip using a passage instruction.

```python
query_instruction_for_retrieval = "Represent the Financial question for retrieving supporting documents: "

# We could also add an instruction to passages, but it's not necessary here
passage_instruction_for_retrieval = ""
```

### Generate Hard Negatives

Hard negative mining is an essential step in fine-tuning embedding models. It helps improve the model's ability to distinguish between similar but not identical text pairs, thereby enhancing the overall quality of the embeddings. In this step, we will generate hard negatives to fine-tune our BGE model.

We'll use the hard negative mining script that ships with `FlagEmbedding` (`FlagEmbedding.baai_general_embedding.finetune.hn_mine`) and run it from a notebook cell. In the code below, `sys.executable` points to the Python executable for this Conda environment, so the script runs with the same dependencies as the notebook.

```python
import sys

# Define the path for saving the training data with mined hard negatives
TRAIN_DATA_HARD_NEGATIVE_FILEPATH = "./data/train_data_minedHN.jsonl"

# Hard Negative Mining
!{sys.executable} -m FlagEmbedding.baai_general_embedding.finetune.hn_mine \
--model_name_or_path "{base_model_local_dir}" \
--input_file "{TRAIN_DATA_FILEPATH}" \
--output_file "{TRAIN_DATA_HARD_NEGATIVE_FILEPATH}" \
--range_for_sampling 3-200 \
--use_gpu_for_searching \
--query_instruction_for_retrieval "{query_instruction_for_retrieval}"
```

The output file contains the same training records, each with an added `neg` list of mined hard negatives. This is the file we'll pass to the fine-tuning script.
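
The core idea can be sketched in a few lines: rank the corpus by similarity to each query using the base model, skip the very top of the ranking (which may contain false negatives, meaning passages that actually answer the query), and sample negatives from a window further down. This loosely mirrors what `--range_for_sampling 3-200` controls, but the helper below is a simplified, hypothetical sketch, not FlagEmbedding's implementation:

```python
import random
from typing import List, Sequence, Set


def dot(a: Sequence[float], b: Sequence[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def mine_hard_negatives_sketch(
    query_emb: Sequence[float],
    passage_embs: List[Sequence[float]],
    passages: List[str],
    positives: Set[str],
    rank_start: int,
    rank_end: int,
    num_negatives: int,
    seed: int = 0,
) -> List[str]:
    """Sample negatives from a window of the similarity ranking, skipping positives.

    Assumes L2-normalized embeddings, so the dot product equals cosine similarity.
    rank_start/rank_end are 0-based, end-exclusive positions in the ranking.
    """
    # Rank every passage by similarity to the query, most similar first
    ranked = sorted(range(len(passages)), key=lambda i: dot(query_emb, passage_embs[i]), reverse=True)
    # Keep only the chosen rank window, dropping the query's own positives
    window = [passages[i] for i in ranked[rank_start:rank_end] if passages[i] not in positives]
    if len(window) <= num_negatives:
        return window
    return random.Random(seed).sample(window, num_negatives)


query_emb = [1.0, 0.0]
passage_embs = [[1.0, 0.0], [0.8, 0.6], [0.6, 0.8], [0.0, 1.0]]  # unit vectors
passages = ["positive", "near-miss", "related", "unrelated"]

negs = mine_hard_negatives_sketch(query_emb, passage_embs, passages, {"positive"}, 0, 3, num_negatives=5)
sampled = mine_hard_negatives_sketch(query_emb, passage_embs, passages, {"positive"}, 1, 4, num_negatives=2)
print(negs, sampled)
```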

### Fine-Tuning the Model

Now we'll kick off fine-tuning. The model is trained with a contrastive cross-entropy loss: for each query, the loss pushes the positive passage's score above the scores of its mined hard negatives and, through in-batch negative sampling, above the passages belonging to other queries in the same batch.
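
To make the loss and the role of temperature concrete, here is a single-query sketch in plain Python. It is a simplified illustration of a contrastive (InfoNCE-style) cross-entropy loss with temperature-scaled scores, not FlagEmbedding's implementation:

```python
import math
from typing import List


def info_nce_loss(query: List[float], positive: List[float], negatives: List[List[float]], temperature: float) -> float:
    """Cross-entropy of the positive among [positive] + negatives.

    Assumes L2-normalized vectors, so the dot product equals cosine similarity.
    """
    candidates = [positive] + negatives
    logits = [sum(q * c for q, c in zip(query, cand)) / temperature for cand in candidates]
    # Log-sum-exp with max subtraction for numerical stability
    max_logit = max(logits)
    log_denominator = max_logit + math.log(sum(math.exp(l - max_logit) for l in logits))
    # -log softmax(positive); the positive sits at index 0
    return log_denominator - logits[0]


q = [1.0, 0.0]
# Easy case: the positive outscores the negative
loss_t1 = info_nce_loss(q, [1.0, 0.0], [[0.0, 1.0]], temperature=1.0)
loss_sharp = info_nce_loss(q, [1.0, 0.0], [[0.0, 1.0]], temperature=0.02)
# Hard case: the negative outscores the positive
hard_t1 = info_nce_loss(q, [0.6, 0.8], [[0.8, 0.6]], temperature=1.0)
hard_sharp = info_nce_loss(q, [0.6, 0.8], [[0.8, 0.6]], temperature=0.02)
print(loss_t1, loss_sharp, hard_t1, hard_sharp)
```

With a low temperature such as the 0.02 used in the training command, a correctly ranked positive contributes almost no loss, while a hard negative that outscores the positive is penalized far more heavily than at temperature 1.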

#### Important Training Arguments

Here are some critical arguments to consider when fine-tuning the model:

- `per_device_train_batch_size`: Batch size in training. Generally, a larger batch size yields better performance. To fit a larger batch in GPU memory, you can enable options such as `--fp16`, `--deepspeed ./ds_config.json` (you can refer to `ds_config.json`), and `--gradient_checkpointing`.
- `train_group_size`: The number of positive and negative samples for a query during training. This argument controls the number of negatives (#negatives = train_group_size - 1). Ensure that the number of negatives is not larger than the number of negatives in the data ("neg": List[str]). In-batch negatives are also used in fine-tuning.
- `negatives_cross_device`: Shares the negatives across all GPUs, extending the number of negatives.
- `learning_rate`: Select an appropriate learning rate for your model. Recommended values are 1e-5, 2e-5, or 3e-5 for large, base, or small-scale models, respectively.
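- `temperature`: The similarity scores are divided by this value before the softmax in the loss. Lower values make the score distribution sharper, so hard negatives that score close to the positive are penalized more strongly.
- `query_max_len`: Maximum query length in tokens; longer queries are truncated. Set this based on the length of the queries in your data.
- `passage_max_len`: Maximum passage length in tokens; longer passages are truncated. Set this based on the length of the passages in your data. Note that our chunks were built with `chunk_size=1024`, so with a value of 256 many passages will be truncated during training.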
- `query_instruction_for_retrieval`: Instruction for queries, which will be added to each query. You can also set it to "" to add nothing to the query.
- `use_inbatch_neg`: Uses passages in the same batch as negatives. The default value is True.

For more training arguments please refer to [transformers.TrainingArguments](https://huggingface.co/docs/transformers/main_classes/trainer#transformers.TrainingArguments).

#### Training Command

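We'll use `gradient_accumulation_steps` to get a larger effective batch size for each optimizer step. Note that accumulation does not add in-batch negatives: those still come from each micro-batch of `per_device_train_batch_size` queries. Here is the command used to start training: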

```python
import os
import sys

# Set the torchrun path
torchrun_path = os.path.join(os.path.dirname(sys.executable), "torchrun")

# Disable tokenizer parallelism to avoid deadlocks
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Define the output path for fine-tuning
finetuned_model_dir = "./finetuned_model"

# Kick off the FlagEmbedding fine-tuning script using torchrun, passing the necessary arguments.
# Note: `--normlized` (sic) is the argument's actual spelling in FlagEmbedding.
!{torchrun_path} --nproc_per_node 1 \
-m FlagEmbedding.baai_general_embedding.finetune.run \
--output_dir "{finetuned_model_dir}" \
--model_name_or_path "{base_model_local_dir}" \
--train_data "{TRAIN_DATA_HARD_NEGATIVE_FILEPATH}" \
--learning_rate 1e-5 \
--fp16 \
--num_train_epochs 20 \
--per_device_train_batch_size 32 \
--gradient_accumulation_steps 10 \
--dataloader_drop_last True \
--normlized True \
--temperature 0.02 \
--query_max_len 256 \
--passage_max_len 256 \
--train_group_size 2 \
--negatives_cross_device \
--logging_steps 10 \
--query_instruction_for_retrieval "{query_instruction_for_retrieval}" \
--passage_instruction_for_retrieval "{passage_instruction_for_retrieval}"
```
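
As a rough sanity check on these settings, the arithmetic below computes the number of queries per optimizer step and the number of negatives each query is scored against in one forward pass. It assumes, as in FlagEmbedding's bi-encoder with in-batch negatives enabled, that every passage in the micro-batch (gathered across GPUs when `negatives_cross_device` is set) serves as a candidate for every query; the `batch_stats` helper itself is hypothetical:

```python
def batch_stats(
    per_device_batch: int,
    grad_accum_steps: int,
    num_gpus: int,
    train_group_size: int,
    negatives_cross_device: bool = True,
) -> dict:
    # Queries contributing to one optimizer step
    effective_batch = per_device_batch * grad_accum_steps * num_gpus
    # Passages scored against each query in a single forward pass
    devices_sharing = num_gpus if negatives_cross_device else 1
    candidates_per_query = per_device_batch * train_group_size * devices_sharing
    return {
        "effective_batch": effective_batch,
        "explicit_negatives_per_query": train_group_size - 1,
        # One candidate is the query's own positive; the rest act as negatives
        "negatives_per_query_in_loss": candidates_per_query - 1,
    }


stats = batch_stats(per_device_batch=32, grad_accum_steps=10, num_gpus=1, train_group_size=2)
print(stats)
```

Under these assumptions, each query brings one mined hard negative but is scored against 63 negatives in the loss thanks to in-batch sharing. Raising `gradient_accumulation_steps` changes the effective batch, not the negative count.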

### Inspect the trained model

After fine-tuning, we can load the model and print it:

```python
from transformers import AutoModel

# Load the fine-tuned model from the local output directory
model = AutoModel.from_pretrained(finetuned_model_dir)

print(model)
```

## Step 3: Model Merging

Next, we'll use `LM-Cocktail` to merge the fine-tuned weights with the original weights. It creates new parameters by taking a weighted average of the parameters of two or more models. Despite its simplicity, this can help mitigate catastrophic forgetting, so the merged model keeps more of the base model's general-purpose ability while retaining gains on our domain.

```python
from LM_Cocktail import mix_models

mixed_model_output_dir = "./mixed_model"

# Mix fine-tuned model and base model; then save it to output_path
model = mix_models(
    model_names_or_paths=[base_model_local_dir, finetuned_model_dir],
    model_type='encoder',
    weights=[0.25, 0.75],  # you can change the weights to get a better trade-off.
    output_path=mixed_model_output_dir
)
```
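
Conceptually, the merge is a per-parameter weighted average. A plain-Python sketch of that idea, with lists standing in for tensors (a simplification; `mix_models` loads the real checkpoints for you, and the sum-to-1 check is a choice of this sketch):

```python
from typing import Dict, List


def merge_state_dicts(state_dicts: List[Dict[str, List[float]]], weights: List[float]) -> Dict[str, List[float]]:
    """Weighted average of same-shaped parameters across models."""
    if len(state_dicts) != len(weights):
        raise ValueError("need one weight per model")
    if abs(sum(weights) - 1.0) > 1e-6:
        raise ValueError("weights should sum to 1")
    merged = {}
    for name in state_dicts[0]:
        tensors = [sd[name] for sd in state_dicts]
        # Element-wise: merged[i] = sum_k weights[k] * tensors[k][i]
        merged[name] = [sum(w * t[i] for w, t in zip(weights, tensors)) for i in range(len(tensors[0]))]
    return merged


base = {"layer.weight": [0.0, 1.0], "layer.bias": [4.0]}
finetuned = {"layer.weight": [1.0, 3.0], "layer.bias": [0.0]}
mixed = merge_state_dicts([base, finetuned], weights=[0.25, 0.75])
print(mixed)
```

With weights `[0.25, 0.75]`, each merged parameter lands three quarters of the way from the base value toward the fine-tuned value.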

## Step 4: Test the Model Locally

We can optionally test the model locally before deploying it. We'll embed pairs of queries and documents that we'd expect to score high in similarity, and then pairs we'd expect to score low.

```python
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoModel, AutoTokenizer
import torch

def model_fn(model_dir):
    # Load tokenizer and model from model_dir
    tokenizer = AutoTokenizer.from_pretrained(model_dir, use_safetensors=True)
    model = AutoModel.from_pretrained(model_dir, use_safetensors=True)
    model.eval()

    return tokenizer, model

def predict_fn(data, tokenizer_and_model):
    # Unpack tokenizer and model
    tokenizer, model = tokenizer_and_model

    # Process input
    inputs = data.pop("inputs", data)
    instruction = data.pop("instruction", None)

    # Add instruction to query if instruction is provided
    # For s2p (short query to long passage) retrieval, add an instruction to queries
    # (don't add an instruction to passages unless the model was fine-tuned with one)
    if instruction is not None:
        inputs = [instruction + q for q in inputs]

    # Tokenize input sentences
    encoded_input = tokenizer(inputs, padding=True, truncation=True, return_tensors='pt')

    # Compute token embeddings
    with torch.no_grad():
        model_output = model(**encoded_input)
        # Perform pooling. In this case, CLS pooling.
        embeddings = model_output[0][:, 0]
    # Normalize embeddings
    embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)

    return {"embeddings": embeddings}

# Load the fine-tuned mixed model (model_fn returns (tokenizer, model))
tokenizer, model = model_fn(mixed_model_output_dir)
```

```python
# Similar documents and queries
similar_documents = [
    "The company's revenue for the fiscal year 2022 was $500 million, a 10% increase from the previous year. This growth was primarily driven by strong sales in the company's core product lines and successful expansion into new markets.",
    "The company's net income for the fiscal year 2022 was $75 million, a 15% increase from the previous year. This improvement was mainly attributed to higher revenue, cost optimization initiatives, and operational efficiencies.",
    "The company's total assets as of December 31, 2022, were $1.2 billion, a 5% increase from the previous year. This increase was primarily due to the acquisition of XYZ Company and investments in property, plant, and equipment."
]

similar_queries = [
    "What was the company's revenue for the fiscal year 2022?",
    "How much did the company's net income increase in fiscal year 2022 compared to the previous year?",
    "What was the value of the company's total assets as of December 31, 2022?"
]

# Different documents and queries
different_documents = [
    "The company's research and development expenses for the fiscal year 2022 were $50 million, representing 10% of the total revenue. The company continues to invest in innovative technologies and product development to maintain its competitive edge in the market.",
    "The company's board of directors declared a quarterly dividend of $0.50 per share, payable on September 15, 2023, to shareholders of record as of August 31, 2023. This represents a 5% increase from the previous quarter's dividend.",
    "The company's cash and cash equivalents balance as of December 31, 2022, was $200 million, a decrease of 20% from the previous year. This decrease was primarily due to the repayment of long-term debt and share repurchases during the year."
]

different_queries = [
    "What was the average age of the company's employees as of December 31, 2022?",
    "How many patents did the company file in the fiscal year 2022?",
    "What was the company's market share in the European market for the fiscal year 2022?"
]
```

```python
# Get embeddings for similar documents and queries
similar_document_results = predict_fn({"inputs": similar_documents, "instruction": passage_instruction_for_retrieval}, (tokenizer, model))
similar_query_results = predict_fn({"inputs": similar_queries, "instruction": query_instruction_for_retrieval}, (tokenizer, model))

# Get embeddings for different documents and queries
different_document_results = predict_fn({"inputs": different_documents, "instruction": passage_instruction_for_retrieval}, (tokenizer, model))
different_query_results = predict_fn({"inputs": different_queries, "instruction": query_instruction_for_retrieval}, (tokenizer, model))

# Extract the embeddings
similar_document_embeddings = similar_document_results['embeddings'].numpy()
similar_query_embeddings = similar_query_results['embeddings'].numpy()
different_document_embeddings = different_document_results['embeddings'].numpy()
different_query_embeddings = different_query_results['embeddings'].numpy()

# Calculate cosine similarity for similar document-query pairs
cosine_sim_pairs_similar = []
for doc_emb, query_emb in zip(similar_document_embeddings, similar_query_embeddings):
    cosine_sim = cosine_similarity([doc_emb], [query_emb])
    cosine_sim_pairs_similar.append(cosine_sim[0][0])

# Calculate cosine similarity for different document-query pairs
cosine_sim_pairs_different = []
for doc_emb, query_emb in zip(different_document_embeddings, different_query_embeddings):
    cosine_sim = cosine_similarity([doc_emb], [query_emb])
    cosine_sim_pairs_different.append(cosine_sim[0][0])

# Print the cosine similarity for each similar document-query pair
print("Cosine Similarity for Similar Document-Query Pairs:")
for i, sim in enumerate(cosine_sim_pairs_similar):
    print(f"Pair {i+1}: Cosine Similarity = {sim}")

# Print the cosine similarity for each different document-query pair
print("\nCosine Similarity for Different Document-Query Pairs:")
for i, sim in enumerate(cosine_sim_pairs_different):
    print(f"Pair {i+1}: Cosine Similarity = {sim}")
```

## Step 5: Model Evaluation

Next, we can evaluate the fine-tuned model and compare it to the base model, as well as to other embedding models such as Amazon Titan or Cohere.

Note: For black-box API embedding models, we can compute hit rate with the `evaluate` function below, but we can't use `evaluate_st`, because the `InformationRetrievalEvaluator` it relies on only works with `sentence-transformers`-compatible models.

### Evaluation Procedure 1: Hit Rate

The first evaluation procedure we employ is a straightforward metric known as the Hit Rate. The process is as follows:

1. We take each pair consisting of a query and its corresponding relevant document (query, relevant_doc).
2. Using the query, we retrieve the top-K documents from the system being evaluated.
3. If the retrieved results include the relevant_doc, we consider it a hit.

This simple and intuitive approach serves as the initial step in our evaluation process, allowing us to assess the performance of various embedding models, including our own open-source and fine-tuned embedding models as well as proprietary API-based embedding models.
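
Once ranked retrieval results are in hand, the hit-rate computation itself is simple. A standalone sketch over precomputed ranked ID lists (the `hit_rate_at_k` function and the sample IDs are hypothetical; the `evaluate` function gets its ranked lists from a LlamaIndex retriever):

```python
from typing import List


def hit_rate_at_k(retrieved_ids: List[List[str]], expected_ids: List[str], top_k: int) -> float:
    """Fraction of queries whose expected doc ID appears in their top_k retrieved IDs."""
    if len(retrieved_ids) != len(expected_ids):
        raise ValueError("need one expected ID per query")
    hits = sum(expected in retrieved[:top_k] for retrieved, expected in zip(retrieved_ids, expected_ids))
    return hits / len(expected_ids)


# Ranked results for three queries, most similar first
retrieved = [["d1", "d7"], ["d3", "d2"], ["d9", "d4"]]
expected = ["d1", "d2", "d5"]

print(hit_rate_at_k(retrieved, expected, top_k=1))  # only query 1 hits
print(hit_rate_at_k(retrieved, expected, top_k=2))  # queries 1 and 2 hit
```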

```python
from copy import deepcopy

import pandas as pd
from langchain_aws import ChatBedrock
from llama_index.core import VectorStoreIndex
from llama_index.core.schema import TextNode
from llama_index.core import Settings
from llama_index.embeddings.bedrock import BedrockEmbedding
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.langchain import LangChainLLM
from tqdm.notebook import tqdm

def evaluate(
    dataset,
    embed_model,
    top_k=1,
    query_instruction_for_retrieval="",
    passage_instruction_for_retrieval="",
    verbose=False,
):

    model = ChatBedrock(model_id="meta.llama3-70b-instruct-v1:0")
    Settings.llm = LangChainLLM(llm=model) # Not used, but required
    Settings.embed_model = embed_model

    ds = deepcopy(dataset)

    corpus = ds.corpus
    queries = ds.queries
    relevant_docs = ds.relevant_docs

    # Add instruction to query if instruction is provided
    for k, v in queries.items():
        queries[k] = query_instruction_for_retrieval + v
    # Add instruction to passage if instruction is provided
    for k, v in corpus.items():
        corpus[k] = passage_instruction_for_retrieval + v

    nodes = [TextNode(id_=id_, text=text) for id_, text in corpus.items()]
    index = VectorStoreIndex(nodes, show_progress=True)
    retriever = index.as_retriever(similarity_top_k=top_k)

    eval_results = []
    for query_id, query in tqdm(queries.items()):
        retrieved_nodes = retriever.retrieve(query)
        retrieved_ids = [node.node.node_id for node in retrieved_nodes]
        expected_id = relevant_docs[query_id][0]
        is_hit = expected_id in retrieved_ids  # assume 1 relevant doc

        eval_result = {
            "is_hit": is_hit,
            "retrieved": retrieved_ids,
            "expected": expected_id,
            "query": query_id,
        }
        eval_results.append(eval_result)
    return eval_results
```

### Evaluation Procedure 2: InformationRetrievalEvaluator

For the second evaluation procedure, we utilize the `InformationRetrievalEvaluator` from the `sentence-transformers` library. This evaluator offers a more comprehensive suite of metrics, enabling us to perform a detailed analysis of the embedding models' performance.

However, it's important to note that the `InformationRetrievalEvaluator` is only compatible with `sentence-transformers` models. This means that we can evaluate our open-source and fine-tuned embedding models using this procedure, but we cannot apply it to the proprietary API-based embedding models, such as the Bedrock Titan embedding models.

The `InformationRetrievalEvaluator` reports a broader set of ranking metrics, including accuracy@k, precision@k, recall@k, MRR@k, NDCG@k, and MAP@k, giving a more detailed picture of how our `sentence-transformers`-compatible models perform on retrieval.

This second evaluation procedure complements the hit rate metric used in the first procedure, allowing us to form a well-rounded assessment of the embedding models' capabilities and limitations.
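
Hit rate only asks whether the relevant document appears in the top-k; MRR@k also rewards ranking it higher. A plain-Python sketch of MRR@k for the single-relevant-document case used in this post (a simplification; the `mrr_at_k` helper and sample IDs are hypothetical, and `InformationRetrievalEvaluator` also handles multiple relevant documents per query):

```python
from typing import List


def mrr_at_k(retrieved_ids: List[List[str]], expected_ids: List[str], k: int) -> float:
    """Mean reciprocal rank of the expected doc within each query's top-k results."""
    total = 0.0
    for retrieved, expected in zip(retrieved_ids, expected_ids):
        for rank, doc_id in enumerate(retrieved[:k], start=1):
            if doc_id == expected:
                total += 1.0 / rank  # 1 for rank 1, 0.5 for rank 2, ...
                break
        # Queries whose expected doc is missing from the top-k contribute 0
    return total / len(expected_ids)


retrieved = [["d1", "d7"], ["d3", "d2"], ["d9", "d4"]]
expected = ["d1", "d2", "d5"]
print(mrr_at_k(retrieved, expected, k=2))  # (1 + 0.5 + 0) / 3
```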


```python
from copy import deepcopy
from pathlib import Path

from sentence_transformers.evaluation import InformationRetrievalEvaluator
from sentence_transformers import SentenceTransformer

def evaluate_st(
    dataset,
    model_id,
    name,
    query_instruction_for_retrieval="",
    passage_instruction_for_retrieval="",
):
    """ Evaluate sentence-transformer models with InformationRetrievalEvaluator """

    ds = deepcopy(dataset)

    corpus = ds.corpus
    queries = ds.queries
    relevant_docs = ds.relevant_docs

    # Add instruction to query if instruction is provided
    for k, v in queries.items():
        queries[k] = query_instruction_for_retrieval + v
    # Add instruction to passage if instruction is provided
    for k, v in corpus.items():
        corpus[k] = passage_instruction_for_retrieval + v

    evaluator = InformationRetrievalEvaluator(
        queries, corpus, relevant_docs, name=name
    )
    model = SentenceTransformer(model_id)
    output_path = "results/"
    Path(output_path).mkdir(exist_ok=True, parents=True)
    return evaluator(model, output_path=output_path)
```

### Evaluate the model(s)

In [None]:
import os

import pandas as pd

def evaluate_models(models, val_dataset):
    results = []
    for name, model_info in models.items():
        print(f"Evaluating model: {name}")
        model = model_info["model"]
        query_instruction = model_info.get("query_instruction", "")
        passage_instruction = model_info.get("passage_instruction", "")

        # For all models, evaluate using the basic hit rate evaluation function
        val_results = evaluate(
            val_dataset,
            model,
            query_instruction_for_retrieval=query_instruction,
            passage_instruction_for_retrieval=passage_instruction,
        )
        df = pd.DataFrame(val_results)
        hit_rate = df["is_hit"].mean()

        result = {"model": name, "hit_rate": hit_rate}

        # For BGE models, additionally evaluate using sentence-transformers
        if "bge" in name.lower():
            # The evaluator appends to this CSV, so remove results from earlier runs.
            # The file name must match the `name` passed to the evaluator.
            csv_file = f"results/Information-Retrieval_evaluation_{name}_results.csv"
            if os.path.exists(csv_file):
                os.remove(csv_file)
            _ = evaluate_st(
                val_dataset,
                model.model_name,
                name=name,
                query_instruction_for_retrieval=query_instruction,
                passage_instruction_for_retrieval=passage_instruction,
            )
            ir_results = pd.read_csv(csv_file, index_col=0, header=0)
            row_dict = ir_results.to_dict("records")[0]
            result.update(row_dict)

        results.append(result)

    df = pd.DataFrame(results)
    return df.sort_values("hit_rate", ascending=False)


models = {
    "titan-v1": {  # Amazon Titan Text Embeddings V1 (via Amazon Bedrock)
        "model": BedrockEmbedding(model="amazon.titan-embed-text-v1")
    },
    "titan-v2": {  # Amazon Titan Text Embeddings V2 (via Amazon Bedrock)
        "model": BedrockEmbedding(model="amazon.titan-embed-text-v2:0")
    },
    "bge": { # Base BGE model with no instructions
        "model": HuggingFaceEmbedding(base_model_local_dir),
    },
    "ft-bge": {  # Fine-tuned BGE model
        "model": HuggingFaceEmbedding(finetuned_model_dir),
        "query_instruction": query_instruction_for_retrieval,
        "passage_instruction": passage_instruction_for_retrieval,
    },
    "ft-mixed-bge": {  # Fine-tuned mixed BGE model
        "model": HuggingFaceEmbedding(mixed_model_output_dir),
        "query_instruction": query_instruction_for_retrieval,
        "passage_instruction": passage_instruction_for_retrieval,
    },
}

results_df = evaluate_models(models, val_dataset)

results_df

Note that the large version of the BGE model may achieve better performance, at the cost of higher memory use and latency.

The results show that the fine-tuned model has the best hit rate. However, we may prefer the mixed model: its performance is close, and because it combines the fine-tuned weights with those of the base model, it should generalize better to queries outside the fine-tuning data.
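Assuming the mixed model was produced by merging the fine-tuned weights with the base model's weights, the intuition behind its generalization can be illustrated with a simple linear interpolation of parameters: the merged weights sit between the specialized model and the general-purpose one. The `linear_merge` helper below is hypothetical and operates on plain dictionaries of NumPy arrays (the same arithmetic applies to PyTorch state dicts). LM-Cocktail, linked at the end of this post, determines its merge weights in its own way, so treat this as a simplified illustration rather than its API.

```python
import numpy as np


def linear_merge(base_state, finetuned_state, weight_finetuned=0.5):
    """Interpolate two parameter dictionaries, name by name (hypothetical helper)."""
    if base_state.keys() != finetuned_state.keys():
        raise ValueError("state dicts must have identical parameter names")

    merged = {}
    for name, base_param in base_state.items():
        ft_param = finetuned_state[name]
        if base_param.shape != ft_param.shape:
            raise ValueError(f"shape mismatch for parameter {name!r}")
        # weight_finetuned=1.0 returns the fine-tuned model, 0.0 the base model
        merged[name] = weight_finetuned * ft_param + (1.0 - weight_finetuned) * base_param
    return merged
```

Lowering `weight_finetuned` trades some in-domain accuracy for behavior closer to the base model.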

## Upload the Model

#### Option 1. Upload to S3

In [None]:
import sagemaker
from sagemaker.s3 import S3Uploader

sagemaker_session = sagemaker.Session()
s3_bucket = sagemaker_session.default_bucket()
s3_prefix = f"models/{base_model_local_dir}"

desired_s3_uri = f"s3://{s3_bucket}/{s3_prefix}"

s3_model_uri = S3Uploader.upload(local_path=mixed_model_output_dir, desired_s3_uri=desired_s3_uri)
print(s3_model_uri)

upload_method = "s3"

#### Option 2. Upload to Hugging Face Hub

Uncomment and run the following cell, then paste in a valid Hugging Face access token with write permission:

In [None]:
# from huggingface_hub import notebook_login

# notebook_login()

Now that we're authenticated, we can upload the mixed model (again, uncomment the cell to run it):

In [None]:
# from huggingface_hub import HfApi
# from transformers import AutoModel, AutoTokenizer

# # Initialize the API
# api = HfApi()

# # Retrieve username
# hf_username = api.whoami()["name"]

# model_id = f"{hf_username}/{base_model_local_dir}"
# model = AutoModel.from_pretrained(mixed_model_output_dir)
# tokenizer = AutoTokenizer.from_pretrained(mixed_model_output_dir)

# model.push_to_hub(model_id)
# tokenizer.push_to_hub(model_id)

# upload_method = "hf_hub"

# print(model_id)

## Part 6: Model Deployment

In this section, we'll demonstrate how to deploy the model to Amazon SageMaker using the Hugging Face Text Embeddings Inference (TEI) container. TEI is a high-performance toolkit for deploying and serving popular text embedding and sequence classification models, including FlagEmbedding (BGE) models. Its optimized serving stack makes it a fast and convenient option for hosting our model on SageMaker.

In [None]:
import boto3
import sagemaker
from datetime import datetime

sagemaker_session = sagemaker.Session()
# sagemaker session bucket -> used for uploading data, models and logs
# sagemaker will automatically create this bucket if it does not exist
sagemaker_session_bucket = None
if sagemaker_session_bucket is None and sagemaker_session is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sagemaker_session.default_bucket()

try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client('iam')
    role = iam.get_role(RoleName='sagemaker_execution_role')['Role']['Arn']

sagemaker_session = sagemaker.Session(default_bucket=sagemaker_session_bucket)

print(f"sagemaker role arn: {role}")
print(f"sagemaker session region: {sagemaker_session.boto_region_name}")

### Retrieve the new Hugging Face Embedding Container

In [None]:
from sagemaker.huggingface import get_huggingface_llm_image_uri

instance_type = "ml.g5.xlarge"

# Retrieve the TEI image URI: GPU image for ml.g*/ml.p* instances, CPU image otherwise
def get_image_uri(instance_type):
    key = "huggingface-tei" if instance_type.startswith(("ml.g", "ml.p")) else "huggingface-tei-cpu"
    return get_huggingface_llm_image_uri(key, version="1.2.3")

### Deploy model to Amazon SageMaker

In [None]:
import json
from sagemaker.huggingface import HuggingFaceModel

# SageMaker config
instance_type = "ml.g5.xlarge"

# Define model and endpoint configuration parameters
config = {
    "POOLING": "cls",  # BGE models use the [CLS] token embedding
    "MAX_CONCURRENT_REQUESTS": json.dumps(512),  # Maximum number of concurrent requests for this deployment
    "MAX_BATCH_TOKENS": json.dumps(16384),  # Maximum total number of tokens processed together in one batch
}

# Decide whether to deploy the model from S3 or the Hugging Face Hub
if upload_method == "s3":
    # SageMaker copies the uncompressed S3 prefix to /opt/ml/model in the container
    model_data = {
        "S3DataSource": {
            "S3Uri": s3_model_uri + "/",
            "S3DataType": "S3Prefix",
            "CompressionType": "None",
        }
    }
    config["HF_MODEL_ID"] = "/opt/ml/model"
elif upload_method == "hf_hub":
    # TEI downloads the model from the Hub when the container starts
    model_data = None
    config["HF_MODEL_ID"] = model_id

print(config)

# Create HuggingFaceModel with the image URI
emb_model = HuggingFaceModel(
    role=role,
    image_uri=get_image_uri(instance_type),
    env=config,
    model_data=model_data,
)
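The `POOLING="cls"` setting tells TEI to use the hidden state of the first ([CLS]) token as the sentence embedding, which matches how BGE models are pooled. The NumPy sketch below contrasts CLS pooling with mean pooling on a made-up array of token hidden states (shape: batch × tokens × hidden size); the function names are illustrative and not part of TEI.

```python
import numpy as np


def cls_pool(hidden_states):
    """Use the first token's ([CLS]) hidden state as the sentence embedding."""
    return hidden_states[:, 0, :]


def mean_pool(hidden_states, attention_mask):
    """Average the token hidden states, ignoring padding positions (for comparison)."""
    mask = attention_mask[:, :, None].astype(hidden_states.dtype)
    return (hidden_states * mask).sum(axis=1) / mask.sum(axis=1)


def l2_normalize(x):
    """Scale each embedding to unit length so dot product equals cosine similarity."""
    return x / np.linalg.norm(x, axis=-1, keepdims=True)
```

With CLS pooling, padding tokens never affect the result because only position 0 is read; mean pooling needs the attention mask to exclude them.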

In [None]:
resource_name = "Demo-{}-{}"

tei_endpoint = emb_model.deploy(
  initial_instance_count=1,
  instance_type=instance_type,
  endpoint_name=resource_name.format("TEI-Endpoint", datetime.now().strftime("%Y-%m-%d-%H-%M-%S")),
)

### Set Autoscaling (Optional)

We can also add autoscaling, so that SageMaker adjusts the number of instances behind the endpoint to match traffic:

In [None]:
asg = boto3.client('application-autoscaling')

# Resource type is variant and the unique identifier is the resource ID.
resource_id=f"endpoint/{tei_endpoint.endpoint_name}/variant/AllTraffic"

# scaling configuration
response = asg.register_scalable_target(
    ServiceNamespace='sagemaker',
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount', 
    MinCapacity=1,
    MaxCapacity=4
)

response = asg.put_scaling_policy(
    PolicyName=f'Request-ScalingPolicy-{tei_endpoint.endpoint_name}',
    ServiceNamespace='sagemaker',
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 10.0,  # Target average invocations per instance per minute
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance',
        },
        'ScaleInCooldown': 300,  # Seconds after a scale-in before another scale-in can start
        'ScaleOutCooldown': 60,  # Seconds after a scale-out before another scale-out can start
    }
)
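`SageMakerVariantInvocationsPerInstance` measures the average number of invocations per minute per instance, and target tracking adds or removes instances to keep that metric near `TargetValue`. As a rough mental model that ignores cooldowns and CloudWatch alarm evaluation periods, the steady-state instance count can be estimated as below. `approx_desired_instances` is a hypothetical helper for intuition only, not an AWS API.

```python
import math


def approx_desired_instances(invocations_per_minute, target_per_instance,
                             min_capacity=1, max_capacity=4):
    """Rough estimate of the instance count target tracking converges to."""
    # Enough instances so that each one handles at most the target rate
    desired = math.ceil(invocations_per_minute / target_per_instance)
    # Application Auto Scaling never goes outside the registered capacity bounds
    return max(min_capacity, min(max_capacity, desired))
```

For example, with `TargetValue=10.0`, about 25 invocations per minute would settle at 3 instances, while heavy traffic is capped at `MaxCapacity=4`.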

## Part 7: Model Testing

Finally, we can run the model, optionally adding instructions. If the model was fine-tuned with instructions for queries and/or passages, use the same instructions at inference time. In our case, we used an instruction for queries but not for passages, so we do the same here.

In [None]:
query_instruction_for_retrieval = "Represent the Financial question for retrieving supporting documents: "

queries = [query_instruction_for_retrieval + query for query in similar_queries]

example = {
    "inputs": queries,
    "truncate": True,
}

results = tei_endpoint.predict(example)

# print some results
print(f"length of embeddings: {len(results[0])}")
print(f"first 5 elements of embeddings: {results[0][:5]}")

We can also measure the average response time:

In [None]:
%%timeit

emb = tei_endpoint.predict(example)

We can also test throughput by sending requests from several threads in parallel:

In [None]:
import threading
import time

number_of_threads = 10
number_of_requests = int(3900 // number_of_threads)
print(f"number of threads: {number_of_threads}")
print(f"number of requests per thread: {number_of_requests}")

def send_requests():
    for _ in range(number_of_requests):
        # input counted at https://huggingface.co/spaces/Xenova/the-tokenizer-playground for 100 tokens
        tei_endpoint.predict(data={"inputs": "Hugging Face is a company and a popular platform in the field of natural language processing (NLP) and machine learning. They are known for their contributions to the development of state-of-the-art models for various NLP tasks and for providing a platform that facilitates the sharing and usage of pre-trained models. One of the key offerings from Hugging Face is the Transformers library, which is an open-source library for working with a variety of pre-trained transformer models, including those for text generation, translation, summarization, question answering, and more. The library is widely used in the research and development of NLP applications and is supported by a large and active community. Hugging Face also provides a model hub where users can discover, share, and download pre-trained models. Additionally, they offer tools and frameworks to make it easier for developers to integrate and use these models in their own projects. The company has played a significant role in advancing the field of NLP and making cutting-edge models more accessible to the broader community. Hugging Face also provides a model hub where users can discover, share, and download pre-trained models. Additionally, they offer tools and frameworks to make it easier for developers and ma"})

# Create multiple threads
threads = [threading.Thread(target=send_requests) for _ in range(number_of_threads)]
# Start all threads
start = time.time()
for t in threads:
    t.start()
# Wait for all threads to finish
for t in threads:
    t.join()
elapsed = time.time() - start
print(f"total time: {round(elapsed)} seconds")
print(f"throughput: {number_of_threads * number_of_requests / elapsed:.1f} requests/second")
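The cell above reports only the total wall-clock time. A minimal sketch that also records per-request latency and derives requests per second and latency percentiles might look like this. It uses `concurrent.futures.ThreadPoolExecutor` instead of raw threads, and `send_one` is a hypothetical zero-argument callable, for example `lambda: tei_endpoint.predict(data={"inputs": "some text"})`.

```python
import statistics
import time
from concurrent.futures import ThreadPoolExecutor


def load_test(send_one, number_of_threads=10, requests_per_thread=10):
    """Call send_one() concurrently; report throughput and latency percentiles."""

    def worker():
        latencies = []
        for _ in range(requests_per_thread):
            start = time.perf_counter()
            send_one()
            latencies.append(time.perf_counter() - start)
        return latencies

    wall_start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=number_of_threads) as pool:
        futures = [pool.submit(worker) for _ in range(number_of_threads)]
        # Flatten the per-thread latency lists (result() re-raises worker errors)
        latencies = [lat for f in futures for lat in f.result()]
    wall_time = time.perf_counter() - wall_start

    # 99 cut points: index 89 is the 90th percentile, index 98 the 99th
    cuts = statistics.quantiles(latencies, n=100, method="inclusive")
    return {
        "requests": len(latencies),
        "requests_per_second": len(latencies) / wall_time,
        "p50_s": statistics.median(latencies),
        "p90_s": cuts[89],
        "p99_s": cuts[98],
    }
```

Timing each request individually exposes tail latencies (p90, p99), which an average such as the one from `%%timeit` hides.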

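To inspect the endpoint's model latency during the test, print a link to the corresponding Amazon CloudWatch metrics graph:

In [None]: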
print(f"https://console.aws.amazon.com/cloudwatch/home?region={sagemaker_session.boto_region_name}#metricsV2:graph=~(metrics~(~(~'AWS*2fSageMaker~'ModelLatency~'EndpointName~'{tei_endpoint.endpoint_name}~'VariantName~'AllTraffic))~view~'timeSeries~stacked~false~region~'{sagemaker_session.boto_region_name}~start~'-PT5M~end~'P0D~stat~'Average~period~30);query=~'*7bAWS*2fSageMaker*2cEndpointName*2cVariantName*7d*20{tei_endpoint.endpoint_name}")

### LangChain integration

As a bonus, we can create a custom LangChain content handler and instantiate a `SagemakerEndpointEmbeddings` model for use in LangChain applications:

In [None]:
import json
from typing import Dict, List

from botocore.config import Config
from langchain.embeddings import SagemakerEndpointEmbeddings
from langchain.embeddings.sagemaker_endpoint import EmbeddingsContentHandler

class BGEContentHandler(EmbeddingsContentHandler):
    """
    We'll create a custom LangChain content handler for the BGE model which is deployed to a SageMaker endpoint.
    """

    content_type = "application/json"
    accepts = "application/json"

    def transform_input(self, inputs: List[str], model_kwargs: Dict) -> bytes:
        """
        Transforms the input into bytes that can be consumed by SageMaker endpoint.
        Args:
            inputs: List of input strings.
            model_kwargs: Additional keyword arguments to be passed to the endpoint.
        Returns:
            The transformed bytes input.
        """
        input_str = json.dumps({"inputs": inputs, **model_kwargs})
        return input_str.encode("utf-8")

    def transform_output(self, output: bytes) -> List[List[float]]:
        """
        Transforms the bytes output from the endpoint into a list of embeddings.
        Args:
            output: The bytes output from SageMaker endpoint.
        Returns:
            The transformed output - list of embeddings
        Note:
            The length of the outer list is the number of input strings.
            The length of the inner lists is the embedding dimension.
        """
        return json.loads(output.read().decode("utf-8"))

content_handler = BGEContentHandler()

sagemaker_config = Config(connect_timeout=180, read_timeout=180, retries={"max_attempts": 30})

sagemaker_client = boto3.client(
    service_name="sagemaker-runtime",
    region_name=sagemaker_session.boto_region_name,
    config=sagemaker_config,
)

embeddings = SagemakerEndpointEmbeddings(
    endpoint_name=tei_endpoint.endpoint_name,
    client=sagemaker_client,
    content_handler=content_handler,
)

embeddings.model_kwargs = {"truncate": True}

results = embeddings.embed_documents(queries)

assert len(results) == len(queries)
assert len(results[0]) in (384, 768, 1024)  # small, base, or large BGE embedding dimension

print(results[0][:5])
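To check the content handler's serialization logic without a live endpoint, the same two transformations can be exercised against an in-memory byte stream. Here `io.BytesIO` stands in for the `StreamingBody` returned by `invoke_endpoint`, which is why `transform_output` calls `.read()`. The standalone functions simply mirror the methods of `BGEContentHandler` above and are not part of LangChain.

```python
import io
import json
from typing import Dict, List


def transform_input(inputs: List[str], model_kwargs: Dict) -> bytes:
    """Serialize the texts plus extra parameters (e.g. truncate) into a JSON body."""
    return json.dumps({"inputs": inputs, **model_kwargs}).encode("utf-8")


def transform_output(output) -> List[List[float]]:
    """Parse a file-like response body into a list of embeddings."""
    return json.loads(output.read().decode("utf-8"))
```

For example, `transform_input(["a", "b"], {"truncate": True})` produces the same `{"inputs": [...], "truncate": true}` payload sent to the endpoint earlier with `tei_endpoint.predict`.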

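As a sanity check, we can compare cosine similarities for matching and non-matching document-query pairs:

In [None]:
from sklearn.metrics.pairwise import cosine_similarity

# Get embeddings for similar documents and queries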
similar_document_embeddings = embeddings.embed_documents(similar_documents)
similar_query_embeddings = embeddings.embed_documents(similar_queries)

# Get embeddings for different documents and queries
different_document_embeddings = embeddings.embed_documents(different_documents)
different_query_embeddings = embeddings.embed_documents(different_queries)

# Calculate cosine similarity for similar document-query pairs
cosine_sim_pairs_similar = []
for doc_emb, query_emb in zip(similar_document_embeddings, similar_query_embeddings):
    cosine_sim = cosine_similarity([doc_emb], [query_emb])
    cosine_sim_pairs_similar.append(cosine_sim[0][0])

# Calculate cosine similarity for different document-query pairs
cosine_sim_pairs_different = []
for doc_emb, query_emb in zip(different_document_embeddings, different_query_embeddings):
    cosine_sim = cosine_similarity([doc_emb], [query_emb])
    cosine_sim_pairs_different.append(cosine_sim[0][0])

# Print the cosine similarity for each similar document-query pair
print("Cosine Similarity for Similar Document-Query Pairs:")
for i, sim in enumerate(cosine_sim_pairs_similar):
    print(f"Pair {i+1}: Cosine Similarity = {sim}")

# Print the cosine similarity for each different document-query pair
print("\nCosine Similarity for Different Document-Query Pairs:")
for i, sim in enumerate(cosine_sim_pairs_different):
    print(f"Pair {i+1}: Cosine Similarity = {sim}")
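The loops above call `cosine_similarity` once per pair. A vectorized alternative, sketched below with NumPy, computes the row-wise cosine similarity of aligned document and query embeddings in one step and summarizes how well the model separates matching from non-matching pairs. Both helper names are hypothetical.

```python
import numpy as np


def pairwise_cosine(a, b):
    """Cosine similarity between row i of `a` and row i of `b`."""
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    return (a * b).sum(axis=1) / (np.linalg.norm(a, axis=1) * np.linalg.norm(b, axis=1))


def separation_margin(similar_sims, different_sims):
    """Mean similarity of matching pairs minus that of non-matching pairs."""
    return float(np.mean(similar_sims) - np.mean(different_sims))
```

Usage: `separation_margin(pairwise_cosine(similar_document_embeddings, similar_query_embeddings), pairwise_cosine(different_document_embeddings, different_query_embeddings))`. A clearly positive margin means the model scores matching pairs higher on average.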

## Clean up

Finally, when we're done, we can delete the model and the endpoint to stop incurring charges.

In [None]:
tei_endpoint.delete_model()
tei_endpoint.delete_endpoint()

## Conclusion

In this post, we walked through the process of fine-tuning a BGE embedding model using synthetic data generated from Amazon Bedrock. We covered key steps including generating high-quality synthetic data, fine-tuning the model, evaluating performance, and deploying the optimized model using Amazon SageMaker.

By leveraging synthetic data and advanced fine-tuning techniques like hard negative mining and model merging, you can significantly enhance the performance of embedding models for your specific use cases. This approach is especially valuable when real-world data is limited or difficult to obtain.

To get started, we encourage you to experiment with the code and techniques demonstrated in this post. Adapt them to your own datasets and models to unlock performance improvements in your applications. You can find all the code used in this post in our GitHub repository.

Related Links:

* [LlamaIndex Finetune Embeddings Example](https://docs.llamaindex.ai/en/stable/examples/finetuning/embeddings/finetune_embedding/)
* [BAAI/bge-base-en-v1.5](https://huggingface.co/BAAI/bge-base-en-v1.5)
* [FlagEmbedding](https://github.com/FlagOpen/FlagEmbedding)
* [LM-Cocktail](https://github.com/FlagOpen/FlagEmbedding/tree/master/LM_Cocktail)
* [AWS Machine Learning Blog](https://aws.amazon.com/blogs/machine-learning/)
* [GitHub Repository](URL_TO_GITHUB_REPOSITORY)