Skip to content

Commit

Permalink
bump deps (#736)
Browse files Browse the repository at this point in the history
  • Loading branch information
thehesiod committed Oct 25, 2019
1 parent 2a9426c commit 957c91b
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 134 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ python:
- 3.5
- 3.6
- 3.7
- 3.8

matrix:
include:
Expand Down
7 changes: 5 additions & 2 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
Changes
-------

0.10.4 (2019-XX-XX)
0.10.4 (2019-10-24)
^^^^^^^^^^^^^^^^^^^
* Pin aws-xray-sdk to aiobotocore
* Make AioBaseClient.close method async #724 (thanks @bsitruk)
* Bump awscli, boto3, botocore #735 (thanks @bbrendon)
* switch paginator to async_generator, add result_key_iters
(deprecate next_page method)

0.10.3 (2019-07-17)
^^^^^^^^^^^^^^^^^^^
Expand Down
4 changes: 1 addition & 3 deletions CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ How to Upgrade Botocore
-----------------------
aiobotocore's file names try to match the botocore files they functionally match.
For the most part botocore classes are sub-classed with the majority of the
botocore calls eventually called...however certain methods like
`PageIterator.next_page` had to be re-implemented so watch for changes in those
types of methods.
botocore calls eventually called.

The best way I've seen to upgrade botocore support is by downloading the sources
of the release of botocore you're trying to upgrade to, and the version
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ vtest:
python3 -m pytest -s -v $(FLAGS) ./tests/

checkrst:
python setup.py check -rms
python3 setup.py check -rms

cov cover coverage: flake
python3 -m pytest -s -v --cov-report term --cov-report html --cov aiobotocore ./tests
Expand Down
2 changes: 1 addition & 1 deletion aiobotocore/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .session import get_session, AioSession

__all__ = ['get_session', 'AioSession']
__version__ = '0.10.4a0'
__version__ = '0.10.4'
10 changes: 3 additions & 7 deletions aiobotocore/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
import botocore.client
from botocore.exceptions import OperationNotPageableError
from botocore.history import get_global_history_recorder
from botocore.paginate import Paginator
from botocore.utils import get_service_module_name
from botocore.waiter import xform_name

from .paginate import AioPageIterator
from .paginate import AioPaginator
from .args import AioClientArgsCreator
from . import waiter

Expand Down Expand Up @@ -143,16 +142,13 @@ def get_paginator(self, operation_name):
if not self.can_paginate(operation_name):
raise OperationNotPageableError(operation_name=operation_name)
else:
# substitute iterator with async one
Paginator.PAGE_ITERATOR_CLS = AioPageIterator

actual_operation_name = self._PY_TO_OP_NAME[operation_name]

# Create a new paginate method that will serve as a proxy to
# the underlying Paginator.paginate method. This is needed to
# attach a docstring to the method.
def paginate(self, **kwargs):
return Paginator.paginate(self, **kwargs)
return AioPaginator.paginate(self, **kwargs)

paginator_config = self._cache['page_config'][
actual_operation_name]
Expand All @@ -164,7 +160,7 @@ def paginate(self, **kwargs):

# Create the new paginator class
documented_paginator_cls = type(
paginator_class_name, (Paginator,), {'paginate': paginate})
paginator_class_name, (AioPaginator,), {'paginate': paginate})

operation_model = self._service_model.\
operation_model(actual_operation_name)
Expand Down
243 changes: 152 additions & 91 deletions aiobotocore/paginate.py
Original file line number Diff line number Diff line change
@@ -1,115 +1,141 @@
import asyncio
import warnings

from botocore.exceptions import PaginationError
from botocore.paginate import PageIterator
from botocore.paginate import Paginator, PageIterator
from botocore.utils import set_value_from_jmespath, merge_dicts
from botocore.compat import six

from async_generator import async_generator, yield_

class AioPageIterator(PageIterator):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._init_pager()
# switch to aioitertools.tee after we're 3.6+
def aio_tee(itr, n: int = 2):
assert n > 0
sentinel = object()
queues = [asyncio.Queue() for _ in range(n)]

def __iter__(self):
raise NotImplementedError(
"{self} is an AsyncIterable: use `async for`".format(self=self)
)
@async_generator
async def gen(k: int, q: asyncio.Queue):
if k == 0:
async for value in itr:
await asyncio.gather(*[queue.put(value)
for queue in queues[1:]])
await yield_(value)

def _init_pager(self):
self._is_stop = False
self._current_kwargs = self._op_kwargs
self._previous_next_token = None
self._next_token = dict((key, None) for key in self._input_token)
await asyncio.gather(*[queue.put(sentinel)
for queue in queues[1:]])
return

if self._starting_token is not None:
self._next_token = self._parse_starting_token()[0]
while True:
value = await q.get()
if value is sentinel:
break

await yield_(value)

return tuple(gen(k, q) for k, q in enumerate(queues))

# The number of items from result_key we've seen so far.
self._total_items = 0
self._first_request = True
self._primary_result_key = self.result_keys[0]
self._starting_truncation = 0
self._inject_starting_params(self._current_kwargs)

class AioPageIterator(PageIterator):
async def next_page(self):
if self._is_stop:
return None
itr = getattr(self, '_iter', None)
if itr is None:
warnings.warn("next_page is deprecated, use async for instead",
DeprecationWarning)

response = await self._make_request(self._current_kwargs)
parsed = self._extract_parsed_response(response)
if self._first_request:
# The first request is handled differently. We could
# possibly have a resume/starting token that tells us where
# to index into the retrieved page.
if self._starting_token is not None:
self._starting_truncation = self._handle_first_request(
parsed, self._primary_result_key,
self._starting_truncation)
self._first_request = False
self._record_non_aggregate_key_values(parsed)
else:
# If this isn't the first request, we have already sliced into
# the first request and had to make additional requests after.
# We no longer need to add this to truncation.
self._starting_truncation = 0
current_response = self._primary_result_key.search(parsed)
if current_response is None:
current_response = []
num_current_response = len(current_response)
truncate_amount = 0
if self._max_items is not None:
truncate_amount = (self._total_items + num_current_response) \
- self._max_items

if truncate_amount > 0:
self._truncate_response(parsed, self._primary_result_key,
truncate_amount, self._starting_truncation,
self._next_token)
self._is_stop = True
return response
else:
self._total_items += num_current_response
self._next_token = self._get_next_token(parsed)
if all(t is None for t in self._next_token.values()):
self._is_stop = True
return response
if self._max_items is not None and \
self._total_items == self._max_items:
# We're on a page boundary so we can set the current
# next token to be the resume token.
self.resume_token = self._next_token
self._is_stop = True
return response
if self._previous_next_token is not None and \
self._previous_next_token == self._next_token:
message = ("The same next token was received "
"twice: %s" % self._next_token)
raise PaginationError(message=message)
self._inject_token_into_kwargs(self._current_kwargs,
self._next_token)
self._previous_next_token = self._next_token
return response
self._iter = self.__anext__()

try:
return await self._iter.__anext__()
except StopAsyncIteration:
self._iter = None
return None
except: # noqa: E722
self._iter = None
raise

def __aiter__(self):
return self
return self.__anext__()

@async_generator
async def __anext__(self):
if self._is_stop:
raise StopAsyncIteration # noqa
current_kwargs = self._op_kwargs
previous_next_token = None
next_token = dict((key, None) for key in self._input_token)
if self._starting_token is not None:
# If the starting token exists, populate the next_token with the
# values inside it. This ensures that we have the service's
# pagination token on hand if we need to truncate after the
# first response.
next_token = self._parse_starting_token()[0]
# The number of items from result_key we've seen so far.
total_items = 0
first_request = True
primary_result_key = self.result_keys[0]
starting_truncation = 0
self._inject_starting_params(current_kwargs)

return await self.next_page()
while True:
response = await self._make_request(current_kwargs)
parsed = self._extract_parsed_response(response)
if first_request:
# The first request is handled differently. We could
# possibly have a resume/starting token that tells us where
# to index into the retrieved page.
if self._starting_token is not None:
starting_truncation = self._handle_first_request(
parsed, primary_result_key, starting_truncation)
first_request = False
self._record_non_aggregate_key_values(parsed)
else:
# If this isn't the first request, we have already sliced into
# the first request and had to make additional requests after.
# We no longer need to add this to truncation.
starting_truncation = 0
current_response = primary_result_key.search(parsed)
if current_response is None:
current_response = []
num_current_response = len(current_response)
truncate_amount = 0
if self._max_items is not None:
truncate_amount = (total_items + num_current_response) \
- self._max_items

if truncate_amount > 0:
self._truncate_response(parsed, primary_result_key,
truncate_amount, starting_truncation,
next_token)
await yield_(response)
break
else:
await yield_(response)
total_items += num_current_response
next_token = self._get_next_token(parsed)
if all(t is None for t in next_token.values()):
break
if self._max_items is not None and \
total_items == self._max_items:
# We're on a page boundary so we can set the current
# next token to be the resume token.
self.resume_token = next_token
break
if previous_next_token is not None and \
previous_next_token == next_token:
message = ("The same next token was received "
"twice: %s" % next_token)
raise PaginationError(message=message)
self._inject_token_into_kwargs(current_kwargs, next_token)
previous_next_token = next_token

def result_key_iters(self):
raise NotImplementedError
# teed_results = tee(self, len(self.result_keys))
# return [ResultKeyIterator(i, result_key) for i, result_key in
# zip(teed_results, self.result_keys)]
teed_results = aio_tee(self, len(self.result_keys))
return [ResultKeyIterator(i, result_key) for i, result_key
in zip(teed_results, self.result_keys)]

async def build_full_result(self):
complete_result = {}
while True:
response = await self.next_page()
if response is None:
break
async for response in self:
page = response
# We want to try to catch operation object pagination
# and format correctly for those. They come in the form
Expand Down Expand Up @@ -142,7 +168,7 @@ async def build_full_result(self):
# Now both result_value and existing_value contain something
if isinstance(result_value, list):
existing_value.extend(result_value)
elif isinstance(result_value, (int, float, str)):
elif isinstance(result_value, (int, float, six.string_types)):
# Modify the existing result with the sum or concatenation
set_value_from_jmespath(
complete_result, result_expression.expression,
Expand All @@ -151,3 +177,38 @@ async def build_full_result(self):
if self.resume_token is not None:
complete_result['NextToken'] = self.resume_token
return complete_result


class AioPaginator(Paginator):
PAGE_ITERATOR_CLS = AioPageIterator


class ResultKeyIterator:
"""Iterates over the results of paginated responses.
Each iterator is associated with a single result key.
Iterating over this object will give you each element in
the result key list.
:param pages_iterator: An iterator that will give you
pages of results (a ``PageIterator`` class).
:param result_key: The JMESPath expression representing
the result key.
"""

def __init__(self, pages_iterator, result_key):
self._pages_iterator = pages_iterator
self.result_key = result_key

def __aiter__(self):
return self.__anext__()

@async_generator
async def __anext__(self):
async for page in self._pages_iterator:
results = self.result_key.search(page)
if results is None:
results = []
for result in results:
await yield_(result)

0 comments on commit 957c91b

Please sign in to comment.