diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index c7ad0f59b..36df78326 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -747,7 +747,9 @@ def _poll_once(self, timer, max_records, update_offsets=True):
 
         # We do not want to be stuck blocking in poll if we are missing some positions
         # since the offset lookup may be backing off after a failure
-        poll_timeout_ms = min(timer.timeout_ms, self._coordinator.time_to_next_poll() * 1000)
+        poll_timeout_ms = timer.timeout_ms
+        if self.config['group_id'] is not None:
+            poll_timeout_ms = min(poll_timeout_ms, self._coordinator.time_to_next_poll() * 1000)
         if not has_all_fetch_positions:
             log.debug('poll: do not have all fetch positions...')
             poll_timeout_ms = min(poll_timeout_ms, self.config['retry_backoff_ms'])
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index d13ce4abb..e1d8d8336 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -1198,6 +1198,7 @@ def _run_once(self):
                     self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
 
             elif not self.coordinator.connected():
+                self.coordinator._client.maybe_connect(self.coordinator.coordinator_id)
                 self.coordinator._client._lock.release()
                 self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
 
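
Review notes on the two hunks, with illustrative sketches. Helper names and parameters below are stand-ins for the kafka-python internals shown above, not library API.

The first hunk changes _poll_once so that a consumer constructed without a group_id no longer lets coordinator poll timing cap its poll timeout; a group-less consumer does no group coordination, so only group members should honor time_to_next_poll(). A minimal sketch of the resulting timeout selection:

    def select_poll_timeout_ms(timer_timeout_ms, group_id, coordinator,
                               has_all_fetch_positions, retry_backoff_ms):
        # Start from the caller's remaining time budget.
        poll_timeout_ms = timer_timeout_ms
        # Only consult the coordinator when the consumer belongs to a group;
        # with group_id=None there is no coordinator polling to honor.
        if group_id is not None:
            poll_timeout_ms = min(poll_timeout_ms,
                                  coordinator.time_to_next_poll() * 1000)
        # If some partitions still lack fetch positions, cap the wait so a
        # backed-off offset lookup is retried promptly.
        if not has_all_fetch_positions:
            poll_timeout_ms = min(poll_timeout_ms, retry_backoff_ms)
        return poll_timeout_ms

The second hunk touches the heartbeat thread: when the coordinator node is known but its connection has dropped, the loop previously only released the client lock and slept, relying on some other caller to re-establish the connection. It now queues a reconnect itself via the client's non-blocking maybe_connect() before backing off. A sketch of that branch, assuming lock is a threading.Condition already held by the caller, as in the surrounding code:

    def backoff_with_reconnect(client, coordinator_id, lock, retry_backoff_ms):
        # Queue a (re)connection attempt to the coordinator node; this does
        # not block, so the backoff wait below still bounds the iteration.
        client.maybe_connect(coordinator_id)
        # Sleep for the retry backoff before the next heartbeat-loop pass.
        lock.wait(retry_backoff_ms / 1000)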