Skip to content

Commit

Permalink
Merge pull request #34 from erasche/rsync
Browse files Browse the repository at this point in the history
SSH Transport Implemented
  • Loading branch information
jmchilton committed Oct 18, 2014
2 parents 554ea36 + 1cd3052 commit 91ec064
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 2 deletions.
14 changes: 14 additions & 0 deletions docs/source/files/file_actions_sample_1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,17 @@ paths:
action: rewrite
source_directory: /galaxy/data
destination_directory: /work/galaxy/data

# The following demonstrates use of the Rsync transport layer
- path: /galaxy/files/
action: remote_rsync_transfer
# Additionally the action remote_scp_transfer is available which behaves in
# an identical manner
ssh_user: galaxy
ssh_host: f.q.d.n
ssh_port: 22
ssh_key: |
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAwWYGEOhiJgbWv8eBV2LJp0MCSgrAPeX9FbsGn1I+UC110PPW
...
96 changes: 95 additions & 1 deletion pulsar/client/action_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
from .util import unique_path_prefix
from .transport import get_file
from .transport import post_file
from .transport import rsync_get_file, scp_get_file
from .transport import rsync_post_file, scp_post_file
import tempfile


DEFAULT_MAPPED_ACTION = 'transfer' # Not really clear to me what this should be, exception?
Expand Down Expand Up @@ -130,6 +133,7 @@ def __init__(self, client=None, config=None):
if config is None:
config = self.__client_to_config(client)
self.default_action = config.get("default_action", "transfer")
self.ssh_key = config.get("ssh_key", None)
self.mappers = mappers_from_dicts(config.get("paths", []))
self.files_endpoint = config.get("files_endpoint", None)

Expand All @@ -155,6 +159,7 @@ def to_dict(self):
return dict(
default_action=self.default_action,
files_endpoint=self.files_endpoint,
ssh_key=self.ssh_key,
paths=map(lambda m: m.to_dict(), self.mappers)
)

Expand All @@ -166,6 +171,8 @@ def __client_to_config(self, client):
config = dict()
config["default_action"] = client.default_file_action
config["files_endpoint"] = client.files_endpoint
if hasattr(client, 'ssh_key'):
config["ssh_key"] = client.ssh_key
return config

def __load_action_config(self, path):
Expand Down Expand Up @@ -209,6 +216,9 @@ def __process_action(self, action, file_type):
# TODO: URL encode path.
url = "%s&path=%s&file_type=%s" % (url_base, action.path, file_type)
action.url = url
elif action.action_type in ["remote_rsync_transfer", "remote_scp_transfer"]:
# Required, so no check for presence
action.ssh_key = self.ssh_key

REQUIRED_ACTION_KWD = object()

Expand Down Expand Up @@ -375,6 +385,88 @@ def write_from_path(self, pulsar_path):
post_file(self.url, pulsar_path)


class PubkeyAuthenticatedTransferAction(BaseAction):
    """Base class for file transfers requiring an SSH public/private key.

    Stores the SSH connection parameters (user, host, port) and the private
    key text. ``ssh_user``, ``ssh_host`` and ``ssh_port`` are declared as
    required keywords in ``action_spec``; ``ssh_key`` is not part of the
    spec and defaults to ``None`` at class level.
    """
    action_spec = dict(
        ssh_user=REQUIRED_ACTION_KWD,
        ssh_host=REQUIRED_ACTION_KWD,
        ssh_port=REQUIRED_ACTION_KWD,
    )
    action_type = "remote_pubkey_transfer"
    staging = STAGING_ACTION_REMOTE
    ssh_key = None

    def __init__(self, path, file_lister=None, url=None, ssh_user=None,
                 ssh_host=None, ssh_port=None, ssh_key=None):
        """Record the transfer path and the SSH connection settings.

        ``path`` and ``file_lister`` are forwarded to ``BaseAction``; the
        remaining arguments are stored unchanged as attributes.
        """
        super(PubkeyAuthenticatedTransferAction, self).__init__(path, file_lister=file_lister)
        self.url = url
        self.ssh_user = ssh_user
        self.ssh_host = ssh_host
        self.ssh_port = ssh_port
        self.ssh_key = ssh_key

    def to_dict(self):
        """Return a dictionary description of this action.

        The private key is not included in the result (presumably so the
        secret is not written out with the action description - confirm
        with the callers that re-inject it).
        """
        return dict(path=self.path, action_type=self.action_type, url=self.url,
                    ssh_user=self.ssh_user, ssh_host=self.ssh_host,
                    ssh_port=self.ssh_port)

    def serialize_ssh_key(self):
        """Write the private key to a temporary file and return its path.

        The file is created with ``delete=False`` so that it still exists
        after it has been closed; with the default ``delete=True`` the file
        disappears on ``close()`` and the returned path is useless to
        ssh/rsync/scp. The caller is responsible for removing the file once
        the transfer has finished.

        Raises:
            Exception: if no SSH key has been set on this action.
        """
        # Validate first so no stray temporary file is created on failure.
        if self.ssh_key is None:
            raise Exception("SSH_KEY not available")

        key_data = self.ssh_key
        # The temporary file is opened in binary mode; encode text keys so
        # the write works for both byte strings and unicode strings.
        if not isinstance(key_data, bytes):
            key_data = key_data.encode("utf-8")

        key_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            key_file.write(key_data)
        finally:
            key_file.close()
        return key_file.name


class RsyncTransferAction(PubkeyAuthenticatedTransferAction):
    """Transfer action that moves files with rsync over SSH."""
    action_type = "remote_rsync_transfer"

    @classmethod
    def from_dict(cls, action_dict):
        """Build an action from its dictionary description.

        ``ssh_key`` is optional here because ``to_dict`` does not emit it;
        when absent the key is left as ``None`` and the failure is reported
        by ``serialize_ssh_key`` at transfer time instead of as a
        ``KeyError`` during deserialization.
        """
        return cls(path=action_dict["path"],
                   url=action_dict["url"],
                   ssh_user=action_dict["ssh_user"],
                   ssh_host=action_dict["ssh_host"],
                   ssh_port=action_dict["ssh_port"],
                   ssh_key=action_dict.get("ssh_key"))

    def write_to_path(self, path):
        """Fetch ``self.path`` from the SSH host into local ``path``."""
        key_file = self.serialize_ssh_key()
        try:
            rsync_get_file(self.path, path, self.ssh_user, self.ssh_host,
                           self.ssh_port, key_file)
        finally:
            self._discard_key_file(key_file)

    def write_from_path(self, pulsar_path):
        """Send local ``pulsar_path`` to ``self.path`` on the SSH host."""
        key_file = self.serialize_ssh_key()
        try:
            rsync_post_file(pulsar_path, self.path, self.ssh_user,
                            self.ssh_host, self.ssh_port, key_file)
        finally:
            self._discard_key_file(key_file)

    @staticmethod
    def _discard_key_file(key_file):
        """Remove the serialized private key if it is still on disk."""
        import os  # local import: the module's import block is not visible here
        if os.path.exists(key_file):
            os.remove(key_file)


class ScpTransferAction(PubkeyAuthenticatedTransferAction):
    """Transfer action that moves files with scp."""
    action_type = "remote_scp_transfer"

    @classmethod
    def from_dict(cls, action_dict):
        """Build an action from its dictionary description.

        ``ssh_key`` is optional here because ``to_dict`` does not emit it;
        when absent the key is left as ``None`` and the failure is reported
        by ``serialize_ssh_key`` at transfer time instead of as a
        ``KeyError`` during deserialization.
        """
        return cls(path=action_dict["path"],
                   url=action_dict["url"],
                   ssh_user=action_dict["ssh_user"],
                   ssh_host=action_dict["ssh_host"],
                   ssh_port=action_dict["ssh_port"],
                   ssh_key=action_dict.get("ssh_key"))

    def write_to_path(self, path):
        """Fetch ``self.path`` from the SSH host into local ``path``."""
        key_file = self.serialize_ssh_key()
        try:
            scp_get_file(self.path, path, self.ssh_user, self.ssh_host,
                         self.ssh_port, key_file)
        finally:
            self._discard_key_file(key_file)

    def write_from_path(self, pulsar_path):
        """Send local ``pulsar_path`` to ``self.path`` on the SSH host."""
        key_file = self.serialize_ssh_key()
        try:
            scp_post_file(pulsar_path, self.path, self.ssh_user,
                          self.ssh_host, self.ssh_port, key_file)
        finally:
            self._discard_key_file(key_file)

    @staticmethod
    def _discard_key_file(key_file):
        """Remove the serialized private key if it is still on disk."""
        import os  # local import: the module's import block is not visible here
        if os.path.exists(key_file):
            os.remove(key_file)


class MessageAction(object):
""" Sort of pseudo action describing "files" store in memory and
transferred via message (HTTP, Python-call, MQ, etc...)
Expand Down Expand Up @@ -408,7 +500,7 @@ def write_to_path(self, path):
open(path, "w").write(self.contents)


DICTIFIABLE_ACTION_CLASSES = [RemoteCopyAction, RemoteTransferAction, MessageAction]
DICTIFIABLE_ACTION_CLASSES = [RemoteCopyAction, RemoteTransferAction, MessageAction, RsyncTransferAction, ScpTransferAction]


def from_dict(action_dict):
Expand Down Expand Up @@ -554,6 +646,8 @@ def unstructured_map(self, path):
CopyAction,
RemoteCopyAction,
RemoteTransferAction,
RsyncTransferAction,
ScpTransferAction,
]
actions = dict([(clazz.action_type, clazz) for clazz in ACTION_CLASSES])

Expand Down
7 changes: 7 additions & 0 deletions pulsar/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ def __init__(self, destination_params, job_id):
)
else:
job_directory = None

if "ssh_key" in (destination_params or {}):
self.ssh_key = destination_params["ssh_key"]
else:
self.ssh_key = None

self.env = destination_params.get("env", [])
self.files_endpoint = destination_params.get("files_endpoint", None)
self.job_directory = job_directory
Expand Down Expand Up @@ -271,6 +277,7 @@ def _build_setup_message(self, command_line, dependencies_description, env, remo
launch_params['env'] = env
if remote_staging:
launch_params['remote_staging'] = remote_staging
launch_params['remote_staging']['ssh_key'] = self.ssh_key
if job_config and self.setup_handler.local:
# Setup not yet called, job properties were inferred from
# destination arguments. Hence, must have Pulsar setup job
Expand Down
6 changes: 5 additions & 1 deletion pulsar/client/transport/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ def __get_transport_type(transport_type, os_module):
from .poster import get_file
from .poster import post_file

__all__ = [get_transport, get_file, post_file]
from .ssh import rsync_get_file, scp_get_file
from .ssh import rsync_post_file, scp_post_file

# __all__ must list names as strings; listing the objects themselves makes
# ``from ... import *`` fail with a TypeError.
__all__ = ["get_transport", "get_file", "post_file", "rsync_get_file",
           "rsync_post_file", "scp_get_file", "scp_post_file"]
60 changes: 60 additions & 0 deletions pulsar/client/transport/ssh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import subprocess
import logging
log = logging.getLogger(__name__)


def rsync_get_file(uri_from, uri_to, user, host, port, key):
    """Copy a remote file to a local path using rsync over SSH.

    Args:
        uri_from: path of the file on the remote host.
        uri_to: local destination path.
        user: SSH login name.
        host: SSH host name.
        port: SSH port.
        key: path to the private key file passed to ``ssh -i``.

    Raises:
        subprocess.CalledProcessError: if rsync exits with a non-zero
            status. ``check_call`` raises by itself, so no separate
            exit-code check is needed afterwards.
    """
    # NOTE(review): StrictHostKeyChecking=no disables host key verification.
    cmd = [
        'rsync',
        '-e',
        'ssh -i %s -p %s -o StrictHostKeyChecking=no' % (key, port),
        '%s@%s:%s' % (user, host, uri_from),
        uri_to,
    ]
    subprocess.check_call(cmd)


def rsync_post_file(uri_from, uri_to, user, host, port, key):
    """Copy a local file to a remote path using rsync over SSH.

    Args:
        uri_from: local source path.
        uri_to: destination path on the remote host.
        user: SSH login name.
        host: SSH host name.
        port: SSH port.
        key: path to the private key file passed to ``ssh -i``.

    Raises:
        subprocess.CalledProcessError: if rsync exits with a non-zero
            status. ``check_call`` raises by itself, so no separate
            exit-code check is needed afterwards.
    """
    # NOTE(review): StrictHostKeyChecking=no disables host key verification.
    cmd = [
        'rsync',
        '-e',
        'ssh -i %s -p %s -o StrictHostKeyChecking=no' % (key, port),
        uri_from,
        '%s@%s:%s' % (user, host, uri_to),
    ]
    subprocess.check_call(cmd)


def scp_get_file(uri_from, uri_to, user, host, port, key):
    """Copy a remote file to a local path using scp.

    Args:
        uri_from: path of the file on the remote host.
        uri_to: local destination path.
        user: SSH login name.
        host: SSH host name.
        port: SSH port (converted to a string for the command line).
        key: path to the private key file passed to ``scp -i``.

    Raises:
        subprocess.CalledProcessError: if scp exits with a non-zero
            status. ``check_call`` raises by itself, so no separate
            exit-code check is needed afterwards.
    """
    # NOTE(review): StrictHostKeyChecking=no disables host key verification.
    cmd = [
        'scp',
        '-P', str(port),
        '-i', key,
        '-o', 'StrictHostKeyChecking=no',
        '%s@%s:%s' % (user, host, uri_from),
        uri_to,
    ]
    subprocess.check_call(cmd)


def scp_post_file(uri_from, uri_to, user, host, port, key):
    """Copy a local file to a remote path using scp.

    Args:
        uri_from: local source path.
        uri_to: destination path on the remote host.
        user: SSH login name.
        host: SSH host name.
        port: SSH port (converted to a string for the command line).
        key: path to the private key file passed to ``scp -i``.

    Raises:
        subprocess.CalledProcessError: if scp exits with a non-zero
            status. ``check_call`` raises by itself, so no separate
            exit-code check is needed afterwards.
    """
    # NOTE(review): StrictHostKeyChecking=no disables host key verification.
    cmd = [
        'scp',
        '-P', str(port),
        '-i', key,
        '-o', 'StrictHostKeyChecking=no',
        uri_from,
        '%s@%s:%s' % (user, host, uri_to),
    ]
    subprocess.check_call(cmd)


# Public API of this module; entries must be name strings, not the objects.
__all__ = ["rsync_post_file", "rsync_get_file", "scp_post_file", "scp_get_file"]
6 changes: 6 additions & 0 deletions pulsar/managers/stateful.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ def launch(self, job_id, *args, **kwargs):
def do_preprocess():
try:
staging_config = job_directory.load_metadata("staging_config", {})
# TODO: swap out for a generic "job_extra_params"
if 'action_mapper' in staging_config and \
'ssh_key' in staging_config['action_mapper'] and \
'setup' in staging_config:
for action in staging_config['setup']:
action['action'].update(ssh_key=staging_config['action_mapper']['ssh_key'])
preprocess(job_directory, staging_config.get("setup", []), self.__preprocess_action_executor)
self._proxied_manager.launch(job_id, *args, **kwargs)
with job_directory.lock("status"):
Expand Down

0 comments on commit 91ec064

Please sign in to comment.