# Elasticsearch With Haystack

We will be communicating with our Elasticsearch document store via Haystack. First, we need to install Haystack using pip:

On Windows:

```
pip install farm-haystack -f https://download.pytorch.org/whl/torch_stable.html
```

On any other platform:

```
pip install farm-haystack
```

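Note that the code below uses the pre-1.0 Haystack API (for example, the `haystack.document_store` and `haystack.retriever` import paths). Newer `farm-haystack` releases reorganized these modules, so you may need an older release to run it unchanged.

We will start by indexing the SQuAD dev data, so let's load that into our notebook first.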



In [6]:
import json

with open('../../data/squad/dev.json', 'r') as f:
    squad = json.load(f)
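The `dev.json` file loaded here appears to be a pre-processed copy of SQuAD: later cells iterate over it as a flat list of samples, each with a `'context'` key. The official SQuAD release nests its contexts instead, under `data`, then `paragraphs`, then `qas`. If you are starting from the official file, a minimal flattening sketch could look like this. The function name `flatten_squad` and the output keys are our own choices, not part of Haystack or the dataset.

```python
def flatten_squad(raw):
    """Flatten official-format SQuAD JSON into one dict per question.

    Assumes the official layout:
    {"data": [{"title": ..., "paragraphs": [{"context": ..., "qas": [...]}]}]}
    """
    samples = []
    for article in raw["data"]:
        for paragraph in article["paragraphs"]:
            for qa in paragraph["qas"]:
                # one sample per question, so a context that has several
                # questions ends up repeated several times
                samples.append({
                    "title": article["title"],
                    "context": paragraph["context"],
                    "question": qa["question"],
                    "id": qa["id"],
                })
    return samples


# tiny hand-written example in the official layout
raw = {
    "version": "1.1",
    "data": [{
        "title": "Matter",
        "paragraphs": [{
            "context": "Matter is anything that has mass.",
            "qas": [
                {"id": "q1", "question": "What is matter?", "answers": []},
                {"id": "q2", "question": "What does matter have?", "answers": []},
            ],
        }],
    }],
}
samples = flatten_squad(raw)
print(len(samples))  # 2
```

In the notebook, that would become `squad = flatten_squad(json.load(f))`. Note how one context with two questions turns into two samples sharing the same `'context'`.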

Next, we initialize a connection between Haystack and our local Elasticsearch instance like so:

In [1]:
from haystack.document_store.elasticsearch import ElasticsearchDocumentStore

document_store = ElasticsearchDocumentStore(
    host='localhost',
    index='squad_docs',
)
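# with no arguments, the defaults (host 'localhost', index 'document')
# would be used instead of our dedicated 'squad_docs' index
# document_store = ElasticsearchDocumentStore()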


03/08/2022 08:57:09 - INFO - faiss -   Loading faiss.
03/08/2022 08:57:16 - INFO - elasticsearch -   HEAD http://localhost:9200/squad_docs [status:200 request:0.042s]
03/08/2022 08:57:16 - INFO - elasticsearch -   GET http://localhost:9200/squad_docs [status:200 request:0.002s]
03/08/2022 08:57:16 - INFO - elasticsearch -   PUT http://localhost:9200/squad_docs/_mapping [status:200 request:0.015s]
03/08/2022 08:57:16 - INFO - elasticsearch -   HEAD http://localhost:9200/label [status:200 request:0.003s]


Great, we've established our connection. Now let's query our Elasticsearch instance directly, which we will do with the `requests` library.

In [2]:
import requests

Let's check our cluster *health* (i.e. the general status of our Elasticsearch instance). We do this by sending a **GET** request to the `_cluster/health` endpoint.

In [3]:
res = requests.get('http://localhost:9200/_cluster/health')

res.json()

{'cluster_name': 'elasticsearch',
 'status': 'yellow',
 'timed_out': False,
 'number_of_nodes': 1,
 'number_of_data_nodes': 1,
 'active_primary_shards': 5,
 'active_shards': 5,
 'relocating_shards': 0,
 'initializing_shards': 0,
 'unassigned_shards': 3,
 'delayed_unassigned_shards': 0,
 'number_of_pending_tasks': 0,
 'number_of_in_flight_fetch': 0,
 'task_max_waiting_in_queue_millis': 0,
 'active_shards_percent_as_number': 62.5}

Okay, we can see that the cluster is running. Its status is *yellow*, while ideally we want *green*. Yellow means every primary shard has been allocated to a node but at least one replica shard has not (here, the 3 `unassigned_shards`). On a single-node cluster this is expected, because Elasticsearch never places a replica on the same node as its primary. In practice it means we don't have a full set of backup (*replica*) shards, which only matters if our *primary* shards get corrupted or lost. That is beyond the scope of what we are doing here, however.
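To connect the numbers in the health response: `active_shards_percent_as_number` is the share of all shard copies (primaries plus replicas) that are active. The sketch below rebuilds it from the other fields. The formula is our own reconstruction, not taken from Elasticsearch's code, and it assumes relocating shards are already counted among the active ones; it reproduces the 62.5 shown above.

```python
# fields copied from the health response above
health = {
    "status": "yellow",
    "number_of_nodes": 1,
    "active_shards": 5,
    "initializing_shards": 0,
    "unassigned_shards": 3,
    "active_shards_percent_as_number": 62.5,
}


def active_shard_percent(h):
    # assumption: total shard copies = active + initializing + unassigned
    total = h["active_shards"] + h["initializing_shards"] + h["unassigned_shards"]
    return 100.0 * h["active_shards"] / total


pct = active_shard_percent(health)
print(pct)  # 62.5
```

So 5 of 8 shard copies are active, and the 3 missing ones are the replicas that our single node cannot host.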

## Adding Data

Our *'squad_docs'* index now exists, but we still need to populate it with our `squad` data. We do this through the `document_store.write_documents(<input_data>)` method, where *\<input_data\>* must be a list of dictionaries in the format:

```json
{
    "text": "<document text here>",
    "meta": {
        "other": "<other info here>"
    }
}
```

We **must** include the `'text'` key, which holds the text of each sample; in our case, that is a *context* string. The `'meta'` field is optional and is usually used for anything else that might be relevant. For example, we might want to include the *group* that the context came from (e.g. 'Beyonce' or 'Matter').

In [8]:
squad_docs = []

for sample in squad:
    squad_docs.append({
        'text': sample['context']
    })
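The loop above stores only the text. If each sample also carried a `'title'` field (an assumption: the `dev.json` used here may not include one), we could put it under `'meta'` so it is stored alongside the text. The helper below is hypothetical, not part of Haystack; it also enforces the one hard requirement, a non-empty `'text'`.

```python
def to_haystack_docs(samples):
    """Convert samples into Haystack-style dicts with optional metadata.

    Hypothetical helper: assumes each sample has a 'context' key and,
    optionally, a 'title' key.
    """
    docs = []
    for sample in samples:
        # 'text' is mandatory, so reject samples with a missing or empty context
        if not sample.get("context"):
            raise ValueError("every document needs non-empty text")
        doc = {"text": sample["context"]}
        if "title" in sample:
            # anything besides the text goes under 'meta'
            doc["meta"] = {"title": sample["title"]}
        docs.append(doc)
    return docs


docs = to_haystack_docs([
    {"context": "Matter is anything that has mass.", "title": "Matter"},
    {"context": "Beyonce is a singer."},
])
print(docs[0])  # {'text': 'Matter is anything that has mass.', 'meta': {'title': 'Matter'}}
print(docs[1])  # {'text': 'Beyonce is a singer.'}
```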

Then we add our data to the index like this:

In [9]:
document_store.write_documents(squad_docs)

03/08/2022 08:58:30 - INFO - elasticsearch -   POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:0.775s]
03/08/2022 08:58:31 - INFO - elasticsearch -   POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:0.879s]
03/08/2022 08:58:32 - INFO - elasticsearch -   POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:1.056s]
03/08/2022 08:58:33 - INFO - elasticsearch -   POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:1.026s]
03/08/2022 08:58:34 - INFO - elasticsearch -   POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:1.024s]
03/08/2022 08:58:35 - INFO - elasticsearch -   POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:1.261s]
03/08/2022 08:58:36 - INFO - elasticsearch -   POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:1.016s]
03/08/2022 08:58:37 - INFO - elasticsearch -   POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:1.013s]


When retrieving documents from Elasticsearch, we will use either the TF-IDF or the BM25 algorithm.

**TF-IDF** is a common *relevance* scoring algorithm. For each word in the query, it multiplies two quantities, then sums the results over all the query words:

* **TF** (term frequency): how often the query (question) word appears in the document.

* **IDF** (inverse document frequency): the inverse of the fraction of documents that contain the word, usually log-scaled. Common words like *'the'* appear in most documents and score poorly, whereas a rare word like *'Beyonce'* scores well.
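A toy version of this scoring in pure Python is sketched below. It uses raw term counts for TF and `log(N / df)` for IDF, which is one common variant among many; Haystack's `TfidfRetriever` and other libraries may use different weighting and normalization, so don't expect the numbers to match theirs.

```python
import math


def tfidf_score(query, doc, corpus):
    """Score one document against a query with a basic TF-IDF formula.

    TF  = raw count of the term in the document
    IDF = log(N / df), N = number of documents, df = documents containing the term
    Tokenization is a naive lowercase whitespace split.
    """
    n_docs = len(corpus)
    doc_tokens = doc.lower().split()
    score = 0.0
    for term in set(query.lower().split()):
        tf = doc_tokens.count(term)
        df = sum(1 for d in corpus if term in d.lower().split())
        if tf == 0 or df == 0:
            continue
        score += tf * math.log(n_docs / df)
    return score


corpus = [
    "the physics of the atom",
    "the history of the piano",
    "beyonce released a new album",
]
scores = [tfidf_score("the physics", doc, corpus) for doc in corpus]
for score, doc in zip(scores, corpus):
    print(round(score, 3), "|", doc)
```

The rare word *'physics'* contributes `log(3/1)`, while *'the'* contributes only `log(3/2)` per occurrence; a word present in every document would contribute `log(1) = 0`.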

We use TF-IDF through Haystack's `TfidfRetriever`:


In [12]:
from haystack.retriever.sparse import TfidfRetriever

retriever = TfidfRetriever(document_store)

03/08/2022 09:00:27 - INFO - elasticsearch -   POST http://localhost:9200/squad_docs/_search?scroll=1d&size=10000 [status:200 request:0.146s]
03/08/2022 09:00:27 - INFO - elasticsearch -   POST http://localhost:9200/_search/scroll [status:200 request:0.107s]
03/08/2022 09:00:27 - INFO - elasticsearch -   POST http://localhost:9200/_search/scroll [status:200 request:0.092s]
03/08/2022 09:00:27 - INFO - elasticsearch -   POST http://localhost:9200/_search/scroll [status:200 request:0.024s]
03/08/2022 09:00:27 - INFO - elasticsearch -   POST http://localhost:9200/_search/scroll [status:200 request:0.004s]
03/08/2022 09:00:27 - INFO - elasticsearch -   DELETE http://localhost:9200/_search/scroll [status:200 request:0.007s]
03/08/2022 09:00:27 - INFO - haystack.retriever.sparse -   Found 32418 candidate paragraphs from 32418 docs in DB


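We can see here that, when building our retriever, it identified a total of *32418* 'candidate paragraphs', one for every document in the index. That is exactly twice the number of samples in our `squad` data, which suggests the index already held a copy of the data from an earlier run before we wrote it again: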

In [13]:
len(squad)

16209

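Now we can retrieve the most relevant documents for a query with the `retrieve` method. Note that `TfidfRetriever` does not score documents inside Elasticsearch: when it is initialized, it pulls every document out of the index (the `_search?scroll` requests in the log above) and computes the **TF-IDF** scores in memory.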

In [14]:
query = "Physics is a very abstract subject"

retriever.retrieve(query)

[{'text': 'Even though some proofs of complexity-theoretic theorems regularly assume some concrete choice of input encoding, one tries to keep the discussion abstract enough to be independent of the choice of encoding. This can be achieved by ensuring that different representations can be transformed into each other efficiently.', 'id': '69d78246-a7fa-48c3-a081-55a587ebd8d2', 'score': None, 'probability': None, 'question': None, 'meta': {}, 'embedding': None},
 {'text': 'Even though some proofs of complexity-theoretic theorems regularly assume some concrete choice of input encoding, one tries to keep the discussion abstract enough to be independent of the choice of encoding. This can be achieved by ensuring that different representations can be transformed into each other efficiently.', 'id': 'bb6b3f42-0b06-4405-b353-cc3dec1d4a79', 'score': None, 'probability': None, 'question': None, 'meta': {}, 'embedding': None},
 {'text': 'Even though some proofs of complexity-theoretic theorems re

This query returns the same passage over and over. These duplicates exist because our data repeats each context once for every question asked about it, and most contexts have several questions. So we need to start again: first delete everything inside our *squad_docs* index, then re-index the deduplicated data.
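Before deleting anything, it can help to confirm the diagnosis. The sketch below uses `collections.Counter` to count how many distinct contexts there are and how often the most common one repeats. It runs on toy data; in the notebook you would pass `squad` instead. The function name `duplicate_report` is our own.

```python
from collections import Counter


def duplicate_report(samples):
    """Count how often each context string appears (samples must be non-empty)."""
    counts = Counter(sample["context"] for sample in samples)
    n_unique = len(counts)
    n_repeated = sum(1 for c in counts.values() if c > 1)
    # most_common(1) returns [(context, count)] for the most frequent context
    return n_unique, n_repeated, counts.most_common(1)[0]


samples = [
    {"context": "Paragraph A", "question": "Q1"},
    {"context": "Paragraph A", "question": "Q2"},
    {"context": "Paragraph A", "question": "Q3"},
    {"context": "Paragraph B", "question": "Q4"},
]
n_unique, n_repeated, (top_context, top_count) = duplicate_report(samples)
print(n_unique, n_repeated, top_context, top_count)  # 2 1 Paragraph A 3
```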

We can delete every document in our index by sending a **POST** request to the `<index_name>/_delete_by_query` endpoint:

In [15]:
res = requests.post(
    'http://localhost:9200/squad_docs/_delete_by_query',
    json={
        'query': {
            'match_all': {}
        }
    }
)

res.json()

{'took': 3117,
 'timed_out': False,
 'total': 32418,
 'deleted': 32418,
 'batches': 33,
 'version_conflicts': 0,
 'noops': 0,
 'retries': {'bulk': 0, 'search': 0},
 'throttled_millis': 0,
 'requests_per_second': -1.0,
 'throttled_until_millis': 0,
 'failures': []}

Our response shows `'deleted': 32418`, which means all *32418* documents have been deleted from our *squad_docs* index. We can confirm this with the `<index_name>/_count` endpoint:

In [16]:
res = requests.get('http://localhost:9200/squad_docs/_count')

res.json()

{'count': 0,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}}

Now that we've cleared the index, it's time to remove duplicates from our SQuAD contexts and re-index them.

In [17]:
# pull the context strings out of each sample (duplicates included)
contexts = [sample['context'] for sample in squad]

# remove duplicates; dict.fromkeys keeps the first occurrence of each
# context in its original order, whereas set() would scramble the order
contexts = list(dict.fromkeys(contexts))

# convert back to the dictionary format Haystack expects
squad_docs = [{'text': context} for context in contexts]

Finally, we can re-index the deduplicated contexts as we did before.

In [18]:
document_store.write_documents(squad_docs)

03/08/2022 09:06:03 - INFO - elasticsearch -   POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:0.234s]
03/08/2022 09:06:04 - INFO - elasticsearch -   POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:1.069s]
03/08/2022 09:06:05 - INFO - elasticsearch -   POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:1.072s]


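Because we have changed the contents of our index, we initialize our retriever once more. `TfidfRetriever` builds its TF-IDF representation from the documents in the store at the moment it is created, so the old retriever would still be working from the stale, duplicated documents.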

In [19]:
retriever = TfidfRetriever(document_store)

03/08/2022 09:06:12 - INFO - elasticsearch -   POST http://localhost:9200/squad_docs/_search?scroll=1d&size=10000 [status:200 request:0.024s]
03/08/2022 09:06:12 - INFO - elasticsearch -   POST http://localhost:9200/_search/scroll [status:200 request:0.005s]
03/08/2022 09:06:12 - INFO - elasticsearch -   DELETE http://localhost:9200/_search/scroll [status:200 request:0.003s]
03/08/2022 09:06:12 - INFO - haystack.retriever.sparse -   Found 1204 candidate paragraphs from 1204 docs in DB


This time our retriever found *1204* documents, far fewer than the *32418* it found before. Now it's time to query our data again!

In [20]:
retriever.retrieve(query)

[{'text': 'Even though some proofs of complexity-theoretic theorems regularly assume some concrete choice of input encoding, one tries to keep the discussion abstract enough to be independent of the choice of encoding. This can be achieved by ensuring that different representations can be transformed into each other efficiently.', 'id': '44501a50-535f-4899-a3de-f33c719195f8', 'score': None, 'probability': None, 'question': None, 'meta': {}, 'embedding': None},
 {'text': 'With modern insights into quantum mechanics and technology that can accelerate particles close to the speed of light, particle physics has devised a Standard Model to describe forces between particles smaller than atoms. The Standard Model predicts that exchanged particles called gauge bosons are the fundamental means by which forces are emitted and absorbed. Only four main interactions are known: in order of decreasing strength, they are: strong, electromagnetic, weak, and gravitational.:2–10:79 High-energy particle p

Now we're returning a set of relevant documents, without duplicates.

Finally, let's turn to the other *sparse retriever* we can use with Elasticsearch. We have already used **TF-IDF**; by swapping `TfidfRetriever` for `ElasticsearchRetriever`, we switch to the **BM25** algorithm, an *improved* version of **TF-IDF** that Haystack recommends. Unlike `TfidfRetriever`, `ElasticsearchRetriever` sends each query to Elasticsearch, which scores the documents itself (BM25 is Elasticsearch's default similarity).
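BM25's main refinements over plain TF-IDF are that repeated terms give diminishing returns (controlled by `k1`) and that documents longer than average are penalized (controlled by `b`). Below is a pure-Python sketch of the textbook Okapi BM25 formula with `k1 = 1.2` and `b = 0.75`, Elasticsearch's default parameters, and a Lucene-style IDF. Elasticsearch's actual implementation differs in details, so this won't reproduce the scores in the output below; it only illustrates the saturation effect.

```python
import math


def bm25_score(query, doc, corpus, k1=1.2, b=0.75):
    """Okapi BM25 score of one document for a query.

    IDF uses the Lucene-style form ln(1 + (N - df + 0.5) / (df + 0.5)).
    k1 controls term-frequency saturation, b controls length normalization.
    Tokenization is a naive lowercase whitespace split.
    """
    tokenized = [d.lower().split() for d in corpus]
    n_docs = len(tokenized)
    avgdl = sum(len(t) for t in tokenized) / n_docs
    doc_tokens = doc.lower().split()
    score = 0.0
    for term in set(query.lower().split()):
        tf = doc_tokens.count(term)
        if tf == 0:
            continue
        df = sum(1 for t in tokenized if term in t)
        idf = math.log(1 + (n_docs - df + 0.5) / (df + 0.5))
        # length normalization: equals k1 when the document has average length
        norm = k1 * (1 - b + b * len(doc_tokens) / avgdl)
        score += idf * tf * (k1 + 1) / (tf + norm)
    return score


corpus = ["physics is abstract", "physics physics physics", "music is fun"]
s0, s1, s2 = (bm25_score("physics", d, corpus) for d in corpus)
print(round(s1 / s0, 3))  # 1.571
```

Tripling the term count in a document of average length raises the score by only about 1.57×, whereas the raw-count TF-IDF variant would triple it.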

So, let's initialize that and make another query with it.

In [21]:
# import BM25 retriever
from haystack.retriever.sparse import ElasticsearchRetriever

# initialize
retriever = ElasticsearchRetriever(document_store)

# and query
retriever.retrieve(query)

03/08/2022 09:06:44 - INFO - elasticsearch -   POST http://localhost:9200/squad_docs/_search [status:200 request:0.037s]


[{'text': 'Even though some proofs of complexity-theoretic theorems regularly assume some concrete choice of input encoding, one tries to keep the discussion abstract enough to be independent of the choice of encoding. This can be achieved by ensuring that different representations can be transformed into each other efficiently.', 'id': '44501a50-535f-4899-a3de-f33c719195f8', 'score': 7.8496785, 'probability': 0.7273482036175146, 'question': None, 'meta': {}, 'embedding': None},
 {'text': 'A computational problem can be viewed as an infinite collection of instances together with a solution for every instance. The input string for a computational problem is referred to as a problem instance, and should not be confused with the problem itself. In computational complexity theory, a problem refers to the abstract question to be solved. In contrast, an instance of this problem is a rather concrete utterance, which can serve as the input for a decision problem. For example, consider the prob

Okay, great! This is a pretty big notebook, but it covers everything we need to know to get started with Haystack + Elasticsearch (and a little more).