diff --git a/docs/usage.rst b/docs/usage.rst index 6417cd853..a14cce4ec 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -66,8 +66,8 @@ Keyed messages # HashedPartitioner is default (currently uses python hash()) producer = KeyedProducer(kafka) - producer.send_messages(b'my-topic', b'key1', b'some message') - producer.send_messages(b'my-topic', b'key2', b'this methode') + producer.send_messages(b'my-topic', b'some message', key=b'key1') + producer.send_messages(b'my-topic', b'this methode', key=b'key2') # Murmur2Partitioner attempts to mirror the java client hashing producer = KeyedProducer(kafka, partitioner=Murmur2Partitioner) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 3c826cdb9..f85773c32 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -329,7 +329,7 @@ def cleanup(obj): else: self.sync_fail_on_error = sync_fail_on_error - def send_messages(self, topic, partition, *msg): + def send_messages(self, topic, partition, *msg, **kwargs): """ Helper method to send produce requests @param: topic, name of topic for produce request -- type str @@ -346,7 +346,7 @@ def send_messages(self, topic, partition, *msg): All messages produced via this method will set the message 'key' to Null """ topic = kafka_bytestring(topic) - return self._send_messages(topic, partition, *msg) + return self._send_messages(topic, partition, *msg, **kwargs) def _send_messages(self, topic, partition, *msg, **kwargs): key = kwargs.pop('key', None) diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index 13e60d984..c07c2be53 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -45,13 +45,13 @@ def _next_partition(self, topic): return next(self.partition_cycles[topic]) - def send_messages(self, topic, *msg): + def send_messages(self, topic, *msg, **kwargs): if not isinstance(topic, six.binary_type): topic = topic.encode('utf-8') partition = self._next_partition(topic) return super(SimpleProducer, self).send_messages( - topic, partition, *msg + topic, partition, *msg, **kwargs ) def __repr__(self):