Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 18 additions & 19 deletions awswrangler/dynamodb/_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
import pandas as pd
from boto3.dynamodb.types import TypeDeserializer

from awswrangler import _utils
from awswrangler._config import apply_configs

from ._utils import get_client

_logger: logging.Logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -42,22 +41,25 @@ def _get_terms_groups(terms: List[str]) -> Tuple[List[str], List[str], List[str]
return select_terms, from_terms, where_terms


def _get_scan_response(
    table_name: str, select_terms: List[str], boto3_session: Optional[boto3.Session] = None
) -> List[Dict[str, Any]]:
    """Scan a DynamoDB table and return every item fetched.

    Parameters
    ----------
    table_name : str
        Name of the table to scan.
    select_terms : List[str]
        Attribute names to fetch. The single-element list ``["*"]`` means
        "all attributes" and leaves the scan unrestricted.
    boto3_session : Optional[boto3.Session]
        Session forwarded to ``_utils.client`` to build the DynamoDB client.

    Returns
    -------
    List[Dict[str, Any]]
        The ``Items`` of every page returned by the scan, concatenated.

    Raises
    ------
    ValueError
        If ``select_terms`` is empty.
    """
    # Fail early with a clear message instead of an IndexError on select_terms[0].
    if not select_terms:
        raise ValueError("The PartiQL query contains no attributes to select.")
    client_dynamodb = _utils.client(service_name="dynamodb", session=boto3_session)
    scan_config: Dict[str, Any] = {"TableName": table_name}
    if len(select_terms) > 1 or select_terms[0] != "*":
        scan_config["AttributesToGet"] = select_terms
    # get all responses even if pagination is necessary
    response = client_dynamodb.scan(**scan_config)
    data: List[Dict[str, Any]] = response["Items"]
    while "LastEvaluatedKey" in response:
        # Resume the scan right after the last key of the previous page.
        scan_config["ExclusiveStartKey"] = response["LastEvaluatedKey"]
        response = client_dynamodb.scan(**scan_config)
        data.extend(response["Items"])
    return data


def _get_items(client: boto3.resource, query: str) -> List[Dict[str, Any]]:
def _get_items(query: str, boto3_session: Optional[boto3.Session] = None) -> List[Dict[str, Any]]:
# clean input query from possible excessive whitespace
query = re.sub(" +", " ", query).strip()
# generate terms list from query
Expand All @@ -70,8 +72,7 @@ def _get_items(client: boto3.resource, query: str) -> List[Dict[str, Any]]:
if len(from_terms) == 0:
raise ValueError("The PartiQL query contains no tables.")
table_name = from_terms[0]
data = _get_scan_response(table_name, select_terms, client)
return data
return _get_scan_response(table_name=table_name, select_terms=select_terms, boto3_session=boto3_session)


def _deserialize_value(value: Any) -> Any:
Expand All @@ -87,11 +88,13 @@ def _deserialize_data(df: pd.DataFrame, columns: pd.Index) -> pd.DataFrame:
return df


def _parse_dynamodb_results(results: List[Dict[str, Any]]) -> pd.DataFrame:
    """Build a DataFrame from ``results`` and pass it through ``_deserialize_data``."""
    raw_frame = pd.DataFrame(results)
    return _deserialize_data(raw_frame, raw_frame.columns)
def _parse_dynamodb_items(
    items: List[Dict[str, Any]],
    dtype: Optional[Dict[str, str]] = None,
) -> pd.DataFrame:
    """Turn ``items`` into a DataFrame, optionally casting column types.

    Parameters
    ----------
    items : List[Dict[str, Any]]
        Records used to build the DataFrame.
    dtype : Optional[Dict[str, str]]
        Mapping handed to ``DataFrame.astype``. Skipped when empty or None.

    Returns
    -------
    pd.DataFrame
        The frame after ``_deserialize_data`` and the optional cast.
    """
    frame = pd.DataFrame(items)
    frame = _deserialize_data(frame, frame.columns)
    if dtype:
        return frame.astype(dtype=dtype)
    return frame


@apply_configs
Expand Down Expand Up @@ -140,10 +143,6 @@ def read_partiql_query(
... dtype={'key': int}
... )
"""
client = get_client(boto3_session)
_logger.debug("Reading results for PartiQL query: %s", query)
items = _get_items(client, query)
df = _parse_dynamodb_results(items)
if dtype:
df = df.astype(dtype=dtype)
return df
items = _get_items(query=query, boto3_session=boto3_session)
return _parse_dynamodb_items(items=items, dtype=dtype)
17 changes: 0 additions & 17 deletions awswrangler/dynamodb/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,6 @@ def get_table(
return dynamodb_table


@apply_configs
def get_client(boto3_session: Optional[boto3.Session] = None) -> boto3.client:
    """Create a DynamoDB client.

    Parameters
    ----------
    boto3_session : Optional[boto3.Session()]
        Boto3 Session. The default boto3 Session is used when boto3_session is None.

    Returns
    -------
    client : boto3.Client
        Boto3 DynamoDB client.
    """
    dynamodb_client = _utils.client(service_name="dynamodb", session=boto3_session)
    return dynamodb_client


def _validate_items(
items: Union[List[Dict[str, Any]], List[Mapping[str, Any]]], dynamodb_table: boto3.resource
) -> None:
Expand Down