Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/pyspark/sql/connect/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,7 @@ def close(self) -> None:
"""
Close the channel.
"""
ExecutePlanResponseReattachableIterator.shutdown()
ExecutePlanResponseReattachableIterator.shutdown_threadpool()
self._channel.close()
self._closed = True

Expand Down
47 changes: 30 additions & 17 deletions python/pyspark/sql/connect/client/reattach.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,28 +58,41 @@ class ExecutePlanResponseReattachableIterator(Generator):

# Lock to manage the pool
_lock: ClassVar[RLock] = RLock()
_release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
_release_thread_pool_instance: Optional[ThreadPool] = None

# Class-level accessor for the shared release thread pool: returns the pool
# stored in `_release_thread_pool_instance`, creating it on first access.
# NOTE(review): stacking @classmethod on @property was deprecated in
# Python 3.11 and removed in 3.13 — confirm the supported Python versions.
@classmethod # type: ignore[misc]
@property
def _release_thread_pool(cls) -> ThreadPool:
# Perform a first check outside the critical path.
if cls._release_thread_pool_instance is not None:
return cls._release_thread_pool_instance
# Re-check while holding the class-level lock, so a pool is only created
# if none has been set by the time the lock is acquired.
with cls._lock:
if cls._release_thread_pool_instance is None:
# Size the pool by CPU count, falling back to 8 workers when
# os.cpu_count() returns a falsy value.
cls._release_thread_pool_instance = ThreadPool(
os.cpu_count() if os.cpu_count() else 8
)
return cls._release_thread_pool_instance

@classmethod
def shutdown(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
def shutdown_threadpool(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
"""
When the channel is closed, this method will be called before, to make sure all
outstanding calls are closed.
"""
with cls._lock:
if cls._release_thread_pool is not None:
cls._release_thread_pool.close()
cls._release_thread_pool.join()
cls._release_thread_pool = None
if cls._release_thread_pool_instance is not None:
cls._release_thread_pool.close() # type: ignore[attr-defined]
cls._release_thread_pool.join() # type: ignore[attr-defined]
cls._release_thread_pool_instance = None

@classmethod
def _initialize_pool_if_necessary(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
def shutdown(self: "ExecutePlanResponseReattachableIterator") -> None:
"""
If the processing pool for the release calls is None, initialize the pool exactly once.
When the channel is closed, this method will be called before, to make sure all
outstanding calls are closed, and mark this iterator is shutdown.
"""
with cls._lock:
if cls._release_thread_pool is None:
cls._release_thread_pool = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
with self._lock:
self.shutdown_threadpool()
self._is_shutdown = True

def __init__(
self,
Expand All @@ -88,7 +101,7 @@ def __init__(
retrying: Callable[[], Retrying],
metadata: Iterable[Tuple[str, str]],
):
ExecutePlanResponseReattachableIterator._initialize_pool_if_necessary()
self._is_shutdown = False
self._request = request
self._retrying = retrying
if request.operation_id:
Expand Down Expand Up @@ -206,8 +219,8 @@ def target() -> None:
except Exception as e:
warnings.warn(f"ReleaseExecute failed with exception: {e}.")

if ExecutePlanResponseReattachableIterator._release_thread_pool is not None:
ExecutePlanResponseReattachableIterator._release_thread_pool.apply_async(target)
if not self._is_shutdown:
self._release_thread_pool.apply_async(target)
Comment on lines +222 to +223
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is a bit confusing to me, but that might simply be because of the way Python works. My understanding was that self only gives me access to instance variables, not class variables, but _release_thread_pool is an actual class variable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it actually accesses the class variable, and this is neither discouraged nor encouraged in PEP 8, if I remember correctly. I just used self to make it look consistent, e.g., with shutdown, but I don't mind changing it back to the explicit ExecutePlanResponseReattachableIterator.


def _release_all(self) -> None:
"""
Expand All @@ -230,8 +243,8 @@ def target() -> None:
except Exception as e:
warnings.warn(f"ReleaseExecute failed with exception: {e}.")

if ExecutePlanResponseReattachableIterator._release_thread_pool is not None:
ExecutePlanResponseReattachableIterator._release_thread_pool.apply_async(target)
if not self._is_shutdown:
self._release_thread_pool.apply_async(target)
self._result_complete = True

def _call_iter(self, iter_fun: Callable) -> Any:
Expand Down