Skip to content

Commit

Permalink
Merge pull request #240 from PanDAWMS/oracle-es-consistency
Browse files Browse the repository at this point in the history
Add Oracle-ES consistency check functionality
  • Loading branch information
mgolosova committed May 28, 2019
2 parents 7ae2191 + 8f86ddd commit 51accc9
Show file tree
Hide file tree
Showing 14 changed files with 1,598 additions and 24 deletions.
6 changes: 6 additions & 0 deletions Utils/Dataflow/009_oracleConnector/README
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ them as NDJSON.
Currently works with specific set of queries only:
* prodsys2ES + datasets
* prodsys2ES
* consistency : simplified query that only obtains taskid and task_timestamp
for each task

The goal is to make it work with any number and combination of queries.

Expand Down Expand Up @@ -50,3 +52,7 @@ The 'output' directory contains two samples of production data:

sample2016.ndjson - collected with timestamps [09-05-2016 12:12:00, 09-05-2016 13:32:30]
sample2018.ndjson - collected with timestamps [15-06-2018 12:00:00, 15-06-2018 13:00:00]

Also, it contains a consistency checking sample:

consistency.ndjson - collected with timestamps [21-07-2018 00:00:00, 22-07-2018 00:00:00]
623 changes: 623 additions & 0 deletions Utils/Dataflow/009_oracleConnector/output/consistency.ndjson

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions Utils/Dataflow/009_oracleConnector/query/consistency.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Select taskid and timestamp of all tasks for specified period of time
SELECT DISTINCT
t.taskid,
TO_CHAR(t.timestamp, 'dd-mm-yyyy hh24:mi:ss') AS task_timestamp
FROM
ATLAS_DEFT.t_production_task t
WHERE
t.timestamp > :start_date AND
t.timestamp <= :end_date AND
t.pr_id %(production_or_analysis_cond)s 300
6 changes: 5 additions & 1 deletion Utils/Dataflow/016_task2es/README
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ Goes to 019 esFormat.

4. Samples
----------
The 'output' directory contains 2 samples:
The 'output' directory contains 2 samples of production data:

sample2016.ndjson - 2016 data (from 025's 2016 sample)
sample2018.ndjson - 2018 data (from 025's 2018 sample)

Also, it contains a consistency checking sample:

consistency.ndjson (from 009's consistency sample)
623 changes: 623 additions & 0 deletions Utils/Dataflow/016_task2es/output/consistency.ndjson

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions Utils/Dataflow/071_esConsistency/README
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
=============
* Stage 071 *
=============

1. Description
--------------
Checks that the given data is present in ElasticSearch.

Input must contain at least 2 fields:
{{{
{"_type": ..., "_id": ..., ...}
...
}}}

_type and _id are required to retrieve the document from ES. All the other
fields are compared with the document's corresponding ones. Results of the
comparison are written to stderr.

2. Running the stage
--------------------
The stage can be run as following:

./consistency.py --conf elasticsearch_config

For more information about running the stage and its arguments, use:

./consistency.py -h

235 changes: 235 additions & 0 deletions Utils/Dataflow/071_esConsistency/consistency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
#!/bin/env python
'''
Script for checking the supplied task's presence in elasticsearch.
Currently it performs the check by comparing the supplied timestamp
with the one in elasticsearch.
Authors:
Vasilii Aulov (vasilii.aulov@cern.ch)
'''
import os
import sys
import traceback

from datetime import datetime

import elasticsearch


def log(msg, prefix='DEBUG'):
''' Add prefix and current time to message and write it to stderr. '''
prefix = '(%s)' % (prefix)
# 11 = len("(CRITICAL) "), where CRITICAL is the longest log level name.
prefix = prefix.ljust(11)
sys.stderr.write('%s%s %s\n' % (prefix, datetime.now().isoformat(), msg))


try:
base_dir = os.path.dirname(__file__)
dkb_dir = os.path.join(base_dir, os.pardir)
sys.path.append(dkb_dir)
import pyDKB
from pyDKB.dataflow.stage import JSONProcessorStage
from pyDKB.dataflow.messages import JSONMessage
from pyDKB.dataflow.exceptions import DataflowException
except Exception, err:
log('Failed to import pyDKB library: %s' % err, 'ERROR')
sys.exit(1)


es = None


INDEX = None
FOUND_DIFF = False


def load_config(fname):
''' Open elasticsearch config and obtain parameters from it.
Setup INDEX as global variable.
:param fname: config file's name
:type fname: str
'''
cfg = {
'ES_HOST': '',
'ES_PORT': '',
'ES_USER': '',
'ES_PASSWORD': '',
'ES_INDEX': ''
}
with open(fname) as f:
lines = f.readlines()
for l in lines:
if l.startswith('ES'):
key = False
value = False
try:
(key, value) = l.split()[0].split('=')
except ValueError:
pass
if key in cfg:
cfg[key] = value
global INDEX
INDEX = cfg['ES_INDEX']
return cfg


def es_connect(cfg):
''' Establish a connection to elasticsearch.
Initialize the global variable es with the resulting client object.
:param cfg: connection parameters
:type cfg: dict
'''
if not cfg['ES_HOST']:
log('No ES host specified', 'ERROR')
return False
if not cfg['ES_PORT']:
log('No ES port specified', 'ERROR')
return False

global es
if cfg['ES_USER'] and cfg['ES_PASSWORD']:
s = 'http://%s:%s@%s:%s/' % (cfg['ES_USER'],
cfg['ES_PASSWORD'],
cfg['ES_HOST'],
cfg['ES_PORT'])
else:
s = '%s:%s' % (cfg['ES_HOST'], cfg['ES_PORT'])
es = elasticsearch.Elasticsearch([s])
return True


def get_fields(index, _id, _type, fields, _parent):
''' Get fields value by given _id and _type.
:param es: elasticsearch client
:type es: elasticsearch.client.Elasticsearch
:param index: index to search in
:type index: str
:param _id: id of the document to look for
:type _id: int or str
:param _type: type of the document to look for
:type _type: str
:param fields: field names
:type fields: list
:return: field values, or False if the document was not found
:rtype: dict or bool
'''
try:
results = es.get(index=index, doc_type=_type, id=_id,
_source=fields, parent=_parent)
except elasticsearch.exceptions.NotFoundError:
return False
return results['_source']


def process(stage, message):
''' Process a message.
Implementation of :py:meth:`.AbstractProcessorStage.process` for hooking
the stage into DKB workflow.
:param stage: stage instance
:type stage: pyDKB.dataflow.stage.ProcessorStage
:param msg: input message with document info
:type msg: pyDKB.dataflow.Message
'''
data = message.content()
if type(data) is not dict:
log('Incorrect data:' + str(data), 'WARN')
return False
try:
_id = data.pop('_id')
_type = data.pop('_type')
except KeyError:
log('Insufficient ES info in data:' + str(data), 'WARN')
return False

_parent = data.pop('_parent', None)

# Crutch. Remove unwanted (for now) fields added by Stage 016.
for field in ['phys_category', 'chain_data', 'chain_id']:
if field in data:
del data[field]

# Fields starting with an underscore are service fields. Some of them are
# treated in special way (see _id above). Service fields should not be
# checked, so they are removed.
data = {field: data[field] for field in data if field[0] != '_'}

# Do not check empty documents with valid _id and _type.
# It's unlikely that such documents will be produced in DKB. In general,
# such documents should be checked by es.exists(), and not es.get().
if not data:
log('Nothing to check for document (%s, %r)' % (_type, _id), 'WARN')
return False

es_data = get_fields(INDEX, _id, _type, data.keys(), _parent)
if data != es_data:
log('Document (%s, %r) differs between Oracle and ES: Oracle:%s ES:%s'
% (_type, _id, data, es_data), 'WARN')
out_message = JSONMessage({'_type': _type, '_id': _id})
stage.output(out_message)
global FOUND_DIFF
FOUND_DIFF = True
else:
log('Document (%s, %r) is up to date in ES' % (_type, _id), 'INFO')

return True


def main(args):
''' Parse command line arguments and run the stage.
:param argv: arguments
:type argv: list
'''

stage = JSONProcessorStage()
stage.add_argument('--conf', help='elasticsearch config', required=True)

exit_code = 0
exc_info = None
try:
stage.parse_args(args)
cfg = load_config(stage.ARGS.conf)
stage.process = process
if not es_connect(cfg):
exit_code = 4
elif not INDEX:
log('No ES index specified', 'ERROR')
exit_code = 5
elif not es.indices.exists(INDEX):
log('No such index: %s' % INDEX, 'ERROR')
exit_code = 6
else:
stage.run()
except (DataflowException, RuntimeError), err:
if str(err):
log(err, 'ERROR')
exit_code = 2
except Exception:
exc_info = sys.exc_info()
exit_code = 3
finally:
stage.stop()

if exc_info:
trace = traceback.format_exception(*exc_info)
for line in trace:
log(line, 'ERROR')

if exit_code == 0 and FOUND_DIFF:
exit_code = 1

exit(exit_code)


if __name__ == '__main__':
main(sys.argv[1:])
1 change: 1 addition & 0 deletions Utils/Dataflow/071_esConsistency/input
11 changes: 11 additions & 0 deletions Utils/Dataflow/README
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
1.9 019_oracle2esFormat Prepare data for bulk upload to ES
2.5 025_chicagoES Get additional metadata from Chicago ES
6.9 069_upload2es Upload data to ES
7.1 071_esConsistency Check data in ES
9.1 091_datasetsRucio Get dataset metadata from Rucio
9.3 093_datasetsFormat Update dataset metadata: add
"data_format" field
Expand Down Expand Up @@ -79,6 +80,16 @@
|
** ----> 019 -> 069 | Prepare and upload metadata to ES

3. Tasks metadata consistency check
---

009 ---> 016 ---> 071

This is a simplified and slightly changed version of the previous dataflow,
intended for making sure that information is consistent between ProdSys2 and
ES. It gets a very basic set of metadata from ProdSys2, adds ES-related
fields, and checks that it is present in ES rather than uploading it.

==============
* REFERENCES *
==============
Expand Down
25 changes: 25 additions & 0 deletions Utils/Dataflow/run/data4es-consistency-check
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash -l

base_dir=$( cd "$(dirname "$(readlink -f "$0")")"; pwd)
lib="$base_dir/../shell_lib"

# Directories with configuration files
[ -n "$DATA4ES_CONSISTENCY_CONFIG_PATH" ] && \
CONFIG_PATH="$DATA4ES_CONSISTENCY_CONFIG_PATH" || \
CONFIG_PATH="${base_dir}/../config:${base_dir}/../../Elasticsearch/config"

source $lib/get_config
source $lib/eop_filter

# Oracle
cfg009=`get_config "consistency009.cfg"`
cmd_009="$base_dir/../009_oracleConnector/Oracle2JSON.py --config $cfg009"

# Formatting
cmd_016="$base_dir/../016_task2es/task2es.py -m s"

# ES
cfg_es=`get_config "es"`
cmd_071="$base_dir/../071_esConsistency/consistency.py -m s --conf $cfg_es"

$cmd_009 | $cmd_016 | eop_filter | $cmd_071 >/dev/null
Loading

0 comments on commit 51accc9

Please sign in to comment.