Skip to content

Commit 432f00e

Browse files
harelbadpkp
authored andcommitted
Fail-fast on timeout constraint violations during KafkaConsumer creation (#986)
1 parent bcb4009 commit 432f00e

File tree

2 files changed

+21
-0
lines changed

2 files changed

+21
-0
lines changed

kafka/consumer/group.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import sys
77
import time
88

9+
from kafka.errors import KafkaConfigurationError
10+
911
from kafka.vendor import six
1012

1113
from kafka.client_async import KafkaClient, selectors
@@ -267,6 +269,17 @@ def __init__(self, *topics, **configs):
267269
new_config, self.config['auto_offset_reset'])
268270
self.config['auto_offset_reset'] = new_config
269271

272+
request_timeout_ms = self.config['request_timeout_ms']
273+
session_timeout_ms = self.config['session_timeout_ms']
274+
fetch_max_wait_ms = self.config['fetch_max_wait_ms']
275+
if request_timeout_ms <= session_timeout_ms:
276+
raise KafkaConfigurationError(
277+
"Request timeout (%s) must be larger than session timeout (%s)" %
278+
(request_timeout_ms, session_timeout_ms))
279+
if request_timeout_ms <= fetch_max_wait_ms:
280+
raise KafkaConfigurationError("Request timeout (%s) must be larger than fetch-max-wait-ms (%s)" %
281+
(request_timeout_ms, fetch_max_wait_ms))
282+
270283
metrics_tags = {'client-id': self.config['client_id']}
271284
metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
272285
time_window_ms=self.config['metrics_sample_window_ms'],

test/test_consumer.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ def test_non_integer_partitions(self):
1616
with self.assertRaises(AssertionError):
1717
SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
1818

19+
def test_session_timeout_larger_than_request_timeout_raises(self):
20+
with self.assertRaises(KafkaConfigurationError):
21+
KafkaConsumer(bootstrap_servers='localhost:9092', session_timeout_ms=60000, request_timeout_ms=40000)
22+
23+
def test_fetch_max_wait_larger_than_request_timeout_raises(self):
24+
with self.assertRaises(KafkaConfigurationError):
25+
KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=41000, request_timeout_ms=40000)
26+
1927

2028
class TestMultiProcessConsumer(unittest.TestCase):
2129
@unittest.skipIf(sys.platform.startswith('win'), 'test mocking fails on windows')

0 commit comments

Comments
 (0)