In [3]:
from elasticsearch import Elasticsearch

# Synchronous client for the local cluster, with a very generous per-request timeout (seconds).
client = Elasticsearch(hosts="http://localhost:9200", request_timeout=1000)

In [4]:
from typing import Dict, List
import math


class TFIDFVectorizer:
    """TF-IDF vectorizer backed by an Elasticsearch index.

    Term frequencies and document frequencies come from the term vectors API
    (``term_statistics=True``) on a single field. The weight of a term is
    ``tf * log2(N / df)``, where ``N`` is the document count returned by the count API.
    """

    def __init__(self, es_client: "Elasticsearch", index_name: str, analyzer_name: str, field_name: str = "text"):
        """
        Args:
            es_client: synchronous Elasticsearch client used for every request.
            index_name: name of the index holding the documents.
            analyzer_name: analyzer applied by ``tokenize``.
            field_name: field whose term vectors are used (``"text"`` by default).
        """
        self.client = es_client
        self.index_name = index_name
        self.analyzer_name = analyzer_name
        self.field_name = field_name

    def tokenize(self, text: str) -> List[str]:
        """Return the list of tokens produced by ``analyzer_name`` for ``text``.

        This is the same analyzer (preprocessing pipeline) as the one used for the
        arXiv abstracts, so query tokens are comparable with the indexed terms.
        """
        # Explicit keyword arguments instead of the deprecated ``body=`` parameter,
        # matching the keyword style of the ``termvectors`` calls below.
        response = self.client.indices.analyze(index=self.index_name, analyzer=self.analyzer_name, text=text)
        return [token_info["token"] for token_info in response["tokens"]]

    def doc_count(self) -> int:
        """Return the number of documents in the index."""
        response = self.client.count(index=self.index_name)
        return response["count"]

    def _terms_info(self, **source) -> Dict[str, Dict[str, int]]:
        """Run the term vectors API on ``self.field_name`` and return its ``terms`` mapping.

        ``source`` selects the document: ``id=<doc id>`` for a stored document, or
        ``doc={field: text}`` for an artificial one. Returns ``{}`` when the response
        contains no terms for the field.
        """
        response = self.client.termvectors(
            index=self.index_name,
            fields=[self.field_name],
            term_statistics=True,
            positions=False,
            offsets=False,
            payloads=False,
            **source,
        )
        return response.get("term_vectors", {}).get(self.field_name, {}).get("terms", {})

    def _doc_terms_info(self, doc_id: str) -> Dict[str, Dict[str, int]]:
        """Retrieve term statistics for the stored document ``doc_id``."""
        return self._terms_info(id=doc_id)

    def _text_terms_info(self, text: str) -> Dict[str, Dict[str, int]]:
        """Retrieve term statistics for ``text``, analyzed as an artificial document."""
        return self._terms_info(doc={self.field_name: text})

    def _tf_idf_weights(self, terms_info: Dict[str, Dict[str, int]]) -> Dict[str, float]:
        """Turn term-vector statistics into ``{term: tf * log2(N / df)}``."""
        if not terms_info:
            # Nothing to weight: skip the count request entirely.
            return {}
        # NOTE(review): per the term vectors docs, term statistics are gathered on the shard holding
        # the document, while doc_count() is index-wide; the IDF is exact for a single-shard index.
        doc_count = self.doc_count()
        tf_idf_weights = {}
        for term, stats in terms_info.items():
            tf = stats.get("term_freq", 0)
            # Clamp to 1 so the ratio is always defined. doc_freq may be missing, and a
            # value of 0 would otherwise raise ZeroDivisionError.
            df = max(stats.get("doc_freq", 1), 1)
            tf_idf_weights[term] = tf * math.log2(doc_count / df)
        return tf_idf_weights

    def tf_idf_document(self, doc_id: str) -> Dict[str, float]:
        """Compute the TF-IDF weights of the terms in the stored document ``doc_id``."""
        return self._tf_idf_weights(self._doc_terms_info(doc_id))

    def tf_idf_text(self, text: str) -> Dict[str, float]:
        """Compute the TF-IDF weights of the terms in ``text``."""
        return self._tf_idf_weights(self._text_terms_info(text))

In [5]:
# Sanity check: TF-IDF weights of a sample query on the "arxiv" index with the "custom" analyzer.
vectorizer = TFIDFVectorizer(client, "arxiv", "custom")
vectorizer.tf_idf_text("machine learning")

{'learning': 3.1174318645927173, 'machine': 4.130636908016368}

In [6]:
# Search configuration shared by the cells below.
query_string, index_name, analyzer_name = "machine learning", "arxiv", "custom"
r = 10  # Number of results to return

## Naive implementation

In [None]:
from elasticsearch.helpers import scan
from pprint import pprint
from tqdm import tqdm

print(f"Executing search of query string '{query_string}' over all documents in index '{index_name}'")
vectorizer = TFIDFVectorizer(client, index_name=index_name, analyzer_name=analyzer_name)

# Compute the query vector and L2-normalize it. Normalizing the query does not change the ranking
# (every score is scaled by the same constant), but it makes the scores true cosine similarities.
query_vector = vectorizer.tf_idf_text(query_string)
query_norm = math.sqrt(sum(weight**2 for weight in query_vector.values()))
# A query with no weighted terms (e.g. only words the analyzer drops) has norm 0: use an empty
# normalized vector so every document scores 0 instead of raising ZeroDivisionError.
query_vector_normalized = (
    {token: weight / query_norm for token, weight in query_vector.items()} if query_norm > 0 else {}
)

# Naive implementation:
# Scan through docs, compute cosine sim between query and each doc
similarities = {}
for doc in tqdm(scan(client, index=index_name, query={"_source": False}), total=vectorizer.doc_count()):
    docid = doc["_id"]
    doc_tf_idf = vectorizer.tf_idf_document(docid)
    doc_tf_idf_norm = math.sqrt(sum(weight**2 for weight in doc_tf_idf.values()))
    dot_product = sum(weight * doc_tf_idf.get(token, 0) for token, weight in query_vector_normalized.items())
    # A document with an empty (or all-zero) TF-IDF vector cannot match anything: score it 0.
    similarities[docid] = dot_product / doc_tf_idf_norm if doc_tf_idf_norm > 0 else 0.0

# now sort by cosine similarity
sorted_answer = sorted(similarities.items(), key=lambda kv: kv[1], reverse=True)
pprint(sorted_answer[:r])

Executing search of query string 'machine learning' with over documents in index 'arxiv'


100%|██████████| 58102/58102 [45:32<00:00, 21.26it/s] 

[('30749', 0.499339916162954),
 ('29106', 0.4550250906242585),
 ('25230', 0.45238527762709785),
 ('30639', 0.4255856152643423),
 ('18989', 0.3944888658727178),
 ('53877', 0.3944888658727178),
 ('37272', 0.3767314656563681),
 ('55842', 0.3767314656563681),
 ('26955', 0.35511371168748196),
 ('35642', 0.3518888496131795)]





## Query Term-First Implementation

We have to first create the inverted index for the documents. We'll store, for each term, a list of documents that contain
that term, along with the TF-IDF weight of that term in the document.

For simplicity, we'll skip any compression of the postings lists. This won't cause any issues, as the uncompressed index still fits in memory.

Another advantage of the inverted-index approach is that we only need to build it once; we can then store it on disk and reuse it for future queries.

In [None]:
from typing import Tuple
from collections import defaultdict
import pickle
import os


vectorizer = TFIDFVectorizer(client, index_name=index_name, analyzer_name=analyzer_name)
# term -> list of (position of the document in `doc_ids`, TF-IDF weight of the term in that document).
# Tuples keep the typing uniform, instead of one flat "sausage" list alternating positions and weights.
inverted_index: Dict[str, List[Tuple[int, float]]] = defaultdict(list)  # list appends are amortized O(1)
doc_ids: List[str] = []  # postings hold integer positions into this list; strings would cost far more memory

# Posting order is irrelevant for scoring (a Boolean model would want ordered postings for fast merging).
documents = scan(client, index=index_name, query={"_source": False})
for doc_index, doc in enumerate(tqdm(documents, total=vectorizer.doc_count())):
    docid = doc["_id"]
    doc_ids.append(docid)
    for term, weight in vectorizer.tf_idf_document(docid).items():
        inverted_index[term].append((doc_index, weight))

# Plain dict from here on: looking up an unknown term no longer inserts an empty list.
inverted_index = dict(inverted_index)

os.makedirs("../data/cache/", exist_ok=True)

# Persist both structures so later queries can skip this expensive pass.
for cache_path, payload in (
    ("../data/cache/inverted_index.pkl", inverted_index),
    ("../data/cache/doc_ids.pkl", doc_ids),
):
    with open(cache_path, "wb") as f:
        pickle.dump(payload, f)

100%|██████████| 58102/58102 [45:51<00:00, 21.11it/s] 


#### Without caching of TF-IDF norms

In [69]:
import math
import pickle  # needed here too: otherwise this cell only works if the index-building cell ran first
import time
from collections import defaultdict
from pprint import pprint


print(f"Executing quick search of query string '{query_string}' over all documents in index '{index_name}'")
# perf_counter is the clock meant for measuring elapsed time (monotonic, high resolution).
start = time.perf_counter()
vectorizer = TFIDFVectorizer(client, index_name=index_name, analyzer_name=analyzer_name)
vectorizer_loaded_time = time.perf_counter()
print(f"Vectorizer loaded in {vectorizer_loaded_time-start:.2f} seconds")

# Compute the query vector and L2-normalize it. Normalizing the query does not change the ranking
# (every score is scaled by the same constant), but it makes the scores true cosine similarities.
query_vector = vectorizer.tf_idf_text(query_string)
query_norm = math.sqrt(sum(weight**2 for weight in query_vector.values()))
# A query with no weighted terms has norm 0: use an empty vector instead of dividing by zero.
query_vector_normalized = (
    {token: weight / query_norm for token, weight in query_vector.items()} if query_norm > 0 else {}
)

query_vector_computed_time = time.perf_counter()
print(f"Query vector computed in {query_vector_computed_time-vectorizer_loaded_time:.2f} seconds")

# Fast implementation:
# Scan through the query terms, look them up in the inverted index, and accumulate partial similarities

# Start by loading the inverted index and document IDs from disk.
# NOTE: pickle.load can execute arbitrary code; only load cache files this notebook wrote itself.
with open("../data/cache/inverted_index.pkl", "rb") as f:
    inverted_index = pickle.load(f)
with open("../data/cache/doc_ids.pkl", "rb") as f:
    doc_ids = pickle.load(f)

inverted_index_loaded_time = time.perf_counter()
print(f"Inverted index loaded in {inverted_index_loaded_time-query_vector_computed_time:.2f} seconds")

similarities = defaultdict(float)
for term, weight in query_vector_normalized.items():
    for doc_index, doc_weight in inverted_index.get(term, ()):
        docid = doc_ids[doc_index]
        similarities[docid] += weight * doc_weight

similarities_aggregated_time = time.perf_counter()
print(
    f"Accumulated similarities for {len(similarities)} documents in {similarities_aggregated_time-inverted_index_loaded_time:.2f} seconds"
)

# One Elasticsearch round trip per matching document: this loop dominates the query time.
for docid in similarities:
    doc_tf_idf = vectorizer.tf_idf_document(docid)
    doc_tf_idf_norm = math.sqrt(sum(weight**2 for weight in doc_tf_idf.values()))
    # All weights can be 0 (e.g. terms present in every document): score 0 instead of dividing by zero.
    similarities[docid] = similarities[docid] / doc_tf_idf_norm if doc_tf_idf_norm > 0 else 0.0

similarities_normalized_time = time.perf_counter()
print(f"Normalized similarities in {similarities_normalized_time-similarities_aggregated_time:.2f} seconds")

# now sort by cosine similarity
sorted_answer = sorted(similarities.items(), key=lambda kv: kv[1], reverse=True)
response_time = time.perf_counter()
print(f"Sorted top {r} results in {response_time-similarities_normalized_time:.2f} seconds")
print(f"Total query time: {response_time-start:.2f} seconds")
pprint(sorted_answer[:r])

Executing quick search of query string 'machine learning' with over documents in index 'arxiv'
Vectorizer loaded in 0.00 seconds
Query vector computed in 0.05 seconds
Inverted index loaded in 1.40 seconds
Accumulated similarities for 7392 documents in 0.00 seconds
Normalized similarities in 344.75 seconds
Sorted top 10 results in 0.20 seconds
Total query time: 346.40 seconds
[('30749', 0.499339916162954),
 ('29106', 0.4550250906242585),
 ('25230', 0.45238527762709785),
 ('30639', 0.4255856152643423),
 ('18989', 0.3944888658727178),
 ('53877', 0.3944888658727178),
 ('37272', 0.3767314656563681),
 ('55842', 0.3767314656563681),
 ('26955', 0.35511371168748196),
 ('35642', 0.3518888496131795)]


Clearly, querying Elasticsearch at query time for the TF-IDF norm of every matching document is very inefficient (it accounts for almost all of the 346 seconds above), so we'll precompute and cache these norms as well.

#### With caching of TF-IDF norms

To speed up this one-off precomputation, we'll issue the Elasticsearch requests asynchronously.

In [2]:
from typing import Dict, List
import math
from elasticsearch import AsyncElasticsearch


class AsyncTFIDFVectorizer:
    """Asynchronous TF-IDF vectorizer backed by an Elasticsearch index.

    This is the ``await``-based twin of ``TFIDFVectorizer``. Term and document frequencies
    come from the term vectors API (``term_statistics=True``) on a single field, and the
    weight of a term is ``tf * log2(N / df)``, with ``N`` taken from the count API.
    """

    def __init__(self, es_client: "AsyncElasticsearch", index_name: str, analyzer_name: str, field_name: str = "text"):
        """
        Args:
            es_client: asynchronous Elasticsearch client used for every request.
            index_name: name of the index holding the documents.
            analyzer_name: analyzer applied by ``tokenize``.
            field_name: field whose term vectors are used (``"text"`` by default).
        """
        self.client = es_client
        self.index_name = index_name
        self.analyzer_name = analyzer_name
        self.field_name = field_name

    async def tokenize(self, text: str) -> List[str]:
        """Return the list of tokens produced by ``analyzer_name`` for ``text``.

        This is the same analyzer (preprocessing pipeline) as the one used for the
        arXiv abstracts, so query tokens are comparable with the indexed terms.
        """
        # Explicit keyword arguments instead of the deprecated ``body=`` parameter,
        # matching the keyword style of the ``termvectors`` calls below.
        response = await self.client.indices.analyze(index=self.index_name, analyzer=self.analyzer_name, text=text)
        return [token_info["token"] for token_info in response["tokens"]]

    async def doc_count(self) -> int:
        """Return the number of documents in the index."""
        response = await self.client.count(index=self.index_name)
        return response["count"]

    async def _terms_info(self, **source) -> Dict[str, Dict[str, int]]:
        """Run the term vectors API on ``self.field_name`` and return its ``terms`` mapping.

        ``source`` selects the document: ``id=<doc id>`` for a stored document, or
        ``doc={field: text}`` for an artificial one. Returns ``{}`` when the response
        contains no terms for the field.
        """
        response = await self.client.termvectors(
            index=self.index_name,
            fields=[self.field_name],
            term_statistics=True,
            positions=False,
            offsets=False,
            payloads=False,
            **source,
        )
        return response.get("term_vectors", {}).get(self.field_name, {}).get("terms", {})

    async def _doc_terms_info(self, doc_id: str) -> Dict[str, Dict[str, int]]:
        """Retrieve term statistics for the stored document ``doc_id``."""
        return await self._terms_info(id=doc_id)

    async def _text_terms_info(self, text: str) -> Dict[str, Dict[str, int]]:
        """Retrieve term statistics for ``text``, analyzed as an artificial document."""
        return await self._terms_info(doc={self.field_name: text})

    async def _tf_idf_weights(self, terms_info: Dict[str, Dict[str, int]]) -> Dict[str, float]:
        """Turn term-vector statistics into ``{term: tf * log2(N / df)}``."""
        if not terms_info:
            # Nothing to weight: skip the count request entirely.
            return {}
        # NOTE(review): per the term vectors docs, term statistics are gathered on the shard holding
        # the document, while doc_count() is index-wide; the IDF is exact for a single-shard index.
        doc_count = await self.doc_count()
        tf_idf_weights = {}
        for term, stats in terms_info.items():
            tf = stats.get("term_freq", 0)
            # Clamp to 1 so the ratio is always defined. doc_freq may be missing, and a
            # value of 0 would otherwise raise ZeroDivisionError.
            df = max(stats.get("doc_freq", 1), 1)
            tf_idf_weights[term] = tf * math.log2(doc_count / df)
        return tf_idf_weights

    async def tf_idf_document(self, doc_id: str) -> Dict[str, float]:
        """Compute the TF-IDF weights of the terms in the stored document ``doc_id``."""
        return await self._tf_idf_weights(await self._doc_terms_info(doc_id))

    async def tf_idf_text(self, text: str) -> Dict[str, float]:
        """Compute the TF-IDF weights of the terms in ``text``."""
        return await self._tf_idf_weights(await self._text_terms_info(text))

In [5]:
import asyncio
import pickle
from tqdm import tqdm

async_client = AsyncElasticsearch("http://localhost:9200", request_timeout=1000)
vectorizer = AsyncTFIDFVectorizer(async_client, index_name=index_name, analyzer_name=analyzer_name)
vector_norms: List[float] = []  # Again, we'll store floats in a list for memory and access efficiency


with open("../data/cache/doc_ids.pkl", "rb") as f:
    doc_ids: List[str] = pickle.load(f)

bar = tqdm(total=len(doc_ids))


async def compute_norm(docid: str) -> float:
    doc_tf_idf = await vectorizer.tf_idf_document(docid)
    doc_tf_idf_norm = math.sqrt(sum(weight**2 for weight in doc_tf_idf.values()))
    bar.update(1)
    return doc_tf_idf_norm


async def main():
    tasks = [compute_norm(docid) for docid in doc_ids]
    return await asyncio.gather(*tasks)


vector_norms = await main()
bar.close()

with open("../data/cache/vector_norms.pkl", "wb") as f:
    pickle.dump(vector_norms, f)

  0%|          | 0/58102 [00:16<?, ?it/s]
  tasks = [compute_norm(docid) for docid in doc_ids]
100%|██████████| 58102/58102 [00:26<00:00, 2172.20it/s]


A small pat on my own back: with sequential calls, this process would have taken 45 minutes; with async calls, it took 26 seconds.

In [8]:
import math
import pickle
import time
from collections import defaultdict

print(f"Executing quick search of query string '{query_string}' over all documents in index '{index_name}'")
# perf_counter is the clock meant for measuring elapsed time (monotonic, high resolution).
start = time.perf_counter()
vectorizer = TFIDFVectorizer(client, index_name=index_name, analyzer_name=analyzer_name)
vectorizer_loaded_time = time.perf_counter()
print(f"Vectorizer loaded in {vectorizer_loaded_time-start:.2f} seconds")

# Compute the query vector and L2-normalize it. Normalizing the query does not change the ranking
# (every score is scaled by the same constant), but it makes the scores true cosine similarities.
query_vector = vectorizer.tf_idf_text(query_string)
query_norm = math.sqrt(sum(weight**2 for weight in query_vector.values()))
# A query with no weighted terms has norm 0: use an empty vector instead of dividing by zero.
query_vector_normalized = (
    {token: weight / query_norm for token, weight in query_vector.items()} if query_norm > 0 else {}
)

query_vector_computed_time = time.perf_counter()
print(f"Query vector computed in {query_vector_computed_time-vectorizer_loaded_time:.2f} seconds")

# Fast implementation:
# Scan through the query terms, look them up in the inverted index, and accumulate partial similarities

# Start by loading the inverted index, document IDs and precomputed TF-IDF norms from disk.
# NOTE: pickle.load can execute arbitrary code; only load cache files this notebook wrote itself.
with open("../data/cache/inverted_index.pkl", "rb") as f:
    inverted_index = pickle.load(f)
with open("../data/cache/doc_ids.pkl", "rb") as f:
    doc_ids = pickle.load(f)
with open("../data/cache/vector_norms.pkl", "rb") as f:
    vector_norms = pickle.load(f)

inverted_index_loaded_time = time.perf_counter()
print(f"Inverted index loaded in {inverted_index_loaded_time-query_vector_computed_time:.2f} seconds")

# Keyed by integer document position; string IDs are only looked up for the final top-r.
similarities = defaultdict(float)
for term, weight in query_vector_normalized.items():
    for doc_index, doc_weight in inverted_index.get(term, ()):
        similarities[doc_index] += weight * doc_weight

similarities_aggregated_time = time.perf_counter()
print(
    f"Accumulated similarities for {len(similarities)} documents in {similarities_aggregated_time-inverted_index_loaded_time:.2f} seconds"
)

for doc_index in similarities:
    doc_norm = vector_norms[doc_index]
    # All weights can be 0 (e.g. terms present in every document): score 0 instead of dividing by zero.
    similarities[doc_index] = similarities[doc_index] / doc_norm if doc_norm > 0 else 0.0

similarities_normalized_time = time.perf_counter()
print(f"Normalized similarities in {similarities_normalized_time-similarities_aggregated_time:.2f} seconds")

# now sort by cosine similarity
sorted_answer = sorted(similarities.items(), key=lambda kv: kv[1], reverse=True)
response_time = time.perf_counter()
print(f"Sorted top {r} results in {response_time-similarities_normalized_time:.2f} seconds")
print(f"Total query time: {response_time-start:.2f} seconds")
for doc_index, score in sorted_answer[:r]:
    docid = doc_ids[doc_index]
    print(f"({docid}, {score})")

Executing quick search of query string 'machine learning' with over documents in index 'arxiv'
Vectorizer loaded in 0.00 seconds
Query vector computed in 0.04 seconds
Inverted index loaded in 0.97 seconds
Accumulated similarities for 7392 documents in 0.01 seconds
Normalized similarities in 0.00 seconds
Sorted top 10 results in 0.00 seconds
Total query time: 1.02 seconds
(30749, 0.499339916162954)
(29106, 0.4550250906242585)
(25230, 0.45238527762709785)
(30639, 0.4255856152643423)
(18989, 0.3944888658727178)
(53877, 0.3944888658727178)
(37272, 0.3767314656563681)
(55842, 0.3767314656563681)
(26955, 0.35511371168748196)
(35642, 0.3518888496131795)


And voilà: what took 45 minutes the first time now takes 1.02 seconds!