Packaging improvements / More producer control (#10)
borgstrom committed Dec 31, 2017
1 parent 9cc9dec commit 2ed1c1d
Showing 12 changed files with 188 additions and 35 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -2,8 +2,10 @@

*.pyc

-/build
+/build/
+/dist/

/.cache
/.coverage

*.egg-info/
29 changes: 29 additions & 0 deletions .travis.yml
@@ -0,0 +1,29 @@
language: python
sudo: false

branches:
  only:
    - master

python:
  - 2.7
  - 3.5
  - 3.6
  - pypy

install: false
script:
  - pip install -e .
  - pip install codecov pytest pytest-mock
  - coverage run --source=src $(which py.test) test/
after_success:
  - bash <(curl -s https://codecov.io/bash)

deploy:
  provider: pypi
  user: nerdwallet
  password:
    secure: "Sn+HB7i8sdissJIR5oVtjRG3ek3SkEtVjE/+qbksglUBhCXPPErxtmO5yjPHO0rsux0SE+XkOdTKbiI7Hg+NkK54Slv8GQl5tn9qqi7FY0IoElXqfh/yu6sLk9JZ5aQyhHT6LPsJM9SqP/AjG/+/u00maY0eyNSUzjiEC2ND6djNdQfuViOTX2Co9ojXFN41m4YT6hGmresqfdNK5huIN2nQ4xX9xysohlsCDgCUP4vJ4qF/+aB/ozmJCz1p0HnAprbNgIMIJD9QpAwP3Brikh6YDzLHR6/Wxr/7RtaYrw4BNtTJOe5vzzIoQXSCv89w3dmwcBIyPzlz5+VgequcwLV21+qEJpK/RQL8yszup3aY4VrECzNfQej78x9uBZ7m9XFO1Un9XULDGPFDur4RCIoFsn3I85LSRJIm1IBTNmAnfucRjWvO/EriArlgVQNj8eom0M/8nTPCmU7wU7C36OvCKDb5dSzLlkKYlp3DmfoY39Rjfofn/02XQT/z0GL8a4CkxiU1jnXI+uA+nG0P3b3TudGjai/FVa9qp0nZvgNEBW7syI2XQE8wfkIIN2TLJATzBD3oszeQU4J2SCN86YJVatnR1eQyKn42xfV7MbWf8Dr1A/SqVa8lDrUKaTP2hUzdK3956jfhUo6/pynK1NCMiyRJoJ5hgHVJT2m1sbw="
  on:
    branch: master
  distributions: "sdist bdist_wheel"
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -0,0 +1 @@
include VERSION
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-0.1.5
+0.1.5
15 changes: 7 additions & 8 deletions requirements.txt
@@ -1,9 +1,8 @@
-boto3==1.4.4
-botocore==1.5.19          # via boto3, s3transfer
-docutils==0.13.1          # via botocore
-futures==3.0.5            # via s3transfer
-jmespath==0.9.1           # via boto3, botocore
+boto3==1.4.8
+botocore==1.8.9           # via boto3, s3transfer
+docutils==0.14            # via botocore
+jmespath==0.9.3           # via boto3, botocore
 offspring==0.0.3
-python-dateutil==2.6.0    # via botocore
-s3transfer==0.1.10        # via boto3
-six==1.10.0               # via python-dateutil
+python-dateutil==2.6.1    # via botocore
+s3transfer==0.1.12        # via boto3
+six==1.11.0
3 changes: 3 additions & 0 deletions setup.cfg
@@ -0,0 +1,3 @@
[flake8]
exclude=./venv/*,./build/*,./configs/*,./logs/*,*.egg-info/*
max_line_length=120
28 changes: 21 additions & 7 deletions setup.py
@@ -3,19 +3,33 @@
 with open('VERSION') as version_fd:
     version = version_fd.read().strip()
 
-install_requires = [
-    'boto3>=1.4.4,<2.0',
-    'offspring>=0.0.3,<0.9',
-]
+with open('README.md', 'r') as readme_fd:
+    long_description = readme_fd.read()
 
 setup(
     name='kinesis-python',
     version=version,
-    install_requires=install_requires,
+    description='Low level, multiprocessing based AWS Kinesis producer & consumer library',
+    long_description=long_description,
+    url='https://github.com/NerdWalletOSS/kinesis-python',
+
+    install_requires=[
+        'boto3>=1.4.4,<2.0',
+        'offspring>=0.0.3,<1.0',
+        'six>=1.11.0,<2.0',
+    ],
     packages=find_packages('src'),
     package_dir={'': 'src'},
     author='Evan Borgstrom',
     author_email='eborgstrom@nerdwallet.com',
-    license='Apache 2',
-    description='Low level, multiprocessing based AWS Kinesis producer & consumer library'
+    license='Apache License Version 2.0',
+    classifiers=[
+        'Development Status :: 4 - Beta',
+        'Intended Audience :: Developers',
+        'License :: OSI Approved :: Apache Software License',
+        'Natural Language :: English',
+        'Programming Language :: Python',
+        'Topic :: Internet',
+        'Topic :: Software Development :: Libraries'
+    ]
 )
75 changes: 58 additions & 17 deletions src/kinesis/producer.py
@@ -1,3 +1,4 @@
+import collections
 import logging
 import multiprocessing
 try:
@@ -9,31 +10,68 @@
 import time
 
 import boto3
+import six
 
 from offspring.process import SubprocessLoop
 
 log = logging.getLogger(__name__)
 
 
+def sizeof(obj, seen=None):
+    """Recursively and fully calculate the size of an object"""
+    obj_id = id(obj)
+    try:
+        if obj_id in seen:
+            return 0
+    except TypeError:
+        seen = set()
+
+    seen.add(obj_id)
+
+    size = sys.getsizeof(obj)
+
+    # since strings are containers we return their size before we check for a container
+    if isinstance(obj, six.string_types):
+        return size
+
+    if isinstance(obj, collections.Container):
+        return size + sum(
+            sizeof(item, seen)
+            for item in obj
+        )
+
+    if isinstance(obj, collections.Mapping):
+        return size + sum(
+            sizeof(key, seen) + sizeof(val, seen)
+            for key, val in six.iteritems(obj)
+        )
+
+    return size


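For context, here is what the new sizeof helper buys over the bare sys.getsizeof call it replaces in loop() below. A minimal sketch (illustrative only; exact byte counts vary by interpreter, and the collections.Container check assumes the Python 2.7/3.5/3.6 targets listed in .travis.yml above):

import sys

from kinesis.producer import sizeof

records = ['x' * 1024, 'y' * 2048]

# sys.getsizeof measures only the outer list object, not the strings it holds
print(sys.getsizeof(records))

# sizeof recurses into the container, so both string payloads are counted too
print(sizeof(records))
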
 class AsyncProducer(SubprocessLoop):
     """Async accumulator and producer based on a multiprocessing Queue"""
     # Tell our subprocess loop that we don't want to terminate on shutdown since we want to drain our queue first
     TERMINATE_ON_SHUTDOWN = False
 
-    # This it the max size data that we'll send in a single call. We use 99% of 1Mb to account for the extra overhead
-    # of JSON syntax that is not taken into account when we use sys.getsizeof since it's measuring the python object
-    MAX_SIZE = int((2 ** 20) * .99)
-
-    # This is the max number of messages that we'll send in a single call.
+    # Max size & count
+    # Per: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
+    #
+    # * The maximum size of a data blob (the data payload before base64-encoding) is up to 1 MB.
+    # * Each shard can support up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB
+    #   per second (including partition keys).
+    MAX_SIZE = (2 ** 20)
     MAX_COUNT = 1000
 
-    def __init__(self, stream_name, buffer_time, queue, boto3_session=None):
+    def __init__(self, stream_name, buffer_time, queue, max_count=None, max_size=None, boto3_session=None):
         self.stream_name = stream_name
         self.buffer_time = buffer_time
         self.queue = queue
         self.records = []
         self.next_records = []
         self.alive = True
+        self.max_count = max_count or self.MAX_COUNT
+        self.max_size = max_size or self.MAX_SIZE
 
         if boto3_session is None:
             boto3_session = boto3.Session()
@@ -52,27 +90,29 @@ def loop(self):
             queue_timeout = self.buffer_time - (time.time() - timer_start)
             try:
                 log.debug("Fetching from queue with timeout: %s", queue_timeout)
-                data = self.queue.get(block=True, timeout=queue_timeout)
+                data, explicit_hash_key, partition_key = self.queue.get(block=True, timeout=queue_timeout)
             except Queue.Empty:
                 continue
 
             record = {
                 'Data': data,
-                'PartitionKey': '{0}{1}'.format(time.clock(), time.time()),
+                'PartitionKey': partition_key or '{0}{1}'.format(time.clock(), time.time()),
             }
+            if explicit_hash_key is not None:
+                record['ExplicitHashKey'] = explicit_hash_key
 
-            records_size += sys.getsizeof(record)
-            if records_size >= self.MAX_SIZE:
-                log.debug("Records exceed MAX_SIZE! Adding to next_records: %s", record)
+            records_size += sizeof(record)
+            if records_size >= self.max_size:
+                log.debug("Records exceed MAX_SIZE (%s)! Adding to next_records: %s", self.max_size, record)
                 self.next_records = [record]
                 break
 
             log.debug("Adding to records (%d bytes): %s", records_size, record)
             self.records.append(record)
 
             records_count += 1
-            if records_count == self.MAX_COUNT:
-                log.debug("Records have reached MAX_COUNT! Flushing records.")
+            if records_count == self.max_count:
+                log.debug("Records have reached MAX_COUNT (%s)! Flushing records.", self.max_count)
                 break
 
         self.flush_records()
@@ -97,9 +137,10 @@ def flush_records(self):

 class KinesisProducer(object):
     """Produce to Kinesis streams via an AsyncProducer"""
-    def __init__(self, stream_name, buffer_time=1.0, boto3_session=None):
+    def __init__(self, stream_name, buffer_time=0.5, max_count=None, max_size=None, boto3_session=None):
         self.queue = multiprocessing.Queue()
-        self.async_producer = AsyncProducer(stream_name, buffer_time, self.queue, boto3_session=boto3_session)
+        self.async_producer = AsyncProducer(stream_name, buffer_time, self.queue, max_count=max_count,
+                                            max_size=max_size, boto3_session=boto3_session)
 
-    def put(self, data):
-        self.queue.put(data)
+    def put(self, data, explicit_hash_key=None, partition_key=None):
+        self.queue.put((data, explicit_hash_key, partition_key))
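
Taken together, the producer now exposes batching caps and per-record routing. A minimal usage sketch (the stream name and key values below are made up for illustration):

from kinesis.producer import KinesisProducer

# batches flush every 0.5s by default, or sooner once either cap is reached
producer = KinesisProducer('example-stream', max_count=500, max_size=512 * 1024)

# route records by partition key, or pin a shard with an explicit hash key
producer.put('some data', partition_key='user-42')
producer.put('more data', explicit_hash_key='123456789012345678901234567890')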
File renamed without changes.
2 changes: 1 addition & 1 deletion test/test-producer.py → test-producer.py
@@ -7,5 +7,5 @@
 logging.getLogger('botocore.vendored.requests.packages.urllib3').level = logging.WARN
 producer = KinesisProducer('borgstrom-test')
 
-for idx in xrange(1):
+for idx in xrange(100):
     producer.put(str(idx))
46 changes: 46 additions & 0 deletions test/test_consumer.py
@@ -0,0 +1,46 @@
from kinesis.consumer import KinesisConsumer

try:
    from unittest.mock import MagicMock
except ImportError:
    from mock import MagicMock


def test_setup_shards(mocker):
    mock_boto3_session = MagicMock()
    mock_shard_reader = mocker.patch('kinesis.consumer.ShardReader')

    consumer = KinesisConsumer('testing', boto3_session=mock_boto3_session)

    mock_boto3_session.client.assert_called_with('kinesis')

    consumer.kinesis_client.describe_stream.return_value = {
        'StreamDescription': {
            'Shards': [
                {
                    'ShardId': 'test-shard',
                }
            ]
        }
    }
    consumer.kinesis_client.get_shard_iterator.return_value = {
        'ShardIterator': 'test-iter'
    }

    consumer.setup_shards()

    consumer.kinesis_client.describe_stream.assert_called_with(StreamName='testing')
    consumer.kinesis_client.get_shard_iterator.assert_called_with(
        StreamName='testing',
        ShardId='test-shard',
        ShardIteratorType='LATEST'
    )

    mock_shard_reader.assert_called_with(
        'test-shard',
        'test-iter',
        consumer.record_queue,
        consumer.error_queue,
        boto3_session=consumer.boto3_session,
        sleep_time=consumer.reader_sleep_time
    )
18 changes: 18 additions & 0 deletions test/test_producer.py
@@ -0,0 +1,18 @@
from kinesis.producer import KinesisProducer


def test_producer(mocker):
    mocked_async_producer = mocker.patch('kinesis.producer.AsyncProducer')
    producer = KinesisProducer('testing')
    mocked_async_producer.assert_called_with(
        'testing',
        0.5,
        producer.queue,
        max_count=None,
        max_size=None,
        boto3_session=None
    )

    mocked_queue = mocker.patch.object(producer, 'queue')
    producer.put('foo', explicit_hash_key='hash', partition_key='partition')
    mocked_queue.put.assert_called_with(('foo', 'hash', 'partition'))
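
To run these new tests locally, the Travis script above amounts to py.test against test/; a rough Python equivalent (assumes pytest, pytest-mock, and, on Python 2, mock are installed):

import pytest

# mirrors `py.test test/` from the CI script, minus the coverage wrapper
raise SystemExit(pytest.main(['test/']))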
