metricd: publish statistics about processing speed
Closes #600
jd authored and pastamaker[bot] committed Mar 22, 2018
1 parent 8e041df commit 72a7a39
Showing 4 changed files with 54 additions and 27 deletions.
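Editor's note: the mechanism here is tooz group capabilities. Each metricd worker pushes its storage driver's statistics into its group-membership capabilities, and the REST status endpoint reads them back per member. A minimal standalone sketch of that round-trip follows; the memcached:// URL, group name, and counter values are placeholders, not Gnocchi's actual configuration:

import uuid

from tooz import coordination

GROUP_ID = b"metric-processors"  # placeholder, not Gnocchi's real group id
member_id = str(uuid.uuid4()).encode()

coord = coordination.get_coordinator("memcached://localhost:11211", member_id)
coord.start(start_heart=True)
try:
    coord.create_group(GROUP_ID).get()
except coordination.GroupAlreadyExist:
    pass
coord.join_group(GROUP_ID).get()

# Worker side: publish the current counters as this member's capabilities.
coord.update_capabilities(GROUP_ID, {"processed measures": 42}).get()

# API side: enumerate the members and read each one's capabilities back.
for member in coord.get_members(GROUP_ID).get():
    print(member, coord.get_member_capabilities(GROUP_ID, member).get())

coord.leave_group(GROUP_ID).get()
coord.stop()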
2 changes: 2 additions & 0 deletions gnocchi/cli/metricd.py
@@ -242,6 +242,8 @@ def _run_job(self):
                 LOG.error("Unexpected error processing assigned job",
                           exc_info=True)
         LOG.debug("%d metrics processed from %d sacks", m_count, s_count)
+        # Update statistics
+        self.coord.update_capabilities(self.GROUP_ID, self.store.statistics)
         if sacks == self._get_sacks_to_process():
             # We just did a full scan of all sacks, reset the timer
             self._last_full_sack_scan.reset()
13 changes: 12 additions & 1 deletion gnocchi/rest/api.py
@@ -2005,9 +2005,20 @@ def get(details=True):
report_dict["storage"]["measures_to_process"] = report['details']
report_dict['metricd'] = {}
if members_req:
report_dict['metricd']['processors'] = members_req.get()
members = members_req.get()
caps = [
pecan.request.coordinator.get_member_capabilities(
metricd.MetricProcessor.GROUP_ID, member)
for member in members
]
report_dict['metricd']['processors'] = members
report_dict['metricd']['statistics'] = {
member: cap.get()
for member, cap in six.moves.zip(members, caps)
}
else:
report_dict['metricd']['processors'] = None
report_dict['metricd']['statistics'] = {}
return report_dict


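Editor's note: for illustration, the enlarged status document this endpoint can now return might look like the following; the member id and all numbers are invented, only the shape follows the code above:

# Illustrative shape only; member id and values are made up.
{
    "storage": {"summary": {"metrics": 10, "measures": 120}},
    "metricd": {
        "processors": ["dd0ecd18-0f5e-4cba-a34e-5fa4cd9b9d7a"],
        "statistics": {
            "dd0ecd18-0f5e-4cba-a34e-5fa4cd9b9d7a": {
                "processed measures": 120,
                "raw measures fetch": 10,
                "raw measures fetch time": 0.42,
                "aggregated measures compute time": 1.87,
            },
        },
    },
}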
62 changes: 38 additions & 24 deletions gnocchi/storage/__init__.py
@@ -14,6 +14,7 @@
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations
 # under the License.
+import collections
 import itertools
 import operator
@@ -97,11 +98,31 @@ def get_driver(conf):
         conf.storage)


+class Statistics(collections.defaultdict):
+    class StatisticsTimeContext(object):
+        def __init__(self, stats, name):
+            self.stats = stats
+            self.name = name + " time"
+
+        def __enter__(self):
+            self.sw = utils.StopWatch()
+            self.sw.start()
+            return self
+
+        def __exit__(self, type, value, traceback):
+            self.stats[self.name] += self.sw.elapsed()
+
+    def __init__(self):
+        super(Statistics, self).__init__(lambda: 0)
+
+    def time(self, name):
+        return self.StatisticsTimeContext(self, name)
+
+
 class StorageDriver(object):

-    @staticmethod
-    def __init__(conf):
-        pass
+    def __init__(self, conf):
+        self.statistics = Statistics()

     @staticmethod
     def upgrade():
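Editor's note: outside the Gnocchi tree, the helper above reduces to this self-contained sketch; time.monotonic() (Python 3) stands in for utils.StopWatch, which is an assumption for portability, not the commit's code:

import collections
import time


class Statistics(collections.defaultdict):
    """Counters, plus a time(name) context manager that accumulates
    elapsed wall-clock seconds under the key '<name> time'."""

    class StatisticsTimeContext(object):
        def __init__(self, stats, name):
            self.stats = stats
            self.name = name + " time"

        def __enter__(self):
            self.start = time.monotonic()
            return self

        def __exit__(self, exc_type, exc_value, exc_tb):
            self.stats[self.name] += time.monotonic() - self.start

    def __init__(self):
        super(Statistics, self).__init__(lambda: 0)

    def time(self, name):
        return self.StatisticsTimeContext(self, name)


stats = Statistics()
with stats.time("raw measures fetch"):
    sum(range(10 ** 6))  # stand-in for real work
stats["raw measures fetch"] += 1
print(dict(stats))
# e.g. {'raw measures fetch time': 0.012, 'raw measures fetch': 1}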
@@ -538,11 +559,12 @@ def add_measures_to_metrics(self, metrics_and_measures):
                 objects and values are timeseries array of
                 the new measures.
         """
-        with utils.StopWatch() as sw:
+        with self.statistics.time("raw measures fetch"):
             raw_measures = self._get_or_create_unaggregated_timeseries(
                 metrics_and_measures.keys())
-        LOG.debug("Retrieve unaggregated measures for %d metric in %.2fs",
-                  len(metrics_and_measures), sw.elapsed())
+        self.statistics["raw measures fetch"] += len(metrics_and_measures)
+        self.statistics["processed measures"] += sum(
+            map(len, metrics_and_measures.values()))

         new_boundts = []
         splits_to_delete = {}
@@ -554,7 +576,6 @@ def add_measures_to_metrics(self, metrics_and_measures):
             agg_methods = list(metric.archive_policy.aggregation_methods)
             block_size = metric.archive_policy.max_block_size
             back_window = metric.archive_policy.back_window
-            definition = metric.archive_policy.definition
             # NOTE(sileht): We keep one more blocks to calculate rate of change
             # correctly
             if any(filter(lambda x: x.startswith("rate:"), agg_methods)):
@@ -627,7 +648,7 @@ def _map_compute_splits_operations(bound_timeserie):
                         deleted_keys,
                         keys_and_split_to_store)

-            with utils.StopWatch() as sw:
+            with self.statistics.time("aggregated measures compute"):
                 (new_first_block_timestamp,
                  deleted_keys,
                  keys_and_splits_to_store) = ts.set_values(
@@ -641,22 +662,15 @@ def _map_compute_splits_operations(bound_timeserie):

             new_boundts.append((metric, ts.serialize()))

-            number_of_operations = (len(agg_methods) * len(definition))
-            perf = ""
-            elapsed = sw.elapsed()
-            if elapsed > 0:
-                perf = " (%d points/s, %d measures/s)" % (
-                    ((number_of_operations * computed_points['number']) /
-                        elapsed),
-                    ((number_of_operations * len(measures)) / elapsed)
-                )
-            LOG.debug("Computed new metric %s with %d new measures "
-                      "in %.2f seconds%s",
-                      metric.id, len(measures), elapsed, perf)
-
-        self._delete_metric_splits(splits_to_delete)
-        self._update_metric_splits(splits_to_update)
-        self._store_unaggregated_timeseries(new_boundts)
+        with self.statistics.time("splits delete"):
+            self._delete_metric_splits(splits_to_delete)
+        self.statistics["splits delete"] += len(splits_to_delete)
+        with self.statistics.time("splits update"):
+            self._update_metric_splits(splits_to_update)
+        self.statistics["splits update"] += len(splits_to_update)
+        with self.statistics.time("raw measures store"):
+            self._store_unaggregated_timeseries(new_boundts)
+        self.statistics["raw measures store"] += len(new_boundts)

     def find_measure(self, metric, predicate, granularity, aggregation="mean",
                      from_timestamp=None, to_timestamp=None):
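Editor's note: because each instrumented phase publishes both a counter and a matching "<name> time" key, a consumer of the status endpoint can derive rough per-phase throughput. A hypothetical helper, not part of this change:

# Hypothetical consumer-side helper; key names follow the counters above.
def rate_per_second(stats, name):
    """Return items handled per second for one phase, or None if untimed."""
    elapsed = stats.get(name + " time", 0)
    return stats.get(name, 0) / elapsed if elapsed else None

# e.g. rate_per_second(member_stats, "raw measures fetch")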
4 changes: 2 additions & 2 deletions gnocchi/tests/functional/gabbits/base.yaml
@@ -132,7 +132,7 @@ tests:
           authorization: "basic YWRtaW46"
       response_json_paths:
           $.storage.`len`: 2
-          $.metricd.`len`: 1
+          $.metricd.`len`: 2

     - name: get status, no details
       GET: /v1/status?details=False
@@ -141,4 +141,4 @@
           authorization: "basic YWRtaW46"
       response_json_paths:
           $.storage.`len`: 1
-          $.metricd.`len`: 1
+          $.metricd.`len`: 2
