From 45c263037cba15e120a103a9e21753c1c7a277b2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 17 May 2015 16:24:20 -0700 Subject: [PATCH] Deprecate KeyedProducer.send in favor of send_messages -- keep interface consistent --- docs/usage.rst | 4 ++-- kafka/producer/keyed.py | 11 ++++++----- test/test_producer_integration.py | 20 ++++++++++---------- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/docs/usage.rst b/docs/usage.rst index 150d1212b..3e5f4348b 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -63,8 +63,8 @@ Keyed messages # HashedPartitioner is default producer = KeyedProducer(kafka) - producer.send("my-topic", "key1", "some message") - producer.send("my-topic", "key2", "this methode") + producer.send_messages("my-topic", "key1", "some message") + producer.send_messages("my-topic", "key2", "this methode") producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner) diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index 333b6c0cf..bc428034d 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -1,6 +1,7 @@ from __future__ import absolute_import import logging +import warnings from kafka.partitioner import HashedPartitioner from kafka.util import kafka_bytestring @@ -58,15 +59,15 @@ def _next_partition(self, topic, key): partitioner = self.partitioners[topic] return partitioner.partition(key) - def send_messages(self,topic,key,*msg): + def send_messages(self, topic, key, *msg): topic = kafka_bytestring(topic) partition = self._next_partition(topic, key) - return self._send_messages(topic, partition, *msg,key=key) + return self._send_messages(topic, partition, *msg, key=key) + # DEPRECATED def send(self, topic, key, msg): - topic = kafka_bytestring(topic) - partition = self._next_partition(topic, key) - return self._send_messages(topic, partition, msg, key=key) + warnings.warn("KeyedProducer.send is deprecated in favor of send_messages", DeprecationWarning) + return self.send_messages(topic, key, msg) def __repr__(self): return '' % self.async diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index e3f7767f9..c81716df4 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -332,10 +332,10 @@ def test_round_robin_partitioner(self): start_offsets = [self.current_offset(self.topic, p) for p in partitions] producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner) - resp1 = producer.send(self.topic, self.key("key1"), self.msg("one")) - resp2 = producer.send(self.topic, self.key("key2"), self.msg("two")) - resp3 = producer.send(self.topic, self.key("key3"), self.msg("three")) - resp4 = producer.send(self.topic, self.key("key4"), self.msg("four")) + resp1 = producer.send_messages(self.topic, self.key("key1"), self.msg("one")) + resp2 = producer.send_messages(self.topic, self.key("key2"), self.msg("two")) + resp3 = producer.send_messages(self.topic, self.key("key3"), self.msg("three")) + resp4 = producer.send_messages(self.topic, self.key("key4"), self.msg("four")) self.assert_produce_response(resp1, start_offsets[0]+0) self.assert_produce_response(resp2, start_offsets[1]+0) @@ -353,11 +353,11 @@ def test_hashed_partitioner(self): start_offsets = [self.current_offset(self.topic, p) for p in partitions] producer = KeyedProducer(self.client, partitioner=HashedPartitioner) - resp1 = producer.send(self.topic, self.key("1"), self.msg("one")) - resp2 = producer.send(self.topic, self.key("2"), self.msg("two")) - resp3 = producer.send(self.topic, self.key("3"), self.msg("three")) - resp4 = producer.send(self.topic, self.key("3"), self.msg("four")) - resp5 = producer.send(self.topic, self.key("4"), self.msg("five")) + resp1 = producer.send_messages(self.topic, self.key("1"), self.msg("one")) + resp2 = producer.send_messages(self.topic, self.key("2"), self.msg("two")) + resp3 = producer.send_messages(self.topic, self.key("3"), self.msg("three")) + resp4 = producer.send_messages(self.topic, self.key("3"), self.msg("four")) + resp5 = producer.send_messages(self.topic, self.key("4"), self.msg("five")) offsets = {partitions[0]: start_offsets[0], partitions[1]: start_offsets[1]} messages = {partitions[0]: [], partitions[1]: []} @@ -386,7 +386,7 @@ def test_async_keyed_producer(self): producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True) - resp = producer.send(self.topic, self.key("key1"), self.msg("one")) + resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one")) self.assertEqual(len(resp), 0) # wait for the server to report a new highwatermark