Skip to content

Commit

Permalink
Quick and dirty dynamic rule changes for RNA STAR
Browse files Browse the repository at this point in the history
  • Loading branch information
natefoo committed Jun 23, 2017
1 parent 8ae4c07 commit c3f3155
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 35 deletions.
139 changes: 105 additions & 34 deletions files/galaxy/common/dynamic_rules/multi_dynamic_walltime.py
Expand Up @@ -2,6 +2,7 @@
## This file is maintained by Ansible - CHANGES WILL BE OVERWRITTEN
##

import os
import datetime
import logging
import re
Expand All @@ -24,7 +25,7 @@
# bowtie_color_wrapper same as bwa_wrapper
# bwa_color_wrapper same as bowtie_wrapper

JETSTREAM_TOOLS = ( 'bowtie2', 'bwa', 'bwa_mem', 'tophat2', 'cufflinks' )
JETSTREAM_TOOLS = ( 'bowtie2', 'bwa', 'bwa_mem', 'tophat2', 'cufflinks', 'rna_star' )
PUNT_TOOLS = ( 'bwa_wrapper', 'bowtie2', 'bowtie_wrapper', 'tophat', 'tophat2', 'bwa', 'bwa_mem' )
GENOME_SOURCE_PARAMS = ( 'genomeSource.refGenomeSource', 'reference_genome.source', 'refGenomeSource.genomeSource', 'reference_source.reference_source_selector' )
GENOME_SOURCE_VALUES = ( 'indexed', 'cached' )
Expand All @@ -35,9 +36,16 @@
STAMPEDE_DESTINATION = 'pulsar_stampede_normal'
STAMPEDE_DEVELOPMENT_DESTINATION = 'pulsar_stampede_development'
STAMPEDE_DESTINATIONS = (STAMPEDE_DESTINATION, STAMPEDE_DEVELOPMENT_DESTINATION)
BRIDGES_DESTINATION = 'pulsar_bridges_normal'
BRIDGES_DEVELOPMENT_DESTINATION = 'pulsar_bridges_development'
BRIDGES_DESTINATIONS = (BRIDGES_DESTINATION, BRIDGES_DEVELOPMENT_DESTINATION)
JETSTREAM_DESTINATIONS = ('jetstream_multi',)
VALID_DESTINATIONS = (LOCAL_DESTINATION, LOCAL_DEVELOPMENT_DESTINATION) + STAMPEDE_DESTINATIONS + JETSTREAM_DESTINATIONS # WHY ARE WE SHOUTING
RESOURCES = {'tacc_compute_resource':VALID_DESTINATIONS, 'stampede_compute_resource':STAMPEDE_DESTINATIONS}
RESOURCES = {
'tacc_compute_resource': VALID_DESTINATIONS,
'multi_bridges_compute_resource': (LOCAL_DESTINATION, LOCAL_DEVELOPMENT_DESTINATION) + JETSTREAM_DESTINATIONS + BRIDGES_DESTINATIONS,
'stampede_compute_resource': STAMPEDE_DESTINATIONS,
}
FAILURE_MESSAGE = 'This tool could not be run because of a misconfiguration in the Galaxy job running system, please report this error'

RESERVED_USERS = (
Expand Down Expand Up @@ -130,6 +138,35 @@
}


def _rnastar(app, param_dict, destination_id, explicit_destination):
    """Choose a destination and memory request (in MB) for an rna_star job.

    Returns a ``(destination_id, need_mb)`` tuple. ``destination_id`` may be
    ``None`` when the user made no explicit choice and the caller should fall
    through to its default selection logic.
    """
    genome_params = param_dict['refGenomeSource']
    if genome_params['geneSource'] == 'indexed':
        # Size the request from the on-disk suffix array ("SA") file of the
        # cached STAR index for the selected build.
        genome_build = genome_params['GTFconditional']['genomeDir']
        index_dir = app.tool_data_tables.get('rnastar_index2').get_entry('value', genome_build, 'path')
        ref_path = '%s/SA' % index_dir
        if not explicit_destination:
            destination_id = None
        need_mb = (os.stat(ref_path).st_size / 1024 / 1024) * 1.3 + 2048
    else:
        # Avoid the expense of staging large genome files
        ref_path = genome_params['genomeFastaFiles'].get_file_name()
        if not explicit_destination:
            destination_id = LOCAL_DESTINATION
        need_mb = (os.stat(ref_path).st_size / 1024 / 1024) * 10.0 + 2048
    if need_mb < 8192:
        # In testing, very small jobs needed more than the formula above, so
        # guarantee everyone gets at least 8 GB
        need_mb = 8192
    elif need_mb > 40960 and not explicit_destination:
        # 147456 MB == 144 GB (3 cores) (128GB is the minimum for LM)
        destination_id = BRIDGES_DESTINATION
        need_mb = 147456
    if explicit_destination:
        # Clamp/override the request to what the chosen destination supports.
        if destination_id in (LOCAL_DESTINATION, LOCAL_DEVELOPMENT_DESTINATION, RESERVED_DESTINATION) and need_mb > 40960:
            need_mb = 40960
        elif destination_id in JETSTREAM_DESTINATIONS:
            pass  # won't be set anyway
        elif destination_id in BRIDGES_DESTINATIONS:
            need_mb = 147456
    return (destination_id, int(need_mb))


def _set_walltime(tool_id, native_spec):
walltime = WALLTIMES.get(tool_id, None)
if walltime:
Expand All @@ -140,10 +177,11 @@ def _set_walltime(tool_id, native_spec):
return native_spec


def __rule(app, tool, job, user_email, resource):
def __rule(app, tool, job, user_email, resource_params, resource):
destination = None
destination_id = None
default_destination_id = RESOURCES[resource][0]
explicit_destination = False
tool_id = tool.id

if '/' in tool.id:
Expand All @@ -157,42 +195,39 @@ def __rule(app, tool, job, user_email, resource):
param_dict = tool.params_from_strings( param_dict, app )

# Explicitly set the destination if the user has chosen to do so with the resource selector
if '__job_resource' in param_dict:
if param_dict['__job_resource']['__job_resource__select'] == 'yes':
resource_key = None
for resource_key in param_dict['__job_resource'].keys():
if resource_key == resource:
destination_id = param_dict['__job_resource'][resource_key]
if destination_id in RESOURCES[resource_key]:
break
elif destination_id == TEAM_DESTINATION:
break
else:
log.warning('(%s) Destination/walltime dynamic plugin got an invalid destination: %s', job.id, destination_id)
raise JobMappingException( FAILURE_MESSAGE )
else:
log.warning('(%s) Destination/walltime dynamic plugin got an invalid value for selector: %s', job.id, param_dict['__job_resource']['__job_resource__select'])
raise JobMappingException( FAILURE_MESSAGE )
elif param_dict['__job_resource']['__job_resource__select'] == 'no':
# job will be sent to the default
if user_email.lower() in NORM_RESERVED_USERS:
log.info("(%s) Destination/walltime dynamic plugin returning default reserved destination for '%s'", job.id, user_email)
return RESERVED_DESTINATION
if resource_params:
resource_key = None
for resource_key in resource_params.keys():
if resource_key == resource:
destination_id = resource_params[resource_key]
if destination_id in RESOURCES[resource_key]:
explicit_destination = True
break
elif destination_id == TEAM_DESTINATION:
explicit_destination = True
break
else:
log.warning('(%s) Destination/walltime dynamic plugin got an invalid destination: %s', job.id, destination_id)
raise JobMappingException(FAILURE_MESSAGE)
else:
log.warning('(%s) Destination/walltime dynamic plugin did not find a valid resource key, keys were: %s', job.id, param_dict['__job_resource'].keys())
raise JobMappingException( FAILURE_MESSAGE )
log.warning('(%s) Destination/walltime dynamic plugin did not receive a valid resource key, resource params were: %s', job.id, resource_params)
raise JobMappingException(FAILURE_MESSAGE)
else:
log.warning('(%s) Destination/walltime dynamic plugin did not receive the __job_resource param, keys were: %s', job.id, param_dict.keys())
raise JobMappingException( FAILURE_MESSAGE )
# if __job_resource is not passed or __job_resource_select is not set to a "yes-like" value, resource_params is an empty dict
if user_email.lower() in NORM_RESERVED_USERS:
log.info("(%s) Destination/walltime dynamic plugin returning default reserved destination for '%s'", job.id, user_email)
#return RESERVED_DESTINATION
destination_id = RESERVED_DESTINATION

if destination_id == TEAM_DESTINATION:
if user_email in TEAM_USERS:
destination_id = TEAM_DESTINATION
destination = app.job_config.get_destination( TEAM_DESTINATION )
destination.params['nativeSpecification'] += ' --ntasks=%s' % param_dict['__job_resource']['team_cpus']
destination = app.job_config.get_destination(TEAM_DESTINATION)
destination.params['nativeSpecification'] += ' --ntasks=%s' % resource_params['team_cpus']
else:
log.warning("(%s) Unauthorized user '%s' selected team development destination", job.id, user_email)
destination_id = LOCAL_DESTINATION
explicit_destination = False

# Only allow stampede if a cached reference is selected
if destination_id in STAMPEDE_DESTINATIONS and tool_id in PUNT_TOOLS:
Expand All @@ -215,17 +250,35 @@ def __rule(app, tool, job, user_email, resource):
destination_id = default_destination_id

# Some tools do not react well to Jetstream
if destination_id is None and tool_id not in JETSTREAM_TOOLS:
if not explicit_destination and tool_id not in JETSTREAM_TOOLS:
log.info('(%s) Default destination requested and tool is not in Jetstream-approved list, job will be sent to local cluster', job.id)
destination_id = default_destination_id

# FIXME: this is getting really messy
mem_mb = None

if resource == 'multi_bridges_compute_resource' and tool_id == 'rna_star':
# FIXME: special casing
try:
_destination_id, mem_mb = _rnastar(app, param_dict, destination_id, explicit_destination)
if not explicit_destination:
destination_id = _destination_id
if destination_id == BRIDGES_DESTINATION:
destination = app.job_config.get_destination(destination_id)
destination.params['submit_native_specification'] += ' --time=48:00:00'
except:
log.exception('(%s) Error determining parameters for STAR job', job.id)
raise JobMappingException(FAILURE_MESSAGE)

# Need to explicitly pick a destination because of staging. Otherwise we
# could just submit with --clusters=a,b,c and let slurm sort it out
if destination_id in JETSTREAM_DESTINATIONS + (None,) and default_destination_id == LOCAL_DESTINATION:
test_destination_id = destination_id or default_destination_id
clusters = ','.join(JETSTREAM_DESTINATION_MAPS[test_destination_id]['clusters'])
native_specification = app.job_config.get_destination(test_destination_id).params.get('nativeSpecification', '')
native_specification = _set_walltime(tool_id, native_specification)
if mem_mb:
native_specification += ' --mem=%s' % mem_mb
sbatch_test_cmd = ['sbatch', '--test-only', '--clusters=%s' % clusters] + native_specification.split() + [TEST_SCRIPT]
log.debug('Testing job submission to determine suitable cluster: %s', ' '.join(sbatch_test_cmd))

Expand All @@ -251,11 +304,26 @@ def __rule(app, tool, job, user_email, resource):

destination_id = '%s_%s' % (destination_prefix, JETSTREAM_DESTINATION_MAPS[default_destination_id]['partition'])
destination = app.job_config.get_destination(destination_id)
destination.params['nativeSpecification'] = _set_walltime(tool_id, destination.params.get('nativeSpecification', ''))
# FIXME: aaaaah i just need this to work for now
if destination_id.startswith('jetstream'):
destination.params['submit_native_specification'] = _set_walltime(tool_id, destination.params.get('submit_native_specification', ''))
else:
destination.params['nativeSpecification'] = _set_walltime(tool_id, destination.params.get('nativeSpecification', ''))

if destination_id is None:
destination_id = default_destination_id

if mem_mb:
if destination is None:
destination = app.job_config.get_destination(destination_id)
# FIXME: and here wow such mess
if destination_id in (LOCAL_DESTINATION, LOCAL_DEVELOPMENT_DESTINATION, RESERVED_DESTINATION):
destination.params['nativeSpecification'] += ' --mem=%s' % mem_mb
elif destination_id.startswith('jetstream'):
pass # don't set --mem, you get the whole node anyway
elif destination_id in BRIDGES_DESTINATIONS:
destination.params['submit_native_specification'] += ' --mem=%s' % mem_mb

# Set a walltime if local is the destination and this is a dynamic walltime tool
if destination_id == LOCAL_DESTINATION and tool_id in RUNTIMES:
destination_id = LOCAL_WALLTIME_DESTINATION
Expand All @@ -274,8 +342,11 @@ def __rule(app, tool, job, user_email, resource):
log.debug(" nativeSpecification is: %s", destination.params['nativeSpecification'])
return destination or destination_id

def dynamic_local_stampede_select_dynamic_walltime( app, tool, job, user_email ):
return __rule( app, tool, job, user_email, 'tacc_compute_resource' )
def dynamic_local_stampede_select_dynamic_walltime(app, tool, job, user_email, resource_params):
    """Job-config entry point for the local-or-Stampede resource selector; delegates to __rule()."""
    return __rule(app, tool, job, user_email, resource_params, 'tacc_compute_resource')

def dynamic_multi_bridges_select(app, tool, job, user_email, resource_params):
    """Job-config entry point for the local/Jetstream/Bridges resource selector; delegates to __rule()."""
    return __rule(app, tool, job, user_email, resource_params, 'multi_bridges_compute_resource')

def dynamic_stampede_select(app, tool, job, user_email, resource_params):
    """Job-config entry point for the Stampede-only resource selector; delegates to __rule().

    Fixed to accept and forward ``resource_params``: __rule() now takes
    ``(app, tool, job, user_email, resource_params, resource)``, so the old
    5-argument call passed the resource name as ``resource_params`` and
    omitted ``resource``, raising a TypeError whenever this rule ran.
    """
    return __rule(app, tool, job, user_email, resource_params, 'stampede_compute_resource')
Expand Up @@ -9,6 +9,13 @@
<option value="pulsar_stampede_development">TACC Stampede test/development (beta)</option>
<option value="jetstream_multi">Jetstream</option>
</param>
<param label="Compute Resource" name="multi_bridges_compute_resource" type="select" help="Need help selecting a compute resource? Options and limits are explained in detail &lt;a href='https://galaxyproject.org/main' target='_blank'&gt;in the Galaxy Hub&lt;/a&gt;">
<option value="slurm_multi">Galaxy cluster (default)</option>
<option value="jetstream_multi">Jetstream</option>
<option value="pulsar_bridges_normal">PSC Bridges</option>
<option value="slurm_multi_development">Galaxy cluster test/development</option>
<option value="pulsar_bridges_development">PSC Bridges test/development</option>
</param>
<param label="Compute Resource" name="stampede_compute_resource" type="select" help="Need help selecting a compute resource? Options and limits are explained in detail &lt;a href='https://galaxyproject.org/main/' target='_blank'&gt;in the Galaxy Hub&lt;/a&gt;">
<option value="pulsar_stampede_normal">TACC Stampede (default)</option>
<option value="pulsar_stampede_development">TACC Stampede test/development</option>
Expand Down
11 changes: 10 additions & 1 deletion templates/galaxy/usegalaxy.org/config/job_conf.xml.j2
Expand Up @@ -196,6 +196,11 @@
<param id="function">dynamic_local_stampede_select_dynamic_walltime</param>
</destination>

<destination id="dynamic_multi_bridges_select" runner="dynamic">
<param id="type">python</param>
<param id="function">dynamic_multi_bridges_select</param>
</destination>

<!-- dynamic method for the stampede-only selector -->
<destination id="dynamic_stampede_select" runner="dynamic">
<param id="type">python</param>
Expand Down Expand Up @@ -516,6 +521,7 @@
<!-- use constraint to avoid running on xl nodes, which do not mount /pylon5 -->
<param id="submit_native_specification">-p LM --constraint=LM</param>
<param id="submit_user_email">$__user_email__</param>
<param id="file_action_config">{{ galaxy_config_dir }}/pulsar_bridges_actions.yml</param>
<env exec="eval `modulecmd sh purge`" />
<env file="/pylon2/mc48nsp/xcgalaxy/main/galaxy/venv/bin/activate" />
</destination>
Expand All @@ -533,6 +539,7 @@
<!-- 147456 MB == 144 GB (3 cores) (128GB is the minimum for LM) -->
<param id="submit_native_specification">-p LM -t 00:30:00 --mem=147456</param>
<param id="submit_user_email">$__user_email__</param>
<param id="file_action_config">{{ galaxy_config_dir }}/pulsar_bridges_actions.yml</param>
<env exec="eval `modulecmd sh purge`" />
<env file="/pylon2/mc48nsp/xcgalaxy/main/galaxy/venv/bin/activate" />
</destination>
Expand Down Expand Up @@ -583,7 +590,8 @@
<tool id="deeptools_plot_coverage" destination="dynamic_local_stampede_select_dynamic_walltime" handler="multi_handlers" resources="local_or_stampede"/>
<tool id="deeptools_plot_fingerprint" destination="dynamic_local_stampede_select_dynamic_walltime" handler="multi_handlers" resources="local_or_stampede"/>
<tool id="kraken" destination="dynamic_local_stampede_select_dynamic_walltime" handler="multi_handlers" resources="local_or_stampede"/>
<tool id="rna_star" destination="dynamic_local_stampede_select_dynamic_walltime" handler="multi_handlers" resources="local_or_stampede"/>
<!-- roundup multi/jetstream/bridges jobs -->
<tool id="rna_star" destination="dynamic_multi_bridges_select" handler="multi_handlers" resources="multi_or_bridges"/>
<!-- explicit stampede jobs -->
<tool id="lastz_wrapper_2" destination="dynamic_stampede_select" handler="multi_handlers" resources="stampede"/>
<tool id="megablast_wrapper" destination="dynamic_stampede_select" handler="multi_handlers" resources="stampede"/>
Expand Down Expand Up @@ -687,6 +695,7 @@
<resources default="default">
<group id="default"></group>
<group id="local_or_stampede">tacc_compute_resource</group>
<group id="multi_or_bridges">multi_bridges_compute_resource</group>
<group id="stampede">stampede_compute_resource</group>
<group id="bridges">bridges_compute_resource</group>
</resources>
Expand Down

0 comments on commit c3f3155

Please sign in to comment.