Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #751 from Parsely/enhancement/producer_speed
Browse files Browse the repository at this point in the history
[WIP] Producer speed enhancements
  • Loading branch information
Emmett J. Butler committed Dec 7, 2017
2 parents fb5bb06 + 7e0e7e3 commit b1d958b
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 16 deletions.
21 changes: 15 additions & 6 deletions pykafka/partitioners.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,13 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
__all__ = ["random_partitioner", "BasePartitioner", "HashingPartitioner",
__all__ = ["RandomPartitioner", "BasePartitioner", "HashingPartitioner",
"hashing_partitioner", "GroupHashingPartitioner"]
import random

from hashlib import sha1


def random_partitioner(partitions, key):
    """Returns a random partition out of all of the available partitions.

    :param partitions: A non-empty sequence of candidate partitions
    :param key: Unused; accepted so all partitioners share the
        ``(partitions, key)`` call signature
    """
    return random.choice(partitions)


class BasePartitioner(object):
"""Base class for custom class-based partitioners.
Expand All @@ -39,6 +34,20 @@ def __call__(self, partitions, key=None):
' partitioner implementation')


class RandomPartitioner(BasePartitioner):
    """Spreads messages evenly across all available partitions.

    Despite the name, selection is not actually random: the partitioner
    walks the partition list with an incrementing counter, which yields an
    even distribution without spending CPU cycles on a random number
    generator.
    """
    def __init__(self):
        # Index of the most recently returned partition
        self.idx = 0

    def __call__(self, partitions, key):
        """Return the next partition in round-robin order.

        :param partitions: A non-empty sequence of candidate partitions
        :param key: Unused; accepted for partitioner interface compatibility
        """
        position = (self.idx + 1) % len(partitions)
        self.idx = position
        return partitions[position]


class HashingPartitioner(BasePartitioner):
"""
Returns a (relatively) consistent partition out of all available partitions
Expand Down
12 changes: 6 additions & 6 deletions pykafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
ProducerStoppedException,
SocketDisconnectedError,
)
from .partitioners import random_partitioner
from .partitioners import RandomPartitioner
from .protocol import Message, ProduceRequest
from .utils.compat import iteritems, itervalues, Empty
from .utils.error_handlers import valid_int
Expand All @@ -62,7 +62,7 @@ class Producer(object):
def __init__(self,
cluster,
topic,
partitioner=random_partitioner,
partitioner=None,
compression=CompressionType.NONE,
max_retries=3,
retry_backoff_ms=100,
Expand Down Expand Up @@ -160,7 +160,7 @@ def __init__(self,
self._cluster = cluster
self._protocol_version = msg_protocol_version(cluster._broker_version)
self._topic = topic
self._partitioner = partitioner
self._partitioner = partitioner or RandomPartitioner()
self._compression = compression
if self._compression == CompressionType.SNAPPY and \
platform.python_implementation == "PyPy":
Expand Down Expand Up @@ -317,13 +317,13 @@ def produce(self, message, partition_key=None, timestamp=None):
:return: The :class:`pykafka.protocol.Message` instance that was
added to the internal message queue
"""
if not (isinstance(partition_key, bytes) or partition_key is None):
if partition_key is not None and type(partition_key) is not bytes:
raise TypeError("Producer.produce accepts a bytes object as partition_key, "
"but it got '%s'", type(partition_key))
if not (isinstance(message, bytes) or message is None):
if message is not None and type(message) is not bytes:
raise TypeError("Producer.produce accepts a bytes object as message, but it "
"got '%s'", type(message))
if self._protocol_version < 1 and timestamp:
if timestamp is not None and self._protocol_version < 1:
raise RuntimeError("Producer.produce got a timestamp with protocol 0")
if not self._running:
raise ProducerStoppedException()
Expand Down
5 changes: 3 additions & 2 deletions pykafka/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ class Message(Message, Serializable):
"protocol_version",
"timestamp"
]
VALID_TS_TYPES = integer_types + (float, type(None))

def __init__(self,
value,
Expand Down Expand Up @@ -276,9 +277,9 @@ def timestamp_dt(self, dt):
1000 * (dt - datetime(1970, 1, 1)).total_seconds())

def set_timestamp(self, ts):
if isinstance(ts, integer_types + (float, type(None))):
if type(ts) in self.VALID_TS_TYPES:
self.timestamp = ts
elif isinstance(ts, datetime):
elif type(ts) == datetime:
self.timestamp_dt = ts
else:
raise RuntimeError()
Expand Down
4 changes: 2 additions & 2 deletions pykafka/rdkafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from pkg_resources import parse_version

from pykafka.exceptions import RdKafkaStoppedException, ProducerStoppedException
from pykafka.producer import Producer, CompressionType, random_partitioner
from pykafka.producer import Producer, CompressionType
from pykafka.utils.compat import get_bytes
from . import _rd_kafka
from . import helpers
Expand Down Expand Up @@ -37,7 +37,7 @@ class RdKafkaProducer(Producer):
def __init__(self,
cluster,
topic,
partitioner=random_partitioner,
partitioner=None,
compression=CompressionType.NONE,
max_retries=3,
retry_backoff_ms=100,
Expand Down

0 comments on commit b1d958b

Please sign in to comment.