Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[obsolete] DF/095: safe dataset metadata update #246

Open
wants to merge 4 commits into
base: 91-ds-safe
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 1 addition & 28 deletions Utils/Dataflow/091_datasetsRucio/datasets_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
sys.path.append(dkb_dir)
import pyDKB
from pyDKB import storages
from pyDKB.common.utils import read_es_config
except Exception, err:
sys.stderr.write("(ERROR) Failed to import pyDKB library: %s\n" % err)
sys.exit(1)
Expand Down Expand Up @@ -103,34 +104,6 @@ def main(argv):
sys.exit(exit_code)


def read_es_config(cfg_file):
""" Read ES configuration file.

:param cfg_file: open file descriptor with ES access configuration
:type cfg_file: file descriptor
"""
keys = {'ES_HOST': 'host',
'ES_PORT': 'port',
'ES_USER': 'user',
'ES_PASSWORD': '__passwd',
'ES_INDEX': 'index'
}
cfg = {}
for line in cfg_file.readlines():
if line.strip().startswith('#'):
continue
line = line.split('#')[0].strip()
if '=' not in line:
continue
key, val = line.split('=')[:2]
try:
cfg[keys[key]] = val
except KeyError:
sys.stderr.write("(WARN) Unknown configuration parameter: "
"'%s'.\n" % key)
return cfg


def init_rucio_client():
""" Initialize global variable `rucio_client`. """
global rucio_client
Expand Down
55 changes: 55 additions & 0 deletions Utils/Dataflow/095_datasetInfoAMI/amiDatasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import re
import sys
import os
import argparse

try:
import pyAMI.client
import pyAMI.atlas.api as AtlasAPI
Expand All @@ -17,6 +19,8 @@
dkb_dir = os.path.join(base_dir, os.pardir)
sys.path.append(dkb_dir)
import pyDKB
from pyDKB import storages
from pyDKB.common.utils import read_es_config
except Exception, err:
sys.stderr.write("(ERROR) Failed to import pyDKB library: %s\n" % err)
sys.exit(1)
Expand All @@ -39,10 +43,22 @@ def main(argv):
stage.add_argument('--userkey', help='PEM key file', required=True)
stage.add_argument('--usercert', help='PEM certificate file',
required=True)
stage.add_argument('--es-config', action='store',
type=argparse.FileType('r'),
help=u'Use ES as a backup source for dataset info'
' in order to save information even if it was'
' removed from the original source',
nargs='?',
dest='es'
)

exit_code = 0
try:
stage.parse_args(argv)
if stage.ARGS.es:
cfg = read_es_config(stage.ARGS.es)
stage.ARGS.es.close()
storages.create("ES", storages.storageType.ES, cfg)
stage.process = process
init_ami_client(stage.ARGS.userkey, stage.ARGS.usercert)
stage.run()
Expand Down Expand Up @@ -91,6 +107,7 @@ def process(stage, message):
# or not set at all.
if update or not formats:
amiPhysValues(data)
fix_ds_info(data)
stage.output(pyDKB.dataflow.messages.JSONMessage(data))

return True
Expand Down Expand Up @@ -153,5 +170,43 @@ def remove_tid(dataset):
return re.sub('_tid(.)+', '', dataset)


def fix_ds_info(data):
""" Fix dataset metadata with data from ES, if needed and possible.

:param data: data
:type data: dict

:return: None if update is not requested (ES client not configured) or
not possible (ES does not contain information of given dataset);
else -- True
:rtype: bool, NoneType
"""
try:
es = storages.get("ES")
except storages.StorageNotConfigured:
return None
mfields = [item['es'] for item in PHYS_VALUES]
update_required = False
for f in mfields:
if not data.get(f):
update_required = True
break
if update_required:
try:
r = es.get(data.get('datasetname'), mfields,
doc_type='output_dataset',
parent=data.get('_parent'))
except storages.exceptions.NotFound, err:
sys.stderr.write("(DEBUG) %s.\n" % err)
return None
for f in mfields:
if not data.get(f) and r.get(f) != data.get(f):
sys.stderr.write("(DEBUG) Update AMI info with data from ES:"
" %s = '%s' (was: '%s')\n" % (f, r.get(f),
data.get(f)))
data[f] = r[f]
return True


if __name__ == '__main__':
main(sys.argv[1:])
1 change: 0 additions & 1 deletion Utils/Dataflow/pyDKB/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,4 @@
from exceptions import *
import hdfs
import json_utils as json
from custom_readline import custom_readline
from Type import Type
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""
Implementation of "readline"-like functionality for custom separator.
pyDKB.common.utils

Miscellaneous useful functions.

.. todo:: make import of ``fcntl`` (or of this module) optional
to avoid errors when library is used under Windows.
Expand Down Expand Up @@ -52,3 +54,34 @@ def custom_readline(f, newline):
pos = buf.index(newline)
yield buf[:pos]
buf = buf[pos + len(newline):]


def read_es_config(cfg_file):
""" Read ES configuration file.

We have ES config in form of file with shell variables declaration,
but sometimes need to parse it in Python as well.

:param cfg_file: open file descriptor with ES access configuration
:type cfg_file: file descriptor
"""
keys = {'ES_HOST': 'host',
'ES_PORT': 'port',
'ES_USER': 'user',
'ES_PASSWORD': '__passwd',
'ES_INDEX': 'index'
}
cfg = {}
for line in cfg_file.readlines():
if line.strip().startswith('#'):
continue
line = line.split('#')[0].strip()
if '=' not in line:
continue
key, val = line.split('=')[:2]
try:
cfg[keys[key]] = val
except KeyError:
sys.stderr.write("(WARN) Unknown configuration parameter: "
"'%s'.\n" % key)
return cfg
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from . import Message
from pyDKB.dataflow import DataflowException
from pyDKB.common import hdfs
from pyDKB.common import custom_readline
from pyDKB.common.utils import custom_readline


class AbstractProcessorStage(AbstractStage):
Expand Down