diff --git a/awswrangler/athena/_cache.py b/awswrangler/athena/_cache.py new file mode 100644 index 000000000..536a85047 --- /dev/null +++ b/awswrangler/athena/_cache.py @@ -0,0 +1,214 @@ +"""Cache Module for Amazon Athena.""" +import datetime +import logging +import re +from heapq import heappop, heappush +from typing import Any, Dict, List, Match, NamedTuple, Optional, Tuple, Union + +import boto3 + +from awswrangler import _utils + +_logger: logging.Logger = logging.getLogger(__name__) + + +class _CacheInfo(NamedTuple): + has_valid_cache: bool + file_format: Optional[str] = None + query_execution_id: Optional[str] = None + query_execution_payload: Optional[Dict[str, Any]] = None + + +class _LocalMetadataCacheManager: + def __init__(self) -> None: + self._cache: Dict[str, Any] = {} + self._pqueue: List[Tuple[datetime.datetime, str]] = [] + self._max_cache_size = 100 + + def update_cache(self, items: List[Dict[str, Any]]) -> None: + """ + Update the local metadata cache with new query metadata. + + Parameters + ---------- + items : List[Dict[str, Any]] + List of query execution metadata which is returned by boto3 `batch_get_query_execution()`. + + Returns + ------- + None + None. + """ + if self._pqueue: + oldest_item = self._cache[self._pqueue[0][1]] + items = list( + filter(lambda x: x["Status"]["SubmissionDateTime"] > oldest_item["Status"]["SubmissionDateTime"], items) + ) + + cache_oversize = len(self._cache) + len(items) - self._max_cache_size + for _ in range(cache_oversize): + _, query_execution_id = heappop(self._pqueue) + del self._cache[query_execution_id] + + for item in items[: self._max_cache_size]: + heappush(self._pqueue, (item["Status"]["SubmissionDateTime"], item["QueryExecutionId"])) + self._cache[item["QueryExecutionId"]] = item + + def sorted_successful_generator(self) -> List[Dict[str, Any]]: + """ + Sorts the entries in the local cache based on query Completion DateTime. + + This is useful to guarantee LRU caching rules. 
+ + Returns + ------- + List[Dict[str, Any]] + Returns successful DDL and DML queries sorted by query completion time. + """ + filtered: List[Dict[str, Any]] = [] + for query in self._cache.values(): + if (query["Status"].get("State") == "SUCCEEDED") and (query.get("StatementType") in ["DDL", "DML"]): + filtered.append(query) + return sorted(filtered, key=lambda e: str(e["Status"]["CompletionDateTime"]), reverse=True) + + def __contains__(self, key: str) -> bool: + return key in self._cache + + @property + def max_cache_size(self) -> int: + """Property max_cache_size.""" + return self._max_cache_size + + @max_cache_size.setter + def max_cache_size(self, value: int) -> None: + self._max_cache_size = value + + +def _parse_select_query_from_possible_ctas(possible_ctas: str) -> Optional[str]: + """Check if `possible_ctas` is a valid parquet-generating CTAS and returns the full SELECT statement.""" + possible_ctas = possible_ctas.lower() + parquet_format_regex: str = r"format\s*=\s*\'parquet\'\s*," + is_parquet_format: Optional[Match[str]] = re.search(pattern=parquet_format_regex, string=possible_ctas) + if is_parquet_format is not None: + unstripped_select_statement_regex: str = r"\s+as\s+\(*(select|with).*" + unstripped_select_statement_match: Optional[Match[str]] = re.search( + unstripped_select_statement_regex, possible_ctas, re.DOTALL + ) + if unstripped_select_statement_match is not None: + stripped_select_statement_match: Optional[Match[str]] = re.search( + r"(select|with).*", unstripped_select_statement_match.group(0), re.DOTALL + ) + if stripped_select_statement_match is not None: + return stripped_select_statement_match.group(0) + return None + + +def _compare_query_string(sql: str, other: str) -> bool: + comparison_query = _prepare_query_string_for_comparison(query_string=other) + _logger.debug("sql: %s", sql) + _logger.debug("comparison_query: %s", comparison_query) + if sql == comparison_query: + return True + return False + + +def 
def _prepare_query_string_for_comparison(query_string: str) -> str:
    """To use cached data, we need to compare queries. Returns a query string in canonical form."""
    # for now this is a simple complete strip, but it could grow into much more sophisticated
    # query comparison data structures
    query_string = "".join(query_string.split()).strip("()").lower()
    query_string = query_string[:-1] if query_string.endswith(";") else query_string
    return query_string


def _get_last_query_infos(
    max_remote_cache_entries: int,
    boto3_session: Optional[boto3.Session] = None,
    workgroup: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Return an iterator of `query_execution_info`s run by the workgroup in Athena.

    Parameters
    ----------
    max_remote_cache_entries : int
        Upper bound on how many query-execution ids are listed from Athena's history.
    boto3_session : boto3.Session, optional
        Boto3 Session. The default boto3 session will be used if None.
    workgroup : str, optional
        Restrict the history listing to this Athena workgroup.

    Returns
    -------
    List[Dict[str, Any]]
        Successful DDL/DML query executions, newest completion first (from the local cache).
    """
    client_athena: boto3.client = _utils.client(service_name="athena", session=boto3_session)
    page_size = 50
    args: Dict[str, Union[str, Dict[str, int]]] = {
        "PaginationConfig": {"MaxItems": max_remote_cache_entries, "PageSize": page_size}
    }
    if workgroup is not None:
        args["WorkGroup"] = workgroup
    paginator = client_athena.get_paginator("list_query_executions")
    uncached_ids = []
    for page in paginator.paginate(**args):
        _logger.debug("paginating Athena's queries history...")
        query_execution_id_list: List[str] = page["QueryExecutionIds"]
        for query_execution_id in query_execution_id_list:
            # Only fetch metadata for executions we have not cached locally yet.
            if query_execution_id not in _cache_manager:
                uncached_ids.append(query_execution_id)
    if uncached_ids:
        new_execution_data = []
        # batch_get_query_execution accepts at most `page_size` ids per call.
        for i in range(0, len(uncached_ids), page_size):
            new_execution_data.extend(
                # Default to [] — `.get("QueryExecutions")` alone returns None when the key is
                # absent, which made `extend()` raise TypeError.
                client_athena.batch_get_query_execution(QueryExecutionIds=uncached_ids[i : i + page_size]).get(
                    "QueryExecutions", []
                )
            )
        _cache_manager.update_cache(new_execution_data)
    return _cache_manager.sorted_successful_generator()
def _check_for_cached_results(
    sql: str,
    boto3_session: boto3.Session,
    workgroup: Optional[str],
    max_cache_seconds: int,
    max_cache_query_inspections: int,
    max_remote_cache_entries: int,
) -> _CacheInfo:
    """
    Check whether `sql` has been run before, within the `max_cache_seconds` window, by the `workgroup`.

    If so, returns a dict with Athena's `query_execution_info` and the data format.
    """
    # Caching disabled entirely.
    if max_cache_seconds <= 0:
        return _CacheInfo(has_valid_cache=False)

    inspected: int = 0
    comparable_sql: str = _prepare_query_string_for_comparison(sql)
    now: datetime.datetime = datetime.datetime.now(datetime.timezone.utc)
    _logger.debug("current_timestamp: %s", now)

    for execution_info in _get_last_query_infos(
        max_remote_cache_entries=max_remote_cache_entries,
        boto3_session=boto3_session,
        workgroup=workgroup,
    ):
        execution_id: str = execution_info["QueryExecutionId"]
        completed_at: datetime.datetime = execution_info["Status"]["CompletionDateTime"]
        _logger.debug("query_timestamp: %s", completed_at)

        # Executions arrive newest-first, so the first stale one means every later one is stale too.
        if (now - completed_at).total_seconds() > max_cache_seconds:
            return _CacheInfo(
                has_valid_cache=False, query_execution_id=execution_id, query_execution_payload=execution_info
            )

        statement_type: Optional[str] = execution_info.get("StatementType")
        cached_format: Optional[str] = None
        if statement_type == "DDL" and execution_info["Query"].startswith("CREATE TABLE"):
            # A CTAS: reusable (as parquet) only if its embedded SELECT matches `sql`.
            extracted_select: Optional[str] = _parse_select_query_from_possible_ctas(
                possible_ctas=execution_info["Query"]
            )
            if extracted_select is not None and _compare_query_string(sql=comparable_sql, other=extracted_select):
                cached_format = "parquet"
        elif statement_type == "DML" and not execution_info["Query"].startswith("INSERT"):
            # A plain query: reusable (as csv) if the query text matches `sql`.
            if _compare_query_string(sql=comparable_sql, other=execution_info["Query"]):
                cached_format = "csv"

        if cached_format is not None:
            return _CacheInfo(
                has_valid_cache=True,
                file_format=cached_format,
                query_execution_id=execution_id,
                query_execution_payload=execution_info,
            )

        inspected += 1
        _logger.debug("num_executions_inspected: %s", inspected)
        if inspected >= max_cache_query_inspections:
            return _CacheInfo(has_valid_cache=False)

    return _CacheInfo(has_valid_cache=False)


_cache_manager = _LocalMetadataCacheManager()
Returns a query string in canonical form.""" - # for now this is a simple complete strip, but it could grow into much more sophisticated - # query comparison data structures - query_string = "".join(query_string.split()).strip("()").lower() - query_string = query_string[:-1] if query_string.endswith(";") else query_string - return query_string - - -def _compare_query_string(sql: str, other: str) -> bool: - comparison_query = _prepare_query_string_for_comparison(query_string=other) - _logger.debug("sql: %s", sql) - _logger.debug("comparison_query: %s", comparison_query) - if sql == comparison_query: - return True - return False - - -def _get_last_query_infos( - max_remote_cache_entries: int, - boto3_session: Optional[boto3.Session] = None, - workgroup: Optional[str] = None, -) -> List[Dict[str, Any]]: - """Return an iterator of `query_execution_info`s run by the workgroup in Athena.""" - client_athena: boto3.client = _utils.client(service_name="athena", session=boto3_session) - page_size = 50 - args: Dict[str, Union[str, Dict[str, int]]] = { - "PaginationConfig": {"MaxItems": max_remote_cache_entries, "PageSize": page_size} - } - if workgroup is not None: - args["WorkGroup"] = workgroup - paginator = client_athena.get_paginator("list_query_executions") - uncached_ids = [] - for page in paginator.paginate(**args): - _logger.debug("paginating Athena's queries history...") - query_execution_id_list: List[str] = page["QueryExecutionIds"] - for query_execution_id in query_execution_id_list: - if query_execution_id not in _cache_manager: - uncached_ids.append(query_execution_id) - if uncached_ids: - new_execution_data = [] - for i in range(0, len(uncached_ids), page_size): - new_execution_data.extend( - client_athena.batch_get_query_execution(QueryExecutionIds=uncached_ids[i : i + page_size]).get( - "QueryExecutions" - ) - ) - _cache_manager.update_cache(new_execution_data) - return _cache_manager.sorted_successful_generator() - - -def 
_parse_select_query_from_possible_ctas(possible_ctas: str) -> Optional[str]: - """Check if `possible_ctas` is a valid parquet-generating CTAS and returns the full SELECT statement.""" - possible_ctas = possible_ctas.lower() - parquet_format_regex: str = r"format\s*=\s*\'parquet\'\s*," - is_parquet_format: Optional[Match[str]] = re.search(pattern=parquet_format_regex, string=possible_ctas) - if is_parquet_format is not None: - unstripped_select_statement_regex: str = r"\s+as\s+\(*(select|with).*" - unstripped_select_statement_match: Optional[Match[str]] = re.search( - unstripped_select_statement_regex, possible_ctas, re.DOTALL - ) - if unstripped_select_statement_match is not None: - stripped_select_statement_match: Optional[Match[str]] = re.search( - r"(select|with).*", unstripped_select_statement_match.group(0), re.DOTALL - ) - if stripped_select_statement_match is not None: - return stripped_select_statement_match.group(0) - return None - - -def _check_for_cached_results( - sql: str, - boto3_session: boto3.Session, - workgroup: Optional[str], - max_cache_seconds: int, - max_cache_query_inspections: int, - max_remote_cache_entries: int, -) -> _CacheInfo: - """ - Check whether `sql` has been run before, within the `max_cache_seconds` window, by the `workgroup`. - - If so, returns a dict with Athena's `query_execution_info` and the data format. 
- """ - if max_cache_seconds <= 0: - return _CacheInfo(has_valid_cache=False) - num_executions_inspected: int = 0 - comparable_sql: str = _prepare_query_string_for_comparison(sql) - current_timestamp: datetime.datetime = datetime.datetime.now(datetime.timezone.utc) - _logger.debug("current_timestamp: %s", current_timestamp) - for query_info in _get_last_query_infos( - max_remote_cache_entries=max_remote_cache_entries, - boto3_session=boto3_session, - workgroup=workgroup, - ): - query_execution_id: str = query_info["QueryExecutionId"] - query_timestamp: datetime.datetime = query_info["Status"]["CompletionDateTime"] - _logger.debug("query_timestamp: %s", query_timestamp) - if (current_timestamp - query_timestamp).total_seconds() > max_cache_seconds: - return _CacheInfo( - has_valid_cache=False, query_execution_id=query_execution_id, query_execution_payload=query_info - ) - statement_type: Optional[str] = query_info.get("StatementType") - if statement_type == "DDL" and query_info["Query"].startswith("CREATE TABLE"): - parsed_query: Optional[str] = _parse_select_query_from_possible_ctas(possible_ctas=query_info["Query"]) - if parsed_query is not None: - if _compare_query_string(sql=comparable_sql, other=parsed_query): - return _CacheInfo( - has_valid_cache=True, - file_format="parquet", - query_execution_id=query_execution_id, - query_execution_payload=query_info, - ) - elif statement_type == "DML" and not query_info["Query"].startswith("INSERT"): - if _compare_query_string(sql=comparable_sql, other=query_info["Query"]): - return _CacheInfo( - has_valid_cache=True, - file_format="csv", - query_execution_id=query_execution_id, - query_execution_payload=query_info, - ) - num_executions_inspected += 1 - _logger.debug("num_executions_inspected: %s", num_executions_inspected) - if num_executions_inspected >= max_cache_query_inspections: - return _CacheInfo(has_valid_cache=False) - return _CacheInfo(has_valid_cache=False) - - def _fetch_parquet_result( query_metadata: 
_QueryMetadata, keep_files: bool, @@ -1114,6 +979,3 @@ def read_sql_table( s3_additional_kwargs=s3_additional_kwargs, pyarrow_additional_kwargs=pyarrow_additional_kwargs, ) - - -_cache_manager = _LocalMetadataCacheManager() diff --git a/awswrangler/athena/_utils.py b/awswrangler/athena/_utils.py index c27fa9b6e..fe7dcb6c3 100644 --- a/awswrangler/athena/_utils.py +++ b/awswrangler/athena/_utils.py @@ -1,13 +1,11 @@ """Utilities Module for Amazon Athena.""" import csv -import datetime import logging import pprint import time import warnings from decimal import Decimal -from heapq import heappop, heappush -from typing import Any, Dict, Generator, List, NamedTuple, Optional, Tuple, Union, cast +from typing import Any, Dict, Generator, List, NamedTuple, Optional, Union, cast import boto3 import botocore.exceptions @@ -16,6 +14,8 @@ from awswrangler import _data_types, _utils, exceptions, s3, sts from awswrangler._config import apply_configs +from ._cache import _cache_manager, _CacheInfo, _check_for_cached_results, _LocalMetadataCacheManager + _QUERY_FINAL_STATES: List[str] = ["FAILED", "SUCCEEDED", "CANCELLED"] _QUERY_WAIT_POLLING_DELAY: float = 0.25 # SECONDS @@ -41,71 +41,6 @@ class _WorkGroupConfig(NamedTuple): kms_key: Optional[str] -class _LocalMetadataCacheManager: - def __init__(self) -> None: - self._cache: Dict[str, Any] = {} - self._pqueue: List[Tuple[datetime.datetime, str]] = [] - self._max_cache_size = 100 - - def update_cache(self, items: List[Dict[str, Any]]) -> None: - """ - Update the local metadata cache with new query metadata. - - Parameters - ---------- - items : List[Dict[str, Any]] - List of query execution metadata which is returned by boto3 `batch_get_query_execution()`. - - Returns - ------- - None - None. 
- """ - if self._pqueue: - oldest_item = self._cache[self._pqueue[0][1]] - items = list( - filter(lambda x: x["Status"]["SubmissionDateTime"] > oldest_item["Status"]["SubmissionDateTime"], items) - ) - - cache_oversize = len(self._cache) + len(items) - self._max_cache_size - for _ in range(cache_oversize): - _, query_execution_id = heappop(self._pqueue) - del self._cache[query_execution_id] - - for item in items[: self._max_cache_size]: - heappush(self._pqueue, (item["Status"]["SubmissionDateTime"], item["QueryExecutionId"])) - self._cache[item["QueryExecutionId"]] = item - - def sorted_successful_generator(self) -> List[Dict[str, Any]]: - """ - Sorts the entries in the local cache based on query Completion DateTime. - - This is useful to guarantee LRU caching rules. - - Returns - ------- - List[Dict[str, Any]] - Returns successful DDL and DML queries sorted by query completion time. - """ - filtered: List[Dict[str, Any]] = [] - for query in self._cache.values(): - if (query["Status"].get("State") == "SUCCEEDED") and (query.get("StatementType") in ["DDL", "DML"]): - filtered.append(query) - return sorted(filtered, key=lambda e: str(e["Status"]["CompletionDateTime"]), reverse=True) - - def __contains__(self, key: str) -> bool: - return key in self._cache - - @property - def max_cache_size(self) -> int: - """Property max_cache_size.""" - return self._max_cache_size - - @max_cache_size.setter - def max_cache_size(self, value: int) -> None: - self._max_cache_size = value - - def _get_s3_output(s3_output: Optional[str], wg_config: _WorkGroupConfig, boto3_session: boto3.Session) -> str: if wg_config.enforced and wg_config.s3_output is not None: return wg_config.s3_output @@ -422,8 +357,13 @@ def start_query_execution( kms_key: Optional[str] = None, params: Optional[Dict[str, Any]] = None, boto3_session: Optional[boto3.Session] = None, + max_cache_seconds: int = 0, + max_cache_query_inspections: int = 50, + max_remote_cache_entries: int = 50, + max_local_cache_entries: 
int = 100, data_source: Optional[str] = None, -) -> str: + wait: bool = False, +) -> Union[str, Dict[str, Any]]: """Start a SQL Query against AWS Athena. Note @@ -451,13 +391,34 @@ def start_query_execution( `:name;`. Note that for varchar columns and similar, you must surround the value in single quotes. boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. + max_cache_seconds: int + Wrangler can look up in Athena's history if this query has been run before. + If so, and its completion time is less than `max_cache_seconds` before now, wrangler + skips query execution and just returns the same results as last time. + If cached results are valid, wrangler ignores the `s3_output`, `encryption` and `kms_key` params. + If reading cached data fails for any reason, execution falls back to the usual query run path. + max_cache_query_inspections : int + Max number of queries that will be inspected from the history to try to find some result to reuse. + The bigger the number of inspection, the bigger will be the latency for not cached queries. + Only takes effect if max_cache_seconds > 0. + max_remote_cache_entries : int + Max number of queries that will be retrieved from AWS for cache inspection. + The bigger the number of inspection, the bigger will be the latency for not cached queries. + Only takes effect if max_cache_seconds > 0 and default value is 50. + max_local_cache_entries : int + Max number of queries for which metadata will be cached locally. This will reduce the latency and also + enables keeping more than `max_remote_cache_entries` available for the cache. This value should not be + smaller than max_remote_cache_entries. + Only takes effect if max_cache_seconds > 0 and default value is 100. data_source : str, optional Data Source / Catalog name. If None, 'AwsDataCatalog' will be used by default. 
+ wait : bool, default False + Indicates whether to wait for the query to finish and return a dictionary with the query execution response. Returns ------- - str - Query execution ID + Union[str, Dict[str, Any]] + Query execution ID if `wait` is set to `False`, dictionary with the get_query_execution response otherwise. Examples -------- @@ -477,19 +438,39 @@ def start_query_execution( for key, value in params.items(): sql = sql.replace(f":{key};", str(value)) session: boto3.Session = _utils.ensure_session(session=boto3_session) - wg_config: _WorkGroupConfig = _get_workgroup_config(session=session, workgroup=workgroup) - return _start_query_execution( + + max_remote_cache_entries = min(max_remote_cache_entries, max_local_cache_entries) + + _cache_manager.max_cache_size = max_local_cache_entries + cache_info: _CacheInfo = _check_for_cached_results( sql=sql, - wg_config=wg_config, - database=database, - data_source=data_source, - s3_output=s3_output, - workgroup=workgroup, - encryption=encryption, - kms_key=kms_key, boto3_session=session, + workgroup=workgroup, + max_cache_seconds=max_cache_seconds, + max_cache_query_inspections=max_cache_query_inspections, + max_remote_cache_entries=max_remote_cache_entries, ) + if cache_info.has_valid_cache and cache_info.query_execution_id is not None: + query_execution_id = cache_info.query_execution_id + else: + wg_config: _WorkGroupConfig = _get_workgroup_config(session=session, workgroup=workgroup) + query_execution_id = _start_query_execution( + sql=sql, + wg_config=wg_config, + database=database, + data_source=data_source, + s3_output=s3_output, + workgroup=workgroup, + encryption=encryption, + kms_key=kms_key, + boto3_session=session, + ) + if wait: + return wait_query(query_execution_id=query_execution_id, boto3_session=session) + + return query_execution_id + @apply_configs def repair_table( diff --git a/tests/test_athena.py b/tests/test_athena.py index 7ddf8f913..c88f0a077 100644 --- a/tests/test_athena.py +++ 
b/tests/test_athena.py @@ -979,3 +979,29 @@ def test_bucketing_combined_csv_saving(path, glue_database, glue_table): assert df2.equals(df3) assert scanned_regular >= scanned_bucketed * nb_of_buckets + + +def test_start_query_execution_wait(path, glue_database, glue_table): + wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table) + wr.s3.to_parquet( + df=get_df(), + path=path, + index=True, + use_threads=True, + dataset=True, + mode="overwrite", + database=glue_database, + table=glue_table, + partition_cols=["par0", "par1"], + ) + + sql = f"SELECT * FROM {glue_table}" + query_id = wr.athena.start_query_execution(sql=sql, database=glue_database, wait=False) + + query_execution_result = wr.athena.start_query_execution(sql=sql, database=glue_database, wait=True) + + assert isinstance(query_id, str) + assert isinstance(query_execution_result, dict) + assert query_execution_result["Query"] == sql + assert query_execution_result["StatementType"] == "DML" + assert query_execution_result["QueryExecutionContext"]["Database"] == glue_database diff --git a/tests/test_athena_cache.py b/tests/test_athena_cache.py index d4cb08830..90f8c6c42 100644 --- a/tests/test_athena_cache.py +++ b/tests/test_athena_cache.py @@ -170,3 +170,36 @@ def test_paginated_remote_cache(path, glue_database, glue_table, workgroup1): ) assert df.shape == df2.shape assert df.c0.sum() == df2.c0.sum() + + +@pytest.mark.parametrize("data_source", [None, "AwsDataCatalog"]) +def test_cache_start_query(path, glue_database, glue_table, data_source): + df = pd.DataFrame({"c0": [0, None]}, dtype="Int64") + wr.s3.to_parquet( + df=df, + path=path, + dataset=True, + mode="overwrite", + database=glue_database, + table=glue_table, + description="c0", + parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index))}, + columns_comments={"c0": "0"}, + ) + + with patch( + "awswrangler.athena._utils._check_for_cached_results", + 
return_value=wr.athena._read._CacheInfo(has_valid_cache=False), + ) as mocked_cache_attempt: + query_id = wr.athena.start_query_execution(sql=f"SELECT * FROM {glue_table}", database=glue_database) + mocked_cache_attempt.assert_called() + + # Wait for query to finish in order to successfully check cache + wr.athena.wait_query(query_execution_id=query_id) + + with patch("awswrangler.athena._utils._start_query_execution") as internal_start_query: + query_id_2 = wr.athena.start_query_execution( + sql=f"SELECT * FROM {glue_table}", database=glue_database, max_cache_seconds=900 + ) + internal_start_query.assert_not_called() + assert query_id == query_id_2