24 changes: 11 additions & 13 deletions python/pyspark/util.py
@@ -378,20 +378,18 @@ def _clean_py4j_conn_for_current_thread():
     from pyspark import SparkContext

     jvm = SparkContext._jvm
-    thread_connection = jvm._gateway_client.thread_connection.connection()
+    thread_connection = jvm._gateway_client.get_thread_connection()
     if thread_connection is not None:
-        connections = jvm._gateway_client.deque
-        # Reuse the lock for Py4J in PySpark
-        with SparkContext._lock:
-            for i in range(len(connections)):
-                if connections[i] is thread_connection:
-                    connections[i].close()
-                    del connections[i]
-                    break
-            else:
-                # Just in case the connection was not closed but removed from the
-                # queue.
-                thread_connection.close()
+        try:
+            # The deque is shared across other threads but it's thread-safe.
+            # If this function has to be invoked one more time in the same thread,
+            # Py4J will create a new connection automatically.
+            jvm._gateway_client.deque.remove(thread_connection)
+        except ValueError:
+            # Should never reach this point

Member Author: Because the removal from the deque of the specific thread-local connection will only happen in the same thread.

Member Author: This logic was just copied from Py4J to be extra safe.

+            return
+        finally:
+            thread_connection.close()


if __name__ == "__main__":
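For readers who want the end state rather than the hunk, here is a sketch of how _clean_py4j_conn_for_current_thread reads with this change applied, reconstructed from the diff above and expanding on the inline review comments. The function signature comes from the hunk header; the comments about deque semantics and close-on-all-paths behaviour are editorial notes, not part of the patch, and assume jvm._gateway_client.deque behaves like a standard collections.deque, whose remove() raises ValueError when the item is absent.

def _clean_py4j_conn_for_current_thread():
    from pyspark import SparkContext

    jvm = SparkContext._jvm
    # Ask Py4J for the connection bound to the current thread.
    thread_connection = jvm._gateway_client.get_thread_connection()
    if thread_connection is not None:
        try:
            # The deque is shared across other threads but it's thread-safe.
            # remove() raises ValueError if the connection is no longer queued.
            jvm._gateway_client.deque.remove(thread_connection)
        except ValueError:
            # Should never reach this point: per the review comment above, only
            # the owning thread removes its own thread-local connection.
            return
        finally:
            # Close the connection on both paths so the underlying socket is
            # released even if it had already been removed from the queue.
            thread_connection.close()

Compared with the removed code, this drops the manual scan of the deque under SparkContext._lock: deque.remove plus the finally-close performs the same cleanup without that lock, in line with the author's note that the logic was copied from Py4J to be extra safe.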