Hack RNA STAR into the terrible One Dynamic Rule To Rule Them All
natefoo committed Jun 23, 2017
1 parent 5f46d03 commit 49a63ad
Showing 3 changed files with 108 additions and 37 deletions.
119 changes: 86 additions & 33 deletions env/common/files/galaxy/dynamic_rules/multi_dynamic_walltime.py
@@ -2,6 +2,7 @@
## This file is maintained by Ansible - CHANGES WILL BE OVERWRITTEN
##

import os
import datetime
import logging
import re
@@ -24,7 +25,7 @@
# bowtie_color_wrapper same as bwa_wrapper
# bwa_color_wrapper same as bowtie_wrapper

JETSTREAM_TOOLS = ( 'bowtie2', 'bwa', 'bwa_mem', 'tophat2', 'cufflinks' )
JETSTREAM_TOOLS = ( 'bowtie2', 'bwa', 'bwa_mem', 'tophat2', 'cufflinks', 'rna_star' )
PUNT_TOOLS = ( 'bwa_wrapper', 'bowtie2', 'bowtie_wrapper', 'tophat', 'tophat2', 'bwa', 'bwa_mem' )
GENOME_SOURCE_PARAMS = ( 'genomeSource.refGenomeSource', 'reference_genome.source', 'refGenomeSource.genomeSource', 'reference_source.reference_source_selector' )
GENOME_SOURCE_VALUES = ( 'indexed', 'cached' )
@@ -35,9 +36,16 @@
STAMPEDE_DESTINATION = 'pulsar_stampede_normal'
STAMPEDE_DEVELOPMENT_DESTINATION = 'pulsar_stampede_development'
STAMPEDE_DESTINATIONS = (STAMPEDE_DESTINATION, STAMPEDE_DEVELOPMENT_DESTINATION)
BRIDGES_DESTINATION = 'pulsar_bridges_normal'
BRIDGES_DEVELOPMENT_DESTINATION = 'pulsar_bridges_development'
BRIDGES_DESTINATIONS = (BRIDGES_DESTINATION, BRIDGES_DEVELOPMENT_DESTINATION)
JETSTREAM_DESTINATIONS = ('jetstream_multi',)
VALID_DESTINATIONS = (LOCAL_DESTINATION, LOCAL_DEVELOPMENT_DESTINATION) + STAMPEDE_DESTINATIONS + JETSTREAM_DESTINATIONS # WHY ARE WE SHOUTING
RESOURCES = {'tacc_compute_resource':VALID_DESTINATIONS, 'stampede_compute_resource':STAMPEDE_DESTINATIONS}
RESOURCES = {
'tacc_compute_resource': VALID_DESTINATIONS,
'multi_bridges_compute_resource': (LOCAL_DESTINATION, LOCAL_DEVELOPMENT_DESTINATION) + JETSTREAM_DESTINATIONS + BRIDGES_DESTINATIONS,
'stampede_compute_resource': STAMPEDE_DESTINATIONS,
}
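# Note: __rule() below uses RESOURCES[resource][0] as the default destination for a
# given selector, so the first entry in each tuple above is that resource's default.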
FAILURE_MESSAGE = 'This tool could not be run because of a misconfiguration in the Galaxy job running system, please report this error'

RESERVED_USERS = (
Expand Down Expand Up @@ -130,6 +138,27 @@
}


def _rnastar(app, param_dict):
source = param_dict['refGenomeSource']['geneSource']
if source == 'indexed':
build = param_dict['refGenomeSource']['GTFconditional']['genomeDir']
path = '%s/SA' % app.tool_data_tables.get('rnastar_index2').get_entry('value', build, 'path')
dest = None
else:
# Avoid the expense of staging large genome files
path = param_dict['refGenomeSource']['genomeFastaFiles'].get_file_name()
dest = LOCAL_DESTINATION
need_mb = (os.stat(path).st_size / 1024 / 1024) * 1.3 + 2048
if need_mb < 8192:
# In testing, very small jobs needed more than the formula above, so guarantee everyone gets at least 8 GB
need_mb = 8192
elif need_mb > 40960:
# 147456 MB == 144 GB (3 cores) (128GB is the minimum for LM)
dest = BRIDGES_DESTINATION
need_mb = 147456
return (dest, need_mb)
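    # Worked example of the sizing heuristic above (illustrative, not part of this
    # file): a 3 GiB index gives 3072 * 1.3 + 2048 ~= 6042 MB, raised to the 8192 MB
    # floor; a 60 GiB index gives 61440 * 1.3 + 2048 = 81920 MB, which exceeds the
    # 40960 MB ceiling, so the job is punted to Bridges with 147456 MB (144 GB).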


def _set_walltime(tool_id, native_spec):
walltime = WALLTIMES.get(tool_id, None)
if walltime:
@@ -140,10 +169,11 @@ def _set_walltime(tool_id, native_spec):
return native_spec


def __rule(app, tool, job, user_email, resource):
def __rule(app, tool, job, user_email, resource_params, resource):
destination = None
destination_id = None
default_destination_id = RESOURCES[resource][0]
explicit_destination = False
tool_id = tool.id

if '/' in tool.id:
@@ -157,42 +187,38 @@ def __rule(app, tool, job, user_email, resource):
param_dict = tool.params_from_strings( param_dict, app )

    # Explicitly set the destination if the user has chosen to do so with the resource selector
if '__job_resource' in param_dict:
if param_dict['__job_resource']['__job_resource__select'] == 'yes':
resource_key = None
for resource_key in param_dict['__job_resource'].keys():
if resource_key == resource:
destination_id = param_dict['__job_resource'][resource_key]
if destination_id in RESOURCES[resource_key]:
break
elif destination_id == TEAM_DESTINATION:
break
else:
log.warning('(%s) Destination/walltime dynamic plugin got an invalid destination: %s', job.id, destination_id)
raise JobMappingException( FAILURE_MESSAGE )
else:
log.warning('(%s) Destination/walltime dynamic plugin got an invalid value for selector: %s', job.id, param_dict['__job_resource']['__job_resource__select'])
raise JobMappingException( FAILURE_MESSAGE )
elif param_dict['__job_resource']['__job_resource__select'] == 'no':
# job will be sent to the default
if user_email.lower() in NORM_RESERVED_USERS:
log.info("(%s) Destination/walltime dynamic plugin returning default reserved destination for '%s'", job.id, user_email)
return RESERVED_DESTINATION
if resource_params:
resource_key = None
for resource_key in resource_params.keys():
if resource_key == resource:
destination_id = resource_params[resource_key]
if destination_id in RESOURCES[resource_key]:
explicit_destination = True
break
elif destination_id == TEAM_DESTINATION:
explicit_destination = True
break
else:
log.warning('(%s) Destination/walltime dynamic plugin got an invalid destination: %s', job.id, destination_id)
raise JobMappingException(FAILURE_MESSAGE)
else:
log.warning('(%s) Destination/walltime dynamic plugin did not find a valid resource key, keys were: %s', job.id, param_dict['__job_resource'].keys())
raise JobMappingException( FAILURE_MESSAGE )
log.warning('(%s) Destination/walltime dynamic plugin did not receive a valid resource key, resource params were: %s', job.id, resource_params)
raise JobMappingException(FAILURE_MESSAGE)
else:
log.warning('(%s) Destination/walltime dynamic plugin did not receive the __job_resource param, keys were: %s', job.id, param_dict.keys())
raise JobMappingException( FAILURE_MESSAGE )
# if __job_resource is not passed or __job_resource_select is not set to a "yes-like" value, resource_params is an empty dict
if user_email.lower() in NORM_RESERVED_USERS:
log.info("(%s) Destination/walltime dynamic plugin returning default reserved destination for '%s'", job.id, user_email)
return RESERVED_DESTINATION
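    # For orientation (illustrative, not part of this file): when the user picks a
    # destination with the job resource selector, resource_params arrives as a dict
    # keyed by the resource group's parameter names, e.g.
    #     {'multi_bridges_compute_resource': 'pulsar_bridges_normal'}
    # and is an empty dict when the selector is unset, per the comment above.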

if destination_id == TEAM_DESTINATION:
if user_email in TEAM_USERS:
destination_id = TEAM_DESTINATION
destination = app.job_config.get_destination( TEAM_DESTINATION )
destination.params['nativeSpecification'] += ' --ntasks=%s' % param_dict['__job_resource']['team_cpus']
destination = app.job_config.get_destination(TEAM_DESTINATION)
destination.params['nativeSpecification'] += ' --ntasks=%s' % resource_params['team_cpus']
else:
log.warning("(%s) Unauthorized user '%s' selected team development destination", job.id, user_email)
destination_id = LOCAL_DESTINATION
explicit_destination = False

# Only allow stampede if a cached reference is selected
if destination_id in STAMPEDE_DESTINATIONS and tool_id in PUNT_TOOLS:
@@ -215,17 +241,36 @@ def __rule(app, tool, job, user_email, resource):
destination_id = default_destination_id

# Some tools do not react well to Jetstream
if destination_id is None and tool_id not in JETSTREAM_TOOLS:
if not explicit_destination and tool_id not in JETSTREAM_TOOLS:
log.info('(%s) Default destination requested and tool is not in Jetstream-approved list, job will be sent to local cluster', job.id)
destination_id = default_destination_id

# FIXME: this is getting really messy
mem_mb = None

if resource == 'multi_bridges_compute_resource' and tool_id == 'rna_star':
# FIXME: special casing
try:
_destination_id, mem_mb = _rnastar(app, param_dict)
if not explicit_destination:
destination_id = _destination_id
if destination_id == BRIDGES_DESTINATION:
destination = app.job_config.get_destination(destination_id)
destination.params['submit_native_specification'] += ' --time=48:00:00'
destination.params['submit_native_specification'] += ' --mem=%s' % mem_mb
except:
log.exception('(%s) Error determining parameters for STAR job', job.id)
raise JobMappingException(FAILURE_MESSAGE)
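    # Illustrative outcome (not part of this file): a job that _rnastar() punts to
    # Bridges ends up with ' --time=48:00:00 --mem=147456' appended to that
    # destination's submit_native_specification by the two lines above.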

# Need to explicitly pick a destination because of staging. Otherwise we
# could just submit with --clusters=a,b,c and let slurm sort it out
if destination_id in JETSTREAM_DESTINATIONS + (None,) and default_destination_id == LOCAL_DESTINATION:
test_destination_id = destination_id or default_destination_id
clusters = ','.join(JETSTREAM_DESTINATION_MAPS[test_destination_id]['clusters'])
native_specification = app.job_config.get_destination(test_destination_id).params.get('nativeSpecification', '')
native_specification = _set_walltime(tool_id, native_specification)
if mem_mb:
native_specification += ' --mem=%s' % mem_mb
sbatch_test_cmd = ['sbatch', '--test-only', '--clusters=%s' % clusters] + native_specification.split() + [TEST_SCRIPT]
log.debug('Testing job submission to determine suitable cluster: %s', ' '.join(sbatch_test_cmd))
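        # Minimal sketch of what the elided code below is assumed to do with this
        # command (illustrative only, not part of this file): `sbatch --test-only`
        # submits nothing and instead reports on stderr where the job could start,
        # so the rule can pick a cluster that would actually accept the request:
        #
        #     import subprocess
        #     p = subprocess.Popen(sbatch_test_cmd, stderr=subprocess.PIPE)
        #     _, stderr = p.communicate()
        #     # the chosen cluster is then parsed out of stderr (the exact parsing is
        #     # in the collapsed hunk and is not reproduced here)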

@@ -256,6 +301,11 @@ def __rule(app, tool, job, user_email, resource):
if destination_id is None:
destination_id = default_destination_id

if mem_mb:
if destination is None:
destination = app.job_config.get_destination(destination_id)
destination.params['nativeSpecification'] += ' --mem=%s' % mem_mb

# Set a walltime if local is the destination and this is a dynamic walltime tool
if destination_id == LOCAL_DESTINATION and tool_id in RUNTIMES:
destination_id = LOCAL_WALLTIME_DESTINATION
@@ -274,8 +324,11 @@ def __rule(app, tool, job, user_email, resource):
log.debug(" nativeSpecification is: %s", destination.params['nativeSpecification'])
return destination or destination_id

def dynamic_local_stampede_select_dynamic_walltime( app, tool, job, user_email ):
return __rule( app, tool, job, user_email, 'tacc_compute_resource' )
def dynamic_local_stampede_select_dynamic_walltime(app, tool, job, user_email, resource_params):
return __rule(app, tool, job, user_email, resource_params, 'tacc_compute_resource')

def dynamic_multi_bridges_select(app, tool, job, user_email, resource_params):
return __rule(app, tool, job, user_email, resource_params, 'multi_bridges_compute_resource')

def dynamic_stampede_select( app, tool, job, user_email ):
return __rule( app, tool, job, user_email, 'stampede_compute_resource' )
7 changes: 7 additions & 0 deletions env/test/files/galaxy/config/job_resource_params_conf.xml
@@ -16,6 +16,13 @@
<option value="8">8</option>
<option value="16">16</option>
</param>
<param label="Compute Resource" name="multi_bridges_compute_resource" type="select" help="Need help selecting a compute resource? Options and limits are explained in detail &lt;a href='https://wiki.galaxyproject.org/Test' target='_blank'&gt;in the Galaxy Wiki&lt;/a&gt;">
<option value="slurm_multi">Galaxy cluster (default)</option>
<option value="jetstream_multi">Jetstream</option>
<option value="pulsar_bridges_normal">PSC Bridges</option>
<option value="slurm_multi_development">Galaxy cluster test/development</option>
<option value="pular_bridges_development">PSC Bridges test/development</option>
</param>
<param label="Compute Resource" name="stampede_compute_resource" type="select" help="Need help selecting a compute resource? Options and limits are explained in detail &lt;a href='https://wiki.galaxyproject.org/Test' target='_blank'&gt;in the Galaxy Wiki&lt;/a&gt;">
<option value="pulsar_stampede_normal">TACC Stampede (default)</option>
<option value="pulsar_stampede_development">TACC Stampede test/development</option>
19 changes: 15 additions & 4 deletions env/test/templates/galaxy/config/job_conf.xml.j2
@@ -115,6 +115,10 @@
<handler id="test_handler2">
<plugin id="amqp_pulsar_bridges"/>
<plugin id="amqp_pulsar_bridges_nagios"/>
<plugin id="pulsar_jetstream_iu"/>
<plugin id="pulsar_jetstream_iu_nagios"/>
<plugin id="pulsar_jetstream_tacc"/>
<plugin id="pulsar_jetstream_tacc_nagios"/>
<plugin id="slurm"/>
</handler>
<handler id="test_handler3" tags="multi_handlers">
@@ -168,6 +172,11 @@
<param id="function">dynamic_local_stampede_select_dynamic_walltime</param>
</destination>

<destination id="dynamic_multi_bridges_select" runner="dynamic">
<param id="type">python</param>
<param id="function">dynamic_multi_bridges_select</param>
</destination>

<!-- dynamic method for the stampede-only selector -->
<destination id="dynamic_stampede_select" runner="dynamic">
<param id="type">python</param>
@@ -230,7 +239,7 @@
<env file="/cvmfs/test.galaxyproject.org/venv/bin/activate" />
</destination>
<destination id="reserved_multi" runner="slurm">
<param id="nativeSpecification">--time=36:00:00 --clusters=roundup --nodes=1 --ntasks=4 --partition=reserved --mem-per-cpu=5120</param>
<param id="nativeSpecification">--time=36:00:00 --clusters=roundup --nodes=1 --ntasks=6 --partition=reserved --mem-per-cpu=5120</param>
<env id="PATH">/galaxy/test/linux-x86_64/bin:/galaxy/software/linux-x86_64/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/sbin</env>
<env id="LC_ALL">C</env>
<env id="TEMP">$(dirname ${BASH_SOURCE[0]})/_galaxy_tmp</env>
@@ -288,7 +297,7 @@
</destination>

<destination id="slurm_multi" runner="slurm">
<param id="nativeSpecification">--time=24:00:00 --nodes=1 --ntasks=4 --partition=multi</param>
<param id="nativeSpecification">--time=24:00:00 --nodes=1 --ntasks=6 --partition=multi</param>
<env id="PATH">/galaxy/test/linux-x86_64/bin:/galaxy/software/linux-x86_64/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/sbin</env>
<env id="LC_ALL">C</env>
<env id="TEMP">$(dirname ${BASH_SOURCE[0]})/_galaxy_tmp</env>
@@ -389,7 +398,7 @@
<!-- dynamic modified destinations: do not use directly -->
<destination id="slurm_multi_dynamic_walltime" runner="slurm">
<!-- walltime is set dynamically -->
<param id="nativeSpecification">--nodes=1 --ntasks=4 --partition=multi</param>
<param id="nativeSpecification">--nodes=1 --ntasks=6 --partition=multi</param>
<resubmit condition="walltime_reached" destination="pulsar_stampede_normal" handler="test_handler4"/>
<env file="/cvmfs/test.galaxyproject.org/venv/bin/activate" />
</destination>
@@ -543,11 +552,12 @@
<tool id="stringtie" destination="dynamic_local_stampede_select_dynamic_walltime" handler="multi_handlers" resources="local_or_stampede"/>
<tool id="hisat2" destination="dynamic_local_stampede_select_dynamic_walltime" handler="multi_handlers" resources="local_or_stampede"/>
<tool id="prokka" destination="dynamic_local_stampede_select_dynamic_walltime" handler="multi_handlers" resources="local_or_stampede"/>
<!-- roundup multi/jetstream/bridges jobs -->
<tool id="rna_star" destination="dynamic_multi_bridges_select" handler="test_handler2" resources="multi_or_bridges"/>
<!-- bridges jobs -->
<tool id="trinity_psc" destination="dynamic_bridges_select" handler="test_handler2" resources="bridges"/>
<tool id="spades" destination="dynamic_bridges_select" handler="test_handler2" resources="bridges"/>
<tool id="rnaspades" destination="dynamic_bridges_select" handler="test_handler2" resources="bridges"/>
<tool id="rna_star" destination="dynamic_bridges_select" handler="test_handler2" resources="bridges"/>
<!-- trackster jobs -->
<tool id="cufflinks" destination="slurm_multi_trackster">
<param id="source">trackster</param>
@@ -615,6 +625,7 @@
<resources default="default">
<group id="default"></group>
<group id="local_or_stampede">tacc_compute_resource,team_cpus</group>
<group id="multi_or_bridges">multi_bridges_compute_resource</group>
<group id="stampede">stampede_compute_resource</group>
<group id="bridges">bridges_compute_resource</group>
</resources>
