Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[frozen] API: campaign/task events processing daily progress. #360

Draft
wants to merge 3 commits into
base: api
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Utils/API/server/dkb.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ storages:
index:
production_tasks: index_name_1
analysis_tasks: index_name_2
daily_progress: index_name_3
2 changes: 1 addition & 1 deletion Utils/API/server/lib/dkb/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
CONFIG_DIR = '%%CFG_DIR%%'


__version__ = '0.2.dev20200416'
__version__ = '0.2.dev20200706'


STATUS_CODES = {
Expand Down
75 changes: 75 additions & 0 deletions Utils/API/server/lib/dkb/api/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,81 @@ def campaign_stat(path, rtype='json', step_type=None, events_src=None,
methods.add('/campaign', 'stat', campaign_stat)


def campaign_daily_progress(path, rtype='json', step_type=None, **kwargs):
""" Get daily progress in terms of events processing.

Returns JSON document of the following format:
```
{
...
"data": {
<step>: {
<date>: <n_events_processed_by_tasks_finished_"today">,
...
},
...
}
}
```

:param path: full path to the method
:type path: str
:param rtype: response type (only 'json' supported)
:type rtype: str

:param step_type: step definition type: 'step', 'ctag_format'
(default: 'step')
:type step_type: str

:param <selection_parameter>: defines condition to select tasks for
statistics. Parameter names are mapped
to storage record fields (names and/or
aliases). Values should be provided in
one of the following forms:
* ``None`` (field must not be presented
in selected records);
* exact field value;
* exact field value with logical prefix:
- ``&`` -- field must value this value;
- ``|`` -- field must have one of values
marked with this prefix
(default);
- ``!`` -- field must not have this value;
* list of field values (prefixed or not).
Supported parameters are:
* htag (hashtag_list);
* taskid.
:type <selection_parameter>: NoneType, str, number, list

:returns: campaign (or task) daily progress
:rtype: dict
"""
method_name = '/campaign/daily_progress'
if kwargs.get('rtype', 'json') is not 'json':
raise MethodException(method_name, "Unsupported response type: '%s'"
% kwargs['rtype'])
allowed_types = STEP_TYPES
if step_type is None:
step_type = allowed_types[0]
if (step_type not in allowed_types):
raise InvalidArgument(method_name, ('step_type', step_type,
allowed_types))
params = {}
for param in kwargs:
vals = kwargs[param]
if not isinstance(vals, list):
vals = [vals]
if vals:
vals = sort_by_prefixes(vals, ['|', '&', '!'])
params[param] = vals

return storages.campaign_daily_progress(selection_params=params,
step_type=step_type)


methods.add('/campaign', 'daily_progress', campaign_daily_progress)


def step_stat(path, rtype='json', step_type=None, **kwargs):
""" Get tasks statistics.

Expand Down
40 changes: 40 additions & 0 deletions Utils/API/server/lib/dkb/api/storages/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,46 @@ def campaign_stat(**kwargs):
return es.campaign_stat(**kwargs)


def campaign_daily_progress(**kwargs):
""" Generate events processing daily progress report.

:param step_type: step definition type: 'step', 'ctag_format'
(default: 'step')
:type step_type: str

:param selection_params: defines conditions to select tasks for
statistics. Parameter names are mapped
to storage record fields (names and/or
aliases). Values should be provided in
one of the following forms:
* ``None`` (field must not be presented
in selected records);
* (list of) exact field value(s).
Field values are broken into categories:
* ``&`` -- field must have all these values;
* ``|`` -- field must have at least one of
these values;
* ``!`` -- field must not have none of these
values.
Expected format:
```
{
<selection_param>: {
<category>: [<values>],
...
},
...
}
```
:type selection_params: dict

:returns: daily progress of events processing in the form required
by :py:func:`api.handlers.campaign_daily_progress`
:rtype: dict
"""
return es.campaign_daily_progress(**kwargs)


def step_stat(**kwargs):
""" Calculate statistics for tasks by execution steps.

Expand Down
1 change: 1 addition & 0 deletions Utils/API/server/lib/dkb/api/storages/es/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
task_chain,
task_kwsearch,
task_derivation_statistics,
campaign_daily_progress,
campaign_stat,
step_stat)
17 changes: 13 additions & 4 deletions Utils/API/server/lib/dkb/api/storages/es/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@ def output_formats(**kwargs):
r['aggregations']['formats']['buckets']]


def get_step_aggregation_query(step_type=None, selection_params={}):
def get_step_aggregation_query(step_type=None, selection_params={},
progress=False):
""" Construct "aggs" part of ES query for steps aggregation.

:raises: `ValueError`: unknown step type.
Expand All @@ -345,6 +346,9 @@ def get_step_aggregation_query(step_type=None, selection_params={}):
(see :py:func:`get_selection_query`)
:type selection_params: dict

:param progress: flag parameter for progress data
:type progress: bool

:return: "aggs" part of ES query
:rtype: dict
"""
Expand All @@ -354,7 +358,14 @@ def get_step_aggregation_query(step_type=None, selection_params={}):
elif step_type not in STEP_TYPES:
raise ValueError(step_type, "Unknown step type (expected one of: %s)"
% STEP_TYPES)
if step_type == 'ctag_format':
step_fields = {'progress_ctag_format': 'ctag_format_step',
'progress_step': 'mc_step',
'step': 'step_name.keyword'}
if progress:
step_type = 'progress_' + step_type
if step_type in step_fields:
aggs = {'steps': {'terms': {'field': step_fields[step_type]}}}
elif step_type == 'ctag_format':
formats = output_formats(**selection_params)
filters = {}
for f in formats:
Expand All @@ -366,8 +377,6 @@ def get_step_aggregation_query(step_type=None, selection_params={}):
}
aggs = {'steps': {'filters': {'filters': filters},
'aggs': {'substeps': {'terms': {'field': 'ctag'}}}}}
elif step_type == 'step':
aggs = {'steps': {'terms': {'field': 'step_name.keyword'}}}
else:
raise DkbApiNotImplemented("Aggregation by steps of type '%s' is not"
" implemented yet.")
Expand Down
86 changes: 86 additions & 0 deletions Utils/API/server/lib/dkb/api/storages/es/methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,92 @@ def campaign_stat(selection_params, step_type='step', events_src=None):
return r


def campaign_daily_progress(selection_params, step_type='step'):
""" Generate events processing daily progress report.

:param step_type: step definition type: 'step', 'ctag_format'
(default: 'step')
:type step_type: str

:param selection_params: defines conditions to select tasks for
statistics. Parameter names are mapped
to storage record fields (names and/or
aliases). Values should be provided in
one of the following forms:
* ``None`` (field must not be presented
in selected records);
* (list of) exact field value(s).
Field values are broken into categories:
* ``&`` -- field must have all these values;
* ``|`` -- field must have at least one of
these values;
* ``!`` -- field must not have none of these
values.
Expected format:
```
{
<selection_param>: {
<category>: [<values>],
...
},
...
}
```
:type selection_params: dict

:returns: daily progress of events processing in the form required
by :py:func:`api.handlers.campaign_daily_progress`
:rtype: dict
"""
init()
# Construct query
query = {}
# * index with progress data
try:
query['index'] = common.CONFIG['index']['daily_progress']
except KeyError, e:
raise MethodException("Missed ES index name in configuration: %s" % e)
# * doc type
query['doc_type'] = 'task_progress'
# * and query body:
# - select tasks
q = get_selection_query(**selection_params)
# - divide them into 'steps'
step_agg = get_step_aggregation_query(step_type, progress=True)
# - get agg values for each step ('instep' aggs)
instep_aggs = get_query('campaign-daily-progress-aggs')
# - put 'instep' aggs into the innermost (sub) step clause
instep_clause = step_agg['steps']
while instep_clause.get('aggs'):
instep_clause = instep_clause['aggs'].get('substeps')
if instep_clause:
instep_clause['aggs'] = {}
instep_clause = instep_clause['aggs']
instep_clause.update(instep_aggs)
# - join 'query' and 'aggs' parts within request body
q_body = {'query': q, 'aggs': step_agg}

query['body'] = q_body
query['size'] = 0

r = {}
data = {}
try:
data = client().search(**query)
except Exception, err:
msg = "(%s) Failed to execute search query: %s." % (STORAGE_NAME,
str(err))
raise MethodException(reason=msg)
try:
data = transform.campaign_daily_progress(data)
except Exception, err:
msg = "Failed to parse storage response: %s." % str(err)
raise MethodException(reason=msg)

r.update(data)
return r


def step_stat(selection_params, step_type='step'):
""" Calculate statistics for tasks by execution steps.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"daily_progress": {
"date_histogram": {
"field": "date",
"interval": "day",
"format": "yyyy-MM-dd",
"min_doc_count": 1
},
"aggs": {
"processed_events": {
"sum": {"field": "processed_events"}
}
}
}
}
28 changes: 28 additions & 0 deletions Utils/API/server/lib/dkb/api/storages/es/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,34 @@ def campaign_stat(stat_data, events_src=None):
return r


def campaign_daily_progress(es_data):
""" ES query response transformation for ``campaign/daily_progress``.

:param data: ES response
:type data: dict

:return: prepared API response for ``campaign/daily_progress``
:rtype: dict
"""
r = {}
data = {}
r['_took_storage_ms'] = es_data.pop('took', None)
r['_total'] = es_data.get('hits', {}).pop('total', None)
r['_data'] = data
data['date_format'] = '%y-%m-%d'
steps = steps_iterator(es_data.get('aggregations', {}))
for name, step_data in steps:
data[name] = {}
hist_data = step_data.get('daily_progress', {}) \
.get('buckets', [])
for d in hist_data:
date = d.get('key_as_string')
data[name][date] = d.get('processed_events', {}) \
.get('value')

return r


def step_stat(data, agg_units=[], step_type=None):
""" Transform ES query response to required response format.

Expand Down