Skip to content

Commit

Permalink
Merge branch 'release_18.05' into release_18.09
Browse files Browse the repository at this point in the history
  • Loading branch information
nsoranzo committed Oct 12, 2018
2 parents 2c14c2a + a8fa5a1 commit d22f652
Show file tree
Hide file tree
Showing 11 changed files with 183 additions and 20 deletions.
Expand Up @@ -9,8 +9,8 @@ ie_request.load_deploy_config()
ie_request.attr.docker_port = 80
ie_request.attr.import_volume = False
bam = ie_request.volume(hda.file_name, '/input/bamfile.bam', how='ro')
bam_index = ie_request.volume(hda.metadata.bam_index.file_name, '/input/bamfile.bam.bai', how='ro')
bam = ie_request.volume(hda.file_name, '/input/bamfile.bam', mode='ro')
bam_index = ie_request.volume(hda.metadata.bam_index.file_name, '/input/bamfile.bam.bai', mode='ro')
ie_request.launch(volumes=[bam, bam_index], env_override={
'PUB_HTTP_PORT': ie_request.attr.galaxy_config.dynamic_proxy_bind_port,
Expand Down
Expand Up @@ -9,7 +9,7 @@
import os
mount_path = hda.file_name
data_vol = ie_request.volume(mount_path, '/data/data_pack.tar.gz', how='rw')
data_vol = ie_request.volume(mount_path, '/data/data_pack.tar.gz', mode='rw')
# Add all environment variables collected from Galaxy's IE infrastructure
# Launch the IE.
Expand Down
Expand Up @@ -7,7 +7,7 @@
# if the user knows ahead of time that they will need it.
import os
mount_path = str(os.path.dirname(hda.file_name)) + '/dataset_{}_files'.format( hda.dataset.id )
data_vol = ie_request.volume(mount_path, '/data', how='rw')
data_vol = ie_request.volume(mount_path, '/data', mode='rw')
# data_vol = ie_request.volume('${HOME}/neo4j/data', '/data/', how='rw')
# Add all environment variables collected from Galaxy's IE infrastructure
# Launch the IE.
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/jobs/runners/util/cli/shell/local.py
Expand Up @@ -44,7 +44,7 @@ def __init__(self, **kwds):

def execute(self, cmd, persist=False, timeout=DEFAULT_TIMEOUT, timeout_check_interval=DEFAULT_TIMEOUT_CHECK_INTERVAL, **kwds):
outf = TemporaryFile()
p = Popen(cmd, shell=True, stdin=None, stdout=outf, stderr=PIPE)
p = Popen(cmd, stdin=None, stdout=outf, stderr=PIPE)
# poll until timeout

for i in range(int(timeout / timeout_check_interval)):
Expand Down
25 changes: 13 additions & 12 deletions lib/galaxy/jobs/runners/util/cli/shell/rsh.py
Expand Up @@ -19,20 +19,23 @@

class RemoteShell(LocalShell):

def __init__(self, rsh='rsh', rcp='rcp', hostname='localhost', username=None, **kwargs):
def __init__(self, rsh='rsh', rcp='rcp', hostname='localhost', username=None, options=None, **kwargs):
super(RemoteShell, self).__init__(**kwargs)
self.rsh = rsh
self.rcp = rcp
self.hostname = hostname
self.username = username
self.options = options
self.sessions = {}

def execute(self, cmd, persist=False, timeout=60):
# TODO: implement persistence
if self.username is None:
fullcmd = '%s %s %s' % (self.rsh, self.hostname, cmd)
else:
fullcmd = '%s -l %s %s %s' % (self.rsh, self.username, self.hostname, cmd)
fullcmd = [self.rsh]
if self.options:
fullcmd.extend(self.options)
if self.username:
fullcmd.extend(["-l", self.username])
fullcmd.extend([self.hostname, cmd])
return super(RemoteShell, self).execute(fullcmd, persist, timeout)


Expand All @@ -41,15 +44,13 @@ class SecureShell(RemoteShell):

def __init__(self, rsh='ssh', rcp='scp', private_key=None, port=None, strict_host_key_checking=True, **kwargs):
strict_host_key_checking = "yes" if strict_host_key_checking else "no"
rsh += " -oStrictHostKeyChecking=%s -oConnectTimeout=60" % strict_host_key_checking
rcp += " -oStrictHostKeyChecking=%s -oConnectTimeout=60" % strict_host_key_checking
options = ["-o", "StrictHostKeyChecking=%s" % strict_host_key_checking]
options.extend(["-o", "ConnectTimeout=60"])
if private_key:
rsh += " -i %s" % private_key
rcp += " -i %s" % private_key
options.extend(['-i', private_key])
if port:
rsh += " -p %s" % port
rcp += " -p %s" % port
super(SecureShell, self).__init__(rsh=rsh, rcp=rcp, **kwargs)
options.extend(['-p', str(port)])
super(SecureShell, self).__init__(rsh=rsh, rcp=rcp, options=options, **kwargs)


class ParamikoShell(object):
Expand Down
7 changes: 6 additions & 1 deletion lib/galaxy/util/monitors.py
Expand Up @@ -3,6 +3,7 @@
import logging
import threading

from galaxy.web.stack import register_postfork_function
from .sleeper import Sleeper

log = logging.getLogger(__name__)
Expand All @@ -27,7 +28,11 @@ def _init_monitor_thread(self, name, target_name=None, target=None, start=False,
self.sleeper = Sleeper()
self.monitor_thread = threading.Thread(name=name, target=monitor_func)
self.monitor_thread.setDaemon(True)
if start:
self._start = start
register_postfork_function(self.start_monitoring)

def start_monitoring(self):
if self._start:
self.monitor_thread.start()

def stop_monitoring(self):
Expand Down
2 changes: 2 additions & 0 deletions test/functional/tools/job_environment_default.xml
Expand Up @@ -8,6 +8,7 @@
echo `pwd` > '$pwd';
echo "\$HOME" > '$home';
echo "\$TMP" > '$tmp';
echo "\$SOME_ENV_VAR" > '$some_env_var';
]]></command>
<inputs>
</inputs>
Expand All @@ -17,6 +18,7 @@
<data name="pwd" format="txt" label="pwd" />
<data name="home" format="txt" label="home" />
<data name="tmp" format="txt" label="tmp" />
<data name="some_env_var" format="txt" label="env_var" />
</outputs>
<help>
</help>
Expand Down
2 changes: 2 additions & 0 deletions test/functional/tools/job_environment_default_legacy.xml
Expand Up @@ -8,6 +8,7 @@
echo `pwd` > '$pwd';
echo "\$HOME" > '$home';
echo "\$TMP" > '$tmp';
echo "\$SOME_ENV_VAR" > '$some_env_var';
]]></command>
<inputs>
</inputs>
Expand All @@ -17,6 +18,7 @@
<data name="pwd" format="txt" label="pwd" />
<data name="home" format="txt" label="home" />
<data name="tmp" format="txt" label="tmp" />
<data name="some_env_var" format="txt" label="env_var" />
</outputs>
<help>
</help>
Expand Down
Expand Up @@ -8,6 +8,7 @@
echo `pwd` > '$pwd';
echo "\$HOME" > '$home';
echo "\$TMP" > '$tmp';
echo "\$SOME_ENV_VAR" > '$some_env_var';
]]></command>
<inputs>
</inputs>
Expand All @@ -17,6 +18,7 @@
<data name="pwd" format="txt" label="pwd" />
<data name="home" format="txt" label="home" />
<data name="tmp" format="txt" label="tmp" />
<data name="some_env_var" format="txt" label="env_var" />
</outputs>
<help>
</help>
Expand Down
150 changes: 150 additions & 0 deletions test/integration/test_cli_runners.py
@@ -0,0 +1,150 @@
"""Integration tests for the CLI shell plugins and runners."""
import collections
import os
import string
import subprocess
import tempfile
import unittest

from Crypto.PublicKey import RSA

from base import integration_util # noqa: I100,I202
from base.populators import skip_without_tool
from .test_job_environments import BaseJobEnvironmentIntegrationTestCase # noqa: I201


def generate_keys():
key = RSA.generate(2048)
return (key.export_key(), key.publickey().export_key(format='OpenSSH'))


RemoteConnection = collections.namedtuple('remote_connection', ['hostname', 'username', 'password', 'port', 'private_key', 'public_key'])


@integration_util.skip_unless_docker()
def start_ssh_docker(container_name, jobs_directory, port=10022, image='agaveapi/slurm'):
private_key, public_key = generate_keys()
with tempfile.NamedTemporaryFile(delete=False) as f:
f.write(private_key)
private_key_file = f.name
with tempfile.NamedTemporaryFile(delete=False) as f:
f.write(public_key)
public_key_file = f.name
START_SLURM_DOCKER = ['docker',
'run',
'-h',
'localhost',
'-p',
'{port}:22'.format(port=port),
'-d',
'--name',
container_name,
'--rm',
'-v',
"{jobs_directory}:{jobs_directory}".format(jobs_directory=jobs_directory),
"-v",
"{public_key_file}:/home/testuser/.ssh/authorized_keys".format(public_key_file=public_key_file),
'--ulimit',
'nofile=2048:2048',
image]
subprocess.check_call(START_SLURM_DOCKER)
return RemoteConnection('localhost', 'testuser', 'testuser', port, private_key_file, public_key_file)


def stop_ssh_docker(container_name, remote_connection):
subprocess.check_call(['docker', 'rm', '-f', container_name])
os.remove(remote_connection.private_key)
os.remove(remote_connection.public_key)


def cli_job_config(remote_connection, shell_plugin='ParamikoShell', job_plugin='Slurm'):
job_conf_template = string.Template("""<job_conf>
<plugins>
<plugin id="cli" type="runner" load="galaxy.jobs.runners.cli:ShellJobRunner" workers="1"/>
</plugins>
<destinations default="ssh_slurm">
<destination id="ssh_slurm" runner="cli">
<param id="shell_plugin">$shell_plugin</param>
<param id="job_plugin">$job_plugin</param>
<param id="shell_username">$username</param>
<param id="shell_private_key">$private_key</param>
<param id="shell_hostname">$hostname</param>
<param id="shell_port">$port</param>
<param id="embed_metadata_in_job">False</param>
<env id="SOME_ENV_VAR">42</env>
</destination>
</destinations>
</job_conf>
""")
job_conf_str = job_conf_template.substitute(shell_plugin=shell_plugin,
job_plugin=job_plugin,
**remote_connection._asdict())
with tempfile.NamedTemporaryFile(suffix="_slurm_integration_job_conf", delete=False) as job_conf:
job_conf.write(job_conf_str)
return job_conf.name


class BaseCliIntegrationTestCase(BaseJobEnvironmentIntegrationTestCase):

@classmethod
def setUpClass(cls):
if cls is BaseCliIntegrationTestCase:
raise unittest.SkipTest("Base class")
cls.container_name = "%s_container" % cls.__name__
cls.jobs_directory = tempfile.mkdtemp()
cls.remote_connection = start_ssh_docker(container_name=cls.container_name,
jobs_directory=cls.jobs_directory,
image=cls.image)
super(BaseCliIntegrationTestCase, cls).setUpClass()

@classmethod
def tearDownClass(cls):
stop_ssh_docker(cls.container_name, cls.remote_connection)
super(BaseCliIntegrationTestCase, cls).tearDownClass()

@classmethod
def handle_galaxy_config_kwds(cls, config, ):
config["jobs_directory"] = cls.jobs_directory
config["file_path"] = cls.jobs_directory
config["job_config_file"] = cli_job_config(remote_connection=cls.remote_connection,
shell_plugin=cls.shell_plugin,
job_plugin=cls.job_plugin)

@skip_without_tool("job_environment_default")
def test_running_cli_job(self):
job_env = self._run_and_get_environment_properties()
assert job_env.some_env == '42'


class TorqueSetup(object):
job_plugin = 'Torque'
image = 'mvdbeek/galaxy-integration-docker-images:torque_latest'


class SlurmSetup(object):
job_plugin = 'Slurm'
image = 'mvdbeek/galaxy-integration-docker-images:slurm_latest'


class ParamikoShell(object):
shell_plugin = 'ParamikoShell'


class SecureShell(object):
shell_plugin = 'SecureShell'


class ParamikoCliSlurmIntegrationTestCase(SlurmSetup, ParamikoShell, BaseCliIntegrationTestCase):
pass


class ShellJobCliSlurmIntegrationTestCase(SlurmSetup, SecureShell, BaseCliIntegrationTestCase):
pass


class ParamikoCliTorqueIntegrationTestCase(TorqueSetup, ParamikoShell, BaseCliIntegrationTestCase):
pass


class ShellJobCliTorqueIntegrationTestCase(TorqueSetup, SecureShell, BaseCliIntegrationTestCase):
pass
5 changes: 3 additions & 2 deletions test/integration/test_job_environments.py
Expand Up @@ -21,6 +21,7 @@
"pwd",
"home",
"tmp",
"some_env",
])


Expand All @@ -38,8 +39,8 @@ def _environment_properties(self, history_id):
pwd = self.dataset_populator.get_history_dataset_content(history_id, hid=3).strip()
home = self.dataset_populator.get_history_dataset_content(history_id, hid=4).strip()
tmp = self.dataset_populator.get_history_dataset_content(history_id, hid=5).strip()

return JobEnviromentProperties(user_id, group_id, pwd, home, tmp)
some_env = self.dataset_populator.get_history_dataset_content(history_id, hid=6).strip()
return JobEnviromentProperties(user_id, group_id, pwd, home, tmp, some_env)


class BaseJobEnvironmentIntegrationTestCase(integration_util.IntegrationTestCase, RunsEnvironmentJobs):
Expand Down

0 comments on commit d22f652

Please sign in to comment.