Skip to content

Commit

Permalink
Merge c6677c8 into 9931b3d
Browse files Browse the repository at this point in the history
  • Loading branch information
lucaslavandeira committed Nov 20, 2019
2 parents 9931b3d + c6677c8 commit 493031f
Show file tree
Hide file tree
Showing 22 changed files with 549 additions and 614 deletions.
4 changes: 0 additions & 4 deletions series_tiempo_ar_api/apps/api/models.py

This file was deleted.

144 changes: 64 additions & 80 deletions series_tiempo_ar_api/apps/api/query/es_query/es_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
from django.conf import settings
from elasticsearch_dsl import MultiSearch

from series_tiempo_ar_api.apps.api.exceptions import QueryError
from series_tiempo_ar_api.apps.api.query import constants
from series_tiempo_ar_api.apps.api.query import strings
from series_tiempo_ar_api.apps.api.query.es_query.response_formatter import ResponseFormatter
from series_tiempo_ar_api.apps.api.query.es_query.series import Serie

Expand All @@ -20,120 +18,106 @@ def __init__(self, index):
"""
self.index = index
self.series = []
self.data = None
self.count = None
self.start_dates = None
self.reverse_results = False

# Parámetros que deben ser guardados y accedidos varias veces
self.args = {
constants.PARAM_START: constants.API_DEFAULT_VALUES[constants.PARAM_START],
constants.PARAM_LIMIT: constants.API_DEFAULT_VALUES[constants.PARAM_LIMIT],
constants.PARAM_START: constants.API_DEFAULT_VALUES[constants.PARAM_START],
constants.PARAM_SORT: constants.API_DEFAULT_VALUES[constants.PARAM_SORT]
}

def add_series(self, series_id, rep_mode, periodicity,
collapse_agg=constants.API_DEFAULT_VALUES[constants.PARAM_COLLAPSE_AGG]):
self._init_series(series_id, rep_mode, collapse_agg, periodicity)
if constants.PARAM_PERIODICITY not in self.args:
self.add_collapse(periodicity)

def get_series_ids(self):
"""Devuelve una lista de series cargadas"""
return [serie.series_id for serie in self.series]

def sort(self, how):
"""Ordena los resultados por ascendiente o descendiente"""
for serie in self.series:
serie.sort(how)

# Guardo el parámetro, necesario en el evento de hacer un collapse
self.args[constants.PARAM_SORT] = how

def add_collapse(self, interval):
self.args[constants.PARAM_PERIODICITY] = interval
for serie in self.series:
serie.add_collapse(interval)

def _init_series(self, series_id, rep_mode, collapse_agg, periodicity):
self.series.append(Serie(series_id=series_id,
index=self.index,
rep_mode=rep_mode,
periodicity=periodicity,
collapse_agg=collapse_agg))

def add_pagination(self, start, limit, start_dates=None):
if not self.series:
raise QueryError(strings.EMPTY_QUERY_ERROR)

# Aplicamos paginación luego, por ahora guardamos los parámetros
def add_pagination(self, start, limit):
self.args[constants.PARAM_START] = start
self.args[constants.PARAM_LIMIT] = limit
self.start_dates = start_dates or {}

def setup_series_pagination(self):
def add_filter(self, start=None, end=None):
self.args[constants.PARAM_START_DATE] = start
self.args[constants.PARAM_END_DATE] = end

def reverse(self):
self.reverse_results = True

def run_for_series(self, series):
self.series = []
for serie in series:
self.series.append(Serie(index=self.index,
series_id=serie.identifier(),
rep_mode=serie.rep_mode,
periodicity=serie.periodicity(),
collapse_agg=serie.collapse_agg()))

start_dates = {serie.get_identifiers()['id']: serie.start_date() for serie in series}

for serie in self.series:
serie.add_pagination(self.args[constants.PARAM_START],
self.args[constants.PARAM_LIMIT],
request_start_dates=self.start_dates)

def add_filter(self, start=None, end=None):
if not self.series:
raise QueryError(strings.EMPTY_QUERY_ERROR)
start_dates or {})

if self.args.get(constants.PARAM_PERIODICITY):
for serie in self.series:
serie.add_collapse(self.args.get(constants.PARAM_PERIODICITY))
else:
self.args[constants.PARAM_PERIODICITY] = self.get_max_periodicity([
x.periodicity
for x in self.series
])

if self.args.get(constants.PARAM_START_DATE) or self.args.get(constants.PARAM_END_DATE):
for serie in self.series:
serie.add_range_filter(self.args.get(constants.PARAM_START_DATE),
self.args.get(constants.PARAM_END_DATE))

for serie in self.series:
serie.add_range_filter(start, end)
serie.sort(self.args[constants.PARAM_SORT])

def get_data_start_end_dates(self):
if not self.data:
return {}
return self._run()

return {
constants.PARAM_START_DATE: self.data[0][0],
constants.PARAM_END_DATE: self.data[-1][0]
}
@staticmethod
def get_max_periodicity(periodicities):
    """Return the entry of ``constants.COLLAPSE_INTERVALS`` with the highest position.

    Each value in ``periodicities`` is looked up in
    ``constants.COLLAPSE_INTERVALS``; the one found furthest along that
    sequence is returned. An empty ``periodicities`` yields the first entry
    of ``constants.COLLAPSE_INTERVALS``.

    Raises:
        ValueError: if a periodicity is not present in
            ``constants.COLLAPSE_INTERVALS`` (propagated from ``index``).
    """
    intervals = constants.COLLAPSE_INTERVALS
    # default=0 keeps the "no periodicities -> first interval" behaviour.
    highest = max((intervals.index(item) for item in periodicities), default=0)
    return intervals[highest]

def _run(self) -> dict:
    """Run the searches and trim/reverse the resulting rows.

    Calls ``execute_searches`` and keeps at most
    ``self.args[constants.PARAM_LIMIT]`` rows under the ``'data'`` key.
    When ``self.reverse_results`` is set, the trimmed rows are reversed
    in place before the result dict is returned.
    """
    response = self.execute_searches()

    # Return at most LIMIT values
    max_rows = self.args[constants.PARAM_LIMIT]
    response['data'] = response['data'][:max_rows]

    if not self.reverse_results:
        return response

    response['data'].reverse()
    return response

def execute_searches(self):
"""Ejecuta la query de todas las series agregadas, e inicializa
los atributos data y count a partir de las respuestas.
"""
if not self.series:
raise QueryError(strings.EMPTY_QUERY_ERROR)

multi_search = MultiSearch(index=self.index,
doc_type=settings.TS_DOC_TYPE)

for serie in self.series:
self.setup_series_pagination()
multi_search = multi_search.add(serie.search)

responses = multi_search.execute()
formatter = ResponseFormatter(self.series, responses, self.args)
self.data = formatter.format_response()

self.count = max([response.hits.total for response in responses])

def get_results_data(self) -> list:
if self.data is None:
raise RuntimeError(strings.DATA_NOT_INITIALIZED)

# Devuelvo hasta LIMIT values
data = self.data[:self.args[constants.PARAM_LIMIT]]

if self.reverse_results:
data.reverse()
return data

def get_results_count(self) -> int:
if self.count is None:
raise RuntimeError(strings.DATA_NOT_INITIALIZED)

return self.count
formatter = ResponseFormatter(self.series, responses,
self.args[constants.PARAM_SORT],
self.args[constants.PARAM_PERIODICITY])

def run(self) -> list:
"""Equivalente a execute_searches + get_results_data"""
self.execute_searches()
return self.get_results_data()

def reverse(self):
self.reverse_results = True
return {
'data': (formatter.format_response()),
'count': max([response.hits.total for response in responses])
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@

class ResponseFormatter:

def __init__(self, series, responses, args):
def __init__(self, series, responses, sort, periodicity):
self.series = series
self.responses = responses
self.data_dict = {}
self.args = args
self.sort = sort
self.periodicity = periodicity

def format_response(self):
"""Procesa la respuesta recibida de Elasticsearch, la guarda en
Expand Down Expand Up @@ -46,7 +47,7 @@ def format_response(self):
max(self.data_dict.keys()))

for timestamp in sorted(self.data_dict.keys(),
reverse=self.args[constants.PARAM_SORT] != constants.SORT_ASCENDING):
reverse=self.sort != constants.SORT_ASCENDING):
row = [timestamp]

for series in self.series:
Expand Down Expand Up @@ -83,5 +84,5 @@ def _make_date_index_continuous(self, start_date, end_date):
end_date = iso8601.parse_date(end_date)

while current_date < end_date:
current_date += get_relative_delta(self.args[constants.PARAM_PERIODICITY])
current_date += get_relative_delta(self.periodicity)
self.data_dict.setdefault(str(current_date.date()), {})
75 changes: 55 additions & 20 deletions series_tiempo_ar_api/apps/api/query/es_query/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,23 @@


class Serie:
"""Patrón Builder sobre una query de Elasticsearch"""
def __init__(self, index, series_id, rep_mode, periodicity, collapse_agg=None):
self.index = index
self.series_id = series_id
self.rep_mode = rep_mode
self.original_periodicity = periodicity
self.periodicity = periodicity
self.collapse_agg = collapse_agg or constants.API_DEFAULT_VALUES[constants.PARAM_COLLAPSE_AGG]
self._search = self._init_search()

self._search = None
self.search_options = {
constants.PARAM_START: constants.API_DEFAULT_VALUES[constants.PARAM_START],
constants.PARAM_LIMIT: constants.API_DEFAULT_VALUES[constants.PARAM_LIMIT],
constants.PARAM_START_DATE: None,
constants.PARAM_END_DATE: None,
constants.PARAM_SORT: settings.TS_TIME_INDEX_FIELD, # Sort ascendiente por key
}

def _init_search(self):
search = Search(index=self.index)
Expand All @@ -27,24 +36,29 @@ def _init_search(self):
return search

def add_range_filter(self, start, end):
_filter = {
'lte': end,
'gte': start
}
self._search = self._search.filter('range', timestamp=_filter)
self.search_options[constants.PARAM_START_DATE] = start
self.search_options[constants.PARAM_END_DATE] = end

def add_collapse(self, periodicity):
self.periodicity = periodicity

def add_pagination(self, start, limit, request_start_dates=None):
# ☢️☢️☢️
es_start = self.get_es_start(request_start_dates, start)

es_offset = start + limit
if self.rep_mode != constants.API_DEFAULT_VALUES[constants.PARAM_REP_MODE]:
es_offset += extra_offset(self.periodicity)
self.search_options[constants.PARAM_START] = self.get_es_start(request_start_dates, start)

offset = start + limit
extra_offsets = {
constants.VALUE: 0,
constants.CHANGE: 1,
constants.PCT_CHANGE: 1,
constants.CHANGE_BEG_YEAR: 0,
constants.PCT_CHANGE_BEG_YEAR: 0,
constants.CHANGE_YEAR_AGO: extra_offset(self.periodicity),
constants.PCT_CHANGE_YEAR_AGO: extra_offset(self.periodicity),
}
self.search_options[constants.PARAM_START] += extra_offsets[self.rep_mode]
offset += extra_offsets[self.rep_mode]

self._search = self._search[es_start:es_offset]
self.search_options[constants.PARAM_LIMIT] = offset

def get_es_start(self, request_start_dates, start):
"""Calcula el comienzo de la query para esta serie particular. El parámetro
Expand All @@ -57,31 +71,47 @@ def get_es_start(self, request_start_dates, start):
"""
min_date = min(request_start_dates.values()) if request_start_dates else None
start_date = request_start_dates.get(self.series_id) if request_start_dates else None
es_start = start - self.serie_first_date_offset(start_date,
min_date)
es_start = start - self._serie_first_date_offset(start_date,
min_date)
es_start = max(es_start, 0)
return es_start

def serie_first_date_offset(self, series_start_date: datetime, min_start_date: datetime):
def _serie_first_date_offset(self, series_start_date: datetime, min_start_date: datetime):
if series_start_date is None or min_start_date is None:
return 0

return periods_between(series_start_date, min_start_date, self.periodicity)

def sort(self, how):
if how == constants.SORT_ASCENDING:
order = settings.TS_TIME_INDEX_FIELD
sort = settings.TS_TIME_INDEX_FIELD

elif how == constants.SORT_DESCENDING:
order = '-' + settings.TS_TIME_INDEX_FIELD
sort = '-' + settings.TS_TIME_INDEX_FIELD
else:
msg = strings.INVALID_SORT_PARAMETER.format(how)
raise ValueError(msg)

self._search = self._search.sort(order)
self.search_options[constants.PARAM_SORT] = sort

@property
def search(self):
    """Return the query object for this series, building it on first access.

    If ``self._search`` is already set (truthy) it is returned unchanged.
    Otherwise the query is assembled from ``self.search_options`` in this
    order: base search from ``_init_search``, a ``range`` filter on
    ``timestamp``, the stored sort key, a slice from ``PARAM_START`` to
    ``PARAM_LIMIT`` (used here as the slice end), and finally the
    ``_add_collapse`` / ``_add_collapse_agg`` steps.
    """
    if self._search:
        return self._search

    self._search = self._init_search()

    options = self.search_options
    # Bounds may be None; they are passed to the range filter as-is.
    date_range = {
        'lte': options[constants.PARAM_END_DATE],
        'gte': options[constants.PARAM_START_DATE],
    }
    self._search = self._search.filter('range', timestamp=date_range)

    sort_key = options[constants.PARAM_SORT]
    self._search = self._search.sort(sort_key)

    first = options[constants.PARAM_START]
    last = options[constants.PARAM_LIMIT]
    self._search = self._search[first:last]

    self._add_collapse()
    self._add_collapse_agg()
    return self._search
Expand All @@ -92,11 +122,16 @@ def _add_collapse(self):

elif self._has_collapse():
self._search = self._search.filter('bool', must=[Q('match', interval=self.original_periodicity)])

interval = self.periodicity
if interval == 'semester':
interval = '6M'

# Agregamos la aggregation (?) para que se ejecute en ES en runtime
self._search.aggs.bucket('test',
A('date_histogram',
field='timestamp',
interval=self.periodicity,
interval=interval,
format='yyyy-MM-dd').
metric('test', self.collapse_agg, field=self.rep_mode))
else: # Ignoramos la in memory ag
Expand Down
2 changes: 1 addition & 1 deletion series_tiempo_ar_api/apps/api/query/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ def add_series(self, query, series_id, rep_mode, collapse_agg):
if not field_model:
return

query.add_series(series_id, field_model, rep_mode, collapse_agg)
query.add_series(field_model, rep_mode, collapse_agg)

def _get_model(self, series_id):
"""Valida si el 'series_id' es válido, es decir, si la serie
Expand Down
Loading

0 comments on commit 493031f

Please sign in to comment.