Skip to content

Commit

Permalink
DF/095: allow stage to use DKB ES storage as second (backup) source.
Browse files Browse the repository at this point in the history
If the `--es-config FILE` parameter is specified, use DKB ES storage
as a backup metadata source in case information was removed from the
primary source (AMI).

I am not sure under what circumstances information can be removed from
AMI, so for now we just check if there are empty/missing fields in the
data taken from AMI and then, if such fields are found, check ES for
(possibly) already known values.

The problem is that there are quite a lot of "missed" values, so almost
for every record we have to check both AMI and ES. Maybe there is a more
delicate trigger, like "No data at all", or "if dataset property
'deleted' is set to True", or...

Or maybe AMI just doesn't remove data at all?..
  • Loading branch information
mgolosova committed Apr 27, 2019
1 parent 2b4a50e commit a9d10ac
Showing 1 changed file with 54 additions and 0 deletions.
54 changes: 54 additions & 0 deletions Utils/Dataflow/095_datasetInfoAMI/amiDatasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import re
import sys
import os
import argparse

try:
import pyAMI.client
import pyAMI.atlas.api as AtlasAPI
Expand All @@ -17,6 +19,8 @@
dkb_dir = os.path.join(base_dir, os.pardir)
sys.path.append(dkb_dir)
import pyDKB
from pyDKB import storages
from pyDKB.common.utils import read_es_config
except Exception, err:
sys.stderr.write("(ERROR) Failed to import pyDKB library: %s\n" % err)
sys.exit(1)
Expand All @@ -39,10 +43,22 @@ def main(argv):
stage.add_argument('--userkey', help='PEM key file', required=True)
stage.add_argument('--usercert', help='PEM certificate file',
required=True)
stage.add_argument('--es-config', action='store',
type=argparse.FileType('r'),
help=u'Use ES as a backup source for dataset info'
' in order to save information even if it was'
' removed from the original source',
nargs='?',
dest='es'
)

exit_code = 0
try:
stage.parse_args(argv)
if stage.ARGS.es:
cfg = read_es_config(stage.ARGS.es)
stage.ARGS.es.close()
storages.create("ES", storages.storageType.ES, cfg)
stage.process = process
init_ami_client(stage.ARGS.userkey, stage.ARGS.usercert)
stage.run()
Expand Down Expand Up @@ -91,6 +107,7 @@ def process(stage, message):
# or not set at all.
if update or not formats:
amiPhysValues(data)
fix_ds_info(data)
stage.output(pyDKB.dataflow.messages.JSONMessage(data))

return True
Expand Down Expand Up @@ -153,5 +170,42 @@ def remove_tid(dataset):
return re.sub('_tid(.)+', '', dataset)


def fix_ds_info(data):
    """ Fix dataset metadata with data from ES, if needed and possible.

    Fields listed in ``PHYS_VALUES`` (their 'es' names) that are empty or
    missing in `data` are filled in-place with values already known to ES.

    :param data: dataset metadata (updated in place)
    :type data: dict

    :return: None if update is not requested (ES client not configured) or
             not possible (ES does not contain information of given dataset);
             else -- True
    :rtype: bool, NoneType
    """
    try:
        es = storages.get("ES")
    except storages.StorageNotConfigured:
        # ES backup source was not requested (`--es-config` not specified).
        return None
    mfields = [item['es'] for item in PHYS_VALUES]
    # Any empty/missing field triggers a lookup of (possibly) already
    # known values in ES.
    if not any(not data.get(f) for f in mfields):
        return True
    try:
        r = es.get(data.get('datasetname'), mfields,
                   parent=data.get('_parent'))
    except storages.exceptions.NotFound as err:
        sys.stderr.write("(DEBUG) %s.\n" % err)
        return None
    for f in mfields:
        # Use `r.get(f)` (not `r[f]`) and check it is not None: the ES
        # record may also lack the field, and e.g. None vs '' are unequal
        # falsy values -- `r[f]` would raise KeyError in that case.
        if not data.get(f) and r.get(f) is not None \
                and r.get(f) != data.get(f):
            sys.stderr.write("(DEBUG) Update AMI info with data from ES:"
                             " %s = '%s' (was: '%s')\n" % (f, r.get(f),
                                                           data.get(f)))
            data[f] = r.get(f)
    return True


# Script entry point: run the stage with command-line arguments
# (program name stripped).
if __name__ == '__main__':
    main(sys.argv[1:])

0 comments on commit a9d10ac

Please sign in to comment.