diff --git a/Changelog.rst b/Changelog.rst index fabbb1ba..ad8a5f6a 100644 --- a/Changelog.rst +++ b/Changelog.rst @@ -17,6 +17,7 @@ Fixes ----- * ``HostOutput`` would have empty host on some exceptions when ``stop_on_errors`` is ``False`` - #297 +* Race condition when forcefully closing channel via ``SSHClient.close_channel`` while channel data was left unread. 2.6.0 +++++ diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index 4603c618..8e62db57 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -21,6 +21,7 @@ from warnings import warn from gevent import sleep, spawn, get_hub +from gevent.lock import RLock from ssh2.error_codes import LIBSSH2_ERROR_EAGAIN from ssh2.exceptions import SFTPHandleError, SFTPProtocolError, \ Timeout as SSH2Timeout @@ -127,6 +128,7 @@ def __init__(self, host, identity_auth=identity_auth, ) proxy_host = '127.0.0.1' + self._chan_lock = RLock() super(SSHClient, self).__init__( host, user=user, password=password, port=port, pkey=pkey, num_retries=num_retries, retry_delay=retry_delay, @@ -291,10 +293,12 @@ def execute(self, cmd, use_pty=False, channel=None): def _read_output_to_buffer(self, read_func, _buffer): try: while True: - size, data = read_func() + with self._chan_lock: + size, data = read_func() while size == LIBSSH2_ERROR_EAGAIN: self.poll() - size, data = read_func() + with self._chan_lock: + size, data = read_func() if size <= 0: break _buffer.write(data) @@ -325,8 +329,9 @@ def wait_finished(self, host_output, timeout=None): self.close_channel(channel) def close_channel(self, channel): - logger.debug("Closing channel") - self._eagain(channel.close) + with self._chan_lock: + logger.debug("Closing channel") + self._eagain(channel.close) def _eagain(self, func, *args, **kwargs): return self._eagain_errcode(func, LIBSSH2_ERROR_EAGAIN, *args, **kwargs)