diff --git a/.github/workflows/cogstack-es_main.yml b/.github/workflows/cogstack-es_main.yml new file mode 100644 index 000000000..91ab1b854 --- /dev/null +++ b/.github/workflows/cogstack-es_main.yml @@ -0,0 +1,42 @@ +name: cogstack-es - Test + +on: + push: + branches: [ main ] + pull_request: + paths: + - 'cogstack-es/**' + - '.github/workflows/cogstack-es**' + +defaults: + run: + working-directory: ./cogstack-es + +jobs: + types-lint-tests: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [ '3.9', '3.10', '3.11', '3.12' ] + max-parallel: 4 + + steps: + - uses: actions/checkout@v5 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -r requirements-dev.txt + - name: Check types + run: | + python -m mypy --follow-imports=normal *.py + - name: Lint + run: | + ruff check *.py + - name: Test + run: | + pytest tests diff --git a/cogstack-es/ReadMe.md b/cogstack-es/ReadMe.md new file mode 100644 index 000000000..7f63dbf9c --- /dev/null +++ b/cogstack-es/ReadMe.md @@ -0,0 +1,27 @@ + +# Login and search +This project is responsible for logging in and performing a search. + +## Login details +1. Create a [credentials.py](credentials.py) +2. Populate it with your cogstack instance and login details +An example template can be seen below: +``` +hosts = [] # This is a list of your cogstack elasticsearch instances. + +# These are your login details (either via http_auth or API) +username = None +password = None +``` + +__Note__: If these fields are left blank then the user will be prompted to enter the details themselves. + +If you are unsure about the above information please contact your CogStack system administrator. + +## How to build a Search query + +A core component of cogstack is Elasticsearch which is a search engine built on top of Apache Lucene. + +Lucene has a custom query syntax for querying its indexes (Lucene Query Syntax). This query syntax allows for features such as Keyword matching, Wildcard matching, Regular expression, Proximity matching, Range searches. + +Full documentation for this syntax is available as part of Elasticsearch [query string syntax](https://www.elastic.co/guide/en/elasticsearch/reference/8.5/query-dsl-query-string-query.html#query-string-syntax). \ No newline at end of file diff --git a/cogstack-es/cogstack.py b/cogstack-es/cogstack.py new file mode 100644 index 000000000..37965194f --- /dev/null +++ b/cogstack-es/cogstack.py @@ -0,0 +1,930 @@ +from collections.abc import Mapping +import getpass +import traceback +from typing import Any, Optional, Iterable, Sequence, Union, Protocol +from typing import cast, TYPE_CHECKING +import warnings + + +if TYPE_CHECKING: + from elasticsearch import Elasticsearch as _Elasticsearch + # from opensearchpy import OpenSearch as _OpenSearch + import elasticsearch.helpers + # import opensearchpy.helpers + # ElasticClient = Union[_Elasticsearch, _OpenSearch] + ElasticClient = _Elasticsearch + es_cls = _Elasticsearch + es_helpers = elasticsearch.helpers +else: + try: + from elasticsearch import Elasticsearch as ElasticClient + import elasticsearch.helpers + es_helpers = elasticsearch.helpers + except ImportError: + from opensearchpy import OpenSearch as ElasticClient + import opensearchpy.helpers + es_helpers = opensearchpy.helpers + es_cls = ElasticClient +from IPython.display import display, HTML +import pandas as pd +import tqdm + +warnings.filterwarnings("ignore") + + +class IndicesClientProto(Protocol): + + def get_alias(self, *args: Any, **kwargs: Any) -> Any: + pass + + def get_mapping(self, *args: Any, **kwargs: Any) -> Any: + pass + + +class ESClient(Protocol): + + @property + def indices(self) -> IndicesClientProto: + pass + + def count(self, *args: Any, **kwargs: Any) -> Any: + pass + + def search(self, *args: Any, **kwargs: Any) -> Any: + pass + + def scroll(self, *args: Any, **kwargs: Any) -> Any: + pass + + def clear_scroll(self, *args: Any, **kwargs: Any) -> Any: + pass + + +class CogStack: + """ + A class for interacting with Elasticsearch or OpenSearch. + + Parameters + ------------ + elastic : ESClient + The ElasticSearch or OpenSearch instance. + """ + + ES_TIMEOUT = 300 + + def __init__(self, elastic: ESClient) -> None: + self.elastic = elastic + + @classmethod + def with_basic_auth( + cls, + hosts: list[str], + username: Optional[str] = None, + password: Optional[str] = None, + ) -> "CogStack": + """ + Create an instance of CogStack using basic authentication. + + Parameters + ---------- + hosts : list[str] + A list of Elasticsearch host URLs. + username : str, optional + The username to use when connecting to Elasticsearch. + If not provided, the user will be prompted to enter a username. + password : str, optional + The password to use when connecting to Elasticsearch. + If not provided, the user will be prompted to enter a password. + Returns + ------- + CogStack: An instance of the CogStack class. + """ + elastic = CogStack.get_es_with_basic_auth(hosts, username, password) + return cls(elastic) + + @classmethod + def with_api_key_auth( + cls, hosts: list[str], api_key: Optional[dict] = None + ) -> "CogStack": + """ + Create an instance of CogStack using API key authentication. + + Parameters + ---------- + hosts : list[str] + A list of Elasticsearch host URLs. + apiKey : dict, optional + + API key object with string fields either: + - A: "id" and "api_key" + - B: "encoded" + Generated in Elasticsearch or Kibana and provided by your + CogStack administrator. + + If not provided, the user will be prompted to enter API + key "encoded" value. + + Example: + .. code-block:: json + { + "id": "API_KEY_ID", + "api_key": "API_KEY", + "encoded": "API_KEY_ENCODED_STRING" + } + Returns + ------- + CogStack: An instance of the CogStack class. + """ + elastic = CogStack.get_es_with_api_key(hosts, api_key) + return cls(elastic) + + @staticmethod + def get_es_with_basic_auth( + hosts: list[str], username: Optional[str] = None, + password: Optional[str] = None + ) -> ESClient: + """ + Create an instance of CogStack using basic authentication. + If the `username` or `password` parameters are not provided, + the user will be prompted to enter them. + + Parameters + ---------- + hosts : list[str] + A list of Elasticsearch or OpenSearch host URLs. + username : str, optional + The username to use when connecting to Elasticsearch or OpenSearch. + If not provided, the user will be prompted to enter a username. + password : str, optional + The password to use when connecting to Elasticsearch or OpenSearch. + If not provided, the user will be prompted to enter a password. + + Returns + ------- + ESClient: An instance of the Elasticsearch or OpenSearch. + """ + if username is None: + username = input("Username: ") + if password is None: + password = getpass.getpass("Password: ") + + return CogStack.__connect( + hosts, + basic_auth=(username, password) if username and password else None + ) + + @staticmethod + def get_es_with_api_key(hosts: list[str], + api_key: Optional[dict] = None + ) -> ESClient: + """ + Create an instance of CogStack using API key authentication. + + Parameters + ---------- + hosts : list[str] + A list of Elasticsearch or OpenSearch host URLs. + apiKey : Dict, optional + + API key object with string fields either: + - A: "id" and "api_key" + - B: "encoded" + Generated in Elasticsearch, OpenSearch, or Kibana and provided by + your CogStack administrator. + + If not provided, the user will be prompted to enter API + key "encoded" value. + + If not provided, the user will be prompted to enter API + key "encoded" value. + + Example: + .. code-block:: json + { + "id": "API_KEY_ID", + "api_key": "API_KEY", + "encoded": "API_KEY_ENCODED_STRING" + } + + Returns + ------- + ESClient: An instance of the Elasticsearch or OpenSearch. + """ + has_encoded_value = False + api_id_value: str + api_key_value: str + + if not api_key: + api_key = {"encoded": input("Encoded API key: ")} + else: + if isinstance(api_key, str): + # If api_key is a string, it is assumed to be + # the encoded API key + encoded = api_key + has_encoded_value = True + elif isinstance(api_key, dict): + # If api_key is a dictionary, check for "encoded", + # "id" and "api_key" keys + if ( + "id" in api_key.keys() + and api_key["id"] != "" + and "api_key" in api_key.keys() + and api_key["api_key"] != "" + ): + # If both "id" and "api_key" are present, use them + encoded = None + else: + # If "encoded" is present, use it; otherwise prompt for it + encoded = ( + api_key["encoded"] + if "encoded" in api_key.keys() and api_key["encoded"] + else input("Encoded API key: ") + ) + has_encoded_value = encoded is not None and encoded != "" + + if not has_encoded_value: + api_id_value = str( + api_key["id"] + if "id" in api_key.keys() and api_key["id"] != "" + else input("API Id: ") + ) + api_key_value = str( + api_key["api_key"] + if "api_key" in api_key.keys() and api_key["api_key"] != "" + else getpass.getpass("API Key: ") + ) + + return CogStack.__connect( + hosts, + api_key=encoded if has_encoded_value else + (api_id_value, api_key_value) + ) + + @staticmethod + def __connect( + hosts: list[str], + basic_auth: Optional[tuple[str, str]] = None, + api_key: Optional[Union[str, tuple[str, str]]] = None, + ) -> ESClient: + """Connect to Elasticsearch or OpenSearch using the credentials. + Parameters + ---------- + hosts : list[str] + A list of Elasticsearch or OpenSearch host URLs. + basic_auth : Tuple[str, str], optional + A tuple containing the username and password for + basic authentication. + api_key : str or Tuple[str, str], optional + The API key or a tuple containing the API key ID and API key + for API key authentication. + Returns + ------- + ESClient: An instance of the Elasticsearch or OpenSearch. + Raises + ------ + Exception: If the connection to Elasticsearch or OpenSearch fails. + """ + elastic = es_cls( + hosts=hosts, + api_key=api_key, + basic_auth=basic_auth, + verify_certs=False, + request_timeout=CogStack.ES_TIMEOUT, + ) + if not elastic.ping(): + raise ConnectionError( + "CogStack connection failed. " + "Please check your host list and credentials and try again." + ) + print("CogStack connection established successfully.") + return elastic + + def get_indices_and_aliases(self): + """ + Retrieve indices and their aliases + + Returns: + --------- + A table of indices and aliases to use in subsequent queries + """ + all_aliases = self.elastic.indices.get_alias().body + index_aliases_coll = [] + for index in all_aliases: + index_aliases = {} + index_aliases["Index"] = index + aliases = [] + for alias in all_aliases[index]["aliases"]: + aliases.append(alias) + index_aliases["Aliases"] = ", ".join(aliases) + index_aliases_coll.append(index_aliases) + with pd.option_context("display.max_colwidth", None): + return pd.DataFrame( + index_aliases_coll, columns=["Index", "Aliases"]) + + def get_index_fields(self, index: Union[str, Sequence[str]]): + """ + Retrieve indices and their fields with data type + + Parameters + ---------- + index: str | Sequence[str] + Name(s) of indices or aliases for which the list of fields + is retrieved + + Returns + ---------- + pandas.DataFrame + A DataFrame containing index names and their fields with + data types + + Raises + ------ + Exception + If the operation fails for any reason. + """ + try: + if len(index) == 0: + raise ValueError( + "Provide at least one index or index alias name") + all_mappings = self.elastic.indices.get_mapping( + index=index, allow_no_indices=False + ).body + columns = ["Field", "Type"] + if isinstance(index, list): + columns.insert(0, "Index") + index_mappings_coll = [] + for index_name in all_mappings: + for property_name in all_mappings[ + index_name]["mappings"]["properties"]: + index_mapping = {} + index_mapping["Index"] = index_name + index_mapping["Field"] = property_name + index_mapping["Type"] = ( + all_mappings[index_name]["mappings"]["properties"][ + property_name + ]["type"] + if "type" + in all_mappings[index_name]["mappings"]["properties"][ + property_name + ].keys() + else "?" + ) + index_mappings_coll.append(index_mapping) + except Exception as err: + raise Exception( + "Unexpected issue while getting index fields") from err + with pd.option_context( + "display.max_rows", len(index_mappings_coll) + 1): + return display(pd.DataFrame( + data=index_mappings_coll, columns=columns)) + + def count_search_results(self, index: Union[str, Sequence[str]], + query: dict): + """ + Count number of documents returned by the query + + Parameters + ---------- + index : str or Sequence[str] + The name(s) of the Elasticsearch indices or their + aliases to search. + + query : dict + A dictionary containing the search query parameters. + Query can start with `query` key and contain other + query options which will be ignored + + .. code-block:: json + {"query": {"match": {"title": "python"}}}} + or only consist of content of `query` block + .. code-block:: json + {"match": {"title": "python"}}} + """ + if len(index) == 0: + raise ValueError("Provide at least one index or index alias name") + query = self.__extract_query(query=query) + count = self.elastic.count(index=index, query=query, + allow_no_indices=False)["count"] + return f"Number of documents: {format(count, ',')}" + + def read_data_with_scan( + self, + index: Union[str, Sequence[str]], + query: dict, + include_fields: Optional[list[str]] = None, + size: int = 1000, + request_timeout: int = ES_TIMEOUT, + show_progress: bool = True, + ): + """ + Retrieve documents from an Elasticsearch or OpenSearch index or + indices using search query and elasticsearch or OpenSearch scan helper + function. The function converts search results to a Pandas DataFrame + and does not return current scroll id if the process fails. + + Parameters + ---------- + index : str or Sequence[str] + The name(s) of the Elasticsearch or OpenSearch indices or + their aliases to search. + query : dict + A dictionary containing the search query parameters. + Query can start with `query` key and contain other + query options which will be used in the search + + .. code-block:: json + {"query": {"match": {"title": "python"}}}} + or only consist of content of `query` block + (preferred method to avoid clashing with other parameters) + + .. code-block:: json + {"match": {"title": "python"}}} + + include_fields : list[str], optional + A list of fields to be included in search results + and presented as columns in the DataFrame. + If not provided, only _index, _id and _score fields will + be included. Columns _index, _id, _score + are present in all search results + size : int, optional, default = 1000 + The number of documents to be returned by the query or + scroll API during each iteration. + MAX: 10,000. + request_timeout : int, optional, default=300 + The time in seconds to wait for a response + from Elasticsearch or OpenSearch before timing out. + show_progress : bool, optional, default=True + Whether to show the progress in console. + Returns + ------ + pandas.DataFrame + A DataFrame containing the retrieved documents. + + Raises + ------ + Exception + If the search fails or cancelled by the user. + """ + pr_bar: Optional[tqdm.tqdm] = None + try: + if len(index) == 0: + raise ValueError( + "Provide at least one index or index alias name") + self.__validate_size(size=size) + if "query" not in query.keys(): + temp_query = query.copy() + query.clear() + query["query"] = temp_query + pr_bar = tqdm.tqdm( + desc="CogStack retrieved...", + disable=not show_progress, colour="green" + ) + + scan_results = es_helpers.scan( + cast(es_cls, self.elastic), + index=index, + query=query, + size=size, + request_timeout=request_timeout, + source=False, + fields=include_fields, + allow_no_indices=False, + ) + all_mapped_results = [] + pr_bar.iterable = scan_results + pr_bar.total = self.elastic.count( + index=index, query=query["query"])["count"] + all_mapped_results = self.__map_search_results(hits=pr_bar) + except BaseException as err: + if isinstance(err, KeyboardInterrupt): + if pr_bar is not None: + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ( + "\033[0;33m", + "\033[0;33m", + "\033[0;33m", + ) + pr_bar.set_description( + "CogStack read cancelled! Processed", refresh=True + ) + print("Request cancelled and current " + "search_scroll_id deleted...") + else: + if pr_bar is not None: + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ( + "\033[0;31m", + "\033[0;31m", + "\033[0;31m", + ) + pr_bar.set_description( + "CogStack read failed! Processed", refresh=True + ) + print( + Exception( + f"Unexpected {err=},\n {traceback.format_exc()}, " + f"{type(err)=}") + ) + return self.__create_dataframe(all_mapped_results, include_fields) + + def read_data_with_scroll( + self, + index: Union[str, Sequence[str]], + query: dict, + include_fields: Optional[list[str]] = None, + size: int = 1000, + search_scroll_id: Optional[str] = None, + request_timeout: Optional[int] = ES_TIMEOUT, + show_progress: Optional[bool] = True, + ): + """ + Retrieves documents from an Elasticsearch or OpenSearch index using + search query and scroll API. Default scroll timeout is set to 10 + minutes. The function converts search results to a Pandas DataFrame. + + Parameters + ---------- + index : str or Sequence[str] + The name(s) of the Elasticsearch or OpenSearch indices or + their aliases to search. + query : dict + A dictionary containing the search query parameters. + Query can start with `query` key + and contain other query options which will be ignored + + .. code-block:: json + {"query": {"match": {"title": "python"}}}} + or only consist of content of `query` block + .. code-block:: json + {"match": {"title": "python"}}} + + include_fields : list[str], optional + A list of fields to be included in search results + and presented as columns in the DataFrame. + If not provided, only _index, _id and _score fields + will be included. + Columns _index, _id, _score are present + in all search results + size : int, optional, default = 1000 + The number of documents to be returned by the query + or scroll API during each iteration. + MAX: 10,000. + search_scroll_id : str, optional + The value of the last scroll_id + returned by scroll API and used to continue the search + if the current search fails. + The value of scroll_id + times out after 10 minutes. + After which the search will have to be restarted. + Note: Absence of this parameter indicates + a new search. + request_timeout : int, optional, default=300 + The time in seconds to wait for a response from + Elasticsearch or OpenSearch before timing out. + show_progress : bool, optional, default=True + Whether to show the progress in console. + IMPORTANT: The progress bar displays + the total hits for the query even if continuing the + search using `search_scroll_id`. + Returns + ------ + pandas.DataFrame + A DataFrame containing the retrieved documents. + + Raises + ------ + Exception + If the search fails or cancelled by the user. + If the search fails, error message includes the value of + current `search_scroll_id` which can be used as a function + parameter to continue the search. IMPORTANT: + If the function fails after `scroll` request, the subsequent + request will skip results of the failed scroll by the value + of `size` parameter. + """ + try: + if len(index) == 0: + raise ValueError( + "Provide at least one index or index alias name") + self.__validate_size(size=size) + query = self.__extract_query(query=query) + result_count = size + all_mapped_results = [] + search_result = None + include_fields_map: Union[Sequence[Mapping[str, Any]], None] = ( + [{"field": field} for field in include_fields] + if include_fields is not None + else None + ) + + pr_bar = tqdm.tqdm( + desc="CogStack retrieved...", + disable=not show_progress, colour="green") + + if search_scroll_id is None: + search_result = self.elastic.search( + index=index, + size=size, + query=query, + fields=include_fields_map, + source=False, + scroll="10m", + timeout=f"{request_timeout}s", + allow_no_indices=False, + rest_total_hits_as_int=True, + ) + + pr_bar.total = search_result.body["hits"]["total"] + hits = search_result.body["hits"]["hits"] + result_count = len(hits) + search_scroll_id = search_result.body["_scroll_id"] + all_mapped_results.extend(self.__map_search_results(hits=hits)) + pr_bar.update(len(hits)) + if search_result["_shards"]["failed"] > 0: + raise LookupError(search_result["_shards"]["failures"]) + + while search_scroll_id and result_count == size: + # Perform ES scroll request + search_result = self.elastic.scroll( + scroll_id=search_scroll_id, + scroll="10m", + rest_total_hits_as_int=True, + ) + hits = search_result.body["hits"]["hits"] + pr_bar.total = ( + pr_bar.total + if pr_bar.total + else search_result.body["hits"]["total"] + ) + all_mapped_results.extend(self.__map_search_results(hits=hits)) + search_scroll_id = search_result.body["_scroll_id"] + result_count = len(hits) + pr_bar.update(result_count) + self.elastic.clear_scroll(scroll_id=search_scroll_id) + except BaseException as err: + if isinstance(err, KeyboardInterrupt): + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ( + "\033[0;33m", + "\033[0;33m", + "\033[0;33m", + ) + pr_bar.set_description( + "CogStack read cancelled! Processed", refresh=True + ) + self.elastic.clear_scroll(scroll_id=search_scroll_id) + print("Request cancelled and current " + "search_scroll_id deleted...") + else: + if pr_bar is not None: + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ( + "\033[0;31m", + "\033[0;31m", + "\033[0;31m", + ) + pr_bar.set_description( + "CogStack read failed! Processed", refresh=True + ) + print( + Exception( + f"Unexpected {err=},\n {traceback.format_exc()}, " + f"{type(err)=}"), + f"{search_scroll_id=}", + sep="\n", + ) + + return self.__create_dataframe(all_mapped_results, include_fields) + + def read_data_with_sorting( + self, + index: Union[str, Sequence[str]], + query: dict, + include_fields: Optional[list[str]] = None, + size: Optional[int] = 1000, + sort: Optional[Union[dict, list[str]]] = None, + search_after: Optional[list[Union[str, int, float, Any, None]]] = None, + request_timeout: Optional[int] = ES_TIMEOUT, + show_progress: Optional[bool] = True, + ): + """ + Retrieve documents from an Elasticsearch or OpenSearch index using + search query and convert them to a Pandas DataFrame. + + Parameters + ---------- + index : str or Sequence[str] + The name(s) of the Elasticsearch or OpenSearch indices or + their aliases to search. + query : dict + A dictionary containing the search query parameters. + Query can start with `query` key and contain other + query options which will be ignored + + .. code-block:: json + {"query": {"match": {"title": "python"}}}} + or only consist of content of `query` block + .. code-block:: json + {"match": {"title": "python"}}} + include_fields : list[str], optional + A list of fields to be included in search results + and presented as columns in the DataFrame. + If not provided, only _index, _id and _score + fields will be included. + Columns _index, _id, _score are + present in all search results + size : int, optional, default = 1000 + The number of documents to be returned by the query + or scroll API during each iteration. + MAX: 10,000. + sort : dict|list[str], optional, default = {"id": "asc"} + Sort field name(s) and order (`asc` or `desc`) + in dictionary format or list of field names without order. + `{"id":"asc"}` or `id` is added if not provided as a + tiebreaker field. Default sorting order is `asc` + Example: + - `dict : {"filed_Name" : "desc", "id" : "asc"}` + - `list : ["filed_Name", "id"]` + search_after : list[str|int|float|Any|None], optional + The sort value of the last record in search results. + Can be provided if the a search fails and needs to + be restarted from the last successful search. + Use the value of `search_after_value` from the + error message + request_timeout : int, optional, default = 300 + The time in seconds to wait for a response from + Elasticsearch or OpenSearch before timing out. + show_progress : bool, optional + Whether to show the progress in console. Defaults to true. + + Returns + ------ + pandas.DataFrame + A DataFrame containing the retrieved documents. + + Raises + ------ + Exception + If the search fails or cancelled by the user. + Error message includes the value of current + `search_after_value` which can be used as a function + parameter to continue the search. + """ + try: + if len(index) == 0: + raise ValueError( + "Provide at least one index or index alias name") + result_count = size + all_mapped_results = [] + if sort is None: + sort = {"id": "asc"} + search_after_value = search_after + include_fields_map: Union[Sequence[Mapping[str, Any]], None] = ( + [{"field": field} for field in include_fields] + if include_fields is not None + else None + ) + + self.__validate_size(size=size) + query = self.__extract_query(query=query) + + if (isinstance(sort, dict) and "id" not in sort.keys()) or ( + isinstance(sort, list) and "id" not in sort + ): + if isinstance(sort, dict): + sort["id"] = "asc" + else: + sort.append("id") + pr_bar = tqdm.tqdm( + desc="CogStack retrieved...", + disable=not show_progress, colour="green") + + while result_count == size: + search_result = self.elastic.search( + index=index, + size=size, + query=query, + fields=include_fields_map, + source=False, + sort=sort, + search_after=search_after_value, + timeout=f"{request_timeout}s", + track_scores=True, + track_total_hits=True, + allow_no_indices=False, + rest_total_hits_as_int=True, + ) + hits = search_result["hits"]["hits"] + all_mapped_results.extend(self.__map_search_results(hits=hits)) + result_count = len(hits) + pr_bar.update(result_count) + search_after_value = hits[-1]["sort"] + pr_bar.total = ( + pr_bar.total + if pr_bar.total + else search_result.body["hits"]["total"] + ) + if search_result["_shards"]["failed"] > 0: + raise LookupError(search_result["_shards"]["failures"]) + except BaseException as err: + if isinstance(err, KeyboardInterrupt): + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ( + "\033[0;33m", + "\033[0;33m", + "\033[0;33m", + ) + pr_bar.set_description( + "CogStack read cancelled! Processed", refresh=True + ) + print("Request cancelled.") + else: + if pr_bar is not None: + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ( + "\033[0;31m", + "\033[0;31m", + "\033[0;31m", + ) + pr_bar.set_description( + "CogStack read failed! Processed", refresh=True + ) + print(f"Unexpected {err=},\n {traceback.format_exc()}, " + f"{type(err)=}") + print(f"The last {search_after_value=}") + + return self.__create_dataframe(all_mapped_results, include_fields) + + def __extract_query(self, query: dict): + if "query" in query.keys(): + return query["query"] + return query + + def __validate_size(self, size): + if size > 10000: + raise ValueError("Size must not be greater than 10000") + + def __map_search_results(self, hits: Iterable): + hit: dict + for hit in hits: + row = dict() + row["_index"] = hit["_index"] + row["_id"] = hit["_id"] + row["_score"] = hit["_score"] + if "fields" in hit.keys(): + row.update( + {k: ", ".join(map(str, v)) for + k, v in dict(hit["fields"]).items()} + ) + yield row + + def __create_dataframe(self, all_mapped_results, column_headers): + """ + Create a Pandas DataFrame from the search results. + + Parameters + ---------- + all_mapped_results : list + The list of mapped search results. + column_headers : list or None + The list of column headers to include in the DataFrame. + + Returns + ------- + pandas.DataFrame + A DataFrame containing the search results. + """ + df_headers = ["_index", "_id", "_score"] + if column_headers and "*" not in column_headers: + df_headers.extend(column_headers) + return pd.DataFrame(data=all_mapped_results, columns=df_headers) + return pd.DataFrame(data=all_mapped_results) + + +def print_dataframe(df: pd.DataFrame, separator: str = "\\n"): + """ + Replace separator string with HTML + <br/> tag for printing in Notebook + + Parameters: + ----------- + df : DataFrame + Input DataFrame + separator : str + Separator to be replaced with HTML <br/> + """ + return display(HTML(df.to_html().replace(separator, "
"))) + + +def list_chunker(user_list: list[Any], n: int) -> list[list[Any]]: + """ + Divide a list into sublists of a specified size. + + Parameters: + ---------- + user_list : list[Any] + The list to be divided. + n : int + The size of the sublists. + + Returns: + -------- + list[list[Any]]: A list of sublists containing the elements of + the input list. + """ + n = max(1, n) + return [user_list[i: i + n] for i in range(0, len(user_list), n)] diff --git a/cogstack-es/credentials.py b/cogstack-es/credentials.py new file mode 100644 index 000000000..9e2fa2d7c --- /dev/null +++ b/cogstack-es/credentials.py @@ -0,0 +1,24 @@ +from typing import List +# CogStack login details +# Any questions on what these details are please contact +# your local CogStack administrator. + +hosts: List[str] = [ + # "https://cogstack-es-1:9200", + # # This is an example of a CogStack ElasticSearch instance. + ] # This is a list of your CogStack ElasticSearch instances. + +# These are your login details (either via http_auth or API) Should be in +# string format +username = None +password = None +# If you are using API key authentication +# Use either "id" and "api_key" or "encoded" field, or both. +api_key = { + # This is the API key id issued by your cogstack administrator. + "id": "", + # This is the api key issued by your cogstack administrator. + "api_key": "", + # This is the encoded api key issued by your cogstack administrator. + "encoded": "", +} diff --git a/cogstack-es/data/.keep b/cogstack-es/data/.keep new file mode 100644 index 000000000..e69de29bb diff --git a/cogstack-es/requirements-dev.txt b/cogstack-es/requirements-dev.txt new file mode 100644 index 000000000..6896deb36 --- /dev/null +++ b/cogstack-es/requirements-dev.txt @@ -0,0 +1,6 @@ +mypy +pandas-stubs +types-tqdm +pytest +ruff +nbconvert diff --git a/cogstack-es/requirements.txt b/cogstack-es/requirements.txt new file mode 100644 index 000000000..588e322b1 --- /dev/null +++ b/cogstack-es/requirements.txt @@ -0,0 +1,5 @@ +elasticsearch>=9.0.0,<10.0 +opensearch-py>=2.0.0 +ipython +tqdm>=4.64,<5.0 +pandas>=2.2,<3.0 diff --git a/cogstack-es/search_template.ipynb b/cogstack-es/search_template.ipynb new file mode 100644 index 000000000..8975e4f61 --- /dev/null +++ b/cogstack-es/search_template.ipynb @@ -0,0 +1,294 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Seaching CogStack\n", + "\n", + "This script is designed to be a template for cogstack searches" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from credentials import hosts, api_key, username, password\n", + "from cogstack import CogStack, print_dataframe" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Login and Initialise" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cs = CogStack.with_api_key_auth(hosts=hosts, api_key=api_key)\n", + "#cs = CogStack.with_basic_auth(hosts=hosts, username=username, password=password)\n", + "#cs = CogStack(hosts).use_api_key_auth(api_key=api_key)\n", + "#cs = CogStack(hosts).use_basic_auth(username=username, password=password)\n", + "#cs = CogStack(hosts).use_api_key_auth(\"\")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Check the list of Indices and columns" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "View all indices and their aliases available to this user. Either index names or their aliases can be used to extract data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print_dataframe(cs.get_indices_and_aliases(), ', ')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "View fields/columns and their data types for provided index names or aliases" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "indices = [\"example_index\"] # <- CHANGE THIS\n", + "cs.get_index_fields(indices)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Set search query parameters" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pt_list = [ ] # example list of patients' patient_TrustNumber here" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Columns of interest\n", + "\n", + "Select your fields and list in order of output columns" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "columns = []" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Build query\n", + "\n", + "For further information on [how to build a query can be found here](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html)\n", + "\n", + "Further information on [free text string queries can be found here](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-simple-query-string-query.html)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "search_query = {\n", + " \"bool\": {\n", + " #\"filter\": {\n", + " # \"terms\": {\n", + " # \"patient_TrustNumber\": pt_list\n", + " # }\n", + " #},\n", + " \"must\": [\n", + " {\n", + " \"query_string\": {\n", + " \"query\": \"\",\n", + " \"default_field\":\"\"\n", + " }\n", + " }\n", + " ]\n", + " }\n", + "}\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Count the number of documents matching the search query\n", + "example_indices = [\"example-index\"] # <- CHANGE THIS\n", + "cs.count_search_results(index=example_indices, query=search_query)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "tags": [] + }, + "source": [ + "# Search, Process, and Save\n", + "Use either of the functions to extract search results" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Read data using scan helper function.\n", + "# Does not provide a scroll id, so cannot be resumed if search fails midway.\n", + "example_indices = [\"example-index\"] # <- CHANGE THIS\n", + "df = cs.read_data_with_scan(index=example_indices, query=search_query, include_fields=columns)\n", + "print(df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Read data with scroll API and get scroll id if search fails midway. \n", + "# Can be used to resume the search from the failed scroll id.\n", + "example_indices = [\"example-index\"] # <- CHANGE THIS\n", + "df = cs.read_data_with_scroll(index=example_indices, query=search_query, include_fields=columns)\n", + "print(df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Read data with sorting and get search_after value if search fails midway.\n", + "# Can be used to resume the search from the failed search_after value but can be slower than scan or scroll methods for large datasets.\n", + "# Note: Sorting requires a field to sort by, which should be present in the index. Default sorting is by _id.\n", + "example_indices = [\"example-index\"] # <- CHANGE THIS\n", + "df = cs.read_data_with_sorting(index=example_indices, query=search_query, \n", + " include_fields=columns)\n", + "print(df)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Process" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Whatever you want here\n", + "# For example, display the first few rows of the DataFrame\n", + "df.head()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Save" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Save the DataFrame to a CSV file\n", + "path_to_results = \"data/cogstack_search_results\"\n", + "file_name = \"file_name.csv\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df.to_csv(path_to_results + '\\\\' + file_name, index=False)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.5" + }, + "vscode": { + "interpreter": { + "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6" + } + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/cogstack-es/tests/__init__.py b/cogstack-es/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/cogstack-es/tests/test_cogstack.py b/cogstack-es/tests/test_cogstack.py new file mode 100644 index 000000000..e025f1f61 --- /dev/null +++ b/cogstack-es/tests/test_cogstack.py @@ -0,0 +1,87 @@ +import pytest +from unittest.mock import Mock, patch, MagicMock +import pandas as pd +from cogstack import CogStack + + +@pytest.fixture +def mock_elasticsearch(): + """Fixture to mock Elasticsearch client""" + with patch('cogstack.es_cls') as mock_es: + mock_client = Mock() + mock_es.return_value = mock_client + mock_client.ping.return_value = True + print("With Mock ES...") + yield mock_es, mock_client + print("DONE WITH MOCK ES") + + +def test_basic_auth_connection(mock_elasticsearch: tuple[MagicMock, Mock]): + """Test basic authentication""" + dunder_init, _ = mock_elasticsearch + with patch('builtins.input', return_value='test_user'): + with patch('getpass.getpass', return_value='test_pass'): + cs = CogStack.with_basic_auth(['http://localhost:9200']) + assert isinstance(cs, CogStack) + assert cs.elastic.ping() + dunder_init.assert_called_once() + + +def test_api_key_auth_connection(mock_elasticsearch: tuple[MagicMock, Mock]): + """Test API key authentication""" + dunder_init, _ = mock_elasticsearch + api_key = {"encoded": "test_encoded_key"} + cs = CogStack.with_api_key_auth(['http://localhost:9200'], api_key) + assert isinstance(cs, CogStack) + assert cs.elastic.ping() + dunder_init.assert_called_once() + + +def test_count_search_results(mock_elasticsearch: tuple[MagicMock, Mock]): + """Test count_search_results method""" + # Mock the count response + _, mock_inst = mock_elasticsearch + mock_inst.count.return_value = {'count': 100} + + cs = CogStack(['http://localhost:9200']) + cs.elastic = mock_inst + + query = {"match": {"title": "test"}} + result = cs.count_search_results('test_index', query) + + assert "100" in result + mock_inst.count.assert_called_once() + + +def test_read_data_with_scan(mock_elasticsearch: tuple[MagicMock, Mock]): + """Test read_data_with_scan method""" + _, mock_inst = mock_elasticsearch + # Mock scan results + mock_hits = MagicMock() + mock_hits.__iter__.return_value = [ + {'_index': 'test_index', '_id': '1', '_score': 1.0, + 'fields': {'title': 'test1'}}, + {'_index': 'test_index', '_id': '2', '_score': 0.8, + 'fields': {'title': 'test2'}} + ] + + # Mock scan helper + with patch('cogstack.es_helpers.scan') as mock_scan, \ + patch('cogstack.tqdm.tqdm') as mock_tqdm: + + mock_scan.return_value = mock_hits + mock_tqdm.return_value = mock_hits # Make tqdm iterable + mock_tqdm.total = 2 + + # Mock count for progress bar + mock_inst.count.return_value = {'count': 2} + + cs = CogStack(['http://localhost:9200']) + cs.elastic = mock_inst + + query = {"query": {"match": {"title": "test"}}} + result = cs.read_data_with_scan('test_index', query, ['title']) + + assert isinstance(result, pd.DataFrame) + assert len(result) == 2 + assert 'title' in result.columns diff --git a/cogstack-es/tests/test_search_template_nb.py b/cogstack-es/tests/test_search_template_nb.py new file mode 100644 index 000000000..b5ffa3fbf --- /dev/null +++ b/cogstack-es/tests/test_search_template_nb.py @@ -0,0 +1,85 @@ +import nbformat +from nbconvert import PythonExporter +from unittest.mock import Mock, patch, MagicMock +from contextlib import contextmanager +import tempfile +import os +import pytest + + +EXPECTED_TEMP_FILE_PATH = "data/cogstack_search_results\\file_name.csv" + + +@contextmanager +def all_mocked(python_code: str): + with tempfile.NamedTemporaryFile('w', suffix='.py') as temp_file: + temp_file.write(python_code) + with patch('cogstack.es_cls') as mock_es: + with patch('credentials.hosts', ['http://localhost:9200']): + with patch('credentials.api_key', {"encoded": "test_api_key"}): + with patch('elasticsearch.helpers.scan') as mock_scan: + with patch('tqdm.tqdm') as mock_tqdm: + yield (temp_file.name, mock_es, + mock_scan, mock_tqdm) + + +def setup_mocks(mock_es: MagicMock, mock_scan: MagicMock, mock_tqdm: MagicMock): + # Setup mocks + mock_client = Mock() + mock_es.return_value = mock_client + mock_client.ping.return_value = True + + mock_aliases = Mock() + mock_aliases.body = { + 'index1': {'aliases': {'alias1': {}}} + } + mock_mapping = Mock() + mock_mapping.body = { + 'index1': {'mappings': {'properties': {}}} + } + # Mock Elasticsearch responses + mock_client.indices.get_alias.return_value = mock_aliases + mock_client.indices.get_mapping.return_value = mock_mapping + mock_client.count.return_value = {'count': 10} + + # Mock scan results + mock_hits = MagicMock() + mock_hits.__iter__.return_value = [{ + '_index': 'test', '_id': '1', '_score': 1.0, + 'fields': {'test_field': ['value']} + }] + mock_scan.return_value = mock_hits + mock_tqdm.return_value = mock_hits + mock_tqdm.total = 1 + + +@pytest.fixture +def temp_file_remover(): + yield + if os.path.exists(EXPECTED_TEMP_FILE_PATH): + os.remove(EXPECTED_TEMP_FILE_PATH) + + +def test_notebook_execution(temp_file_remover): + """Execute the notebook with mocked dependencies""" + + # Read the notebook + notebook_path = 'search_template.ipynb' + with open(notebook_path, 'r') as f: + notebook = nbformat.read(f, as_version=4) + + # Convert to Python code + exporter = PythonExporter() + python_code, _ = exporter.from_notebook_node(notebook) + + # Mock all the dependencies + with all_mocked(python_code) as (temp_code_path, mock_es, + mock_scan, mock_tqdm): + setup_mocks(mock_es, mock_scan, mock_tqdm) + + # Execute the notebook code + exec(python_code, { + '__file__': temp_code_path, + '__name__': '__main__' + }) + assert os.path.exists(EXPECTED_TEMP_FILE_PATH)