# OpenSearch indexing

ETL pipeline is done - but indexing appears painfully slow. From some quick observations with htop and bmon, the bottleneck seems to be OpenSearch calculating the embeddings. But, that's an assumption we can test. Let's make some performance measurements and see what we are working with.

**Estimated total run time**: extraction + cleaning + splitting + embedding + indexing = approximately 5 days

**TLDR**
1. GPU support in OpenSearch is not great, and using the built-in text embedding processor as part of an ingest pipeline on CPU is prohibitively slow: about 200 days to process the whole corpus.
2. Calculating the embeddings with HuggingFace transformers is manageable - about 4 days for a single worker.
3. Indexing pre-calculated embeddings into a KNN index is comparatively fast - about 6 hours with a bulk insert batch size of 256.

## 1. Run set-up

### 1.1. Imports

In [None]:
# Change working directory to parent so we can import as we would from __main__.py
print(f'Working directory: ', end = '')
%cd ..

# Standard imports
import time
import random

# PyPI imports
import h5py
import torch
import numpy as np
import matplotlib.pyplot as plt
from transformers import AutoTokenizer, AutoModel

# Internal imports
import configuration as config
import functions.notebook_helper as helper_funcs

### 1.2. Notebook parameters

In [2]:
# Benchmark switches: each flag gates one or more of the benchmark and
# results cells further down the notebook
run_indexing_rate_benchmark = False
run_bulk_indexing_rate_benchmark = False
pre_embedding_indexing_rate = True

# Expected number of chunks in the full corpus after semantic splitting,
# value determined in the semantic splitting notebook
estimated_total_chunks = 20_648_877

# Output directory for saved plots
figure_dir = './notebooks/figures/03-opensearch_indexing'

### 1.3. OpenSearch initialization

Let's make two indexes for comparison: one KNN index with an embedding ingest pipeline and one vanilla text index.

In [3]:
# Names of the two test indexes that the benchmarks insert to
knn_index_name = 'wikipedia-knn'
text_index_name = 'wikipedia-text-only'

# Initialize the vector KNN target index...
helper_funcs.initialize_index(
    knn_index_name,
    helper_funcs.KNN_INDEX_BODY
)

# ...and the text only target index
helper_funcs.initialize_index(
    text_index_name,
    helper_funcs.TEXT_INDEX_BODY
)

# Start the OpenSearch client used for all of the inserts below
client = helper_funcs.start_client()

### 1.4. Data loading

Let's pre-ingest the data into a list.

In [None]:
# Path to the transformed Wikipedia sample data on disk
input_file_path=f'{config.DATA_PATH}/wikipedia-sample/{config.PARSED_TEXT}'

# Holder for the data
records=[]

# Open the file with a context manager so the connection is closed even
# if reading one of the batches raises part way through
with h5py.File(input_file_path, 'r') as input_data:

    # Loop on the input batches, stored under the 'batches' group
    for i in input_data['batches']:

        # Collect every record in the batch. Everything the benchmarks
        # need is copied into the list here, the file is not read again
        # after this cell
        records.extend(input_data[f'batches/{i}'])

print(f'Have {len(records)} input records')

## 2. Indexing rate: text only vs OpenSearch KNN embedding

Test batch replicates, time how long it takes to insert each record in the batch and then average the indexing time across the batch.

### 2.1. Benchmark specific parameters

In [5]:
# How many batch replicates to run when estimating the indexing rate
replicates = 10

# How many records to sample into each batch
batch_size = 10

# Result collectors, one list per index type
knn_indexing_rates, text_indexing_rates = [], []

### 2.2. Benchmark

Collect insert performance data for text only and KNN embedding indexes at the same time by inserting to both in the same loop.

In [None]:
%%time

if run_indexing_rate_benchmark is True:

    # Loop on the replicates
    for replicate in range(replicates):

        batch=random.sample(records, batch_size)

        # Loop on the records in the batch
        for record_num, record in enumerate(batch):

            # Decode the text
            text=record.decode('utf-8')

            # Build the requests
            knn_request=[]
            text_request=[]

            knn_request_header={'create': {'_index': knn_index_name,'_id': record_num}}
            text_request_header={'create': {'_index': text_index_name,'_id': record_num}}

            knn_request.append(knn_request_header)
            text_request.append(text_request_header)

            request_body={'text': text}
            
            knn_request.append(request_body)
            text_request.append(request_body)

            # Insert the record using the bulk interface. We are
            # indexing records one at a time here, but use bulk 
            # for consistency

            # Start the timer
            start_time=time.time()

            _=client.bulk(knn_request)

            # Stop the timer and collect the time
            dT=time.time() - start_time
            knn_indexing_rates.append(dT)

            # Start the timer
            start_time=time.time()

            _=client.bulk(text_request)

            # Stop the timer and collect the time
            dT=time.time() - start_time
            text_indexing_rates.append(1 / dT)


### 2.3. Results

Now, make a plot of the timing results and estimate how long it would take to index the complete Wikipedia corpus with either indexing method.

In [7]:
if run_indexing_rate_benchmark is True:

    # Unit conversions for the run time estimates
    seconds_per_hour = 60 * 60
    seconds_per_day = seconds_per_hour * 24

    plt.title('Indexing rate benchmark: index type')

    # Take the bin edges from the pooled data so that both histograms
    # are drawn on the same set of bins
    pooled_rates = knn_indexing_rates + text_indexing_rates
    _, bins = np.histogram(pooled_rates, bins=30)

    plt.hist(knn_indexing_rates, bins=bins, facecolor='green', label='KNN embedding index')
    plt.hist(text_indexing_rates, bins=bins, facecolor='blue', label='Text only index')

    plt.legend(loc='upper right')
    plt.xlabel('Rate (records per second)')
    plt.ylabel('Count')
    plt.savefig(f'{figure_dir}/2.3-indexing_rate_index_type.jpg')
    plt.show()

    # Text only index: mean rate and projected time for the whole corpus
    mean_text_insert_rate = sum(text_indexing_rates) / len(text_indexing_rates)
    estimated_total_text_only_insert_time = estimated_total_chunks / mean_text_insert_rate
    text_hours = estimated_total_text_only_insert_time / seconds_per_hour
    print(f'Estimated total text only indexing time: {text_hours:.1f} hours')
    print(f'Mean text only indexing rate: {mean_text_insert_rate:.0f} records per second\n')

    # KNN embedding index: same calculation, reported in days
    mean_knn_insert_rate = sum(knn_indexing_rates) / len(knn_indexing_rates)
    estimated_total_knn_only_insert_time = estimated_total_chunks / mean_knn_insert_rate
    knn_days = estimated_total_knn_only_insert_time / seconds_per_day
    print(f'Estimated total KNN indexing time: {knn_days:.0f} days')
    print(f'Mean KNN indexing rate: {mean_knn_insert_rate:.3f} records per second\n')

Yikes, that's not going to work. Looking at the difference in insert rates, it is clear that we were right about the bottleneck - definitely the embedding calculation/KNN indexing. So, a few ideas here.

1. Though the GPUs are available inside the OpenSearch node Docker containers, OpenSearch is not using them... we should troubleshoot that.
2. We can insert batches with the bulk interface - if that means OpenSearch is calculating the embeddings in parallel, using a larger batch size might speed things up.
3. We could use multiple insert workers - just looking at htop, we are using less than 50% of available CPU resources while inserting, so we should be able to go faster.
4. Last would be reading up on OpenSearch cluster management in general to see if there are settings we should be tuning for this type of thing.

Think that getting the GPUs online is going to be the only real solution here. Given the amount of CPU we are already using while making/inserting embeddings we probably can't speed up much more than two-fold through parallelism alone.

Let's at least try bigger bulk batches first.

## 3. Insert rate: bulk insert batch size

Run a few replicates of increasing batch size. Time indexing records in each batch replicate using OpenSearch's bulk interface. Calculate the indexing rate as batch size divided by batch time.

### 3.1. Benchmark specific parameters

In [8]:
# Batch sizes to try with the bulk insert interface
batch_sizes = [1, 4, 16, 64, 256]

# How many replicates to run at each batch size
replicates = 10

# Results are collected here, keyed by batch size
results = {}

### 3.2. Benchmark

Only need to test the KNN index rates here - that's what we are really after.

In [None]:
%%time

if run_bulk_indexing_rate_benchmark is True:

    # Reinitialize the KNN index
    helper_funcs.initialize_index(knn_index_name, helper_funcs.KNN_INDEX_BODY)

    # Loop on the batch sizes
    for batch_size in batch_sizes:
        print(f'Running replicates for batch size {batch_size}')

        # Add an empty list to collect the results, using the batch size as key
        results[f'{batch_size}']=[]

        for replicate in range(replicates):

            batch=random.sample(records, batch_size)

            # Loop on the records in the batch to build the indexing requests
            knn_requests=[]

            for record_num, record in enumerate(batch):

                knn_request_header={'create': {'_index': knn_index_name,'_id': record_num}}
                knn_requests.append(knn_request_header)

                # Record text comes from hdf5 data store as bytes, decode it
                text=record.decode('utf-8')

                request_body={'text': text}
                knn_requests.append(request_body)

                # Insert the record using the bulk interface. We are
                # indexing records one at a time here, but use bulk 
                # for consistency

                # Start the timer
                start_time=time.time()

                _=client.bulk(knn_requests)

                # Stop the timer and collect the insert rate
                dT=time.time() - start_time
                results[f'{batch_size}'].append(batch_size / dT)

                # Reset the bulk insert batch for the next round
                bulk_insert_batch=[]

### 3.3. Results

Plot indexing rate as a function of bulk indexing batch size.

In [10]:
if run_bulk_indexing_rate_benchmark is True:

    # Mean and spread of the indexing rate across replicates, one value
    # per batch size and in the same order as batch_sizes
    means = [np.mean(results[f'{size}']) for size in batch_sizes]
    standard_deviations = [np.std(results[f'{size}']) for size in batch_sizes]

    plt.title('Indexing rate benchmark: OpenSearch text embedding ingest pipeline')
    plt.xlabel('Bulk insert batch size (records)')
    plt.ylabel('Rate (records per second)')

    plt.errorbar(
        batch_sizes,
        means,
        yerr=standard_deviations,
        marker='o',
        linestyle='dotted',
        capsize=5
    )

    # Log-log axes
    plt.xscale('log')
    plt.yscale('log')
    plt.savefig(f'{figure_dir}/3.3-indexing_rate_bulk_insert.jpg')
    plt.show()

    # Project the total run time from the rate at a batch size of 256
    rates_at_256 = results['256']
    mean_indexing_rate = sum(rates_at_256) / len(rates_at_256)
    estimated_days = (estimated_total_chunks / mean_indexing_rate) / (60 * 60 * 24)
    print(f'Estimated total indexing time: {estimated_days:.1f} days for batch size of 256')
    print(f'Mean indexing rate: {mean_indexing_rate:.0f} records per second for batch size of 256')


OK, cool, looking at the graph, we can get a speed-up on the order of 10x by increasing the bulk insert batch size. The gains start to saturate around a batch size of 64. While it is the ~10x speed-up over single inserts to the KNN index that we were expecting, it is still going to take over three weeks to index the embeddings for all of Wikipedia. No more benefit to be had from parallelism here - CPU is pinned at 100% utilization on all cores. In fact, I'd like to cut it back, so we can use the machine for other things during the run. Either way, we need another solution.

Two options I can think of:

1. Get the GPUs working with OpenSearch in Docker.
2. Pre-calculate the embeddings ourselves using the GPUs and then index them to OpenSearch.

After some more reading, it looks like we need to go with option 2. Turns out, GPU support is experimental and requires CUDA 11.6, and it's recommended to run on an Amazon EC2 instance with Neuron. Option 2 is much more flexible and will give us more control over the embedding calculation.

## 4. Embedding calculation rate: HF transformers

Let's try it with the same model we were using in OpenSearch following the basic instructions in the [HuggingFace model card](https://huggingface.co/sentence-transformers/msmarco-distilbert-base-tas-b). Looks like the model can take a list of document texts for embedding, so let's make the first experiment the embedding batch size.

The plan will be to time how long it takes to embed some sample size of texts using different batch sizes. We won't include the model loading or other start-up overhead in the time - during the real run, we will be doing many hours of GPU compute per workunit so any startup time is insignificant in comparison.

### 4.1. Benchmark specific parameters

In [11]:
# HuggingFace model to benchmark and the device to run it on
model_name = 'sentence-transformers/msmarco-distilbert-base-tas-b'
gpu = 'cuda:0'

# How many texts to encode in each replicate of each batch size
target_texts = 3200

# Embedding batch sizes to try
batch_sizes = [1, 2, 4, 8, 16, 32]

# How many replicates to run at each batch size
replicates = 3

# Results are collected here, keyed by batch size
results = {}

### 4.2. Benchmark

In [None]:
%%time

if pre_embedding_indexing_rate is True:

    # Load model from HuggingFace Hub
    tokenizer=AutoTokenizer.from_pretrained(model_name)
    model=AutoModel.from_pretrained(model_name, device_map=gpu)

    # Loop on batch sizes
    for batch_size in batch_sizes:
        print(f'Running batch size {batch_size}')

        # Add an empty list to collect the results, using the batch size as key
        results[f'{batch_size}']=[]

        # Calculate how many batches of batch size we need 
        # to get the target number of texts
        num_batches=target_texts // batch_size

        # Loop on the replicates
        for replicate in range(replicates):

            # Generate random batches of texts for this replicate
            batches=[]

            for i in range(num_batches):

                batches.append(random.sample(records, batch_size))

            # Collector for embedded texts
            embedded_texts=[]

            # Start the timer
            start_time=time.time()

            # Loop on the batches and embed them
            for batch in batches:

                # Decode each record in the batch
                texts=[record.decode('utf-8') for record in batch]

                # Tokenize the sample
                encoded_input=tokenizer(
                    texts,
                    padding=True,
                    truncation=True,
                    return_tensors='pt'
                ).to('cuda:0')

                # Compute token embeddings
                with torch.no_grad():
                    model_output=model(**encoded_input, return_dict=True)

                # Perform pooling
                embeddings=model_output.last_hidden_state[:,0]

                # Collect the embeddings
                embedded_texts.extend(embeddings.tolist())

            # Stop the timer and collect the timing data
            dT=time.time() - start_time
            results[f'{batch_size}'].append(len(embedded_texts) / dT)

    print()

### 4.3. Results

A batch size of 128 runs, but at a batch size of 256 we get OOMs on the 8 GB GTX1070.

In [None]:
if pre_embedding_indexing_rate is True:

    # Mean and spread of the embedding rate across replicates, one value
    # per batch size and in the same order as batch_sizes
    means = [np.mean(results[f'{size}']) for size in batch_sizes]
    standard_deviations = [np.std(results[f'{size}']) for size in batch_sizes]

    plt.title('Embedding rate benchmark: batch size')
    plt.xlabel('Batch size (records)')
    plt.ylabel('Embedding calculation rate (records per second)')

    plt.errorbar(
        batch_sizes,
        means,
        yerr=standard_deviations,
        marker='o',
        linestyle='dotted',
        capsize=5
    )

    plt.savefig(f'{figure_dir}/4.3-embedding_rate_batch_size.jpg')
    plt.show()

    # Project the total run time from the rate at a batch size of 1
    single_text_rates = results['1']
    mean_embedding_rate = sum(single_text_rates) / len(single_text_rates)
    estimated_days = (estimated_total_chunks / mean_embedding_rate) / (60 * 60 * 24)
    print(f'Estimated total embedding time: {estimated_days:.1f} days for batch size of 1')
    print(f'Mean embedding rate: {mean_embedding_rate:.0f} records per second for batch size of 1')


OK, interesting. Not the result I was expecting, but useful nonetheless. Using larger batch sizes slows down the net embedding rate. We can definitely embed texts much faster this way than using an OpenSearch ingest pipeline, especially using a batch size of one.

Remember: we still need to insert the embeddings. Let's hope the performance of batch inserting pre-calculated embeddings to a KNN index is more similar to that of indexing plain text than using an embedding ingest pipeline...

## 5. KNN indexing rate: pre-calculated embeddings

### 5.1. Benchmark specific parameters

In [14]:
# Batch sizes to try with the bulk insert interface
batch_sizes = [1, 2, 4, 8, 16, 32, 64, 128, 256]

# How many replicates to run at each batch size
replicates = 3

# Results are collected here, keyed by batch size
results = {}

### 5.2. Benchmark

In [None]:
%%time

if pre_embedding_indexing_rate is True:

    # Initialize a KNN index without an OpenSearch ingest pipeline
    helper_funcs.initialize_index(knn_index_name, helper_funcs.PRE_EMBEDDED_KNN_INDEX)

    # Loop on batch sizes
    for batch_size in batch_sizes:
        print(f'Running replicates for batch size {batch_size}')

        # Add the batch size to the results
        results[f'{batch_size}']=[]

        # Start an empty collector for the insert batch
        bulk_insert_batch=[]

        for replicate in range(replicates):

            # Grab a random batch of texts to embed
            embedding_batch=random.sample(embedded_texts, batch_size)

            # Build the requests
            knn_requests=[]

            for record_num, embedding in enumerate(embedding_batch):

                knn_request_header={'create': {'_index': knn_index_name,'_id': record_num}}
                knn_requests.append(knn_request_header)

                request_body={'text_embedding': embedding}
                knn_requests.append(request_body)

            # Insert the record using the bulk interface. We are
            # indexing records one at a time here, but use bulk 
            # for consistency

            # Start the timer
            start_time=time.time()

            _=client.bulk(knn_requests)

            # Stop the timer and collect the insert rate
            dT=time.time() - start_time
            results[f'{batch_size}'].append(batch_size / dT)

    print()

### 5.3. Results

In [None]:
if pre_embedding_indexing_rate is True:

    # Mean and spread of the insert rate across replicates, one value
    # per batch size and in the same order as batch_sizes
    means = [np.mean(results[f'{size}']) for size in batch_sizes]
    standard_deviations = [np.std(results[f'{size}']) for size in batch_sizes]

    plt.title('Indexing rate benchmark: KNN index, pre-calculated embeddings')
    plt.xlabel('Batch size (records)')
    plt.ylabel('Insert rate (records per second)')

    plt.errorbar(
        batch_sizes,
        means,
        yerr=standard_deviations,
        marker='o',
        linestyle='dotted',
        capsize=5
    )

    # Log-log axes
    plt.xscale('log')
    plt.yscale('log')
    plt.savefig(f'{figure_dir}/5.3-indexing_rate_pre-embedded.jpg')
    plt.show()

    # Project the total run time from the rate at a batch size of 256
    rates_at_256 = results['256']
    mean_indexing_rate = sum(rates_at_256) / len(rates_at_256)
    estimated_hours = (estimated_total_chunks / mean_indexing_rate) / (60 * 60)
    print(f'Estimated total indexing time: {estimated_hours:.1f} hours for batch size of 256')
    print(f'Mean indexing rate: {mean_indexing_rate:.0f} records per second for batch size of 256')

Nice! OK, I think we have it - about four days to calculate the embeddings and four and a half hours to index them is way better than over 200 days using an OpenSearch text embedding pipeline. We probably can speed up the embedding calculation a bit more by parallelizing it over the GPUs.

## 6. Conclusion