From 7fb6889e69d109c9deaafd96141a71f3ac47e35e Mon Sep 17 00:00:00 2001 From: Abdel Jaidi Date: Thu, 16 Jun 2022 15:24:38 +0100 Subject: [PATCH] (refactor): Refactor DynamoDB read_partiql method --- awswrangler/dynamodb/_read.py | 37 +++++++++++++++++----------------- awswrangler/dynamodb/_utils.py | 17 ---------------- 2 files changed, 18 insertions(+), 36 deletions(-) diff --git a/awswrangler/dynamodb/_read.py b/awswrangler/dynamodb/_read.py index 0e1f28067..160401d58 100644 --- a/awswrangler/dynamodb/_read.py +++ b/awswrangler/dynamodb/_read.py @@ -8,10 +8,9 @@ import pandas as pd from boto3.dynamodb.types import TypeDeserializer +from awswrangler import _utils from awswrangler._config import apply_configs -from ._utils import get_client - _logger: logging.Logger = logging.getLogger(__name__) @@ -42,22 +41,25 @@ def _get_terms_groups(terms: List[str]) -> Tuple[List[str], List[str], List[str] return select_terms, from_terms, where_terms -def _get_scan_response(table_name: str, select_terms: List[str], client: boto3.resource) -> List[Dict[str, Any]]: +def _get_scan_response( + table_name: str, select_terms: List[str], boto3_session: Optional[boto3.Session] = None +) -> List[Dict[str, Any]]: """Perform a scan to the Dynamo DB table and returns the data fetched.""" + client_dynamodb = _utils.client(service_name="dynamodb", session=boto3_session) scan_config: Dict[str, Any] = {"TableName": table_name} if len(select_terms) > 1 or select_terms[0] != "*": scan_config["AttributesToGet"] = select_terms # get all responses even if pagination is necessary - response = client.scan(**scan_config) + response = client_dynamodb.scan(**scan_config) data: List[Dict[str, Any]] = response["Items"] while "LastEvaluatedKey" in response: scan_config["ExclusiveStartKey"] = response["LastEvaluatedKey"] - response = client.scan(**scan_config) + response = client_dynamodb.scan(**scan_config) data.extend(response["Items"]) return data -def _get_items(client: boto3.resource, query: str) -> List[Dict[str, Any]]: +def _get_items(query: str, boto3_session: Optional[boto3.Session] = None) -> List[Dict[str, Any]]: # clean input query from possible excessive whitespace query = re.sub(" +", " ", query).strip() # generate terms list from query @@ -70,8 +72,7 @@ def _get_items(client: boto3.resource, query: str) -> List[Dict[str, Any]]: if len(from_terms) == 0: raise ValueError("The PartiQL query contains no tables.") table_name = from_terms[0] - data = _get_scan_response(table_name, select_terms, client) - return data + return _get_scan_response(table_name=table_name, select_terms=select_terms, boto3_session=boto3_session) def _deserialize_value(value: Any) -> Any: @@ -87,11 +88,13 @@ def _deserialize_data(df: pd.DataFrame, columns: pd.Index) -> pd.DataFrame: return df -def _parse_dynamodb_results(results: List[Dict[str, Any]]) -> pd.DataFrame: - df = pd.DataFrame(results) - columns = df.columns - df = _deserialize_data(df, columns) - return df +def _parse_dynamodb_items( + items: List[Dict[str, Any]], + dtype: Optional[Dict[str, str]] = None, +) -> pd.DataFrame: + df = pd.DataFrame(items) + df = _deserialize_data(df, df.columns) + return df.astype(dtype=dtype) if dtype else df @apply_configs @@ -140,10 +143,6 @@ def read_partiql_query( ... dtype={'key': int} ... ) """ - client = get_client(boto3_session) _logger.debug("Reading results for PartiQL query: %s", query) - items = _get_items(client, query) - df = _parse_dynamodb_results(items) - if dtype: - df = df.astype(dtype=dtype) - return df + items = _get_items(query=query, boto3_session=boto3_session) + return _parse_dynamodb_items(items=items, dtype=dtype) diff --git a/awswrangler/dynamodb/_utils.py b/awswrangler/dynamodb/_utils.py index bb960a82e..0baecf726 100644 --- a/awswrangler/dynamodb/_utils.py +++ b/awswrangler/dynamodb/_utils.py @@ -34,23 +34,6 @@ def get_table( return dynamodb_table -@apply_configs -def get_client(boto3_session: Optional[boto3.Session] = None) -> boto3.client: - """Get DynamoDB client. - - Parameters - ---------- - boto3_session : Optional[boto3.Session()] - Boto3 Session. The default boto3 Session will be used if boto3_session receive None. - - Returns - ------- - client : boto3.Client - Boto3 DynamoDB client. - """ - return _utils.client(service_name="dynamodb", session=boto3_session) - - def _validate_items( items: Union[List[Dict[str, Any]], List[Mapping[str, Any]]], dynamodb_table: boto3.resource ) -> None: