Skip to content

Commit

Permalink
Read output (#227)
Browse files Browse the repository at this point in the history
* Updated native single client to not use native functions. Moved polling to base client.
* Updated libssh client for polling changes.
* Removed native extensions
* Added read timeout tests for single clients.
* Updated setup.py
* Simplified timeout settings. Renamed run_command timeout to read_timeout. 
* Updated changelog.
* File copy tests cleanup their files better.
* Updated dev requirements
* Removed appveyor cfg.
  • Loading branch information
pkittenis committed Sep 27, 2020
1 parent eb0feb8 commit 47f3caf
Show file tree
Hide file tree
Showing 19 changed files with 263 additions and 8,998 deletions.
78 changes: 0 additions & 78 deletions .appveyor.yml

This file was deleted.

6 changes: 6 additions & 0 deletions Changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,19 @@ See `Upgrading to API 2.0 <upgrade-link>`_ for examples of code that will need u
* Removed deprecated ``ParallelSSHClient`` ``host_config`` dictionary implementation - now list of ``HostConfig``.
* Removed ``HostOutput.cmd`` attribute.
* Removed ``ParallelSSHClient.host_clients`` attribute.
* Made ``ParallelSSHClient(timeout=<seconds>)`` a global timeout setting for all operations.
* Removed ``run_command(greenlet_timeout=<..>)`` argument - now uses global timeout setting.
* Renamed ``run_command`` ``timeout`` to ``read_timeout=<seconds>)`` for setting output read timeout individually - defaults to global timeout setting.
* Removed ``pssh.native`` package and native code.
* No native code means package architecture has changed to ``none-any``.


Fixes
-----

* Removed now unnecessary locking around SSHClient initialisation so it can be parallelised - #219.
* ``ParallelSSHClient.join`` with encoding would not pass on encoding when reading from output buffers - #214.
* Clients could raise ``Timeout`` early when timeout settings were used with many hosts.


1.13.0
Expand Down
41 changes: 22 additions & 19 deletions pssh/clients/base/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import gevent.pool

from gevent import joinall
from gevent import joinall, spawn, Timeout as GTimeout
from gevent.hub import Hub

from ...constants import DEFAULT_RETRIES, RETRY_DELAY
Expand Down Expand Up @@ -82,7 +82,6 @@ def run_command(self, command, user=None, stop_on_errors=True,
host_args=None, use_pty=False, shell=None,
encoding='utf-8', return_list=True,
*args, **kwargs):
greenlet_timeout = kwargs.pop('greenlet_timeout', None)
if host_args:
try:
cmds = [self.pool.spawn(
Expand All @@ -103,28 +102,31 @@ def run_command(self, command, user=None, stop_on_errors=True,
*args, **kwargs)
for host_i, host in enumerate(self.hosts)]
self.cmds = cmds
joinall(cmds, raise_error=False, timeout=greenlet_timeout)
return self._get_output_from_cmds(cmds, stop_on_errors=stop_on_errors,
timeout=greenlet_timeout,
joinall(cmds, timeout=self.timeout)
return self._get_output_from_cmds(cmds, raise_error=stop_on_errors,
return_list=return_list)

def _get_output_from_cmds(self, cmds, stop_on_errors=False, timeout=None,
def _get_output_from_cmds(self, cmds, raise_error=False,
return_list=True):
return [self._get_output_from_greenlet(cmd, timeout=timeout, raise_error=stop_on_errors)
for cmd in cmds]
_cmds = [spawn(self._get_output_from_greenlet, cmd, raise_error=raise_error)
for cmd in cmds]
finished = joinall(_cmds, raise_error=True)
return [f.get() for f in finished]

def _get_output_from_greenlet(self, cmd, timeout=None, raise_error=False):
def _get_output_from_greenlet(self, cmd, raise_error=False):
try:
host_out = cmd.get(timeout=timeout)
host_out = cmd.get()
return host_out
except Exception as ex:
host = ex.host
except (GTimeout, Exception) as ex:
host = ex.host if hasattr(ex, 'host') else None
if isinstance(ex, GTimeout):
ex = Timeout()
if raise_error:
raise ex
return HostOutput(host, None, None, None, None,
None, exception=ex)

def get_last_output(self, cmds=None, greenlet_timeout=None,
def get_last_output(self, cmds=None, timeout=None,
return_list=True):
"""Get output for last commands executed by ``run_command``
Expand Down Expand Up @@ -153,8 +155,8 @@ def get_last_output(self, cmds=None, greenlet_timeout=None,
if cmds is None:
return
return self._get_output_from_cmds(
cmds, timeout=greenlet_timeout, return_list=return_list,
stop_on_errors=False)
cmds, return_list=return_list,
raise_error=False)

def reset_output_generators(self, host_out, timeout=None,
client=None, channel=None,
Expand Down Expand Up @@ -205,16 +207,17 @@ def _get_host_config_values(self, host_i, host):

def _run_command(self, host_i, host, command, sudo=False, user=None,
shell=None, use_pty=False,
encoding='utf-8', timeout=None):
encoding='utf-8', read_timeout=None):
"""Make SSHClient if needed, run command on host"""
logger.debug("_run_command with read timeout %s", read_timeout)
try:
_client = self._make_ssh_client(host_i, host)
host_out = _client.run_command(
command, sudo=sudo, user=user, shell=shell,
use_pty=use_pty, encoding=encoding, timeout=timeout)
use_pty=use_pty, encoding=encoding, read_timeout=read_timeout)
return host_out
except Exception as ex:
ex.host = host
except (GTimeout, Exception) as ex:
host = ex.host if hasattr(ex, 'host') else None
logger.error("Failed to run on host %s - %s", host, ex)
raise ex

Expand Down
26 changes: 22 additions & 4 deletions pssh/clients/base/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from gevent import sleep, socket
from gevent.hub import Hub
from gevent.select import poll

from ..common import _validate_pkey_path
from ...constants import DEFAULT_RETRIES, RETRY_DELAY
Expand Down Expand Up @@ -249,7 +250,7 @@ def read_output_buffer(self, output_buffer, prefix=None,

def run_command(self, command, sudo=False, user=None,
use_pty=False, shell=None,
encoding='utf-8', timeout=None):
encoding='utf-8', timeout=None, read_timeout=None):
"""Run remote command.
:param command: Command to run.
Expand All @@ -267,8 +268,10 @@ def run_command(self, command, sudo=False, user=None,
:param encoding: Encoding to use for output. Must be valid
`Python codec <https://docs.python.org/2.7/library/codecs.html>`_
:type encoding: str
:param read_timeout: (Optional) Timeout in seconds for reading output.
:type read_timeout: float
:rtype: (channel, host, stdout, stderr, stdin) tuple.
:rtype: :py:class:`pssh.output.HostOutput`
"""
# Fast path for no command substitution needed
if not sudo and not user and not shell:
Expand All @@ -281,12 +284,13 @@ def run_command(self, command, sudo=False, user=None,
_command = 'sudo -u %s -S ' % (user,)
_shell = shell if shell else '$SHELL -c'
_command += "%s '%s'" % (_shell, command,)
_timeout = read_timeout if read_timeout else timeout
channel = self.execute(_command, use_pty=use_pty)
stdout = self.read_output_buffer(
self.read_output(channel, timeout=timeout),
self.read_output(channel, timeout=_timeout),
encoding=encoding)
stderr = self.read_output_buffer(
self.read_stderr(channel, timeout=timeout), encoding=encoding,
self.read_stderr(channel, timeout=_timeout), encoding=encoding,
prefix='\t[err]')
stdin = channel
host_out = HostOutput(self.host, channel, stdout, stderr, stdin, self)
Expand Down Expand Up @@ -404,3 +408,17 @@ def _remote_paths_split(self, file_path):
_sep = file_path.rfind('/')
if _sep > 0:
return file_path[:_sep]

def poll(timeout=None):
raise NotImplementedError

def _poll_socket(self, events, timeout=None):
if self.sock is None:
return
# gevent.select.poll converts seconds to miliseconds to match python socket
# implementation
timeout = timeout * 1000 if timeout is not None else 100
poller = poll()
poller.register(self.sock, eventmask=events)
logger.debug("Polling socket with timeout %s", timeout)
poller.poll(timeout=timeout)
Loading

0 comments on commit 47f3caf

Please sign in to comment.