From 8af57d8bc5b1b28526a13f755c1a9e1f2a340cb2 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 14:49:43 +0100 Subject: [PATCH 01/40] CU-869aa22g2: Add ES requirement --- cogstack-es/requirements.txt | 1 + 1 file changed, 1 insertion(+) create mode 100644 cogstack-es/requirements.txt diff --git a/cogstack-es/requirements.txt b/cogstack-es/requirements.txt new file mode 100644 index 00000000..ca089377 --- /dev/null +++ b/cogstack-es/requirements.txt @@ -0,0 +1 @@ +elasticsearch>=9.0.0,<10.0 From 64f95ad9ba48a5883b4015cda6bdb05f0c6818b1 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 14:49:55 +0100 Subject: [PATCH 02/40] CU-869aa22g2: Add initial README --- cogstack-es/ReadMe.md | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 cogstack-es/ReadMe.md diff --git a/cogstack-es/ReadMe.md b/cogstack-es/ReadMe.md new file mode 100644 index 00000000..7f63dbf9 --- /dev/null +++ b/cogstack-es/ReadMe.md @@ -0,0 +1,27 @@ + +# Login and search +This project is responsible for logging in and performing a search. + +## Login details +1. Create a [credentials.py](credentials.py) +2. Populate it with your cogstack instance and login details +An example template can be seen below: +``` +hosts = [] # This is a list of your cogstack elasticsearch instances. + +# These are your login details (either via http_auth or API) +username = None +password = None +``` + +__Note__: If these fields are left blank then the user will be prompted to enter the details themselves. + +If you are unsure about the above information please contact your CogStack system administrator. + +## How to build a Search query + +A core component of cogstack is Elasticsearch which is a search engine built on top of Apache Lucene. + +Lucene has a custom query syntax for querying its indexes (Lucene Query Syntax). This query syntax allows for features such as Keyword matching, Wildcard matching, Regular expression, Proximity matching, Range searches. + +Full documentation for this syntax is available as part of Elasticsearch [query string syntax](https://www.elastic.co/guide/en/elasticsearch/reference/8.5/query-dsl-query-string-query.html#query-string-syntax). \ No newline at end of file From fcf4d960a010e69f14ab58d6c375f52279906561 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 14:50:17 +0100 Subject: [PATCH 03/40] CU-869aa22g2: Add (slightly) converted credentials --- cogstack-es/credentials.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 cogstack-es/credentials.py diff --git a/cogstack-es/credentials.py b/cogstack-es/credentials.py new file mode 100644 index 00000000..9e2fa2d7 --- /dev/null +++ b/cogstack-es/credentials.py @@ -0,0 +1,24 @@ +from typing import List +# CogStack login details +# Any questions on what these details are please contact +# your local CogStack administrator. + +hosts: List[str] = [ + # "https://cogstack-es-1:9200", + # # This is an example of a CogStack ElasticSearch instance. + ] # This is a list of your CogStack ElasticSearch instances. + +# These are your login details (either via http_auth or API) Should be in +# string format +username = None +password = None +# If you are using API key authentication +# Use either "id" and "api_key" or "encoded" field, or both. +api_key = { + # This is the API key id issued by your cogstack administrator. + "id": "", + # This is the api key issued by your cogstack administrator. + "api_key": "", + # This is the encoded api key issued by your cogstack administrator. + "encoded": "", +} From bfdcf82bd770189cfa1b4c09b14386b43caf2233 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 14:51:34 +0100 Subject: [PATCH 04/40] CU-869aa22g2: Add cogstack module ported from WWC --- cogstack-es/cogstack.py | 709 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 709 insertions(+) create mode 100644 cogstack-es/cogstack.py diff --git a/cogstack-es/cogstack.py b/cogstack-es/cogstack.py new file mode 100644 index 00000000..151986f6 --- /dev/null +++ b/cogstack-es/cogstack.py @@ -0,0 +1,709 @@ +from collections.abc import Mapping +import getpass +import traceback +from typing import Dict, List, Any, Optional, Iterable, Sequence, Union +import warnings +import elasticsearch +import elasticsearch.helpers as es_helpers +from IPython.display import display, HTML +import pandas as pd +import tqdm + +warnings.filterwarnings("ignore") + +class CogStack(): + """ + A class for interacting with Elasticsearch. + + Parameters + ------------ + hosts : List[str] + A list of Elasticsearch host URLs. + """ + ES_TIMEOUT = 300 + + def __init__(self, hosts: List[str]): + self.hosts = hosts + self.elastic: elasticsearch.Elasticsearch + + @classmethod + def with_basic_auth(cls, + hosts: List[str], + username: Optional[str] = None, + password: Optional[str] = None) -> 'CogStack': + """ + Create an instance of CogStack using basic authentication. + + Parameters + ---------- + hosts : List[str] + A list of Elasticsearch host URLs. + username : str, optional + The username to use when connecting to Elasticsearch. + If not provided, the user will be prompted to enter a username. + password : str, optional + The password to use when connecting to Elasticsearch. + If not provided, the user will be prompted to enter a password. + Returns + ------- + CogStack: An instance of the CogStack class. + """ + cs = cls(hosts) + cs.use_basic_auth(username, password) + return cs + + @classmethod + def with_api_key_auth(cls, + hosts: List[str], + api_key: Optional[Dict] = None) -> 'CogStack': + """ + Create an instance of CogStack using API key authentication. + + Parameters + ---------- + hosts : List[str] + A list of Elasticsearch host URLs. + apiKey : Dict, optional + + API key object with "id" and "api_key" or "encoded" strings as fields. + Generated in Elasticsearch or Kibana and provided by your CogStack administrator. + + If not provided, the user will be prompted to enter API key "encoded" value. + + Example: + .. code-block:: json + { + "id": "API_KEY_ID", + "api_key": "API_KEY", + "encoded": "API_KEY_ENCODED_STRING" + } + Returns + ------- + CogStack: An instance of the CogStack class. + """ + cs = cls(hosts) + cs.use_api_key_auth(api_key) + return cs + + def use_basic_auth(self, + username: Optional[str] = None, + password:Optional[str] = None) -> 'CogStack': + """ + Create an instance of CogStack using basic authentication. + If the `username` or `password` parameters are not provided, + the user will be prompted to enter them. + + Parameters + ---------- + username : str, optional + The username to use when connecting to Elasticsearch. + If not provided, the user will be prompted to enter a username. + password : str, optional + The password to use when connecting to Elasticsearch. + If not provided, the user will be prompted to enter a password. + + Returns + ------- + CogStack: An instance of the CogStack class. + """ + if username is None: + username = input("Username: ") + if password is None: + password = getpass.getpass("Password: ") + + return self.__connect(basic_auth=(username, password) if username and password else None) + + def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack': + """ + Create an instance of CogStack using API key authentication. + + Parameters + ---------- + apiKey : Dict, optional + + API key object with "id" and "api_key" or "encoded" strings as fields. + Generated in Elasticsearch or Kibana and provided by your CogStack administrator. + + If not provided, the user will be prompted to enter API key "encoded" value. + + Example: + .. code-block:: json + { + "id": "API_KEY_ID", + "api_key": "API_KEY", + "encoded": "API_KEY_ENCODED_STRING" + } + + Returns + ------- + CogStack: An instance of the CogStack class. + """ + has_encoded_value = False + api_id_value:str + api_key_value:str + + if not api_key: + api_key = {"encoded": input("Encoded API key: ")} + else: + if isinstance(api_key, str): + # If api_key is a string, it is assumed to be the encoded API key + encoded = api_key + has_encoded_value = True + elif isinstance(api_key, Dict): + # If api_key is a dictionary, check for "encoded", "id" and "api_key" keys + if "id" in api_key.keys() and api_key["id"] != '' and "api_key" in api_key.keys() \ + and api_key["api_key"] != '': + # If both "id" and "api_key" are present, use them + encoded = None + else: + # If "encoded" is present, use it; otherwise prompt for it + encoded = api_key["encoded"] \ + if "encoded" in api_key.keys() and api_key["encoded"] != '' \ + else input("Encoded API key: ") + has_encoded_value = encoded is not None and encoded != '' + + if(not has_encoded_value): + api_id_value = str(api_key["id"] \ + if "id" in api_key.keys() and api_key["id"] != '' \ + else input("API Id: ")) + api_key_value = str(api_key["api_key"] \ + if "api_key" in api_key.keys() and api_key["api_key"] != '' \ + else getpass.getpass("API Key: ")) + + return self.__connect(api_key=encoded if has_encoded_value else (api_id_value, api_key_value)) + + def __connect(self, + basic_auth : Optional[tuple[str,str]] = None, + api_key: Optional[Union[str, tuple[str, str]]] = None) -> 'CogStack': + """ Connect to Elasticsearch using the provided credentials. + Parameters + ---------- + basic_auth : Tuple[str, str], optional + A tuple containing the username and password for basic authentication. + api_key : str or Tuple[str, str], optional + The API key or a tuple containing the API key ID and API key + for API key authentication. + Returns + ------- + CogStack: An instance of the CogStack class. + Raises + ------ + Exception: If the connection to Elasticsearch fails. + """ + self.elastic = elasticsearch.Elasticsearch(hosts=self.hosts, + api_key=api_key, + basic_auth=basic_auth, + verify_certs=False, + request_timeout=self.ES_TIMEOUT) + if not self.elastic.ping(): + raise ConnectionError("CogStack connection failed. " \ + "Please check your host list and credentials and try again.") + print("CogStack connection established successfully.") + return self + + def get_indices_and_aliases(self): + """ + Retrieve indices and their aliases + + Returns: + --------- + A table of indices and aliases to use in subsequent queries + """ + all_aliases = self.elastic.indices.get_alias().body + index_aliases_coll = [] + for index in all_aliases: + index_aliases = {} + index_aliases['Index'] = index + aliases=[] + for alias in all_aliases[index]['aliases']: + aliases.append(alias) + index_aliases['Aliases'] = ', '.join(aliases) + index_aliases_coll.append(index_aliases) + with pd.option_context('display.max_colwidth', None): + return pd.DataFrame(index_aliases_coll, columns=['Index', 'Aliases']) + + def get_index_fields(self, index: Union[str, Sequence[str]]): + """ + Retrieve indices and their fields with data type + + Parameters + ---------- + index: str | Sequence[str] + Name(s) of indices or aliases for which the list of fields is retrieved + + Returns + ---------- + pandas.DataFrame + A DataFrame containing index names and their fields with data types + + Raises + ------ + Exception + If the operation fails for any reason. + """ + try: + if len(index) == 0: + raise ValueError('Provide at least one index or index alias name') + all_mappings = self.elastic.indices\ + .get_mapping(index=index, allow_no_indices=False).body + columns= ['Field', 'Type'] + if isinstance(index, List): + columns.insert(0, 'Index') + index_mappings_coll = [] + for index_name in all_mappings: + for property_name in all_mappings[index_name]['mappings']['properties']: + index_mapping = {} + index_mapping['Index'] = index_name + index_mapping['Field'] = property_name + index_mapping['Type'] = \ + all_mappings[index_name]['mappings']['properties'][property_name]['type'] \ + if "type" in all_mappings[index_name]['mappings']\ + ['properties'][property_name].keys() \ + else '?' + index_mappings_coll.append(index_mapping) + except Exception as err: + raise Exception(f"Unexpected {err=}, {type(err)=}") + with pd.option_context('display.max_rows', len(index_mappings_coll) + 1): + return display(pd.DataFrame(data= index_mappings_coll, columns=columns)) + + def count_search_results(self, index: Union[str, Sequence[str]], query: dict): + """ + Count number of documents returned by the query + + Parameters + ---------- + index : str or Sequence[str] + The name(s) of the Elasticsearch indices or their aliases to search. + + query : dict + A dictionary containing the search query parameters. + Query can start with `query` key and contain other + query options which will be ignored + + .. code-block:: json + {"query": {"match": {"title": "python"}}}} + or only consist of content of `query` block + .. code-block:: json + {"match": {"title": "python"}}} + """ + if len(index) == 0: + raise ValueError('Provide at least one index or index alias name') + query = self.__extract_query(query=query) + count = self.elastic.count(index=index, query=query, allow_no_indices=False)['count'] + return f"Number of documents: {format(count, ',')}" + + def read_data_with_scan(self, + index: Union[str, Sequence[str]], + query: dict, + include_fields: Optional[list[str]]=None, + size: int=1000, + request_timeout: int=ES_TIMEOUT, + show_progress: bool = True): + """ + Retrieve documents from an Elasticsearch index or + indices using search query and elasticsearch scan helper function. + The function converts search results to a Pandas DataFrame and does + not return current scroll id if the process fails. + + Parameters + ---------- + index : str or Sequence[str] + The name(s) of the Elasticsearch indices or their aliases to search. + query : dict + A dictionary containing the search query parameters. + Query can start with `query` key and contain other + query options which will be used in the search + + .. code-block:: json + {"query": {"match": {"title": "python"}}}} + or only consist of content of `query` block + (preferred method to avoid clashing with other parameters) + + .. code-block:: json + {"match": {"title": "python"}}} + + include_fields : list[str], optional + A list of fields to be included in search results + and presented as columns in the DataFrame. + If not provided, only _index, _id and _score fields will be included. + Columns _index, _id, _score are present in all search results + size : int, optional, default = 1000 + The number of documents to be returned by the query or scroll + API during each iteration. MAX: 10,000. + request_timeout : int, optional, default=300 + The time in seconds to wait for a response + from Elasticsearch before timing out. + show_progress : bool, optional, default=True + Whether to show the progress in console. + Returns + ------ + pandas.DataFrame + A DataFrame containing the retrieved documents. + + Raises + ------ + Exception + If the search fails or cancelled by the user. + """ + try: + if len(index) == 0: + raise ValueError('Provide at least one index or index alias name') + self.__validate_size(size=size) + if "query" not in query.keys(): + temp_query = query.copy() + query.clear() + query["query"] = temp_query + pr_bar = tqdm.tqdm(desc="CogStack retrieved...", + disable=not show_progress, colour='green') + + scan_results = es_helpers.scan(self.elastic, + index=index, + query=query, + size=size, + request_timeout=request_timeout, + source=False, + fields = include_fields, + allow_no_indices=False,) + all_mapped_results = [] + pr_bar.iterable = scan_results + pr_bar.total = self.elastic.count(index=index, query=query["query"])["count"] + all_mapped_results = self.__map_search_results(hits=pr_bar) + except BaseException as err: + if isinstance(err, KeyboardInterrupt): + pr_bar.bar_format ="%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", + "\033[0;33m", + "\033[0;33m") + pr_bar.set_description("CogStack read cancelled! Processed", refresh=True) + print("Request cancelled and current search_scroll_id deleted...") + else: + if pr_bar is not None: + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", + "\033[0;31m", + "\033[0;31m") + pr_bar.set_description("CogStack read failed! Processed", refresh=True) + print(Exception(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}")) + return self.__create_dataframe(all_mapped_results, include_fields) + + def read_data_with_scroll(self, + index: Union[str, Sequence[str]], + query: dict, + include_fields:Optional[list[str]]=None, + size: int=1000, + search_scroll_id: Optional[str] = None, + request_timeout: Optional[int]=ES_TIMEOUT, + show_progress: Optional[bool] = True): + """ + Retrieves documents from an Elasticsearch index using search query and scroll API. + Default scroll timeout is set to 10 minutes. + The function converts search results to a Pandas DataFrame. + + Parameters + ---------- + index : str or Sequence[str] + The name(s) of the Elasticsearch indices or their aliases to search. + query : dict + A dictionary containing the search query parameters. + Query can start with `query` key + and contain other query options which will be ignored + + .. code-block:: json + {"query": {"match": {"title": "python"}}}} + or only consist of content of `query` block + .. code-block:: json + {"match": {"title": "python"}}} + + include_fields : list[str], optional + A list of fields to be included in search results + and presented as columns in the DataFrame. + If not provided, only _index, _id and _score fields will be included. + Columns _index, _id, _score are present in all search results + size : int, optional, default = 1000 + The number of documents to be returned by the query + or scroll API during each iteration. + MAX: 10,000. + search_scroll_id : str, optional + The value of the last scroll_id + returned by scroll API and used to continue the search + if the current search fails. + The value of scroll_id + times out after 10 minutes. + After which the search will have to be restarted. + Note: Absence of this parameter indicates a new search. + request_timeout : int, optional, default=300 + The time in seconds to wait for a response from Elasticsearch before timing out. + show_progress : bool, optional, default=True + Whether to show the progress in console. + IMPORTANT: The progress bar displays the total hits + for the query even if continuing the search using `search_scroll_id`. + Returns + ------ + pandas.DataFrame + A DataFrame containing the retrieved documents. + + Raises + ------ + Exception + If the search fails or cancelled by the user. + If the search fails, error message includes the value of current `search_scroll_id` + which can be used as a function parameter to continue the search. + IMPORTANT: If the function fails after `scroll` request, + the subsequent request will skip results of the failed scroll by the + value of `size` parameter. + """ + try: + if len(index) == 0: + raise ValueError('Provide at least one index or index alias name') + self.__validate_size(size=size) + query = self.__extract_query(query=query) + result_count = size + all_mapped_results =[] + search_result=None + include_fields_map: Union[Sequence[Mapping[str, Any]], None] = \ + [{"field": field} for field in include_fields] if include_fields is not None else None + + pr_bar = tqdm.tqdm(desc="CogStack retrieved...", + disable=not show_progress, colour='green') + + if search_scroll_id is None: + search_result = self.elastic.search(index=index, + size=size, + query=query, + fields=include_fields_map, + source=False, + scroll="10m", + timeout=f"{request_timeout}s", + allow_no_indices=False, + rest_total_hits_as_int=True) + + pr_bar.total = search_result.body['hits']['total'] + hits = search_result.body['hits']['hits'] + result_count = len(hits) + search_scroll_id = search_result.body['_scroll_id'] + all_mapped_results.extend(self.__map_search_results(hits=hits)) + pr_bar.update(len(hits)) + if search_result["_shards"]["failed"] > 0: + raise LookupError(search_result["_shards"]["failures"]) + + while search_scroll_id and result_count == size: + # Perform ES scroll request + search_result = self.elastic.scroll(scroll_id=search_scroll_id, scroll="10m", + rest_total_hits_as_int=True) + hits = search_result.body['hits']['hits'] + pr_bar.total = pr_bar.total if pr_bar.total else search_result.body['hits']['total'] + all_mapped_results.extend(self.__map_search_results(hits=hits)) + search_scroll_id = search_result.body['_scroll_id'] + result_count = len(hits) + pr_bar.update(result_count) + self.elastic.clear_scroll(scroll_id = search_scroll_id) + except BaseException as err: + if isinstance(err, KeyboardInterrupt): + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", + "\033[0;33m", + "\033[0;33m") + pr_bar.set_description("CogStack read cancelled! Processed", refresh=True) + self.elastic.clear_scroll(scroll_id = search_scroll_id) + print("Request cancelled and current search_scroll_id deleted...") + else: + if pr_bar is not None: + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", "\033[0;31m", "\033[0;31m") + pr_bar.set_description("CogStack read failed! Processed", refresh=True) + print(Exception(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}"), f"{search_scroll_id=}", sep='\n') + + return self.__create_dataframe(all_mapped_results, include_fields) + + def read_data_with_sorting(self, + index: Union[str, Sequence[str]], + query: dict, + include_fields: Optional[list[str]]=None, + size: Optional[int]=1000, + sort: Optional[Union[dict,list[str]]] = None, + search_after: Optional[list[Union[str,int,float,Any,None]]] = None, + request_timeout: Optional[int]=ES_TIMEOUT, + show_progress: Optional[bool] = True): + """ + Retrieve documents from an Elasticsearch index using search query and convert them to a Pandas DataFrame. + + Parameters + ---------- + index : str or Sequence[str] + The name(s) of the Elasticsearch indices or their aliases to search. + query : dict + A dictionary containing the search query parameters. + Query can start with `query` key and contain other query options which will be ignored + + .. code-block:: json + {"query": {"match": {"title": "python"}}}} + or only consist of content of `query` block + .. code-block:: json + {"match": {"title": "python"}}} + include_fields : list[str], optional + A list of fields to be included in search results and presented as columns in the DataFrame. + If not provided, only _index, _id and _score fields will be included. + Columns _index, _id, _score are present in all search results + size : int, optional, default = 1000 + The number of documents to be returned by the query or scroll API during each iteration. + MAX: 10,000. + sort : dict|list[str], optional, default = {"id": "asc"} + Sort field name(s) and order (`asc` or `desc`) in dictionary format or list of field names without order. + `{"id":"asc"}` or `id` is added if not provided as a tiebreaker field. + Default sorting order is `asc` + >Example: + - `dict : {"filed_Name" : "desc", "id" : "asc"}` + - `list : ["filed_Name", "id"]` + search_after : list[str|int|float|Any|None], optional + The sort value of the last record in search results. + Can be provided if the a search fails and needs to be restarted from the last successful search. + Use the value of `search_after_value` from the error message + request_timeout : int, optional, default = 300 + The time in seconds to wait for a response from Elasticsearch before timing out. + show_progress : bool, optional + Whether to show the progress in console. Defaults to true. + + Returns + ------ + pandas.DataFrame + A DataFrame containing the retrieved documents. + + Raises + ------ + Exception + If the search fails or cancelled by the user. + Error message includes the value of current `search_after_value` + which can be used as a function parameter to continue the search. + """ + try: + if len(index) == 0: + raise ValueError('Provide at least one index or index alias name') + result_count = size + all_mapped_results =[] + if sort is None: + sort = {'id': 'asc'} + search_after_value = search_after + include_fields_map: Union[Sequence[Mapping[str, Any]], None] = \ + [{"field": field} for field in include_fields] if include_fields is not None else None + + self.__validate_size(size=size) + query = self.__extract_query(query=query) + + if ((isinstance(sort, dict) and 'id' not in sort.keys()) + or (isinstance(sort, list) and 'id' not in sort)): + if isinstance(sort, dict): + sort['id'] = 'asc' + else: + sort.append('id') + pr_bar = tqdm.tqdm(desc="CogStack retrieved...", + disable=not show_progress, + colour='green') + + while result_count == size: + search_result = self.elastic.search(index=index, + size=size, + query=query, + fields=include_fields_map, + source=False, + sort=sort, + search_after=search_after_value, + timeout=f"{request_timeout}s", + track_scores=True, + track_total_hits=True, + allow_no_indices=False, + rest_total_hits_as_int=True) + hits = search_result['hits']['hits'] + all_mapped_results.extend(self.__map_search_results(hits=hits)) + result_count = len(hits) + pr_bar.update(result_count) + search_after_value = hits[-1]['sort'] + pr_bar.total = pr_bar.total if pr_bar.total else search_result.body['hits']['total'] + if search_result["_shards"]["failed"] > 0: + raise LookupError(search_result["_shards"]["failures"]) + except BaseException as err: + if isinstance(err, KeyboardInterrupt): + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", + "\033[0;33m", + "\033[0;33m") + pr_bar.set_description("CogStack read cancelled! Processed", refresh=True) + print("Request cancelled.") + else: + if pr_bar is not None: + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", + "\033[0;31m", + "\033[0;31m") + pr_bar.set_description("CogStack read failed! Processed", refresh=True) + print(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}") + print(f"The last {search_after_value=}") + + return self.__create_dataframe(all_mapped_results, include_fields) + + def __extract_query(self, query: dict): + if "query" in query.keys(): + return query['query'] + return query + + def __validate_size(self, size): + if size > 10000: + raise ValueError('Size must not be greater than 10000') + + def __map_search_results(self, hits: Iterable): + hit: dict + for hit in hits: + row = dict() + row['_index'] = hit['_index'] + row['_id'] = hit['_id'] + row['_score'] = hit['_score'] + if 'fields' in hit.keys(): + row.update({k: ', '.join(map(str, v)) for k,v in dict(hit['fields']).items()}) + yield row + + def __create_dataframe(self, all_mapped_results, column_headers): + """ + Create a Pandas DataFrame from the search results. + + Parameters + ---------- + all_mapped_results : list + The list of mapped search results. + column_headers : list or None + The list of column headers to include in the DataFrame. + + Returns + ------- + pandas.DataFrame + A DataFrame containing the search results. + """ + df_headers = ['_index', '_id', '_score'] + if column_headers and "*" not in column_headers: + df_headers.extend(column_headers) + return pd.DataFrame(data=all_mapped_results, columns=df_headers) + return pd.DataFrame(data=all_mapped_results) + +def print_dataframe(df : pd.DataFrame, separator : str = '\\n'): + """ + Replace separator string with HTML <br/> + tag for printing in Notebook + + Parameters: + ----------- + df : DataFrame + Input DataFrame + separator : str + Separator to be replaced with HTML <br/> + """ + return display(HTML(df.to_html().replace(separator, '
'))) + +def list_chunker(user_list: List[Any], n: int) -> List[List[Any]]: + """ + Divide a list into sublists of a specified size. + + Parameters: + ---------- + user_list : List[Any] + The list to be divided. + n : int + The size of the sublists. + + Returns: + -------- + List[List[Any]]: A list of sublists containing the elements of the input list. + """ + n=max(1, n) + return [user_list[i:i+n] for i in range(0, len(user_list), n)] From 2332a637d569c0c7a6a4a329922d2da608a31c96 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 14:51:51 +0100 Subject: [PATCH 05/40] CU-869aa22g2: Add search template notebook --- cogstack-es/search_template.ipynb | 285 ++++++++++++++++++++++++++++++ 1 file changed, 285 insertions(+) create mode 100644 cogstack-es/search_template.ipynb diff --git a/cogstack-es/search_template.ipynb b/cogstack-es/search_template.ipynb new file mode 100644 index 00000000..c442f0a6 --- /dev/null +++ b/cogstack-es/search_template.ipynb @@ -0,0 +1,285 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Seaching CogStack\n", + "\n", + "This script is designed to be a template for cogstack searches" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import sys\n", + "sys.path.append('..')\n", + "import pandas as pd\n", + "from credentials import *\n", + "from cogstack2 import CogStack, print_dataframe" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Login and Initialise" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cs = CogStack.with_api_key_auth(hosts=hosts, api_key=api_key)\n", + "#cs = CogStack.with_basic_auth(hosts=hosts, username=username, password=password)\n", + "#cs = CogStack(hosts).use_api_key_auth(api_key=api_key)\n", + "#cs = CogStack(hosts).use_basic_auth(username=username, password=password)\n", + "#cs = CogStack(hosts).use_api_key_auth(\"\")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Check the list of Indices and columns" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "View all indices and their aliases available to this user. Either index names or their aliases can be used to extract data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print_dataframe(cs.get_indices_and_aliases(), ', ')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "View fields/columns and their data types for provided index names or aliases" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cs.get_index_fields([])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Set search query parameters" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pt_list = [ ] # example list of patients' patient_TrustNumber here" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Columns of interest\n", + "\n", + "Select your fields and list in order of output columns" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "columns = []" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Build query\n", + "\n", + "For further information on [how to build a query can be found here](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html)\n", + "\n", + "Further information on [free text string queries can be found here](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-simple-query-string-query.html)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "search_query = {\n", + " \"bool\": {\n", + " #\"filter\": {\n", + " # \"terms\": {\n", + " # \"patient_TrustNumber\": pt_list\n", + " # }\n", + " #},\n", + " \"must\": [\n", + " {\n", + " \"query_string\": {\n", + " \"query\": \"\",\n", + " \"default_field\":\"\"\n", + " }\n", + " }\n", + " ]\n", + " }\n", + "}\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Count the number of documents matching the search query\n", + "cs.count_search_results(index=[], query=search_query)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "tags": [] + }, + "source": [ + "# Search, Process, and Save\n", + "Use either of the functions to extract search results" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Read data using scan helper function.\n", + "# Does not provide a scroll id, so cannot be resumed if search fails midway.\n", + "df = cs.read_data_with_scan(index=[], query=search_query, include_fields=columns)\n", + "print(df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Read data with scroll API and get scroll id if search fails midway. \n", + "# Can be used to resume the search from the failed scroll id.\n", + "df = cs.read_data_with_scroll(index=[], query=search_query, include_fields=columns)\n", + "print(df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Read data with sorting and get search_after value if search fails midway.\n", + "# Can be used to resume the search from the failed search_after value but can be slower than scan or scroll methods for large datasets.\n", + "# Note: Sorting requires a field to sort by, which should be present in the index. Default sorting is by _id.\n", + "df = cs.read_data_with_sorting(index=[], query=search_query, \n", + " include_fields=columns)\n", + "print(df)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Process" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Whatever you want here\n", + "# For example, display the first few rows of the DataFrame\n", + "df.head()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Save" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Save the DataFrame to a CSV file\n", + "path_to_results = \"../data/cogstack_search_results\"\n", + "file_name = \"file_name.csv\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df.to_csv(path_to_results + '\\\\' + file_name, index=False)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.5" + }, + "vscode": { + "interpreter": { + "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6" + } + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} From 4dc92aa719b21fbf8622383b6a332af6c8e6d2c6 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 14:55:09 +0100 Subject: [PATCH 06/40] CU-869aa22g2: Add ipython dependency (for cogstack and notebook) --- cogstack-es/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/cogstack-es/requirements.txt b/cogstack-es/requirements.txt index ca089377..a9dc8344 100644 --- a/cogstack-es/requirements.txt +++ b/cogstack-es/requirements.txt @@ -1 +1,2 @@ elasticsearch>=9.0.0,<10.0 +ipython From 4f2d4638651b7fea56d63693d065092255973f68 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 14:57:08 +0100 Subject: [PATCH 07/40] CU-869aa22g2: Add tqdm dependency --- cogstack-es/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/cogstack-es/requirements.txt b/cogstack-es/requirements.txt index a9dc8344..93564110 100644 --- a/cogstack-es/requirements.txt +++ b/cogstack-es/requirements.txt @@ -1,2 +1,3 @@ elasticsearch>=9.0.0,<10.0 ipython +tqdm>=4.64,<5.0 From 61b20f056617c8ce1e9adaa49afdebe6a341dc5a Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 14:57:26 +0100 Subject: [PATCH 08/40] CU-869aa22g2: Add pandas dependency --- cogstack-es/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/cogstack-es/requirements.txt b/cogstack-es/requirements.txt index 93564110..1e06ec9d 100644 --- a/cogstack-es/requirements.txt +++ b/cogstack-es/requirements.txt @@ -1,3 +1,4 @@ elasticsearch>=9.0.0,<10.0 ipython tqdm>=4.64,<5.0 +pandas>=2.2,<3.0 From ec2a7801efc9b881300740e337803e1c7410a0b7 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 15:20:31 +0100 Subject: [PATCH 09/40] CU-869aa22g2: Add a few initial tests --- cogstack-es/tests/__init__.py | 0 cogstack-es/tests/test_cogstack.py | 87 ++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) create mode 100644 cogstack-es/tests/__init__.py create mode 100644 cogstack-es/tests/test_cogstack.py diff --git a/cogstack-es/tests/__init__.py b/cogstack-es/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cogstack-es/tests/test_cogstack.py b/cogstack-es/tests/test_cogstack.py new file mode 100644 index 00000000..7fea96a5 --- /dev/null +++ b/cogstack-es/tests/test_cogstack.py @@ -0,0 +1,87 @@ +import pytest +from unittest.mock import Mock, patch, MagicMock +import pandas as pd +from cogstack import CogStack + + +@pytest.fixture +def mock_elasticsearch(): + """Fixture to mock Elasticsearch client""" + with patch('cogstack.elasticsearch.Elasticsearch') as mock_es: + mock_client = Mock() + mock_es.return_value = mock_client + mock_client.ping.return_value = True + print("With Mock ES...") + yield mock_es, mock_client + print("DONE WITH MOCK ES") + + +def test_basic_auth_connection(mock_elasticsearch: tuple[MagicMock, Mock]): + """Test basic authentication""" + dunder_init, _ = mock_elasticsearch + with patch('builtins.input', return_value='test_user'): + with patch('getpass.getpass', return_value='test_pass'): + cs = CogStack.with_basic_auth(['http://localhost:9200']) + assert isinstance(cs, CogStack) + assert cs.elastic.ping() + dunder_init.assert_called_once() + + +def test_api_key_auth_connection(mock_elasticsearch: tuple[MagicMock, Mock]): + """Test API key authentication""" + dunder_init, _ = mock_elasticsearch + api_key = {"encoded": "test_encoded_key"} + cs = CogStack.with_api_key_auth(['http://localhost:9200'], api_key) + assert isinstance(cs, CogStack) + assert cs.elastic.ping() + dunder_init.assert_called_once() + + +def test_count_search_results(mock_elasticsearch: tuple[MagicMock, Mock]): + """Test count_search_results method""" + # Mock the count response + _, mock_inst = mock_elasticsearch + mock_inst.count.return_value = {'count': 100} + + cs = CogStack(['http://localhost:9200']) + cs.elastic = mock_inst + + query = {"match": {"title": "test"}} + result = cs.count_search_results('test_index', query) + + assert "100" in result + mock_inst.count.assert_called_once() + + +def test_read_data_with_scan(mock_elasticsearch: tuple[MagicMock, Mock]): + """Test read_data_with_scan method""" + _, mock_inst = mock_elasticsearch + # Mock scan results + mock_hits = MagicMock() + mock_hits.__iter__.return_value = [ + {'_index': 'test_index', '_id': '1', '_score': 1.0, + 'fields': {'title': 'test1'}}, + {'_index': 'test_index', '_id': '2', '_score': 0.8, + 'fields': {'title': 'test2'}} + ] + + # Mock scan helper + with patch('cogstack.es_helpers.scan') as mock_scan, \ + patch('cogstack.tqdm.tqdm') as mock_tqdm: + + mock_scan.return_value = mock_hits + mock_tqdm.return_value = mock_hits # Make tqdm iterable + mock_tqdm.total = 2 + + # Mock count for progress bar + mock_inst.count.return_value = {'count': 2} + + cs = CogStack(['http://localhost:9200']) + cs.elastic = mock_inst + + query = {"query": {"match": {"title": "test"}}} + result = cs.read_data_with_scan('test_index', query, ['title']) + + assert isinstance(result, pd.DataFrame) + assert len(result) == 2 + assert 'title' in result.columns From 03221647274acb3d9f9fbce2d24ab8495fa82e49 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 15:26:19 +0100 Subject: [PATCH 10/40] CU-869aa22g2: Remove a bunch of extra whitespace --- cogstack-es/cogstack.py | 245 ++++++++++++++++++++-------------------- 1 file changed, 123 insertions(+), 122 deletions(-) diff --git a/cogstack-es/cogstack.py b/cogstack-es/cogstack.py index 151986f6..b3fdc9e0 100644 --- a/cogstack-es/cogstack.py +++ b/cogstack-es/cogstack.py @@ -11,17 +11,18 @@ warnings.filterwarnings("ignore") + class CogStack(): """ A class for interacting with Elasticsearch. - + Parameters ------------ hosts : List[str] A list of Elasticsearch host URLs. """ ES_TIMEOUT = 300 - + def __init__(self, hosts: List[str]): self.hosts = hosts self.elastic: elasticsearch.Elasticsearch @@ -39,10 +40,10 @@ def with_basic_auth(cls, hosts : List[str] A list of Elasticsearch host URLs. username : str, optional - The username to use when connecting to Elasticsearch. + The username to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a username. password : str, optional - The password to use when connecting to Elasticsearch. + The password to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a password. Returns ------- @@ -51,10 +52,10 @@ def with_basic_auth(cls, cs = cls(hosts) cs.use_basic_auth(username, password) return cs - + @classmethod - def with_api_key_auth(cls, - hosts: List[str], + def with_api_key_auth(cls, + hosts: List[str], api_key: Optional[Dict] = None) -> 'CogStack': """ Create an instance of CogStack using API key authentication. @@ -67,10 +68,10 @@ def with_api_key_auth(cls, API key object with "id" and "api_key" or "encoded" strings as fields. Generated in Elasticsearch or Kibana and provided by your CogStack administrator. - + If not provided, the user will be prompted to enter API key "encoded" value. - - Example: + + Example: .. code-block:: json { "id": "API_KEY_ID", @@ -90,18 +91,18 @@ def use_basic_auth(self, password:Optional[str] = None) -> 'CogStack': """ Create an instance of CogStack using basic authentication. - If the `username` or `password` parameters are not provided, + If the `username` or `password` parameters are not provided, the user will be prompted to enter them. Parameters ---------- username : str, optional - The username to use when connecting to Elasticsearch. + The username to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a username. password : str, optional - The password to use when connecting to Elasticsearch. + The password to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a password. - + Returns ------- CogStack: An instance of the CogStack class. @@ -112,7 +113,7 @@ def use_basic_auth(self, password = getpass.getpass("Password: ") return self.__connect(basic_auth=(username, password) if username and password else None) - + def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack': """ Create an instance of CogStack using API key authentication. @@ -123,17 +124,17 @@ def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack': API key object with "id" and "api_key" or "encoded" strings as fields. Generated in Elasticsearch or Kibana and provided by your CogStack administrator. - + If not provided, the user will be prompted to enter API key "encoded" value. - - Example: + + Example: .. code-block:: json { "id": "API_KEY_ID", "api_key": "API_KEY", "encoded": "API_KEY_ENCODED_STRING" } - + Returns ------- CogStack: An instance of the CogStack class. @@ -171,7 +172,7 @@ def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack': else getpass.getpass("API Key: ")) return self.__connect(api_key=encoded if has_encoded_value else (api_id_value, api_key_value)) - + def __connect(self, basic_auth : Optional[tuple[str,str]] = None, api_key: Optional[Union[str, tuple[str, str]]] = None) -> 'CogStack': @@ -181,7 +182,7 @@ def __connect(self, basic_auth : Tuple[str, str], optional A tuple containing the username and password for basic authentication. api_key : str or Tuple[str, str], optional - The API key or a tuple containing the API key ID and API key + The API key or a tuple containing the API key ID and API key for API key authentication. Returns ------- @@ -197,10 +198,10 @@ def __connect(self, request_timeout=self.ES_TIMEOUT) if not self.elastic.ping(): raise ConnectionError("CogStack connection failed. " \ - "Please check your host list and credentials and try again.") + "Please check your host list and credentials and try again.") print("CogStack connection established successfully.") return self - + def get_indices_and_aliases(self): """ Retrieve indices and their aliases @@ -228,14 +229,14 @@ def get_index_fields(self, index: Union[str, Sequence[str]]): Parameters ---------- - index: str | Sequence[str] + index: str | Sequence[str] Name(s) of indices or aliases for which the list of fields is retrieved Returns ---------- - pandas.DataFrame + pandas.DataFrame A DataFrame containing index names and their fields with data types - + Raises ------ Exception @@ -269,21 +270,21 @@ def get_index_fields(self, index: Union[str, Sequence[str]]): def count_search_results(self, index: Union[str, Sequence[str]], query: dict): """ Count number of documents returned by the query - + Parameters ---------- index : str or Sequence[str] The name(s) of the Elasticsearch indices or their aliases to search. - + query : dict - A dictionary containing the search query parameters. - Query can start with `query` key and contain other - query options which will be ignored + A dictionary containing the search query parameters. + Query can start with `query` key and contain other + query options which will be ignored - .. code-block:: json + .. code-block:: json {"query": {"match": {"title": "python"}}}} or only consist of content of `query` block - .. code-block:: json + .. code-block:: json {"match": {"title": "python"}}} """ if len(index) == 0: @@ -291,55 +292,55 @@ def count_search_results(self, index: Union[str, Sequence[str]], query: dict): query = self.__extract_query(query=query) count = self.elastic.count(index=index, query=query, allow_no_indices=False)['count'] return f"Number of documents: {format(count, ',')}" - - def read_data_with_scan(self, - index: Union[str, Sequence[str]], - query: dict, - include_fields: Optional[list[str]]=None, - size: int=1000, + + def read_data_with_scan(self, + index: Union[str, Sequence[str]], + query: dict, + include_fields: Optional[list[str]]=None, + size: int=1000, request_timeout: int=ES_TIMEOUT, show_progress: bool = True): """ - Retrieve documents from an Elasticsearch index or + Retrieve documents from an Elasticsearch index or indices using search query and elasticsearch scan helper function. - The function converts search results to a Pandas DataFrame and does + The function converts search results to a Pandas DataFrame and does not return current scroll id if the process fails. - + Parameters ---------- index : str or Sequence[str] The name(s) of the Elasticsearch indices or their aliases to search. query : dict - A dictionary containing the search query parameters. - Query can start with `query` key and contain other - query options which will be used in the search + A dictionary containing the search query parameters. + Query can start with `query` key and contain other + query options which will be used in the search .. code-block:: json {"query": {"match": {"title": "python"}}}} - or only consist of content of `query` block + or only consist of content of `query` block (preferred method to avoid clashing with other parameters) .. code-block:: json {"match": {"title": "python"}}} - + include_fields : list[str], optional - A list of fields to be included in search results - and presented as columns in the DataFrame. + A list of fields to be included in search results + and presented as columns in the DataFrame. If not provided, only _index, _id and _score fields will be included. Columns _index, _id, _score are present in all search results size : int, optional, default = 1000 The number of documents to be returned by the query or scroll API during each iteration. MAX: 10,000. request_timeout : int, optional, default=300 - The time in seconds to wait for a response + The time in seconds to wait for a response from Elasticsearch before timing out. show_progress : bool, optional, default=True Whether to show the progress in console. Returns ------ - pandas.DataFrame + pandas.DataFrame A DataFrame containing the retrieved documents. - + Raises ------ Exception @@ -383,7 +384,7 @@ def read_data_with_scan(self, pr_bar.set_description("CogStack read failed! Processed", refresh=True) print(Exception(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}")) return self.__create_dataframe(all_mapped_results, include_fields) - + def read_data_with_scroll(self, index: Union[str, Sequence[str]], query: dict, @@ -396,58 +397,58 @@ def read_data_with_scroll(self, Retrieves documents from an Elasticsearch index using search query and scroll API. Default scroll timeout is set to 10 minutes. The function converts search results to a Pandas DataFrame. - + Parameters ---------- index : str or Sequence[str] The name(s) of the Elasticsearch indices or their aliases to search. query : dict - A dictionary containing the search query parameters. - Query can start with `query` key - and contain other query options which will be ignored + A dictionary containing the search query parameters. + Query can start with `query` key + and contain other query options which will be ignored - .. code-block:: json + .. code-block:: json {"query": {"match": {"title": "python"}}}} or only consist of content of `query` block - .. code-block:: json + .. code-block:: json {"match": {"title": "python"}}} - + include_fields : list[str], optional - A list of fields to be included in search results - and presented as columns in the DataFrame. + A list of fields to be included in search results + and presented as columns in the DataFrame. If not provided, only _index, _id and _score fields will be included. Columns _index, _id, _score are present in all search results size : int, optional, default = 1000 - The number of documents to be returned by the query - or scroll API during each iteration. + The number of documents to be returned by the query + or scroll API during each iteration. MAX: 10,000. search_scroll_id : str, optional - The value of the last scroll_id - returned by scroll API and used to continue the search - if the current search fails. - The value of scroll_id - times out after 10 minutes. - After which the search will have to be restarted. + The value of the last scroll_id + returned by scroll API and used to continue the search + if the current search fails. + The value of scroll_id + times out after 10 minutes. + After which the search will have to be restarted. Note: Absence of this parameter indicates a new search. request_timeout : int, optional, default=300 The time in seconds to wait for a response from Elasticsearch before timing out. show_progress : bool, optional, default=True - Whether to show the progress in console. - IMPORTANT: The progress bar displays the total hits + Whether to show the progress in console. + IMPORTANT: The progress bar displays the total hits for the query even if continuing the search using `search_scroll_id`. Returns ------ - pandas.DataFrame + pandas.DataFrame A DataFrame containing the retrieved documents. - + Raises ------ Exception - If the search fails or cancelled by the user. - If the search fails, error message includes the value of current `search_scroll_id` - which can be used as a function parameter to continue the search. - IMPORTANT: If the function fails after `scroll` request, - the subsequent request will skip results of the failed scroll by the + If the search fails or cancelled by the user. + If the search fails, error message includes the value of current `search_scroll_id` + which can be used as a function parameter to continue the search. + IMPORTANT: If the function fails after `scroll` request, + the subsequent request will skip results of the failed scroll by the value of `size` parameter. """ try: @@ -465,16 +466,16 @@ def read_data_with_scroll(self, disable=not show_progress, colour='green') if search_scroll_id is None: - search_result = self.elastic.search(index=index, + search_result = self.elastic.search(index=index, size=size, - query=query, - fields=include_fields_map, - source=False, + query=query, + fields=include_fields_map, + source=False, scroll="10m", timeout=f"{request_timeout}s", allow_no_indices=False, - rest_total_hits_as_int=True) - + rest_total_hits_as_int=True) + pr_bar.total = search_result.body['hits']['total'] hits = search_result.body['hits']['hits'] result_count = len(hits) @@ -486,7 +487,7 @@ def read_data_with_scroll(self, while search_scroll_id and result_count == size: # Perform ES scroll request - search_result = self.elastic.scroll(scroll_id=search_scroll_id, scroll="10m", + search_result = self.elastic.scroll(scroll_id=search_scroll_id, scroll="10m", rest_total_hits_as_int=True) hits = search_result.body['hits']['hits'] pr_bar.total = pr_bar.total if pr_bar.total else search_result.body['hits']['total'] @@ -508,7 +509,7 @@ def read_data_with_scroll(self, pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", "\033[0;31m", "\033[0;31m") pr_bar.set_description("CogStack read failed! Processed", refresh=True) print(Exception(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}"), f"{search_scroll_id=}", sep='\n') - + return self.__create_dataframe(all_mapped_results, include_fields) def read_data_with_sorting(self, @@ -522,37 +523,37 @@ def read_data_with_sorting(self, show_progress: Optional[bool] = True): """ Retrieve documents from an Elasticsearch index using search query and convert them to a Pandas DataFrame. - + Parameters ---------- index : str or Sequence[str] The name(s) of the Elasticsearch indices or their aliases to search. query : dict A dictionary containing the search query parameters. - Query can start with `query` key and contain other query options which will be ignored + Query can start with `query` key and contain other query options which will be ignored - .. code-block:: json + .. code-block:: json {"query": {"match": {"title": "python"}}}} or only consist of content of `query` block - .. code-block:: json + .. code-block:: json {"match": {"title": "python"}}} include_fields : list[str], optional - A list of fields to be included in search results and presented as columns in the DataFrame. + A list of fields to be included in search results and presented as columns in the DataFrame. If not provided, only _index, _id and _score fields will be included. Columns _index, _id, _score are present in all search results size : int, optional, default = 1000 - The number of documents to be returned by the query or scroll API during each iteration. + The number of documents to be returned by the query or scroll API during each iteration. MAX: 10,000. sort : dict|list[str], optional, default = {"id": "asc"} - Sort field name(s) and order (`asc` or `desc`) in dictionary format or list of field names without order. - `{"id":"asc"}` or `id` is added if not provided as a tiebreaker field. + Sort field name(s) and order (`asc` or `desc`) in dictionary format or list of field names without order. + `{"id":"asc"}` or `id` is added if not provided as a tiebreaker field. Default sorting order is `asc` >Example: - `dict : {"filed_Name" : "desc", "id" : "asc"}` - `list : ["filed_Name", "id"]` search_after : list[str|int|float|Any|None], optional The sort value of the last record in search results. - Can be provided if the a search fails and needs to be restarted from the last successful search. + Can be provided if the a search fails and needs to be restarted from the last successful search. Use the value of `search_after_value` from the error message request_timeout : int, optional, default = 300 The time in seconds to wait for a response from Elasticsearch before timing out. @@ -561,14 +562,14 @@ def read_data_with_sorting(self, Returns ------ - pandas.DataFrame + pandas.DataFrame A DataFrame containing the retrieved documents. - + Raises ------ - Exception - If the search fails or cancelled by the user. - Error message includes the value of current `search_after_value` + Exception + If the search fails or cancelled by the user. + Error message includes the value of current `search_after_value` which can be used as a function parameter to continue the search. """ try: @@ -585,29 +586,29 @@ def read_data_with_sorting(self, self.__validate_size(size=size) query = self.__extract_query(query=query) - if ((isinstance(sort, dict) and 'id' not in sort.keys()) + if ((isinstance(sort, dict) and 'id' not in sort.keys()) or (isinstance(sort, list) and 'id' not in sort)): if isinstance(sort, dict): sort['id'] = 'asc' else: - sort.append('id') - pr_bar = tqdm.tqdm(desc="CogStack retrieved...", - disable=not show_progress, - colour='green') + sort.append('id') + pr_bar = tqdm.tqdm(desc="CogStack retrieved...", + disable=not show_progress, + colour='green') while result_count == size: - search_result = self.elastic.search(index=index, + search_result = self.elastic.search(index=index, size=size, - query=query, - fields=include_fields_map, - source=False, + query=query, + fields=include_fields_map, + source=False, sort=sort, search_after=search_after_value, timeout=f"{request_timeout}s", track_scores=True, track_total_hits=True, allow_no_indices=False, - rest_total_hits_as_int=True) + rest_total_hits_as_int=True) hits = search_result['hits']['hits'] all_mapped_results.extend(self.__map_search_results(hits=hits)) result_count = len(hits) @@ -618,20 +619,20 @@ def read_data_with_sorting(self, raise LookupError(search_result["_shards"]["failures"]) except BaseException as err: if isinstance(err, KeyboardInterrupt): - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", - "\033[0;33m", - "\033[0;33m") + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", + "\033[0;33m", + "\033[0;33m") pr_bar.set_description("CogStack read cancelled! Processed", refresh=True) print("Request cancelled.") else: if pr_bar is not None: - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", - "\033[0;31m", + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", + "\033[0;31m", "\033[0;31m") pr_bar.set_description("CogStack read failed! Processed", refresh=True) print(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}") print(f"The last {search_after_value=}") - + return self.__create_dataframe(all_mapped_results, include_fields) def __extract_query(self, query: dict): @@ -642,7 +643,7 @@ def __extract_query(self, query: dict): def __validate_size(self, size): if size > 10000: raise ValueError('Size must not be greater than 10000') - + def __map_search_results(self, hits: Iterable): hit: dict for hit in hits: @@ -679,11 +680,11 @@ def __create_dataframe(self, all_mapped_results, column_headers): def print_dataframe(df : pd.DataFrame, separator : str = '\\n'): """ Replace separator string with HTML <br/> - tag for printing in Notebook + tag for printing in Notebook Parameters: ----------- - df : DataFrame + df : DataFrame Input DataFrame separator : str Separator to be replaced with HTML <br/> @@ -693,14 +694,14 @@ def print_dataframe(df : pd.DataFrame, separator : str = '\\n'): def list_chunker(user_list: List[Any], n: int) -> List[List[Any]]: """ Divide a list into sublists of a specified size. - + Parameters: ---------- user_list : List[Any] The list to be divided. n : int The size of the sublists. - + Returns: -------- List[List[Any]]: A list of sublists containing the elements of the input list. From 696e66976a54cd44bebbbb6b1279d0737865d49b Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 15:26:46 +0100 Subject: [PATCH 11/40] CU-869aa22g2: Add ruff dependecny --- cogstack-es/requirements-dev.txt | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 cogstack-es/requirements-dev.txt diff --git a/cogstack-es/requirements-dev.txt b/cogstack-es/requirements-dev.txt new file mode 100644 index 00000000..7ad36053 --- /dev/null +++ b/cogstack-es/requirements-dev.txt @@ -0,0 +1,5 @@ +mypy +pandas-stubs +types-tqdm +pytest +ruff From 90d339bbacdeb12292165f29eb4e86436efa0e2d Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 15:27:58 +0100 Subject: [PATCH 12/40] CU-869aa22g2: Run ruff on cogstack module --- cogstack-es/cogstack.py | 559 ++++++++++++++++++++++++---------------- 1 file changed, 332 insertions(+), 227 deletions(-) diff --git a/cogstack-es/cogstack.py b/cogstack-es/cogstack.py index b3fdc9e0..c8bb994f 100644 --- a/cogstack-es/cogstack.py +++ b/cogstack-es/cogstack.py @@ -12,7 +12,7 @@ warnings.filterwarnings("ignore") -class CogStack(): +class CogStack: """ A class for interacting with Elasticsearch. @@ -21,6 +21,7 @@ class CogStack(): hosts : List[str] A list of Elasticsearch host URLs. """ + ES_TIMEOUT = 300 def __init__(self, hosts: List[str]): @@ -28,10 +29,12 @@ def __init__(self, hosts: List[str]): self.elastic: elasticsearch.Elasticsearch @classmethod - def with_basic_auth(cls, - hosts: List[str], - username: Optional[str] = None, - password: Optional[str] = None) -> 'CogStack': + def with_basic_auth( + cls, + hosts: List[str], + username: Optional[str] = None, + password: Optional[str] = None, + ) -> "CogStack": """ Create an instance of CogStack using basic authentication. @@ -45,7 +48,7 @@ def with_basic_auth(cls, password : str, optional The password to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a password. - Returns + Returns ------- CogStack: An instance of the CogStack class. """ @@ -54,9 +57,9 @@ def with_basic_auth(cls, return cs @classmethod - def with_api_key_auth(cls, - hosts: List[str], - api_key: Optional[Dict] = None) -> 'CogStack': + def with_api_key_auth( + cls, hosts: List[str], api_key: Optional[Dict] = None + ) -> "CogStack": """ Create an instance of CogStack using API key authentication. @@ -66,7 +69,7 @@ def with_api_key_auth(cls, A list of Elasticsearch host URLs. apiKey : Dict, optional - API key object with "id" and "api_key" or "encoded" strings as fields. + API key object with "id" and "api_key" or "encoded" strings as fields. Generated in Elasticsearch or Kibana and provided by your CogStack administrator. If not provided, the user will be prompted to enter API key "encoded" value. @@ -86,9 +89,9 @@ def with_api_key_auth(cls, cs.use_api_key_auth(api_key) return cs - def use_basic_auth(self, - username: Optional[str] = None, - password:Optional[str] = None) -> 'CogStack': + def use_basic_auth( + self, username: Optional[str] = None, password: Optional[str] = None + ) -> "CogStack": """ Create an instance of CogStack using basic authentication. If the `username` or `password` parameters are not provided, @@ -96,10 +99,10 @@ def use_basic_auth(self, Parameters ---------- - username : str, optional + username : str, optional The username to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a username. - password : str, optional + password : str, optional The password to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a password. @@ -112,9 +115,11 @@ def use_basic_auth(self, if password is None: password = getpass.getpass("Password: ") - return self.__connect(basic_auth=(username, password) if username and password else None) + return self.__connect( + basic_auth=(username, password) if username and password else None + ) - def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack': + def use_api_key_auth(self, api_key: Optional[Dict] = None) -> "CogStack": """ Create an instance of CogStack using API key authentication. @@ -122,7 +127,7 @@ def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack': ---------- apiKey : Dict, optional - API key object with "id" and "api_key" or "encoded" strings as fields. + API key object with "id" and "api_key" or "encoded" strings as fields. Generated in Elasticsearch or Kibana and provided by your CogStack administrator. If not provided, the user will be prompted to enter API key "encoded" value. @@ -140,8 +145,8 @@ def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack': CogStack: An instance of the CogStack class. """ has_encoded_value = False - api_id_value:str - api_key_value:str + api_id_value: str + api_key_value: str if not api_key: api_key = {"encoded": input("Encoded API key: ")} @@ -152,31 +157,45 @@ def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack': has_encoded_value = True elif isinstance(api_key, Dict): # If api_key is a dictionary, check for "encoded", "id" and "api_key" keys - if "id" in api_key.keys() and api_key["id"] != '' and "api_key" in api_key.keys() \ - and api_key["api_key"] != '': - # If both "id" and "api_key" are present, use them + if ( + "id" in api_key.keys() + and api_key["id"] != "" + and "api_key" in api_key.keys() + and api_key["api_key"] != "" + ): + # If both "id" and "api_key" are present, use them encoded = None else: # If "encoded" is present, use it; otherwise prompt for it - encoded = api_key["encoded"] \ - if "encoded" in api_key.keys() and api_key["encoded"] != '' \ - else input("Encoded API key: ") - has_encoded_value = encoded is not None and encoded != '' - - if(not has_encoded_value): - api_id_value = str(api_key["id"] \ - if "id" in api_key.keys() and api_key["id"] != '' \ - else input("API Id: ")) - api_key_value = str(api_key["api_key"] \ - if "api_key" in api_key.keys() and api_key["api_key"] != '' \ - else getpass.getpass("API Key: ")) - - return self.__connect(api_key=encoded if has_encoded_value else (api_id_value, api_key_value)) - - def __connect(self, - basic_auth : Optional[tuple[str,str]] = None, - api_key: Optional[Union[str, tuple[str, str]]] = None) -> 'CogStack': - """ Connect to Elasticsearch using the provided credentials. + encoded = ( + api_key["encoded"] + if "encoded" in api_key.keys() and api_key["encoded"] != "" + else input("Encoded API key: ") + ) + has_encoded_value = encoded is not None and encoded != "" + + if not has_encoded_value: + api_id_value = str( + api_key["id"] + if "id" in api_key.keys() and api_key["id"] != "" + else input("API Id: ") + ) + api_key_value = str( + api_key["api_key"] + if "api_key" in api_key.keys() and api_key["api_key"] != "" + else getpass.getpass("API Key: ") + ) + + return self.__connect( + api_key=encoded if has_encoded_value else (api_id_value, api_key_value) + ) + + def __connect( + self, + basic_auth: Optional[tuple[str, str]] = None, + api_key: Optional[Union[str, tuple[str, str]]] = None, + ) -> "CogStack": + """Connect to Elasticsearch using the provided credentials. Parameters ---------- basic_auth : Tuple[str, str], optional @@ -191,14 +210,18 @@ def __connect(self, ------ Exception: If the connection to Elasticsearch fails. """ - self.elastic = elasticsearch.Elasticsearch(hosts=self.hosts, - api_key=api_key, - basic_auth=basic_auth, - verify_certs=False, - request_timeout=self.ES_TIMEOUT) + self.elastic = elasticsearch.Elasticsearch( + hosts=self.hosts, + api_key=api_key, + basic_auth=basic_auth, + verify_certs=False, + request_timeout=self.ES_TIMEOUT, + ) if not self.elastic.ping(): - raise ConnectionError("CogStack connection failed. " \ - "Please check your host list and credentials and try again.") + raise ConnectionError( + "CogStack connection failed. " + "Please check your host list and credentials and try again." + ) print("CogStack connection established successfully.") return self @@ -214,14 +237,14 @@ def get_indices_and_aliases(self): index_aliases_coll = [] for index in all_aliases: index_aliases = {} - index_aliases['Index'] = index - aliases=[] - for alias in all_aliases[index]['aliases']: + index_aliases["Index"] = index + aliases = [] + for alias in all_aliases[index]["aliases"]: aliases.append(alias) - index_aliases['Aliases'] = ', '.join(aliases) + index_aliases["Aliases"] = ", ".join(aliases) index_aliases_coll.append(index_aliases) - with pd.option_context('display.max_colwidth', None): - return pd.DataFrame(index_aliases_coll, columns=['Index', 'Aliases']) + with pd.option_context("display.max_colwidth", None): + return pd.DataFrame(index_aliases_coll, columns=["Index", "Aliases"]) def get_index_fields(self, index: Union[str, Sequence[str]]): """ @@ -244,62 +267,72 @@ def get_index_fields(self, index: Union[str, Sequence[str]]): """ try: if len(index) == 0: - raise ValueError('Provide at least one index or index alias name') - all_mappings = self.elastic.indices\ - .get_mapping(index=index, allow_no_indices=False).body - columns= ['Field', 'Type'] + raise ValueError("Provide at least one index or index alias name") + all_mappings = self.elastic.indices.get_mapping( + index=index, allow_no_indices=False + ).body + columns = ["Field", "Type"] if isinstance(index, List): - columns.insert(0, 'Index') + columns.insert(0, "Index") index_mappings_coll = [] for index_name in all_mappings: - for property_name in all_mappings[index_name]['mappings']['properties']: + for property_name in all_mappings[index_name]["mappings"]["properties"]: index_mapping = {} - index_mapping['Index'] = index_name - index_mapping['Field'] = property_name - index_mapping['Type'] = \ - all_mappings[index_name]['mappings']['properties'][property_name]['type'] \ - if "type" in all_mappings[index_name]['mappings']\ - ['properties'][property_name].keys() \ - else '?' + index_mapping["Index"] = index_name + index_mapping["Field"] = property_name + index_mapping["Type"] = ( + all_mappings[index_name]["mappings"]["properties"][ + property_name + ]["type"] + if "type" + in all_mappings[index_name]["mappings"]["properties"][ + property_name + ].keys() + else "?" + ) index_mappings_coll.append(index_mapping) - except Exception as err: + except Exception as err: raise Exception(f"Unexpected {err=}, {type(err)=}") - with pd.option_context('display.max_rows', len(index_mappings_coll) + 1): - return display(pd.DataFrame(data= index_mappings_coll, columns=columns)) + with pd.option_context("display.max_rows", len(index_mappings_coll) + 1): + return display(pd.DataFrame(data=index_mappings_coll, columns=columns)) def count_search_results(self, index: Union[str, Sequence[str]], query: dict): """ - Count number of documents returned by the query - - Parameters - ---------- - index : str or Sequence[str] - The name(s) of the Elasticsearch indices or their aliases to search. - - query : dict - A dictionary containing the search query parameters. - Query can start with `query` key and contain other - query options which will be ignored - - .. code-block:: json - {"query": {"match": {"title": "python"}}}} - or only consist of content of `query` block - .. code-block:: json - {"match": {"title": "python"}}} + Count number of documents returned by the query + + Parameters + ---------- + index : str or Sequence[str] + The name(s) of the Elasticsearch indices or their aliases to search. + + query : dict + A dictionary containing the search query parameters. + Query can start with `query` key and contain other + query options which will be ignored + + .. code-block:: json + {"query": {"match": {"title": "python"}}}} + or only consist of content of `query` block + .. code-block:: json + {"match": {"title": "python"}}} """ if len(index) == 0: - raise ValueError('Provide at least one index or index alias name') + raise ValueError("Provide at least one index or index alias name") query = self.__extract_query(query=query) - count = self.elastic.count(index=index, query=query, allow_no_indices=False)['count'] + count = self.elastic.count(index=index, query=query, allow_no_indices=False)[ + "count" + ] return f"Number of documents: {format(count, ',')}" - def read_data_with_scan(self, - index: Union[str, Sequence[str]], - query: dict, - include_fields: Optional[list[str]]=None, - size: int=1000, - request_timeout: int=ES_TIMEOUT, - show_progress: bool = True): + def read_data_with_scan( + self, + index: Union[str, Sequence[str]], + query: dict, + include_fields: Optional[list[str]] = None, + size: int = 1000, + request_timeout: int = ES_TIMEOUT, + show_progress: bool = True, + ): """ Retrieve documents from an Elasticsearch index or indices using search query and elasticsearch scan helper function. @@ -315,12 +348,12 @@ def read_data_with_scan(self, Query can start with `query` key and contain other query options which will be used in the search - .. code-block:: json + .. code-block:: json {"query": {"match": {"title": "python"}}}} or only consist of content of `query` block (preferred method to avoid clashing with other parameters) - .. code-block:: json + .. code-block:: json {"match": {"title": "python"}}} include_fields : list[str], optional @@ -329,7 +362,7 @@ def read_data_with_scan(self, If not provided, only _index, _id and _score fields will be included. Columns _index, _id, _score are present in all search results size : int, optional, default = 1000 - The number of documents to be returned by the query or scroll + The number of documents to be returned by the query or scroll API during each iteration. MAX: 10,000. request_timeout : int, optional, default=300 The time in seconds to wait for a response @@ -344,55 +377,74 @@ def read_data_with_scan(self, Raises ------ Exception - If the search fails or cancelled by the user. + If the search fails or cancelled by the user. """ try: if len(index) == 0: - raise ValueError('Provide at least one index or index alias name') + raise ValueError("Provide at least one index or index alias name") self.__validate_size(size=size) if "query" not in query.keys(): - temp_query = query.copy() + temp_query = query.copy() query.clear() query["query"] = temp_query - pr_bar = tqdm.tqdm(desc="CogStack retrieved...", - disable=not show_progress, colour='green') - - scan_results = es_helpers.scan(self.elastic, - index=index, - query=query, - size=size, - request_timeout=request_timeout, - source=False, - fields = include_fields, - allow_no_indices=False,) + pr_bar = tqdm.tqdm( + desc="CogStack retrieved...", disable=not show_progress, colour="green" + ) + + scan_results = es_helpers.scan( + self.elastic, + index=index, + query=query, + size=size, + request_timeout=request_timeout, + source=False, + fields=include_fields, + allow_no_indices=False, + ) all_mapped_results = [] pr_bar.iterable = scan_results - pr_bar.total = self.elastic.count(index=index, query=query["query"])["count"] + pr_bar.total = self.elastic.count(index=index, query=query["query"])[ + "count" + ] all_mapped_results = self.__map_search_results(hits=pr_bar) - except BaseException as err: + except BaseException as err: if isinstance(err, KeyboardInterrupt): - pr_bar.bar_format ="%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", - "\033[0;33m", - "\033[0;33m") - pr_bar.set_description("CogStack read cancelled! Processed", refresh=True) + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ( + "\033[0;33m", + "\033[0;33m", + "\033[0;33m", + ) + pr_bar.set_description( + "CogStack read cancelled! Processed", refresh=True + ) print("Request cancelled and current search_scroll_id deleted...") else: if pr_bar is not None: - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", - "\033[0;31m", - "\033[0;31m") - pr_bar.set_description("CogStack read failed! Processed", refresh=True) - print(Exception(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}")) + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ( + "\033[0;31m", + "\033[0;31m", + "\033[0;31m", + ) + pr_bar.set_description( + "CogStack read failed! Processed", refresh=True + ) + print( + Exception( + f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}" + ) + ) return self.__create_dataframe(all_mapped_results, include_fields) - def read_data_with_scroll(self, - index: Union[str, Sequence[str]], - query: dict, - include_fields:Optional[list[str]]=None, - size: int=1000, - search_scroll_id: Optional[str] = None, - request_timeout: Optional[int]=ES_TIMEOUT, - show_progress: Optional[bool] = True): + def read_data_with_scroll( + self, + index: Union[str, Sequence[str]], + query: dict, + include_fields: Optional[list[str]] = None, + size: int = 1000, + search_scroll_id: Optional[str] = None, + request_timeout: Optional[int] = ES_TIMEOUT, + show_progress: Optional[bool] = True, + ): """ Retrieves documents from an Elasticsearch index using search query and scroll API. Default scroll timeout is set to 10 minutes. @@ -453,33 +505,39 @@ def read_data_with_scroll(self, """ try: if len(index) == 0: - raise ValueError('Provide at least one index or index alias name') + raise ValueError("Provide at least one index or index alias name") self.__validate_size(size=size) query = self.__extract_query(query=query) result_count = size - all_mapped_results =[] - search_result=None - include_fields_map: Union[Sequence[Mapping[str, Any]], None] = \ - [{"field": field} for field in include_fields] if include_fields is not None else None + all_mapped_results = [] + search_result = None + include_fields_map: Union[Sequence[Mapping[str, Any]], None] = ( + [{"field": field} for field in include_fields] + if include_fields is not None + else None + ) - pr_bar = tqdm.tqdm(desc="CogStack retrieved...", - disable=not show_progress, colour='green') + pr_bar = tqdm.tqdm( + desc="CogStack retrieved...", disable=not show_progress, colour="green" + ) if search_scroll_id is None: - search_result = self.elastic.search(index=index, - size=size, - query=query, - fields=include_fields_map, - source=False, - scroll="10m", - timeout=f"{request_timeout}s", - allow_no_indices=False, - rest_total_hits_as_int=True) - - pr_bar.total = search_result.body['hits']['total'] - hits = search_result.body['hits']['hits'] + search_result = self.elastic.search( + index=index, + size=size, + query=query, + fields=include_fields_map, + source=False, + scroll="10m", + timeout=f"{request_timeout}s", + allow_no_indices=False, + rest_total_hits_as_int=True, + ) + + pr_bar.total = search_result.body["hits"]["total"] + hits = search_result.body["hits"]["hits"] result_count = len(hits) - search_scroll_id = search_result.body['_scroll_id'] + search_scroll_id = search_result.body["_scroll_id"] all_mapped_results.extend(self.__map_search_results(hits=hits)) pr_bar.update(len(hits)) if search_result["_shards"]["failed"] > 0: @@ -487,40 +545,65 @@ def read_data_with_scroll(self, while search_scroll_id and result_count == size: # Perform ES scroll request - search_result = self.elastic.scroll(scroll_id=search_scroll_id, scroll="10m", - rest_total_hits_as_int=True) - hits = search_result.body['hits']['hits'] - pr_bar.total = pr_bar.total if pr_bar.total else search_result.body['hits']['total'] + search_result = self.elastic.scroll( + scroll_id=search_scroll_id, + scroll="10m", + rest_total_hits_as_int=True, + ) + hits = search_result.body["hits"]["hits"] + pr_bar.total = ( + pr_bar.total + if pr_bar.total + else search_result.body["hits"]["total"] + ) all_mapped_results.extend(self.__map_search_results(hits=hits)) - search_scroll_id = search_result.body['_scroll_id'] + search_scroll_id = search_result.body["_scroll_id"] result_count = len(hits) pr_bar.update(result_count) - self.elastic.clear_scroll(scroll_id = search_scroll_id) + self.elastic.clear_scroll(scroll_id=search_scroll_id) except BaseException as err: if isinstance(err, KeyboardInterrupt): - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", - "\033[0;33m", - "\033[0;33m") - pr_bar.set_description("CogStack read cancelled! Processed", refresh=True) - self.elastic.clear_scroll(scroll_id = search_scroll_id) + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ( + "\033[0;33m", + "\033[0;33m", + "\033[0;33m", + ) + pr_bar.set_description( + "CogStack read cancelled! Processed", refresh=True + ) + self.elastic.clear_scroll(scroll_id=search_scroll_id) print("Request cancelled and current search_scroll_id deleted...") else: if pr_bar is not None: - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", "\033[0;31m", "\033[0;31m") - pr_bar.set_description("CogStack read failed! Processed", refresh=True) - print(Exception(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}"), f"{search_scroll_id=}", sep='\n') + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ( + "\033[0;31m", + "\033[0;31m", + "\033[0;31m", + ) + pr_bar.set_description( + "CogStack read failed! Processed", refresh=True + ) + print( + Exception( + f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}" + ), + f"{search_scroll_id=}", + sep="\n", + ) return self.__create_dataframe(all_mapped_results, include_fields) - def read_data_with_sorting(self, - index: Union[str, Sequence[str]], - query: dict, - include_fields: Optional[list[str]]=None, - size: Optional[int]=1000, - sort: Optional[Union[dict,list[str]]] = None, - search_after: Optional[list[Union[str,int,float,Any,None]]] = None, - request_timeout: Optional[int]=ES_TIMEOUT, - show_progress: Optional[bool] = True): + def read_data_with_sorting( + self, + index: Union[str, Sequence[str]], + query: dict, + include_fields: Optional[list[str]] = None, + size: Optional[int] = 1000, + sort: Optional[Union[dict, list[str]]] = None, + search_after: Optional[list[Union[str, int, float, Any, None]]] = None, + request_timeout: Optional[int] = ES_TIMEOUT, + show_progress: Optional[bool] = True, + ): """ Retrieve documents from an Elasticsearch index using search query and convert them to a Pandas DataFrame. @@ -529,7 +612,7 @@ def read_data_with_sorting(self, index : str or Sequence[str] The name(s) of the Elasticsearch indices or their aliases to search. query : dict - A dictionary containing the search query parameters. + A dictionary containing the search query parameters. Query can start with `query` key and contain other query options which will be ignored .. code-block:: json @@ -552,7 +635,7 @@ def read_data_with_sorting(self, - `dict : {"filed_Name" : "desc", "id" : "asc"}` - `list : ["filed_Name", "id"]` search_after : list[str|int|float|Any|None], optional - The sort value of the last record in search results. + The sort value of the last record in search results. Can be provided if the a search fails and needs to be restarted from the last successful search. Use the value of `search_after_value` from the error message request_timeout : int, optional, default = 300 @@ -571,65 +654,83 @@ def read_data_with_sorting(self, If the search fails or cancelled by the user. Error message includes the value of current `search_after_value` which can be used as a function parameter to continue the search. - """ + """ try: if len(index) == 0: - raise ValueError('Provide at least one index or index alias name') + raise ValueError("Provide at least one index or index alias name") result_count = size - all_mapped_results =[] + all_mapped_results = [] if sort is None: - sort = {'id': 'asc'} + sort = {"id": "asc"} search_after_value = search_after - include_fields_map: Union[Sequence[Mapping[str, Any]], None] = \ - [{"field": field} for field in include_fields] if include_fields is not None else None + include_fields_map: Union[Sequence[Mapping[str, Any]], None] = ( + [{"field": field} for field in include_fields] + if include_fields is not None + else None + ) self.__validate_size(size=size) query = self.__extract_query(query=query) - if ((isinstance(sort, dict) and 'id' not in sort.keys()) - or (isinstance(sort, list) and 'id' not in sort)): + if (isinstance(sort, dict) and "id" not in sort.keys()) or ( + isinstance(sort, list) and "id" not in sort + ): if isinstance(sort, dict): - sort['id'] = 'asc' + sort["id"] = "asc" else: - sort.append('id') - pr_bar = tqdm.tqdm(desc="CogStack retrieved...", - disable=not show_progress, - colour='green') + sort.append("id") + pr_bar = tqdm.tqdm( + desc="CogStack retrieved...", disable=not show_progress, colour="green" + ) while result_count == size: - search_result = self.elastic.search(index=index, - size=size, - query=query, - fields=include_fields_map, - source=False, - sort=sort, - search_after=search_after_value, - timeout=f"{request_timeout}s", - track_scores=True, - track_total_hits=True, - allow_no_indices=False, - rest_total_hits_as_int=True) - hits = search_result['hits']['hits'] + search_result = self.elastic.search( + index=index, + size=size, + query=query, + fields=include_fields_map, + source=False, + sort=sort, + search_after=search_after_value, + timeout=f"{request_timeout}s", + track_scores=True, + track_total_hits=True, + allow_no_indices=False, + rest_total_hits_as_int=True, + ) + hits = search_result["hits"]["hits"] all_mapped_results.extend(self.__map_search_results(hits=hits)) result_count = len(hits) pr_bar.update(result_count) - search_after_value = hits[-1]['sort'] - pr_bar.total = pr_bar.total if pr_bar.total else search_result.body['hits']['total'] + search_after_value = hits[-1]["sort"] + pr_bar.total = ( + pr_bar.total + if pr_bar.total + else search_result.body["hits"]["total"] + ) if search_result["_shards"]["failed"] > 0: raise LookupError(search_result["_shards"]["failures"]) - except BaseException as err: + except BaseException as err: if isinstance(err, KeyboardInterrupt): - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", - "\033[0;33m", - "\033[0;33m") - pr_bar.set_description("CogStack read cancelled! Processed", refresh=True) + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ( + "\033[0;33m", + "\033[0;33m", + "\033[0;33m", + ) + pr_bar.set_description( + "CogStack read cancelled! Processed", refresh=True + ) print("Request cancelled.") else: if pr_bar is not None: - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", - "\033[0;31m", - "\033[0;31m") - pr_bar.set_description("CogStack read failed! Processed", refresh=True) + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ( + "\033[0;31m", + "\033[0;31m", + "\033[0;31m", + ) + pr_bar.set_description( + "CogStack read failed! Processed", refresh=True + ) print(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}") print(f"The last {search_after_value=}") @@ -637,22 +738,24 @@ def read_data_with_sorting(self, def __extract_query(self, query: dict): if "query" in query.keys(): - return query['query'] + return query["query"] return query def __validate_size(self, size): if size > 10000: - raise ValueError('Size must not be greater than 10000') + raise ValueError("Size must not be greater than 10000") def __map_search_results(self, hits: Iterable): hit: dict for hit in hits: row = dict() - row['_index'] = hit['_index'] - row['_id'] = hit['_id'] - row['_score'] = hit['_score'] - if 'fields' in hit.keys(): - row.update({k: ', '.join(map(str, v)) for k,v in dict(hit['fields']).items()}) + row["_index"] = hit["_index"] + row["_id"] = hit["_id"] + row["_score"] = hit["_score"] + if "fields" in hit.keys(): + row.update( + {k: ", ".join(map(str, v)) for k, v in dict(hit["fields"]).items()} + ) yield row def __create_dataframe(self, all_mapped_results, column_headers): @@ -671,13 +774,14 @@ def __create_dataframe(self, all_mapped_results, column_headers): pandas.DataFrame A DataFrame containing the search results. """ - df_headers = ['_index', '_id', '_score'] + df_headers = ["_index", "_id", "_score"] if column_headers and "*" not in column_headers: df_headers.extend(column_headers) return pd.DataFrame(data=all_mapped_results, columns=df_headers) return pd.DataFrame(data=all_mapped_results) -def print_dataframe(df : pd.DataFrame, separator : str = '\\n'): + +def print_dataframe(df: pd.DataFrame, separator: str = "\\n"): """ Replace separator string with HTML <br/> tag for printing in Notebook @@ -689,7 +793,8 @@ def print_dataframe(df : pd.DataFrame, separator : str = '\\n'): separator : str Separator to be replaced with HTML <br/> """ - return display(HTML(df.to_html().replace(separator, '
'))) + return display(HTML(df.to_html().replace(separator, "
"))) + def list_chunker(user_list: List[Any], n: int) -> List[List[Any]]: """ @@ -706,5 +811,5 @@ def list_chunker(user_list: List[Any], n: int) -> List[List[Any]]: -------- List[List[Any]]: A list of sublists containing the elements of the input list. """ - n=max(1, n) - return [user_list[i:i+n] for i in range(0, len(user_list), n)] + n = max(1, n) + return [user_list[i : i + n] for i in range(0, len(user_list), n)] From 77b3f3dc819114a4d7720407f93a081c39059eee Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 15:30:55 +0100 Subject: [PATCH 13/40] CU-869aa22g2: Move to primitives collections for type hinting --- cogstack-es/cogstack.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/cogstack-es/cogstack.py b/cogstack-es/cogstack.py index c8bb994f..c962a5a7 100644 --- a/cogstack-es/cogstack.py +++ b/cogstack-es/cogstack.py @@ -1,7 +1,7 @@ from collections.abc import Mapping import getpass import traceback -from typing import Dict, List, Any, Optional, Iterable, Sequence, Union +from typing import Any, Optional, Iterable, Sequence, Union import warnings import elasticsearch import elasticsearch.helpers as es_helpers @@ -18,20 +18,20 @@ class CogStack: Parameters ------------ - hosts : List[str] + hosts : list[str] A list of Elasticsearch host URLs. """ ES_TIMEOUT = 300 - def __init__(self, hosts: List[str]): + def __init__(self, hosts: list[str]): self.hosts = hosts self.elastic: elasticsearch.Elasticsearch @classmethod def with_basic_auth( cls, - hosts: List[str], + hosts: list[str], username: Optional[str] = None, password: Optional[str] = None, ) -> "CogStack": @@ -40,7 +40,7 @@ def with_basic_auth( Parameters ---------- - hosts : List[str] + hosts : list[str] A list of Elasticsearch host URLs. username : str, optional The username to use when connecting to Elasticsearch. @@ -58,16 +58,16 @@ def with_basic_auth( @classmethod def with_api_key_auth( - cls, hosts: List[str], api_key: Optional[Dict] = None + cls, hosts: list[str], api_key: Optional[dict] = None ) -> "CogStack": """ Create an instance of CogStack using API key authentication. Parameters ---------- - hosts : List[str] + hosts : list[str] A list of Elasticsearch host URLs. - apiKey : Dict, optional + apiKey : dict, optional API key object with "id" and "api_key" or "encoded" strings as fields. Generated in Elasticsearch or Kibana and provided by your CogStack administrator. @@ -119,7 +119,7 @@ def use_basic_auth( basic_auth=(username, password) if username and password else None ) - def use_api_key_auth(self, api_key: Optional[Dict] = None) -> "CogStack": + def use_api_key_auth(self, api_key: Optional[dict] = None) -> "CogStack": """ Create an instance of CogStack using API key authentication. @@ -155,7 +155,7 @@ def use_api_key_auth(self, api_key: Optional[Dict] = None) -> "CogStack": # If api_key is a string, it is assumed to be the encoded API key encoded = api_key has_encoded_value = True - elif isinstance(api_key, Dict): + elif isinstance(api_key, dict): # If api_key is a dictionary, check for "encoded", "id" and "api_key" keys if ( "id" in api_key.keys() @@ -272,7 +272,7 @@ def get_index_fields(self, index: Union[str, Sequence[str]]): index=index, allow_no_indices=False ).body columns = ["Field", "Type"] - if isinstance(index, List): + if isinstance(index, list): columns.insert(0, "Index") index_mappings_coll = [] for index_name in all_mappings: @@ -796,20 +796,20 @@ def print_dataframe(df: pd.DataFrame, separator: str = "\\n"): return display(HTML(df.to_html().replace(separator, "
"))) -def list_chunker(user_list: List[Any], n: int) -> List[List[Any]]: +def list_chunker(user_list: list[Any], n: int) -> list[list[Any]]: """ Divide a list into sublists of a specified size. Parameters: ---------- - user_list : List[Any] + user_list : list[Any] The list to be divided. n : int The size of the sublists. Returns: -------- - List[List[Any]]: A list of sublists containing the elements of the input list. + list[list[Any]]: A list of sublists containing the elements of the input list. """ n = max(1, n) return [user_list[i : i + n] for i in range(0, len(user_list), n)] From fe1656f2b423ec833169f89fb2a6d6202a3c79bf Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 15:38:13 +0100 Subject: [PATCH 14/40] CU-869aa22g2: Some further linting changes --- cogstack-es/cogstack.py | 213 +++++++++++++++++++++++++--------------- 1 file changed, 133 insertions(+), 80 deletions(-) diff --git a/cogstack-es/cogstack.py b/cogstack-es/cogstack.py index c962a5a7..388a26da 100644 --- a/cogstack-es/cogstack.py +++ b/cogstack-es/cogstack.py @@ -69,10 +69,14 @@ def with_api_key_auth( A list of Elasticsearch host URLs. apiKey : dict, optional - API key object with "id" and "api_key" or "encoded" strings as fields. - Generated in Elasticsearch or Kibana and provided by your CogStack administrator. + API key object with string fields either: + - A: "id" and "api_key" + - B: "encoded" + Generated in Elasticsearch or Kibana and provided by your + CogStack administrator. - If not provided, the user will be prompted to enter API key "encoded" value. + If not provided, the user will be prompted to enter API + key "encoded" value. Example: .. code-block:: json @@ -127,10 +131,17 @@ def use_api_key_auth(self, api_key: Optional[dict] = None) -> "CogStack": ---------- apiKey : Dict, optional - API key object with "id" and "api_key" or "encoded" strings as fields. - Generated in Elasticsearch or Kibana and provided by your CogStack administrator. + API key object with string fields either: + - A: "id" and "api_key" + - B: "encoded" + Generated in Elasticsearch or Kibana and provided by your + CogStack administrator. - If not provided, the user will be prompted to enter API key "encoded" value. + If not provided, the user will be prompted to enter API + key "encoded" value. + + If not provided, the user will be prompted to enter API + key "encoded" value. Example: .. code-block:: json @@ -152,11 +163,13 @@ def use_api_key_auth(self, api_key: Optional[dict] = None) -> "CogStack": api_key = {"encoded": input("Encoded API key: ")} else: if isinstance(api_key, str): - # If api_key is a string, it is assumed to be the encoded API key + # If api_key is a string, it is assumed to be + # the encoded API key encoded = api_key has_encoded_value = True elif isinstance(api_key, dict): - # If api_key is a dictionary, check for "encoded", "id" and "api_key" keys + # If api_key is a dictionary, check for "encoded", + # "id" and "api_key" keys if ( "id" in api_key.keys() and api_key["id"] != "" @@ -169,7 +182,7 @@ def use_api_key_auth(self, api_key: Optional[dict] = None) -> "CogStack": # If "encoded" is present, use it; otherwise prompt for it encoded = ( api_key["encoded"] - if "encoded" in api_key.keys() and api_key["encoded"] != "" + if "encoded" in api_key.keys() and api_key["encoded"] else input("Encoded API key: ") ) has_encoded_value = encoded is not None and encoded != "" @@ -187,7 +200,8 @@ def use_api_key_auth(self, api_key: Optional[dict] = None) -> "CogStack": ) return self.__connect( - api_key=encoded if has_encoded_value else (api_id_value, api_key_value) + api_key=encoded if has_encoded_value else + (api_id_value, api_key_value) ) def __connect( @@ -199,7 +213,8 @@ def __connect( Parameters ---------- basic_auth : Tuple[str, str], optional - A tuple containing the username and password for basic authentication. + A tuple containing the username and password for + basic authentication. api_key : str or Tuple[str, str], optional The API key or a tuple containing the API key ID and API key for API key authentication. @@ -244,7 +259,8 @@ def get_indices_and_aliases(self): index_aliases["Aliases"] = ", ".join(aliases) index_aliases_coll.append(index_aliases) with pd.option_context("display.max_colwidth", None): - return pd.DataFrame(index_aliases_coll, columns=["Index", "Aliases"]) + return pd.DataFrame( + index_aliases_coll, columns=["Index", "Aliases"]) def get_index_fields(self, index: Union[str, Sequence[str]]): """ @@ -253,12 +269,14 @@ def get_index_fields(self, index: Union[str, Sequence[str]]): Parameters ---------- index: str | Sequence[str] - Name(s) of indices or aliases for which the list of fields is retrieved + Name(s) of indices or aliases for which the list of fields + is retrieved Returns ---------- pandas.DataFrame - A DataFrame containing index names and their fields with data types + A DataFrame containing index names and their fields with + data types Raises ------ @@ -267,7 +285,8 @@ def get_index_fields(self, index: Union[str, Sequence[str]]): """ try: if len(index) == 0: - raise ValueError("Provide at least one index or index alias name") + raise ValueError( + "Provide at least one index or index alias name") all_mappings = self.elastic.indices.get_mapping( index=index, allow_no_indices=False ).body @@ -276,7 +295,8 @@ def get_index_fields(self, index: Union[str, Sequence[str]]): columns.insert(0, "Index") index_mappings_coll = [] for index_name in all_mappings: - for property_name in all_mappings[index_name]["mappings"]["properties"]: + for property_name in all_mappings[ + index_name]["mappings"]["properties"]: index_mapping = {} index_mapping["Index"] = index_name index_mapping["Field"] = property_name @@ -293,17 +313,21 @@ def get_index_fields(self, index: Union[str, Sequence[str]]): index_mappings_coll.append(index_mapping) except Exception as err: raise Exception(f"Unexpected {err=}, {type(err)=}") - with pd.option_context("display.max_rows", len(index_mappings_coll) + 1): - return display(pd.DataFrame(data=index_mappings_coll, columns=columns)) + with pd.option_context( + "display.max_rows", len(index_mappings_coll) + 1): + return display(pd.DataFrame( + data=index_mappings_coll, columns=columns)) - def count_search_results(self, index: Union[str, Sequence[str]], query: dict): + def count_search_results(self, index: Union[str, Sequence[str]], + query: dict): """ Count number of documents returned by the query Parameters ---------- index : str or Sequence[str] - The name(s) of the Elasticsearch indices or their aliases to search. + The name(s) of the Elasticsearch indices or their + aliases to search. query : dict A dictionary containing the search query parameters. @@ -319,9 +343,8 @@ def count_search_results(self, index: Union[str, Sequence[str]], query: dict): if len(index) == 0: raise ValueError("Provide at least one index or index alias name") query = self.__extract_query(query=query) - count = self.elastic.count(index=index, query=query, allow_no_indices=False)[ - "count" - ] + count = self.elastic.count(index=index, query=query, + allow_no_indices=False)["count"] return f"Number of documents: {format(count, ',')}" def read_data_with_scan( @@ -342,7 +365,8 @@ def read_data_with_scan( Parameters ---------- index : str or Sequence[str] - The name(s) of the Elasticsearch indices or their aliases to search. + The name(s) of the Elasticsearch indices or their + aliases to search. query : dict A dictionary containing the search query parameters. Query can start with `query` key and contain other @@ -359,11 +383,13 @@ def read_data_with_scan( include_fields : list[str], optional A list of fields to be included in search results and presented as columns in the DataFrame. - If not provided, only _index, _id and _score fields will be included. - Columns _index, _id, _score are present in all search results + If not provided, only _index, _id and _score fields will + be included. Columns _index, _id, _score + are present in all search results size : int, optional, default = 1000 - The number of documents to be returned by the query or scroll - API during each iteration. MAX: 10,000. + The number of documents to be returned by the query or + scroll API during each iteration. + MAX: 10,000. request_timeout : int, optional, default=300 The time in seconds to wait for a response from Elasticsearch before timing out. @@ -381,14 +407,16 @@ def read_data_with_scan( """ try: if len(index) == 0: - raise ValueError("Provide at least one index or index alias name") + raise ValueError( + "Provide at least one index or index alias name") self.__validate_size(size=size) if "query" not in query.keys(): temp_query = query.copy() query.clear() query["query"] = temp_query pr_bar = tqdm.tqdm( - desc="CogStack retrieved...", disable=not show_progress, colour="green" + desc="CogStack retrieved...", + disable=not show_progress, colour="green" ) scan_results = es_helpers.scan( @@ -403,9 +431,8 @@ def read_data_with_scan( ) all_mapped_results = [] pr_bar.iterable = scan_results - pr_bar.total = self.elastic.count(index=index, query=query["query"])[ - "count" - ] + pr_bar.total = self.elastic.count( + index=index, query=query["query"])["count"] all_mapped_results = self.__map_search_results(hits=pr_bar) except BaseException as err: if isinstance(err, KeyboardInterrupt): @@ -417,7 +444,8 @@ def read_data_with_scan( pr_bar.set_description( "CogStack read cancelled! Processed", refresh=True ) - print("Request cancelled and current search_scroll_id deleted...") + print("Request cancelled and current " + "search_scroll_id deleted...") else: if pr_bar is not None: pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ( @@ -430,8 +458,8 @@ def read_data_with_scan( ) print( Exception( - f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}" - ) + f"Unexpected {err=},\n {traceback.format_exc()}, " + f"{type(err)=}") ) return self.__create_dataframe(all_mapped_results, include_fields) @@ -446,14 +474,15 @@ def read_data_with_scroll( show_progress: Optional[bool] = True, ): """ - Retrieves documents from an Elasticsearch index using search query and scroll API. - Default scroll timeout is set to 10 minutes. + Retrieves documents from an Elasticsearch index using search query + and scroll API. Default scroll timeout is set to 10 minutes. The function converts search results to a Pandas DataFrame. Parameters ---------- index : str or Sequence[str] - The name(s) of the Elasticsearch indices or their aliases to search. + The name(s) of the Elasticsearch indices or their + aliases to search. query : dict A dictionary containing the search query parameters. Query can start with `query` key @@ -468,8 +497,10 @@ def read_data_with_scroll( include_fields : list[str], optional A list of fields to be included in search results and presented as columns in the DataFrame. - If not provided, only _index, _id and _score fields will be included. - Columns _index, _id, _score are present in all search results + If not provided, only _index, _id and _score fields + will be included. + Columns _index, _id, _score are present + in all search results size : int, optional, default = 1000 The number of documents to be returned by the query or scroll API during each iteration. @@ -481,13 +512,16 @@ def read_data_with_scroll( The value of scroll_id times out after 10 minutes. After which the search will have to be restarted. - Note: Absence of this parameter indicates a new search. + Note: Absence of this parameter indicates + a new search. request_timeout : int, optional, default=300 - The time in seconds to wait for a response from Elasticsearch before timing out. + The time in seconds to wait for a response from + Elasticsearch before timing out. show_progress : bool, optional, default=True Whether to show the progress in console. - IMPORTANT: The progress bar displays the total hits - for the query even if continuing the search using `search_scroll_id`. + IMPORTANT: The progress bar displays + the total hits for the query even if continuing the + search using `search_scroll_id`. Returns ------ pandas.DataFrame @@ -497,15 +531,17 @@ def read_data_with_scroll( ------ Exception If the search fails or cancelled by the user. - If the search fails, error message includes the value of current `search_scroll_id` - which can be used as a function parameter to continue the search. - IMPORTANT: If the function fails after `scroll` request, - the subsequent request will skip results of the failed scroll by the - value of `size` parameter. + If the search fails, error message includes the value of + current `search_scroll_id` which can be used as a function + parameter to continue the search. IMPORTANT: + If the function fails after `scroll` request, the subsequent + request will skip results of the failed scroll by the value + of `size` parameter. """ try: if len(index) == 0: - raise ValueError("Provide at least one index or index alias name") + raise ValueError( + "Provide at least one index or index alias name") self.__validate_size(size=size) query = self.__extract_query(query=query) result_count = size @@ -518,8 +554,8 @@ def read_data_with_scroll( ) pr_bar = tqdm.tqdm( - desc="CogStack retrieved...", disable=not show_progress, colour="green" - ) + desc="CogStack retrieved...", + disable=not show_progress, colour="green") if search_scroll_id is None: search_result = self.elastic.search( @@ -572,7 +608,8 @@ def read_data_with_scroll( "CogStack read cancelled! Processed", refresh=True ) self.elastic.clear_scroll(scroll_id=search_scroll_id) - print("Request cancelled and current search_scroll_id deleted...") + print("Request cancelled and current " + "search_scroll_id deleted...") else: if pr_bar is not None: pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ( @@ -585,8 +622,8 @@ def read_data_with_scroll( ) print( Exception( - f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}" - ), + f"Unexpected {err=},\n {traceback.format_exc()}, " + f"{type(err)=}"), f"{search_scroll_id=}", sep="\n", ) @@ -605,15 +642,18 @@ def read_data_with_sorting( show_progress: Optional[bool] = True, ): """ - Retrieve documents from an Elasticsearch index using search query and convert them to a Pandas DataFrame. + Retrieve documents from an Elasticsearch index using search query + and convert them to a Pandas DataFrame. Parameters ---------- index : str or Sequence[str] - The name(s) of the Elasticsearch indices or their aliases to search. + The name(s) of the Elasticsearch indices or their + aliases to search. query : dict A dictionary containing the search query parameters. - Query can start with `query` key and contain other query options which will be ignored + Query can start with `query` key and contain other + query options which will be ignored .. code-block:: json {"query": {"match": {"title": "python"}}}} @@ -621,25 +661,33 @@ def read_data_with_sorting( .. code-block:: json {"match": {"title": "python"}}} include_fields : list[str], optional - A list of fields to be included in search results and presented as columns in the DataFrame. - If not provided, only _index, _id and _score fields will be included. - Columns _index, _id, _score are present in all search results + A list of fields to be included in search results + and presented as columns in the DataFrame. + If not provided, only _index, _id and _score + fields will be included. + Columns _index, _id, _score are + present in all search results size : int, optional, default = 1000 - The number of documents to be returned by the query or scroll API during each iteration. + The number of documents to be returned by the query + or scroll API during each iteration. MAX: 10,000. sort : dict|list[str], optional, default = {"id": "asc"} - Sort field name(s) and order (`asc` or `desc`) in dictionary format or list of field names without order. - `{"id":"asc"}` or `id` is added if not provided as a tiebreaker field. - Default sorting order is `asc` - >Example: + Sort field name(s) and order (`asc` or `desc`) + in dictionary format or list of field names without order. + `{"id":"asc"}` or `id` is added if not provided as a + tiebreaker field. Default sorting order is `asc` + Example: - `dict : {"filed_Name" : "desc", "id" : "asc"}` - `list : ["filed_Name", "id"]` search_after : list[str|int|float|Any|None], optional The sort value of the last record in search results. - Can be provided if the a search fails and needs to be restarted from the last successful search. - Use the value of `search_after_value` from the error message + Can be provided if the a search fails and needs to + be restarted from the last successful search. + Use the value of `search_after_value` from the + error message request_timeout : int, optional, default = 300 - The time in seconds to wait for a response from Elasticsearch before timing out. + The time in seconds to wait for a response from + Elasticsearch before timing out. show_progress : bool, optional Whether to show the progress in console. Defaults to true. @@ -652,12 +700,14 @@ def read_data_with_sorting( ------ Exception If the search fails or cancelled by the user. - Error message includes the value of current `search_after_value` - which can be used as a function parameter to continue the search. + Error message includes the value of current + `search_after_value` which can be used as a function + parameter to continue the search. """ try: if len(index) == 0: - raise ValueError("Provide at least one index or index alias name") + raise ValueError( + "Provide at least one index or index alias name") result_count = size all_mapped_results = [] if sort is None: @@ -680,8 +730,8 @@ def read_data_with_sorting( else: sort.append("id") pr_bar = tqdm.tqdm( - desc="CogStack retrieved...", disable=not show_progress, colour="green" - ) + desc="CogStack retrieved...", + disable=not show_progress, colour="green") while result_count == size: search_result = self.elastic.search( @@ -731,7 +781,8 @@ def read_data_with_sorting( pr_bar.set_description( "CogStack read failed! Processed", refresh=True ) - print(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}") + print(f"Unexpected {err=},\n {traceback.format_exc()}, " + f"{type(err)=}") print(f"The last {search_after_value=}") return self.__create_dataframe(all_mapped_results, include_fields) @@ -754,7 +805,8 @@ def __map_search_results(self, hits: Iterable): row["_score"] = hit["_score"] if "fields" in hit.keys(): row.update( - {k: ", ".join(map(str, v)) for k, v in dict(hit["fields"]).items()} + {k: ", ".join(map(str, v)) for + k, v in dict(hit["fields"]).items()} ) yield row @@ -783,8 +835,8 @@ def __create_dataframe(self, all_mapped_results, column_headers): def print_dataframe(df: pd.DataFrame, separator: str = "\\n"): """ - Replace separator string with HTML <br/> - tag for printing in Notebook + Replace separator string with HTML + <br/> tag for printing in Notebook Parameters: ----------- @@ -809,7 +861,8 @@ def list_chunker(user_list: list[Any], n: int) -> list[list[Any]]: Returns: -------- - list[list[Any]]: A list of sublists containing the elements of the input list. + list[list[Any]]: A list of sublists containing the elements of + the input list. """ n = max(1, n) return [user_list[i : i + n] for i in range(0, len(user_list), n)] From 780a3fdf4d9b51ecc0500e81e65b1ffdcb2f7118 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 15:48:04 +0100 Subject: [PATCH 15/40] CU-869aa22g2: Refactor cogstack module to make a little more sense --- cogstack-es/cogstack.py | 63 ++++++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 26 deletions(-) diff --git a/cogstack-es/cogstack.py b/cogstack-es/cogstack.py index 388a26da..f5fd7fdf 100644 --- a/cogstack-es/cogstack.py +++ b/cogstack-es/cogstack.py @@ -18,15 +18,14 @@ class CogStack: Parameters ------------ - hosts : list[str] - A list of Elasticsearch host URLs. + elastic : elasticsearch.Elasticsearch + The ElasticSearch instance. """ ES_TIMEOUT = 300 - def __init__(self, hosts: list[str]): - self.hosts = hosts - self.elastic: elasticsearch.Elasticsearch + def __init__(self, elastic: elasticsearch.Elasticsearch) -> None: + self.elastic = elastic @classmethod def with_basic_auth( @@ -52,9 +51,8 @@ def with_basic_auth( ------- CogStack: An instance of the CogStack class. """ - cs = cls(hosts) - cs.use_basic_auth(username, password) - return cs + elastic = CogStack.use_basic_auth(hosts, username, password) + return cls(elastic) @classmethod def with_api_key_auth( @@ -89,13 +87,14 @@ def with_api_key_auth( ------- CogStack: An instance of the CogStack class. """ - cs = cls(hosts) - cs.use_api_key_auth(api_key) - return cs + elastic = CogStack.use_api_key_auth(hosts, api_key) + return cls(elastic) + @staticmethod def use_basic_auth( - self, username: Optional[str] = None, password: Optional[str] = None - ) -> "CogStack": + hosts: list[str], username: Optional[str] = None, + password: Optional[str] = None + ) -> elasticsearch.Elasticsearch: """ Create an instance of CogStack using basic authentication. If the `username` or `password` parameters are not provided, @@ -103,6 +102,8 @@ def use_basic_auth( Parameters ---------- + hosts : list[str] + A list of Elasticsearch host URLs. username : str, optional The username to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a username. @@ -112,23 +113,29 @@ def use_basic_auth( Returns ------- - CogStack: An instance of the CogStack class. + elasticsearch.Elasticsearch: An instance of the Elasticsearch. """ if username is None: username = input("Username: ") if password is None: password = getpass.getpass("Password: ") - return self.__connect( + return CogStack.__connect( + hosts, basic_auth=(username, password) if username and password else None ) - def use_api_key_auth(self, api_key: Optional[dict] = None) -> "CogStack": + @staticmethod + def use_api_key_auth(hosts: list[str], + api_key: Optional[dict] = None + ) -> elasticsearch.Elasticsearch: """ Create an instance of CogStack using API key authentication. Parameters ---------- + hosts : list[str] + A list of Elasticsearch host URLs. apiKey : Dict, optional API key object with string fields either: @@ -153,7 +160,7 @@ def use_api_key_auth(self, api_key: Optional[dict] = None) -> "CogStack": Returns ------- - CogStack: An instance of the CogStack class. + elasticsearch.Elasticsearch: An instance of the Elasticsearch. """ has_encoded_value = False api_id_value: str @@ -199,19 +206,23 @@ def use_api_key_auth(self, api_key: Optional[dict] = None) -> "CogStack": else getpass.getpass("API Key: ") ) - return self.__connect( + return CogStack.__connect( + hosts, api_key=encoded if has_encoded_value else (api_id_value, api_key_value) ) + @staticmethod def __connect( - self, + hosts: list[str], basic_auth: Optional[tuple[str, str]] = None, api_key: Optional[Union[str, tuple[str, str]]] = None, - ) -> "CogStack": + ) -> elasticsearch.Elasticsearch: """Connect to Elasticsearch using the provided credentials. Parameters ---------- + hosts : list[str] + A list of Elasticsearch host URLs. basic_auth : Tuple[str, str], optional A tuple containing the username and password for basic authentication. @@ -220,25 +231,25 @@ def __connect( for API key authentication. Returns ------- - CogStack: An instance of the CogStack class. + elasticsearch.Elasticsearch: An instance of the Elasticsearch. Raises ------ Exception: If the connection to Elasticsearch fails. """ - self.elastic = elasticsearch.Elasticsearch( - hosts=self.hosts, + elastic = elasticsearch.Elasticsearch( + hosts=hosts, api_key=api_key, basic_auth=basic_auth, verify_certs=False, - request_timeout=self.ES_TIMEOUT, + request_timeout=CogStack.ES_TIMEOUT, ) - if not self.elastic.ping(): + if not elastic.ping(): raise ConnectionError( "CogStack connection failed. " "Please check your host list and credentials and try again." ) print("CogStack connection established successfully.") - return self + return elastic def get_indices_and_aliases(self): """ From 403a2e808b6a3e63c15922582462433c09b1a170 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 15:49:33 +0100 Subject: [PATCH 16/40] CU-869aa22g2: Rename some methods for better descriptions --- cogstack-es/cogstack.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cogstack-es/cogstack.py b/cogstack-es/cogstack.py index f5fd7fdf..4209f0e8 100644 --- a/cogstack-es/cogstack.py +++ b/cogstack-es/cogstack.py @@ -51,7 +51,7 @@ def with_basic_auth( ------- CogStack: An instance of the CogStack class. """ - elastic = CogStack.use_basic_auth(hosts, username, password) + elastic = CogStack.get_es_basic_auth(hosts, username, password) return cls(elastic) @classmethod @@ -87,11 +87,11 @@ def with_api_key_auth( ------- CogStack: An instance of the CogStack class. """ - elastic = CogStack.use_api_key_auth(hosts, api_key) + elastic = CogStack.get_es_with_api_key(hosts, api_key) return cls(elastic) @staticmethod - def use_basic_auth( + def get_es_basic_auth( hosts: list[str], username: Optional[str] = None, password: Optional[str] = None ) -> elasticsearch.Elasticsearch: @@ -126,9 +126,9 @@ def use_basic_auth( ) @staticmethod - def use_api_key_auth(hosts: list[str], - api_key: Optional[dict] = None - ) -> elasticsearch.Elasticsearch: + def get_es_with_api_key(hosts: list[str], + api_key: Optional[dict] = None + ) -> elasticsearch.Elasticsearch: """ Create an instance of CogStack using API key authentication. From 2fd8507e55b3167db2e280178f046c896160ef0e Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 15:50:11 +0100 Subject: [PATCH 17/40] CU-869aa22g2: Rename a method name for better descriptions --- cogstack-es/cogstack.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cogstack-es/cogstack.py b/cogstack-es/cogstack.py index 4209f0e8..72639251 100644 --- a/cogstack-es/cogstack.py +++ b/cogstack-es/cogstack.py @@ -51,7 +51,7 @@ def with_basic_auth( ------- CogStack: An instance of the CogStack class. """ - elastic = CogStack.get_es_basic_auth(hosts, username, password) + elastic = CogStack.get_es_with_basic_auth(hosts, username, password) return cls(elastic) @classmethod @@ -91,7 +91,7 @@ def with_api_key_auth( return cls(elastic) @staticmethod - def get_es_basic_auth( + def get_es_with_basic_auth( hosts: list[str], username: Optional[str] = None, password: Optional[str] = None ) -> elasticsearch.Elasticsearch: From 7e04913802b46819b23888919b46af05f7956041 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 15:50:59 +0100 Subject: [PATCH 18/40] CU-869aa22g2: Minor whitespace fix --- cogstack-es/cogstack.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cogstack-es/cogstack.py b/cogstack-es/cogstack.py index 72639251..733a843a 100644 --- a/cogstack-es/cogstack.py +++ b/cogstack-es/cogstack.py @@ -876,4 +876,4 @@ def list_chunker(user_list: list[Any], n: int) -> list[list[Any]]: the input list. """ n = max(1, n) - return [user_list[i : i + n] for i in range(0, len(user_list), n)] + return [user_list[i: i + n] for i in range(0, len(user_list), n)] From 62b60cac45dd69d6fc9e5bb334901bb0a159ebfe Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 15:53:04 +0100 Subject: [PATCH 19/40] CU-869aa22g2: Remove path add in notebook --- cogstack-es/search_template.ipynb | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/cogstack-es/search_template.ipynb b/cogstack-es/search_template.ipynb index c442f0a6..ca1c1a7a 100644 --- a/cogstack-es/search_template.ipynb +++ b/cogstack-es/search_template.ipynb @@ -15,13 +15,18 @@ "metadata": {}, "outputs": [], "source": [ - "import sys\n", - "sys.path.append('..')\n", "import pandas as pd\n", "from credentials import *\n", - "from cogstack2 import CogStack, print_dataframe" + "from cogstack import CogStack, print_dataframe" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, { "cell_type": "markdown", "metadata": {}, From 92aaabdec38e5f9fb82f6525941e42d8f5ca51ec Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 15:54:20 +0100 Subject: [PATCH 20/40] CU-869aa22g2: Remove unused import from notebook --- cogstack-es/search_template.ipynb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cogstack-es/search_template.ipynb b/cogstack-es/search_template.ipynb index ca1c1a7a..4e13eaff 100644 --- a/cogstack-es/search_template.ipynb +++ b/cogstack-es/search_template.ipynb @@ -15,8 +15,7 @@ "metadata": {}, "outputs": [], "source": [ - "import pandas as pd\n", - "from credentials import *\n", + "from credentials import hosts, api_key\n", "from cogstack import CogStack, print_dataframe" ] }, From 651a55b2a5f94161dbe0bc661e3bdf9d85cfbf7d Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 15:58:38 +0100 Subject: [PATCH 21/40] CU-869aa22g2: Import username and password from credentials in case they are neded --- cogstack-es/search_template.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cogstack-es/search_template.ipynb b/cogstack-es/search_template.ipynb index 4e13eaff..c2160971 100644 --- a/cogstack-es/search_template.ipynb +++ b/cogstack-es/search_template.ipynb @@ -15,7 +15,7 @@ "metadata": {}, "outputs": [], "source": [ - "from credentials import hosts, api_key\n", + "from credentials import hosts, api_key, username, password\n", "from cogstack import CogStack, print_dataframe" ] }, From e98f6479f7ae4b936c9a556d2fd12e980be3dcde Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 16:27:32 +0100 Subject: [PATCH 22/40] CU-869aa22g2: Add nbconvert dev-dependency --- cogstack-es/requirements-dev.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/cogstack-es/requirements-dev.txt b/cogstack-es/requirements-dev.txt index 7ad36053..6896deb3 100644 --- a/cogstack-es/requirements-dev.txt +++ b/cogstack-es/requirements-dev.txt @@ -3,3 +3,4 @@ pandas-stubs types-tqdm pytest ruff +nbconvert From 4bd6246131455cb1c7a8c7ac23dadc858cb5ff0e Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 16:27:47 +0100 Subject: [PATCH 23/40] CU-869aa22g2: Add default indices to get fields for --- cogstack-es/search_template.ipynb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cogstack-es/search_template.ipynb b/cogstack-es/search_template.ipynb index c2160971..ff1f5359 100644 --- a/cogstack-es/search_template.ipynb +++ b/cogstack-es/search_template.ipynb @@ -82,7 +82,8 @@ "metadata": {}, "outputs": [], "source": [ - "cs.get_index_fields([])" + "indices = [\"example_index\"] # <- CHANGE THIS\n", + "cs.get_index_fields(indices)" ] }, { From ab5dee8035f3001d6618c0b9e735b3f178fd2d36 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 16:38:05 +0100 Subject: [PATCH 24/40] CU-869aa22g2: Improve error handling (avoid hiding stack trace) --- cogstack-es/cogstack.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cogstack-es/cogstack.py b/cogstack-es/cogstack.py index 733a843a..103e6830 100644 --- a/cogstack-es/cogstack.py +++ b/cogstack-es/cogstack.py @@ -323,7 +323,8 @@ def get_index_fields(self, index: Union[str, Sequence[str]]): ) index_mappings_coll.append(index_mapping) except Exception as err: - raise Exception(f"Unexpected {err=}, {type(err)=}") + raise Exception( + "Unexpected issue while getting index fields") from err with pd.option_context( "display.max_rows", len(index_mappings_coll) + 1): return display(pd.DataFrame( From ab8da2cc9a3d1ac9a5f0f8f84f221e83c160ddd7 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 16:39:35 +0100 Subject: [PATCH 25/40] CU-869aa22g2: Add default indices in notebook example --- cogstack-es/search_template.ipynb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cogstack-es/search_template.ipynb b/cogstack-es/search_template.ipynb index ff1f5359..3c849cae 100644 --- a/cogstack-es/search_template.ipynb +++ b/cogstack-es/search_template.ipynb @@ -164,7 +164,8 @@ "outputs": [], "source": [ "# Count the number of documents matching the search query\n", - "cs.count_search_results(index=[], query=search_query)\n" + "example_indices = [\"example-index\"] # <- CHANGE THIS\n", + "cs.count_search_results(index=example_indices, query=search_query)\n" ] }, { From 4b771828017b31ad7ee1becf5ff97eaa7fa21b75 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 16:42:08 +0100 Subject: [PATCH 26/40] CU-869aa22g2: Update progress bar handling during exception handling --- cogstack-es/cogstack.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/cogstack-es/cogstack.py b/cogstack-es/cogstack.py index 103e6830..c32abd70 100644 --- a/cogstack-es/cogstack.py +++ b/cogstack-es/cogstack.py @@ -417,6 +417,7 @@ def read_data_with_scan( Exception If the search fails or cancelled by the user. """ + pr_bar: Optional[tqdm.tqdm] = None try: if len(index) == 0: raise ValueError( @@ -448,14 +449,15 @@ def read_data_with_scan( all_mapped_results = self.__map_search_results(hits=pr_bar) except BaseException as err: if isinstance(err, KeyboardInterrupt): - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ( - "\033[0;33m", - "\033[0;33m", - "\033[0;33m", - ) - pr_bar.set_description( - "CogStack read cancelled! Processed", refresh=True - ) + if pr_bar is not None: + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ( + "\033[0;33m", + "\033[0;33m", + "\033[0;33m", + ) + pr_bar.set_description( + "CogStack read cancelled! Processed", refresh=True + ) print("Request cancelled and current " "search_scroll_id deleted...") else: From 3a604cfe1829bcb973a573a7340506fd9c64da88 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 17:05:08 +0100 Subject: [PATCH 27/40] CU-869aa22g2: Add default indices in notebook examples --- cogstack-es/search_template.ipynb | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cogstack-es/search_template.ipynb b/cogstack-es/search_template.ipynb index 3c849cae..0b52a36e 100644 --- a/cogstack-es/search_template.ipynb +++ b/cogstack-es/search_template.ipynb @@ -186,7 +186,8 @@ "source": [ "# Read data using scan helper function.\n", "# Does not provide a scroll id, so cannot be resumed if search fails midway.\n", - "df = cs.read_data_with_scan(index=[], query=search_query, include_fields=columns)\n", + "example_indices = [\"example-index\"] # <- CHANGE THIS\n", + "df = cs.read_data_with_scan(index=example_indices, query=search_query, include_fields=columns)\n", "print(df)" ] }, @@ -198,7 +199,8 @@ "source": [ "# Read data with scroll API and get scroll id if search fails midway. \n", "# Can be used to resume the search from the failed scroll id.\n", - "df = cs.read_data_with_scroll(index=[], query=search_query, include_fields=columns)\n", + "example_indices = [\"example-index\"] # <- CHANGE THIS\n", + "df = cs.read_data_with_scroll(index=example_indices, query=search_query, include_fields=columns)\n", "print(df)" ] }, @@ -211,7 +213,8 @@ "# Read data with sorting and get search_after value if search fails midway.\n", "# Can be used to resume the search from the failed search_after value but can be slower than scan or scroll methods for large datasets.\n", "# Note: Sorting requires a field to sort by, which should be present in the index. Default sorting is by _id.\n", - "df = cs.read_data_with_sorting(index=[], query=search_query, \n", + "example_indices = [\"example-index\"] # <- CHANGE THIS\n", + "df = cs.read_data_with_sorting(index=example_indices, query=search_query, \n", " include_fields=columns)\n", "print(df)" ] From 921d68227654dab1f11b79dd0850da2b3a54362b Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 17:08:28 +0100 Subject: [PATCH 28/40] CU-869aa22g2: Add data folder --- cogstack-es/data/.keep | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 cogstack-es/data/.keep diff --git a/cogstack-es/data/.keep b/cogstack-es/data/.keep new file mode 100644 index 00000000..e69de29b From bd4f5bd7d9f7460cd186b049e679fcdc9aba7670 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 17:08:51 +0100 Subject: [PATCH 29/40] CU-869aa22g2 Fix data folder in notebook --- cogstack-es/search_template.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cogstack-es/search_template.ipynb b/cogstack-es/search_template.ipynb index 0b52a36e..8975e4f6 100644 --- a/cogstack-es/search_template.ipynb +++ b/cogstack-es/search_template.ipynb @@ -251,7 +251,7 @@ "outputs": [], "source": [ "# Save the DataFrame to a CSV file\n", - "path_to_results = \"../data/cogstack_search_results\"\n", + "path_to_results = \"data/cogstack_search_results\"\n", "file_name = \"file_name.csv\"" ] }, From b1790e01277dc5521513fad3824a0ca8b6a01b62 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 17:10:30 +0100 Subject: [PATCH 30/40] CU-869aa22g2: Add initial notebook tests --- cogstack-es/tests/test_search_template_nb.py | 82 ++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 cogstack-es/tests/test_search_template_nb.py diff --git a/cogstack-es/tests/test_search_template_nb.py b/cogstack-es/tests/test_search_template_nb.py new file mode 100644 index 00000000..61119682 --- /dev/null +++ b/cogstack-es/tests/test_search_template_nb.py @@ -0,0 +1,82 @@ +import nbformat +from nbconvert import PythonExporter +from unittest.mock import Mock, patch, MagicMock +from contextlib import contextmanager +import tempfile + + +@contextmanager +def all_mocked(python_code: str): + with tempfile.NamedTemporaryFile('w', suffix='.py') as temp_file: + temp_file.write(python_code) + with patch('elasticsearch.Elasticsearch') as mock_es: + with patch('credentials.hosts', ['http://localhost:9200']): + with patch('credentials.api_key', {"encoded": "test_api_key"}): + with patch('elasticsearch.helpers.scan') as mock_scan: + with patch('tqdm.tqdm') as mock_tqdm: + yield (temp_file.name, mock_es, + mock_scan, mock_tqdm) + + +def setup_mocks(mock_es: MagicMock, mock_scan: MagicMock, mock_tqdm: MagicMock): + # Setup mocks + mock_client = Mock() + mock_es.return_value = mock_client + mock_client.ping.return_value = True + + mock_aliases = Mock() + mock_aliases.body = { + 'index1': {'aliases': {'alias1': {}}} + } + mock_mapping = Mock() + mock_mapping.body = { + 'index1': {'mappings': {'properties': {}}} + } + # Mock Elasticsearch responses + mock_client.indices.get_alias.return_value = mock_aliases + mock_client.indices.get_mapping.return_value = mock_mapping + mock_client.count.return_value = {'count': 10} + + # Mock scan results + mock_hits = MagicMock() + mock_hits.__iter__.return_value = [{ + '_index': 'test', '_id': '1', '_score': 1.0, + 'fields': {'test_field': ['value']} + }] + mock_scan.return_value = mock_hits + mock_tqdm.return_value = mock_hits + mock_tqdm.total = 1 + + +def test_notebook_execution(): + """Execute the notebook with mocked dependencies""" + + # Read the notebook + notebook_path = 'search_template.ipynb' + with open(notebook_path, 'r') as f: + notebook = nbformat.read(f, as_version=4) + + # Convert to Python code + exporter = PythonExporter() + python_code, _ = exporter.from_notebook_node(notebook) + + # Mock all the dependencies + with all_mocked(python_code) as (temp_code_path, mock_es, + mock_scan, mock_tqdm): + setup_mocks(mock_es, mock_scan, mock_tqdm) + + print("TEMP FILE PATH", temp_code_path) + import os + print("EXISTS?", os.path.exists(temp_code_path)) + if os.path.exists(temp_code_path): + with open(temp_code_path) as f: + lines = f.readlines() + print("Lines", len(lines)) + try: + # Execute the notebook code + exec(python_code, { + '__file__': temp_code_path, + '__name__': '__main__' + }) + except Exception as err: + raise ValueError() from err From 42eb02429cfcf5506ffb007da6903092fb618517 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 17:11:16 +0100 Subject: [PATCH 31/40] CU-869aa22g2: Simplify test slightly --- cogstack-es/tests/test_search_template_nb.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/cogstack-es/tests/test_search_template_nb.py b/cogstack-es/tests/test_search_template_nb.py index 61119682..9f7772c3 100644 --- a/cogstack-es/tests/test_search_template_nb.py +++ b/cogstack-es/tests/test_search_template_nb.py @@ -72,11 +72,8 @@ def test_notebook_execution(): with open(temp_code_path) as f: lines = f.readlines() print("Lines", len(lines)) - try: - # Execute the notebook code - exec(python_code, { - '__file__': temp_code_path, - '__name__': '__main__' - }) - except Exception as err: - raise ValueError() from err + # Execute the notebook code + exec(python_code, { + '__file__': temp_code_path, + '__name__': '__main__' + }) From e5705216478491ae25b5b1b0a3e587f49c826fc4 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 17:11:35 +0100 Subject: [PATCH 32/40] CU-869aa22g2: Remove test-time debug output --- cogstack-es/tests/test_search_template_nb.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/cogstack-es/tests/test_search_template_nb.py b/cogstack-es/tests/test_search_template_nb.py index 9f7772c3..56366b14 100644 --- a/cogstack-es/tests/test_search_template_nb.py +++ b/cogstack-es/tests/test_search_template_nb.py @@ -65,13 +65,6 @@ def test_notebook_execution(): mock_scan, mock_tqdm): setup_mocks(mock_es, mock_scan, mock_tqdm) - print("TEMP FILE PATH", temp_code_path) - import os - print("EXISTS?", os.path.exists(temp_code_path)) - if os.path.exists(temp_code_path): - with open(temp_code_path) as f: - lines = f.readlines() - print("Lines", len(lines)) # Execute the notebook code exec(python_code, { '__file__': temp_code_path, From 35cce29507562317bc114e698ff7799c2b20a096 Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 17:18:53 +0100 Subject: [PATCH 33/40] CU-869aa22g2: Add assertion and removal of data file created by notebook --- cogstack-es/tests/test_search_template_nb.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/cogstack-es/tests/test_search_template_nb.py b/cogstack-es/tests/test_search_template_nb.py index 56366b14..1933753b 100644 --- a/cogstack-es/tests/test_search_template_nb.py +++ b/cogstack-es/tests/test_search_template_nb.py @@ -3,6 +3,11 @@ from unittest.mock import Mock, patch, MagicMock from contextlib import contextmanager import tempfile +import os +import pytest + + +EXPECTED_TEMP_FILE_PATH = "data/cogstack_search_results\\file_name.csv" @contextmanager @@ -48,7 +53,14 @@ def setup_mocks(mock_es: MagicMock, mock_scan: MagicMock, mock_tqdm: MagicMock): mock_tqdm.total = 1 -def test_notebook_execution(): +@pytest.fixture +def temp_file_remover(): + yield + if os.path.exists(EXPECTED_TEMP_FILE_PATH): + os.remove(EXPECTED_TEMP_FILE_PATH) + + +def test_notebook_execution(temp_file_remover): """Execute the notebook with mocked dependencies""" # Read the notebook @@ -70,3 +82,4 @@ def test_notebook_execution(): '__file__': temp_code_path, '__name__': '__main__' }) + assert os.path.exists(EXPECTED_TEMP_FILE_PATH) From a0c6fa302c832128271d811001d2ef723921764c Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 17:23:01 +0100 Subject: [PATCH 34/40] CU-869aa22g2: Add initial workflow --- .github/workflows/cogstack-es_main.yml | 42 ++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 .github/workflows/cogstack-es_main.yml diff --git a/.github/workflows/cogstack-es_main.yml b/.github/workflows/cogstack-es_main.yml new file mode 100644 index 00000000..0c09d7ad --- /dev/null +++ b/.github/workflows/cogstack-es_main.yml @@ -0,0 +1,42 @@ +name: cogstack-es - Test + +on: + push: + branches: [ main ] + pull_request: + paths: + - 'cogstack-es/**' + - '.github/workflows/cogstack-es**' + +defaults: + run: + working-directory: ./medcat-demo-app + +jobs: + types-lint-tests: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [ '3.9', '3.10', '3.11', '3.12' ] + max-parallel: 4 + + steps: + - uses: actions/checkout@v5 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -r requirements-dev.txt + - name: Check types + run: | + python -m mypy --follow-imports=normal *.py + - name: Lint + run: | + ruff check *.py + - name: Test + run: | + pytest tests From 95fffd150e94c1f4c29c405544c01c6d50c03afd Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 29 Aug 2025 17:26:44 +0100 Subject: [PATCH 35/40] CU-869aa22g2: Fix workflow working directory --- .github/workflows/cogstack-es_main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/cogstack-es_main.yml b/.github/workflows/cogstack-es_main.yml index 0c09d7ad..91ab1b85 100644 --- a/.github/workflows/cogstack-es_main.yml +++ b/.github/workflows/cogstack-es_main.yml @@ -10,7 +10,7 @@ on: defaults: run: - working-directory: ./medcat-demo-app + working-directory: ./cogstack-es jobs: types-lint-tests: From fdea8d8d57c3d067293fd41bdf0f1b12933a9a09 Mon Sep 17 00:00:00 2001 From: mart-r Date: Tue, 23 Sep 2025 14:56:17 +0100 Subject: [PATCH 36/40] CU-869aa22g2: Add OpenSearch dependency --- cogstack-es/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/cogstack-es/requirements.txt b/cogstack-es/requirements.txt index 1e06ec9d..588e322b 100644 --- a/cogstack-es/requirements.txt +++ b/cogstack-es/requirements.txt @@ -1,4 +1,5 @@ elasticsearch>=9.0.0,<10.0 +opensearch-py>=2.0.0 ipython tqdm>=4.64,<5.0 pandas>=2.2,<3.0 From 329f047f74b0a3169d8d26f4c953ac79c3b10094 Mon Sep 17 00:00:00 2001 From: mart-r Date: Tue, 23 Sep 2025 15:21:35 +0100 Subject: [PATCH 37/40] CU-869aa22g2: Allow OpenSearch to be used instead of ES --- cogstack-es/cogstack.py | 130 +++++++++++++++++++++++++++------------- 1 file changed, 88 insertions(+), 42 deletions(-) diff --git a/cogstack-es/cogstack.py b/cogstack-es/cogstack.py index c32abd70..2d653b05 100644 --- a/cogstack-es/cogstack.py +++ b/cogstack-es/cogstack.py @@ -1,10 +1,28 @@ from collections.abc import Mapping import getpass import traceback -from typing import Any, Optional, Iterable, Sequence, Union +from typing import Any, Optional, Iterable, Sequence, Union, Protocol +from typing import cast, TYPE_CHECKING import warnings -import elasticsearch -import elasticsearch.helpers as es_helpers + + +if TYPE_CHECKING: + from elasticsearch import Elasticsearch as _Elasticsearch + # from opensearchpy import OpenSearch as _OpenSearch + import elasticsearch.helpers + # import opensearchpy.helpers + # ElasticClient = Union[_Elasticsearch, _OpenSearch] + ElasticClient = _Elasticsearch + es_cls = _Elasticsearch + es_helpers = elasticsearch.helpers +else: + try: + from elasticsearch import Elasticsearch as ElasticClient + helpers = elasticsearch.helpers + except ImportError: + from opensearchpy import OpenSearch as ElasticClient + es_helpers = opensearchpy.helpers + es_cls = ElasticClient from IPython.display import display, HTML import pandas as pd import tqdm @@ -12,19 +30,47 @@ warnings.filterwarnings("ignore") +class IndicesClientProto(Protocol): + + def get_alias(self, *args: Any, **kwargs: Any) -> Any: + pass + + def get_mapping(self, *args: Any, **kwargs: Any) -> Any: + pass + + +class ESClient(Protocol): + + @property + def indices(self) -> IndicesClientProto: + pass + + def count(self, *args: Any, **kwargs: Any) -> Any: + pass + + def search(self, *args: Any, **kwargs: Any) -> Any: + pass + + def scroll(self, *args: Any, **kwargs: Any) -> Any: + pass + + def clear_scroll(self, *args: Any, **kwargs: Any) -> Any: + pass + + class CogStack: """ - A class for interacting with Elasticsearch. + A class for interacting with Elasticsearch or OpenSearch. Parameters ------------ - elastic : elasticsearch.Elasticsearch - The ElasticSearch instance. + elastic : ESClient + The ElasticSearch or OpenSearch instance. """ ES_TIMEOUT = 300 - def __init__(self, elastic: elasticsearch.Elasticsearch) -> None: + def __init__(self, elastic: ESClient) -> None: self.elastic = elastic @classmethod @@ -94,7 +140,7 @@ def with_api_key_auth( def get_es_with_basic_auth( hosts: list[str], username: Optional[str] = None, password: Optional[str] = None - ) -> elasticsearch.Elasticsearch: + ) -> ESClient: """ Create an instance of CogStack using basic authentication. If the `username` or `password` parameters are not provided, @@ -103,17 +149,17 @@ def get_es_with_basic_auth( Parameters ---------- hosts : list[str] - A list of Elasticsearch host URLs. + A list of Elasticsearch or OpenSearch host URLs. username : str, optional - The username to use when connecting to Elasticsearch. + The username to use when connecting to Elasticsearch or OpenSearch. If not provided, the user will be prompted to enter a username. password : str, optional - The password to use when connecting to Elasticsearch. + The password to use when connecting to Elasticsearch or OpenSearch. If not provided, the user will be prompted to enter a password. Returns ------- - elasticsearch.Elasticsearch: An instance of the Elasticsearch. + ESClient: An instance of the Elasticsearch or OpenSearch. """ if username is None: username = input("Username: ") @@ -128,21 +174,21 @@ def get_es_with_basic_auth( @staticmethod def get_es_with_api_key(hosts: list[str], api_key: Optional[dict] = None - ) -> elasticsearch.Elasticsearch: + ) -> ESClient: """ Create an instance of CogStack using API key authentication. Parameters ---------- hosts : list[str] - A list of Elasticsearch host URLs. + A list of Elasticsearch or OpenSearch host URLs. apiKey : Dict, optional API key object with string fields either: - A: "id" and "api_key" - B: "encoded" - Generated in Elasticsearch or Kibana and provided by your - CogStack administrator. + Generated in Elasticsearch, OpenSearch, or Kibana and provided by + your CogStack administrator. If not provided, the user will be prompted to enter API key "encoded" value. @@ -160,7 +206,7 @@ def get_es_with_api_key(hosts: list[str], Returns ------- - elasticsearch.Elasticsearch: An instance of the Elasticsearch. + ESClient: An instance of the Elasticsearch or OpenSearch. """ has_encoded_value = False api_id_value: str @@ -217,12 +263,12 @@ def __connect( hosts: list[str], basic_auth: Optional[tuple[str, str]] = None, api_key: Optional[Union[str, tuple[str, str]]] = None, - ) -> elasticsearch.Elasticsearch: - """Connect to Elasticsearch using the provided credentials. + ) -> ESClient: + """Connect to Elasticsearch or OpenSearch using the credentials. Parameters ---------- hosts : list[str] - A list of Elasticsearch host URLs. + A list of Elasticsearch or OpenSearch host URLs. basic_auth : Tuple[str, str], optional A tuple containing the username and password for basic authentication. @@ -231,12 +277,12 @@ def __connect( for API key authentication. Returns ------- - elasticsearch.Elasticsearch: An instance of the Elasticsearch. + ESClient: An instance of the Elasticsearch or OpenSearch. Raises ------ - Exception: If the connection to Elasticsearch fails. + Exception: If the connection to Elasticsearch or OpenSearch fails. """ - elastic = elasticsearch.Elasticsearch( + elastic = es_cls( hosts=hosts, api_key=api_key, basic_auth=basic_auth, @@ -369,16 +415,16 @@ def read_data_with_scan( show_progress: bool = True, ): """ - Retrieve documents from an Elasticsearch index or - indices using search query and elasticsearch scan helper function. - The function converts search results to a Pandas DataFrame and does - not return current scroll id if the process fails. + Retrieve documents from an Elasticsearch or OpenSearch index or + indices using search query and elasticsearch or OpenSearch scan helper + function. The function converts search results to a Pandas DataFrame + and does not return current scroll id if the process fails. Parameters ---------- index : str or Sequence[str] - The name(s) of the Elasticsearch indices or their - aliases to search. + The name(s) of the Elasticsearch or OpenSearch indices or + their aliases to search. query : dict A dictionary containing the search query parameters. Query can start with `query` key and contain other @@ -404,7 +450,7 @@ def read_data_with_scan( MAX: 10,000. request_timeout : int, optional, default=300 The time in seconds to wait for a response - from Elasticsearch before timing out. + from Elasticsearch or OpenSearch before timing out. show_progress : bool, optional, default=True Whether to show the progress in console. Returns @@ -433,7 +479,7 @@ def read_data_with_scan( ) scan_results = es_helpers.scan( - self.elastic, + cast(es_cls, self.elastic), index=index, query=query, size=size, @@ -488,15 +534,15 @@ def read_data_with_scroll( show_progress: Optional[bool] = True, ): """ - Retrieves documents from an Elasticsearch index using search query - and scroll API. Default scroll timeout is set to 10 minutes. - The function converts search results to a Pandas DataFrame. + Retrieves documents from an Elasticsearch or OpenSearch index using + search query and scroll API. Default scroll timeout is set to 10 + minutes. The function converts search results to a Pandas DataFrame. Parameters ---------- index : str or Sequence[str] - The name(s) of the Elasticsearch indices or their - aliases to search. + The name(s) of the Elasticsearch or OpenSearch indices or + their aliases to search. query : dict A dictionary containing the search query parameters. Query can start with `query` key @@ -530,7 +576,7 @@ def read_data_with_scroll( a new search. request_timeout : int, optional, default=300 The time in seconds to wait for a response from - Elasticsearch before timing out. + Elasticsearch or OpenSearch before timing out. show_progress : bool, optional, default=True Whether to show the progress in console. IMPORTANT: The progress bar displays @@ -656,14 +702,14 @@ def read_data_with_sorting( show_progress: Optional[bool] = True, ): """ - Retrieve documents from an Elasticsearch index using search query - and convert them to a Pandas DataFrame. + Retrieve documents from an Elasticsearch or OpenSearch index using + search query and convert them to a Pandas DataFrame. Parameters ---------- index : str or Sequence[str] - The name(s) of the Elasticsearch indices or their - aliases to search. + The name(s) of the Elasticsearch or OpenSearch indices or + their aliases to search. query : dict A dictionary containing the search query parameters. Query can start with `query` key and contain other @@ -701,7 +747,7 @@ def read_data_with_sorting( error message request_timeout : int, optional, default = 300 The time in seconds to wait for a response from - Elasticsearch before timing out. + Elasticsearch or OpenSearch before timing out. show_progress : bool, optional Whether to show the progress in console. Defaults to true. From 379eb06f36f32c31decb27024afc7dc9d9044b64 Mon Sep 17 00:00:00 2001 From: mart-r Date: Tue, 23 Sep 2025 16:56:22 +0100 Subject: [PATCH 38/40] CU-869aa22g2: Add missing ES/OS helpers import --- cogstack-es/cogstack.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cogstack-es/cogstack.py b/cogstack-es/cogstack.py index 2d653b05..72eae968 100644 --- a/cogstack-es/cogstack.py +++ b/cogstack-es/cogstack.py @@ -18,9 +18,11 @@ else: try: from elasticsearch import Elasticsearch as ElasticClient + import elasticsearch.helpers helpers = elasticsearch.helpers except ImportError: from opensearchpy import OpenSearch as ElasticClient + import opensearchpy.helpers es_helpers = opensearchpy.helpers es_cls = ElasticClient from IPython.display import display, HTML From 349b7f3e82160cde6f0197aa75ced1fe892204be Mon Sep 17 00:00:00 2001 From: mart-r Date: Tue, 23 Sep 2025 17:04:41 +0100 Subject: [PATCH 39/40] CU-869aa22g2: Fix typo in variable name --- cogstack-es/cogstack.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cogstack-es/cogstack.py b/cogstack-es/cogstack.py index 72eae968..37965194 100644 --- a/cogstack-es/cogstack.py +++ b/cogstack-es/cogstack.py @@ -19,7 +19,7 @@ try: from elasticsearch import Elasticsearch as ElasticClient import elasticsearch.helpers - helpers = elasticsearch.helpers + es_helpers = elasticsearch.helpers except ImportError: from opensearchpy import OpenSearch as ElasticClient import opensearchpy.helpers From 96857f02ac7e11a195297afabeab719fea6abcc7 Mon Sep 17 00:00:00 2001 From: mart-r Date: Wed, 24 Sep 2025 10:56:31 +0100 Subject: [PATCH 40/40] CU-869aa22g2: Fix test time mocking --- cogstack-es/tests/test_cogstack.py | 2 +- cogstack-es/tests/test_search_template_nb.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cogstack-es/tests/test_cogstack.py b/cogstack-es/tests/test_cogstack.py index 7fea96a5..e025f1f6 100644 --- a/cogstack-es/tests/test_cogstack.py +++ b/cogstack-es/tests/test_cogstack.py @@ -7,7 +7,7 @@ @pytest.fixture def mock_elasticsearch(): """Fixture to mock Elasticsearch client""" - with patch('cogstack.elasticsearch.Elasticsearch') as mock_es: + with patch('cogstack.es_cls') as mock_es: mock_client = Mock() mock_es.return_value = mock_client mock_client.ping.return_value = True diff --git a/cogstack-es/tests/test_search_template_nb.py b/cogstack-es/tests/test_search_template_nb.py index 1933753b..b5ffa3fb 100644 --- a/cogstack-es/tests/test_search_template_nb.py +++ b/cogstack-es/tests/test_search_template_nb.py @@ -14,7 +14,7 @@ def all_mocked(python_code: str): with tempfile.NamedTemporaryFile('w', suffix='.py') as temp_file: temp_file.write(python_code) - with patch('elasticsearch.Elasticsearch') as mock_es: + with patch('cogstack.es_cls') as mock_es: with patch('credentials.hosts', ['http://localhost:9200']): with patch('credentials.api_key', {"encoded": "test_api_key"}): with patch('elasticsearch.helpers.scan') as mock_scan: