Skip to content

Commit

Permalink
Error if connections_max_idle_ms not larger than request_timeout_ms
Browse files Browse the repository at this point in the history
`connections_max_idle_ms` must always be larger than `request_timeout_ms`
to avoid potentially unexpected behavior.

Fix #1680.
  • Loading branch information
jeffwidman committed Mar 14, 2019
1 parent 703f065 commit 9433abf
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
10 changes: 7 additions & 3 deletions kafka/consumer/group.py
Expand Up @@ -313,11 +313,15 @@ def __init__(self, *topics, **configs):
new_config, self.config['auto_offset_reset'])
self.config['auto_offset_reset'] = new_config

connections_max_idle_ms = self.config['connections_max_idle_ms']
request_timeout_ms = self.config['request_timeout_ms']
fetch_max_wait_ms = self.config['fetch_max_wait_ms']
if request_timeout_ms <= fetch_max_wait_ms:
raise KafkaConfigurationError("Request timeout (%s) must be larger than fetch-max-wait-ms (%s)" %
(request_timeout_ms, fetch_max_wait_ms))
if not (fetch_max_wait_ms < request_timeout_ms < connections_max_idle_ms):
raise KafkaConfigurationError(
"connections_max_idle_ms ({}) must be larger than "
"request_timeout_ms ({}) which must be larger than "
"fetch_max_wait_ms ({})."
.format(connections_max_idle_ms, request_timeout_ms, fetch_max_wait_ms))

metrics_tags = {'client-id': self.config['client_id']}
metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
Expand Down
6 changes: 5 additions & 1 deletion test/test_consumer.py
Expand Up @@ -15,12 +15,16 @@
class TestKafkaConsumer:
def test_session_timeout_larger_than_request_timeout_raises(self):
with pytest.raises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0,9), group_id='foo', session_timeout_ms=60000, request_timeout_ms=40000)
KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), group_id='foo', session_timeout_ms=60000, request_timeout_ms=40000)

def test_fetch_max_wait_larger_than_request_timeout_raises(self):
with pytest.raises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=41000, request_timeout_ms=40000)

def test_connections_max_idle_ms_smaller_than_request_timeout_raises(self):
with pytest.raises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), connections_max_idle_ms=69000, request_timeout_ms=40000)

def test_subscription_copy(self):
consumer = KafkaConsumer('foo', api_version=(0, 10))
sub = consumer.subscription()
Expand Down

0 comments on commit 9433abf

Please sign in to comment.