Skip to content

Commit

Permalink
Event streams (#747)
Browse files Browse the repository at this point in the history
* Add async-for for event streams

Will fix terricain/aioboto3#178

* Simplified AioEventStream with async_generator
  • Loading branch information
Terry Cain authored and thehesiod committed Jan 4, 2020
1 parent 65320c3 commit a8127f8
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 0 deletions.
4 changes: 4 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
Changes
-------

0.11.1 (TBD)
^^^^^^^^^^^^
* Fixed event streaming API calls like S3 Select.

0.11.0 (2019-11-12)
^^^^^^^^^^^^^^^^^^^
* replace CaseInsensitiveDict with urllib3 equivalent #744
Expand Down
25 changes: 25 additions & 0 deletions aiobotocore/eventstream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from botocore.eventstream import EventStream, EventStreamBuffer
from async_generator import async_generator, yield_


class AioEventStream(EventStream):
@async_generator
async def _create_raw_event_generator(self):
event_stream_buffer = EventStreamBuffer()
async for chunk, _ in self._raw_stream.iter_chunks():
event_stream_buffer.add_data(chunk)
for event in event_stream_buffer:
await yield_(event)

def __iter__(self):
raise NotImplementedError('Use async-for instead')

def __aiter__(self):
return self.__anext__()

@async_generator
async def __anext__(self):
async for event in self._event_generator:
parsed_event = self._parse_event(event)
if parsed_event:
await yield_(parsed_event)
53 changes: 53 additions & 0 deletions aiobotocore/parsers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from botocore.parsers import ResponseParserFactory, RestXMLParser, \
RestJSONParser, JSONParser, QueryParser, EC2QueryParser
from .eventstream import AioEventStream


class AioRestXMLParser(RestXMLParser):
def _create_event_stream(self, response, shape):
parser = self._event_stream_parser
name = response['context'].get('operation_name')
return AioEventStream(response['body'], shape, parser, name)


class AioEC2QueryParser(EC2QueryParser):
def _create_event_stream(self, response, shape):
parser = self._event_stream_parser
name = response['context'].get('operation_name')
return AioEventStream(response['body'], shape, parser, name)


class AioQueryParser(QueryParser):
def _create_event_stream(self, response, shape):
parser = self._event_stream_parser
name = response['context'].get('operation_name')
return AioEventStream(response['body'], shape, parser, name)


class AioJSONParser(JSONParser):
def _create_event_stream(self, response, shape):
parser = self._event_stream_parser
name = response['context'].get('operation_name')
return AioEventStream(response['body'], shape, parser, name)


class AioRestJSONParser(RestJSONParser):
def _create_event_stream(self, response, shape):
parser = self._event_stream_parser
name = response['context'].get('operation_name')
return AioEventStream(response['body'], shape, parser, name)


PROTOCOL_PARSERS = {
'ec2': AioEC2QueryParser,
'query': AioQueryParser,
'json': AioJSONParser,
'rest-json': AioRestJSONParser,
'rest-xml': AioRestXMLParser,
}


class AioResponseParserFactory(ResponseParserFactory):
def create_parser(self, protocol_name):
parser_cls = PROTOCOL_PARSERS[protocol_name]
return parser_cls(**self._defaults)
4 changes: 4 additions & 0 deletions aiobotocore/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from botocore import retryhandler, translate
from botocore.exceptions import PartialCredentialsError
from .client import AioClientCreator
from .parsers import AioResponseParserFactory


class AioSession(botocore.session.Session):
Expand All @@ -15,6 +16,9 @@ def __init__(self, *args, loop=None, **kwargs):

super().__init__(*args, **kwargs)

# Register the AioResponseParserFactory so event streams will be async'd
self.register_component('response_parser_factory', AioResponseParserFactory())

def create_client(self, service_name, region_name=None, api_version=None,
use_ssl=True, verify=None, endpoint_url=None,
aws_access_key_id=None, aws_secret_access_key=None,
Expand Down
99 changes: 99 additions & 0 deletions tests/test_eventstreams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import pytest
# TODO once Moto supports either S3 Select or Kinesis SubscribeToShard then
# this can be tested against a real AWS API

import botocore.parsers
from aiobotocore.eventstream import AioEventStream

TEST_STREAM_DATA = (
b'\x00\x00\x00w\x00\x00\x00U5\xd1F\xcd\r:message-type\x07\x00\x05event\x0b:event-'
b'type\x07\x00\x07Records\r:content-type\x07\x00\x18application/octet-stream{"hel'
b'lo":"world"}\nF\x0e\x9a2',

b'\x00\x00\x00\xce\x00\x00\x00C\xdc\xd2\x99\xf9\r:message-type\x07\x00\x05event'
b'\x0b:event-type\x07\x00\x05Stats\r:content-type\x07\x00\x08text/xml<Stats xml'
b'ns=""><BytesScanned>19</BytesScanned><BytesProcessed>19</BytesProcessed><Byte'
b'sReturned>18</BytesReturned></Stats>\x92\xd0?\xa5\x00\x00\x008\x00\x00\x00(\xc1'
b'\xc6\x84\xd4\r:message-type\x07\x00\x05event\x0b:event-type\x07\x00\x03End\xcf'
b'\x97\xd3\x92'
)


class FakeStreamReader(object):
class ChunkedIterator(object):
def __init__(self, chunks):
self.iter = iter(chunks)

def __aiter__(self):
return self

async def __anext__(self):
try:
result = next(self.iter)
return result, True
except StopIteration:
raise StopAsyncIteration()

def __init__(self, chunks):
self.chunks = chunks

def iter_chunks(self):
return self.ChunkedIterator(self.chunks)


@pytest.mark.moto
@pytest.mark.asyncio
async def test_eventstream_chunking(s3_client):
# These are the options passed to the EventStream class
# during a normal run with botocore.
operation_name = 'SelectObjectContent'
outputshape = (s3_client._service_model.operation_model(operation_name)
.output_shape.members['Payload'])
parser = botocore.parsers.EventStreamXMLParser()
sr = FakeStreamReader(TEST_STREAM_DATA)

event_stream = AioEventStream(
sr,
outputshape,
parser,
operation_name
)

events = []
# {'Records': {'Payload': b'{"hello":"world"}\n'}}
# {'Stats': {'Details': {'BytesScanned': 19,
# 'BytesProcessed': 19,
# 'BytesReturned': 18}}}
# {'End': {}}
async for event in event_stream:
events.append(event)

assert len(events) == 3
event1, event2, event3 = events

assert 'Records' in event1
assert 'Stats' in event2
assert 'End' in event3


@pytest.mark.moto
@pytest.mark.asyncio
async def test_eventstream_no_iter(s3_client):
# These are the options passed to the EventStream class
# during a normal run with botocore.
operation_name = 'SelectObjectContent'
outputshape = (s3_client._service_model.operation_model(operation_name)
.output_shape.members['Payload'])
parser = botocore.parsers.EventStreamXMLParser()
sr = FakeStreamReader(TEST_STREAM_DATA)

event_stream = AioEventStream(
sr,
outputshape,
parser,
operation_name
)

with pytest.raises(NotImplementedError):
for _ in event_stream:
print('fail')
16 changes: 16 additions & 0 deletions tests/test_patches.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from botocore.paginate import PageIterator
from botocore.session import Session, get_session
from botocore.waiter import NormalizedOperationMethod
from botocore.eventstream import EventStream
from botocore.parsers import ResponseParserFactory, PROTOCOL_PARSERS, \
ResponseParser


# This file ensures that our private patches will work going forward. If a
Expand Down Expand Up @@ -62,8 +65,21 @@
Session: {'16b4a08b3b5792d5d9c639b7a07d01902205b238'},
get_session: {'c47d588f5da9b8bde81ccc26eaef3aee19ddd901'},
NormalizedOperationMethod: {'ee88834b123c6c77dfea0b4208308cd507a6ba36'},
EventStream: {'0e68633755a7dd4ff79c6d7ca778800a7bc86d3b'},
ResponseParserFactory: {'db484fd7e743611b9657c8f1acc84e76597e96b7'},
ResponseParser: {'d16826f7e815a62d7a5ca0d2ca5d936c64e0da88'},

}

_PROTOCOL_PARSER_CONTENT = {'ec2', 'query', 'json', 'rest-json', 'rest-xml'}


@pytest.mark.moto
def test_protocol_parsers():
# Check that no new parsers have been added
current_parsers = set(PROTOCOL_PARSERS.keys())
assert current_parsers == _PROTOCOL_PARSER_CONTENT


# NOTE: this doesn't require moto but needs to be marked to run with coverage
@pytest.mark.moto
Expand Down

0 comments on commit a8127f8

Please sign in to comment.