Skip to content

Commit

Permalink
Merge pull request #256 from datosgobar/236-add-max-min-aggs
Browse files Browse the repository at this point in the history
236 add max min aggs
  • Loading branch information
lucaslavandeira committed Apr 12, 2018
2 parents 446182c + 34bbc61 commit 29c1deb
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 133 deletions.
33 changes: 9 additions & 24 deletions series_tiempo_ar_api/apps/api/query/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
# Identifiers for the collapse aggregation functions the API accepts.
# AGG_DEFAULT is the fallback when no aggregation is specified.
AGG_DEFAULT = 'avg'
AGG_SUM = 'sum'
AGG_END_OF_PERIOD = 'end_of_period'
AGG_MAX = 'max'
AGG_MIN = 'min'
# Full list of valid values for the collapse_agg query parameter.
AGGREGATIONS = [
    AGG_DEFAULT,
    AGG_SUM,
    AGG_END_OF_PERIOD,
    AGG_MAX,
    AGG_MIN,
]

COLLAPSE_INTERVALS = [ # EN ORDEN DE MENOR A MAYOR
Expand Down Expand Up @@ -127,28 +131,9 @@

CSV_RESPONSE_FILENAME = 'data.csv'

PARAM_PERIODICITY = 'periodicity'

# Scripts de map reduce para calcular End of Period en Elasticsearch
EOP_INIT = """
params._agg.last_date = -1;
params._agg.value = 0;
"""

EOP_MAP = """
if (doc.timestamp.value > params._agg.last_date) {
params._agg.last_date = doc.timestamp.value;
params._agg.value = doc.value.value;
}
"""

EOP_REDUCE = """
double value = -1;
long last_date = 0;
for (a in params._aggs) {
if (a != null && a.last_date > last_date && a.value != 0.0) {
value = a.value;
last_date = a.last_date;
}
}
return value
"""
# Aggregations that are not pre-indexed in Elasticsearch documents: series
# using these are queried with the default aggregation and the max/min is
# computed at query time via an ES date_histogram aggregation.
IN_MEMORY_AGGS = [
    AGG_MAX,
    AGG_MIN
]
108 changes: 11 additions & 97 deletions series_tiempo_ar_api/apps/api/query/es_query/es_query.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
#! coding: utf-8
from django.conf import settings
from elasticsearch_dsl import MultiSearch, Q, Search
from iso8601 import iso8601
from elasticsearch_dsl import MultiSearch

from series_tiempo_ar_api.apps.api.exceptions import QueryError
from series_tiempo_ar_api.apps.api.helpers import get_relative_delta
from series_tiempo_ar_api.apps.api.query import constants
from series_tiempo_ar_api.apps.api.query import strings
from series_tiempo_ar_api.apps.api.query.es_query.response_formatter import ResponseFormatter
from series_tiempo_ar_api.apps.api.query.es_query.series import Series
from series_tiempo_ar_api.libs.indexing.elastic import ElasticInstance

Expand All @@ -25,8 +24,6 @@ def __init__(self, index):
self.elastic = ElasticInstance()
self.data = []

self.data_dict = {}
self.periodicity = None
# Parámetros que deben ser guardados y accedidos varias veces
self.args = {
constants.PARAM_START: constants.API_DEFAULT_VALUES[constants.PARAM_START],
Expand All @@ -41,8 +38,8 @@ def add_series(self, series_id, rep_mode, periodicity,
if periodicity == constants.COLLAPSE_INTERVALS[-1]:
collapse_agg = constants.AGG_DEFAULT

self.args[constants.PARAM_PERIODICITY] = periodicity
self._init_series(series_id, rep_mode, collapse_agg)
self.periodicity = periodicity

def get_series_ids(self):
"""Devuelve una lista de series cargadas"""
Expand All @@ -66,20 +63,13 @@ def sort(self, how):
self.args[constants.PARAM_SORT] = how

def add_collapse(self, interval):
self.periodicity = interval
self.args[constants.PARAM_PERIODICITY] = interval

def _init_series(self, series_id, rep_mode, collapse_agg):
search = Search(using=self.elastic, index=self.index)
end = self.args[constants.PARAM_START] + self.args[constants.PARAM_LIMIT]
search = search[self.args[constants.PARAM_START]:end]
search = search.sort(settings.TS_TIME_INDEX_FIELD) # Default: ascending sort
# Filtra los resultados por la serie pedida
search = search.filter('bool',
must=[Q('match', series_id=series_id),
Q('match', aggregation=collapse_agg)])
self.series.append(Series(series_id=series_id,
index=self.index,
rep_mode=rep_mode,
search=search,
args=self.args,
collapse_agg=collapse_agg))

def add_pagination(self, start, limit):
Expand All @@ -97,14 +87,8 @@ def add_filter(self, start=None, end=None):
if not len(self.series):
raise QueryError(strings.EMPTY_QUERY_ERROR)

_filter = {
'lte': end,
'gte': start
}
for serie in self.series:
# Agrega un filtro de rango temporal a la query de ES
serie.search = serie.search.filter('range',
timestamp=_filter)
serie.add_range_filter(start, end)

def get_data_start_end_dates(self):
if not self.data:
Expand All @@ -128,81 +112,11 @@ def run(self):
using=self.elastic)

for serie in self.series:
search = serie.search
search = search.filter('bool',
must=[Q('match', interval=self.periodicity)])
multi_search = multi_search.add(search)
serie.add_collapse(self.args[constants.PARAM_PERIODICITY])
multi_search = multi_search.add(serie.search)

responses = multi_search.execute()
self._format_response(responses)
formatter = ResponseFormatter(self.series, responses, self.args)
self.data = formatter.format_response()
# Devuelvo hasta LIMIT values
return self.data[:self.args[constants.PARAM_LIMIT]]

def _format_response(self, responses):
"""Procesa la respuesta recibida de Elasticsearch, la guarda en
el diccionario data_dict con el siguiente formato
self.data_dict = {
"1990-01-01": { "serie_1": valor1, "serie_2": valor2, ... },
"1990-02-01": { "serie_1": valor1, "serie_2": valor2, ... }
}
Luego el diccionario es pasado a la lista de listas final
self.data para conformar la respuesta esperada de lista de listas
"""

for i, response in enumerate(responses):
rep_mode = self.series[i].rep_mode

for hit in response:
data = hit[rep_mode] if rep_mode in hit else None
timestamp_dict = self.data_dict.setdefault(hit.timestamp, {})
timestamp_dict[self._data_dict_series_key(self.series[i])] = data

if not self.data_dict: # No hay datos
return

self._make_date_index_continuous(min(self.data_dict.keys()),
max(self.data_dict.keys()))

# Ordeno las timestamp según si el sort es asc o desc usando función de comparación
def cmp_func(one, other):
if one == other:
return 0

if self.args[constants.PARAM_SORT] == constants.SORT_ASCENDING:
return -1 if one < other else 1
else:
return 1 if one < other else -1

for timestamp in sorted(self.data_dict.keys(), cmp=cmp_func):
row = [timestamp]

for series in self.series:
row.append(self.data_dict[timestamp].get(self._data_dict_series_key(series)))

self.data.append(row)

@staticmethod
def _data_dict_series_key(series):
"""Key única para identificar la serie pedida en el data_dict armado. Evita
que se pisen series en queries que piden la misma serie con distintos rep modes
o aggs (ver issue #243)
"""
return series.series_id + series.rep_mode + series.collapse_agg

def _make_date_index_continuous(self, start_date, end_date):
"""Hace el índice de tiempo de los resultados continuo (según
el intervalo de resultados), sin saltos, entre start_date y end_date.
Esto implica llenar el diccionario self.data_dict con claves de los
timestamp faltantes para asegurar la continuidad
"""

# Si no hay datos cargados no hay nada que hacer
if not len(self.data_dict):
return

current_date = iso8601.parse_date(start_date)
end_date = iso8601.parse_date(end_date)

while current_date < end_date:
current_date += get_relative_delta(self.periodicity)
self.data_dict.setdefault(unicode(current_date.date()), {})
91 changes: 91 additions & 0 deletions series_tiempo_ar_api/apps/api/query/es_query/response_formatter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#! coding: utf-8
from iso8601 import iso8601

from series_tiempo_ar_api.apps.api.helpers import get_relative_delta
from series_tiempo_ar_api.apps.api.query import constants


class ResponseFormatter(object):
    """Turns a list of Elasticsearch responses (one per requested series)
    into the API's list-of-rows result: [[timestamp, v1, v2, ...], ...].

    NOTE(review): this module is Python 2 only — it relies on the ``cmp``
    keyword of ``sorted`` and on the ``unicode`` builtin, both removed in
    Python 3.
    """

    def __init__(self, series, responses, args):
        # series: Series objects, in the same order as `responses`
        self.series = series
        # responses: one ES response per series (same order as `series`)
        self.responses = responses
        # Intermediate index: timestamp -> {series_key: value}
        self.data_dict = {}
        # Shared query arguments (sort order, periodicity, ...)
        self.args = args

    def format_response(self):
        """Process the responses received from Elasticsearch, storing them
        in data_dict with the following format:
        self.data_dict = {
            "1990-01-01": { "serie_1": valor1, "serie_2": valor2, ... },
            "1990-02-01": { "serie_1": valor1, "serie_2": valor2, ... }
        }
        The dict is then flattened into the final list of lists expected
        as the API response, and returned.
        """
        final_data = []
        for i, response in enumerate(self.responses):
            rep_mode = self.series[i].rep_mode

            if self.series[i].collapse_agg in (constants.AGG_MIN, constants.AGG_MAX):
                # min/max are computed at query time: read values out of the
                # 'test' date_histogram aggregation instead of the hits
                for hit in response.aggregations.test.buckets:
                    data = hit['test'][rep_mode]
                    timestamp_dict = self.data_dict.setdefault(hit['key_as_string'], {})
                    timestamp_dict[self._data_dict_series_key(self.series[i])] = data
            else:
                # Pre-indexed aggregations: values come directly from the hits
                for hit in response:
                    data = hit[rep_mode] if rep_mode in hit else None
                    timestamp_dict = self.data_dict.setdefault(hit.timestamp, {})
                    timestamp_dict[self._data_dict_series_key(self.series[i])] = data

        if not self.data_dict:  # No data
            return []

        # Fill gaps so the time index has no holes between first and last date
        self._make_date_index_continuous(min(self.data_dict.keys()),
                                         max(self.data_dict.keys()))

        # Sort the timestamps asc or desc using a comparison function
        # (relies on ISO date strings comparing lexicographically)
        def cmp_func(one, other):
            if one == other:
                return 0

            if self.args[constants.PARAM_SORT] == constants.SORT_ASCENDING:
                return -1 if one < other else 1
            else:
                return 1 if one < other else -1

        # Python 2 only: `cmp` kwarg does not exist in Python 3's sorted()
        for timestamp in sorted(self.data_dict.keys(), cmp=cmp_func):
            row = [timestamp]

            # One column per requested series; None when a series has no
            # value at this timestamp
            for series in self.series:
                row.append(self.data_dict[timestamp].get(self._data_dict_series_key(series)))

            final_data.append(row)

        return final_data

    @staticmethod
    def _data_dict_series_key(series):
        """Unique key identifying the requested series inside data_dict.
        Prevents series from clobbering each other in queries that request
        the same series with different rep modes or aggs (see issue #243).
        """
        return series.series_id + series.rep_mode + series.collapse_agg

    def _make_date_index_continuous(self, start_date, end_date):
        """Makes the results' time index continuous (according to the
        requested periodicity), without gaps, between start_date and
        end_date. This means filling self.data_dict with keys for the
        missing timestamps to guarantee continuity.
        """

        # Nothing to do if no data was loaded
        if not len(self.data_dict):
            return

        current_date = iso8601.parse_date(start_date)
        end_date = iso8601.parse_date(end_date)

        while current_date < end_date:
            # Step size is a relativedelta derived from the periodicity arg
            current_date += get_relative_delta(self.args[constants.PARAM_PERIODICITY])
            # unicode(...) is Python 2 only
            self.data_dict.setdefault(unicode(current_date.date()), {})
53 changes: 42 additions & 11 deletions series_tiempo_ar_api/apps/api/query/es_query/series.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,51 @@
#! coding: utf-8
from elasticsearch_dsl import Search
from django.conf import settings
from elasticsearch_dsl import Search, Q, A

from series_tiempo_ar_api.apps.api.query import constants
from series_tiempo_ar_api.libs.indexing.elastic import ElasticInstance


class Series(object):
    """Holds the Elasticsearch query state for a single requested series.

    For aggregations listed in constants.IN_MEMORY_AGGS (max/min), documents
    indexed with the default aggregation are filtered and the requested
    aggregation is computed at query time via an ES date_histogram.
    """

    def __init__(self, index, series_id, rep_mode, args, collapse_agg=None):
        self.index = index
        self.elastic = ElasticInstance.get()
        self.series_id = series_id
        self.rep_mode = rep_mode
        # Copy so later mutations of the shared query args don't leak in
        self.args = args.copy()
        self.collapse_agg = collapse_agg or constants.API_DEFAULT_VALUES[constants.PARAM_COLLAPSE_AGG]
        self.search = self.init_search()

    def init_search(self):
        """Build the base Search: pagination window, ascending time sort,
        and a filter restricting results to this series.
        """
        search = Search(using=self.elastic, index=self.index)
        end = self.args[constants.PARAM_START] + self.args[constants.PARAM_LIMIT]
        search = search[self.args[constants.PARAM_START]:end]
        search = search.sort(settings.TS_TIME_INDEX_FIELD)  # Default: ascending sort
        # Filter results by the requested series. If the aggregation is
        # computed in memory we filter by the default agg's documents and
        # compute the requested agg at query time (see add_collapse)
        agg = self.collapse_agg if self.collapse_agg not in constants.IN_MEMORY_AGGS else constants.AGG_DEFAULT
        search = search.filter('bool',
                               must=[Q('match', series_id=self.series_id),
                                     Q('match', aggregation=agg)])

        return search

    def add_range_filter(self, start, end):
        """Restrict the query to timestamps within [start, end]."""
        _filter = {
            'lte': end,
            'gte': start
        }
        self.search = self.search.filter('range', timestamp=_filter)

    def add_collapse(self, periodicity):
        """Collapse the series to the given time interval.

        Pre-indexed aggregations only need an interval filter; in-memory
        aggregations (max/min) additionally attach a date_histogram
        aggregation so Elasticsearch computes the value per bucket.
        """
        if self.collapse_agg not in constants.IN_MEMORY_AGGS:
            self.search = self.search.filter('bool', must=[Q('match', interval=periodicity)])

        else:
            # Filter the raw documents by the series' original periodicity
            # (captured in self.args at construction time), then aggregate
            # them into buckets of the requested collapse interval
            self.search = self.search.filter(
                'bool', must=[Q('match', interval=self.args[constants.PARAM_PERIODICITY])])
            self.search.aggs.bucket('test',
                                    A('date_histogram',
                                      field='timestamp',
                                      interval=periodicity,
                                      format='yyyy-MM-dd').
                                    metric('test', self.collapse_agg, field=self.rep_mode))
2 changes: 1 addition & 1 deletion series_tiempo_ar_api/apps/api/query/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def _calculate_data_frequency(self):
"""Devuelve la periodicidad de la o las series pedidas. Si son
muchas devuelve el intervalo de tiempo colapsado
"""
return self.es_query.periodicity
return self.es_query.args[constants.PARAM_PERIODICITY]

def sort(self, how):
return self.es_query.sort(how)
Expand Down
Loading

0 comments on commit 29c1deb

Please sign in to comment.