Sergio: create new src archive every new session.
Sergio Maffioletti committed Aug 28, 2017
1 parent 0501a03 commit d1bc57d
Showing 2 changed files with 111 additions and 20 deletions.
20 changes: 8 additions & 12 deletions gc3apps/bf.uzh.ch/gcrashdetect.py
@@ -80,18 +80,14 @@ def _scan_and_tar(tarfile_folder, input_folder):
         cwd = os.getcwd()
         os.chdir(input_folder)
 
-        if os.path.isfile(os.path.join(tarfile_folder,GCRASHDETECT_INPUT_ARCHIVE)):
-            gc3libs.log.warning("Using already present input archive '%s'." % os.path.join(tarfile_folder,
-                                                                                           GCRASHDETECT_INPUT_ARCHIVE))
-        else:
-            tar = tarfile.open(os.path.join(tarfile_folder,
-                                            GCRASHDETECT_INPUT_ARCHIVE),
-                               "w:gz",
-                               dereference=True)
-
-            for f in [ elem for elem in os.listdir('.') if os.path.splitext(elem)[-1] in GCRASHDETECT_VALID_INPUT_FILE_EXTENSIONS or os.path.isdir(elem)]:
-                tar.add(f)
-            tar.close()
+        tar = tarfile.open(os.path.join(tarfile_folder,
+                                        GCRASHDETECT_INPUT_ARCHIVE),
+                           "w:gz",
+                           dereference=True)
+
+        for f in [ elem for elem in os.listdir('.') if os.path.splitext(elem)[-1] in GCRASHDETECT_VALID_INPUT_FILE_EXTENSIONS or os.path.isdir(elem)]:
+            tar.add(f)
+        tar.close()
         os.chdir(cwd)
         return os.path.join(tarfile_folder,GCRASHDETECT_INPUT_ARCHIVE)
     except Exception, x:
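As the commit message says, the input archive is now rebuilt on every session instead of reusing one left over from a previous run. A minimal standalone sketch of the same pattern; the folder arguments, archive name, and the '.inp' extension are illustrative, not the gcrashdetect constants:

    import os
    import tarfile

    def make_input_archive(tarfile_folder, input_folder, archive_name='input.tgz'):
        # Always (re)create the archive so a new session never picks up stale inputs.
        archive_path = os.path.join(tarfile_folder, archive_name)
        cwd = os.getcwd()
        os.chdir(input_folder)
        try:
            tar = tarfile.open(archive_path, "w:gz", dereference=True)
            for entry in os.listdir('.'):
                # archive directories and files with an accepted extension
                if os.path.isdir(entry) or os.path.splitext(entry)[-1] == '.inp':
                    tar.add(entry)
            tar.close()
        finally:
            os.chdir(cwd)
        return archive_path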
111 changes: 103 additions & 8 deletions gc3apps/usz/gsubbeast.py
@@ -68,6 +68,10 @@
 import mimetypes
 import random
 
+import pandas
+from xml.etree import cElementTree as ET
+from xml.etree.ElementTree import Element, SubElement, Comment, tostring
+
 from pkg_resources import Requirement, resource_filename
 
 import gc3libs
@@ -82,6 +86,22 @@
 DEFAULT_SEED_RANGE=37444887175646646
 
 # Utility functions
+def _get_id_from_inputfile(input_file):
+    """
+    By convention, input XML files carry an 'id' attribute on their 'data'
+    element; return it, falling back to the file name if it is missing.
+    """
+    id_name = filename = os.path.splitext(os.path.basename(input_file))[0]
+    tree = ET.parse(input_file)
+    root = tree.getroot()
+    data = root.find('data')
+    if data is not None and data.get('id'):
+        id_name = data.get('id')
+    else:
+        gc3libs.log.warning("Failed to extract 'id' from input file {0}. Using filename instead".format(filename))
+    gc3libs.log.info("Using id '{0}' for filename {1}".format(id_name, filename))
+    return id_name
+
 def _get_valid_input(input_folder, resume):
     """
     [ input_xml for input_xml in os.listdir(self.params.input_folder) if mimetypes.guess_type(input_xml)[0] == 'application/xml' ]:
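For illustration, here is the 'data' id convention the helper above relies on; the XML content and id value are made up:

    # sketch_get_id.py -- the XML snippet is hypothetical, mirroring the
    # convention _get_id_from_inputfile() expects.
    from xml.etree import cElementTree as ET

    xml_text = '<beast><data id="flu_H3N2" name="alignment">...</data></beast>'
    root = ET.fromstring(xml_text)
    data = root.find('data')
    print(data.get('id'))   # -> 'flu_H3N2'; the helper would return this id_name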
@@ -126,7 +146,7 @@ class GsubbeastApplication(Application):
     """
     application_name = 'gsubbeast'
 
-    def __init__(self, input_file, state_file, seed, jar=None, **extra_args):
+    def __init__(self, input_file, state_file, id_name, seed, jar=None, **extra_args):
 
         executables = []
         inputs = dict()
@@ -158,6 +178,7 @@ def __init__(self, input_file, state_file, seed, jar=None, **extra_args):
 
         gc3libs.log.debug("Creating application for executing: %s", arguments)
 
+        self.id_name = id_name
         self.seed = seed
         self.jar_option = jar_option
 
@@ -172,15 +193,27 @@ def __init__(self, input_file, state_file, seed, jar=None, **extra_args):
             **extra_args)
 
 
+    def terminated(self):
+        """
+        Check whether the expected output log file has been generated.
+        """
+        result_log_file = os.path.join(self.output_dir, '{0}.log'.format(self.id_name))
+        gc3libs.log.info('Application terminated with exit code %s' % self.execution.exitcode)
+        if not os.path.isfile(result_log_file):
+            gc3libs.log.error('Failed while checking output file %s.' % result_log_file)
+            self.execution.returncode = (0, 99)
+
+
 class GsubbeastRetryableTask(RetryableTask):
-    def __init__(self, input_file, state_file, seed, jar=None, **extra_args):
+    def __init__(self, input_file, state_file, id_name, seed=None, jar=None, **extra_args):
         RetryableTask.__init__(
             self,
             # actual computational job
             GsubbeastApplication(
                 input_file,
                 state_file,
-                seed,
+                id_name,
+                seed=seed,
                 jar=jar,
                 **extra_args),
             **extra_args
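In gc3pie, terminated() runs after output retrieval, and setting self.execution.returncode = (0, 99) marks the task as failed when the expected log is missing, so the retry wrapper can act on it. A sketch of how the wrapper is now built per input, assuming the definitions above; paths, id and jobname values are made up:

    import random

    task = GsubbeastRetryableTask(
        '/data/run1/flu_H3N2.xml',      # input_file
        '/data/run1/flu_H3N2.state',    # state_file
        'flu_H3N2',                     # id_name, as extracted from the XML
        seed=random.randrange(DEFAULT_SEED_RANGE),
        jar='/apps/beast/beast.jar',
        jobname='flu_H3N2-0')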
@@ -273,7 +306,26 @@ def setup_options(self):
                       default=False,
                       help="Periodically fetch job's output folder and copy locally.")
 
+        self.add_param('-M', '--merge-anyway',
+                       dest='must_complete_all_jobs',
+                       action='store_true',
+                       default=False,
+                       help='Merge results only when all jobs have completed '
+                            'successfully. Default: %(default)s.')
+
+        self.add_param('-O', '--store-aggregate-csv',
+                       dest='result_csv',
+                       default='',
+                       help='Location of aggregated .csv results. Default: %(default)s.')
+
+        self.add_param('-P', '--extract-columns',
+                       dest='columns',
+                       default='TreeHeight.t',
+                       help='Comma-separated list of column names to extract '
+                            'from the output log file. Default: %(default)s.')
+
+
+
     def before_main_loop(self):
        # XXX: should this be done with `make_controller` instead?
        self._controller.retrieve_running = self.params.follow
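With the new options, an aggregating run might look like the following; the folder names and column list are illustrative, and -o with the NAME placeholder is the stock SessionBasedScript output option:

    python gsubbeast.py input_xml_folder/ \
        -o output/NAME \
        -M \
        -O results/ \
        -P TreeHeight.t,posterior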
@@ -289,13 +341,12 @@ def new_tasks(self, extra):
 
         for (input_file, stat_file) in _get_valid_input(self.params.input_folder,
                                                         self.params.resume):
-            for rep in range(1,self.params.repeat):
+            id_name = _get_id_from_inputfile(input_file)
+            for rep in range(0,self.params.repeat):
 
-                jobname = "{xml}-{rep}".format(xml=os.path.splitext(os.path.basename(input_file))[0],
-                                               rep=rep)
-
-                extra_args = extra.copy()
+                jobname = '{0}-{1}'.format(id_name, rep)
 
+                extra_args = extra.copy()
                 extra_args['jobname'] = jobname
                 extra_args['output_dir'] = self.params.output
                 extra_args['output_dir'] = extra_args['output_dir'].replace('NAME', jobname)
@@ -308,8 +359,52 @@
                 tasks.append(GsubbeastRetryableTask(
                     input_file,
                     stat_file,
+                    id_name,
                     seed=random.randrange(DEFAULT_SEED_RANGE),
                     jar=self.params.jar,
                     **extra_args))
 
         return tasks
+
+    def after_main_loop(self):
+        """
+        Merge all result files together,
+        then clean up temporary files.
+        """
+        if self.params.must_complete_all_jobs:
+            for task in self.session:
+                if task.execution.state != Run.State.TERMINATED:
+                    gc3libs.log.warning('Could not perform aggregation task as not all jobs have terminated.')
+                    return
+                if task.execution.returncode is not None and task.execution.returncode != 0:
+                    gc3libs.log.warning('Could not perform aggregation task as not all jobs have completed successfully.')
+                    return
+
+        df_dict = dict()
+        for df_name in self.params.columns.split(','):
+            df_dict[df_name] = pandas.DataFrame()
+
+        for task in self.session:
+            if isinstance(task, GsubbeastApplication) and task.execution.returncode == 0:
+                result_log_file = os.path.join(task.output_dir, '{0}.log'.format(task.id_name))
+                if os.path.isfile(result_log_file):
+                    gc3libs.log.debug('Reading output file {0}'.format(result_log_file))
+                    data = pandas.read_csv(result_log_file, sep='\t')
+                    for key in df_dict.keys():
+                        df = df_dict[key]
+                        gc3libs.log.debug("Column '{0}' counts {1} elements".format(key, len(data[key])))
+                        df[task.id_name] = data[key]
+
+                else:
+                    gc3libs.log.error('Output file {0} for task {1} not found'.format(result_log_file, task.id_name))
+                    continue
+            else:
+                gc3libs.log.warning('Task {0} not completed. Ignoring'.format(task.id_name))
+
+        gc3libs.log.info('Writing aggregated .csv results in {0}'.format(self.params.result_csv))
+        for key in df_dict.keys():
+            df = df_dict[key]
+            df.to_csv(os.path.join(self.params.result_csv, '{0}.csv'.format(key)))
+
+        return
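The aggregation step can be exercised outside of gc3pie. A minimal sketch, assuming per-task tab-separated log files as in the code above; the file names, task ids, and column list are made up:

    import os
    import pandas

    # Hypothetical inputs: two per-task .log files and the columns to extract.
    log_files = {'flu_H3N2-0': 'flu_H3N2-0.log', 'flu_H3N2-1': 'flu_H3N2-1.log'}
    columns = 'TreeHeight.t,posterior'.split(',')
    result_csv = 'results'

    # One DataFrame per requested column; each gains one column per task.
    df_dict = dict((name, pandas.DataFrame()) for name in columns)
    for task_id, path in log_files.items():
        data = pandas.read_csv(path, sep='\t')
        for key in columns:
            df_dict[key][task_id] = data[key]

    for key, df in df_dict.items():
        df.to_csv(os.path.join(result_csv, '{0}.csv'.format(key)))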
