Skip to content

Commit

Permalink
simplify gnocchi batch push setup
Browse files Browse the repository at this point in the history
we run through samples and format samples and resources to what
gnocchi expects. this simplifies that process a little:
- don't needlessly sort and group by metric name
- build a full resource model iff resource needs to be created

Change-Id: I2ac25b3b0978eed664c500e645bae2d1b4ae6781
  • Loading branch information
chungg authored and dchavoll committed Aug 15, 2018
1 parent c473be4 commit e58fe24
Showing 1 changed file with 23 additions and 33 deletions.
56 changes: 23 additions & 33 deletions ceilometer/publisher/gnocchi.py
Expand Up @@ -292,8 +292,7 @@ def _get_resource_definition_from_event(self, event_type):
def publish_samples(self, data):
# NOTE(sileht): skip sample generated by gnocchi itself
data = [s for s in data if not self._is_gnocchi_activity(s)]

data.sort(key=lambda s: (s.resource_id, s.name))
data.sort(key=operator.attrgetter('resource_id'))
resource_grouped_samples = itertools.groupby(
data, key=operator.attrgetter('resource_id'))

Expand All @@ -303,13 +302,8 @@ def publish_samples(self, data):
# NOTE(sileht): / is forbidden by Gnocchi
resource_id = resource_id.replace('/', '_')

metric_grouped_samples = itertools.groupby(
list(samples_of_resource),
key=operator.attrgetter('name'))

res_info = {}
for metric_name, samples in metric_grouped_samples:
samples = list(samples)
for sample in samples_of_resource:
metric_name = sample.name
rd = self.metric_map.get(metric_name)
if rd is None:
if metric_name not in self._already_logged_metric_names:
Expand All @@ -320,28 +314,22 @@ def publish_samples(self, data):
if rd.cfg.get("ignore"):
continue

res_info['resource_type'] = rd.cfg['resource_type']
res_info.setdefault("resource", {}).update({
"id": resource_id,
"user_id": samples[0].user_id,
"project_id": samples[0].project_id,
"metrics": rd.metrics,
})

for sample in samples:
res_info.setdefault("resource_extra", {}).update(
rd.sample_attributes(sample))
m = measures.setdefault(resource_id, {}).setdefault(
metric_name, [])
m.append({'timestamp': sample.timestamp,
'value': sample.volume})
unit = sample.unit
metric = sample.name
res_info['resource']['metrics'][metric]['unit'] = unit

res_info["resource"].update(res_info["resource_extra"])
if res_info:
gnocchi_data[resource_id] = res_info
if resource_id not in gnocchi_data:
gnocchi_data[resource_id] = {
'resource_type': rd.cfg['resource_type'],
'resource': {"id": resource_id,
"user_id": sample.user_id,
"project_id": sample.project_id,
"metrics": rd.metrics}}

gnocchi_data[resource_id].setdefault(
"resource_extra", {}).update(rd.sample_attributes(sample))
measures.setdefault(resource_id, {}).setdefault(
metric_name, []).append({'timestamp': sample.timestamp,
'value': sample.volume})
# TODO(gordc): unit should really be part of metric definition
gnocchi_data[resource_id]['resource']['metrics'][
metric_name]['unit'] = sample.unit

try:
self.batch_measures(measures, gnocchi_data)
Expand Down Expand Up @@ -369,7 +357,8 @@ def _extract_resources_from_error(e, resource_infos):
resource_ids = set([r['original_resource_id']
for r in e.message['detail']])
return [(resource_infos[rid]['resource_type'],
resource_infos[rid]['resource'])
resource_infos[rid]['resource'],
resource_infos[rid]['resource_extra'])
for rid in resource_ids]

def batch_measures(self, measures, resource_infos):
Expand All @@ -385,8 +374,9 @@ def batch_measures(self, measures, resource_infos):
raise

resources = self._extract_resources_from_error(e, resource_infos)
for resource_type, resource in resources:
for resource_type, resource, resource_extra in resources:
try:
resource.update(resource_extra)
self._if_not_cached("create", resource_type, resource,
self._create_resource)
except gnocchi_exc.ResourceAlreadyExists:
Expand Down

0 comments on commit e58fe24

Please sign in to comment.