Permalink
Browse files

ES: raise error when accessing points from Elasticsearch accessor

Signed-off-by: Alban Dericbourg <a.dericbourg@criteo.com>
  • Loading branch information...
adericbourg authored and iksaif committed Jul 12, 2018
1 parent 41b4908 commit c4afda4b5cab725b15b617250c3a9e00801329d3
Showing with 8 additions and 49 deletions.
  1. +8 −49 biggraphite/drivers/elasticsearch.py
@@ -30,9 +30,7 @@
from biggraphite import accessor as bg_accessor
from biggraphite import glob_utils as bg_glob
from biggraphite.drivers import _downsampling
from biggraphite.drivers import _utils
from biggraphite.drivers import _delayed_writer
from biggraphite.drivers.ttls import DEFAULT_UPDATED_ON_TTL_SEC
from biggraphite.drivers.ttls import str_to_datetime, str_to_timestamp
@@ -302,6 +300,10 @@ def _get_depth_from_components(components):
return len(components) - 1
def _raise_unsupported():
raise NotImplementedError("Elasticsearch accessor does not support data operations")
class _ElasticSearchAccessor(bg_accessor.Accessor):
"""A ElasticSearch acessor that doubles as a ElasticSearch MetadataCache."""
@@ -326,11 +328,8 @@ def __init__(
):
"""Create a new ElasticSearchAccessor."""
super(_ElasticSearchAccessor, self).__init__("ElasticSearch")
self._metric_to_points = collections.defaultdict(sortedcontainers.SortedDict)
self._name_to_metric = {}
self._directory_names = sortedcontainers.SortedSet()
self.__downsampler = _downsampling.Downsampler()
self.__delayed_writer = _delayed_writer.DelayedWriter(self)
self._hosts = list(hosts)
self._port = port
self._index_prefix = index
@@ -393,15 +392,10 @@ def _shutdown(self):
def background(self):
"""Perform periodic background operations."""
if self.__downsampler:
self.__downsampler.purge()
if self.__delayed_writer:
self.__delayed_writer.write_some()
pass
def flush(self):
"""Flush any internal buffers."""
if self.__delayed_writer:
self.__delayed_writer.flush()
if self.client:
self.client.indices.flush(
index="%s*" % self._index_prefix,
@@ -432,34 +426,19 @@ def insert_points_async(self, metric, datapoints, on_done=None):
super(_ElasticSearchAccessor, self).insert_points_async(
metric, datapoints, on_done
)
if metric.name not in self._name_to_metric:
self.create_metric(metric)
datapoints = self.__downsampler.feed(metric, datapoints)
if self.__delayed_writer:
datapoints = self.__delayed_writer.feed(metric, datapoints)
return self.insert_downsampled_points_async(metric, datapoints, on_done)
_raise_unsupported()
def insert_downsampled_points_async(self, metric, datapoints, on_done=None):
"""See the real Accessor for a description."""
super(_ElasticSearchAccessor, self).insert_downsampled_points_async(
metric, datapoints, on_done
)
if metric.name not in self._name_to_metric:
self.create_metric(metric)
for datapoint in datapoints:
timestamp, value, count, stage = datapoint
points = self._metric_to_points[(metric.name, stage)]
points[timestamp] = (value, count)
if on_done:
on_done(None)
_raise_unsupported()
def drop_all_metrics(self, *args, **kwargs):
"""See the real Accessor for a description."""
super(_ElasticSearchAccessor, self).drop_all_metrics(*args, **kwargs)
# Drop indices.
self._metric_to_points.clear()
self._name_to_metric.clear()
self._directory_names.clear()
@@ -648,27 +627,7 @@ def fetch_points(self, metric, time_start, time_end, stage, aggregated=True):
super(_ElasticSearchAccessor, self).fetch_points(
metric, time_start, time_end, stage
)
points = self._metric_to_points[(metric.name, stage)]
rows = []
for ts in points.irange(time_start, time_end):
# A row is time_base_ms, time_offset_ms, value, count
if stage.aggregated():
row = self.Row(ts * 1000.0, 0, 0, float(points[ts][0]), points[ts][1])
else:
row = self.Row0(ts * 1000.0, 0, float(points[ts][0]))
rows.append(row)
query_results = [(True, rows)]
time_start_ms = int(time_start) * 1000
time_end_ms = int(time_end) * 1000
return bg_accessor.PointGrouper(
metric,
time_start_ms,
time_end_ms,
stage,
query_results,
aggregated=aggregated,
)
_raise_unsupported()
def touch_metric(self, metric_name):
"""See the real Accessor for a description."""

0 comments on commit c4afda4

Please sign in to comment.