Skip to content

Commit

Permalink
Merge 684e765 into bd557da
Browse files Browse the repository at this point in the history
  • Loading branch information
pecalleja authored Sep 5, 2020
2 parents bd557da + 684e765 commit 8d473da
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,19 +200,21 @@ def __init__(self, **configs):
for key in self.config:
if key in configs:
self.config[key] = configs[key]

self._closed = False
self._wake_r, self._wake_w = socket.socketpair()
self._selector = self.config['selector']()
self.cluster = ClusterMetadata(**self.config)
self._topics = set() # empty set will fetch all topic metadata
self._metadata_refresh_in_progress = False
self._selector = self.config['selector']()

self._conns = Dict() # object to support weakrefs
self._api_versions = None
self._connecting = set()
self._sending = set()
self._refresh_on_disconnects = True
self._last_bootstrap = 0
self._bootstrap_fails = 0
self._wake_r, self._wake_w = socket.socketpair()

self._wake_r.setblocking(False)
self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0)
self._wake_lock = threading.Lock()
Expand All @@ -226,7 +228,7 @@ def __init__(self, **configs):

self._selector.register(self._wake_r, selectors.EVENT_READ)
self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms'])
self._closed = False

self._sensors = None
if self.config['metrics']:
self._sensors = KafkaClientMetrics(self.config['metrics'],
Expand Down

0 comments on commit 8d473da

Please sign in to comment.