From 6f3481ffe51393d4f949fac9ad6956f33527cca9 Mon Sep 17 00:00:00 2001 From: Panos Date: Thu, 24 Dec 2020 10:33:33 +0000 Subject: [PATCH] Interactive shell (#255) * Added interactive shells to single and parallel clients, ability to run commands and get output from shell. Added interactive shell tests. * Added reader test * Removed unused files * Updated requirements * Updated setup.py * Updated changelog * Updated config tests * Updated documentation, docstrings --- .gitmodules | 0 Changelog.rst | 5 +- appveyor.yml | 139 ------------------ doc/advanced.rst | 205 +++++++++++++++++++++++++-- doc/api.rst | 1 + doc/base_parallel.rst | 4 +- doc/base_single.rst | 4 +- doc/clients.rst | 2 +- doc/index.rst | 2 +- doc/reader.rst | 7 + doc/ssh_parallel.rst | 4 +- doc/ssh_single.rst | 4 +- pssh/clients/base/parallel.py | 80 ++++++++++- pssh/clients/base/single.py | 140 +++++++++++++++--- pssh/clients/native/parallel.py | 9 +- pssh/clients/native/single.py | 12 ++ pssh/clients/reader.py | 7 +- pssh/clients/ssh/parallel.py | 28 +--- pssh/clients/ssh/single.py | 18 ++- pssh/config.py | 4 +- pssh/exceptions.py | 4 + pssh/output.py | 9 +- requirements.txt | 2 +- setup.py | 4 +- tests/native/test_parallel_client.py | 62 +++++++- tests/native/test_single_client.py | 19 +++ tests/ssh/test_single_client.py | 19 +++ tests/test_host_config.py | 4 + tests/test_reader.py | 7 + 29 files changed, 567 insertions(+), 238 deletions(-) delete mode 100644 .gitmodules delete mode 100644 appveyor.yml create mode 100644 doc/reader.rst diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index e69de29b..00000000 diff --git a/Changelog.rst b/Changelog.rst index 7504e1ca..596cb456 100644 --- a/Changelog.rst +++ b/Changelog.rst @@ -7,13 +7,14 @@ Change Log Changes ------- +* Added interactive shell support to single and parallel clients - see `documentation `_. * Added ``pssh.utils.enable_debug_logger`` function. -* ``ParallelSSHClient`` timeout parameter is now also applied to starting remote commands. +* ``ParallelSSHClient`` timeout parameter is now also applied to *starting* remote commands via ``run_command``. Fixes ----- -* ``SSHClient`` with proxy enabled could not be used without setting port - # +* ``SSHClient`` with proxy enabled could not be used without setting port - #248 2.3.2 diff --git a/appveyor.yml b/appveyor.yml deleted file mode 100644 index acbb8f51..00000000 --- a/appveyor.yml +++ /dev/null @@ -1,139 +0,0 @@ -environment: - global: - # SDK v7.0 MSVC Express 2008's SetEnv.cmd script will fail if the - # /E:ON and /V:ON options are not enabled in the batch script intepreter - # See: http://stackoverflow.com/a/13751649/163740 - CMD_IN_ENV: "cmd /E:ON /V:ON /C .\\appveyor\\run_with_env.cmd" - - matrix: - - # Pre-installed Python versions, which Appveyor may upgrade to - # a later point release. - - - PYTHON: "C:\\Python34-x64" - PYTHON_VERSION: "3.4.x" # currently 3.4.3 - PYTHON_ARCH: "64" - PYTHON_EXE: python - - - PYTHON: "C:\\Python27-x64" - PYTHON_VERSION: "2.7.x" # currently 2.7.11 - PYTHON_ARCH: "64" - PYTHON_EXE: python - - - PYTHON: "C:\\Python35-x64" - PYTHON_VERSION: "3.5.x" # currently 3.5.1 - PYTHON_ARCH: "64" - PYTHON_EXE: python - - - PYTHON: "C:\\Python35" - PYTHON_VERSION: "3.5.x" # currently 3.5.0 - PYTHON_ARCH: "32" - PYTHON_EXE: python - - - PYTHON: "C:\\Python27" - PYTHON_VERSION: "2.7.x" # currently 2.7.11 - PYTHON_ARCH: "32" - PYTHON_EXE: python - - - PYTHON: "C:\\Python34" - PYTHON_VERSION: "3.4.x" # currently 3.4.3 - PYTHON_ARCH: "32" - PYTHON_EXE: python - -install: - # If there is a newer build queued for the same PR, cancel this one. - # The AppVeyor 'rollout builds' option is supposed to serve the same - # purpose but it is problematic because it tends to cancel builds pushed - # directly to master instead of just PR builds (or the converse). - # credits: JuliaLang developers. - - ps: if ($env:APPVEYOR_PULL_REQUEST_NUMBER -and $env:APPVEYOR_BUILD_NUMBER -ne ((Invoke-RestMethod ` - https://ci.appveyor.com/api/projects/$env:APPVEYOR_ACCOUNT_NAME/$env:APPVEYOR_PROJECT_SLUG/history?recordsNumber=50).builds | ` - Where-Object pullRequestId -eq $env:APPVEYOR_PULL_REQUEST_NUMBER)[0].buildNumber) { ` - throw "There are newer queued builds for this pull request, failing early." } - - ECHO "Filesystem root:" - - ps: "ls \"C:/\"" - - - ECHO "Installed SDKs:" - - ps: "ls \"C:/Program Files/Microsoft SDKs/Windows\"" - - # Install Python (from the official .msi of http://python.org) and pip when - # not already installed. - # PyPy portion based on https://github.com/wbond/asn1crypto/blob/master/appveyor.yml - - ps: - $env:PYTMP = "${env:TMP}\py"; - if (!(Test-Path "$env:PYTMP")) { - New-Item -ItemType directory -Path "$env:PYTMP" | Out-Null; - } - if ("${env:PYTHON_ID}" -eq "pypy") { - if (!(Test-Path "${env:PYTMP}\pypy-4.0.1-win32.zip")) { - (New-Object Net.WebClient).DownloadFile('https://bitbucket.org/pypy/pypy/downloads/pypy-4.0.1-win32.zip', "${env:PYTMP}\pypy-4.0.1-win32.zip"); - } - 7z x -y "${env:PYTMP}\pypy-4.0.1-win32.zip" -oC:\ | Out-Null; - if (!(Test-Path "${env:PYTMP}\get-pip.py")) { - (New-Object Net.WebClient).DownloadFile('https://bootstrap.pypa.io/get-pip.py', "${env:PYTMP}\get-pip.py"); - } - & "${env:PYTHON}\pypy.exe" "${env:PYTMP}\get-pip.py"; - - } - elseif (-not(Test-Path($env:PYTHON))) { - & appveyor\install.ps1; - } - - # Prepend newly installed Python to the PATH of this build (this cannot be - # done from inside the powershell script as it would require to restart - # the parent CMD process). - - "SET PATH=%PYTHON%;%PYTHON%\\Scripts;%PYTHON%\\bin;%PATH%" - - "SET PYEXE=%PYTHON%\\%PYTHON_EXE%.exe" - - # Check that we have the expected version and architecture for Python - - "%PYEXE% --version" - - "%PYEXE% -c \"import struct; print(struct.calcsize('P') * 8)\"" - - # Upgrade to the latest version of pip to avoid it displaying warnings - # about it being out of date. - - "pip install --disable-pip-version-check --user -U pip" - - # Install the build dependencies of the project. If some dependencies contain - # compiled extensions and are not provided as pre-built wheel packages, - # pip will build them from source using the MSVC compiler matching the - # target Python version and architecture - # NOTE: psutil won't install under PyPy. - - "pip install -r requirements.txt nose" - - - ps: "if(Test-Path(\"${env:PYTHON}\\bin\")) {ls ${env:PYTHON}\\bin;}" - - ps: "if(Test-Path(\"${env:PYTHON}\\Scripts\")) {ls ${env:PYTHON}\\Scripts;}" - -cache: - - "%TMP%\\py\\" - -build_script: - # Build wheel - - "%PYEXE% setup.py install" - # - ps: "ls dist" - # # Now install the wheel. - # # I couldn't get wildcards to work for pip install, so stuff it - # # into a variable, using python to glob. - # - "%PYEXE% -c \"import glob; print(glob.glob('dist/*whl')[0])\" > whl.txt" - # - set /p PYWHL== (3,) and sys.exit(1)' || eval \"2to3 -nw embedded_server\\*.py && 2to3 tests\\*.py -o tests3 -nw && cp tests\\test_client_private_key* tests3\\ && %PYEXE% setup.py nosetests -w tests3\\"" - -after_test: - # We already built the wheel during build_script, because it's - # much faster to do that and install from the wheel than to - # rebuild it here (because we wind up re-building all the cython - # code, even though it's already built on disk; our make.cmd is not smart) - #- "%CMD_IN_ENV% %PYEXE% setup.py bdist_wheel bdist_wininst" - - ps: "ls dist" - -artifacts: - # Archive the generated wheel package in the ci.appveyor.com build report. - - path: dist\* - -#on_success: -# - TODO: upload the content of dist/*.whl to a public wheelhouse -# diff --git a/doc/advanced.rst b/doc/advanced.rst index 20cfa793..f7286bc7 100644 --- a/doc/advanced.rst +++ b/doc/advanced.rst @@ -30,14 +30,14 @@ Native Clients ssh2-python (libssh2) ===================== -Starting from version ``1.2.0``, the default client in ``parallel-ssh`` is based on `ssh2-python` (`libssh2`). It is a native client, offering C level performance with an easy to use Python API. +The default client in ``parallel-ssh`` is based on `ssh2-python` (`libssh2`). It is a native client, offering C level performance with an easy to use Python API. See `this post `_ for a performance comparison of the available clients in the `1.x.x` series. .. code-block:: python - from pssh.clients import ParallelSSHClient + from pssh.clients import ParallelSSHClient, SSHClient hosts = ['my_host', 'my_other_host'] client = ParallelSSHClient(hosts) @@ -55,18 +55,20 @@ See `this post `_ for a perf API documentation for `parallel `_ and `single `_ native clients. +*New in 1.2.0* + ssh-python (libssh) Client ============================ -From version `1.12.0` another client based on `libssh `_ via `ssh-python` is provided for testing purposes. +A set of alternative clients based on `libssh `_ via `ssh-python `_ are also provided. -The API is similar to the default client, while ``ssh-python`` offers more supported authentication methods compared to the default client. +The API is similar to the default client, while ``ssh-python`` offers more supported authentication methods compared to the default client, such as certificate and GSS API authentication. -On the other hand, this client lacks SCP, SFTP and proxy functionality. +On the other hand, these clients lack SCP, SFTP and proxy functionality. .. code-block:: python - from pssh.clients.ssh import ParallelSSHClient + from pssh.clients.ssh import ParallelSSHClient, SSHClient hosts = ['localhost', 'localhost'] client = ParallelSSHClient(hosts) @@ -77,6 +79,12 @@ On the other hand, this client lacks SCP, SFTP and proxy functionality. for line in host_out.stdout: print(line) +.. seealso:: + + API documentation for :py:class:`parallel ` and :py:class:`single ` ssh-python clients. + + +*New in 1.12.0* GSS-API Authentication - aka Kerberos -------------------------------------- @@ -108,14 +116,15 @@ In the ``pssh.clients.ssh`` clients, certificate authentication is supported. from pssh.clients.ssh import ParallelSSHClient - client = ParallelSSHClient(hosts, pkey='id_rsa', cert_file='id_rsa-cert.pub') + client = ParallelSSHClient( + hosts, pkey='id_rsa', cert_file='id_rsa-cert.pub') Where ``id_rsa-cert.pub`` is an RSA signed certificate file for the ``id_rsa`` private key. Both private key and corresponding signed public certificate file must be provided. -``ssh-python`` :py:mod:`ParallelSSHClient ` clients only. +``ssh-python`` :py:mod:`ParallelSSHClient ` only. Proxy Hosts and Tunneling @@ -125,7 +134,7 @@ This is used in cases where the client does not have direct access to the target Commonly used for additional security as only the proxy host needs to have access to the target host. -Client ------> Proxy host --------> Target host +Client --------> Proxy host --------> Target host Proxy host can be configured as follows in the simplest case: @@ -179,7 +188,7 @@ See :py:mod:`HostConfig ` for all possible configuration .. note:: - New tunneling implementation from `2.2.0` for highest performance. + New tunneling implementation from `2.2.0` for best performance. Connecting to dozens or more hosts via a single proxy host will impact performance considerably. @@ -192,7 +201,7 @@ Clients have timeout functionality on reading output and ``client.join``. Join timeout is applied to all parallel commands in total and is separate from ``ParallelSSHClient(timeout=<..>)`` which is applied to SSH session operations individually. -Timeout exceptions contain attributes for which commands have finished and which have not so client code can get output from any finished commands when handling timeouts. +Timeout exceptions from ``join`` contain attributes for which commands have finished and which have not so client code can get output from any finished commands when handling timeouts. .. code-block:: python @@ -410,10 +419,9 @@ While not best practice and password-less ``sudo`` is best configured for a limi client = <..> output = client.run_command(<..>, sudo=True) - for host in output: - stdin = output[host].stdin - stdin.write('my_password\n') - stdin.flush() + for host_out in output: + host_out.stdin.write('my_password\n') + host_out.stdin.flush() client.join(output) .. note:: @@ -458,6 +466,7 @@ Contents of ``stdout`` are `UTF-16` encoded. Encoding must be valid `Python codec `_ + Enabling use of pseudo terminal emulation =========================================== @@ -604,6 +613,172 @@ If wanting to copy a file from a single remote host and retain the original file :py:func:`SSHClient.copy_remote_file ` API documentation and exceptions raised. +Interactive Shells +****************** + +Interactive shells can be used to run commands, as an alternative to ``run_command``. + +This is best used in cases where wanting to run multiple commands per host on the same channel with combined output. + +.. code-block:: python + + client = ParallelSSHClient(<..>) + + cmd = """ + echo me + echo me too + """ + + shells = client.open_shell() + client.run_shell_commands(shells, cmd) + client.join_shells(shells) + + for shell in shells: + for line in shell.stdout: + print(line) + print(shell.exit_code) + + +Running Commands On Shells +========================== + +Command to run can be multi-line, a single command or a list of commands. + +Shells provided are used for all commands, reusing the channel opened by ``open_shell``. + + +Multi-line Commands +------------------- + +Multi-line commands or command string is executed as-is. + +.. code-block:: python + + client = ParallelSSHClient(<..>) + + cmd = """ + echo me + echo me too + """ + + shells = client.open_shell() + client.run_shell_commands(shells, cmd) + + +Single And List Of Commands +--------------------------- + +A single command can be used, as well as a list of commands to run on each shell. + +.. code-block:: python + + cmd = 'echo me three' + client.run_shell_commands(shells, cmd) + + cmd = ['echo me also', 'echo and as well me', 'exit 1'] + client.run_shell_commands(shells, cmd) + + +Waiting For Completion +====================== + +Joining shells waits for running commands to complete and closes shells. + +This allows output to be read up to the last command executed without blocking. + +.. code-block:: python + + client.join_shells(shells) + +Joined on shells are closed and may not run any further commands. + +Trying to use the same shells after ``join_shells`` will raise :py:class:`pssh.exceptions.ShellError`. + + +Reading Shell Output +==================== + +Output for each shell includes all commands executed. + +.. code-block:: python + + for shell in shells: + stdout = list(shell.stdout) + exit_code = shell.exit_code + + +Exit code is for the *last executed command only* and can be retrieved when ``run_shell_commands`` has been used at least once. + +Each shell also has a ``shell.output`` which is a :py:class:`HostOutput ` object. ``shell.stdout`` et al are the same as ``shell.output.stdout``. + + +Reading Partial Shell Output +---------------------------- + +Reading output will **block indefinitely** prior to join being called. Use ``read_timeout`` in order to read partial output. + +.. code-block:: python + + shells = client.open_shell(read_timeout=1) + client.run_shell_commands(shells, ['echo me']) + + # Times out after one second + for line in shells[0].stdout: + print(line) + + +Join Timeouts +============= + +Timeouts on ``join_shells`` can be done similarly to ``join``. + +.. code-block:: python + + cmds = ["echo me", "sleep 1.2"] + + shells = client.open_shell() + client.run_shell_commands(shells, cmds) + client.join_shells(shells, timeout=1) + + +Single Clients +============== + +On single clients shells can be used as a context manager to join and close the shell on exit. + +.. code-block:: python + + client = SSHClient(<..>) + + cmd = 'echo me' + with client.open_shell() as shell: + shell.run(cmd) + print(list(shell.stdout)) + print(shell.exit_code) + + +Or explicitly: + +.. code-block:: python + + cmd = 'echo me' + shell = client.open_shell() + shell.run(cmd) + shell.close() + +Closing a shell also waits for commands to complete. + + +.. seealso:: + + :py:class:`pssh.clients.base.single.InteractiveShell` for more documentation. + + * :py:func:`open_shell() ` + * :py:func:`run_shell_commands() ` + * :py:func:`join_shells() ` + + + Hosts filtering and overriding ******************************* diff --git a/doc/api.rst b/doc/api.rst index 0a31f195..7f45c00a 100644 --- a/doc/api.rst +++ b/doc/api.rst @@ -13,6 +13,7 @@ API Documentation output config tunnel + reader utils exceptions constants diff --git a/doc/base_parallel.rst b/doc/base_parallel.rst index 66544668..f1572c65 100644 --- a/doc/base_parallel.rst +++ b/doc/base_parallel.rst @@ -1,5 +1,5 @@ -BaseParallelSSHClient -====================== +Base Parallel Client +==================== API documentation for common parallel client functionality. diff --git a/doc/base_single.rst b/doc/base_single.rst index acc0b4ba..b6e29688 100644 --- a/doc/base_single.rst +++ b/doc/base_single.rst @@ -1,5 +1,5 @@ -BaseSSHClient -=============== +Base Single Client +================== API documentation for common single host client functionality. diff --git a/doc/clients.rst b/doc/clients.rst index c72913a0..047c03a3 100644 --- a/doc/clients.rst +++ b/doc/clients.rst @@ -14,11 +14,11 @@ Kerberos (GSS) authentication Not supported Yes Private key file authentication Yes Yes Agent authentication Yes Yes Password authentication Yes Yes -SFTP copy to/from hosts Yes No OpenSSH config parsing Not yet implemented Not yet implemented ECDSA keys support Yes Yes ED25519 keys support Yes Yes Certificate authentication Not supported Yes +SFTP copy to/from hosts Yes No SCP functionality Yes No Keep-alive functionality Yes No =============================== ====================== ====================== diff --git a/doc/index.rst b/doc/index.rst index e114b3e9..1d26601b 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -73,9 +73,9 @@ Single host client is also available with similar API. introduction installation quickstart - clients advanced api + clients Changelog api_upgrade_2_0 diff --git a/doc/reader.rst b/doc/reader.rst new file mode 100644 index 00000000..c5f9671c --- /dev/null +++ b/doc/reader.rst @@ -0,0 +1,7 @@ +Output reader/writer +==================== + +.. automodule:: pssh.clients.reader + :members: + :undoc-members: + :member-order: groupwise diff --git a/doc/ssh_parallel.rst b/doc/ssh_parallel.rst index 9c8dc64f..f4ab637e 100644 --- a/doc/ssh_parallel.rst +++ b/doc/ssh_parallel.rst @@ -1,5 +1,5 @@ -ssh-python based Parallel Client -================================= +ssh-python Parallel Client +=========================== API documentation for the ``ssh-python`` (``libssh``) based parallel client. diff --git a/doc/ssh_single.rst b/doc/ssh_single.rst index 6abc3026..6e51f96f 100644 --- a/doc/ssh_single.rst +++ b/doc/ssh_single.rst @@ -1,5 +1,5 @@ -ssh-python based Single Host Client -==================================== +ssh-python Single Host Client +============================== Single host non-blocking client based on ``ssh-python`` (``libssh``). Suitable for running asynchronous commands on a single host. diff --git a/pssh/clients/base/parallel.py b/pssh/clients/base/parallel.py index 126f7185..06a1ed7a 100644 --- a/pssh/clients/base/parallel.py +++ b/pssh/clients/base/parallel.py @@ -26,7 +26,7 @@ from gevent.hub import Hub from ...constants import DEFAULT_RETRIES, RETRY_DELAY -from ...exceptions import HostArgumentError, Timeout +from ...exceptions import HostArgumentError, Timeout, ShellError from ...output import HostOutput @@ -79,6 +79,78 @@ def _check_host_config(self): "Got %s host config entries from %s hosts" % ( len(self.host_config), host_len)) + def _open_shell(self, host_i, host, + encoding='utf-8', read_timeout=None): + try: + _client = self._make_ssh_client(host_i, host) + shell = _client.open_shell( + encoding=encoding, read_timeout=read_timeout) + return shell + except (GTimeout, Exception) as ex: + host = ex.host if hasattr(ex, 'host') else None + logger.error("Failed to run on host %s - %s", host, ex) + raise ex + + def open_shell(self, encoding='utf-8', read_timeout=None): + """Open interactive shells on all hosts. + + :param encoding: Encoding to use for shell output. + :type encoding: str + :param read_timeout: Seconds before reading from output times out. + :type read_timeout: float + + :returns: Opened shells for each of self.hosts, in order. + :rtype: list(:py:class:`pssh.clients.native.base.single.InteractiveShell`) + """ + cmds = [self.pool.spawn( + self._open_shell, host_i, host, encoding=encoding, read_timeout=read_timeout) + for host_i, host in enumerate(self.hosts) + ] + finished = joinall(cmds, raise_error=True) + return [cmd.get() for cmd in finished] + + def run_shell_commands(self, shells, commands): + """Run command(s) on shells. + + :param shells: Shells to run on. + :type shells: list(:py:class:`pssh.clients.base.single.InteractiveShell`) + :param commands: Commands to run. + :type commands: list or str + """ + if not isinstance(commands, list): + commands = [commands] + cmds = [self.pool.spawn(shell.run, cmd) + for shell in shells + for cmd in commands] + try: + finished = joinall(cmds, raise_error=True, timeout=self.timeout) + except Exception as ex: + raise ShellError(ex) + return finished + + def join_shells(self, shells, timeout=None): + """Wait for running commands to complete and close shells. + + :param shells: Shells to join on. + :type shells: list(:py:class:`pssh.clients.base.single.InteractiveShell`) + :param timeout: Seconds before waiting for shell commands to finish times out. + Defaults to self.timeout if not provided. + :type timeout: float + + :raises: :py:class:`pssh.exceptions.Timeout` on timeout requested and + reached with commands still running. + """ + _timeout = self.timeout if timeout is None else timeout + cmds = [self.pool.spawn(shell.close) for shell in shells] + finished = joinall(cmds, timeout=_timeout) + if _timeout is None: + return + finished_shells = [g.get() for g in finished] + unfinished_shells = list(set(shells).difference(set(finished_shells))) + if len(unfinished_shells) > 0: + raise Timeout("Timeout of %s sec(s) reached with commands " + "still running", timeout, finished_shells, unfinished_shells) + def run_command(self, command, user=None, stop_on_errors=True, host_args=None, use_pty=False, shell=None, encoding='utf-8', return_list=True, @@ -279,9 +351,9 @@ def _join(self, host_out, consume_output=False, timeout=None, def finished(self, output=None): """Check if commands have finished without blocking. - :param output: As returned by - :py:func:`pssh.pssh_client.ParallelSSHClient.get_last_output` - :type output: list + :param output: (Optional) Output to check if finished. Defaults to + :py:func:`get_last_output ` + :type output: list(:py:mod:`HostOutput `) :rtype: bool """ diff --git a/pssh/clients/base/single.py b/pssh/clients/base/single.py index 77847b02..121c8bbb 100644 --- a/pssh/clients/base/single.py +++ b/pssh/clients/base/single.py @@ -43,6 +43,77 @@ logger = logging.getLogger(__name__) +class InteractiveShell(object): + """ + Run commands on an interactive shell. + + Use as context manager to wait for commands to finish on exit. + + Read from .stdout and stderr once context manager has exited. + + ``InteractiveShell.output`` is a :py:class:`pssh.output.HostOutput` object. + """ + __slots__ = ('_chan', '_client', 'output') + _EOL = '\n' + + def __init__(self, channel, client, encoding='utf-8', read_timeout=None): + """ + :param channel: The channel to open shell on. + :type channel: ``ssh2.channel.Channel`` or similar. + :param client: The SSHClient that opened the channel. + :type client: :py:class:`BaseSSHClient` + """ + self._chan = channel + self._client = client + self._client._shell(self._chan) + self.output = self._client._make_host_output( + self._chan, encoding=encoding, read_timeout=read_timeout) + + @property + def stdout(self): + """``self.output.stdout``""" + return self.output.stdout + + @property + def stderr(self): + """``self.output.stderr``""" + return self.output.stderr + + @property + def stdin(self): + """``self.output.stdin``""" + return self.output.stdin + + @property + def exit_code(self): + """``self.output.exit_code``""" + return self.output.exit_code + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + def close(self): + """Wait for shell to finish executing and close channel.""" + if self._chan is None: + return + self._client._eagain(self._chan.send_eof) + self._client.wait_finished(self.output) + return self + + def run(self, cmd): + """Run command on interactive shell. + + :param cmd: The command string to run. + Note that ``\\n`` is appended to every string. + :type cmd: str + """ + cmd += self._EOL + self._client._eagain_write(self._chan.write, cmd) + + class BaseSSHClient(object): IDENTITIES = ( @@ -123,6 +194,23 @@ def __enter__(self): def __exit__(self, *args): self.disconnect() + def open_shell(self, encoding='utf-8', read_timeout=None): + """Open interactive shell on new channel. + + Can be used as context manager - ``with open_shell() as shell``. + + :param encoding: Encoding to use for output from shell. + :type encoding: str + :param read_timeout: Timeout in seconds for reading from output. + :type read_timeout: float + """ + chan = self.open_session() + shell = InteractiveShell(chan, self, encoding=encoding, read_timeout=read_timeout) + return shell + + def _shell(self, channel): + raise NotImplementedError + def _connect_init_session_retry(self, retries): try: self.session.disconnect() @@ -204,12 +292,30 @@ def auth(self): def _password_auth(self): raise NotImplementedError - def _pkey_auth(self, pkey, password=None): + def _pkey_auth(self, password=None): raise NotImplementedError def open_session(self): raise NotImplementedError + def _make_host_output(self, channel, encoding, read_timeout): + _stdout_buffer = ConcurrentRWBuffer() + _stderr_buffer = ConcurrentRWBuffer() + _stdout_reader, _stderr_reader = self._make_output_readers( + channel, _stdout_buffer, _stderr_buffer) + _stdout_reader.start() + _stderr_reader.start() + _buffers = HostOutputBuffers( + stdout=BufferData(rw_buffer=_stdout_buffer, reader=_stdout_reader), + stderr=BufferData(rw_buffer=_stderr_buffer, reader=_stderr_reader)) + stdin = channel + host_out = HostOutput( + host=self.host, channel=channel, stdin=stdin, + client=self, encoding=encoding, read_timeout=read_timeout, + buffers=_buffers, + ) + return host_out + def _make_output_readers(self, channel, stdout_buffer, stderr_buffer): raise NotImplementedError @@ -271,16 +377,14 @@ def _read_output_buffer(self, _buffer, timeout=None): def _read_output_to_buffer(self, read_func, _buffer): raise NotImplementedError - def wait_finished(self, channel, timeout=None): + def wait_finished(self, host_output, timeout=None): raise NotImplementedError def close_channel(self, channel): raise NotImplementedError def get_exit_status(self, channel): - if not channel.eof(): - return - return channel.get_exit_status() + raise NotImplementedError def read_output_buffer(self, output_buffer, prefix=None, callback=None, @@ -345,23 +449,13 @@ def run_command(self, command, sudo=False, user=None, with GTimeout(seconds=self.timeout): channel = self.execute(_command, use_pty=use_pty) _timeout = read_timeout if read_timeout else timeout - _stdout_buffer = ConcurrentRWBuffer() - _stderr_buffer = ConcurrentRWBuffer() - _stdout_reader, _stderr_reader = self._make_output_readers( - channel, _stdout_buffer, _stderr_buffer) - _stdout_reader.start() - _stderr_reader.start() - _buffers = HostOutputBuffers( - stdout=BufferData(rw_buffer=_stdout_buffer, reader=_stdout_reader), - stderr=BufferData(rw_buffer=_stderr_buffer, reader=_stderr_reader)) - stdin = channel - host_out = HostOutput( - host=self.host, channel=channel, stdin=stdin, - client=self, encoding=encoding, read_timeout=_timeout, - buffers=_buffers, - ) + channel = self.execute(_command, use_pty=use_pty) + host_out = self._make_host_output(channel, encoding, _timeout) return host_out + def _eagain_write(self, write_func, data, timeout=None): + raise NotImplementedError + def _eagain(self, func, *args, **kwargs): raise NotImplementedError @@ -372,7 +466,7 @@ def _mkdir(self, sftp, directory): raise NotImplementedError def copy_file(self, local_file, remote_file, recurse=False, - sftp=None, _dir=None): + sftp=None): raise NotImplementedError def _sftp_put(self, remote_fh, local_file): @@ -383,7 +477,7 @@ def _sftp_put(self, remote_fh, local_file): def sftp_put(self, sftp, local_file, remote_file): raise NotImplementedError - def mkdir(self, sftp, directory, _parent_path=None): + def mkdir(self, sftp, directory): raise NotImplementedError def _copy_dir(self, local_dir, remote_dir, sftp): @@ -475,7 +569,7 @@ def _remote_paths_split(self, file_path): if _sep > 0: return file_path[:_sep] - def poll(timeout=None): + def poll(self, timeout=None): raise NotImplementedError def _poll_socket(self, events, timeout=None): diff --git a/pssh/clients/native/parallel.py b/pssh/clients/native/parallel.py index 8bd6fd74..8174f357 100644 --- a/pssh/clients/native/parallel.py +++ b/pssh/clients/native/parallel.py @@ -189,10 +189,8 @@ def run_command(self, command, sudo=False, user=None, stop_on_errors=True, Parameter kept for backwards compatibility - to be removed in future releases. :type return_list: bool - :rtype: Dictionary with host as key and - :py:class:`pssh.output.HostOutput` as value - *or* list(:py:class:`pssh.output.HostOutput`) when - ``return_list=True`` + :rtype: list(:py:class:`pssh.output.HostOutput`) + :raises: :py:class:`pssh.exceptions.AuthenticationError` on authentication error :raises: :py:class:`pssh.exceptions.UnknownHostError` on DNS @@ -207,8 +205,7 @@ def run_command(self, command, sudo=False, user=None, stop_on_errors=True, dict for cmd string format :raises: :py:class:`pssh.exceptions.ProxyError` on errors connecting to proxy if a proxy host has been set. - :raises: :py:class:`gevent.Timeout` on greenlet timeout. Gevent timeout - can not be caught by ``stop_on_errors=False``. + :raises: :py:class:`pssh.exceptions.Timeout` on timeout starting command. :raises: Exceptions from :py:mod:`ssh2.exceptions` for all other specific errors such as :py:class:`ssh2.exceptions.SocketDisconnectError` et al. diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index 56df8c09..438b9673 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -132,6 +132,9 @@ def __init__(self, host, proxy_host=proxy_host, proxy_port=proxy_port, identity_auth=identity_auth) + def _shell(self, channel): + return self._eagain(channel.shell) + def _connect_proxy(self, proxy_host, proxy_port, proxy_pkey, user=None, password=None, num_retries=DEFAULT_RETRIES, @@ -388,6 +391,9 @@ def copy_file(self, local_file, remote_file, recurse=False, sftp=None): :type remote_file: str :param recurse: Whether or not to descend into directories recursively. :type recurse: bool + :param sftp: SFTP channel to use instead of creating a + new one. + :type sftp: :py:class:`ssh2.sftp.SFTP` :raises: :py:class:`ValueError` when a directory is supplied to ``local_file`` and ``recurse`` is not set @@ -475,6 +481,9 @@ def copy_remote_file(self, remote_file, local_file, recurse=False, :type recurse: bool :param encoding: Encoding to use for file paths. :type encoding: str + :param sftp: SFTP channel to use instead of creating a + new one. + :type sftp: :py:class:`ssh2.sftp.SFTP` :raises: :py:class:`ValueError` when a directory is supplied to ``local_file`` and ``recurse`` is not set @@ -744,3 +753,6 @@ def eagain_write(self, write_func, data, timeout=None): total_written += bytes_written if rc == LIBSSH2_ERROR_EAGAIN: self.poll(timeout=timeout) + + def _eagain_write(self, write_func, data, timeout=None): + return self.eagain_write(write_func, data, timeout=timeout) diff --git a/pssh/clients/reader.py b/pssh/clients/reader.py index 2b94ab0f..c3f2d818 100644 --- a/pssh/clients/reader.py +++ b/pssh/clients/reader.py @@ -28,11 +28,14 @@ class ConcurrentRWBuffer(object): """Concurrent reader/writer of bytes for use from multiple greenlets. + Supports both concurrent reading and writing. + Iterate on buffer object to read data, yielding greenlet if no data exists until self.eof has been set. Writers should ``eof.set()`` when finished writing data via ``write``. - Readers can use ``read()`` to get any available data, or None. + + Readers can use ``read()`` to get any available data or ``None``. """ __slots__ = ('_buffer', '_read_pos', '_write_pos', 'eof', '_lock') @@ -44,7 +47,7 @@ def __init__(self): self._lock = RLock() def write(self, data): - """Write data to buffer + """Write data to buffer. :param data: Data to write :type data: bytes diff --git a/pssh/clients/ssh/parallel.py b/pssh/clients/ssh/parallel.py index 863351fd..cfccd218 100644 --- a/pssh/clients/ssh/parallel.py +++ b/pssh/clients/ssh/parallel.py @@ -56,7 +56,7 @@ def __init__(self, hosts, user=None, password=None, port=22, pkey=None, :param cert_file: Public key signed certificate file to use for authentication. The corresponding private key must also be provided via ``pkey`` parameter. - For example ``pkey='id_rsa',cert_file='id_rsa-cert.pub'`` for RSA + For example ``pkey='id_rsa', cert_file='id_rsa-cert.pub'`` for RSA signed certificate. Path must be absolute or relative to user home directory. :type cert_file: str @@ -197,24 +197,11 @@ def run_command(self, command, sudo=False, user=None, stop_on_errors=True, read_timeout and kept for backwards compatibility - to be removed in future release. :type timeout: float - :param greenlet_timeout: (Optional) Greenlet timeout setting. - Defaults to no timeout. If set, this function will raise - :py:class:`gevent.Timeout` after ``greenlet_timeout`` seconds - if no result is available from greenlets. - - In some cases, such as when using proxy hosts, connection timeout - is controlled by proxy server and getting result from greenlets may - hang indefinitely if remote server is unavailable. - - Use this setting - to avoid blocking in such circumstances. - Note that ``gevent.Timeout`` is a special class that inherits from - ``BaseException`` and thus **can not be caught** by - ``stop_on_errors=False``. - :type greenlet_timeout: float - :rtype: Dictionary with host as key and - :py:class:`pssh.output.HostOutput` as value as per - :py:func:`pssh.pssh_client.ParallelSSHClient.get_output` + :param return_list: No-op - list of ``HostOutput`` always returned. + Parameter kept for backwards compatibility - to be removed in future + releases. + :type return_list: bool + :rtype: list(:py:class:`pssh.output.HostOutput`) :raises: :py:class:`pssh.exceptions.AuthenticationError` on authentication error @@ -230,8 +217,7 @@ def run_command(self, command, sudo=False, user=None, stop_on_errors=True, dict for cmd string format :raises: :py:class:`pssh.exceptions.ProxyError` on errors connecting to proxy if a proxy host has been set. - :raises: :py:class:`gevent.Timeout` on greenlet timeout. Gevent timeout - can not be caught by ``stop_on_errors=False``. + :raises: :py:class:`pssh.exceptions.Timeout` on timeout starting command. :raises: Exceptions from :py:mod:`ssh.exceptions` for all other specific errors. """ diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index c0762df9..f5069c10 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -156,7 +156,7 @@ def auth(self): if self.pkey is not None: logger.debug( "Proceeding with private key file authentication") - return self._pkey_auth(self.pkey, self.password) + return self._pkey_auth(self.password) if self.allow_agent: try: self.session.userauth_agent(self.user) @@ -194,8 +194,8 @@ def _password_auth(self): except Exception as ex: raise AuthenticationError("Password authentication failed - %s", ex) - def _pkey_auth(self, pkey, password=None): - pkey = import_privkey_file(pkey, passphrase=password if password is not None else '') + def _pkey_auth(self, password=None): + pkey = import_privkey_file(self.pkey, passphrase=password if password is not None else '') if self.cert_file is not None: logger.debug("Certificate file set - trying certificate authentication") self._import_cert_file(pkey) @@ -207,6 +207,9 @@ def _import_cert_file(self, pkey): copy_cert_to_privkey(cert_key, pkey) logger.debug("Imported certificate file %s for pkey %s", self.cert_file, self.pkey) + def _shell(self, channel): + return self._eagain(channel.request_shell) + def open_session(self): """Open new channel from session.""" logger.debug("Opening new channel on %s", self.host) @@ -343,3 +346,12 @@ def _eagain(self, func, *args, **kwargs): self.poll(timeout=timeout) ret = func(*args, **kwargs) return ret + + def _eagain_write(self, write_func, data, timeout=None): + data_len = len(data) + total_written = 0 + while total_written < data_len: + rc, bytes_written = write_func(data[total_written:]) + total_written += bytes_written + if rc == SSH_AGAIN: + self.poll(timeout=timeout) diff --git a/pssh/config.py b/pssh/config.py index 7ee680cf..038d98be 100644 --- a/pssh/config.py +++ b/pssh/config.py @@ -42,9 +42,9 @@ def __init__(self, user=None, port=None, password=None, private_key=None, :type user: str :param port: Port number. :type port: int - :password: Password to login with. + :param password: Password to login with. :type password: str - :private key: Private key file to use for authentication. + :param private_key: Private key file to use for authentication. :type private_key: str :param allow_agent: Enable/disable SSH agent authentication. :type allow_agent: bool diff --git a/pssh/exceptions.py b/pssh/exceptions.py index d37c16de..fcfeb20f 100644 --- a/pssh/exceptions.py +++ b/pssh/exceptions.py @@ -88,3 +88,7 @@ class SCPError(Exception): class PKeyFileError(Exception): """Raised on errors finding private key file""" + + +class ShellError(Exception): + """Raised on errors running command on interactive shell""" diff --git a/pssh/output.py b/pssh/output.py index 128332d8..1d4223b6 100644 --- a/pssh/output.py +++ b/pssh/output.py @@ -67,14 +67,10 @@ def __init__(self, host, channel, stdin, :type host: str :param channel: SSH channel used for command execution :type channel: :py:class:`socket.socket` compatible object - :param stdout: Standard output buffer - :type stdout: generator - :param stderr: Standard error buffer - :type stderr: generator :param stdin: Standard input buffer :type stdin: :py:func:`file`-like object :param client: `SSHClient` output is coming from. - :type client: :py:class:`pssh.clients.base_ssh_client.SSHClient` + :type client: :py:class:`pssh.clients.base.single.BaseSSHClient` :param exception: Exception from host if any :type exception: :py:class:`Exception` or ``None`` :param read_timeout: Timeout in seconds for reading from buffers. @@ -123,13 +119,10 @@ def __repr__(self): return "\thost={host}{linesep}" \ "\texit_code={exit_code}{linesep}" \ "\tchannel={channel}{linesep}" \ - "\tstdout={stdout}{linesep}\tstderr={stderr}{linesep}" \ - "\tstdin={stdin}{linesep}" \ "\texception={exception}{linesep}" \ "\tencoding={encoding}{linesep}" \ "\tread_timeout={read_timeout}".format( host=self.host, channel=self.channel, - stdout=self.stdout, stdin=self.stdin, stderr=self.stderr, exception=self.exception, linesep=linesep, exit_code=self.exit_code, encoding=self.encoding, read_timeout=self.read_timeout, ) diff --git a/requirements.txt b/requirements.txt index e0ab4b56..ee762ab0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ gevent>=1.1 ssh2-python>=0.22.0 -ssh-python>=0.8.0 +ssh-python>=0.9.0 diff --git a/setup.py b/setup.py index ba14ff7b..79987017 100644 --- a/setup.py +++ b/setup.py @@ -38,7 +38,7 @@ '*.tests', '*.tests.*') ), install_requires=[ - 'gevent>=1.1', 'ssh2-python>=0.22.0', 'ssh-python>=0.8.0'], + 'gevent>=1.1', 'ssh2-python>=0.22.0', 'ssh-python>=0.9.0'], classifiers=[ 'Development Status :: 5 - Production/Stable', 'License :: OSI Approved :: GNU Lesser General Public License v2 (LGPLv2)', @@ -52,6 +52,8 @@ 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', 'Topic :: System :: Networking', 'Topic :: Software Development :: Libraries', 'Topic :: Software Development :: Libraries :: Python Modules', diff --git a/tests/native/test_parallel_client.py b/tests/native/test_parallel_client.py index e41c8411..368cc597 100644 --- a/tests/native/test_parallel_client.py +++ b/tests/native/test_parallel_client.py @@ -36,7 +36,7 @@ from pssh.exceptions import UnknownHostException, \ AuthenticationException, ConnectionErrorException, SessionError, \ HostArgumentException, SFTPError, SFTPIOError, Timeout, SCPError, \ - PKeyFileError + PKeyFileError, ShellError from pssh.output import HostOutput from .base_ssh2_case import PKEY_FILENAME, PUB_FILE @@ -88,6 +88,66 @@ def test_connect_auth(self): client = ParallelSSHClient([self.host], pkey=self.user_key, port=self.port, num_retries=1) joinall(client.connect_auth(), raise_error=True) + def test_client_shells(self): + shells = self.client.open_shell() + self.client.run_shell_commands(shells, self.cmd) + self.client.run_shell_commands(shells, [self.cmd, self.cmd]) + self.client.run_shell_commands( + shells, """ + %s + exit 1 + """ % (self.cmd,)) + self.client.join_shells(shells) + self.assertRaises(ShellError, self.client.run_shell_commands, shells, self.cmd) + for shell in shells: + stdout = list(shell.stdout) + self.assertListEqual(stdout, [self.resp, self.resp, self.resp, self.resp]) + expected_exit_code = 1 + self.assertEqual(shell.exit_code, expected_exit_code) + self.assertListEqual(list(shell.stderr), []) + self.assertTrue(shell.stdin is not None) + + def test_client_shells_read_timeout(self): + shells = self.client.open_shell(read_timeout=1) + self.client.run_shell_commands(shells, self.cmd) + self.client.run_shell_commands(shells, [self.cmd, 'sleep 2', 'exit 1']) + stdout = [] + for shell in shells: + try: + for line in shell.output.stdout: + stdout.append(line) + except Timeout: + pass + self.assertListEqual(stdout, [self.resp, self.resp]) + self.assertEqual(shell.output.exit_code, None) + expected_exit_code = 1 + self.client.join_shells(shells) + self.assertEqual(shell.output.exit_code, expected_exit_code) + + def test_client_shells_timeout(self): + client = ParallelSSHClient([self.host], pkey=self.user_key, port=self.port, + timeout=0.01, num_retries=1) + self.assertRaises(Timeout, client.open_shell) + + def test_client_shells_join_timeout(self): + shells = self.client.open_shell() + cmd = """ + echo me + sleep 2 + echo me + """ + self.client.run_shell_commands(shells, cmd) + self.assertRaises(Timeout, self.client.join_shells, shells, timeout=1) + try: + self.client.join_shells(shells, timeout=.5) + except Timeout: + pass + else: + raise AssertionError + self.client.join_shells(shells, timeout=2) + stdout = list(shells[0].stdout) + self.assertListEqual(stdout, [self.resp, self.resp]) + def test_client_join_consume_output(self): output = self.client.run_command(self.cmd) expected_exit_code = 0 diff --git a/tests/native/test_single_client.py b/tests/native/test_single_client.py index 59c8d65e..5f146260 100644 --- a/tests/native/test_single_client.py +++ b/tests/native/test_single_client.py @@ -564,6 +564,25 @@ def test_scp_recv_dir_target(self): except OSError: pass + def test_interactive_shell(self): + with self.client.open_shell() as shell: + shell.run(self.cmd) + shell.run(self.cmd) + stdout = list(shell.stdout) + self.assertListEqual(stdout, [self.resp, self.resp]) + self.assertEqual(shell.exit_code, 0) + + def test_interactive_shell_exit_code(self): + with self.client.open_shell() as shell: + shell.run(self.cmd) + shell.run('sleep 1') + shell.run(self.cmd) + shell.run('exit 1') + stdout = list(shell.stdout) + self.assertListEqual(stdout, [self.resp, self.resp]) + self.assertEqual(shell.exit_code, 1) + + # TODO # * scp send recursive # * scp recv recursive local dir permission denied diff --git a/tests/ssh/test_single_client.py b/tests/ssh/test_single_client.py index 7ca2067e..a54a7ac0 100644 --- a/tests/ssh/test_single_client.py +++ b/tests/ssh/test_single_client.py @@ -123,6 +123,25 @@ def scope_killer(): self.assertListEqual(output, [self.resp]) scope_killer() + def test_interactive_shell(self): + with self.client.open_shell() as shell: + shell.run(self.cmd) + shell.run(self.cmd) + stdout = list(shell.output.stdout) + self.assertListEqual(stdout, [self.resp, self.resp]) + self.assertEqual(shell.output.exit_code, 0) + + def test_interactive_shell_exit_code(self): + with self.client.open_shell() as shell: + shell.run(self.cmd) + shell.run('sleep 1') + shell.run(self.cmd) + shell.run('exit 1') + stdout = list(shell.output.stdout) + self.assertListEqual(stdout, [self.resp, self.resp]) + self.assertEqual(shell.output.exit_code, 1) + + # TODO: # * read timeouts # * session connect retry diff --git a/tests/test_host_config.py b/tests/test_host_config.py index d94ce637..ba6d2267 100644 --- a/tests/test_host_config.py +++ b/tests/test_host_config.py @@ -59,4 +59,8 @@ def test_host_config_bad_entries(self): self.assertRaises(ValueError, HostConfig, timeout='') self.assertRaises(ValueError, HostConfig, identity_auth='') self.assertRaises(ValueError, HostConfig, proxy_host=1) + self.assertRaises(ValueError, HostConfig, proxy_port='') + self.assertRaises(ValueError, HostConfig, proxy_user=1) + self.assertRaises(ValueError, HostConfig, proxy_password=1) + self.assertRaises(ValueError, HostConfig, proxy_pkey=1) self.assertRaises(ValueError, HostConfig, keepalive_seconds='') diff --git a/tests/test_reader.py b/tests/test_reader.py index 9c7a88b4..4bf9230c 100644 --- a/tests/test_reader.py +++ b/tests/test_reader.py @@ -77,3 +77,10 @@ def _writer(_buffer): self.assertEqual(data, _data) writer.kill() writer.get() + + def test_non_cur_write(self): + data = b"asdf" + self.buffer.write(data) + self.buffer._buffer.seek(0) + self.buffer.write(data) + self.assertEqual(self.buffer.read(), data + data)