Skip to content

Commit

Permalink
Do not require client lock for read-only operations (#1730)
Browse files Browse the repository at this point in the history
In an effort to reduce the surface area of lock coordination, and thereby hopefully reduce lock contention, I think we can remove locking from the read-only KafkaClient methods: connected, is_disconnected, in_flight_request_count, and least_loaded_node . Given that the read data could change after the lock is released but before the caller uses it, the value of acquiring a lock here does not seem high to me.
  • Loading branch information
dpkp committed Mar 7, 2019
1 parent 37699be commit 7a99013
Showing 1 changed file with 50 additions and 50 deletions.
100 changes: 50 additions & 50 deletions kafka/client_async.py
Expand Up @@ -402,10 +402,10 @@ def ready(self, node_id, metadata_priority=True):

def connected(self, node_id):
"""Return True iff the node_id is connected."""
with self._lock:
if node_id not in self._conns:
return False
return self._conns[node_id].connected()
conn = self._conns.get(node_id)
if conn is None:
return False
return conn.connected()

def _close(self):
if not self._closed:
Expand Down Expand Up @@ -448,10 +448,10 @@ def is_disconnected(self, node_id):
Returns:
bool: True iff the node exists and is disconnected
"""
with self._lock:
if node_id not in self._conns:
return False
return self._conns[node_id].disconnected()
conn = self._conns.get(node_id)
if conn is None:
return False
return conn.disconnected()

def connection_delay(self, node_id):
"""
Expand All @@ -467,10 +467,10 @@ def connection_delay(self, node_id):
Returns:
int: The number of milliseconds to wait.
"""
with self._lock:
if node_id not in self._conns:
return 0
return self._conns[node_id].connection_delay()
conn = self._conns.get(node_id)
if conn is None:
return 0
return conn.connection_delay()

def is_ready(self, node_id, metadata_priority=True):
"""Check whether a node is ready to send more requests.
Expand Down Expand Up @@ -656,13 +656,14 @@ def in_flight_request_count(self, node_id=None):
Returns:
int: pending in-flight requests for the node, or all nodes if None
"""
with self._lock:
if node_id is not None:
if node_id not in self._conns:
return 0
return len(self._conns[node_id].in_flight_requests)
else:
return sum([len(conn.in_flight_requests) for conn in self._conns.values()])
if node_id is not None:
conn = self._conns.get(node_id)
if conn is None:
return 0
return len(conn.in_flight_requests)
else:
return sum([len(conn.in_flight_requests)
for conn in list(self._conns.values())])

def _fire_pending_completed_requests(self):
responses = []
Expand All @@ -689,38 +690,37 @@ def least_loaded_node(self):
Returns:
node_id or None if no suitable node was found
"""
with self._lock:
nodes = [broker.nodeId for broker in self.cluster.brokers()]
random.shuffle(nodes)

inflight = float('inf')
found = None
for node_id in nodes:
conn = self._conns.get(node_id)
connected = conn is not None and conn.connected()
blacked_out = conn is not None and conn.blacked_out()
curr_inflight = len(conn.in_flight_requests) if conn is not None else 0
if connected and curr_inflight == 0:
# if we find an established connection
# with no in-flight requests, we can stop right away
return node_id
elif not blacked_out and curr_inflight < inflight:
# otherwise if this is the best we have found so far, record that
inflight = curr_inflight
found = node_id

if found is not None:
return found

# some broker versions return an empty list of broker metadata
# if there are no topics created yet. the bootstrap process
# should detect this and keep a 'bootstrap' node alive until
# a non-bootstrap node is connected and non-empty broker
# metadata is available
elif 'bootstrap' in self._conns:
return 'bootstrap'
nodes = [broker.nodeId for broker in self.cluster.brokers()]
random.shuffle(nodes)

return None
inflight = float('inf')
found = None
for node_id in nodes:
conn = self._conns.get(node_id)
connected = conn is not None and conn.connected()
blacked_out = conn is not None and conn.blacked_out()
curr_inflight = len(conn.in_flight_requests) if conn is not None else 0
if connected and curr_inflight == 0:
# if we find an established connection
# with no in-flight requests, we can stop right away
return node_id
elif not blacked_out and curr_inflight < inflight:
# otherwise if this is the best we have found so far, record that
inflight = curr_inflight
found = node_id

if found is not None:
return found

# some broker versions return an empty list of broker metadata
# if there are no topics created yet. the bootstrap process
# should detect this and keep a 'bootstrap' node alive until
# a non-bootstrap node is connected and non-empty broker
# metadata is available
elif 'bootstrap' in self._conns:
return 'bootstrap'

return None

def set_topics(self, topics):
"""Set specific topics to track for metadata.
Expand Down

0 comments on commit 7a99013

Please sign in to comment.