Skip to content

Commit

Permalink
Merge pull request #379 from dpkp/deprecate_keyed_producer_send
Browse files Browse the repository at this point in the history
Deprecate KeyedProducer.send in favor of send_messages
  • Loading branch information
dpkp committed Jun 8, 2015
2 parents ce702ef + 45c2630 commit 062ddff
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 17 deletions.
4 changes: 2 additions & 2 deletions docs/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ Keyed messages
# HashedPartitioner is default
producer = KeyedProducer(kafka)
producer.send("my-topic", "key1", "some message")
producer.send("my-topic", "key2", "this methode")
producer.send_messages("my-topic", "key1", "some message")
producer.send_messages("my-topic", "key2", "this methode")
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
Expand Down
11 changes: 6 additions & 5 deletions kafka/producer/keyed.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import absolute_import

import logging
import warnings

from kafka.partitioner import HashedPartitioner
from kafka.util import kafka_bytestring
Expand Down Expand Up @@ -69,15 +70,15 @@ def _next_partition(self, topic, key):
partitioner = self.partitioners[topic]
return partitioner.partition(key)

def send_messages(self,topic,key,*msg):
def send_messages(self, topic, key, *msg):
topic = kafka_bytestring(topic)
partition = self._next_partition(topic, key)
return self._send_messages(topic, partition, *msg,key=key)
return self._send_messages(topic, partition, *msg, key=key)

# DEPRECATED
def send(self, topic, key, msg):
topic = kafka_bytestring(topic)
partition = self._next_partition(topic, key)
return self._send_messages(topic, partition, msg, key=key)
warnings.warn("KeyedProducer.send is deprecated in favor of send_messages", DeprecationWarning)
return self.send_messages(topic, key, msg)

def __repr__(self):
return '<KeyedProducer batch=%s>' % self.async
20 changes: 10 additions & 10 deletions test/test_producer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,10 @@ def test_round_robin_partitioner(self):
start_offsets = [self.current_offset(self.topic, p) for p in partitions]

producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
resp1 = producer.send(self.topic, self.key("key1"), self.msg("one"))
resp2 = producer.send(self.topic, self.key("key2"), self.msg("two"))
resp3 = producer.send(self.topic, self.key("key3"), self.msg("three"))
resp4 = producer.send(self.topic, self.key("key4"), self.msg("four"))
resp1 = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
resp2 = producer.send_messages(self.topic, self.key("key2"), self.msg("two"))
resp3 = producer.send_messages(self.topic, self.key("key3"), self.msg("three"))
resp4 = producer.send_messages(self.topic, self.key("key4"), self.msg("four"))

self.assert_produce_response(resp1, start_offsets[0]+0)
self.assert_produce_response(resp2, start_offsets[1]+0)
Expand All @@ -367,11 +367,11 @@ def test_hashed_partitioner(self):
start_offsets = [self.current_offset(self.topic, p) for p in partitions]

producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
resp1 = producer.send(self.topic, self.key("1"), self.msg("one"))
resp2 = producer.send(self.topic, self.key("2"), self.msg("two"))
resp3 = producer.send(self.topic, self.key("3"), self.msg("three"))
resp4 = producer.send(self.topic, self.key("3"), self.msg("four"))
resp5 = producer.send(self.topic, self.key("4"), self.msg("five"))
resp1 = producer.send_messages(self.topic, self.key("1"), self.msg("one"))
resp2 = producer.send_messages(self.topic, self.key("2"), self.msg("two"))
resp3 = producer.send_messages(self.topic, self.key("3"), self.msg("three"))
resp4 = producer.send_messages(self.topic, self.key("3"), self.msg("four"))
resp5 = producer.send_messages(self.topic, self.key("4"), self.msg("five"))

offsets = {partitions[0]: start_offsets[0], partitions[1]: start_offsets[1]}
messages = {partitions[0]: [], partitions[1]: []}
Expand Down Expand Up @@ -400,7 +400,7 @@ def test_async_keyed_producer(self):

producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)

resp = producer.send(self.topic, self.key("key1"), self.msg("one"))
resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
self.assertEqual(len(resp), 0)

# wait for the server to report a new highwatermark
Expand Down

0 comments on commit 062ddff

Please sign in to comment.