Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow more options for elasticsearch client (auth, multiple hosts) #845

Merged
merged 7 commits into from
Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions docs/_src/api/api/document_store.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class ElasticsearchDocumentStore(BaseDocumentStore)
#### \_\_init\_\_

```python
| __init__(host: str = "localhost", port: int = 9200, username: str = "", password: str = "", index: str = "document", label_index: str = "label", search_fields: Union[str, list] = "text", text_field: str = "text", name_field: str = "name", embedding_field: str = "embedding", embedding_dim: int = 768, custom_mapping: Optional[dict] = None, excluded_meta_data: Optional[list] = None, faq_question_field: Optional[str] = None, analyzer: str = "standard", scheme: str = "http", ca_certs: str = None, verify_certs: bool = True, create_index: bool = True, update_existing_documents: bool = False, refresh_type: str = "wait_for", similarity="dot_product", timeout=30, return_embedding: bool = False)
| __init__(host: Union[str, List[str]] = "localhost", port: Union[int, List[int]] = 9200, username: str = "", password: str = "", api_key_id: Optional[str] = None, api_key: Optional[str] = None, index: str = "document", label_index: str = "label", search_fields: Union[str, list] = "text", text_field: str = "text", name_field: str = "name", embedding_field: str = "embedding", embedding_dim: int = 768, custom_mapping: Optional[dict] = None, excluded_meta_data: Optional[list] = None, faq_question_field: Optional[str] = None, analyzer: str = "standard", scheme: str = "http", ca_certs: Optional[str] = None, verify_certs: bool = True, create_index: bool = True, update_existing_documents: bool = False, refresh_type: str = "wait_for", similarity="dot_product", timeout=30, return_embedding: bool = False)
```

A DocumentStore using Elasticsearch to store and query the documents for our search.
Expand All @@ -102,11 +102,14 @@ A DocumentStore using Elasticsearch to store and query the documents for our sea

**Arguments**:

- `host`: url of elasticsearch
- `port`: port of elasticsearch
- `username`: username
- `password`: password
- `index`: Name of index in elasticsearch to use. If not existing yet, we will create one.
- `host`: url(s) of elasticsearch nodes
- `port`: port(s) of elasticsearch nodes
- `username`: username (standard authentication via http_auth)
- `password`: password (standard authentication via http_auth)
- `api_key_id`: ID of the API key (alternative authentication mode to the above http_auth)
- `api_key`: Secret value of the API key (alternative authentication mode to the above http_auth)
- `index`: Name of index in elasticsearch to use for storing the documents that we want to search. If not existing yet, we will create one.
- `label_index`: Name of index in elasticsearch to use for storing labels. If not existing yet, we will create one.
- `search_fields`: Name of fields used by ElasticsearchRetriever to find matches in the docs to our incoming query (using elastic's multi_match query), e.g. ["title", "full_text"]
- `text_field`: Name of field that might contain the answer and will therefore be passed to the Reader Model (e.g. "full_text").
If no Reader is used (e.g. in FAQ-Style QA) the plain content of this field will just be returned.
Expand Down
69 changes: 58 additions & 11 deletions haystack/document_store/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from elasticsearch.exceptions import RequestError
import numpy as np
from scipy.special import expit
import traceback

from haystack.document_store.base import BaseDocumentStore
from haystack import Document, Label
Expand All @@ -20,10 +21,12 @@
class ElasticsearchDocumentStore(BaseDocumentStore):
def __init__(
self,
host: str = "localhost",
port: int = 9200,
host: Union[str, List[str]] = "localhost",
port: Union[int, List[int]] = 9200,
username: str = "",
password: str = "",
api_key_id: Optional[str] = None,
api_key: Optional[str] = None,
index: str = "document",
label_index: str = "label",
search_fields: Union[str, list] = "text",
Expand All @@ -36,7 +39,7 @@ def __init__(
faq_question_field: Optional[str] = None,
analyzer: str = "standard",
scheme: str = "http",
ca_certs: str = None,
ca_certs: Optional[str] = None,
verify_certs: bool = True,
create_index: bool = True,
update_existing_documents: bool = False,
Expand All @@ -52,11 +55,14 @@ def __init__(
* You can either use an existing Elasticsearch index or create a new one via haystack
* Retrievers operate on top of this DocumentStore to find the relevant documents for a query

:param host: url of elasticsearch
:param port: port of elasticsearch
:param username: username
:param password: password
:param index: Name of index in elasticsearch to use. If not existing yet, we will create one.
:param host: url(s) of elasticsearch nodes
:param port: port(s) of elasticsearch nodes
:param username: username (standard authentication via http_auth)
:param password: password (standard authentication via http_auth)
:param api_key_id: ID of the API key (alternative authentication mode to the above http_auth)
:param api_key: Secret value of the API key (alternative authentication mode to the above http_auth)
:param index: Name of index in elasticsearch to use for storing the documents that we want to search. If not existing yet, we will create one.
:param label_index: Name of index in elasticsearch to use for storing labels. If not existing yet, we will create one.
:param search_fields: Name of fields used by ElasticsearchRetriever to find matches in the docs to our incoming query (using elastic's multi_match query), e.g. ["title", "full_text"]
:param text_field: Name of field that might contain the answer and will therefore be passed to the Reader Model (e.g. "full_text").
If no Reader is used (e.g. in FAQ-Style QA) the plain content of this field will just be returned.
Expand Down Expand Up @@ -86,10 +92,11 @@ def __init__(
:param timeout: Number of seconds after which an ElasticSearch request times out.
:param return_embedding: To return document embedding


"""
self.client = Elasticsearch(hosts=[{"host": host, "port": port}], http_auth=(username, password),
scheme=scheme, ca_certs=ca_certs, verify_certs=verify_certs, timeout=timeout)

self.client = self._init_elastic_client(host=host, port=port, username=username, password=password,
api_key=api_key, api_key_id=api_key_id, scheme=scheme,
ca_certs=ca_certs, verify_certs=verify_certs,timeout=timeout)

# configure mappings to ES fields that will be used for querying / displaying results
if type(search_fields) == str:
Expand Down Expand Up @@ -121,6 +128,46 @@ def __init__(
self.update_existing_documents = update_existing_documents
self.refresh_type = refresh_type

def _init_elastic_client(self,
host: Union[str, List[str]],
port: Union[int, List[int]],
username: str,
password: str,
api_key_id: Optional[str],
api_key: Optional[str],
scheme: str,
ca_certs: Optional[str],
verify_certs: bool,
timeout: int) -> Elasticsearch:
# Create list of host(s) + port(s) to allow direct client connections to multiple elasticsearch nodes
if isinstance(host, list):
if isinstance(port, list):
if not len(port) == len(host):
raise ValueError("Length of list `host` must match length of list `port`")
hosts = [{"host":h, "port":p} for h, p in zip(host,port)]
else:
hosts = [{"host": h, "port": port} for h in host]
else:
hosts = [{"host": host, "port": port}]

if (api_key or api_key_id) and not(api_key and api_key_id):
raise ValueError("You must provide either both or none of `api_key_id` and `api_key`")

# Init client based on authentication mode
try:
if api_key:
# api key authentication
return Elasticsearch(hosts=hosts, http_auth=(username, password),
scheme=scheme, ca_certs=ca_certs, verify_certs=verify_certs,
timeout=timeout)
else:
# standard http_auth
return Elasticsearch(hosts=hosts, api_key=(api_key_id, api_key),
scheme=scheme, ca_certs=ca_certs, verify_certs=verify_certs, timeout=timeout)
except Exception as e:
tholor marked this conversation as resolved.
Show resolved Hide resolved
tb = traceback.format_exc()
raise ConnectionError(f"Initial connection to Elasticsearch failed. Error: {e}, Traceback: {tb}")

def _create_document_index(self, index_name: str):
"""
Create a new index for storing documents. In case if an index with the name already exists, it ensures that
Expand Down
23 changes: 23 additions & 0 deletions test/test_document_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,29 @@
from haystack.document_store.elasticsearch import ElasticsearchDocumentStore


@pytest.mark.elasticsearch
def test_init_elastic_client():
    """Check which host/port/API-key combinations ElasticsearchDocumentStore accepts or rejects."""
    # defaults
    _ = ElasticsearchDocumentStore()

    # list of hosts + single port
    _ = ElasticsearchDocumentStore(host=["localhost", "127.0.0.1"], port=9200)

    # list of hosts + list of ports with a different length (wrong)
    # ValueError rather than Exception, so an unrelated failure cannot satisfy the assertion
    with pytest.raises(ValueError):
        _ = ElasticsearchDocumentStore(host=["localhost", "127.0.0.1"], port=[9200])

    # list of hosts + list of ports with matching length
    _ = ElasticsearchDocumentStore(host=["localhost", "127.0.0.1"], port=[9200, 9200])

    # only api_key, without api_key_id (wrong)
    with pytest.raises(ValueError):
        _ = ElasticsearchDocumentStore(host=["localhost"], port=[9200], api_key="test")

    # api_key + id
    _ = ElasticsearchDocumentStore(host=["localhost"], port=[9200], api_key="test", api_key_id="test")


@pytest.mark.elasticsearch
def test_get_all_documents_without_filters(document_store_with_docs):
documents = document_store_with_docs.get_all_documents()
Expand Down