Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 84 additions & 44 deletions embedded_server/embedded_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,58 @@
Implements:
* Execution of commands via exec_command
* Public key and password auth
* Direct TCP tunneling
* Direct TCP tunneling (port forwarding)
* SSH agent forwarding
* Stub SFTP server from Paramiko
* Forced authentication failure
* Forced server timeout for connection timeout simulation

Does _not_ support interactive shells, our clients do not use them.
Does _not_ support interactive shells - it is intended for purely API driven use.

Server private key is hardcoded. Server listen code inspired by demo_server.py in \
Paramiko repository.
An embedded private key is provided as `embedded_server.host_key` and may be overriden

Server runs asynchronously in its own greenlet. Call `start_server` with a new `multiprocessing.Process` to run it on a new process with its own event loop.
Server runs asynchronously in its own greenlet.

*Warning* - Note that commands, with or without a shell, are actually run on the system running this server. Destructive commands will affect the system as permissions of user running the server allow. **Use at your own risk**.

Example Usage
===============

from embedded_server import start_server, start_server_from_ip, make_socket

Make server from existing socket
----------------------------------

socket = make_socket('127.0.0.1')
server = start_server(socket)

Make server from IP and optionally port
-----------------------------------------

server, listen_port = start_server_from_ip('127.0.0.1')
other_server, _ = start_server_from_ip('127.0.0.1', port=1234)
"""

import sys
if 'threading' in sys.modules:
del sys.modules['threading']
import gevent
from gevent import monkey
monkey.patch_all()

import os
import socket
import gevent
from gevent import socket
from gevent.event import Event
import sys
import traceback
import logging
import paramiko
import time
import paramiko
import gevent.subprocess
import gevent.hub

from .stub_sftp import StubSFTPServer
from .tunnel import Tunneler
import gevent.subprocess

logger = logging.getLogger("embedded_server")
paramiko_logger = logging.getLogger('paramiko.transport')
Expand All @@ -62,12 +83,36 @@
os.path.dirname(__file__), 'rsa.key']))

class Server(paramiko.ServerInterface):
def __init__(self, transport, fail_auth=False,
"""Implements :mod:`paramiko.ServerInterface` to provide an
embedded SSH2 server implementation.

Start a `Server` with at least a :mod:`paramiko.Transport` object
and a host private key.

Any SSH2 client with public key or password authentication
is allowed, only. Interactive shell requests are not accepted.

Implemented:
* Direct tcp-ip channels (tunneling)
* SSH Agent forwarding on request
* PTY requests
* Exec requests (run a command on server)

Not Implemented:
* Interactive shell requests
"""

def __init__(self, transport, host_key, fail_auth=False,
ssh_exception=False):
paramiko.ServerInterface.__init__(self)
transport.load_server_moduli()
transport.add_server_key(host_key)
transport.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer)
self.transport = transport
self.event = Event()
self.fail_auth = fail_auth
self.ssh_exception = ssh_exception
self.transport = transport
self.host_key = host_key

def check_channel_request(self, kind, chanid):
return paramiko.OPEN_SUCCEEDED
Expand Down Expand Up @@ -108,10 +153,14 @@ def check_channel_direct_tcpip_request(self, chanid, origin, destination):
logger.error("Error creating proxy connection to %s - %s",
destination, ex,)
return paramiko.OPEN_FAILED_CONNECT_FAILED
self.event.set()
gevent.sleep()
logger.debug("Proxy connection started")
return paramiko.OPEN_SUCCEEDED

def check_channel_forward_agent_request(self, channel):
logger.debug("Forward agent key request for channel %s" % (channel,))
gevent.sleep()
return True

def check_channel_exec_request(self, channel, cmd,
Expand All @@ -124,6 +173,7 @@ def check_channel_exec_request(self, channel, cmd,
stdin=gevent.subprocess.PIPE,
shell=True, env=_env)
gevent.spawn(self._read_response, channel, process)
gevent.sleep(0)
return True

def _read_response(self, channel, process):
Expand All @@ -135,17 +185,18 @@ def _read_response(self, channel, process):
channel.send_exit_status(process.returncode)
logger.debug("Command finished with return code %s", process.returncode)
# Let clients consume output from channel before closing
gevent.sleep(.2)
gevent.sleep(.1)
channel.close()
gevent.sleep(0)

def make_socket(listen_ip, port=0):
"""Make socket on given address and available port chosen by OS"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((listen_ip, port))
except Exception as e:
logger.error('Failed to bind to address - %s' % (str(e),))
except Exception as ex:
logger.error('Failed to bind to address - %s', ex)
traceback.print_exc()
return
return sock
Expand All @@ -157,31 +208,21 @@ def listen(sock, fail_auth=False, ssh_exception=False,
where server is a joinable server thread and socket is listening
socket of server.
"""
listen_ip, listen_port = sock.getsockname()
if not sock:
logger.error("Could not establish listening connection on %s:%s",
listen_ip, listen_port)
return
# sock = make_socket(ip, port=port)
try:
sock.listen(100)
logger.info('Listening for connection on %s:%s..', listen_ip,
listen_port)
except Exception as e:
logger.error('*** Listen failed: %s' % (str(e),))
traceback.print_exc()
return
handle_ssh_connection(sock, fail_auth=fail_auth,
host, port = sock.getsockname()
logger.info('Listening for connection on %s:%s..', host, port)
return handle_ssh_connection(sock, fail_auth=fail_auth,
timeout=timeout, ssh_exception=ssh_exception)

def _handle_ssh_connection(transport, fail_auth=False,
ssh_exception=False):
try:
transport.load_server_moduli()
except:
return
transport.add_server_key(host_key)
transport.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer)
server = Server(transport,
server = Server(transport, host_key,
fail_auth=fail_auth, ssh_exception=ssh_exception)
try:
transport.start_server(server=server)
Expand All @@ -191,7 +232,9 @@ def _handle_ssh_connection(transport, fail_auth=False,
except Exception:
logger.exception("Error occured starting server")
return
gevent.sleep(0)
# *Important* Allow other greenlets to execute before establishing connection
# which may be handled by said other greenlets
gevent.sleep(.5)
channel = transport.accept(20)
if not channel:
logger.error("Could not establish channel")
Expand All @@ -211,31 +254,28 @@ def handle_ssh_connection(sock,
if timeout:
logger.debug("SSH server sleeping for %s then raising socket.timeout",
timeout)
gevent.Timeout(timeout).start()
gevent.Timeout(timeout).start().get()
try:
transport = paramiko.Transport(conn)
_handle_ssh_connection(transport, fail_auth=fail_auth,
ssh_exception=ssh_exception)
return _handle_ssh_connection(transport, fail_auth=fail_auth,
ssh_exception=ssh_exception)
except Exception as e:
logger.error('*** Caught exception: %s: %s' % (str(e.__class__), str(e),))
traceback.print_exc()
try:
transport.close()
except:
except Exception:
pass
return

def start_server(sock, fail_auth=False, ssh_exception=False,
timeout=None):
return gevent.spawn(listen, sock, fail_auth=fail_auth,
timeout=timeout, ssh_exception=ssh_exception)

if __name__ == "__main__":
logging.basicConfig()
logger.setLevel(logging.DEBUG)
sock = make_socket('127.0.0.1')
server = start_server(sock)
try:
server.join()
except KeyboardInterrupt:
sys.exit(0)
def start_server_from_ip(ip, port=0,
fail_auth=False, ssh_exception=False,
timeout=None):
server_sock = make_socket(ip, port=port)
server = start_server(server_sock, fail_auth=fail_auth,
ssh_exception=ssh_exception, timeout=timeout)
return server, server_sock.getsockname()[1]
43 changes: 0 additions & 43 deletions embedded_server/fake_agent.py

This file was deleted.

13 changes: 10 additions & 3 deletions embedded_server/tunnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@
from gevent import socket, select
import logging

logger = logging.getLogger("fake_server")
logger = logging.getLogger("embedded_server.tunnel")

class Tunneler(gevent.Greenlet):
class Tunneler(gevent.Greenlet):

def __init__(self, address, transport, chanid):
gevent.Greenlet.__init__(self)
gevent.sleep(.2)
logger.info("Tunneller creating connection -> %s", address)
self.socket = socket.create_connection(address)
self.transport = transport
self.chanid = chanid
gevent.sleep(0)

def close(self):
try:
Expand All @@ -50,12 +54,14 @@ def tunnel(self, dest_socket, source_chan):
response_data = dest_socket.recv(1024)
source_chan.sendall(response_data)
logger.debug("Tunnel sent data..")
gevent.sleep(0)
gevent.sleep(.1)
finally:
source_chan.close()
dest_socket.close()
gevent.sleep(0)

def run(self):
logger.info("Tunnel waiting for connection")
channel = self.transport.accept(20)
if not channel:
return
Expand All @@ -69,3 +75,4 @@ def run(self):
except Exception as ex:
logger.exception("Got exception creating tunnel - %s", ex,)
logger.debug("Finished tunneling")
gevent.sleep(0)
1 change: 1 addition & 0 deletions examples/parallel_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@
start = datetime.datetime.now()
for _output in output:
client.join(_output)
print(_output)
end = datetime.datetime.now()
print("All commands finished in %s" % (end-start,))
1 change: 1 addition & 0 deletions pssh/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import paramiko.agent

class SSHAgent(paramiko.agent.AgentSSH):
""":mod:`paramiko.agent.AgentSSH` compatible class for programmatically
Expand Down
18 changes: 12 additions & 6 deletions pssh/pssh_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,13 +369,19 @@ def run_command(self, *args, **kwargs):
:rtype: Dictionary with host as key as per \
:mod:`pssh.pssh_client.ParallelSSHClient.get_output`

:raises: :mod:`pssh.exceptions.AuthenticationException` on authentication error
:raises: :mod:`pssh.exceptions.UnknownHostException` on DNS resolution error
:raises: :mod:`pssh.exceptions.ConnectionErrorException` on error connecting
:raises: :mod:`pssh.exceptions.SSHException` on other undefined SSH errors
:raises: :mod:`pssh.exceptions.HostArgumentException` on number of host \
arguments not equal to number of hosts
:raises: :mod:`pssh.exceptions.AuthenticationException` on \
authentication error
:raises: :mod:`pssh.exceptions.UnknownHostException` on DNS resolution \
error
:raises: :mod:`pssh.exceptions.ConnectionErrorException` on error \
connecting
:raises: :mod:`pssh.exceptions.SSHException` on other undefined SSH \
errors
:raises: :mod:`pssh.exceptions.HostArgumentException` on number of \
host arguments not equal to number of hosts
:raises: `TypeError` on not enough host arguments for cmd string format
:raises: `KeyError` on no host argument key in arguments dict for cmd \
string format

**Example Usage**

Expand Down
5 changes: 2 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
setuptools>=21.0
paramiko>=1.12,!=1.16.0
gevent<=1.1; python_version < '3'
gevent>=1.1; python_version >= '3'
gevent<=1.1; python_version < '2.7'
gevent>=1.1; python_version >= '2.7'
Loading