
Commit

Sergio: updated gsubbeast with post-processing feature. Extract specific columns of data from the execution's log file and combine them.
Sergio Maffioletti committed Sep 21, 2017
1 parent 07e3fa8 commit fa7b5a6
Showing 1 changed file with 33 additions and 48 deletions.
81 changes: 33 additions & 48 deletions gc3apps/usz/gsubbeast.py
@@ -1,7 +1,7 @@
#! /usr/bin/env python
#
# gsubbeast.py -- Front-end script for evaluating R-based 'weight'
# function over a large dataset.
# gsubbeast.py -- Front-end script for running BEAST jobs.
# https://www.beast2.org/
#
# Copyright (C) 2011, 2012 GC3, University of Zurich
#
@@ -20,38 +20,23 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
Front-end script for submitting multiple `R` jobs.
Front-end script for submitting multiple BEAST jobs.
It uses the generic `gc3libs.cmdline.SessionBasedScript` framework.
See the output of ``gsubbeast.py --help`` for program usage
instructions.
Input parameters consist of:
:param str edges file: Path to an .csv file containing input data in
the for of:
X1 X2
1 id1 id2
2 id1 id3
3 id1 id4
...
XXX: To be clarified:
. When input files should be removed ?
. What happen if an error happen at merging time ?
. Should be possible to re-run a subset of the initial chunk list
without re-creating a new session ?
e.g. adding a new argument accepting chunk ranges (-R 3000:7500)
This would trigger the re-run of the whole workflow only
for lines between 3000 and 7500
:param str BEAST XML file: Path to an .xml file containing BEAST settings
and input data
"""

# summary of user-visible changes
__changelog__ = """
2013-07-03:
2017-08-28:
* Initial version
"""
__author__ = 'Sergio Maffioletti <sergio.maffioletti@gc3.uzh.ch>'
__author__ = 'Sergio Maffioletti <sergio.maffioletti@uzh.ch>'
__docformat__ = 'reStructuredText'
__version__ = '1.0'

@@ -84,6 +69,8 @@

DEFAULT_REMOTE_OUTPUT_FOLDER = "./results"
DEFAULT_SEED_RANGE=37444887175646646
DEFAULT_JAR_LOCATION = "/app/beast/lib/beast.jar"
BEAST_COMMAND = "java -jar {jar} -working -overwrite -beagle -seed {seed} {resume} {input_xml}"

# Utility functions
def _get_id_from_inputfile(input_file):
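
The new DEFAULT_JAR_LOCATION and BEAST_COMMAND constants above replace the wrapper-script indirection: the job's command line is now built directly from the BEAST_COMMAND template. A minimal expansion sketch (all values below are hypothetical, for illustration only):

# Sketch of how BEAST_COMMAND expands; every value here is hypothetical.
arguments = BEAST_COMMAND.format(
    jar="/app/beast/lib/beast.jar",  # DEFAULT_JAR_LOCATION when no jar is passed
    seed=4001,                       # seed value, presumably drawn within DEFAULT_SEED_RANGE
    resume="",                       # empty unless a state file requests a resume
    input_xml="run1.xml",            # the staged BEAST XML input
)
# -> java -jar /app/beast/lib/beast.jar -working -overwrite -beagle -seed 4001  run1.xml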
@@ -153,11 +140,6 @@ def __init__(self, input_file, state_file, id_name, seed, jar=None, **extra_args
outputs = dict()

inputs[input_file] = os.path.basename(input_file)
gsubbeast_wrapper = resource_filename(Requirement.parse("gc3pie"),
"gc3libs/etc/gsubbeast_wrapper.sh")

inputs[gsubbeast_wrapper] = "./wrapper.sh"
executables.append(inputs[gsubbeast_wrapper])

if state_file:
inputs[state_file] = os.path.basename(state_file)
@@ -167,20 +149,20 @@ def __init__(self, input_file, state_file, id_name, seed, jar=None, **extra_args

if jar:
inputs[jar] = os.path.basename(jar)
jar_option = "-j "+inputs[jar]
jar_cmd = inputs[jar]
else:
jar_option = ""
jar_cmd = DEFAULT_JAR_LOCATION

arguments ="./wrapper.sh {jar} -s {seed} {resume} {input_xml}".format(seed=seed,
jar=jar_option,
resume=resume_option,
input_xml=inputs[input_file])
arguments = BEAST_COMMAND.format(jar=jar_cmd,
seed=seed,
resume=resume_option,
input_xml=inputs[input_file])

gc3libs.log.debug("Creating application for executing: %s", arguments)

self.id_name = id_name
self.seed = seed
self.jar_option = jar_option
self.jar_cmd = jar_cmd

Application.__init__(
self,
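
For orientation, a hedged sketch of constructing one such application directly (the session script normally does this when it creates new tasks; all argument values below are hypothetical):

# Hypothetical construction, for illustration only.
app = GsubbeastApplication(
    input_file="./data/run1.xml",  # BEAST XML with settings and input data
    state_file=None,               # pass a BEAST .state file to resume a run
    id_name="run1",                # identifier derived from the input file name
    seed=4001,                     # hypothetical seed
    jar=None,                      # None -> DEFAULT_JAR_LOCATION is used
)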
@@ -307,24 +289,26 @@ def setup_options(self):
help="Periodically fetch job's output folder and copy locally.")

self.add_param('-M', '--merge-anyway',
dest='must_complete_all_jobs',
dest='merge_anyway',
action='store_true',
default=False,
help='Merge results only when all jobs have completed " \
" successfully. Default: %(default)s.')
help="Merge results only when all jobs have completed " \
" successfully. Default: %(default)s.")

self.add_param('-O', '--store-aggregate-csv',
dest='result_csv',
default='',
help='Location of aggregated .csv results. Default: %(default)s.')
default='.',
help="Location of aggregated .csv results. Default: '%(default)s'.")

self.add_param('-P', '--extract-columns',
dest='columns',
default='TreeHeight.t',
default=['TreeHeight.t'],
help="Comma-separated list of column names to extract from the " \
"output log file. Default: %(default)s.")


def parse_args(self):
    # The default is already a list; only split when the user passed
    # a comma-separated string on the command line.
    if not isinstance(self.params.columns, list):
        self.params.columns = self.params.columns.split(',')
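
With this guard, -P leaves the list default untouched and splits a user-supplied comma-separated string. An illustrative round trip (column names are examples only):

# Illustrative only: what -P does to its argument.
# Command line:  gsubbeast.py ... -P TreeHeight.t,posterior
columns = "TreeHeight.t,posterior".split(',')
# -> ['TreeHeight.t', 'posterior']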


def before_main_loop(self):
# XXX: should this be done with `make_controller` instead?
@@ -371,7 +355,7 @@ def after_main_loop(self):
Merge all result files together,
then clean up tmp files.
"""
if self.params.must_complete_all_jobs:
if not self.params.merge_anyway:
for task in self.session:
if task.execution.state != Run.State.TERMINATED:
gc3libs.log.warning('Could not perform aggregation task as not all jobs have terminated.')
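
A sketch of this guard's full control flow (the hunk cuts off the loop body, so the early return below is an assumption about the intended behaviour):

# Sketch only; the 'return' is assumed, not shown in the hunk.
if not self.params.merge_anyway:
    for task in self.session:
        if task.execution.state != Run.State.TERMINATED:
            gc3libs.log.warning('Could not perform aggregation task as not all jobs have terminated.')
            return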
@@ -385,21 +369,22 @@ def after_main_loop(self):
df_dict[df_name] = pandas.DataFrame()

for task in self.session:
if isinstance(task, GsubbeastApplication) and task.execution.returncode == 0:
if isinstance(task, GsubbeastRetryableTask) and task.execution.returncode == 0:
result_log_file = os.path.join(task.output_dir, '{0}.log'.format(task.id_name))
if os.path.isfile(result_log_file):
gc3libs.log.debug('Reading output file {0}'.format(result_log_file))
data = pandas.read_csv(result_file, sep='\t')
data = pandas.read_csv(result_log_file, sep='\t', comment="#")
for key in df_dict.keys():
df = df_dict[key]
gc3libs.log.debug("Column '{0}' counts {1} elements".format(key, len(data[key])))
df[task.id_name] = data[key]
column_to_search = "{0}:{1}".format(key, task.id_name)
gc3libs.log.debug("Column '{0}' counts {1} elements".format(key,
len(data[column_to_search])))
df_dict[key][task.id_name] = data[column_to_search]

else:
gc3libs.log.error('Output file {0} for task {1} not found'.format(result_log_file, task.id_name))
continue
else:
gc3libs.log.warning('Task {0} not completed. Ignoring'.format(task.id_name))
gc3libs.log.warning('Task {0} did not complete or failed during execution. Ignoring.'.format(task.id_name))

gc3libs.log.info('Writing aggregated .csv results to {0}'.format(self.params.result_csv))
for key in df_dict.keys():
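
The aggregation above reads each task's tab-separated BEAST trace log and then writes one aggregated .csv per extracted column. A self-contained sketch of the same logic, with hypothetical file names and task ids:

# Self-contained sketch of the aggregation step; file names and
# task ids are hypothetical.
import pandas

columns = ['TreeHeight.t']               # from the -P option
tasks = [('run1', 'run1.log'),           # (id_name, per-task output log)
         ('run2', 'run2.log')]

df_dict = dict((key, pandas.DataFrame()) for key in columns)

for id_name, log_file in tasks:
    # BEAST trace logs are tab-separated, with '#'-prefixed comment lines.
    data = pandas.read_csv(log_file, sep='\t', comment='#')
    for key in columns:
        # Per the diff, the log column is named '<key>:<id_name>',
        # e.g. 'TreeHeight.t:run1'.
        df_dict[key][id_name] = data['{0}:{1}'.format(key, id_name)]

# One aggregated .csv per extracted column (written under the -O path
# in the real script).
for key, df in df_dict.items():
    df.to_csv('{0}.csv'.format(key))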
