Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Oracle-ES consistency #240

Merged
merged 28 commits into from
May 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
9903938
Add consistency query to stage 009.
Evildoor Mar 27, 2019
42d55b3
Add script for consistency check to stage 069.
Evildoor Mar 29, 2019
f27f278
Get index name from config rather than code.
Evildoor Apr 2, 2019
af9f212
Check for index' existence before working.
Evildoor Apr 2, 2019
4c560a6
Update documentation.
Evildoor Apr 2, 2019
2296f6d
Add a very basic consistency check script.
Evildoor Apr 2, 2019
b8a2ab1
Generalize 069-consistency.
Evildoor Apr 4, 2019
acfe45a
Save and display the info about different tasks.
Evildoor Apr 5, 2019
e39efe2
Merge remote-tracking branch 'origin/master' into oracle-es-consistency
Evildoor Apr 5, 2019
8a71791
Move certain shell functions to library.
Evildoor Apr 5, 2019
90380a9
Remove DEBUG mode.
Evildoor Apr 17, 2019
7bac202
Move ES consistency script into a separate stage.
Evildoor Apr 17, 2019
12dd86e
Update a query description.
Evildoor Apr 18, 2019
944b5a2
Update and explain a magic number.
Evildoor Apr 18, 2019
26a1dfe
Reword es_connect() description.
Evildoor Apr 18, 2019
20875b1
Change log prefixes to standard ones.
Evildoor Apr 18, 2019
46cf0af
Fix pop() results handling.
Evildoor Apr 18, 2019
72d85a9
Update ES parameters handling.
Evildoor Apr 18, 2019
cacba11
Remove batching of inconsistent records.
Evildoor Apr 18, 2019
4d8eb83
Merge remote-tracking branch 'origin/master' into oracle-es-consistency
Evildoor Apr 19, 2019
181fb14
Add consistency data samples.
Evildoor Apr 19, 2019
7117242
Update the dataflow README.
Evildoor Apr 19, 2019
165c5d2
Ignore two additional fields.
Evildoor May 21, 2019
8f84d22
Change messages formatting.
Evildoor May 22, 2019
d195650
Add _parent field handling.
Evildoor May 22, 2019
612bf52
Remove service fields before checking.
Evildoor May 22, 2019
01ae258
Remove interpreter directives from lib files.
Evildoor May 22, 2019
8f86ddd
Simplify a field retrieval.
Evildoor May 28, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Utils/Dataflow/009_oracleConnector/README
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ them as NDJSON.
Currently works with specific set of queries only:
* prodsys2ES + datasets
* prodsys2ES
* consistency : simplified query that only obtains taskid and task_timestamp
for each task

The goal is to make it work with any number and combination of queries.

Expand All @@ -30,3 +32,7 @@ The 'output' directory contains two samples of production data:

sample2016.ndjson - collected with timestamps [09-05-2016 12:12:00, 09-05-2016 13:32:30]
sample2018.ndjson - collected with timestamps [15-06-2018 12:00:00, 15-06-2018 13:00:00]

Also, it contains a consistency checking sample:

consistency.ndjson - collected with timestamps [21-07-2018 00:00:00, 22-07-2018 00:00:00]
623 changes: 623 additions & 0 deletions Utils/Dataflow/009_oracleConnector/output/consistency.ndjson

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions Utils/Dataflow/009_oracleConnector/query/consistency.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Select taskid and timestamp of all tasks for specified period of time
SELECT DISTINCT
t.taskid,
TO_CHAR(t.timestamp, 'dd-mm-yyyy hh24:mi:ss') AS task_timestamp
FROM
ATLAS_DEFT.t_production_task t
WHERE
t.timestamp > :start_date AND
t.timestamp <= :end_date AND
t.pr_id %(production_or_analysis_cond)s 300
6 changes: 5 additions & 1 deletion Utils/Dataflow/016_task2es/README
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ Goes to 019 esFormat.

4. Samples
----------
The 'output' directory contains 2 samples:
The 'output' directory contains 2 samples of production data:

sample2016.ndjson - 2016 data (from 025's 2016 sample)
sample2018.ndjson - 2018 data (from 025's 2018 sample)

Also, it contains a consistency checking sample:

consistency.ndjson (from 009's consistency sample)
623 changes: 623 additions & 0 deletions Utils/Dataflow/016_task2es/output/consistency.ndjson

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions Utils/Dataflow/071_esConsistency/README
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
=============
* Stage 071 *
=============

1. Description
--------------
Checks that the given data is present in ElasticSearch.

Input must contain at least 2 fields:
{{{
{"_type": ..., "_id": ..., ...}
...
}}}

_type and _id are required to retrieve the document from ES. All the other
fields are compared with the document's corresponding ones. Results of the
comparison are written to stderr.

2. Running the stage
--------------------
The stage can be run as following:

./consistency.py --conf elasticsearch_config

For more information about running the stage and its arguments, use:

./consistency.py -h

235 changes: 235 additions & 0 deletions Utils/Dataflow/071_esConsistency/consistency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
#!/bin/env python
'''
Script for checking the supplied task's presence in elasticsearch.

Currently it performs the check by comparing the supplied timestamp
with the one in elasticsearch.

Authors:
Vasilii Aulov (vasilii.aulov@cern.ch)
'''
import os
import sys
import traceback

from datetime import datetime

import elasticsearch


def log(msg, prefix='DEBUG'):
''' Add prefix and current time to message and write it to stderr. '''
prefix = '(%s)' % (prefix)
# 11 = len("(CRITICAL) "), where CRITICAL is the longest log level name.
prefix = prefix.ljust(11)
sys.stderr.write('%s%s %s\n' % (prefix, datetime.now().isoformat(), msg))


try:
base_dir = os.path.dirname(__file__)
dkb_dir = os.path.join(base_dir, os.pardir)
sys.path.append(dkb_dir)
import pyDKB
from pyDKB.dataflow.stage import JSONProcessorStage
from pyDKB.dataflow.messages import JSONMessage
from pyDKB.dataflow.exceptions import DataflowException
except Exception, err:
log('Failed to import pyDKB library: %s' % err, 'ERROR')
sys.exit(1)


es = None


INDEX = None
FOUND_DIFF = False


def load_config(fname):
''' Open elasticsearch config and obtain parameters from it.

Setup INDEX as global variable.

:param fname: config file's name
:type fname: str
'''
cfg = {
'ES_HOST': '',
'ES_PORT': '',
'ES_USER': '',
'ES_PASSWORD': '',
'ES_INDEX': ''
}
with open(fname) as f:
lines = f.readlines()
for l in lines:
if l.startswith('ES'):
key = False
value = False
try:
(key, value) = l.split()[0].split('=')
except ValueError:
pass
if key in cfg:
cfg[key] = value
global INDEX
INDEX = cfg['ES_INDEX']
return cfg


def es_connect(cfg):
''' Establish a connection to elasticsearch.

Initialize the global variable es with the resulting client object.

:param cfg: connection parameters
:type cfg: dict
'''
if not cfg['ES_HOST']:
log('No ES host specified', 'ERROR')
return False
if not cfg['ES_PORT']:
log('No ES port specified', 'ERROR')
return False

global es
if cfg['ES_USER'] and cfg['ES_PASSWORD']:
s = 'http://%s:%s@%s:%s/' % (cfg['ES_USER'],
cfg['ES_PASSWORD'],
cfg['ES_HOST'],
cfg['ES_PORT'])
else:
s = '%s:%s' % (cfg['ES_HOST'], cfg['ES_PORT'])
es = elasticsearch.Elasticsearch([s])
return True


def get_fields(index, _id, _type, fields, _parent):
''' Get fields value by given _id and _type.

:param es: elasticsearch client
:type es: elasticsearch.client.Elasticsearch
:param index: index to search in
:type index: str
:param _id: id of the document to look for
:type _id: int or str
:param _type: type of the document to look for
:type _type: str
:param fields: field names
:type fields: list

:return: field values, or False if the document was not found
:rtype: dict or bool
'''
try:
results = es.get(index=index, doc_type=_type, id=_id,
_source=fields, parent=_parent)
except elasticsearch.exceptions.NotFoundError:
return False
return results['_source']


def process(stage, message):
''' Process a message.

Implementation of :py:meth:`.AbstractProcessorStage.process` for hooking
the stage into DKB workflow.

:param stage: stage instance
:type stage: pyDKB.dataflow.stage.ProcessorStage
:param msg: input message with document info
:type msg: pyDKB.dataflow.Message
'''
data = message.content()
if type(data) is not dict:
log('Incorrect data:' + str(data), 'WARN')
return False
try:
_id = data.pop('_id')
_type = data.pop('_type')
except KeyError:
log('Insufficient ES info in data:' + str(data), 'WARN')
return False

_parent = data.pop('_parent', None)

# Crutch. Remove unwanted (for now) fields added by Stage 016.
for field in ['phys_category', 'chain_data', 'chain_id']:
if field in data:
del data[field]

# Fields starting with an underscore are service fields. Some of them are
# treated in special way (see _id above). Service fields should not be
# checked, so they are removed.
data = {field: data[field] for field in data if field[0] != '_'}
mgolosova marked this conversation as resolved.
Show resolved Hide resolved

# Do not check empty documents with valid _id and _type.
# It's unlikely that such documents will be produced in DKB. In general,
# such documents should be checked by es.exists(), and not es.get().
if not data:
log('Nothing to check for document (%s, %r)' % (_type, _id), 'WARN')
return False

es_data = get_fields(INDEX, _id, _type, data.keys(), _parent)
if data != es_data:
log('Document (%s, %r) differs between Oracle and ES: Oracle:%s ES:%s'
% (_type, _id, data, es_data), 'WARN')
out_message = JSONMessage({'_type': _type, '_id': _id})
stage.output(out_message)
global FOUND_DIFF
FOUND_DIFF = True
else:
log('Document (%s, %r) is up to date in ES' % (_type, _id), 'INFO')

return True


def main(args):
''' Parse command line arguments and run the stage.

:param argv: arguments
:type argv: list
'''

stage = JSONProcessorStage()
stage.add_argument('--conf', help='elasticsearch config', required=True)

exit_code = 0
exc_info = None
try:
stage.parse_args(args)
cfg = load_config(stage.ARGS.conf)
stage.process = process
if not es_connect(cfg):
exit_code = 4
elif not INDEX:
log('No ES index specified', 'ERROR')
exit_code = 5
elif not es.indices.exists(INDEX):
log('No such index: %s' % INDEX, 'ERROR')
exit_code = 6
else:
stage.run()
except (DataflowException, RuntimeError), err:
if str(err):
log(err, 'ERROR')
exit_code = 2
except Exception:
exc_info = sys.exc_info()
exit_code = 3
finally:
stage.stop()

if exc_info:
trace = traceback.format_exception(*exc_info)
for line in trace:
log(line, 'ERROR')

if exit_code == 0 and FOUND_DIFF:
exit_code = 1

exit(exit_code)


if __name__ == '__main__':
main(sys.argv[1:])
1 change: 1 addition & 0 deletions Utils/Dataflow/071_esConsistency/input
11 changes: 11 additions & 0 deletions Utils/Dataflow/README
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
1.9 019_oracle2esFormat Prepare data for bulk upload to ES
2.5 025_chicagoES Get additional metadata from Chicago ES
6.9 069_upload2es Upload data to ES
7.1 071_esConsistency Check data in ES
9.1 091_datasetsRucio Get dataset metadata from Rucio
9.3 093_datasetsFormat Update dataset metadata: add
"data_format" field
Expand Down Expand Up @@ -79,6 +80,16 @@
|
** ----> 019 -> 069 | Prepare and upload metadata to ES

3. Tasks metadata consistency check
---

009 ---> 016 ---> 071

This is a simplified and slightly changed version of the previous dataflow,
intended for making sure that information is consistent between ProdSys2 and
ES. It gets a very basic set of metadata from ProdSys2, adds ES-related
fields, and checks that it is present in ES rather than uploading it.

==============
* REFERENCES *
==============
Expand Down
25 changes: 25 additions & 0 deletions Utils/Dataflow/run/data4es-consistency-check
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash -l

base_dir=$( cd "$(dirname "$(readlink -f "$0")")"; pwd)
lib="$base_dir/../shell_lib"

# Directories with configuration files
[ -n "$DATA4ES_CONSISTENCY_CONFIG_PATH" ] && \
CONFIG_PATH="$DATA4ES_CONSISTENCY_CONFIG_PATH" || \
CONFIG_PATH="${base_dir}/../config:${base_dir}/../../Elasticsearch/config"

source $lib/get_config
source $lib/eop_filter

# Oracle
cfg009=`get_config "consistency009.cfg"`
cmd_009="$base_dir/../009_oracleConnector/Oracle2JSON.py --config $cfg009"

# Formatting
cmd_016="$base_dir/../016_task2es/task2es.py -m s"

# ES
cfg_es=`get_config "es"`
cmd_071="$base_dir/../071_esConsistency/consistency.py -m s --conf $cfg_es"

$cmd_009 | $cmd_016 | eop_filter | $cmd_071 >/dev/null
Loading