Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change SimpleProducer to use async_send (async is reserved in py37) #1454

Merged
merged 2 commits into from Mar 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/simple.rst
Expand Up @@ -49,7 +49,7 @@ Asynchronous Mode

# To send messages asynchronously
client = SimpleClient('localhost:9092')
producer = SimpleProducer(client, async=True)
producer = SimpleProducer(client, async_send=True)
producer.send_messages('my-topic', b'async message')

# To send messages in batch. You can use any of the available
Expand All @@ -60,7 +60,7 @@ Asynchronous Mode
# * If the producer dies before the messages are sent, there will be losses
# * Call producer.stop() to send the messages and cleanup
producer = SimpleProducer(client,
async=True,
async_send=True,
batch_send_every_n=20,
batch_send_every_t=60)

Expand All @@ -73,7 +73,7 @@ Synchronous Mode

# To send messages synchronously
client = SimpleClient('localhost:9092')
producer = SimpleProducer(client, async=False)
producer = SimpleProducer(client, async_send=False)

# Note that the application is responsible for encoding messages to type bytes
producer.send_messages('my-topic', b'some message')
Expand All @@ -88,7 +88,7 @@ Synchronous Mode
# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
# by all in sync replicas before sending a response
producer = SimpleProducer(client,
async=False,
async_send=False,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=2000,
sync_fail_on_error=False)
Expand Down
38 changes: 23 additions & 15 deletions kafka/producer/base.py
Expand Up @@ -226,7 +226,7 @@ class Producer(object):

Arguments:
client (kafka.SimpleClient): instance to use for broker
communications. If async=True, the background thread will use
communications. If async_send=True, the background thread will use
:meth:`client.copy`, which is expected to return a thread-safe
object.
codec (kafka.protocol.ALL_CODECS): compression codec to use.
Expand All @@ -238,11 +238,11 @@ class Producer(object):
sync_fail_on_error (bool, optional): whether sync producer should
raise exceptions (True), or just return errors (False),
defaults to True.
async (bool, optional): send message using a background thread,
async_send (bool, optional): send message using a background thread,
defaults to False.
batch_send_every_n (int, optional): If async is True, messages are
batch_send_every_n (int, optional): If async_send is True, messages are
sent in batches of this size, defaults to 20.
batch_send_every_t (int or float, optional): If async is True,
batch_send_every_t (int or float, optional): If async_send is True,
messages are sent immediately after this timeout in seconds, even
if there are fewer than batch_send_every_n, defaults to 20.
async_retry_limit (int, optional): number of retries for failed messages
Expand All @@ -268,8 +268,10 @@ class Producer(object):
defaults to 30.

Deprecated Arguments:
async (bool, optional): send message using a background thread,
defaults to False. Deprecated, use 'async_send'
batch_send (bool, optional): If True, messages are sent by a background
thread in batches, defaults to False. Deprecated, use 'async'
thread in batches, defaults to False. Deprecated, use 'async_send'
"""
ACK_NOT_REQUIRED = 0 # No ack is required
ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log
Expand All @@ -282,8 +284,8 @@ def __init__(self, client,
codec=None,
codec_compresslevel=None,
sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
async=False,
batch_send=False, # deprecated, use async
async_send=False,
batch_send=False, # deprecated, use async_send
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
async_retry_limit=ASYNC_RETRY_LIMIT,
Expand All @@ -292,15 +294,21 @@ def __init__(self, client,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS,
**kwargs):

# async renamed async_send for python3.7 support
if 'async' in kwargs:
log.warning('Deprecated async option found -- use async_send')
async_send = kwargs['async']

if async:
if async_send:
assert batch_send_every_n > 0
assert batch_send_every_t > 0
assert async_queue_maxsize >= 0

self.client = client
self.async = async
self.async_send = async_send
self.req_acks = req_acks
self.ack_timeout = ack_timeout
self.stopped = False
Expand All @@ -313,7 +321,7 @@ def __init__(self, client,
self.codec = codec
self.codec_compresslevel = codec_compresslevel

if self.async:
if self.async_send:
# Messages are sent through this queue
self.queue = Queue(async_queue_maxsize)
self.async_queue_put_timeout = async_queue_put_timeout
Expand Down Expand Up @@ -400,7 +408,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
if key is not None and not isinstance(key, six.binary_type):
raise TypeError("the key must be type bytes")

if self.async:
if self.async_send:
for idx, m in enumerate(msg):
try:
item = (TopicPartition(topic, partition), m, key)
Expand Down Expand Up @@ -435,15 +443,15 @@ def stop(self, timeout=None):
log.warning('timeout argument to stop() is deprecated - '
'it will be removed in future release')

if not self.async:
if not self.async_send:
log.warning('producer.stop() called, but producer is not async')
return

if self.stopped:
log.warning('producer.stop() called, but producer is already stopped')
return

if self.async:
if self.async_send:
self.queue.put((STOP_ASYNC_PRODUCER, None, None))
self.thread_stop_event.set()
self.thread.join()
Expand Down Expand Up @@ -471,5 +479,5 @@ def stop(self, timeout=None):
self.stopped = True

def __del__(self):
if self.async and not self.stopped:
if self.async_send and not self.stopped:
self.stop()
2 changes: 1 addition & 1 deletion kafka/producer/keyed.py
Expand Up @@ -46,4 +46,4 @@ def send(self, topic, key, msg):
return self.send_messages(topic, key, msg)

def __repr__(self):
return '<KeyedProducer batch=%s>' % self.async
return '<KeyedProducer batch=%s>' % self.async_send
2 changes: 1 addition & 1 deletion kafka/producer/simple.py
Expand Up @@ -51,4 +51,4 @@ def send_messages(self, topic, *msg):
)

def __repr__(self):
return '<SimpleProducer batch=%s>' % self.async
return '<SimpleProducer batch=%s>' % self.async_send
8 changes: 4 additions & 4 deletions test/test_failover_integration.py
Expand Up @@ -60,7 +60,7 @@ def test_switch_leader(self):
# require that the server commit messages to all in-sync replicas
# so that failover doesn't lose any messages on server-side
# and we can assert that server-side message count equals client-side
producer = Producer(self.client, async=False,
producer = Producer(self.client, async_send=False,
req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)

# Send 100 random messages to a specific partition
Expand Down Expand Up @@ -101,7 +101,7 @@ def test_switch_leader_async(self):
partition = 0

# Test the base class Producer -- send_messages to a specific partition
producer = Producer(self.client, async=True,
producer = Producer(self.client, async_send=True,
batch_send_every_n=15,
batch_send_every_t=3,
req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
Expand Down Expand Up @@ -146,7 +146,7 @@ def test_switch_leader_async(self):
def test_switch_leader_keyed_producer(self):
topic = self.topic

producer = KeyedProducer(self.client, async=False)
producer = KeyedProducer(self.client, async_send=False)

# Send 10 random messages
for _ in range(10):
Expand Down Expand Up @@ -182,7 +182,7 @@ def test_switch_leader_keyed_producer(self):
producer.send_messages(topic, key, msg)

def test_switch_leader_simple_consumer(self):
producer = Producer(self.client, async=False)
producer = Producer(self.client, async_send=False)
consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10)
self._send_random_messages(producer, self.topic, 0, 2)
consumer.get_messages()
Expand Down
8 changes: 4 additions & 4 deletions test/test_producer_integration.py
Expand Up @@ -210,7 +210,7 @@ def test_async_simple_producer(self):
partition = self.client.get_partition_ids_for_topic(self.topic)[0]
start_offset = self.current_offset(self.topic, partition)

producer = SimpleProducer(self.client, async=True, random_start=False)
producer = SimpleProducer(self.client, async_send=True, random_start=False)
resp = producer.send_messages(self.topic, self.msg("one"))
self.assertEqual(len(resp), 0)

Expand All @@ -229,7 +229,7 @@ def test_batched_simple_producer__triggers_by_message(self):
batch_interval = 5
producer = SimpleProducer(
self.client,
async=True,
async_send=True,
batch_send_every_n=batch_messages,
batch_send_every_t=batch_interval,
random_start=False)
Expand Down Expand Up @@ -294,7 +294,7 @@ def test_batched_simple_producer__triggers_by_time(self):
batch_interval = 5
producer = SimpleProducer(
self.client,
async=True,
async_send=True,
batch_send_every_n=100,
batch_send_every_t=batch_interval,
random_start=False)
Expand Down Expand Up @@ -426,7 +426,7 @@ def test_async_keyed_producer(self):

producer = KeyedProducer(self.client,
partitioner=RoundRobinPartitioner,
async=True,
async_send=True,
batch_send_every_t=1)

resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
Expand Down
10 changes: 5 additions & 5 deletions test/test_producer_legacy.py
Expand Up @@ -73,7 +73,7 @@ def partitions(topic):
@patch('kafka.producer.base._send_upstream')
def test_producer_async_queue_overfilled(self, mock):
queue_size = 2
producer = Producer(MagicMock(), async=True,
producer = Producer(MagicMock(), async_send=True,
async_queue_maxsize=queue_size)

topic = b'test-topic'
Expand All @@ -95,25 +95,25 @@ def test_producer_sync_fail_on_error(self):
with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]):

client = SimpleClient(MagicMock())
producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
producer = SimpleProducer(client, async_send=False, sync_fail_on_error=False)

# This should not raise
(response,) = producer.send_messages('foobar', b'test message')
self.assertEqual(response, error)

producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
producer = SimpleProducer(client, async_send=False, sync_fail_on_error=True)
with self.assertRaises(FailedPayloadsError):
producer.send_messages('foobar', b'test message')

def test_cleanup_is_not_called_on_stopped_producer(self):
producer = Producer(MagicMock(), async=True)
producer = Producer(MagicMock(), async_send=True)
producer.stopped = True
with patch.object(producer, 'stop') as mocked_stop:
producer._cleanup_func(producer)
self.assertEqual(mocked_stop.call_count, 0)

def test_cleanup_is_called_on_running_producer(self):
producer = Producer(MagicMock(), async=True)
producer = Producer(MagicMock(), async_send=True)
producer.stopped = False
with patch.object(producer, 'stop') as mocked_stop:
producer._cleanup_func(producer)
Expand Down