Skip to content

Commit

Permalink
Fix initialization order in KafkaClient (#2119)
Browse files Browse the repository at this point in the history
Fix initialization order in KafkaClient
  • Loading branch information
pecalleja committed Sep 17, 2020
1 parent b32f369 commit e485a6e
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions kafka/client_async.py
Expand Up @@ -201,18 +201,22 @@ def __init__(self, **configs):
if key in configs:
self.config[key] = configs[key]

# these properties need to be set on top of the initialization pipeline
# because they are used when __del__ method is called
self._closed = False
self._wake_r, self._wake_w = socket.socketpair()
self._selector = self.config['selector']()

self.cluster = ClusterMetadata(**self.config)
self._topics = set() # empty set will fetch all topic metadata
self._metadata_refresh_in_progress = False
self._selector = self.config['selector']()
self._conns = Dict() # object to support weakrefs
self._api_versions = None
self._connecting = set()
self._sending = set()
self._refresh_on_disconnects = True
self._last_bootstrap = 0
self._bootstrap_fails = 0
self._wake_r, self._wake_w = socket.socketpair()
self._wake_r.setblocking(False)
self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0)
self._wake_lock = threading.Lock()
Expand All @@ -226,7 +230,6 @@ def __init__(self, **configs):

self._selector.register(self._wake_r, selectors.EVENT_READ)
self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms'])
self._closed = False
self._sensors = None
if self.config['metrics']:
self._sensors = KafkaClientMetrics(self.config['metrics'],
Expand Down

0 comments on commit e485a6e

Please sign in to comment.