Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
docstrings in utils
Browse files Browse the repository at this point in the history
  • Loading branch information
emmettbutler committed Apr 24, 2015
1 parent fcf7322 commit da449ee
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 53 deletions.
5 changes: 5 additions & 0 deletions doc/utils/compression.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pykafka.utils.compression
=========================

.. automodule:: pykafka.utils.compression
:members:
5 changes: 5 additions & 0 deletions doc/utils/error_handlers.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pykafka.utils.error_handlers
============================

.. automodule:: pykafka.utils.error_handlers
:members:
5 changes: 5 additions & 0 deletions doc/utils/functional.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pykafka.utils.functional
========================

.. automodule:: pykafka.utils.functional
:members:
5 changes: 5 additions & 0 deletions doc/utils/socket.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pykafka.utils.socket
====================

.. automodule:: pykafka.utils.socket
:members:
5 changes: 5 additions & 0 deletions doc/utils/struct_helpers.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pykafka.utils.struct_helpers
============================

.. automodule:: pykafka.utils.struct_helpers
:members:
11 changes: 8 additions & 3 deletions pykafka/utils/compression.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
"""
Author: Keith Bourgoin
"""
__license__ = """
Copyright 2014 Parse.ly, Inc.
Copyright 2015 Parse.ly, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,22 +41,24 @@ def encode_gzip(buff):
sio.close()
return output


def decode_gzip(buff):
"""Decode a gzip buffer"""
"""Decode a buffer using gzip"""
sio = StringIO(buff)
f = gzip.GzipFile(fileobj=sio, mode='r')
output = f.read()
f.close()
sio.close()
return output


def encode_snappy(buff):
"""Encode a buffer using Snappy"""
if snappy is None:
raise ImportError("Please install python-snappy")
output = snappy.compress(buff)
return snappy.compress(buff)


def decode_snappy(buff):
"""Decode a buffer using Snappy"""
if snappy is None:
Expand Down
26 changes: 23 additions & 3 deletions pykafka/utils/error_handlers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
"""
Author: Emmett Butler
"""
__license__ = """
Copyright 2015 Parse.ly, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from collections import defaultdict


Expand All @@ -8,14 +26,15 @@ def handle_partition_responses(response,
"""Call the appropriate handler for each errored partition
:param response: a Response object containing partition responses
:type response: pykafka.protocol.Response
:type response: :class:`pykafka.protocol.Response`
:param success_handler: function to call for successful partitions
:type success_handler: callable(parts)
:type success_handler: callable accepting an iterable of partition responses
:param error_handlers: mapping of error code to handler
:type error_handlers: dict {int: callable(parts)}
:param partitions_by_id: a dict mapping partition ids to OwnedPartition
instances
:type partitions_by_id: dict {int: pykafka.simpleconsumer.OwnedPartition}
:type partitions_by_id: dict
{int: :class:`pykafka.simpleconsumer.OwnedPartition`}
"""
error_handlers = error_handlers.copy()
if success_handler is not None:
Expand All @@ -38,4 +57,5 @@ def handle_partition_responses(response,


def raise_error(error, info=""):
"""Raise the given error"""
raise error(info)
11 changes: 8 additions & 3 deletions pykafka/utils/functional.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
"""
Author: Keith Bourgoin, Emmett Butler
This module is used only for testing.
"""
__license__ = """
Copyright 2012 DISQUS
Copyright 2015 Parse.ly, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -20,7 +25,7 @@

def methodmap(name, values, *args, **kwargs):
"""
Maps over all members in ``values``, invoking the method named ``name``.
Maps over all members in `values`, invoking the method named `name`.
Usage:
>>> methodmap('strip', ['hello ', 'world '])
Expand All @@ -32,7 +37,7 @@ def methodmap(name, values, *args, **kwargs):

def methodimap(name, values, *args, **kwargs):
"""
Iterator-based implementation of :func:``.methodmap``.
Iterator-based implementation of :func:`.methodmap`.
"""
fn = methodcaller(name, *args, **kwargs)
return imap(fn, values)
22 changes: 0 additions & 22 deletions pykafka/utils/log.py

This file was deleted.

9 changes: 6 additions & 3 deletions pykafka/utils/socket.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
"""
Author: Keith Bourgoin, Emmett Butler
"""
__license__ = """
Copyright 2012 DISQUS
Copyright 2015 Parse.ly, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -22,13 +25,13 @@ def recvall_into(socket, bytea, size):
Reads `size` bytes from the socket into the provided bytearray (modifies
in-place.)
This is basically a hack around the fact that ``socket.recv_into`` doesn't
This is basically a hack around the fact that `socket.recv_into` doesn't
allow buffer offsets.
:type socket: :class:`socket.Socket`
:type bytea: ``bytearray``
:type size: int
:rtype: ``bytearray``
:rtype: `bytearray`
"""
offset = 0
if size > len(bytea):
Expand Down
63 changes: 44 additions & 19 deletions pykafka/utils/struct_helpers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
"""
Author: Keith Bourgoin, Emmett Butler
"""
__license__ = """
Copyright 2014 Parse.ly, Inc.
Copyright 2015 Parse.ly, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -17,30 +20,30 @@
import itertools
import struct


def unpack_from(fmt, buff, offset=0):
"""A customized version of ``struct.unpack_from``
"""A customized version of `struct.unpack_from`
This is a convenience function that makes decoding the arrays,
strings, and byte arrays that we get from Kafka significantly
easier. It takes the same arguments as ``struct.unpack_from`` but
easier. It takes the same arguments as `struct.unpack_from` but
adds 3 new formats:
* Wrap a section in `[]` to indicate an array. e.g.: `[ii]`
* `S` for strings (int16 followed by byte array)
* `Y` for byte arrays (int32 followed by byte array)
Spacees are ignored in the format string, allowing more readable formats
Spaces are ignored in the format string, allowing more readable formats
NOTE: This may be a performance bottleneck. We're avoiding a lot of memory
allocations by using the same buffer, but if we could call
``struct.unpack_from`` only once, that's about an order of magnitude
`struct.unpack_from` only once, that's about an order of magnitude
faster. However, constructing the format string to do so would erase
any gains we got from having the single call. Ultimately, this will
be a good candidate for rewriting in C.
any gains we got from having the single call.
"""
fmt = fmt.replace(' ', '')
if fmt[0] in '!><':
fmt = fmt[1:] # It's always network ordering
fmt = fmt[1:] # It's always network ordering

output = _unpack(fmt, buff, offset, 1)[0]

Expand All @@ -50,25 +53,36 @@ def unpack_from(fmt, buff, offset=0):

return output


def _unpack(fmt, buff, offset, count=1):
"""Recursive call for unpacking"""
"""Recursive call for unpacking
:param fmt: The struct format string
:type fmt: str
:param buff: The buffer from which to unpack
:type buff: buffer
:param offset: The offset at which to start unpacking
:type offset: int
:param count: The number of items in the array
:type count: int
"""
items = []
array_fmt = None
for i,ch in enumerate(fmt):
for i, ch in enumerate(fmt):
if array_fmt is not None:
if ch == ']':
if array_fmt.count('[') == array_fmt.count(']'):
# array format done, call _unpack for this format string
count = struct.unpack_from('!i', buff, offset)[0]
array_item,offset = _unpack_array(array_fmt, buff,
offset+4, count)
array_item, offset = _unpack_array(array_fmt, buff,
offset + 4, count)
items.append(array_item)
array_fmt = None
continue # done with this
continue # done with this
# not done yet, append to ongoing format
array_fmt += ch
elif ch == '[':
array_fmt = '' # starts building string for array unpack
array_fmt = '' # starts building string for array unpack
else:
if ch in 'SY':
len_fmt = '!h' if ch == 'S' else '!i'
Expand All @@ -78,16 +92,27 @@ def _unpack(fmt, buff, offset, count=1):
items.append(None)
continue
ch = '%ds' % len_
items.extend(struct.unpack_from('!'+ch, buff, offset))
items.extend(struct.unpack_from('!' + ch, buff, offset))
offset += struct.calcsize(ch)
return tuple(items),offset
return tuple(items), offset


def _unpack_array(fmt, buff, offset, count):
"""Unpack an array of items"""
"""Unpack an array of items.
:param fmt: The struct format string
:type fmt: str
:param buff: The buffer from which to unpack
:type buff: buffer
:param offset: The offset at which to start unpacking
:type offset: int
:param count: The number of items in the array
:type count: int
"""
output = []
for i in xrange(count):
item,offset = _unpack(fmt, buff, offset)
item, offset = _unpack(fmt, buff, offset)
output.append(item)
if len(fmt) == 1:
output = list(itertools.chain.from_iterable(output))
return output,offset
return output, offset

0 comments on commit da449ee

Please sign in to comment.