diff --git a/Utils/Dataflow/091_datasetsRucio/datasets_processing.py b/Utils/Dataflow/091_datasetsRucio/datasets_processing.py
index 6990851b3..1f750dc1a 100755
--- a/Utils/Dataflow/091_datasetsRucio/datasets_processing.py
+++ b/Utils/Dataflow/091_datasetsRucio/datasets_processing.py
@@ -37,6 +37,7 @@
     sys.path.append(dkb_dir)
     import pyDKB
     from pyDKB import storages
+    from pyDKB.common.utils import read_es_config
 except Exception, err:
     sys.stderr.write("(ERROR) Failed to import pyDKB library: %s\n" % err)
     sys.exit(1)
@@ -103,34 +104,6 @@ def main(argv):
     sys.exit(exit_code)
 
 
-def read_es_config(cfg_file):
-    """ Read ES configuration file.
-
-    :param cfg_file: open file descriptor with ES access configuration
-    :type cfg_file: file descriptor
-    """
-    keys = {'ES_HOST': 'host',
-            'ES_PORT': 'port',
-            'ES_USER': 'user',
-            'ES_PASSWORD': '__passwd',
-            'ES_INDEX': 'index'
-            }
-    cfg = {}
-    for line in cfg_file.readlines():
-        if line.strip().startswith('#'):
-            continue
-        line = line.split('#')[0].strip()
-        if '=' not in line:
-            continue
-        key, val = line.split('=')[:2]
-        try:
-            cfg[keys[key]] = val
-        except KeyError:
-            sys.stderr.write("(WARN) Unknown configuration parameter: "
-                             "'%s'.\n" % key)
-    return cfg
-
-
 def init_rucio_client():
     """ Initialize global variable `rucio_client`. """
     global rucio_client
diff --git a/Utils/Dataflow/095_datasetInfoAMI/amiDatasets.py b/Utils/Dataflow/095_datasetInfoAMI/amiDatasets.py
index 9f90b5244..02536f511 100755
--- a/Utils/Dataflow/095_datasetInfoAMI/amiDatasets.py
+++ b/Utils/Dataflow/095_datasetInfoAMI/amiDatasets.py
@@ -3,6 +3,8 @@
 import re
 import sys
 import os
+import argparse
+
 try:
     import pyAMI.client
     import pyAMI.atlas.api as AtlasAPI
@@ -17,6 +19,8 @@
     dkb_dir = os.path.join(base_dir, os.pardir)
     sys.path.append(dkb_dir)
     import pyDKB
+    from pyDKB import storages
+    from pyDKB.common.utils import read_es_config
 except Exception, err:
     sys.stderr.write("(ERROR) Failed to import pyDKB library: %s\n" % err)
     sys.exit(1)
@@ -39,10 +43,22 @@
     stage.add_argument('--userkey', help='PEM key file', required=True)
     stage.add_argument('--usercert', help='PEM certificate file',
                        required=True)
+    stage.add_argument('--es-config', action='store',
+                       type=argparse.FileType('r'),
+                       help=u'Use ES as a backup source for dataset info'
+                            ' in order to save information even if it was'
+                            ' removed from the original source',
+                       nargs='?',
+                       dest='es'
+                       )
 
     exit_code = 0
     try:
         stage.parse_args(argv)
+        if stage.ARGS.es:
+            cfg = read_es_config(stage.ARGS.es)
+            stage.ARGS.es.close()
+            storages.create("ES", storages.storageType.ES, cfg)
         stage.process = process
         init_ami_client(stage.ARGS.userkey, stage.ARGS.usercert)
         stage.run()
@@ -91,6 +107,7 @@ def process(stage, message):
     # or not set at all.
     if update or not formats:
         amiPhysValues(data)
+        fix_ds_info(data)
     stage.output(pyDKB.dataflow.messages.JSONMessage(data))
     return True
@@ -153,5 +170,43 @@ def remove_tid(dataset):
     return re.sub('_tid(.)+', '', dataset)
 
 
+def fix_ds_info(data):
+    """ Fix dataset metadata with data from ES, if needed and possible.
+ + :param data: data + :type data: dict + + :return: None if update is not requested (ES client not configured) or + not possible (ES does not contain information of given dataset); + else -- True + :rtype: bool, NoneType + """ + try: + es = storages.get("ES") + except storages.StorageNotConfigured: + return None + mfields = [item['es'] for item in PHYS_VALUES] + update_required = False + for f in mfields: + if not data.get(f): + update_required = True + break + if update_required: + try: + r = es.get(data.get('datasetname'), mfields, + doc_type='output_dataset', + parent=data.get('_parent')) + except storages.exceptions.NotFound, err: + sys.stderr.write("(DEBUG) %s.\n" % err) + return None + for f in mfields: + if not data.get(f) and r.get(f) != data.get(f): + sys.stderr.write("(DEBUG) Update AMI info with data from ES:" + " %s = '%s' (was: '%s')\n" % (f, r.get(f), + data.get(f))) + data[f] = r[f] + return True + + if __name__ == '__main__': main(sys.argv[1:]) diff --git a/Utils/Dataflow/pyDKB/common/__init__.py b/Utils/Dataflow/pyDKB/common/__init__.py index 024e2871f..84fad2747 100644 --- a/Utils/Dataflow/pyDKB/common/__init__.py +++ b/Utils/Dataflow/pyDKB/common/__init__.py @@ -5,5 +5,4 @@ from exceptions import * import hdfs import json_utils as json -from custom_readline import custom_readline from Type import Type diff --git a/Utils/Dataflow/pyDKB/common/custom_readline.py b/Utils/Dataflow/pyDKB/common/utils.py similarity index 61% rename from Utils/Dataflow/pyDKB/common/custom_readline.py rename to Utils/Dataflow/pyDKB/common/utils.py index 3a84d28af..dc5aa0d37 100644 --- a/Utils/Dataflow/pyDKB/common/custom_readline.py +++ b/Utils/Dataflow/pyDKB/common/utils.py @@ -1,5 +1,7 @@ """ -Implementation of "readline"-like functionality for custom separator. +pyDKB.common.utils + +Miscellaneous useful functions. .. todo:: make import of ``fcntl`` (or of this module) optional to avoid errors when library is used under Windows. @@ -52,3 +54,34 @@ def custom_readline(f, newline): pos = buf.index(newline) yield buf[:pos] buf = buf[pos + len(newline):] + + +def read_es_config(cfg_file): + """ Read ES configuration file. + + We have ES config in form of file with shell variables declaration, + but sometimes need to parse it in Python as well. + + :param cfg_file: open file descriptor with ES access configuration + :type cfg_file: file descriptor + """ + keys = {'ES_HOST': 'host', + 'ES_PORT': 'port', + 'ES_USER': 'user', + 'ES_PASSWORD': '__passwd', + 'ES_INDEX': 'index' + } + cfg = {} + for line in cfg_file.readlines(): + if line.strip().startswith('#'): + continue + line = line.split('#')[0].strip() + if '=' not in line: + continue + key, val = line.split('=')[:2] + try: + cfg[keys[key]] = val + except KeyError: + sys.stderr.write("(WARN) Unknown configuration parameter: " + "'%s'.\n" % key) + return cfg diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/AbstractProcessorStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/AbstractProcessorStage.py index c9b254063..77b123513 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/AbstractProcessorStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/AbstractProcessorStage.py @@ -49,7 +49,7 @@ from . import Message from pyDKB.dataflow import DataflowException from pyDKB.common import hdfs -from pyDKB.common import custom_readline +from pyDKB.common.utils import custom_readline class AbstractProcessorStage(AbstractStage):
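
For reference, a minimal usage sketch of the relocated read_es_config() helper: it only needs an object with readlines(), so StringIO stands in for an open configuration file. The SAMPLE_CFG variable and every value inside it are hypothetical, and pyDKB is assumed to be importable (put on sys.path the same way the stages above do). Moving the helper into pyDKB.common.utils is what lets stages 091 and 095 share a single implementation.

# Usage sketch (hypothetical sample values) for the relocated helper
# pyDKB.common.utils.read_es_config(); assumes pyDKB is on sys.path.
from StringIO import StringIO

from pyDKB.common.utils import read_es_config

SAMPLE_CFG = """\
# Elasticsearch access parameters (sample values only)
ES_HOST=localhost
ES_PORT=9200  # inline comments are stripped
ES_USER=dkb
ES_PASSWORD=secret
ES_INDEX=some_index
"""

cfg = read_es_config(StringIO(SAMPLE_CFG))
# Keys are renamed according to the mapping inside read_es_config(), so cfg
# now holds: host='localhost', port='9200', user='dkb', __passwd='secret',
# index='some_index' (all values are kept as strings).
print(cfg)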