Skip to content

Commit c02df4b

Browse files
dpkpjeffwidman
authored andcommitted
Avoid race condition on client._conns in send() (#1772)
There was a very small possibility that between checking `self._can_send_request(node_id)` and grabbing the connection object via `self._conns[node_id]` that the connection could get closed / recycled / removed from _conns and cause a KeyError. This PR should prevent such a KeyError. In the case where the connection is disconnected by the time we call send(), we should expect conn.send() simply to fail the request.
1 parent 3664ae8 commit c02df4b

File tree

1 file changed

+3
-2
lines changed

1 file changed

+3
-2
lines changed

kafka/client_async.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -516,14 +516,15 @@ def send(self, node_id, request, wakeup=True):
516516
Returns:
517517
Future: resolves to Response struct or Error
518518
"""
519-
if not self._can_send_request(node_id):
519+
conn = self._conns.get(node_id)
520+
if not conn or not self._can_send_request(node_id):
520521
self.maybe_connect(node_id, wakeup=wakeup)
521522
return Future().failure(Errors.NodeNotReadyError(node_id))
522523

523524
# conn.send will queue the request internally
524525
# we will need to call send_pending_requests()
525526
# to trigger network I/O
526-
future = self._conns[node_id].send(request, blocking=False)
527+
future = conn.send(request, blocking=False)
527528

528529
# Wakeup signal is useful in case another thread is
529530
# blocked waiting for incoming network traffic while holding

0 commit comments

Comments
 (0)