From c3f315583d3449bd6269f0d6edbdd64300ca0efb Mon Sep 17 00:00:00 2001 From: Nate Coraor Date: Fri, 23 Jun 2017 15:30:54 -0400 Subject: [PATCH] Quick and dirty dynamic rule changes for RNA STAR --- .../dynamic_rules/multi_dynamic_walltime.py | 139 +++++++++++++----- .../config/job_resource_params_conf.xml | 7 + .../usegalaxy.org/config/job_conf.xml.j2 | 11 +- 3 files changed, 122 insertions(+), 35 deletions(-) diff --git a/files/galaxy/common/dynamic_rules/multi_dynamic_walltime.py b/files/galaxy/common/dynamic_rules/multi_dynamic_walltime.py index e36b93cc1..5ff52dd6a 100644 --- a/files/galaxy/common/dynamic_rules/multi_dynamic_walltime.py +++ b/files/galaxy/common/dynamic_rules/multi_dynamic_walltime.py @@ -2,6 +2,7 @@ ## This file is maintained by Ansible - CHANGES WILL BE OVERWRITTEN ## +import os import datetime import logging import re @@ -24,7 +25,7 @@ # bowtie_color_wrapper same as bwa_wrapper # bwa_color_wrapper same as bowtie_wrapper -JETSTREAM_TOOLS = ( 'bowtie2', 'bwa', 'bwa_mem', 'tophat2', 'cufflinks' ) +JETSTREAM_TOOLS = ( 'bowtie2', 'bwa', 'bwa_mem', 'tophat2', 'cufflinks', 'rna_star' ) PUNT_TOOLS = ( 'bwa_wrapper', 'bowtie2', 'bowtie_wrapper', 'tophat', 'tophat2', 'bwa', 'bwa_mem' ) GENOME_SOURCE_PARAMS = ( 'genomeSource.refGenomeSource', 'reference_genome.source', 'refGenomeSource.genomeSource', 'reference_source.reference_source_selector' ) GENOME_SOURCE_VALUES = ( 'indexed', 'cached' ) @@ -35,9 +36,16 @@ STAMPEDE_DESTINATION = 'pulsar_stampede_normal' STAMPEDE_DEVELOPMENT_DESTINATION = 'pulsar_stampede_development' STAMPEDE_DESTINATIONS = (STAMPEDE_DESTINATION, STAMPEDE_DEVELOPMENT_DESTINATION) +BRIDGES_DESTINATION = 'pulsar_bridges_normal' +BRIDGES_DEVELOPMENT_DESTINATION = 'pulsar_bridges_development' +BRIDGES_DESTINATIONS = (BRIDGES_DESTINATION, BRIDGES_DEVELOPMENT_DESTINATION) JETSTREAM_DESTINATIONS = ('jetstream_multi',) VALID_DESTINATIONS = (LOCAL_DESTINATION, LOCAL_DEVELOPMENT_DESTINATION) + STAMPEDE_DESTINATIONS + JETSTREAM_DESTINATIONS # WHY ARE WE SHOUTING -RESOURCES = {'tacc_compute_resource':VALID_DESTINATIONS, 'stampede_compute_resource':STAMPEDE_DESTINATIONS} +RESOURCES = { + 'tacc_compute_resource': VALID_DESTINATIONS, + 'multi_bridges_compute_resource': (LOCAL_DESTINATION, LOCAL_DEVELOPMENT_DESTINATION) + JETSTREAM_DESTINATIONS + BRIDGES_DESTINATIONS, + 'stampede_compute_resource': STAMPEDE_DESTINATIONS, +} FAILURE_MESSAGE = 'This tool could not be run because of a misconfiguration in the Galaxy job running system, please report this error' RESERVED_USERS = ( @@ -130,6 +138,35 @@ } +def _rnastar(app, param_dict, destination_id, explicit_destination): + source = param_dict['refGenomeSource']['geneSource'] + if source == 'indexed': + build = param_dict['refGenomeSource']['GTFconditional']['genomeDir'] + path = '%s/SA' % app.tool_data_tables.get('rnastar_index2').get_entry('value', build, 'path') + destination_id = None if not explicit_destination else destination_id + need_mb = (os.stat(path).st_size / 1024 / 1024) * 1.3 + 2048 + else: + # Avoid the expense of staging large genome files + path = param_dict['refGenomeSource']['genomeFastaFiles'].get_file_name() + destination_id = LOCAL_DESTINATION if not explicit_destination else destination_id + need_mb = (os.stat(path).st_size / 1024 / 1024) * 10.0 + 2048 + if need_mb < 8192: + # In testing, very small jobs needed more than the formula above, so guarantee everyone gets at least 8 GB + need_mb = 8192 + elif need_mb > 40960 and not explicit_destination: + # 147456 MB == 144 GB (3 cores) (128GB is the 
minimum for LM) + destination_id = BRIDGES_DESTINATION + need_mb = 147456 + if explicit_destination: + if destination_id in (LOCAL_DESTINATION, LOCAL_DEVELOPMENT_DESTINATION, RESERVED_DESTINATION) and need_mb > 40960: + need_mb = 40960 + elif destination_id in JETSTREAM_DESTINATIONS: + pass # won't be set anyway + elif destination_id in BRIDGES_DESTINATIONS: + need_mb = 147456 + return (destination_id, int(need_mb)) + + def _set_walltime(tool_id, native_spec): walltime = WALLTIMES.get(tool_id, None) if walltime: @@ -140,10 +177,11 @@ def _set_walltime(tool_id, native_spec): return native_spec -def __rule(app, tool, job, user_email, resource): +def __rule(app, tool, job, user_email, resource_params, resource): destination = None destination_id = None default_destination_id = RESOURCES[resource][0] + explicit_destination = False tool_id = tool.id if '/' in tool.id: @@ -157,42 +195,39 @@ def __rule(app, tool, job, user_email, resource): param_dict = tool.params_from_strings( param_dict, app ) # Explcitly set the destination if the user has chosen to do so with the resource selector - if '__job_resource' in param_dict: - if param_dict['__job_resource']['__job_resource__select'] == 'yes': - resource_key = None - for resource_key in param_dict['__job_resource'].keys(): - if resource_key == resource: - destination_id = param_dict['__job_resource'][resource_key] - if destination_id in RESOURCES[resource_key]: - break - elif destination_id == TEAM_DESTINATION: - break - else: - log.warning('(%s) Destination/walltime dynamic plugin got an invalid destination: %s', job.id, destination_id) - raise JobMappingException( FAILURE_MESSAGE ) - else: - log.warning('(%s) Destination/walltime dynamic plugin got an invalid value for selector: %s', job.id, param_dict['__job_resource']['__job_resource__select']) - raise JobMappingException( FAILURE_MESSAGE ) - elif param_dict['__job_resource']['__job_resource__select'] == 'no': - # job will be sent to the default - if user_email.lower() in NORM_RESERVED_USERS: - log.info("(%s) Destination/walltime dynamic plugin returning default reserved destination for '%s'", job.id, user_email) - return RESERVED_DESTINATION + if resource_params: + resource_key = None + for resource_key in resource_params.keys(): + if resource_key == resource: + destination_id = resource_params[resource_key] + if destination_id in RESOURCES[resource_key]: + explicit_destination = True + break + elif destination_id == TEAM_DESTINATION: + explicit_destination = True + break + else: + log.warning('(%s) Destination/walltime dynamic plugin got an invalid destination: %s', job.id, destination_id) + raise JobMappingException(FAILURE_MESSAGE) else: - log.warning('(%s) Destination/walltime dynamic plugin did not find a valid resource key, keys were: %s', job.id, param_dict['__job_resource'].keys()) - raise JobMappingException( FAILURE_MESSAGE ) + log.warning('(%s) Destination/walltime dynamic plugin did not receive a valid resource key, resource params were: %s', job.id, resource_params) + raise JobMappingException(FAILURE_MESSAGE) else: - log.warning('(%s) Destination/walltime dynamic plugin did not receive the __job_resource param, keys were: %s', job.id, param_dict.keys()) - raise JobMappingException( FAILURE_MESSAGE ) + # if __job_resource is not passed or __job_resource_select is not set to a "yes-like" value, resource_params is an empty dict + if user_email.lower() in NORM_RESERVED_USERS: + log.info("(%s) Destination/walltime dynamic plugin returning default reserved destination for '%s'", job.id, 
user_email) + #return RESERVED_DESTINATION + destination_id = RESERVED_DESTINATION if destination_id == TEAM_DESTINATION: if user_email in TEAM_USERS: destination_id = TEAM_DESTINATION - destination = app.job_config.get_destination( TEAM_DESTINATION ) - destination.params['nativeSpecification'] += ' --ntasks=%s' % param_dict['__job_resource']['team_cpus'] + destination = app.job_config.get_destination(TEAM_DESTINATION) + destination.params['nativeSpecification'] += ' --ntasks=%s' % resource_params['team_cpus'] else: log.warning("(%s) Unauthorized user '%s' selected team development destination", job.id, user_email) destination_id = LOCAL_DESTINATION + explicit_destination = False # Only allow stampede if a cached reference is selected if destination_id in STAMPEDE_DESTINATIONS and tool_id in PUNT_TOOLS: @@ -215,10 +250,26 @@ def __rule(app, tool, job, user_email, resource): destination_id = default_destination_id # Some tools do not react well to Jetstream - if destination_id is None and tool_id not in JETSTREAM_TOOLS: + if not explicit_destination and tool_id not in JETSTREAM_TOOLS: log.info('(%s) Default destination requested and tool is not in Jetstream-approved list, job will be sent to local cluster', job.id) destination_id = default_destination_id + # FIXME: this is getting really messy + mem_mb = None + + if resource == 'multi_bridges_compute_resource' and tool_id == 'rna_star': + # FIXME: special casing + try: + _destination_id, mem_mb = _rnastar(app, param_dict, destination_id, explicit_destination) + if not explicit_destination: + destination_id = _destination_id + if destination_id == BRIDGES_DESTINATION: + destination = app.job_config.get_destination(destination_id) + destination.params['submit_native_specification'] += ' --time=48:00:00' + except: + log.exception('(%s) Error determining parameters for STAR job', job.id) + raise JobMappingException(FAILURE_MESSAGE) + # Need to explicitly pick a destination because of staging. 
Otherwise we # could just submit with --clusters=a,b,c and let slurm sort it out if destination_id in JETSTREAM_DESTINATIONS + (None,) and default_destination_id == LOCAL_DESTINATION: @@ -226,6 +277,8 @@ def __rule(app, tool, job, user_email, resource): clusters = ','.join(JETSTREAM_DESTINATION_MAPS[test_destination_id]['clusters']) native_specification = app.job_config.get_destination(test_destination_id).params.get('nativeSpecification', '') native_specification = _set_walltime(tool_id, native_specification) + if mem_mb: + native_specification += ' --mem=%s' % mem_mb sbatch_test_cmd = ['sbatch', '--test-only', '--clusters=%s' % clusters] + native_specification.split() + [TEST_SCRIPT] log.debug('Testing job submission to determine suitable cluster: %s', ' '.join(sbatch_test_cmd)) @@ -251,11 +304,26 @@ def __rule(app, tool, job, user_email, resource): destination_id = '%s_%s' % (destination_prefix, JETSTREAM_DESTINATION_MAPS[default_destination_id]['partition']) destination = app.job_config.get_destination(destination_id) - destination.params['nativeSpecification'] = _set_walltime(tool_id, destination.params.get('nativeSpecification', '')) + # FIXME: aaaaah i just need this to work for now + if destination_id.startswith('jetstream'): + destination.params['submit_native_specification'] = _set_walltime(tool_id, destination.params.get('submit_native_specification', '')) + else: + destination.params['nativeSpecification'] = _set_walltime(tool_id, destination.params.get('nativeSpecification', '')) if destination_id is None: destination_id = default_destination_id + if mem_mb: + if destination is None: + destination = app.job_config.get_destination(destination_id) + # FIXME: and here wow such mess + if destination_id in (LOCAL_DESTINATION, LOCAL_DEVELOPMENT_DESTINATION, RESERVED_DESTINATION): + destination.params['nativeSpecification'] += ' --mem=%s' % mem_mb + elif destination_id.startswith('jetstream'): + pass # don't set --mem, you get the whole node anyway + elif destination_id in BRIDGES_DESTINATIONS: + destination.params['submit_native_specification'] += ' --mem=%s' % mem_mb + # Set a walltime if local is the destination and this is a dynamic walltime tool if destination_id == LOCAL_DESTINATION and tool_id in RUNTIMES: destination_id = LOCAL_WALLTIME_DESTINATION @@ -274,8 +342,11 @@ def __rule(app, tool, job, user_email, resource): log.debug(" nativeSpecification is: %s", destination.params['nativeSpecification']) return destination or destination_id -def dynamic_local_stampede_select_dynamic_walltime( app, tool, job, user_email ): - return __rule( app, tool, job, user_email, 'tacc_compute_resource' ) +def dynamic_local_stampede_select_dynamic_walltime(app, tool, job, user_email, resource_params): + return __rule(app, tool, job, user_email, resource_params, 'tacc_compute_resource') + +def dynamic_multi_bridges_select(app, tool, job, user_email, resource_params): + return __rule(app, tool, job, user_email, resource_params, 'multi_bridges_compute_resource') def dynamic_stampede_select( app, tool, job, user_email ): return __rule( app, tool, job, user_email, 'stampede_compute_resource' ) diff --git a/files/galaxy/usegalaxy.org/config/job_resource_params_conf.xml b/files/galaxy/usegalaxy.org/config/job_resource_params_conf.xml index 21eb97431..9ca692dc9 100644 --- a/files/galaxy/usegalaxy.org/config/job_resource_params_conf.xml +++ b/files/galaxy/usegalaxy.org/config/job_resource_params_conf.xml @@ -9,6 +9,13 @@ + + + + + + + diff --git a/templates/galaxy/usegalaxy.org/config/job_conf.xml.j2 
b/templates/galaxy/usegalaxy.org/config/job_conf.xml.j2 index 571dd8575..9389901ec 100644 --- a/templates/galaxy/usegalaxy.org/config/job_conf.xml.j2 +++ b/templates/galaxy/usegalaxy.org/config/job_conf.xml.j2 @@ -196,6 +196,11 @@ dynamic_local_stampede_select_dynamic_walltime + + python + dynamic_multi_bridges_select + + python @@ -516,6 +521,7 @@ -p LM --constraint=LM $__user_email__ + {{ galaxy_config_dir }}/pulsar_bridges_actions.yml @@ -533,6 +539,7 @@ -p LM -t 00:30:00 --mem=147456 $__user_email__ + {{ galaxy_config_dir }}/pulsar_bridges_actions.yml @@ -583,7 +590,8 @@ - + + @@ -687,6 +695,7 @@ tacc_compute_resource + multi_bridges_compute_resource stampede_compute_resource bridges_compute_resource
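For reviewers, the sizing heuristic in _rnastar() is easier to sanity-check in isolation. Below is a minimal sketch of the same arithmetic for the cached-index case, using a hypothetical 32 GB SA file; the helper name and the example size are illustrative only, while the 1.3x factor and the 2048/8192/40960/147456 MB figures come from the rule above.

    # Sketch only -- mirrors the cached-index branch of _rnastar(), not the actual rule code.
    def estimate_star_mem_mb(sa_file_size_mb):
        # 1.3x the on-disk size of the SA index file plus 2 GB of overhead
        need_mb = sa_file_size_mb * 1.3 + 2048
        if need_mb < 8192:
            # floor: every STAR job gets at least 8 GB
            need_mb = 8192
        return int(need_mb)

    # Hypothetical 32 GB SA file: 32768 * 1.3 + 2048 = 44646 MB, which is over the
    # 40960 MB local-cluster cap, so (absent an explicit destination) the rule routes
    # the job to pulsar_bridges_normal and pins the request at 147456 MB (144 GB).
    print(estimate_star_mem_mb(32768))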
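The two XML hunks above are hard to follow without their markup, so here is a rough sketch of the kind of entries the patch adds, following the usual job_resource_params_conf.xml / job_conf.xml patterns. The ids, labels, attributes, and option values below are assumptions for illustration, not a reconstruction of the actual file contents; only the multi_bridges_compute_resource name and the dynamic_multi_bridges_select function come from the patch itself.

    <!-- job_resource_params_conf.xml: selector offered to users for the new
         multi/Bridges resource (attributes and options are assumed) -->
    <param label="Compute Resource" name="multi_bridges_compute_resource" type="select">
        <option value="multi">Default</option>
        <option value="pulsar_bridges_normal">PSC Bridges</option>
    </param>

    <!-- job_conf.xml.j2: dynamic destination that invokes the new rule
         (the destination id is assumed; the function name is from the patch) -->
    <destination id="dynamic_multi_bridges_select" runner="dynamic">
        <param id="type">python</param>
        <param id="function">dynamic_multi_bridges_select</param>
    </destination>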