diff --git a/.gitignore b/.gitignore
index 7dc3380..c97eebf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,8 +2,10 @@
 *.pyc
-/build
+/build/
+/dist/
 /.cache
+/.coverage
 *.egg-info/
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..e3b9a0d
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,29 @@
+language: python
+sudo: false
+
+branches:
+  only:
+    - master
+
+python:
+  - 2.7
+  - 3.5
+  - 3.6
+  - pypy
+
+install: false
+script:
+  - pip install -e .
+  - pip install codecov pytest pytest-mock
+  - coverage run --source=src $(which py.test) test/
+after_success:
+  - bash <(curl -s https://codecov.io/bash)
+
+deploy:
+  provider: pypi
+  user: nerdwallet
+  password:
+    secure: "Sn+HB7i8sdissJIR5oVtjRG3ek3SkEtVjE/+qbksglUBhCXPPErxtmO5yjPHO0rsux0SE+XkOdTKbiI7Hg+NkK54Slv8GQl5tn9qqi7FY0IoElXqfh/yu6sLk9JZ5aQyhHT6LPsJM9SqP/AjG/+/u00maY0eyNSUzjiEC2ND6djNdQfuViOTX2Co9ojXFN41m4YT6hGmresqfdNK5huIN2nQ4xX9xysohlsCDgCUP4vJ4qF/+aB/ozmJCz1p0HnAprbNgIMIJD9QpAwP3Brikh6YDzLHR6/Wxr/7RtaYrw4BNtTJOe5vzzIoQXSCv89w3dmwcBIyPzlz5+VgequcwLV21+qEJpK/RQL8yszup3aY4VrECzNfQej78x9uBZ7m9XFO1Un9XULDGPFDur4RCIoFsn3I85LSRJIm1IBTNmAnfucRjWvO/EriArlgVQNj8eom0M/8nTPCmU7wU7C36OvCKDb5dSzLlkKYlp3DmfoY39Rjfofn/02XQT/z0GL8a4CkxiU1jnXI+uA+nG0P3b3TudGjai/FVa9qp0nZvgNEBW7syI2XQE8wfkIIN2TLJATzBD3oszeQU4J2SCN86YJVatnR1eQyKn42xfV7MbWf8Dr1A/SqVa8lDrUKaTP2hUzdK3956jfhUo6/pynK1NCMiyRJoJ5hgHVJT2m1sbw="
+  on:
+    branch: master
+  distributions: "sdist bdist_wheel"
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..ceeea23
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1 @@
+include VERSION
diff --git a/VERSION b/VERSION
index def9a01..9faa1b7 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.1.5
\ No newline at end of file
+0.1.5
diff --git a/requirements.txt b/requirements.txt
index 68898ef..2973c3a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,9 +1,8 @@
-boto3==1.4.4
-botocore==1.5.19        # via boto3, s3transfer
-docutils==0.13.1        # via botocore
-futures==3.0.5          # via s3transfer
-jmespath==0.9.1         # via boto3, botocore
+boto3==1.4.8
+botocore==1.8.9         # via boto3, s3transfer
+docutils==0.14          # via botocore
+jmespath==0.9.3         # via boto3, botocore
 offspring==0.0.3
-python-dateutil==2.6.0  # via botocore
-s3transfer==0.1.10      # via boto3
-six==1.10.0             # via python-dateutil
+python-dateutil==2.6.1  # via botocore
+s3transfer==0.1.12      # via boto3
+six==1.11.0
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..29dfbe7
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,3 @@
+[flake8]
+exclude=./venv/*,./build/*,./configs/*,./logs/*,*.egg-info/*
+max_line_length=120
diff --git a/setup.py b/setup.py
index fe2e15f..218666b 100644
--- a/setup.py
+++ b/setup.py
@@ -3,19 +3,33 @@
 with open('VERSION') as version_fd:
     version = version_fd.read().strip()
 
-install_requires = [
-    'boto3>=1.4.4,<2.0',
-    'offspring>=0.0.3,<0.9',
-]
+with open('README.md', 'r') as readme_fd:
+    long_description = readme_fd.read()
 
 setup(
     name='kinesis-python',
     version=version,
-    install_requires=install_requires,
+    description='Low level, multiprocessing based AWS Kinesis producer & consumer library',
+    long_description=long_description,
+    url='https://github.com/NerdWalletOSS/kinesis-python',
+
+    install_requires=[
+        'boto3>=1.4.4,<2.0',
+        'offspring>=0.0.3,<1.0',
+        'six>=1.11.0,<2.0',
+    ],
     packages=find_packages('src'),
     package_dir={'': 'src'},
     author='Evan Borgstrom',
     author_email='eborgstrom@nerdwallet.com',
-    license='Apache 2',
-    description='Low level, multiprocessing based AWS Kinesis producer & consumer library'
+    license='Apache License Version 2.0',
+    classifiers=[
+        'Development Status :: 4 - Beta',
+        'Intended Audience :: Developers',
+        'License :: OSI Approved :: Apache Software License',
+        'Natural Language :: English',
+        'Programming Language :: Python',
+        'Topic :: Internet',
+        'Topic :: Software Development :: Libraries'
+    ]
 )
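Aside: the new setup.py reads README.md, which is Markdown, into long_description, but PyPI renders it as plain text unless the content type is declared. A minimal sketch of a possible follow-up — not part of this diff; long_description_content_type requires setuptools >= 38.6.0:

    # Hypothetical variant of the setup() call above; all other arguments unchanged.
    from setuptools import find_packages, setup

    with open('README.md', 'r') as readme_fd:
        long_description = readme_fd.read()

    setup(
        name='kinesis-python',
        long_description=long_description,
        long_description_content_type='text/markdown',  # lets PyPI render the Markdown
        packages=find_packages('src'),
        package_dir={'': 'src'},
    )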
diff --git a/src/kinesis/producer.py b/src/kinesis/producer.py
index 1bfc911..8db0d13 100644
--- a/src/kinesis/producer.py
+++ b/src/kinesis/producer.py
@@ -1,3 +1,4 @@
+import collections
 import logging
 import multiprocessing
 try:
@@ -9,31 +10,68 @@
 import time
 
 import boto3
+import six
 from offspring.process import SubprocessLoop
 
 log = logging.getLogger(__name__)
 
 
+def sizeof(obj, seen=None):
+    """Recursively and fully calculate the size of an object"""
+    obj_id = id(obj)
+    try:
+        if obj_id in seen:
+            return 0
+    except TypeError:
+        seen = set()
+
+    seen.add(obj_id)
+
+    size = sys.getsizeof(obj)
+
+    # since strings are containers we return their size before we check for a container
+    if isinstance(obj, six.string_types):
+        return size
+
+    if isinstance(obj, collections.Mapping):
+        return size + sum(
+            sizeof(key, seen) + sizeof(val, seen)
+            for key, val in six.iteritems(obj)
+        )
+
+    if isinstance(obj, collections.Container):
+        return size + sum(
+            sizeof(item, seen)
+            for item in obj
+        )
+
+    return size
+
+
 class AsyncProducer(SubprocessLoop):
     """Async accumulator and producer based on a multiprocessing Queue"""
 
     # Tell our subprocess loop that we don't want to terminate on shutdown since we want to drain our queue first
     TERMINATE_ON_SHUTDOWN = False
 
-    # This it the max size data that we'll send in a single call. We use 99% of 1Mb to account for the extra overhead
-    # of JSON syntax that is not taken into account when we use sys.getsizeof since it's measuring the python object
-    MAX_SIZE = int((2 ** 20) * .99)
-
-    # This is the max number of messages that we'll send in a single call.
+    # Max size & count
+    # Per: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
+    #
+    # * The maximum size of a data blob (the data payload before base64-encoding) is up to 1 MB.
+    # * Each shard can support up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB
+    #   per second (including partition keys).
+    MAX_SIZE = (2 ** 20)
     MAX_COUNT = 1000
 
-    def __init__(self, stream_name, buffer_time, queue, boto3_session=None):
+    def __init__(self, stream_name, buffer_time, queue, max_count=None, max_size=None, boto3_session=None):
         self.stream_name = stream_name
         self.buffer_time = buffer_time
         self.queue = queue
         self.records = []
         self.next_records = []
         self.alive = True
+        self.max_count = max_count or self.MAX_COUNT
+        self.max_size = max_size or self.MAX_SIZE
 
         if boto3_session is None:
             boto3_session = boto3.Session()
@@ -52,18 +90,20 @@ def loop(self):
             queue_timeout = self.buffer_time - (time.time() - timer_start)
             try:
                 log.debug("Fetching from queue with timeout: %s", queue_timeout)
-                data = self.queue.get(block=True, timeout=queue_timeout)
+                data, explicit_hash_key, partition_key = self.queue.get(block=True, timeout=queue_timeout)
             except Queue.Empty:
                 continue
 
             record = {
                 'Data': data,
-                'PartitionKey': '{0}{1}'.format(time.clock(), time.time()),
+                'PartitionKey': partition_key or '{0}{1}'.format(time.clock(), time.time()),
             }
+            if explicit_hash_key is not None:
+                record['ExplicitHashKey'] = explicit_hash_key
 
-            records_size += sys.getsizeof(record)
-            if records_size >= self.MAX_SIZE:
-                log.debug("Records exceed MAX_SIZE! Adding to next_records: %s", record)
+            records_size += sizeof(record)
+            if records_size >= self.max_size:
+                log.debug("Records exceed MAX_SIZE (%s)! Adding to next_records: %s", self.max_size, record)
                 self.next_records = [record]
                 break
 
@@ -71,8 +111,8 @@
             self.records.append(record)
             records_count += 1
 
-            if records_count == self.MAX_COUNT:
-                log.debug("Records have reached MAX_COUNT! Flushing records.")
+            if records_count == self.max_count:
+                log.debug("Records have reached MAX_COUNT (%s)! Flushing records.", self.max_count)
                 break
 
         self.flush_records()
@@ -97,9 +137,10 @@
 class KinesisProducer(object):
     """Produce to Kinesis streams via an AsyncProducer"""
 
-    def __init__(self, stream_name, buffer_time=1.0, boto3_session=None):
+    def __init__(self, stream_name, buffer_time=0.5, max_count=None, max_size=None, boto3_session=None):
         self.queue = multiprocessing.Queue()
-        self.async_producer = AsyncProducer(stream_name, buffer_time, self.queue, boto3_session=boto3_session)
+        self.async_producer = AsyncProducer(stream_name, buffer_time, self.queue, max_count=max_count,
+                                            max_size=max_size, boto3_session=boto3_session)
 
-    def put(self, data):
-        self.queue.put(data)
+    def put(self, data, explicit_hash_key=None, partition_key=None):
+        self.queue.put((data, explicit_hash_key, partition_key))
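Aside: the recursive sizeof() introduced above replaces the sys.getsizeof() call in loop() because getsizeof is shallow — for a dict it measures only the container object, never its keys or values, so batch sizes were being badly underestimated. A minimal sketch of the difference; the byte counts are illustrative and vary by interpreter:

    import sys

    from kinesis.producer import sizeof

    # A record shaped like the ones loop() builds; the payload dominates the true size.
    record = {'Data': 'x' * (512 * 1024), 'PartitionKey': '123'}

    print(sys.getsizeof(record))  # just the dict object itself: a few hundred bytes
    print(sizeof(record))         # dict plus its keys and values: roughly 512 KB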
diff --git a/test/test-consumer.py b/test-consumer.py
similarity index 100%
rename from test/test-consumer.py
rename to test-consumer.py
diff --git a/test/test-producer.py b/test-producer.py
similarity index 94%
rename from test/test-producer.py
rename to test-producer.py
index c1622d7..715b979 100644
--- a/test/test-producer.py
+++ b/test-producer.py
@@ -7,5 +7,5 @@
 logging.getLogger('botocore.vendored.requests.packages.urllib3').level = logging.WARN
 
 producer = KinesisProducer('borgstrom-test')
-for idx in xrange(1):
+for idx in xrange(100):
     producer.put(str(idx))
diff --git a/test/test_consumer.py b/test/test_consumer.py
new file mode 100644
index 0000000..e160cc4
--- /dev/null
+++ b/test/test_consumer.py
@@ -0,0 +1,46 @@
+from kinesis.consumer import KinesisConsumer
+
+try:
+    from unittest.mock import MagicMock
+except ImportError:
+    from mock import MagicMock
+
+
+def test_setup_shards(mocker):
+    mock_boto3_session = MagicMock()
+    mock_shard_reader = mocker.patch('kinesis.consumer.ShardReader')
+
+    consumer = KinesisConsumer('testing', boto3_session=mock_boto3_session)
+
+    mock_boto3_session.client.assert_called_with('kinesis')
+
+    consumer.kinesis_client.describe_stream.return_value = {
+        'StreamDescription': {
+            'Shards': [
+                {
+                    'ShardId': 'test-shard',
+                }
+            ]
+        }
+    }
+    consumer.kinesis_client.get_shard_iterator.return_value = {
+        'ShardIterator': 'test-iter'
+    }
+
+    consumer.setup_shards()
+
+    consumer.kinesis_client.describe_stream.assert_called_with(StreamName='testing')
+    consumer.kinesis_client.get_shard_iterator.assert_called_with(
+        StreamName='testing',
+        ShardId='test-shard',
+        ShardIteratorType='LATEST'
+    )
+
+    mock_shard_reader.assert_called_with(
+        'test-shard',
+        'test-iter',
+        consumer.record_queue,
+        consumer.error_queue,
+        boto3_session=consumer.boto3_session,
+        sleep_time=consumer.reader_sleep_time
+    )
diff --git a/test/test_producer.py b/test/test_producer.py
new file mode 100644
index 0000000..6a5adaf
--- /dev/null
+++ b/test/test_producer.py
@@ -0,0 +1,18 @@
+from kinesis.producer import KinesisProducer
+
+
+def test_producer(mocker):
+    mocked_async_producer = mocker.patch('kinesis.producer.AsyncProducer')
+    producer = KinesisProducer('testing')
+    mocked_async_producer.assert_called_with(
+        'testing',
+        0.5,
+        producer.queue,
+        max_count=None,
+        max_size=None,
+        boto3_session=None
+    )
+
+    mocked_queue = mocker.patch.object(producer, 'queue')
+    producer.put('foo', explicit_hash_key='hash', partition_key='partition')
+    mocked_queue.put.assert_called_with(('foo', 'hash', 'partition'))
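Taken together, the producer changes add per-record routing (partition_key, explicit_hash_key) and tunable batch limits (max_count, max_size). A minimal usage sketch, assuming a stream named 'my-stream' exists and AWS credentials are configured — the stream name and limits here are placeholders:

    from kinesis.producer import KinesisProducer

    producer = KinesisProducer('my-stream', max_count=500, max_size=512 * 1024)

    producer.put('event-1')                           # partition key auto-generated
    producer.put('event-2', partition_key='user-42')  # keep one user's records together
    producer.put('event-3', explicit_hash_key='0')    # pin to the shard that owns hash 0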