From 30e39bef81b4f62fbaabefff5c14fe0b5e76b982 Mon Sep 17 00:00:00 2001 From: Harel Ben-Attia Date: Mon, 13 Feb 2017 16:56:23 +0200 Subject: [PATCH 1/3] Fail-fast on timeout constraint violations during KafkaConsumer creation --- kafka/consumer/group.py | 10 ++++++++++ test/test_consumer.py | 8 ++++++++ 2 files changed, 18 insertions(+) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 47c721ff3..890e84eb2 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -6,6 +6,8 @@ import sys import time +from kafka.errors import KafkaConfigurationError + from kafka.vendor import six from kafka.client_async import KafkaClient, selectors @@ -267,6 +269,14 @@ def __init__(self, *topics, **configs): new_config, self.config['auto_offset_reset']) self.config['auto_offset_reset'] = new_config + request_timeout_ms = self.config['request_timeout_ms'] + session_timeout_ms = self.config['session_timeout_ms'] + fetch_max_wait_ms = self.config['fetch_max_wait_ms'] + if request_timeout_ms <= session_timeout_ms: + raise KafkaConfigurationError("Request timeout ({}) must be larger than session timeout ({})".format(request_timeout_ms, session_timeout_ms)) + if request_timeout_ms <= fetch_max_wait_ms: + raise KafkaConfigurationError("Request timeout ({}) must be larger than fetch-max-wait-ms ({})".format(request_timeout_ms, fetch_max_wait_ms)) + metrics_tags = {'client-id': self.config['client_id']} metric_config = MetricConfig(samples=self.config['metrics_num_samples'], time_window_ms=self.config['metrics_sample_window_ms'], diff --git a/test/test_consumer.py b/test/test_consumer.py index f3dad1622..073a3af86 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -16,6 +16,14 @@ def test_non_integer_partitions(self): with self.assertRaises(AssertionError): SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ]) + def test_session_timeout_larger_than_request_timeout_raises(self): + with self.assertRaises(KafkaConfigurationError): + KafkaConsumer(bootstrap_servers='localhost:9092', session_timeout_ms=60000, request_timeout_ms=40000) + + def test_fetch_max_wait_larger_than_request_timeout_raises(self): + with self.assertRaises(KafkaConfigurationError): + KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=41000, request_timeout_ms=40000) + class TestMultiProcessConsumer(unittest.TestCase): @unittest.skipIf(sys.platform.startswith('win'), 'test mocking fails on windows') From a96d7bb72aab5011ff1eedb71d5c01ea73a2144d Mon Sep 17 00:00:00 2001 From: Harel Ben-Attia Date: Mon, 13 Feb 2017 17:23:29 +0200 Subject: [PATCH 2/3] Removed .format() usage to comply with python 2.6 --- kafka/consumer/group.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 890e84eb2..6092006f0 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -273,9 +273,13 @@ def __init__(self, *topics, **configs): session_timeout_ms = self.config['session_timeout_ms'] fetch_max_wait_ms = self.config['fetch_max_wait_ms'] if request_timeout_ms <= session_timeout_ms: - raise KafkaConfigurationError("Request timeout ({}) must be larger than session timeout ({})".format(request_timeout_ms, session_timeout_ms)) + raise KafkaConfigurationError( + "Request timeout (" + str(request_timeout_ms) + ") must be larger than session timeout (" + + str(session_timeout_ms) + ")") if request_timeout_ms <= fetch_max_wait_ms: - raise KafkaConfigurationError("Request timeout ({}) must be larger than fetch-max-wait-ms ({})".format(request_timeout_ms, fetch_max_wait_ms)) + raise KafkaConfigurationError( + "Request timeout (" + str(request_timeout_ms) + ") must be larger than fetch-max-wait-ms (" + + str(fetch_max_wait_ms) + ")") metrics_tags = {'client-id': self.config['client_id']} metric_config = MetricConfig(samples=self.config['metrics_num_samples'], From 3b2c72ba7892f6a538495a48c79492df09618737 Mon Sep 17 00:00:00 2001 From: Harel Ben-Attia Date: Sat, 25 Feb 2017 17:41:29 +0200 Subject: [PATCH 3/3] Use more readable string interpolation --- kafka/consumer/group.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 6092006f0..a300c8333 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -274,12 +274,11 @@ def __init__(self, *topics, **configs): fetch_max_wait_ms = self.config['fetch_max_wait_ms'] if request_timeout_ms <= session_timeout_ms: raise KafkaConfigurationError( - "Request timeout (" + str(request_timeout_ms) + ") must be larger than session timeout (" + - str(session_timeout_ms) + ")") + "Request timeout (%s) must be larger than session timeout (%s)" % + (request_timeout_ms, session_timeout_ms)) if request_timeout_ms <= fetch_max_wait_ms: - raise KafkaConfigurationError( - "Request timeout (" + str(request_timeout_ms) + ") must be larger than fetch-max-wait-ms (" + - str(fetch_max_wait_ms) + ")") + raise KafkaConfigurationError("Request timeout (%s) must be larger than fetch-max-wait-ms (%s)" % + (request_timeout_ms, fetch_max_wait_ms)) metrics_tags = {'client-id': self.config['client_id']} metric_config = MetricConfig(samples=self.config['metrics_num_samples'],