Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import sys
import time

from kafka.errors import KafkaConfigurationError

from kafka.vendor import six

from kafka.client_async import KafkaClient, selectors
Expand Down Expand Up @@ -267,6 +269,17 @@ def __init__(self, *topics, **configs):
new_config, self.config['auto_offset_reset'])
self.config['auto_offset_reset'] = new_config

request_timeout_ms = self.config['request_timeout_ms']
session_timeout_ms = self.config['session_timeout_ms']
fetch_max_wait_ms = self.config['fetch_max_wait_ms']
if request_timeout_ms <= session_timeout_ms:
raise KafkaConfigurationError(
"Request timeout (%s) must be larger than session timeout (%s)" %
(request_timeout_ms, session_timeout_ms))
if request_timeout_ms <= fetch_max_wait_ms:
raise KafkaConfigurationError("Request timeout (%s) must be larger than fetch-max-wait-ms (%s)" %
(request_timeout_ms, fetch_max_wait_ms))

metrics_tags = {'client-id': self.config['client_id']}
metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
time_window_ms=self.config['metrics_sample_window_ms'],
Expand Down
8 changes: 8 additions & 0 deletions test/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ def test_non_integer_partitions(self):
with self.assertRaises(AssertionError):
SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])

def test_session_timeout_larger_than_request_timeout_raises(self):
with self.assertRaises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', session_timeout_ms=60000, request_timeout_ms=40000)

def test_fetch_max_wait_larger_than_request_timeout_raises(self):
with self.assertRaises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=41000, request_timeout_ms=40000)


class TestMultiProcessConsumer(unittest.TestCase):
@unittest.skipIf(sys.platform.startswith('win'), 'test mocking fails on windows')
Expand Down