From 0288ffae6d959ef1825749ddf603a18545a6ade8 Mon Sep 17 00:00:00 2001 From: Riccardo Murri Date: Fri, 5 Jan 2018 22:27:48 +0100 Subject: [PATCH] Allow using a DB URL as session name. In fact, any URL pointing to a GC3Pie `Store` will do. Since, in this case, the session metadata is missing (as it's normally only stored in the session directory) then we reconstruct it or just guess. This allows one to use GC3Utils for inspecting DB-based task stores; e.g.:: gstat --brief --session postgresql://user:passwd@db.example.org/ --- docs/glossary.rst | 10 +++- gc3libs/session.py | 131 ++++++++++++++++++++++++++++++++++--------- gc3utils/commands.py | 62 ++++++++++---------- 3 files changed, 147 insertions(+), 56 deletions(-) diff --git a/docs/glossary.rst b/docs/glossary.rst index cb29a354..e6decb40 100644 --- a/docs/glossary.rst +++ b/docs/glossary.rst @@ -88,8 +88,16 @@ Glossary to the command to tag a message as "standard output" or "standard error". Session + A :term:`persistent` collection of GC3Pie tasks and jobs. Sessions - are used by `GC3Apps`:ref: to store job status across program runs. + are used by `GC3Apps`:ref: to store job status across program + runs. A session is specified by giving the filesystem path to a + *session directory*: the directory contains some files with + meta-data about the tasks that comprise the session. It is also + possible to *simulate* a session by specifying a *task store URL* + (path to a filesystem directory where the jobs are stored, or + connection URL to a database); in this case the session meta-data + will be reconstructed from the set of tasks in the store. 
Walltime Short for *wall-clock time*: indicates the total running time of a diff --git a/gc3libs/session.py b/gc3libs/session.py index 6d7e2fcd..d07265a4 100755 --- a/gc3libs/session.py +++ b/gc3libs/session.py @@ -21,11 +21,13 @@ # stdlib imports +import atexit import csv import itertools import os import sys import shutil +import tempfile # GC3Pie imports import gc3libs @@ -37,7 +39,6 @@ class Session(list): - """ A 'session' is a persistent collection of tasks. @@ -131,7 +132,8 @@ class Session(list): DEFAULT_JOBS_DIR = 'jobs' - def __init__(self, path, create=True, store_or_url=None, **extra_args): + def __init__(self, path, create=True, store_or_url=None, + load=True, **extra_args): """ First argument `path` is the path to the session directory. @@ -158,6 +160,12 @@ def __init__(self, path, create=True, store_or_url=None, **extra_args): By default `gc3libs.persistence.filesystem.FileSystemStore`:class: (which see) is used for providing a new session with a store. + + Finally, if optional argument `load` is ``False`` then an + already-existing session at `path` will be discarded and a new + one will be created in its place. By default, `load` is + ``True``, meaning that data from existing sessions is loaded + into memory. """ self.path = os.path.abspath(path) self.name = os.path.basename(self.path) @@ -167,7 +175,7 @@ def __init__(self, path, create=True, store_or_url=None, **extra_args): self.finished = -1 # load or make session - if os.path.isdir(self.path): + if os.path.isdir(self.path) and load: # Session already exists? 
try: self._load_session(**extra_args) @@ -190,7 +198,7 @@ def __init__(self, path, create=True, store_or_url=None, **extra_args): self._create_session(store_or_url, **extra_args) else: raise gc3libs.exceptions.InvalidArgument( - "Session '%s' not found" % self.path) + "Session directory '%s' not found" % self.path) def _create_session(self, store_or_url, **extra_args): if isinstance(store_or_url, gc3libs.persistence.store.Store): @@ -256,28 +264,7 @@ def _load_session(self, store=None, **extra_args): "Unable to recover starting time from existing session:" " file `%s` is missing." % (start_file)) - for task_id in ids: - try: - self.tasks[task_id] = self.store.load(task_id) - except Exception as err: - if gc3libs.error_ignored( - # context: - # - module - 'session', - # - class - 'Session', - # - method - 'load', - # - actual error class - err.__class__.__name__, - # - additional keywords - 'persistence', - ): - gc3libs.log.warning( - "Ignoring error from loading '%s': %s", task_id, err) - else: - # propagate exception back to caller - raise + self.tasks = self.load_many(ids) def destroy(self): """ @@ -430,6 +417,38 @@ def load(self, obj_id): """ return self.store.load(obj_id) + def load_many(self, obj_ids): + """ + Load objects given their IDs from persistent storage. + + Return a dictionary mapping task ID to the actual + retrieved `Task`:class: object. 
+ """ + tasks = {} + for task_id in obj_ids: + try: + tasks[task_id] = self.store.load(task_id) + except Exception as err: + if gc3libs.error_ignored( + # context: + # - module + 'session', + # - class + 'Session', + # - method + 'load', + # - actual error class + err.__class__.__name__, + # - additional keywords + 'persistence', + ): + gc3libs.log.warning( + "Ignoring error from loading '%s': %s", task_id, err) + else: + # propagate exception back to caller + raise + return tasks + def save(self, obj): """ Save an object to the persistent storage and return @@ -531,6 +550,66 @@ def set_end_timestamp(self, time=None): self.finished = self._touch_file(self.TIMESTAMP_FILES['end'], time) +class TemporarySession(Session): + """ + Create a session from a store URL. + + In contrast with the regular `Session`:class: object, a + `TemporarySession`:class: does not persist any metadata about the + task collection. + In particular: + + - The session index (list of task IDs belonging to the session) is + initialized from the entire list of jobs present in the given + `Store`:class: (unless a list is explicitly passed in the + `task_ids` argument to the constructor). This means that, unlike + plain `Session`:class: objects, two `TemporarySession`:class: + objects cannot share the same store. + + - The session directory (``path`` in the `Session`:class: + constructor) is created in a temporary location on the + filesystem and deleted when the Python interpreter exits + (unless ``delete=False`` is passed to the constructor). + + - Timestamps will be set to the time the `TemporarySession`:class: + Python object is created; two `TemporarySession`:class: + instances with the same backing store can have different + creation timestamps, depending on when exactly they were + instantiated. + + The `TemporarySession`:class: is only provided as a convenience to + use code that was built on top of a `Session`:class: with a + "naked" `Store`:class:. 
+ """ + + def __init__(self, store_or_url, task_ids=None, delete=True, **extra_args): + # make temporary session dir + path = tempfile.mkdtemp( + prefix='gc3pie.TemporarySession.', + suffix='.d') + # ensure temp directory is deleted at interpreter exit + if delete: + def cleanup(): + if os.path.exists(path): + shutil.rmtree(path) + atexit.register(cleanup) + # init `Session` class (create=True, load=False) + super(TemporarySession, self).__init__( + path, True, store_or_url, False, **extra_args) + # populate index + if task_ids is None: + try: + task_ids = self.store.list() + except NotImplementedError: + raise RuntimeError( + "Cannot create temporary session:" + " Task store `{0}` does not support listing all task IDs.".format(self.store.url)) + self.tasks = self.load_many(task_ids) + self._save_index_file() + # use url as the session name + self.name = str(self.store.url) + + # main: run tests if "__main__" == __name__: diff --git a/gc3utils/commands.py b/gc3utils/commands.py index fc0a3672..8e55eb28 100755 --- a/gc3utils/commands.py +++ b/gc3utils/commands.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # """ -Implementation of the `core` command-line front-ends. +Implementation of the command-line front-ends. """ # Copyright (C) 2009-2018 University of Zurich. All rights reserved. # @@ -45,10 +45,11 @@ # local modules from gc3libs import __version__, Run from gc3libs.quantity import Duration, Memory -from gc3libs.session import Session +from gc3libs.session import Session, TemporarySession import gc3libs.cmdline import gc3libs.exceptions import gc3libs.persistence +from gc3libs.url import Url import gc3libs.utils as utils @@ -171,18 +172,31 @@ def _get_tasks(self, task_ids, ignore_failures=True): else: raise - def _get_session(self, name): + def _get_session(self, url): """ Return a `gc3libs.session.Session` object corresponding to the - session identified by `name`. + session identified by `url`. :raise gc3libs.exceptions.InvalidArgument: If the session cannot be loaded (e.g., does not exist). 
""" try: - return Session(name, create=False) + url = Url(url) + if url.scheme == 'file': + return Session(url.path, create=False) + else: + return TemporarySession(url) except gc3libs.exceptions.InvalidArgument as err: - raise RuntimeError('Session {0} not found: {1}'.format(name, err)) + raise RuntimeError( + "Cannot load session `{0}`: {1}".format(url, err)) + + def _list_all_tasks(self): + try: + return self.session.store.list() + except NotImplementedError: + raise NotImplementedError( + "Task storage module does not allow listing all tasks." + " Please specify the task IDs you wish to operate on.") # ====== Main ======== @@ -222,7 +236,7 @@ def main(self): "Option '-A' conflicts with list of job IDs to remove.") if self.params.all: - args = [job.persistent_id for job in self.session.iter_workflow()] + args = self._list_all_tasks() if len(args) == 0: self.log.info("No jobs in session: nothing to do.") else: @@ -365,7 +379,6 @@ def main(self): only_keys = self.params.keys.split(',') else: if self.params.verbose < 2: - def names_not_starting_with_underscore(name): return not name.startswith('_') only_keys = names_not_starting_with_underscore @@ -381,13 +394,7 @@ def names_not_starting_with_underscore(name): if len(self.params.args) == 0: # if no arguments, operate on all known jobs - try: - self.params.args = [job.persistent_id - for job in self.session.iter_workflow()] - except NotImplementedError: - raise NotImplementedError( - "Job storage module does not allow listing all jobs." 
- " Please specify the job IDs you wish to operate on.") + self.params.args = self._list_all_tasks() if posix.isatty(sys.stdout.fileno()): # try to screen width @@ -615,9 +622,7 @@ def main(self): if len(self.params.args) == 0: # if no arguments, operate on all known jobs - # self.params.args = self.session.list_ids() - self.params.args = [job.persistent_id - for job in self.session.iter_workflow()] + self.params.args = self._list_all_tasks() if len(self.params.args) == 0: print("No jobs submitted.") @@ -668,10 +673,9 @@ def main(self): app.update_state() self.session.store.replace(jobid, app) if states is None or app.execution.in_state(*states): - # XXX: use `... if ... else ...` in Py > 2.4 - if hasattr(app, 'jobname'): + try: jobname = app.jobname - else: + except AttributeError: jobname = '' key_values = [] @@ -816,7 +820,7 @@ def main(self): " use either '-A' or explicitly list task IDs.") if self.params.all: - args = [job.persistent_id for job in self.session.iter_workflow()] + args = self._list_all_tasks() if len(args) == 0: self.log.info("No jobs in session: nothing to do.") else: @@ -916,7 +920,7 @@ def main(self): "Option '-A' conflicts with list of job IDs to remove.") if self.params.all: - args = [job.persistent_id for job in self.session.iter_workflow()] + args = self._list_all_tasks() if len(args) == 0: self.log.info("No jobs in session: nothing to do.") else: @@ -1279,14 +1283,14 @@ def delete_session(self): def list_jobs(self): """ - Called with subcommand `list`. + Called with subcommand ``list``. - List the content of a session, like `gstat -n -v -s SESSION` does. - Unlike `gstat`, though, display stops at the top-level jobs - unless option `--recursive` is also given. + List the content of a session, like ``gstat -n -v -s SESSION`` + does. Unlike ``gstat``, though, display stops at the top-level + jobs unless option `--recursive` is also given. 
- With option `--recursive`, indent job ids to show the tree-like - organization of jobs in the task collections. + With option ``--recursive``, indent job ids to show the + tree-like organization of jobs in the task collections. """ self.session = self._get_session(self.params.session)