Skip to content

Commit

Permalink
Added debug logging function, run_command timeout updates (#244)
Browse files Browse the repository at this point in the history
* Added debug logging function
* Updated tests, added timeout test
* Updated reader
* Updated CI cfg
* Updated change log
  • Loading branch information
pkittenis committed Dec 14, 2020
1 parent 801b76a commit f1c149f
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 18 deletions.
6 changes: 4 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ jobs:
name: Integration tests
- run:
command: |
cd doc; make html; cd ..
cd doc; make html
cd ..
name: make docs
- run:
command: |
python setup.py sdist
cd dist; pip install *; cd ..
cd dist; pip install *
cd ..
name: Source dist install
- run:
command: codecov
Expand Down
15 changes: 15 additions & 0 deletions Changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
Change Log
============

2.4.0
+++++

Changes
-------

* Added ``pssh.utils.enable_debug_logger`` function.
* ``ParallelSSHClient`` timeout parameter is now also applied to starting remote commands.

Fixes
-----

* ``SSHClient`` with proxy enabled could not be used without setting port - #


2.3.2
+++++

Expand Down
3 changes: 2 additions & 1 deletion pssh/clients/base/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,9 @@ def run_command(self, command, sudo=False, user=None,
_command = 'sudo -u %s -S ' % (user,)
_shell = shell if shell else '$SHELL -c'
_command += "%s '%s'" % (_shell, command,)
with GTimeout(seconds=self.timeout):
channel = self.execute(_command, use_pty=use_pty)
_timeout = read_timeout if read_timeout else timeout
channel = self.execute(_command, use_pty=use_pty)
_stdout_buffer = ConcurrentRWBuffer()
_stderr_buffer = ConcurrentRWBuffer()
_stdout_reader, _stderr_reader = self._make_output_readers(
Expand Down
9 changes: 2 additions & 7 deletions pssh/clients/native/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,9 @@ def _password_auth(self):
def open_session(self):
"""Open new channel from session"""
try:
chan = self.session.open_session()
chan = self._eagain(self.session.open_session)
except Exception as ex:
raise SessionError(ex)
while chan == LIBSSH2_ERROR_EAGAIN:
self.poll()
try:
chan = self.session.open_session()
except Exception as ex:
raise SessionError(ex)
# Multiple forward requests result in ChannelRequestDenied
# errors, flag is used to avoid this.
if self.forward_ssh_agent and not self._forward_requested:
Expand Down Expand Up @@ -727,6 +721,7 @@ def poll(self, timeout=None):
Blocks current greenlet only if socket has pending read or write operations
in the appropriate direction.
"""
timeout = self.timeout if timeout is None else timeout
directions = self.session.block_directions()
if directions == 0:
return
Expand Down
8 changes: 6 additions & 2 deletions pssh/clients/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@
class ConcurrentRWBuffer(object):
"""Concurrent reader/writer of bytes for use from multiple greenlets.
Iterate on buffer object to read data, blocking greenlet if no data exists
Iterate on buffer object to read data, yielding greenlet if no data exists
until self.eof has been set.
Writers should ``eof.set()`` when finished writing data via ``write``.
Readers can use ``read()`` to get any available data, or None.
"""
__slots__ = ('_buffer', '_read_pos', '_write_pos', 'eof', '_lock')

def __init__(self):
self._buffer = BytesIO()
Expand All @@ -54,7 +55,10 @@ def write(self, data):
self._write_pos += self._buffer.write(data)

def read(self):
"""Read available data, or return None"""
"""Read available data, or return None
:rtype: bytes
"""
with self._lock:
if self._write_pos == 0 or self._read_pos == self._write_pos:
return
Expand Down
3 changes: 2 additions & 1 deletion pssh/clients/ssh/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def open_session(self):
while channel.open_session() == SSH_AGAIN:
logger.debug(
"Channel open session blocked, waiting on socket..")
self.poll(timeout=self.timeout)
self.poll()
# Select on open session can dead lock without
# yielding event loop
sleep(.1)
Expand Down Expand Up @@ -323,6 +323,7 @@ def close_channel(self, channel):

def poll(self, timeout=None):
"""ssh-python based co-operative gevent select on session socket."""
timeout = self.timeout if timeout is None else timeout
directions = self.session.get_poll_flags()
if directions == 0:
return
Expand Down
8 changes: 6 additions & 2 deletions pssh/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

def enable_logger(_logger, level=logging.INFO):
"""Enables logging to stdout for given logger"""
_logger.setLevel(level)
stream_handlers = [h for h in _logger.handlers
if isinstance(h, logging.StreamHandler)]
if stream_handlers:
Expand All @@ -35,12 +36,15 @@ def enable_logger(_logger, level=logging.INFO):
host_log_format = logging.Formatter('%(message)s')
handler.setFormatter(host_log_format)
_logger.addHandler(handler)
_logger.setLevel(level)


def enable_host_logger():
"""Enable host logger for logging stdout from remote commands
as it becomes available.
"""
enable_logger(host_logger)


def enable_debug_logger():
"""Enable debug logging for the library to sdout."""
return enable_logger(logger, level=logging.DEBUG)
13 changes: 13 additions & 0 deletions tests/native/test_parallel_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,19 @@ def test_pssh_client_timeout(self):
self.assertIsInstance(output[0].exception,
Timeout)

def test_timeout_on_open_session(self):
timeout = 1
client = ParallelSSHClient([self.host], port=self.port,
pkey=self.user_key,
timeout=timeout,
num_retries=1)
def _session(timeout=1):
sleep(timeout+1)
joinall(client.connect_auth())
sleep(.01)
client._host_clients[(0, self.host)].open_session = _session
self.assertRaises(Timeout, client.run_command, self.cmd)

def test_connection_timeout(self):
client_timeout = .01
host = 'fakehost.com'
Expand Down
12 changes: 11 additions & 1 deletion tests/native/test_single_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from hashlib import sha256
from datetime import datetime

from gevent import socket, sleep, spawn
from gevent import socket, sleep, spawn, Timeout as GTimeout

from pssh.clients.native import SSHClient
from ssh2.session import Session
Expand Down Expand Up @@ -71,6 +71,16 @@ def test_execute(self):
self.assertEqual(host_out.exit_code, 0)
self.assertEqual(expected, output)

def test_open_session_timeout(self):
client = SSHClient(self.host, port=self.port,
pkey=self.user_key,
num_retries=1,
timeout=1)
def _session(timeout=2):
sleep(2)
client.open_session = _session
self.assertRaises(GTimeout, client.run_command, self.cmd)

def test_finished_error(self):
self.assertRaises(ValueError, self.client.wait_finished, None)
self.assertIsNone(self.client.finished(None))
Expand Down
15 changes: 14 additions & 1 deletion tests/ssh/test_parallel_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from datetime import datetime
from sys import version_info

from gevent import joinall, spawn, socket, Greenlet
from gevent import joinall, spawn, socket, Greenlet, sleep
from pssh import logger as pssh_logger
from pssh.output import HostOutput
from pssh.exceptions import UnknownHostException, \
Expand Down Expand Up @@ -79,6 +79,19 @@ def make_random_port(self):
sock.close()
return listen_port

def test_timeout_on_open_session(self):
timeout = 1
client = ParallelSSHClient([self.host], port=self.port,
pkey=self.user_key,
timeout=timeout,
num_retries=1)
def _session(timeout=1):
sleep(timeout+1)
joinall(client.connect_auth())
sleep(.01)
client._host_clients[(0, self.host)].open_session = _session
self.assertRaises(Timeout, client.run_command, self.cmd)

def test_join_timeout(self):
client = ParallelSSHClient([self.host], port=self.port,
pkey=self.user_key)
Expand Down
11 changes: 11 additions & 0 deletions tests/ssh/test_single_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from datetime import datetime

from gevent import sleep, Timeout as GTimeout
from ssh.session import Session
# from ssh.exceptions import SocketDisconnectError
from pssh.exceptions import AuthenticationException, ConnectionErrorException, \
Expand All @@ -41,6 +42,16 @@ def test_context_manager(self):
num_retries=1) as client:
self.assertIsInstance(client, SSHClient)

def test_open_session_timeout(self):
client = SSHClient(self.host, port=self.port,
pkey=self.user_key,
num_retries=1,
timeout=1)
def _session(timeout=2):
sleep(2)
client.open_session = _session
self.assertRaises(GTimeout, client.run_command, self.cmd)

def test_execute(self):
host_out = self.client.run_command(self.cmd)
output = list(host_out.stdout)
Expand Down
4 changes: 3 additions & 1 deletion tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import unittest
from logging import NullHandler
from logging import NullHandler, DEBUG

from pssh import utils

Expand All @@ -39,4 +39,6 @@ def test_enabling_pssh_logger(self):
utils.enable_logger(utils.logger)
self.assertTrue(len([h for h in utils.logger.handlers
if not isinstance(h, NullHandler)]) == 1)
utils.enable_debug_logger()
self.assertEqual(utils.logger.level, DEBUG)
utils.logger.handlers = [NullHandler()]

0 comments on commit f1c149f

Please sign in to comment.