# Selecting an embedding model for your custom data

In a recent blog post, ["Understanding embedding models: make an informed choice for your RAG"](https://unstructured.io/blog/understanding-embedding-models-make-an-informed-choice-for-your-rag), we explored what you need to know to navigate the [Hugging Face MTEB leaderboard](https://huggingface.co/spaces/mteb/leaderboard) with ease and select a baseline text embedding model.

You are likely to find more than one candidate model that meets your criteria. In that case, you should evaluate the candidates on your own data: good performance on academic benchmarks is one thing, but your custom data has its own nuances, domain-specific language, and other unique traits. In this example, we'll build an end-to-end data processing pipeline with the Unstructured Serverless API and show how to incorporate a model evaluation step into it.

We'll compare the performance of three embedding models from the MTEB leaderboard, [BAAI/bge-large-en-v1.5](https://huggingface.co/BAAI/bge-large-en-v1.5), [mukaj/fin-mpnet-base](https://huggingface.co/mukaj/fin-mpnet-base), and [Snowflake/snowflake-arctic-embed-l](https://huggingface.co/Snowflake/snowflake-arctic-embed-l), on financial data: specifically, annual reports (Form 10-K), which are standard in the US.

This notebook covers:
* Preprocessing data from PDFs
* Generating a synthetic question-answer dataset
* Measuring models' performance
* Completing the data preprocessing pipeline with the embedding step using the best model

## Install the required dependencies

In this notebook, we'll use a local `llama3.1:8b` model via Ollama to generate a synthetic dataset.
Go to https://ollama.com and download the app for your OS, then pull the model onto your local machine.

```bash
ollama pull llama3.1:8b
```

Alternatively, we show how to use Claude 3.5 Sonnet. To switch to Anthropic, get an Anthropic API key, store it as `ANTHROPIC_API_KEY` in your `.env` file, and add `anthropic` to the `pip install` command below.

Next, let's install the libraries that we will be using: 

* `unstructured` & `unstructured-ingest` for preprocessing documents
* `python-dotenv` to load the environment variables from a `.env` file
* `chromadb`, `langchain`, and `langchain-community` to set up retrievers with different embedding models
* `ollama` to prompt an LLM to generate a synthetic evaluation dataset
* `pandas` to store the evaluation dataset and the retrieval results


To use this example, you'll need to get an [Unstructured API key](https://unstructured.io/api-key-hosted). The Unstructured Serverless API comes with a 14-day trial capped at 1000 pages per day. 

```python
!pip install -qU "unstructured-ingest[pdf,embed-huggingface]" unstructured python-dotenv langchain langchain-community chromadb ollama pandas
```

## Load the environment variables

Store your environment variables, such as `UNSTRUCTURED_API_KEY` and `UNSTRUCTURED_URL` in a `.env` file, then load them here. 

```python
import os
import dotenv

dotenv.load_dotenv('.env')
```

```
True
```
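`load_dotenv` returns `True` as long as the file defines at least one variable, so it doesn't confirm that both keys this notebook needs are present; a missing key would only surface later, when the pipeline calls the API. A minimal fail-fast check might look like this (`require_env` is a hypothetical helper, not part of python-dotenv or Unstructured):

```python
import os


def require_env(*names):
    """Return the values of the given environment variables, failing fast if any is missing.

    Hypothetical helper for this notebook; not part of python-dotenv or Unstructured.
    """
    missing = [name for name in names if not os.environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: os.environ[name] for name in names}


# Usage, after dotenv.load_dotenv('.env'):
# credentials = require_env("UNSTRUCTURED_API_KEY", "UNSTRUCTURED_URL")
```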

## Preprocess PDFs from a source location

The data we use in this example is stored in two large PDF files (feel free to substitute your own data). These are the annual financial reports (Form 10-K) for the year 2023 from two large companies, Walmart Inc. and Exxon Mobil Corporation. These documents are publicly available on the companies' respective websites. We'll use them as an example of domain-specific data from the financial industry.

Since we don't have actual user queries to evaluate retrieval performance against, the next best thing is to generate an evaluation dataset from the custom data itself. For this, we first need to preprocess the PDFs. Let's start with the necessary imports:

```python
import json
import ollama
# import anthropic  # uncomment if you switch to Claude 3.5 Sonnet
import pandas as pd

from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig,
    LocalUploaderConfig
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
from unstructured_ingest.v2.processes.chunker import ChunkerConfig
from unstructured_ingest.v2.processes.embedder import EmbedderConfig

from unstructured.staging.base import elements_from_json
from unstructured.staging.base import elements_to_dicts
from unstructured.staging.base import dict_to_elements
from langchain_core.documents import Document
from langchain_community.vectorstores import Chroma
from langchain_community.vectorstores import utils as chromautils
from langchain_community.embeddings import HuggingFaceEmbeddings
```

To process the PDFs from a local directory, set up an Unstructured ingest pipeline with a local source connector and a local destination connector. Unstructured supports dozens of source and destination connectors, so you can easily modify this pipeline to ingest documents from an S3 bucket, Azure Blob Storage, Google Drive, or any of the other supported sources.
At this stage, let's keep the destination local. Once we're done evaluating models, we can modify and re-run the pipeline to add an embedding step and, if you like, a vector store as the destination.

The Unstructured processing pipeline can be assembled from a number of configurations: 

```python
Pipeline.from_configs(
    context=ProcessorConfig(
        verbose=True,
        tqdm=True,
        num_processes=5,
    ),
    indexer_config=LocalIndexerConfig(input_path="PDFS"),
    downloader_config=LocalDownloaderConfig(),
    source_connection_config=LocalConnectionConfig(),
    partitioner_config=PartitionerConfig(
        partition_by_api=True,
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        partition_endpoint=os.getenv("UNSTRUCTURED_URL"),
        strategy="fast",  # for complex image-based PDFs, replace this with "hi_res"
        additional_partition_args={
            "split_pdf_page": True,
            "split_pdf_allow_failed": True,
            "split_pdf_concurrency_level": 15,
        },
    ),
    chunker_config=ChunkerConfig(
        chunking_strategy="by_title",
        chunk_max_characters=1500,
        chunk_overlap=150,
    ),
    uploader_config=LocalUploaderConfig(output_dir="local-ingest-output"),
).run()
```

```
2024-08-19 09:05:16,033 MainProcess INFO     Created index with configs: {"input_path": "PDFS", "recursive": false}, connection configs: {"access_config": {}}
2024-08-19 09:05:16,033 MainProcess INFO     Created download with configs: {"download_dir": null}, connection configs: {"access_config": {}}
2024-08-19 09:05:16,034 MainProcess INFO     Created partition with configs: {"strategy": "fast", "ocr_languages": null, "encoding": null, "additional_partition_args": {"split_pdf_page": true, "split_pdf_allow_failed": true, "split_pdf_concurrency_level": 15}, "skip_infer_table_types": null, "fields_include": ["element_id", "text", "type", "metadata", "embeddings"], "flatten_metadata": false, "metadata_exclude": [], "metadata_include": [], "partition_endpoint": "https://api.unstructuredapp.io/general/v0/general", "partition_by_api": true, "api_key": "*******", "hi_res_model_name": null}
2024-08-19 09:05:16,034 MainProcess INFO     Created chunk with configs: {"chunking_strategy": "by_title"
```

* `ProcessorConfig` describes general behavior such as log verbosity, number of processes, etc.
* `LocalIndexerConfig`, `LocalDownloaderConfig`, and `LocalConnectionConfig` control data ingestion from a local source; you only need to provide the path to your local directory with PDFs here.
* `PartitionerConfig`: use it to supply your credentials for the Unstructured Serverless API and to customize the partitioning behavior, e.g. which partitioning strategy to use, whether to exclude some types of metadata, etc. In this case, we use the `fast` strategy to partition the files, as the PDFs are not complex and contain text only.
* `ChunkerConfig`: after partitioning, we chunk the documents into meaningfully sized chunks that don't exceed the maximum input length of any of the embedding models we'll be evaluating.
* `LocalUploaderConfig`: specify a local directory to write the processed files to.
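To build some intuition for how `chunk_max_characters` and `chunk_overlap` interact, here is a minimal sketch of fixed-size character windows with overlap. It is a deliberate simplification, not Unstructured's `by_title` strategy, which also respects section titles and element boundaries; `sliding_window_chunks` is a hypothetical helper.

```python
def sliding_window_chunks(text, max_chars=1500, overlap=150):
    """Split text into windows of at most max_chars characters.

    Each window starts with the last `overlap` characters of the previous one.
    Simplified illustration only: this is not Unstructured's by_title chunker.
    """
    if not 0 <= overlap < max_chars:
        raise ValueError("overlap must be non-negative and smaller than max_chars")
    step = max_chars - overlap  # how far each new window advances
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + max_chars])
        if start + max_chars >= len(text):
            break  # the last window already reaches the end of the text
        start += step
    return chunks


chunks = sliding_window_chunks("x" * 4000)
print([len(c) for c in chunks])  # [1500, 1500, 1300]
```

The overlap repeats a little context across chunk boundaries, so a sentence cut in half at the end of one chunk still appears intact at the start of the next, at the cost of some duplicated text in the index.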

## Create an evaluation dataset

Once we have preprocessed the documents into chunks, let's build a synthetic evaluation dataset. To load all the processed files from the output directory, we can use the `elements_from_json` function for each JSON file:

```python
def load_processed_files(directory_path):
    """
    Reads all preprocessed data from the JSON files in the given directory
    and returns the elements as a list of dictionaries.

    Args:
        directory_path: The path to the directory containing JSON files.
    """
    elements = []
    # sort the file names so the elements are always loaded in the same order
    for filename in sorted(os.listdir(directory_path)):
        if filename.endswith('.json'):
            file_path = os.path.join(directory_path, filename)
            try:
                elements.extend(elements_to_dicts(elements_from_json(filename=file_path)))
            except IOError:
                print(f"Error: Could not read file {filename}.")

    return elements
```

```python
elements = load_processed_files("local-ingest-output")

len(elements)
```

```
1082
```

Let's add a helper function that parses the LLM's response into a list of dictionaries and adds the `context` (chunk content) and the `element_id` of the chunk each question is based on (stored under the `id` key), so that we can later check whether that chunk was retrieved:

```python
def convert_qa_string_to_dict(input_string, chunk_id, chunk_text):
    """
    Converts a string response from an LLM to a list of Python dictionaries
    with question-answer-context entries.

    Args:
        input_string: The LLM's response string.
        chunk_id: ID of the chunk the questions were generated from.
        chunk_text: Original text of the chunk the questions were generated from.
    """
    try:
        result = json.loads(input_string)
        questions = result["questions"]
        for question in questions:
            question['id'] = chunk_id
            question['context'] = chunk_text
        return questions
    except (json.JSONDecodeError, KeyError, TypeError) as e:
        # Invalid JSON, or valid JSON without the expected {"questions": [...]} shape
        print(f"Error parsing JSON: {e}")
        return []
```

For the synthetic evaluation dataset, we'll go over the chunks, and for each chunk we'll prompt the local `llama3.1:8b` model to generate two question/answer pairs.  

```python
def generate_chunk_qa_pairs(element):
    """
    Uses a local LLM to generate two question-answer pairs for a chunk, then
    parses the string response into a list of Python dictionaries.

    Args:
        element: A document element (dictionary) containing a single chunk.
    """

    prompt = """
    You are an assistant specialized in RAG tasks. \n
    The task is the following: given a document chunk, you will have to
    generate questions that can be asked by a user to retrieve information from
    a large document corpus. \n
    The question should be relevant to the chunk, and should not be too specific
    or too general. The question should be about the subject of the chunk, and
    the answer needs to be found in the chunk. \n

    Remember that the question is asked by a user to get some information from a
    large document corpus. \n

    Generate a question that could be asked by a user without knowing the existence and the content of the corpus. \n
    Also generate the answer to the question, which should be found in the
    document chunk.  \n
    Generate TWO pairs of questions and answers per chunk in a
    dictionary with the following format, your answer should ONLY contain this dictionary, NOTHING ELSE: \n
    {
        "questions": [
            {
                "question": "XXXXXX",
                "answer": "YYYYYY"
            },
            {
                "question": "XXXXXX",
                "answer": "YYYYYY"
            }
        ]
    }
    where XXXXXX is the question and YYYYYY is the corresponding answer, which can be as long as needed. \n
    Note: If there are no questions to ask about the chunk, return {"questions": []}.
    Focus on asking relevant questions about the chunk. \n
    Here is the chunk: \n
"""

    response = ollama.generate('llama3.1:8b', prompt + element['text'])
    return convert_qa_string_to_dict(response['response'], element['element_id'], element['text'])

    # To switch to Claude 3.5 Sonnet, replace the two lines above with the following:
    # client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
    # response = client.messages.create(
    #     model="claude-3-5-sonnet-20240620",
    #     max_tokens=1024,
    #     messages=[
    #         {"role": "user", "content": prompt + element['text']}
    #     ]
    # )
    # return convert_qa_string_to_dict(response.content[0].text, element['element_id'], element['text'])
```

```python
def generate_qa_pairs_dataset(elements):
    """
    Creates a dataset of question-answer-context entries from a list of chunk elements.

    Args:
        elements: A list of document elements (dictionaries), each containing a single chunk.
    """

    dataset = []
    for el in elements:
        dataset.extend(generate_chunk_qa_pairs(el))
    return dataset
```
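Because generating questions for all 1082 chunks can take a long time on local hardware, you may want to dry-run the loop on a small, reproducible subset first. A minimal sketch, where `sample_chunks` is a hypothetical helper and the sample size and seed are arbitrary choices:

```python
import random


def sample_chunks(elements, n, seed=42):
    """Return a reproducible random sample of up to n chunk elements (hypothetical helper)."""
    if n >= len(elements):
        return list(elements)
    rng = random.Random(seed)  # a dedicated RNG, so the global random state is untouched
    return rng.sample(elements, n)


# Usage, e.g. for a quick trial run before processing every chunk:
# eval_dataset = generate_qa_pairs_dataset(sample_chunks(elements, 50))
```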

Finally, let's generate the dataset.
Running the following cell can take a long time, depending on your hardware, the model you use, and how many documents you have and how large they are. You may also see a few JSON parsing errors; that's expected and means that some LLM responses were not valid JSON. In our experiments, there were very few of them.
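If parsing errors become more frequent, a common cause is that the model wraps the JSON in prose or a Markdown code fence. One hedged workaround is to cut the response down to its outermost `{...}` span before calling `json.loads`. The sketch below assumes that pattern; `extract_json_object` is a hypothetical helper, and it won't rescue responses that are malformed inside the braces.

```python
import json


def extract_json_object(response_text):
    """Parse the outermost {...} span of an LLM response (hypothetical helper).

    Returns the parsed object, or None if no parseable JSON object is found.
    """
    start = response_text.find("{")
    end = response_text.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        return json.loads(response_text[start:end + 1])
    except json.JSONDecodeError:
        return None


reply = 'Sure! Here you go:\n```json\n{"questions": [{"question": "Q?", "answer": "A."}]}\n```'
print(extract_json_object(reply)["questions"][0]["question"])  # Q?
```

Alternatively, the Ollama Python client's `generate` accepts `format="json"`, which asks the model to return valid JSON only.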

```python
eval_dataset = generate_qa_pairs_dataset(elements)
```

Let's save the dataset as a CSV file locally.

```python
def save_dataset_as_csv(dict_list, output_file):
    """
    Saves a list of dictionaries with QA pairs as a CSV file.
    """

    df = pd.DataFrame(dict_list)
    df = df[df['question'].notna()]
    df.to_csv(output_file, index=False)
    print(f"DataFrame saved to {output_file}")

save_dataset_as_csv(eval_dataset, "qa_pairs_dataset.csv")
```

```
DataFrame saved to qa_pairs_dataset.csv
```
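One caveat: the metrics computed later match retrieval results back to the evaluation dataset by question text, so if the LLM produces the same question for two different chunks, the ground-truth chunk for that question becomes ambiguous. A minimal, stdlib-only sketch for dropping such questions, assuming each entry is a dictionary with `question` and `id` keys as produced by `convert_qa_string_to_dict`; `drop_ambiguous_questions` is a hypothetical helper you could apply to `eval_dataset` before saving it.

```python
from collections import defaultdict


def drop_ambiguous_questions(qa_pairs):
    """Keep only questions whose text maps to exactly one chunk id (hypothetical helper)."""
    ids_by_question = defaultdict(set)
    for pair in qa_pairs:
        ids_by_question[pair.get("question")].add(pair.get("id"))
    return [pair for pair in qa_pairs if len(ids_by_question[pair.get("question")]) == 1]


pairs = [
    {"question": "What was total revenue?", "id": "a1"},
    {"question": "What was total revenue?", "id": "b7"},
    {"question": "Who is the CEO?", "id": "a1"},
]
print([p["question"] for p in drop_ambiguous_questions(pairs)])  # ['Who is the CEO?']
```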


## Set up retrievers and collect responses to questions

Now that we have an evaluation dataset, we can set up a retriever with each of the embedding models and retrieve results for each question in the evaluation dataset; `setup_and_query_rag()` does just that.
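Conceptually, a `similarity` retriever with `k=10` embeds the question and returns the 10 stored chunks whose embeddings are closest to it, in order. The sketch below shows that idea as an exact, brute-force search with squared Euclidean (L2) distance, which we assume because it is Chroma's default distance function; Chroma itself uses an approximate HNSW index, and the toy 2-D vectors stand in for real model embeddings.

```python
def top_k_by_l2(query, embeddings, k):
    """Return ids of the k stored vectors closest to `query` (brute force, squared L2).

    `embeddings` maps chunk ids to vectors; all vectors must have the same length.
    """
    def squared_l2(a, b):
        return sum((x - y) ** 2 for x, y in zip(a, b))

    # Closest first: this ordering is what Recall@K and MRR are computed on
    ranked = sorted(embeddings, key=lambda chunk_id: squared_l2(query, embeddings[chunk_id]))
    return ranked[:k]


store = {"chunk-a": [0.9, 0.1], "chunk-b": [0.1, 0.9], "chunk-c": [0.7, 0.3]}
print(top_k_by_l2([1.0, 0.0], store, k=2))  # ['chunk-a', 'chunk-c']
```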

```python
def setup_and_query_rag(embedding_model, documents_dir, eval_dataset, output_directory, n_to_retrieve=10):
    """
    Indexes the processed chunks with the given embedding model, retrieves the top
    `n_to_retrieve` chunks for every question in the evaluation dataset, and saves
    the retrieved chunk IDs as a CSV file.
    """
    elements = load_processed_files(documents_dir)
    staged_elements = dict_to_elements(elements)

    documents = []
    for element in staged_elements:
        metadata = element.metadata.to_dict()
        metadata['element_id'] = element.id
        # orig_elements holds the original pre-chunking elements; retrieval doesn't need it
        metadata.pop('orig_elements', None)
        documents.append(Document(page_content=element.text, metadata=metadata))

    # Chroma only accepts simple metadata values (str, int, float, bool)
    documents = chromautils.filter_complex_metadata(documents)
    db = Chroma.from_documents(documents, HuggingFaceEmbeddings(model_name=embedding_model))
    retriever = db.as_retriever(search_type="similarity", search_kwargs={"k": n_to_retrieve})

    df = pd.read_csv(eval_dataset)
    df = df[df['question'].notna()]
    questions = df["question"].to_list()

    results = []
    for question in questions:
        try:
            retrieved_documents = retriever.invoke(question)
            retrieved_ids = [doc.metadata['element_id'] for doc in retrieved_documents]
            results.append({"question": question, "retrieved_ids": retrieved_ids})
        except Exception as e:
            print(f"Skipped question: {question} ({e})")

    os.makedirs(output_directory, exist_ok=True)
    file_path = os.path.join(output_directory, f"{embedding_model.replace('/', '@')}-{n_to_retrieve}.csv")

    df = pd.DataFrame(results)
    df.to_csv(file_path, index=False)
    print(f"DataFrame saved to {file_path}")
    # Delete the collection so the next model doesn't add its embeddings to the same one
    db.delete_collection()
```

```python
models = ["BAAI/bge-large-en-v1.5", "mukaj/fin-mpnet-base", "Snowflake/snowflake-arctic-embed-l"]

for model in models:
    setup_and_query_rag(model, "local-ingest-output", "qa_pairs_dataset.csv", "retriever_results")
```

```
DataFrame saved to retriever_results/BAAI@bge-large-en-v1.5-10.csv
DataFrame saved to retriever_results/mukaj@fin-mpnet-base-10.csv
DataFrame saved to retriever_results/Snowflake@snowflake-arctic-embed-l-10.csv
```


## Calculate the metrics and compare the results

Once you have the results from each of the retrievers, let's calculate some metrics. 
In this example, we'll use two metrics: Recall@K, and MRR. 

Since the evaluation dataset has one relevant chunk per question, the average Recall@K tells us how often that chunk was retrieved _at all_ among the K retrieved documents. A value of 1 would mean that we retrieved the relevant chunk for every question (regardless of its position in the list of retrieved chunks), and a value of 0 would mean that the relevant chunk was never retrieved for any question.

MRR (mean reciprocal rank) also takes the position into account: for each question, we score 1/rank of the relevant chunk (or 0 if it wasn't among the top K) and average these scores over all questions. An MRR of 1 would mean the relevant chunk was always the first result; an MRR of 1/2 is what you'd get if it were always second, although a mix of ranks can produce the same value.
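To make the two definitions concrete, here is a small worked example on three made-up questions (the chunk ids are hypothetical). `recall_and_mrr` is an illustrative helper that applies the same per-question scoring as `calculate_retrieval_metrics()` below.

```python
def recall_and_mrr(results, k):
    """Average Recall@k and MRR for (correct_id, retrieved_ids) pairs, one pair per question."""
    recall_hits, reciprocal_ranks = [], []
    for correct_id, retrieved_ids in results:
        top_k = retrieved_ids[:k]
        if correct_id in top_k:
            recall_hits.append(1)
            reciprocal_ranks.append(1 / (top_k.index(correct_id) + 1))  # ranks start at 1
        else:
            recall_hits.append(0)
            reciprocal_ranks.append(0)  # a miss contributes 0 to MRR
    n = len(results)
    return sum(recall_hits) / n, sum(reciprocal_ranks) / n


toy_results = [
    ("c1", ["c1", "c7", "c3"]),  # relevant chunk ranked 1st -> 1/1
    ("c2", ["c9", "c2", "c4"]),  # ranked 2nd -> 1/2
    ("c5", ["c8", "c6", "c0"]),  # not retrieved -> 0
]
print(recall_and_mrr(toy_results, k=3))  # (0.6666666666666666, 0.5)
```

Note how the miss pulls MRR down: the two retrieved chunks sit at ranks 1 and 2, yet MRR is only 0.5.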

```python
import ast


def calculate_retrieval_metrics(evaluation_data: pd.DataFrame, retrieval_results: pd.DataFrame, top_k=10):
    # Map each question to the id of the chunk it was generated from
    # (if a question text occurs more than once, the last occurrence wins)
    correct_ids = dict(zip(evaluation_data["question"], evaluation_data["id"]))
    recall = []
    ranks = []

    for item in retrieval_results.to_dict('records'):
        correct_id = correct_ids.get(item["question"])
        if correct_id is None:
            continue  # question not found in the evaluation dataset
        # retrieved_ids was saved to CSV as the string form of a Python list
        retrieved_ids = ast.literal_eval(item["retrieved_ids"])[:top_k]

        if correct_id in retrieved_ids:
            recall.append(1)
            rank = retrieved_ids.index(correct_id) + 1
            ranks.append(1 / rank)
        else:
            recall.append(0)
            ranks.append(0)

    # Calculate average metrics over the scored questions
    avg_recall = sum(recall) / len(recall)
    mrr = sum(ranks) / len(ranks)
    metrics = {
        'Recall': avg_recall,
        'MRR': mrr,
    }

    return metrics
```

```python
eval_dataset = pd.read_csv("qa_pairs_dataset.csv")

directory_with_retrieval_results = "retriever_results"
k = 10
all_metrics = dict()

for filename in os.listdir(directory_with_retrieval_results):
    if filename.endswith('.csv'):
        file_path = os.path.join(directory_with_retrieval_results, filename)
        try:
            # Undo the "<org>@<model>-<k>.csv" naming used in setup_and_query_rag()
            model_name = filename[:-4].rsplit('-', 1)[0].replace('@', '/')
            retrieval_results = pd.read_csv(file_path)
            all_metrics[model_name] = calculate_retrieval_metrics(eval_dataset, retrieval_results, top_k=k)
        except IOError:
            print(f"Error: Could not read file {filename}.")
```
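The results files encode both the model name and K in the file name: `setup_and_query_rag()` replaces `/` with `@` and appends `-<k>`, and the loop above reverses that with `rsplit('-', 1)`, which splits only on the last hyphen, so hyphens inside model names such as `bge-large-en-v1.5` survive. A small round-trip check of that scheme, with hypothetical helper names and assuming model names never contain `@`:

```python
def results_filename(model_name, k):
    """Build the file name used for a model's retrieval results (hypothetical helper)."""
    return f"{model_name.replace('/', '@')}-{k}.csv"


def parse_results_filename(filename):
    """Recover (model_name, k) from a name built by results_filename() (hypothetical helper)."""
    stem = filename[:-len(".csv")]
    encoded_model, k = stem.rsplit("-", 1)  # split on the last hyphen only
    return encoded_model.replace("@", "/"), int(k)


for model in ["BAAI/bge-large-en-v1.5", "mukaj/fin-mpnet-base", "Snowflake/snowflake-arctic-embed-l"]:
    assert parse_results_filename(results_filename(model, 10)) == (model, 10)
print(results_filename("BAAI/bge-large-en-v1.5", 10))  # BAAI@bge-large-en-v1.5-10.csv
```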

```python
all_metrics
```

```
{'Snowflake/snowflake-arctic-embed-l': {'Recall': 0.3502610346464167,
  'MRR': 0.20764157268666042},
 'BAAI/bge-large-en-v1.5': {'Recall': 0.8794494542002848,
  'MRR': 0.6415374677002579},
 'mukaj/fin-mpnet-base': {'Recall': 0.8239202657807309,
  'MRR': 0.5528548074822408}}
```

 
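For `BAAI/bge-large-en-v1.5`, the model with the highest scores:

* For 87.9% of the questions, the relevant chunk is among the 10 retrieved chunks.
* Its MRR of 0.64 indicates that the relevant chunk is usually ranked at or near the top of the list. Because MRR averages reciprocal ranks and counts misses as 0, its inverse (about 1.56) should not be read as the exact average position.

`mukaj/fin-mpnet-base` comes second (Recall 0.82, MRR 0.55), while `Snowflake/snowflake-arctic-embed-l` lags far behind on this data (Recall 0.35, MRR 0.21).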

```python
model_with_max_recall = max(all_metrics, key=lambda name: all_metrics[name]['Recall'])
model_with_max_recall
```

```
'BAAI/bge-large-en-v1.5'
```
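Selecting by Recall alone works here because the models are clearly separated, but with closer results you may want to break ties with MRR. A minimal sketch that reuses the metrics printed above; sorting by the `(Recall, MRR)` tuple is our own choice of tie-breaker, and in the notebook you would pass `all_metrics` directly instead of the hard-coded copy `reported_metrics`.

```python
reported_metrics = {
    'Snowflake/snowflake-arctic-embed-l': {'Recall': 0.3502610346464167, 'MRR': 0.20764157268666042},
    'BAAI/bge-large-en-v1.5': {'Recall': 0.8794494542002848, 'MRR': 0.6415374677002579},
    'mukaj/fin-mpnet-base': {'Recall': 0.8239202657807309, 'MRR': 0.5528548074822408},
}

# Best first: sort by Recall, then by MRR for models with equal Recall
ranking = sorted(
    reported_metrics,
    key=lambda name: (reported_metrics[name]['Recall'], reported_metrics[name]['MRR']),
    reverse=True,
)
for name in ranking:
    scores = reported_metrics[name]
    print(f"{name:<40} Recall@10={scores['Recall']:.3f}  MRR={scores['MRR']:.3f}")
```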

## Complete the preprocessing pipeline with embedding and upload steps

The results of partitioning and chunking are already cached, so when we add an embedding configuration to the pipeline, it picks up at the embedding step instead of re-processing the documents from scratch.

```python
Pipeline.from_configs(
    context=ProcessorConfig(
        verbose=True,
        tqdm=True,
        num_processes=20,
    ),
    indexer_config=LocalIndexerConfig(input_path="PDFS"),
    downloader_config=LocalDownloaderConfig(),
    source_connection_config=LocalConnectionConfig(),
    partitioner_config=PartitionerConfig(
        partition_by_api=True,
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        partition_endpoint=os.getenv("UNSTRUCTURED_URL"),
        strategy="fast",
        additional_partition_args={
            "split_pdf_page": True,
            "split_pdf_allow_failed": True,
            "split_pdf_concurrency_level": 15,
        },
    ),
    chunker_config=ChunkerConfig(
        chunking_strategy="by_title",
        chunk_max_characters=1500,
        chunk_overlap=150,
    ),
    embedder_config=EmbedderConfig(
        embedding_provider="langchain-huggingface",
        embedding_model_name=model_with_max_recall,  # use the model with the highest recall
    ),
    uploader_config=LocalUploaderConfig(output_dir="outputs-with-embeddings"),  # new output location
).run()
```

```
2024-08-19 18:39:10,872 MainProcess INFO     Created index with configs: {"input_path": "PDFS", "recursive": false}, connection configs: {"access_config": {}}
2024-08-19 18:39:10,873 MainProcess INFO     Created download with configs: {"download_dir": null}, connection configs: {"access_config": {}}
2024-08-19 18:39:10,874 MainProcess INFO     Created partition with configs: {"strategy": "fast", "ocr_languages": null, "encoding": null, "additional_partition_args": {"split_pdf_page": true, "split_pdf_allow_failed": true, "split_pdf_concurrency_level": 15}, "skip_infer_table_types": null, "fields_include": ["element_id", "text", "type", "metadata", "embeddings"], "flatten_metadata": false, "metadata_exclude": [], "metadata_include": [], "partition_endpoint": "https://api.unstructuredapp.io/general/v0/general", "partition_by_api": true, "api_key": "*******", "hi_res_model_name": null}
2024-08-19 18:39:10,875 MainProcess INFO     Created chunk with configs: {"chunking_strategy": "by_title"
```