diff --git a/gc3libs/cmdline.py b/gc3libs/cmdline.py
index fb6ccf11..543c38de 100755
--- a/gc3libs/cmdline.py
+++ b/gc3libs/cmdline.py
@@ -24,21 +24,12 @@
 applications should implement their specific behavior by subclassing
 and overriding a few customization methods.
 
-There are currently two public classes provided here:
-
-:class:`GC3UtilsScript`
-  Base class for all the GC3Utils commands. Implements a few methods
-  useful for writing command-line scripts that operate on jobs by ID.
+There is currently only one public class provided here:
 
 :class:`SessionBasedScript`
   Base class for the ``grosetta``/``ggamess``/``gcodeml`` scripts.
   Implements a long-running script to submit and manage a large number
-  of jobs grouped into a "session".
-
-:class:`SessionBasedDaemon`
-  Base class for GC3Pie daemons. Implements a long-running daemon with
-  XML-RPC interface and support for file/http/swift based inboxes
-
+  of tasks grouped into a "session".
 """
 __author__ = 'Riccardo Murri <riccardo.murri@uzh.ch>'
 __docformat__ = 'reStructuredText'
@@ -63,13 +54,13 @@
 from collections import defaultdict
 from prettytable import PrettyTable
 import SimpleXMLRPCServer as sxmlrpc
+from warnings import warn
 import xmlrpclib
 import json
 import yaml
 
 # 3rd party modules
-import daemon
 import cli  # pyCLI
 import cli.app
 import cli._ext.argparse as argparse
@@ -83,11 +74,16 @@
 import gc3libs.core
 import gc3libs.exceptions
 import gc3libs.persistence
-import gc3libs.utils
+from gc3libs.utils import (
+    basename_sans,
+    deploy_configuration_file,
+    same_docstring_as,
+    test_file,
+)
 import gc3libs.url
 from gc3libs.quantity import Memory, GB, Duration, hours
 from gc3libs.session import Session
-from gc3libs.poller import get_poller, get_mask_description, events as notify_events
+
 
 # types for command-line parsing; see
 # http://docs.python.org/dev/library/argparse.html#type
@@ -215,23 +211,20 @@ def positive_int(num):
 
 
 def existing_file(path):
-    gc3libs.utils.test_file(path,
-                            os.F_OK | os.R_OK,
-                            argparse.ArgumentTypeError)
+    test_file(path, os.F_OK | os.R_OK,
+              argparse.ArgumentTypeError)
     return path
 
 
 def executable_file(path):
-    gc3libs.utils.test_file(path,
-                            os.F_OK | os.R_OK | os.X_OK,
-                            argparse.ArgumentTypeError)
+    test_file(path, os.F_OK | os.R_OK | os.X_OK,
+              argparse.ArgumentTypeError)
    return path
 
 
 def existing_directory(path):
-    gc3libs.utils.test_file(path,
-                            os.F_OK | os.R_OK | os.X_OK,
-                            argparse.ArgumentTypeError, isdir=True)
+    test_file(path, os.F_OK | os.R_OK | os.X_OK,
+              argparse.ArgumentTypeError, isdir=True)
     return path
 
@@ -256,7 +249,6 @@ class _Script(cli.app.CommandLineApp):
 
     There is no defaults for positional arguments, you *must* override
     `setup_args`:meth: in derived classes.
-
     """
 
     ##
@@ -353,14 +345,6 @@ def __init__(self, **extra_args):
             raise AssertionError(
                 "Missing required parameter 'description'.")
 
-        # allow overriding command-line options in subclasses
-        def argparser_factory(*args, **kwargs):
-            kwargs.setdefault('conflict_handler', 'resolve')
-            kwargs.setdefault('formatter_class',
-                              cli._ext.argparse.RawDescriptionHelpFormatter)
-            return cli.app.CommandLineApp.argparser_factory(*args, **kwargs)
-
-        self.argparser_factory = argparser_factory
         # init superclass
         extra_args.setdefault(
             'name',
@@ -372,6 +356,16 @@ def argparser_factory(*args, **kwargs):
         # provide some defaults
         self.verbose_logging_threshold = 0
 
+    @staticmethod
+    def argparser_factory(*args, **kwargs):
+        """
+        Allow overriding command-line options in subclasses.
+        """
+        kwargs.setdefault('conflict_handler', 'resolve')
+        kwargs.setdefault('formatter_class',
+                          cli._ext.argparse.RawDescriptionHelpFormatter)
+        return cli.app.CommandLineApp.argparser_factory(*args, **kwargs)
+
     @property
     def description(self):
         """A string describing the application.
@@ -469,18 +463,18 @@ def run(self):
         """
         try:
             return cli.app.CommandLineApp.run(self)
-        except gc3libs.exceptions.InvalidUsage as ex:
-            # Fatal errors do their own printing, we only add a short usage
-            # message
+        except gc3libs.exceptions.InvalidUsage:
+            # Fatal errors do their own printing,
+            # we only add a short usage message
             sys.stderr.write(
                 "Type '%s --help' to get usage help.\n" % self.name)
-            return 64  # EX_USAGE in /usr/include/sysexits.h
+            return os.EX_USAGE  # see: /usr/include/sysexits.h
         except KeyboardInterrupt:
             sys.stderr.write(
                 "%s: Exiting upon user request (Ctrl+C)\n" % self.name)
             self.cleanup()
             return 13
-        except SystemExit as ex:
+        except SystemExit:
             # sys.exit() has been called in `post_run()`.
             raise
         # the following exception handlers put their error message
@@ -488,7 +482,6 @@ def run(self):
         # tries to log the message and only outputs it to stderr if
         # this fails
         except lockfile.Error as ex:
-            # exc_info = sys.exc_info()
             msg = ("Error manipulating the lock file (%s: %s)."
                    " This likely points to a filesystem error"
                    " or a stale process holding the lock."
@@ -497,13 +490,12 @@ def run(self):
                   " including any output you got by running '%s -vvvv %s'."
                   " (You need to be subscribed to post to the mailing list)")
             if len(sys.argv) > 0:
-                msg %= (ex.__class__.__name__, str(ex),
+                msg %= (ex.__class__.__name__, ex,
                         self.name, str.join(' ', sys.argv[1:]))
             else:
-                msg %= (ex.__class__.__name__, str(ex), self.name, '')
-            # rc = 1
+                msg %= (ex.__class__.__name__, ex, self.name, '')
+            rc = os.EX_UNAVAILABLE  # see: /usr/include/sysexits.h
         except AssertionError as ex:
-            # exc_info = sys.exc_info()
             msg = ("BUG: %s\n"
                    "Please send an email to gc3pie@googlegroups.com"
                    " including any output you got by running '%s -vvvv %s'."
                   " (You need to be subscribed to post to the mailing list)")
@@ -513,13 +505,13 @@ def run(self):
                 msg %= (ex, self.name, str.join(' ', sys.argv[1:]))
             else:
                 msg %= (ex, self.name, '')
-            # rc = 1
+            rc = os.EX_SOFTWARE  # see: /usr/include/sysexits.h
         except cli.app.Abort as ex:
             msg = "%s: %s" % (ex.__class__.__name__, ex)
-            # rc = ex.status
+            rc = ex.status
         except EnvironmentError as ex:
             msg = "%s: %s" % (ex.__class__.__name__, ex)
-            # rc = os.EX_IOERR  # 74 (see: /usr/include/sysexits.h )
+            rc = os.EX_IOERR  # see: /usr/include/sysexits.h
         except Exception as ex:
             if 'GC3PIE_NO_CATCH_ERRORS' in os.environ:
                 # propagate generic exceptions for debugging purposes
@@ -527,7 +519,7 @@ def run(self):
                 raise
             else:
                 # generic error exit
                 msg = "%s: %s" % (ex.__class__.__name__, ex)
-                # rc = 1
+                rc = 1
 
         # output error message and -maybe- backtrace...
@@ -538,11 +530,11 @@ def run(self):
         try:
             self.log.critical(
                 "%s: %s" % (self.name, msg),
                 exc_info=(self.params.verbose > self.verbose_logging_threshold + 2))
         except Exception:
             sys.stderr.write("%s: FATAL ERROR: %s\n" % (self.name, msg))
         # be careful here as `self.params` might not have been properly
         # constructed yet
-        if ('verbose' in self.params and self.params.verbose >
-                self.verbose_logging_threshold + 2):
+        if ('verbose' in self.params
+                and self.params.verbose > self.verbose_logging_threshold + 2):
             sys.excepthook(* sys.exc_info())
         # ...and exit
-        return 1
+        return rc
 
     ##
     # INTERNAL METHODS
     ##
 
@@ -575,7 +567,7 @@ def _make_config(
         for location in reversed(config_file_locations):
             if (os.access(os.path.dirname(location),
                           os.W_OK | os.X_OK) and not
-                    gc3libs.utils.deploy_configuration_file(
+                    deploy_configuration_file(
                         location, "gc3pie.conf.example")):
                 # warn user
                 self.log.warning(
@@ -685,8 +677,8 @@ def add(self, task):
         """
         Method to add a task to the session (and the controller)
         """
-        self.controller.add(task)
-        self.session.add(task)
+        self._controller.add(task)
+        self.session.add(task, flush=False)
 
     def before_main_loop(self):
         """
@@ -709,10 +701,11 @@ def every_main_loop(self):
         Override in subclasses to plug any behavior here; the default
         implementation does nothing.
 
-        FIXME: While on a SessionBasedScript this method is called
-        *after* processing all the jobs, on a SessionBasedDaemon it is
-        called *before*.
+        .. note::
 
+          **FIXME:** While in a SessionBasedScript this method is
+          called *after* processing all tasks, in a
+          `SessionBasedDaemon`:class: it is called *before*.
         """
         pass
@@ -738,15 +731,6 @@ def after_main_loop(self):
     #   the pyCLI docs before :-)
     ##
 
-    # safeguard against programming errors: if the `application` ctor
-    # parameter has not been given to the constructor, the following
-    # method raises a fatal error (this function simulates a class ctor)
-    def __unset_application_cls(*args, **kwargs):
-        """Raise an error if users did not set `application` in
-        `SessionBasedScript` initialization."""
-        raise gc3libs.exceptions.InvalidArgument(
-            "PLEASE SET `application` in `SessionBasedScript` CONSTRUCTOR")
-
     def __init__(self, **extra_args):
         """
         Perform initialization and set the version, help and usage
@@ -776,14 +760,7 @@ def __init__(self, **extra_args):
         self.session = None
         # by default, print stats of all kind of jobs
         self.stats_only_for = None
-        self.instances_per_file = 1
-        self.instances_per_job = 1
         self.extra = {}  # extra extra_args arguments passed to `parse_args`
-        # use bogus values that should point ppl to the right place
-        self.input_filename_pattern = 'PLEASE SET `input_filename_pattern`'
-        'IN `SessionBasedScript` CONSTRUCTOR'
-        # catch omission of mandatory `application` ctor param (see above)
-        self.application = _SessionBasedCommand.__unset_application_cls
         # init base classes
         _Script.__init__(
             self,
@@ -838,36 +815,18 @@ def _add_new_tasks(self, new_jobs):
         # add new jobs to the session
         existing_job_names = self.session.list_names()
 
-        warning_on_old_style_given = False
         for n, item in enumerate(new_jobs):
 
             if isinstance(item, tuple):
-                if not warning_on_old_style_given:
-                    self.log.warning(
-                        "Using old-style new tasks list; please update"
-                        " the code!")
-                    warning_on_old_style_given = True
-                # build Task for (jobname, classname, args, kwargs)
-                jobname, cls, args, kwargs = item
-                if jobname in existing_job_names:
-                    continue
-                kwargs.setdefault('jobname', jobname)
-                kwargs.setdefault(
-                    'output_dir',
-                    self.make_directory_path(
-                        self.params.output,
-                        jobname))
-                kwargs.setdefault(
-                    'requested_cores', self.extra['requested_cores'])
-                kwargs.setdefault(
-                    'requested_memory', self.extra['requested_memory'])
-                kwargs.setdefault(
-                    'requested_walltime', self.extra['requested_walltime'])
                 # create a new `Task` object
                 try:
-                    task = cls(*args, **kwargs)
-                except Exception as ex:
-                    self.log.error("Could not create job '%s': %s."
-                                   % (jobname, str(ex)), exc_info=__debug__)
+                    warn(
+                        "Using old-style tasks initializer;"
+                        " please update the code in function `new_tasks`!",
+                        DeprecationWarning)
+                    task = self.__make_task_from_old_style_args(item)
+                except Exception as err:
+                    self.log.error("Could not create task '%s': %s.",
+                                   item[0], err, exc_info=__debug__)
                     continue
                 # XXX: should we raise an exception here?
                 # raise AssertionError(
@@ -882,22 +841,49 @@ def _add_new_tasks(self, new_jobs):
 
             else:
                 raise gc3libs.exceptions.InternalError(
-                    "SessionBasedScript.process_args got %r (%s),"
-                    " but was expecting a gc3libs.Task instance" %
-                    (item, type(item)))
+                    "SessionBasedScript.process_args got %r %s,"
+                    " but was expecting a gc3libs.Task instance"
+                    % (item, type(item)))
+
+            # silently ignore duplicates
+            if task.jobname in existing_job_names:
+                continue
 
             # patch output_dir if it's not changed from the default,
             # or if it's not defined (e.g., TaskCollection)
-            if 'output_dir' not in task or task.output_dir == self.extra[
-                    'output_dir']:
+            if ('output_dir' not in task
+                    or task.output_dir == self.extra['output_dir']):
                 # user did not change the `output_dir` default, expand it now
-                self._fix_output_dir(task, task.jobname)
+                self.__fix_output_dir(task, task.jobname)
 
             # all done, append to session
             self.session.add(task, flush=False)
-            self.log.debug("Added task '%s' to session." % task.jobname)
+            self.log.debug("Added task '%s' to session.", task.jobname)
+
+    def __make_task_from_old_style_args(self, item):
+        """
+        Build a `Task` from a ``(jobname, classname, args, kwargs)`` tuple.
+
+        .. note::
 
-    def _fix_output_dir(self, task, name):
+          This function is provided for compatibility with very old
+          scripts only.  You should return a `Task`:class: instance
+          from `new_tasks`:meth: instead.
+        """
+        jobname, cls, args, kwargs = item
+        kwargs.setdefault('jobname', jobname)
+        kwargs.setdefault(
+            'output_dir',
+            self.make_directory_path(self.params.output, jobname))
+        kwargs.setdefault(
+            'requested_cores', self.extra['requested_cores'])
+        kwargs.setdefault(
+            'requested_memory', self.extra['requested_memory'])
+        kwargs.setdefault(
+            'requested_walltime', self.extra['requested_walltime'])
+        return cls(*args, **kwargs)
+
+    def __fix_output_dir(self, task, name):
         """Substitute the NAME string in output paths."""
         if task.would_output:
             task.output_dir = task.output_dir.replace('NAME', name)
@@ -962,12 +948,7 @@ def setup_common_options(self, parser):
                 self.name),
             metavar="PATH",
             help="Store the session information in the directory at PATH."
-            " (Default: '%(default)s'). If PATH is an existing directory, it"
-            " will be used for storing job information, and an index file"
-            " (with suffix '.csv') will be created in it. Otherwise, the job"
-            " information will be stored in a directory named after PATH with"
-            " a suffix '.jobs' appended, and the index file"
-            " will be named after PATH with a suffix '.csv' added.")
+            " (Default: '%(default)s').")
         parser.add_param("-u", "--store-url",
                          action="store",
                          metavar="URL",
@@ -978,9 +959,9 @@ def setup_common_options(self, parser):
                          dest="new_session",
                          action="store_true",
                         default=False,
-                         help="Discard any information saved in the session directory (see"
-                         " '--session' option) and start a new session afresh. Any"
Any" - " information about previous jobs is lost.") + help="Discard any information saved in the session directory" + " (see '--session' option) and start a new session afresh." + " Any information about previous tasks is lost.") # 3. script execution control parser.add_param( @@ -993,23 +974,22 @@ def setup_common_options(self, parser): metavar="NUM", help="Keep running, monitoring jobs and possibly submitting" " new ones or fetching results every NUM seconds. Exit when" - " all jobs are finished.") + " all tasks are finished.") parser.add_param("-J", "--max-running", type=positive_int, dest="max_running", default=50, metavar="NUM", - help="Set the max NUMber of jobs (default: %(default)s)" - " in SUBMITTED or RUNNING state." + help="Set the maximum NUMber of jobs" + " in SUBMITTED or RUNNING state." + " (Default: %(default)s)" ) parser.add_param( "-o", "--output", dest="output", type=valid_directory, - default=os.path.join( - os.getcwd(), - 'NAME'), + default=os.path.join(os.getcwd(), 'NAME'), metavar='DIRECTORY', - help="Output files from all jobs will be collected in the" + help="Output files from all tasks will be collected in the" " specified DIRECTORY path; by default, output files are placed" " in the same directory where the corresponding input file" " resides. If the destination directory does not exist, it is" @@ -1019,7 +999,7 @@ def setup_common_options(self, parser): " 'DATE' is replaced by the submission date in ISO format" " (YYYY-MM-DD); 'TIME' is replaced by the submission time" " formatted as HH:MM. 'SESSION' is replaced by the path to the" - " session directory, with a '.out' appended.") + " session directory, with a '.out' suffix appended.") return def setup(self): @@ -1037,12 +1017,7 @@ def setup(self): def _prerun_common_checks(self): # consistency checks - try: - # FIXME: backwards-compatibility, remove after 2.0 release - self.params.walltime = Duration(int(self.params.wctime), hours) - except ValueError: - # cannot convert to `int`, use extended parsing - self.params.walltime = Duration(self.params.wctime) + self.params.walltime = Duration(self.params.wctime) # determine the session file name (and possibly create an empty index) self.session_uri = gc3libs.url.Url(self.params.session) @@ -1058,7 +1033,6 @@ def _prerun_common_checks(self): self.config.auth_factory.add_params( private_copy_directory=self.session.path) - # XXX: ARClib errors out if the download directory already exists, so # we need to make sure that each job downloads results in a new one. # The easiest way to do so is to append 'NAME' to the `output_dir` # (if it's not already there). @@ -1075,21 +1049,13 @@ def pre_run(self): _Script.pre_run(self) self._prerun_common_checks() - # since it may time quite some time before jobs are created - # and the first report is displayed, print a startup banner so - # that users get some kind of feedback ... - print("Starting %s;" - " use the '-v' command-line option to get" - " a more verbose report of activity." - % (self.name,)) ## # INTERNAL METHODS ## # The following methods are for internal use; they can be - # overridden and customized in derived classes, although there - # should be no need to do so. + # overridden and customized in derived classes. 
     ##
 
     def _make_session(self, session_uri, store_url):
@@ -1118,50 +1084,15 @@ def _main(self, *args):
 
         # zero out the session index if `-N` was given
         if self.params.new_session:
-            old_jobids = self.session.list_ids()
-            if old_jobids:
-                self.log.warning(
-                    "Abort of existing session requested:"
-                    " will attempt to kill existing jobs."
-                    " This may generate a few spurious error messages"
-                    " if the jobs are too old and have already been"
-                    " cleaned up by the system.")
-                for jobid in old_jobids:
-                    job = self.session.load(jobid)
-                    job.attach(self._core)
-                    try:
-                        job.kill()
-                    except Exception as err:
-                        self.log.info(
-                            "Got this error while killing old job '%s',"
-                            " ignore it: %s: %s",
-                            job,
-                            err.__class__.__name__,
-                            str(err))
-                    try:
-                        job.free()
-                    except Exception as err:
-                        self.log.info(
-                            "Got this error while cleaning up old job '%s',"
-                            " ignore it: %s: %s",
-                            job,
-                            err.__class__.__name__,
-                            str(err))
-                    job.detach()
-                    self.session.remove(jobid)
-                    self.log.debug("Removed job '%s' from session.", job)
-                self.log.info(
-                    "Done cleaning up old session jobs, starting with new one"
-                    " afresh...")
+            self.__abort_old_session()
 
         # update session based on command-line args
         if len(self.session) == 0:
             self.process_args()
         else:
             self.log.warning(
-                "Session already exists, some command-line arguments"
-                " might be ignored."
-            )
+                "Session already exists,"
+                " some command-line arguments might be ignored.")
 
         # save the session list immediately, so newly added jobs will
         # be in it if the script is stopped here
@@ -1171,13 +1102,12 @@ def _main(self, *args):
         if self.params.resource_name:
             self._select_resources(self.params.resource_name)
             self.log.info(
-                "Retained only resources: %s (restricted by command-line"
-                " option '-r %s')",
-                str.join(
-                    ",",
-                    [
-                        r['name'] for r in self._core.get_resources()
-                        if r.enabled]),
+                "Retained only resources: %s"
+                " (restricted by command-line option '-r %s')",
+                ','.join([
+                    r['name'] for r in self._core.get_resources()
+                    if r.enabled
+                ]),
                 self.params.resource_name)
 
         # create an `Engine` instance to manage the job list
@@ -1195,12 +1125,7 @@ def _main(self, *args):
             if self.params.wait > 0:
                 self.log.info("sleeping for %d seconds..." % self.params.wait)
                 while not self._main_loop_done(rc):
-                    # Python scripts become unresponsive during
-                    # `time.sleep()`, so we just do the wait in small
-                    # steps, to allow the interpreter to process
-                    # interrupts in the breaks. Ugly, but works...
-                    for x in xrange(self.params.wait):
-                        time.sleep(1)
+                    self._sleep(self.params.wait)
                     # ...and now repeat the submit/update/retrieve
                     rc = self._main_loop()
         except KeyboardInterrupt:  # gracefully intercept Ctrl+C
@@ -1224,716 +1149,62 @@ def _main(self, *args):
 
         return rc
 
-
-class _CommDaemon(object):
-    portfile_name = 'daemon.port'
-
-    def __init__(self, name, listenip, workingdir, parent):
-        self.parent = parent
-        self.log = self.parent.log
-
-        # Start XMLRPC server
-        self.server = sxmlrpc.SimpleXMLRPCServer((listenip, 0),
-                                                 logRequests=False)
-        self.ip, self.port = self.server.socket.getsockname()
-        self.log.info("XMLRPC daemon running on %s,"
-                      "port %d.", self.ip, self.port)
-        self.portfile = os.path.join(workingdir, self.portfile_name)
-        ### FIXME: we should check if the file exists already
-        with open(self.portfile, 'w') as fd:
-            self.log.debug("Writing current port (%d) I'm listening to"
-                           " in file %s" % (self.port, self.portfile))
-            fd.write("%s:%d\n" % (self.ip, self.port))
-
-        # Register XMLRPC methods
-        self.server.register_introspection_functions()
-
-        self.server.register_function(self.help, "help")
-        self.server.register_function(self.list_jobs, "list")
-        self.server.register_function(self.show_job, "show")
-        self.server.register_function(self.stat_jobs, "stat")
-        self.server.register_function(self.kill_job, "kill")
-        self.server.register_function(self.resubmit_job, "resubmit")
-        self.server.register_function(self.remove_job, "remove")
-        self.server.register_function(self.terminate, "terminate")
-        self.server.register_function(self.json_list, "json_list")
-        self.server.register_function(self.json_show_job, "json_show")
-
-    def start(self):
-        return self.server.serve_forever()
-
-    def stop(self):
-        try:
-            self.server.shutdown()
-            self.server.socket.close()
-            os.remove(self.portfile)
-        except Exception as ex:
-            # self.stop() could be called twice, let's assume it's not
-            # an issue but log the event anyway
-            self.log.warning(
-                "Ignoring exception caught while closing socket: %s", ex)
-
-    def terminate(self):
-        """Terminate daemon"""
-        # Start a new thread so that we can reply
-        def killme():
-            self.log.info("Terminating as requested via IPC")
-            # Wait 1s so that the client connection is not hang up and
-            # we have time to give feedback
-            time.sleep(1)
-            # daemon cleanup will aslo call self.stop()
-            self.parent.cleanup()
-
-            # Send kill signal to current process
-            os.kill(os.getpid(), signal.SIGTERM)
-
-            # This should be enough, but just in case...
-            self.log.warning(
-                "Waiting 5s to see if my sucide attempt succeeded")
-            time.sleep(5)
-
-            # If this is not working, try mor aggressive approach.
-
-            # SIGINT is interpreted by SessionBasedDaemon class. It
-            # will also call self.parent.cleanup(), again.
-            self.log.warning("Still alive: sending SIGINT signal to myself")
-            os.kill(os.getpid(), signal.SIGINT)
-
-            # We whould never reach this point, but Murphy's law...
-            time.sleep(5)
-            # Revert back to SIGKILL. This will leave the pidfile
-            # hanging around.
-            self.log.warning("Still alive: forcing SIGKILL signal.")
-            os.kill(os.getpid(), signal.SIGKILL)
-
-        t = threading.Thread(target=killme)
-        t.start()
-        return "Terminating %d in 1s" % os.getpid()
-
-    ### user visible methods
-
-    def help(self, cmd=None):
-        """Show available commands, or get information about a specific
-        command"""
-
-        if not cmd:
-            return self.server.system_listMethods()
-        else:
-            return self.server.system_methodHelp(cmd)
-
-    @staticmethod
-    def print_app_table(app, indent, recursive):
-        """Returns a list of lists containing a summary table of the jobs"""
-        rows = []
-        try:
-            jobname = app.jobname
-        except AttributeError:
-            jobname = ''
-
-        rows.append([indent + str(app.persistent_id),
-                     jobname,
-                     app.execution.state,
-                     app.execution.returncode,
-                     app.execution.info])
-        if recursive and 'tasks' in app:
-            indent = " "*len(indent) + '  '
-            for task in app.tasks:
-                rows.extend(_CommDaemon.print_app_table(task, indent, recursive))
-        return rows
-
-    def list_jobs(self, opts=None):
-        """usage: list [detail]
-
-        List jobs"""
-
-        if opts and 'details'.startswith(opts):
-            rows = []
-            for app in self.parent.session.tasks.values():
-                rows.extend(self.print_app_table(app, '', True))
-            table = PrettyTable(["JobID", "Job name", "State", "rc", "Info"])
-            table.align = 'l'
-            for row in rows:
-                table.add_row(row)
-            return str(table)
-        elif opts and 'all'.startswith(opts):
-            return str.join(' ', [i.persistent_id for i in self.parent.session.iter_workflow()])
-        else:
-            return str.join(' ', self.parent.session.list_ids())
-
-    def json_list(self):
-        """usage: json_show jobid
-
-        List jobs"""
-
-        jobids = [i.persistent_id for i in self.parent.session.iter_workflow()]
-        jobs = []
-        for jobid in jobids:
-            app = self.parent.session.store.load(jobid)
-            sapp = StringIO()
-            gc3libs.utils.prettyprint(app, output=sapp)
-            jobs.append(yaml.load(sapp.getvalue()))
-        return json.dumps(jobs)
-
-    def json_show_job(self, jobid=None, *attrs):
-        """usage: json_show <jobid>
-
-        Same output as `ginfo -v <jobid>`
-        """
-
-        if not jobid:
-            return "Usage: show <jobid>"
-
-        if jobid not in self.parent.session.tasks:
-            all_tasks = dict((i.persistent_id,i) for i in self.parent.session.iter_workflow() if hasattr(i, 'persistent_id'))
-            if jobid not in all_tasks:
-                return "Job %s not found in session" % jobid
-            else:
-                app = all_tasks[jobid]
-        else:
-            app = self.parent.session.tasks[jobid]
-        sapp = StringIO()
-        gc3libs.utils.prettyprint(app, output=sapp)
-        return json.dumps(yaml.load(sapp.getvalue()))
-
-    def show_job(self, jobid=None, *attrs):
-        """usage: show <jobid> [attributes]
-
-        Same output as `ginfo -v <jobid> [-p attributes]`
-        """
-
-        if not jobid:
-            return "Usage: show <jobid> [attributes]"
-
-        if jobid not in self.parent.session.tasks:
-            all_tasks = dict((i.persistent_id,i) for i in self.parent.session.iter_workflow() if hasattr(i, 'persistent_id'))
-            if jobid not in all_tasks:
-                return "Job %s not found in session" % jobid
-            else:
-                app = all_tasks[jobid]
-        else:
-            app = self.parent.session.tasks[jobid]
-        try:
-            out = StringIO()
-            if not attrs:
-                attrs = None
-            gc3libs.utils.prettyprint(app,
-                                      indent=4,
-                                      output=out,
-                                      only_keys=attrs)
-            return out.getvalue()
-        except Exception as ex:
-            return "Unable to find job %s" % jobid
-
-    def kill_job(self, jobid=None):
-        if not jobid:
-            return "Usage: kill <jobid>"
-
-        if jobid not in self.parent.session.tasks:
-            all_tasks = dict((i.persistent_id,i) for i in self.parent.session.iter_workflow() if hasattr(i, 'persistent_id'))
-            if jobid not in all_tasks:
-                return "Job %s not found in session" % jobid
-            else:
-                app = all_tasks[jobid]
-        else:
-            app = self.parent.session.tasks[jobid]
-        try:
-            app.attach(self.parent._controller)
-            app.kill()
-            return "Job %s successfully killed" % jobid
-        except Exception as ex:
-            return "Error while killing job %s: %s" % (jobid, ex)
-
-    def remove_job(self, jobid=None):
-        if not jobid:
-            return "Usage: remove <jobid>"
-
-        if jobid not in self.parent.session.tasks:
-            return "Job %s not found in session" % jobid
-        app = self.parent.session.load(jobid)
-        if app.execution.state != gc3libs.Run.State.TERMINATED:
-            return "Error: you can only remove a terminated job. Current status is: %s" % app.execution.state
-        try:
-            self.parent._controller.remove(app)
-            self.parent.session.remove(jobid)
-            return "Job %s successfully removed" % jobid
-        except Exception as ex:
-            return "Error while removing job %s: %s" % (jobid, ex)
-
-    def resubmit_job(self, jobid=None):
-        if not jobid:
-            return "Usage: resubmit <jobid>"
-        if jobid not in self.parent.session.tasks:
-            all_tasks = dict((i.persistent_id,i) for i in self.parent.session.iter_workflow() if hasattr(i, 'persistent_id'))
-            if jobid not in all_tasks:
-                return "Job %s not found in session" % jobid
-            else:
-                app = all_tasks[jobid]
-        else:
-            app = self.parent.session.tasks[jobid]
-
-        try:
-            self.parent._controller.redo(app)
-        except Exception as ex:
-            return "Error while resubmitting job %s: %s" % (jobid, ex)
-        return "Successfully resubmitted job %s" % jobid
-
-    def stat_jobs(self):
-        """Print how many jobs are in any given state"""
-
-        stats = self.parent._controller.stats()
-        return str.join('\n', ["%s:%s" % x for x in stats.items()])
-
-
-class SessionBasedDaemon(_SessionBasedCommand):
-    """Base class for GC3Pie daemons. Implements a long-running script
-    that can daemonize, provides an XML-RPC interface to interact with
-    the current workflow and implement the concept of "inbox" to
-    trigger the creation of new jobs as soon as a new file is created
-    on a folder, or is available on an HTTP(s) or SWIFT endpoint.
-
-    The generic script impelemnts a command line like the following::
-
-      PROG [global options] server|client [specific options]
-
-    When running as a `server` it will accepts the same options as a
-    :py:class:`SessionBasedScript()` class, plus some extra options:
-
-      PROG server [server options] [INBOX [INBOX]]
-
-    Available options:
-
-    `-F, --foreground`
-      do not daemonize.
-
-    `--syslog`
-      send logs to syslog instead of writing it to a file
-
-    `--working-dir`
-      working directory of the daemon, where the session
-      will be stored and some auxiliary files will be saved
-
-    `--notify-state [EVENT,[EVENT]]`
-      a comma separated list of events
-      on the inbox we want to be notified of
-
-    `--listen IP`
-      IP or hostname we want to listen to. Default is localhost.
-
-    `INBOX`
-      path or url of one or more folder/url that will be watched for
-      new events.
-
-    When running as client instead::
-
-      PROG client [-c FILE] ARGS
-
-    `-c`
-      file written by the server conatining the hostname and port the
-      server is listening to. Also accepts a path to the directory
-      (the value of `--working-dir` the server was started with) where
-      the `daemon.port` file created by the server is stored.
-
-    """
-
-    def cleanup(self, signume=None, frame=None):
-        self.log.debug("Waiting for communication thread to terminate")
-        try:
-            self.comm.stop()
-            self.commthread._Thread__stop()
-            self.commthread._Thread__delete()
-            self.commthread.join(1)
-        except AttributeError:
-            # If the script is interrupted/killed during command line
-            # parsing the `self.comm` daemon is not present and we get
-            # an AttributeError we can safely ignore.
-            pass
-
-    def setup(self):
-        _Script.setup(self)
-        self.subparsers = self.argparser.add_subparsers(title="commands")
-        self.parser_server = self.subparsers.add_parser('server', help="Run the main daemon script.")
-        self.parser_client = self.subparsers.add_parser('client', help="Connect to a running daemon.")
-
-        # Wrapper function around add_argument.
-        def _add_param_server(*args, **kwargs):
-            action = self.parser_server.add_argument(*args, **kwargs)
-            self.actions[action.dest] = action
-            return action
-        self.parser_server.add_param = _add_param_server
-        # This method assumes `self.add_param` exists
-        self.setup_common_options(self.parser_server)
-
-        # Also, when subclassing, add_param should add arguments to
-        # the server parser by default.
-        self.add_param = lambda *x, **kw: self.parser_server.add_param(*x, **kw)
-
-        self.parser_server.set_defaults(func=self._main_server)
-        self.parser_client.set_defaults(func=self._main_client)
-
-        # change default for the `-C`, `--session` and `--output` options
-        self.actions['wait'].default = 30
-        self.actions['wait'].help = 'Check the status of the jobs every NUM'
-        ' seconds. Default: %(default)s'
-
-        # Default session dir and output dir are computed from
-        # --working-directory. Set None here so that we can update it
-        # in pre_run()
-        self.actions['session'].default = None
-        self.actions['output'].default = None
-
-    def setup_args(self):
-        self.parser_server.add_param('-F', '--foreground',
-                                     action='store_true',
-                                     default=False,
-                                     help="Run in foreground. Default: %(default)s")
-
-        self.parser_server.add_param('--syslog',
-                                     action='store_true',
-                                     default=False,
-                                     help="Send log messages (also) to syslog."
-                                     " Default: %(default)s")
-
-        self.parser_server.add_param('--working-dir',
-                                     default=os.getcwd(),
-                                     help="Run in WORKING_DIR. Ignored if run in foreground."
-                                     " Default: %(default)s")
-
-        self.parser_server.add_param('--notify-state',
-                                     nargs='?',
-                                     default='CLOSE_WRITE',
-                                     help="Comma separated list of notify events to watch."
-                                     " Default: %(default)s. Available events: " +
-                                     str.join(', ', [i[3:] for i in notify_events.keys()]))
-
-        self.parser_server.add_param(
-            '--listen',
-            default="localhost",
-            metavar="IP",
-            help="IP or hostname where the XML-RPC thread should"
-            " listen to. Default: %(default)s")
-
-        self.parser_server.add_param('inbox', nargs='*',
-                                     help="`inbox` directories: whenever a new file is"
-                                     " created in one of these directories, a callback is"
-                                     " triggered to add new jobs")
-
-        self.parser_client.add_argument('-c', '--connect',
-                                        metavar="FILE",
-                                        required=True,
-                                        help="Path to the file containing hostname and port of the"
-                                        " XML-RPC enpdoint of the daemon")
-        self.parser_client.add_argument('cmd',
-                                        metavar='COMMAND',
-                                        help="XML-RPC command. Run `help` to know"
-                                        " which commands are available.")
-        self.parser_client.add_argument('args',
-                                        nargs='*',
-                                        metavar='ARGS',
-                                        help="Optional arguments of CMD.")
-
-    def pre_run(self):
-        ### FIXME: Some code copied from _Script.pre_run()
-
-        self.setup_options()
-        self.setup_args()
-        cli.app.CommandLineApp.pre_run(self)
-        loglevel = max(1, logging.WARNING -
-                       10 *
-                       max(0, self.params.verbose -
-                           self.verbose_logging_threshold))
-        gc3libs.configure_logger(loglevel, "gc3.gc3utils")  # alternate: self.name
-        logging.root.setLevel(loglevel)
-        # alternate: ('gc3.' + self.name)
-        self.log = logging.getLogger('gc3.gc3utils')
-        self.log.setLevel(loglevel)
-        self.log.propagate = True
-        self.log.info("Starting %s at %s; invoked as '%s'",
-                      self.name, time.asctime(), str.join(' ', sys.argv))
-
-        # FIXME: we need to ignore the process_args method as the
-        # `client` subparser has different options than the `server`
-        # one, and the default is the `server` subparser.
-        if self.params.func == self._main_client:
-            # override `process_args` with a noop.
-            self.process_args = lambda *x, **kw: None
-            return
-
-        # Read config file(s) from command line
-        self.params.config_files = self.params.config_files.split(',')
-        # interface to the GC3Libs main functionality
-        self.config = self._make_config(self.params.config_files)
-        try:
-            self._core = gc3libs.core.Core(self.config)
-        except gc3libs.exceptions.NoResources:
-            # translate internal error `NoResources` to a
-            # user-readable message.
-            raise gc3libs.exceptions.FatalError(
-                "No computational resources defined."
-                " Please edit the configuration file(s): '%s'."
-                % (str.join("', '", self.params.config_files)))
-
-        self.params.working_dir = os.path.abspath(self.params.working_dir)
-
-        # Default session dir is inside the working directory
-        if not self.params.session:
-            self.params.session = os.path.join(
-                self.params.working_dir, self.name)
-
-        # Convert inbox to Url objects
-        self.params.inbox = [gc3libs.url.Url(i) for i in self.params.inbox]
-        # Default output directory is the working directory.
-        if not self.params.output:
-            self.params.output = self.params.working_dir
-
-        self._prerun_common_checks()
-        self.parse_args()
-
-        # Syntax check for notify events
-        self.params.notify_state = self.params.notify_state.split(',')
-
-        # Add IN_ as we use shorter names for the command line
-        state_names = ['IN_' + i for i in self.params.notify_state]
-
-        # Build the notify mask, for later use
-        self.notify_event_mask = 0
-
-        # Ensure all the supplied states are correct
-        for istate in state_names:
-            if istate not in notify_events:
-                raise gc3libs.exceptions.InvalidUsage(
-                    "Invalid notify state %s." % state)
-            self.notify_event_mask |= notify_events[istate]
-
-    def __setup_logging(self):
-        # FIXME: Apparently, when running in foreground _and_ --syslog
-        # is used, only a few logs are sent to syslog and then nothing
-        # more.
-
-        # We use a function to avoid repetitions: since when
-        # demonizing all open file descriptors are called, we need to
-        # configure logging from *within* the DaemonContext context
-
-        # If --syslog, add a logging handler to send to local syslog
-        if self.params.syslog:
-            # Update the root logger, not just 'gc3utils'
-            logging.root.addHandler(
-                SysLogHandler(address="/dev/log",
-                              facility=SysLogHandler.LOG_USER))
-        elif not self.params.foreground:
-            # The default behavior when run in daemon mode
-            # is to log to a file in working directory called
-            # <scriptname>.log
-            #
-            # Also, update the root logger, not just 'gc3utils'
-            logging.root.addHandler(
-                logging.FileHandler(
-                    os.path.join(self.params.working_dir,
-                                 self.name + '.log')))
-        # else: log to stdout, which is the default.
-
-    def __setup_pollers(self):
-        # Setup inotify on inbox directories
-        self.pollers = []
-        # We need to create an inotify file descriptor for each
-        # directory, because events returned by
-        # `inotifyx.get_events()` do not contains the full path to the
-        # file.
-        for inbox in self.params.inbox:
-            self.pollers.append(get_poller(inbox, self.notify_event_mask, recurse=True))
-
-    def __setup_comm(self, listen):
-        # Communication thread must run on a different thread
-        try:
-            self.comm = _CommDaemon(
-                self.name,
-                self.params.listen,
-                self.params.working_dir,
-                self)
-            def commthread():
-                self.log.info("Starting XML-RPC server")
-                self.comm.start()
-
-            self.commthread = threading.Thread(target=commthread)
-            self.commthread.start()
-            self.log.info("Communication thread started")
-        except Exception as ex:
-            self.log.error(
-                "Error initializinig Communication thread: %s" % ex)
-
-    def _main_client(self):
-        portfile = self.params.connect
-        if os.path.isdir(portfile):
-            gc3libs.log.debug("First argument of --client is a directory. Checking if file `%s` is present in it" % _CommDaemon.portfile_name)
-            portfile = os.path.join(portfile, _CommDaemon.portfile_name)
-            if not os.path.isfile(portfile):
-                self.argparser.error("First argument of --client must be a file.")
-            gc3libs.log.info("Using file `%s` as argument of --connect option" % portfile)
-        with open(portfile, 'r') as fd:
-            try:
-                ip, port = fd.read().split(':')
-                port = int(port)
-            except Exception as ex:
-                print("Error parsing file %s: %s" % (portfile, ex))
-        server = xmlrpclib.ServerProxy('http://%s:%d' % (ip, port))
-        func = getattr(server, self.params.cmd)
-        try:
-            print(func(*self.params.args))
-        except xmlrpclib.Fault as ex:
-            print("Error while running command `%s`: %s" % (self.params.cmd, ex.faultString))
-            print("Use `help` command to list all available methods")
-
-    def _main(self):
-        return self.params.func()
-
-    def _main_server(self):
-        self.process_args()
-
-        if self.params.foreground:
-            # If --foreground, then behave like a SessionBasedScript with
-            # the exception that the script will never end.
-            self.__setup_logging()
-            self.__setup_pollers()
-            self.log.info("Running in foreground as requested")
-
-            self.__setup_comm(self.params.listen)
-            return super(SessionBasedDaemon, self)._main()
-        else:
-            lockfile = os.path.join(self.params.working_dir,
-                                    self.name + '.pid')
-            lock = PIDLockFile(lockfile)
-            if lock.is_locked():
-                raise gc3libs.exceptions.FatalError(
-                    "PID File %s is already present. Ensure not other daemon"
-                    " is running. Delete file to continue."
-                    % lockfile)
-            context = daemon.DaemonContext(
-                working_directory=self.params.working_dir,
-                umask=0o002,
-                pidfile=lock,
-                stdout=open(
-                    os.path.join(self.params.working_dir, 'stdout.txt'),
-                    'w'),
-                stderr=open(
-                    os.path.join(self.params.working_dir, 'stderr.txt'),
-                    'w'),
-            )
-
-            context.signal_map = {signal.SIGTERM: self.cleanup}
-            self.log.info("About to daemonize")
-            with context:
-                self.__setup_logging()
-                self.__setup_pollers()
-                self.log.info("Daemonizing ...")
-                self.__setup_comm(self.params.listen)
-                return super(SessionBasedDaemon, self)._main()
-
-    def _main_loop_done(self, rc):
-        # Run until interrupted
-        return False
-
-    def _main_loop(self):
-        """
-        The main loop of the application.  It is in a separate
-        function so that we can call it just once or properly loop
-        around it, as directed by the `self.params.wait` option.
-
-        .. note::
-
-          Overriding this method can disrupt the whole functionality of
-          the script, so be careful.
-
-        Invocation of this method should return a numeric exitcode,
-        that will be used as the scripts' exitcode.  See
-        `_main_loop_exitcode` for an explanation.
-        """
-        # hook method: this is the method used to add new applications
-        # to the session.
-        self.every_main_loop()
-
-        # Check if new files were created. 1s timeout
-        for poller in self.pollers:
-            events = poller.get_events()
-            for url, mask in events:
-                self.log.debug("Received notify event %s for %s",
-                               get_mask_description(mask), url)
-
-                new_jobs = self.new_tasks(self.extra.copy(),
-                                          epath=url,
-                                          emask=mask)
-                self._add_new_tasks(list(new_jobs))
-                for task in list(new_jobs):
-                    self._controller.add(task)
-
-        # advance all jobs
-        self._controller.progress()
-
-        # summary
-        stats = self._controller.stats()
-        # compute exitcode based on the running status of jobs
-        self.session.save_all()
-        return self._main_loop_exitcode(stats)
-
-    def _main_loop_exitcode(self, stats):
-        # FIXME: Do these exit statuses make sense?
-        """
-        Compute the exit code for the `_main` function.
-        (And, hence, for the whole `SessionBasedScript`.)
-
-        The exitcode is a bitfield; only the 4 least-significant bits
-        are used, with the following meaning:
-
-           ===  ============================================================
-           Bit  Meaning
-           ===  ============================================================
-             0  Set if a fatal error occurred: the script could not complete
-             1  Set if there are jobs in `FAILED` state
-             2  Set if there are jobs in `RUNNING` or `SUBMITTED` state
-             3  Set if there are jobs in `NEW` state
-           ===  ============================================================
-
-        Override this method if you need to alter the termination
-        condition for a `SessionBasedScript`.
-        """
-        rc = 0
-        if stats['failed'] > 0:
-            rc |= 2
-        if stats[gc3libs.Run.State.RUNNING] > 0 \
-                or stats[gc3libs.Run.State.SUBMITTED] > 0 \
-                or stats[gc3libs.Run.State.UNKNOWN]:
-            rc |= 4
-        if stats[gc3libs.Run.State.NEW] > 0:
-            rc |= 8
-        return rc
+    def __abort_old_session(self):
+        old_task_ids = self.session.list_ids()
+        if old_task_ids:
+            self.log.warning(
+                "Abort of existing session requested:"
+                " will attempt to kill existing tasks."
+                " This may generate a few spurious error messages"
+                " if the tasks are too old and have already been"
+                " cleaned up by the system.")
+            for task_id in old_task_ids:
+                task = self.session.load(task_id)
+                task.attach(self._core)
+                try:
+                    task.kill()
+                except Exception as err:
+                    self.log.info(
+                        "Got this error while killing old task '%s',"
+                        " ignore it: %s: %s",
+                        task,
+                        err.__class__.__name__,
+                        str(err))
+                try:
+                    task.free()
+                except Exception as err:
+                    self.log.info(
+                        "Got this error while cleaning up old task '%s',"
+                        " ignore it: %s: %s",
+                        task,
+                        err.__class__.__name__,
+                        str(err))
+                task.detach()
+                self.session.remove(task_id)
+                self.log.debug("Removed task '%s' from session.", task)
+            self.log.info(
+                "Done cleaning up old session tasks, starting with new one"
+                " afresh...")
 
-    def new_tasks(self, extra, epath=None, emask=0):
+    def _sleep(self, lapse):
         """
-        This method is called every time the daemon is started with
-        `epath=None` and `emask=0`. Similarly to
-        :py:meth:`SessionBasedScript.new_tasks()` method it is
-        supposed to return a list of :py:class:`Task()` instances to
-        be added to the session.
-
-        It is also called for each event in INBOX (for instance every
-        time a file is created), depending on the value of
-        `--notify-state` option. In this case the method will be
-        called for each file with the file path and the mask of the
-        event as arguments.
+        Pause execution for `lapse` seconds.
 
-        :param extra: by default: `self.extra`
-
-        :param epath: an instance of :py:class:`gc3libs.url.Url`
-                      containing the path to the updated file or
-                      directory
-
-        :param emask: mask explaining the event type. This reflects
-                      the inotify events, and are defined in
-                      :py:const:`gc3libs.poller.events`
+        The default implementation sleeps in one-second steps via
+        ``time.sleep()``.  This is provided as an overridable method
+        so that one can use a different system call to comply with
+        other threading models (e.g., gevent).
         """
-        return []
+        # Python scripts become unresponsive during
+        # `time.sleep()`, so we just do the wait in small
+        # steps, to allow the interpreter to process
+        # interrupts in the breaks.  Ugly, but works...
+        for x in xrange(lapse):
+            time.sleep(1)
 
 
 class SessionBasedScript(_SessionBasedCommand):
-
     """
     Base class for ``grosetta``/``ggamess``/``gcodeml`` and like scripts.
     Implements a long-running script to submit and manage a large number
@@ -1983,6 +1254,33 @@ class SessionBasedScript(_SessionBasedCommand):
 
     """
 
+    @same_docstring_as(_SessionBasedCommand.__init__)
+    def __init__(self, **extra_args):
+        # these are parameters used in the stock `new_tasks()`
+        self.instances_per_file = 1
+        self.instances_per_job = 1
+        # catch omission of mandatory `application` ctor param (see below)
+        # use bogus values that should point ppl to the right place
+        self.input_filename_pattern = (
+            'PLEASE SET `input_filename_pattern`'
+            ' IN `SessionBasedScript` CONSTRUCTOR')
+        self.application = self.__unset_application_cls
+        # init base class(es)
+        super(SessionBasedScript, self).__init__(**extra_args)
+
+
+    # safeguard against programming errors: if the `application` ctor
+    # parameter has not been given to the constructor, the following
+    # method raises a fatal error (this function simulates a class ctor)
+    def __unset_application_cls(*args, **kwargs):
+        """
+        Raise an error if users did not set `application` in
+        `SessionBasedScript` initialization.
+        """
+        raise gc3libs.exceptions.InvalidArgument(
+            "PLEASE SET `application` in `SessionBasedScript` CONSTRUCTOR")
+
     ##
     # CUSTOMIZATION METHODS
     ##
@@ -2005,6 +1303,13 @@ def setup_args(self):
 
     def pre_run(self):
         super(SessionBasedScript, self).pre_run()
+        # since it may take quite some time before jobs are created
+        # and the first report is displayed, print a startup banner so
+        # that users get some kind of feedback ...
+        print("Starting %s;"
+              " use the '-v' command-line option to get"
+              " a more verbose report of activity."
+              % (self.name,))
         # parse the `states` list
         self.params.states = self.params.states.split(',')
@@ -2040,17 +1345,17 @@ def new_tasks(self, extra):
                 if self.instances_per_job > 1:
                     yield ("%s.%d--%s" % (
-                        gc3libs.utils.basename_sans(path),
+                        basename_sans(path),
                         seqno,
                         min(seqno + self.instances_per_job - 1,
                             self.instances_per_file)),
                         self.application, [path], extra.copy())
                 else:
-                    yield ("%s.%d" % (gc3libs.utils.basename_sans(path),
+                    yield ("%s.%d" % (basename_sans(path),
                                       seqno),
                            self.application, [path], extra.copy())
             else:
-                yield (gc3libs.utils.basename_sans(path),
+                yield (basename_sans(path),
                        self.application, [path], extra.copy())
 
     def print_summary_table(self, output, stats):
@@ -2069,7 +1374,6 @@ def print_summary_table(self, output, stats):
           dictionary, mapping each possible `Run.State` to the count of
           tasks in that state; see `Engine.stats` for a detailed
           description.
-
         """
         table = PrettyTable(['state', 'n', 'n%'])
         table.align = 'r'
@@ -2177,7 +1481,7 @@ def _main_loop(self):
         function so that we can call it just once or properly loop
         around it, as directed by the `self.params.wait` option.
 
-        .. note::
+        .. warning::
 
           Overriding this method can disrupt the whole functionality of
           the script, so be careful.
@@ -2249,8 +1553,8 @@ def _main_loop_exitcode(self, stats):
 
     def _main_loop_done(self, rc):
         """
-        Returns True if the main loop is completed and we don't want to
-        continue the processing. Returns False otherwise.
+        Returns ``True`` if the main loop has completed and we don't want
+        to continue the processing.  Returns ``False`` otherwise.
         """
         if rc > 3:
             return False
diff --git a/gc3libs/poller.py b/gc3libs/poller.py
index 82dafaf6..f2c0ca9a 100755
--- a/gc3libs/poller.py
+++ b/gc3libs/poller.py
@@ -208,7 +208,7 @@ def __init__(self, url, mask=events['IN_ALL_EVENTS'],
         self.watch(self.url.path)
 
         if self._recurse:
-            for dirpath, dirnames, filename in os.walk(self.url.path):
+            for dirpath, dirnames, filenames in os.walk(self.url.path):
                 self.watch(dirpath)
                 for name in filenames:
                     path = os.path.join(self.url.path, dirpath, name)
diff --git a/gc3libs/tests/scripts/simpledaemon.py b/gc3libs/tests/scripts/simpledaemon.py
index 730b015f..7b497eb8 100755
--- a/gc3libs/tests/scripts/simpledaemon.py
+++ b/gc3libs/tests/scripts/simpledaemon.py
@@ -2,7 +2,7 @@
 #
 """
 """
-# Copyright (C) 2012-2013, GC3, University of Zurich. All rights reserved.
+# Copyright (C) 2017-2018, University of Zurich. All rights reserved.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
@@ -21,7 +21,7 @@
 import os
 
 import gc3libs
-from gc3libs.cmdline import SessionBasedDaemon
+from gc3libs.daemon import SessionBasedDaemon
 import inotifyx
 
 class SimpleDaemon(SessionBasedDaemon):
@@ -31,28 +31,25 @@ class SimpleDaemon(SessionBasedDaemon):
     def new_tasks(self, extra, epath=None, emask=0):
         app = None
         if not epath:
-            # Startup: just submit an Hello World application
-            extra['jobname'] = 'EchoApp'
             app = gc3libs.Application(
                 ['/bin/echo', 'first run'],
                 [],
                 gc3libs.ANY_OUTPUT,
                 stdout='stdout.txt',
                 stderr='stderr.txt',
+                jobname='EchoApp',
                 **extra)
             return [app]
         else:
             # A new file has been created. Process it.
-            extra['jobname'] = 'LSApp.%s' % os.path.basename(epath.path)
-            # inputs = [epath] if epath.scheme == 'file' else []
-            inputs = {epath:'foo'}
             if emask & inotifyx.IN_CLOSE_WRITE:
                 app = gc3libs.Application(
                     ['/bin/echo', epath],
-                    inputs,
-                    gc3libs.ANY_OUTPUT,
+                    inputs={epath:'foo'},
+                    outputs=gc3libs.ANY_OUTPUT,
                     stdout='stdout.txt',
                     stderr='stderr.txt',
+                    jobname=('LSApp.' + os.path.basename(epath.path)),
                     **extra)
                 return [app]
diff --git a/gc3libs/tests/test_cmdline.py b/gc3libs/tests/test_cmdline.py
old mode 100644
new mode 100755
index 2bfdecf9..376e28ff
--- a/gc3libs/tests/test_cmdline.py
+++ b/gc3libs/tests/test_cmdline.py
@@ -3,7 +3,7 @@
 """
 Tests for the cmdline module
 """
-# Copyright (C) 2012-2016 S3IT, Zentrale Informatik, University of
+# Copyright (C) 2012-2016, 2018 S3IT, Zentrale Informatik, University of
 # Zurich. All rights reserved.
 #
 # This program is free software; you can redistribute it and/or modify
@@ -29,23 +29,22 @@
 import re
 import time
 
-import cli.test
+import pytest
 
 import gc3libs.cmdline
 import gc3libs.session
 
-class TestScript(cli.test.FunctionalTest):
-
-    def __init__(self, *args, **extra_args):
-        cli.test.FunctionalTest.__init__(self, *args, **extra_args)
-        self.scriptdir = os.path.join(
-            os.path.dirname(
-                os.path.abspath(__file__)),
-            'scripts')
+class TestScript(object):
 
+    @pytest.fixture(autouse=True)
     def setUp(self):
-        cli.test.FunctionalTest.setUp(self)
+        self.scriptdir = os.path.join(
+            os.path.dirname(os.path.abspath(__file__)), 'scripts')
+
+        self.basedir = tempfile.mkdtemp()
+        orig_wd = os.getcwd()
+        os.chdir(self.basedir)
+
         CONF_FILE = """
 [auth/dummy]
 type = ssh
@@ -63,35 +62,30 @@ def setUp(self):
 max_cores = 2
 architecture = x86_64
 override = False
-resourcedir = %s
+resourcedir = {basedir}/resource.d
 """
-        (fd, self.cfgfile) = tempfile.mkstemp()
-        self.resourcedir = self.cfgfile + '.d'
-        f = os.fdopen(fd, 'w')
-        f.write(CONF_FILE % self.resourcedir)
-        f.close()
-
-    def tearDown(self):
-        os.remove(self.cfgfile)
-        cli.test.FunctionalTest.tearDown(self)
+        self.cfgfile = os.path.join(self.basedir, 'config.ini')
+        with open(self.cfgfile, 'w') as cfg:
+            cfg.write(CONF_FILE.format(basedir=self.basedir))
+
+        yield
+
+        os.chdir(orig_wd)
         try:
-            shutil.rmtree(self.resourcedir)
+            shutil.rmtree(self.basedir)
         except:
-            # Double check if some dir is still present
             pass
 
+
     def test_simplescript(self):
-        """Test a very simple script based on `SessionBasedScript`:class:
+        """
+        Test a very simple script based on `SessionBasedScript`:class:
 
         The script is found in ``gc3libs/tests/scripts/simplescript.py``
         """
-        # XXX: WARNING: This will only work if you have a "localhost"
-        # section in gc3pie.conf!!!
-        result = self.run_script(
+        proc = subprocess.Popen([
             'python',
-            os.path.join(
-                self.scriptdir,
-                'simplescript.py'),
+            os.path.join(self.scriptdir, 'simplescript.py'),
             '-C',
             '1',
             '-s',
@@ -99,30 +93,33 @@ def test_simplescript(self):
             '--config-files',
             self.cfgfile,
             '-r',
-            'localhost')
+            'localhost'
+        ], stdout=subprocess.PIPE)
+        stdout, stderr = proc.communicate()
 
+        gc3libs.log.debug("Script output:\n<<<<<<<<\n%s\n>>>>>>>>", stdout)
         assert re.match(
             '.*TERMINATED\s+3/3\s+\(100.0+%\).*',
-            result.stdout,
+            stdout,
             re.S)
 
         # FIXME: output dir should be inside session dir
-        session_dir = os.path.join(self.env.base_path, 'TestOne')
+        session_dir = os.path.join(self.basedir, 'TestOne')
         assert os.path.isdir(
-            os.path.join(self.env.base_path, 'SimpleScript.out.d')
+            os.path.join(self.basedir, 'SimpleScript.out.d')
         )
         assert os.path.isfile(
             os.path.join(
-                self.env.base_path,
+                self.basedir,
                 'SimpleScript.out.d',
                 'SimpleScript.stdout'))
 
         assert os.path.isdir(
-            os.path.join(self.env.base_path, 'SimpleScript.out2.d')
+            os.path.join(self.basedir, 'SimpleScript.out2.d')
        )
         assert os.path.isfile(
             os.path.join(
-                self.env.base_path,
+                self.basedir,
                 'SimpleScript.out2.d',
                 'SimpleScript.stdout'))
@@ -140,50 +137,50 @@ def test_simplescript(self):
             gc3libs.session.Session.STORE_URL_FILENAME,
         ))
 
+
     def test_simpledaemon_d(self):
-        wdir = os.path.join(self.env.base_path, 'wdir')
         proc = subprocess.Popen([
             'python',
             os.path.join(self.scriptdir, 'simpledaemon.py',),
             '--config-files', self.cfgfile,
             'server',
             '-C', '1',
-            '--working-dir', wdir,
+            '--working-dir', self.basedir,
             '-r', 'localhost',
         ],)
 
-        clean_exit = False
         for i in range(10):
             # Wait up to 10 seconds
             time.sleep(1)
-
-            if os.path.isdir(os.path.join(wdir, 'EchoApp')):
+            if os.path.isdir(os.path.join(self.basedir, 'EchoApp')):
                 clean_exit = True
                 break
+        else:
+            clean_exit = False
 
         # Kill the daemon
         # We should have a pidfile
-        pidfile = os.path.join(wdir, 'simpledaemon.pid')
+        pidfile = os.path.join(self.basedir, 'simpledaemon.pid')
         assert os.path.isfile(pidfile)
 
         pid = open(pidfile).read()
         os.kill(int(pid), signal.SIGTERM)
 
         assert clean_exit, "Daemon didn't complete after 10 seconds"
-        assert os.path.isdir(wdir)
 
         # Since it's a daemon, this shouldn't be needed
         proc.kill()
 
         # a logfile
-        assert os.path.isfile(os.path.join(wdir, 'simpledaemon.log'))
+        assert os.path.isfile(os.path.join(self.basedir, 'simpledaemon.log'))
 
         # the output directory
-        assert os.path.isdir(os.path.join(wdir, 'EchoApp'))
+        assert os.path.isdir(os.path.join(self.basedir, 'EchoApp'))
+
 
     def test_simpledaemon_inbox(self):
-        wdir = os.path.join(self.env.base_path, 'wdir')
-        inboxdir = os.path.join(wdir, 'inbox')
+        inboxdir = os.path.join(self.basedir, 'inbox')
+
         proc = subprocess.Popen([
             'python',
             os.path.join(self.scriptdir, 'simpledaemon.py',),
@@ -191,37 +188,39 @@ def test_simpledaemon_inbox(self):
             '-vvv',
             'server',
             '-C', '1',
-            '--working-dir', wdir,
+            '--working-dir', self.basedir,
             '-r', 'localhost',
             inboxdir,
         ],)
 
-        clean_exit = False
         # Wait until the daemon is up and running.
         # It will create the directory, so let's wait until then.
-        daemon_running = False
         for i in range(10):
             if os.path.isdir(inboxdir):
                 daemon_running = True
                 break
             time.sleep(1)
+        else:
+            daemon_running = False
 
         assert daemon_running
 
-        fd = open(os.path.join(inboxdir, 'foo'), 'w+')
-        fd.close()
+
+        # create marker file
+        with open(os.path.join(inboxdir, 'foo'), 'w+') as fd:
+            fd.write('contents')
 
         for i in range(10):
             # Wait up to 10 seconds
             time.sleep(1)
-            # Create fake file
-
-            if os.path.isdir(os.path.join(wdir, 'LSApp.foo')):
+            if os.path.isdir(os.path.join(self.basedir, 'LSApp.foo')):
                 clean_exit = True
                 break
+        else:
+            clean_exit = False
 
         # Kill the daemon
         # We should have a pidfile
-        pidfile = os.path.join(wdir, 'simpledaemon.pid')
+        pidfile = os.path.join(self.basedir, 'simpledaemon.pid')
         assert os.path.isfile(pidfile)
 
         pid = open(pidfile).read()
@@ -229,16 +228,16 @@ def test_simpledaemon_inbox(self):
         os.kill(int(pid), signal.SIGHUP)
 
         assert clean_exit, "Daemon didn't complete after 10 seconds"
-        assert os.path.isdir(wdir)
 
         # Since it's a daemon, this shouldn't be needed
         proc.kill()
 
         # a logfile
-        assert os.path.isfile(os.path.join(wdir, 'simpledaemon.log'))
+        assert os.path.isfile(os.path.join(self.basedir, 'simpledaemon.log'))
 
         # the output directory
-        assert os.path.isdir(os.path.join(wdir, 'EchoApp'))
+        assert os.path.isdir(os.path.join(self.basedir, 'EchoApp'))
+
 
 # main: run tests
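
Note on the deprecation path above: `__make_task_from_old_style_args` exists only for backwards compatibility, and `new_tasks()` is now expected to return ready-made `Task`/`Application` instances. The following is a minimal sketch of a `SessionBasedScript` subclass written in that style; the script name `EchoScript`, the `/bin/echo` command, and the version string are illustrative only and not part of this patch.

#!/usr/bin/env python
"""
Sketch: a `SessionBasedScript` subclass whose `new_tasks()` yields
`Application` instances instead of the deprecated
`(jobname, cls, args, kwargs)` tuples.
"""

import os

import gc3libs
from gc3libs.cmdline import SessionBasedScript


class EchoScript(SessionBasedScript):
    """Run `/bin/echo` once for each input file."""

    def __init__(self):
        # `version` is a mandatory ctor parameter (illustrative value)
        super(EchoScript, self).__init__(version='1.0')

    def new_tasks(self, extra):
        # `self.params.args` holds the positional command-line arguments
        for path in self.params.args:
            # yield a ready-made `Application` (new style); `extra`
            # carries the stock defaults (output_dir, requested_* ...)
            yield gc3libs.Application(
                ['/bin/echo', path],
                inputs=[path],
                outputs=gc3libs.ANY_OUTPUT,
                stdout='stdout.txt',
                stderr='stderr.txt',
                jobname=('EchoApp.' + os.path.basename(path)),
                **extra.copy())


if __name__ == '__main__':
    EchoScript().run()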