Skip to content

Commit

Permalink
Add boto3_kwargs parameter (#52)
Browse files Browse the repository at this point in the history
Add the boto3_kwargs parameter to the get_df, put_df and various
transactions module functions to allow passing parameters to the
underlying boto3.client() or boto3.resource() functions.

Add corresponding unit tests.
  • Loading branch information
DrGFreeman committed Oct 3, 2021
1 parent 5b3f684 commit 7fb8dc5
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 21 deletions.
26 changes: 20 additions & 6 deletions dynamo_pandas/dynamo_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from .transactions import put_items


def get_df(*, table, keys=None, attributes=None, dtype=None):
def get_df(*, table, keys=None, attributes=None, dtype=None, boto3_kwargs={}):
"""Get items from a table into a dataframe.
Parameters
Expand All @@ -26,6 +26,11 @@ def get_df(*, table, keys=None, attributes=None, dtype=None):
numpy.dtype or Python type to cast one or more of the DataFrame’s columns to
column-specific types.
boto3_kwargs : dict
Keyword arguments to pass to the underlying ``boto3.resource('dynamodb')``
function call (see `boto3 docs <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.resource>`_
for details).
Returns
-------
pandas.DataFrame
Expand Down Expand Up @@ -129,9 +134,13 @@ def get_df(*, table, keys=None, attributes=None, dtype=None):
3 player_two 3.8
""" # noqa: E501
if keys is not None:
items = get_items(keys=keys, table=table, attributes=attributes)
items = get_items(
keys=keys, table=table, attributes=attributes, boto3_kwargs=boto3_kwargs
)
else:
items = get_all_items(table=table, attributes=attributes)
items = get_all_items(
table=table, attributes=attributes, boto3_kwargs=boto3_kwargs
)

return _to_df(items=items, dtype=dtype)

Expand Down Expand Up @@ -169,7 +178,7 @@ def keys(**kwargs):
return [{k: v} for v in kwargs[k]]


def put_df(df, *, table):
def put_df(df, *, table, boto3_kwargs={}):
"""Put rows of a dataframe as items into a table. If the item(s) do not exist in the
table they are created, otherwise the existing items are replaced with the new ones.
Expand All @@ -182,6 +191,11 @@ def put_df(df, *, table):
table : str
Name of the DynamoDB table.
boto3_kwargs : dict
Keyword arguments to pass to the underlying ``boto3.client('dynamodb')``
function call (see `boto3 docs <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.client>`_
for details).
Examples
--------
Assume we have the following dataframe:
Expand All @@ -197,8 +211,8 @@ def put_df(df, *, table):
``players``:
>>> put_df(players_df, table="players")
"""
put_items(items=_to_items(df), table=table)
""" # noqa: E501
put_items(items=_to_items(df), table=table, boto3_kwargs=boto3_kwargs)


def _to_df(items, *, dtype=None):
Expand Down
55 changes: 40 additions & 15 deletions dynamo_pandas/transactions/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def _batches(items, batch_size):
start += batch_size


def get_item(*, key, table, attributes=None):
def get_item(*, key, table, attributes=None, boto3_kwargs={}):
"""Get a single item from a table.
Parameters
Expand All @@ -37,6 +37,11 @@ def get_item(*, key, table, attributes=None):
Names of the item attributes to return. If None (default), all attributes are
returned.
boto3_kwargs : dict
Keyword arguments to pass to the underlying ``boto3.resource('dynamodb')``
function call (see `boto3 docs <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.resource>`_
for details).
Returns
-------
dict, None
Expand All @@ -63,8 +68,8 @@ def get_item(*, key, table, attributes=None):
... )
>>> print(item)
{'rating': 3.8, 'play_time': '0 days 22:07:34'}
"""
table = boto3.resource("dynamodb").Table(table)
""" # noqa: E501
table = boto3.resource("dynamodb", **boto3_kwargs).Table(table)

kwargs = {}
if attributes is not None:
Expand All @@ -75,7 +80,7 @@ def get_item(*, key, table, attributes=None):
return _deserialize(item)


def get_items(*, keys, table, attributes=None):
def get_items(*, keys, table, attributes=None, boto3_kwargs={}):
"""Get multiple items from a table.
Parameters
Expand All @@ -90,6 +95,11 @@ def get_items(*, keys, table, attributes=None):
Names of the item attributes to return. If None (default), all attributes are
returned.
boto3_kwargs : dict
Keyword arguments to pass to the underlying ``boto3.resource('dynamodb')``
function call (see `boto3 docs <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.resource>`_
for details).
Returns
-------
list[dict]
Expand Down Expand Up @@ -141,7 +151,7 @@ def _get_items(keys, table=table, attributes=attributes):

return items

resource = boto3.resource("dynamodb")
resource = boto3.resource("dynamodb", **boto3_kwargs)

key_batches = _batches(keys, batch_size=100)

Expand All @@ -152,7 +162,7 @@ def _get_items(keys, table=table, attributes=attributes):
return _deserialize(items)


def get_all_items(*, table, attributes=None):
def get_all_items(*, table, attributes=None, boto3_kwargs={}):
"""Get all the items in a table.
This function performs a scan of the table.
Expand All @@ -166,6 +176,11 @@ def get_all_items(*, table, attributes=None):
Names of the item attributes to return. If None (default), all attributes are
returned.
boto3_kwargs : dict
Keyword arguments to pass to the underlying ``boto3.resource('dynamodb')``
function call (see `boto3 docs <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.resource>`_
for details).
Returns
-------
list[dict]
Expand All @@ -190,7 +205,7 @@ def get_all_items(*, table, attributes=None):
{'player_id': 'player_one', 'play_time': '2 days 17:41:55'},
{'player_id': 'player_two', 'play_time': '0 days 22:07:34'}]
""" # noqa: E501
table = boto3.resource("dynamodb").Table(table)
table = boto3.resource("dynamodb", **boto3_kwargs).Table(table)

kwargs = {}
if attributes is not None:
Expand All @@ -207,7 +222,7 @@ def get_all_items(*, table, attributes=None):
return _deserialize(items)


def put_item(*, item, table, return_response=False):
def put_item(*, item, table, return_response=False, boto3_kwargs={}):
"""Add or update an item in a table. If the item does not exist in the table it is
created, otherwise the existing item is replaced with the new one.
Expand All @@ -225,6 +240,11 @@ def put_item(*, item, table, return_response=False):
return_response : bool
If True, the response from the boto3 API call will be returned.
boto3_kwargs : dict
Keyword arguments to pass to the underlying ``boto3.client('dynamodb')``
function call (see `boto3 docs <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.client>`_
for details).
Returns
-------
None, dict
Expand All @@ -242,22 +262,22 @@ def put_item(*, item, table, return_response=False):
>>> response = put_item(item=item, table="players", return_response=True)
>>> print(response["ResponseMetadata"]["HTTPStatusCode"])
200
"""
""" # noqa: E501
if not isinstance(item, dict):
raise TypeError("item must be a non-empty dictionary")

client = boto3.client("dynamodb")
client = boto3.client("dynamodb", **boto3_kwargs)

response = client.put_item(TableName=table, Item=ts.serialize(item)["M"])

if return_response:
return response


def _put_items(items, table):
def _put_items(items, table, boto3_kwargs):
"""Adapter function to format the items, call the client batch_write_item function
and return the unprocessed items (if any) in the format they were provided."""
client = boto3.client("dynamodb")
client = boto3.client("dynamodb", **boto3_kwargs)

response = client.batch_write_item(
RequestItems={table: [{"PutRequest": {"Item": item}} for item in items]}
Expand All @@ -271,7 +291,7 @@ def _put_items(items, table):
return []


def put_items(*, items, table):
def put_items(*, items, table, boto3_kwargs={}):
"""Add or update multiple items in a table. If the item(s) do not exist in the
table they are created, otherwise the existing items are replaced with the new ones.
Expand All @@ -285,6 +305,11 @@ def put_items(*, items, table):
table : str
Name of the DynamoDB table.
boto3_kwargs : dict
Keyword arguments to pass to the underlying ``boto3.client('dynamodb')``
function call (see `boto3 docs <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.client>`_
for details).
Examples
--------
Expand All @@ -300,7 +325,7 @@ def put_items(*, items, table):
'player_id': 'player_four',
'rating': 4.8}]
>>> put_items(items=items, table="players")
"""
""" # noqa: E501
if not isinstance(items, list):
raise TypeError("items must be a list of non-empty dictionaries")

Expand All @@ -311,7 +336,7 @@ def put_items(*, items, table):
batch_items = items_to_process[:batch_size]
items_to_process = items_to_process[batch_size:]

unprocessed_items = _put_items(batch_items, table)
unprocessed_items = _put_items(batch_items, table, boto3_kwargs)

if len(unprocessed_items) > batch_size // 2:
batch_size = max(batch_size // 2, 1)
Expand Down
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def aws_credentials():
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
os.environ["AWS_SECURITY_TOKEN"] = "testing"
os.environ["AWS_SESSION_TOKEN"] = "testing"
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"


@pytest.fixture()
Expand Down
40 changes: 40 additions & 0 deletions tests/test_main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import re
from unittest import mock

import pandas as pd
import pytest
Expand Down Expand Up @@ -187,6 +188,29 @@ def test_dtype(self, test_df_table):
"id": "int64",
}

@pytest.mark.parametrize("keys", (None, [{"id": 0}]))
def test_boto3_kwargs_are_passed(self, ddb_client, test_df_table, keys):
"""Test that the boto3_kwargs are passed to the boto3.resource() function
call."""
# test_df_table is defined in us-east-1. By setting the region_name to
# ca-central-1, we expect a ResourceNotFoundException.
# Moto raises a ValueError instead so expect it as well
# (see https://github.com/spulec/moto/issues/4344)
with pytest.raises(
(ddb_client.exceptions.ResourceNotFoundException, ValueError)
):
get_df(
keys=keys,
table=test_df_table,
boto3_kwargs=dict(region_name="ca-central-1"),
)

# With the correct region we expect to get the items.
df = get_df(
keys=keys, table=test_df_table, boto3_kwargs=dict(region_name="us-east-1"),
)
assert not df.empty


class Test_put_df:
"""Test the put_df function."""
Expand Down Expand Up @@ -228,6 +252,22 @@ def test_with_pd_types(self, ddb_client, empty_table):
)
)

def test_boto3_kwargs_are_passed(self, ddb_client, empty_table):
"""Test that the boto3_kwargs are passed to the boto3.client() function call."""
# Moto does not raise the expected ResourceNotFoundException (see
# https://github.com/spulec/moto/issues/4347)
# We mock the boto3.client call instead and verify the boto3_kwargs are passed
with mock.patch(
"dynamo_pandas.transactions.transactions.boto3.client"
) as client:
put_df(
test_df,
table=empty_table,
boto3_kwargs=dict(region_name="ca-central-1"),
)

assert client.call_args[1] == dict(region_name="ca-central-1")


class Test__to_df:
"""Test the _to_df function."""
Expand Down
Loading

0 comments on commit 7fb8dc5

Please sign in to comment.