In [None]:
!pip install -r requirements.txt
!pip install flash-attn --no-build-isolation
!huggingface-cli download jinaai/jina-embeddings-v3 --local-dir ./models/jina-embeddings-v3

In [None]:
from torch.cuda import is_available as is_cuda_available
from transformers import AutoModel

# from xlm_roberta.modeling_lora import XLMRobertaLoRA

model_folder = './models/jina-embeddings-v3/'

# Initialize the model
model = AutoModel.from_pretrained(model_folder, trust_remote_code=True, use_flash_attn=False)
# model: XLMRobertaLoRA

if is_cuda_available():
    model.to('cuda')

In [4]:
texts = [
    "sample text",
    "Look at her face",
    "a love song",
    "a sad song",
]

# When calling the `encode` function, you can choose a `task` based on the use case:
# 'retrieval.query', 'retrieval.passage', 'separation', 'classification', 'text-matching'
# Alternatively, you can choose not to pass a `task`, and no specific LoRA adapter will be used.
query_embeddings = model.encode(texts, task="retrieval.query")
print(query_embeddings.shape)
print(query_embeddings[0])

(4, 1024)
[ 0.0736009  -0.08957924  0.0887474  ...  0.01675522 -0.00211969
  0.00456729]


In [7]:
import numpy as np

# load embeddings
passage_embeddings = np.load('./jina-embeddings-v3_retrieval.passage.npy')
print(passage_embeddings.shape)
print(passage_embeddings[0])

(57650, 1024)
[-0.1406054  -0.04126596  0.02898028 ... -0.0118162  -0.0104178
 -0.00558778]


1. Multi-threaded parallel MapReduce
This approach utilizes Python's ThreadPoolExecutor to slice the similarity computation task and process it in parallel:
Slicing (Map): The large-scale embedded matrix is sliced by rows into a number of small blocks, and each block calculates the similarity individually.
Parallel Computing: Process multiple slices simultaneously through multiple threads, making full use of multi-core CPUs to improve computational efficiency.
Reduce: Splice the similarity results of each slice into a complete similarity matrix.

Applicable Scenarios:
The embedding matrix is large and the memory of a single machine is not enough to load all the data at once.

In [12]:
%%time
from concurrent.futures import ThreadPoolExecutor
import numpy as np


def compute_shard_similarity(shard, query_embeddings):
    return query_embeddings @ shard.T

# divide `passage_embeddings` into different pieces
num_shards = 10
shards = np.array_split(passage_embeddings, num_shards, axis=0)

with ThreadPoolExecutor(max_workers=4) as executor:
    shard_similarities = list(executor.map(compute_shard_similarity, shards, [query_embeddings] * len(shards)))


similarities = np.hstack(shard_similarities)
print("MapReduce-style computation completed!")
print(similarities)

MapReduce-style computation completed!
[[0.11739243 0.20485012 0.15104674 ... 0.12595034 0.16781409 0.17875487]
 [0.3053073  0.11800735 0.15972804 ... 0.09573218 0.08708793 0.08289387]
 [0.30639645 0.35178605 0.37007502 ... 0.30292025 0.27659038 0.34594226]
 [0.32856753 0.30532134 0.310084   ... 0.22818434 0.3047609  0.3286222 ]]
CPU times: user 173 ms, sys: 100 ms, total: 273 ms
Wall time: 24.2 ms


2. MapReduce based on map and functools.reduce
This approach simulates the MapReduce workflow using Python's built-in map and functools.reduce:
Map: Each slice of the embedded matrix is passed into the mapper function, which computes the similarity.
Reduce: Merge the results of all the slices with functools.reduce to get the full similarity matrix.

Applicable Scenarios:
Embedded matrices are moderate and only need to be processed in a standalone environment.

In [13]:
%%time
import functools
import numpy as np

# define mapper 和 reducer
def mapper(shard):
    shard_similarity = query_embeddings @ shard.T
    return shard_similarity

def reducer(p, c):
    return np.hstack((p, c))

# divide `passage_embeddings` into many pieces
num_shards = 10
shards = np.array_split(passage_embeddings, num_shards, axis=0)

mapped = map(mapper, shards)
reduced = functools.reduce(reducer, mapped)

print("MapReduce-style computation completed!")
print(reduced)


MapReduce-style computation completed!
[[0.11739243 0.20485012 0.15104674 ... 0.12595034 0.16781409 0.17875487]
 [0.3053073  0.11800735 0.15972804 ... 0.09573218 0.08708793 0.08289387]
 [0.30639645 0.35178605 0.37007502 ... 0.30292025 0.27659038 0.34594226]
 [0.32856753 0.30532134 0.310084   ... 0.22818434 0.3047609  0.3286222 ]]
CPU times: user 154 ms, sys: 87 ms, total: 241 ms
Wall time: 21 ms


Comparison Summary
Multi-threaded parallel MapReduce realizes parallel processing through thread pooling, which is more suitable for super-large-scale embedded matrix processing, especially in multi-core CPU environment, which can give full play to the hardware performance.

MapReduce based on map and functools.reduce is more lightweight and suitable for medium-sized data processing, but performs better when computing resources are limited or the task size is small.

In [14]:
%%time
# get top k indices
top_k = 5
top_k_indices = np.argsort(-similarities, axis=1)[:, :top_k]
print(top_k_indices)

[[12383 46207 47392 12494 44620]
 [43615  4627 33356 20269  6894]
 [25317  1841 31527 44260 24504]
 [23647 32184 45343  4425 33847]]
CPU times: user 9.95 ms, sys: 0 ns, total: 9.95 ms
Wall time: 9.34 ms


In [15]:
%%time
# a more efficient way to get top k indices
num_queries = similarities.shape[0]
arange = np.arange(num_queries)[:, None]

top_k = 5
top_k_indices = np.argpartition(-similarities, top_k, axis=1)[:, :top_k]
# Sort the top_k indices to get them in order
top_k_indices = top_k_indices[arange, np.argsort(-similarities[arange, top_k_indices])]
print(top_k_indices)

[[12383 46207 47392 12494 44620]
 [43615  4627 33356 20269  6894]
 [25317  1841 31527 44260 24504]
 [23647 32184 45343  4425 33847]]
CPU times: user 4.59 ms, sys: 0 ns, total: 4.59 ms
Wall time: 6.17 ms


In [16]:
# print similarities for top k indices
for i in range(len(top_k_indices)):
    print(similarities[i, top_k_indices[i]])

[0.31368357 0.30879918 0.3023354  0.30092064 0.3008294 ]
[0.5256693  0.523084   0.5055026  0.49298722 0.48881698]
[0.57650244 0.57347554 0.56800336 0.5575718  0.55670124]
[0.6037559  0.58124775 0.57745135 0.5753776  0.55670756]


## Data

In [17]:
import pandas as pd

data_path = './spotify_millsongdata.csv'
df = pd.read_csv(data_path)

print(df.head())

          version https://git-lfs.github.com/spec/v1
0  oid sha256:7cd19a8adf74791bfd99e1ccb8b1fc3bd2e...
1                                      size 74864162


In [None]:
# get entries for top k indices of query "a love song"
top_k_entries = df.iloc[top_k_indices[-2]]
print(top_k_entries)

In [19]:
# get entries for top k indices of query "a sad song"
top_k_entries = df.iloc[top_k_indices[-1]]
print(top_k_entries)

IndexError: positional indexers are out-of-bounds