Skip to content
Permalink
Browse files
[AMBARI-25078] Add metering metrics to AMS Metric Monitor. (#13)
  • Loading branch information
avijayanhwx committed Jan 2, 2019
1 parent 9f800da commit 011e39c5b4f022b7a005fd8c8413f2e39f0c584f
Showing 22 changed files with 295 additions and 34 deletions.
@@ -784,7 +784,33 @@
<filemode>755</filemode>
</mapper>
</data>

<!--
  Instance type provider scripts (Azure, EC2, GCE) shipped with the metric
  monitor. Each script is taken from ${monitor.dir}/conf/unix and packaged
  under /etc/ambari-metrics-monitor/conf with file mode 755.
-->
<data>
<src>${monitor.dir}/conf/unix/instance_type_provider_azure</src>
<type>file</type>
<mapper>
<type>perm</type>
<prefix>/etc/ambari-metrics-monitor/conf</prefix>
<filemode>755</filemode>
</mapper>
</data>
<data>
<src>${monitor.dir}/conf/unix/instance_type_provider_ec2</src>
<type>file</type>
<mapper>
<type>perm</type>
<prefix>/etc/ambari-metrics-monitor/conf</prefix>
<filemode>755</filemode>
</mapper>
</data>
<data>
<src>${monitor.dir}/conf/unix/instance_type_provider_gce</src>
<type>file</type>
<mapper>
<type>perm</type>
<prefix>/etc/ambari-metrics-monitor/conf</prefix>
<filemode>755</filemode>
</mapper>
</data>
<!-- Metric collector -->

<data>
@@ -36,6 +36,9 @@
<includes>
<include>metric_groups.conf</include>
<include>metric_monitor.ini</include>
<include>instance_type_provider_azure</include>
<include>instance_type_provider_ec2</include>
<include>instance_type_provider_gce</include>
</includes>
</fileSet>
<fileSet>
@@ -112,21 +112,33 @@ define([
return $q.when(emptyData(target));
}
var series = [];
var metricData = res.metrics[0].metrics;
// Added hostname to legend for templated dashboards.
var hostLegend = res.metrics[0].hostname ? ' on ' + res.metrics[0].hostname : '';
var timeSeries = {};
timeSeries = {
target: alias + hostLegend,
datapoints: []
};
for (var k in metricData) {
if (metricData.hasOwnProperty(k)) {
timeSeries.datapoints.push([metricData[k], (k - k % 1000)]);
}
}
series.push(timeSeries);
return $q.when({data: series});
var metricData = res.metrics;
_.map(metricData, function (data) {
// Added hostname to legend for templated dashboards.
var hostLegend = data.hostname ? ' on ' + data.hostname : '';
var alias = target.alias ? target.alias : target.metric;
if(!_.isEmpty(templateSrv.variables) && templateSrv.variables[0].query === "yarnqueues") {
alias = alias + ' on ' + target.qmetric; }
if(!_.isEmpty(templateSrv.variables) && templateSrv.variables[0].query === "kafka-topics") {
alias = alias + ' on ' + target.kbTopic; }
if (!alias.includes("%") || !data.metricname.includes('live_hosts')) {
if (!alias || alias.includes("%")) {
alias = data.metricname;
}
var timeSeries = {};
timeSeries = {
target: alias + hostLegend,
datapoints: []
};
for (var k in data.metrics) {
if (data.metrics.hasOwnProperty(k)) {
timeSeries.datapoints.push([data.metrics[k], (k - k % 1000)]);
}
}
series.push(timeSeries);
}
});
return $q.when({data: series});
};
};
// To speed up querying on templatized dashboards.
@@ -200,11 +212,12 @@ define([
aliasSuffix = '';
}
if (data.appid.indexOf('ambari_server') === 0) {
alias = data.metricname;
aliasSuffix = '';
}
if (!alias || alias.includes("%")) {
alias = data.metricname;
}
timeSeries = {
target: alias + aliasSuffix,
datapoints: []
};
for (var k in data.metrics) {
@@ -73,15 +73,18 @@ protected String processMetrics(Map<String, TimelineMetrics> metricForAggregatio
double max = Integer.MIN_VALUE;
double min = Integer.MAX_VALUE;
int count = 0;
TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().iterator().next());
for (TimelineMetric metric : metrics.getMetrics()) {
for (Double value : metric.getMetricValues().values()) {
sum+=value;
max = Math.max(max, value);
min = Math.min(min, value);
count++;
}
if (metric.getStartTime() > tmpMetric.getStartTime()) {
tmpMetric.setStartTime(metric.getStartTime());
}
}
TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0));
tmpMetric.setMetricValues(new TreeMap<Long, Double>());
metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min)));
}
@@ -0,0 +1,17 @@
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

# Print the "vmSize" value reported by the instance metadata endpoint.
# The explicit timeouts bound how long the call can block when the
# link-local metadata address is unreachable (curl's default connect
# timeout is far longer); an empty output lets the caller fall back.
curl --silent --connect-timeout 5 --max-time 10 -H Metadata:true "http://169.254.169.254/metadata/instance?api-version=2017-12-01" | grep -Po '"vmSize":.*?[^\\]",' | cut -d':' -f2 | sed 's/,/ /g' | sed 's/"//g'
@@ -0,0 +1,17 @@
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

# Print the "instanceType" value from the instance identity document.
# The explicit timeouts bound how long the call can block when the
# link-local metadata address is unreachable (curl's default connect
# timeout is far longer); an empty output lets the caller fall back.
curl --silent --connect-timeout 5 --max-time 10 http://169.254.169.254/latest/dynamic/instance-identity/document | grep 'instanceType' | awk '{ print $3 }' | sed 's/,/ /g' | sed 's/"//g'
@@ -0,0 +1,17 @@
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

# Print the 4th '/'-separated field of the machine-type metadata value.
# The explicit timeouts bound how long the call can block when the
# metadata host cannot be reached (curl's default connect timeout is far
# longer); an empty output lets the caller fall back.
curl --silent --connect-timeout 5 --max-time 10 -H "Metadata-Flavor: Google" "http://metadata.google.internal/computeMetadata/v1/instance/machine-type" | awk -F'/' '{print $4}'
@@ -88,6 +88,7 @@ def flatten(self, application_id = None, clear_once_flattened = False, set_insta
pass

for appId, metrics in local_metric_map.iteritems():
current_app_id = "HOST" if "HOST" in appId else appId
for metricId, metricData in dict(metrics).iteritems():
# Create a timeline metric object
result_instanceid = ""
@@ -96,7 +97,7 @@ def flatten(self, application_id = None, clear_once_flattened = False, set_insta
timeline_metric = {
"hostname" : self.hostname,
"metricname" : metricId,
"appid" : "HOST",
"appid" : current_app_id,
"instanceid" : result_instanceid,
"starttime" : self.get_start_time(appId, metricId),
"metrics" : self.align_values_by_minute_mark(appId, metricId, metricData) if clear_once_flattened else metricData
@@ -255,6 +255,21 @@ def ams_monitor_log_dir(self):
hosts = self.get("aggregation", "ams_monitor_log_dir", "/var/log/ambari-metrics-monitor")
return hosts

def is_metering_enabled(self):
    """
    :returns True when the "metering_enabled" option of the "metering"
    section (default "false") equals "true", compared case-insensitively
    """
    raw_flag = self.get("metering", "metering_enabled", "false")
    return str(raw_flag).lower() == "true"

def get_metering_appId(self):
    """
    :returns the "metering_appId" option of the "metering" section,
    "metering" when the option is not set
    """
    app_id = self.get("metering", "metering_appId", "metering")
    return app_id

def get_metering_metrics(self):
    """
    :returns the list of metric names configured in the comma-separated
    "metering_metrics" option of the "metering" section

    Surrounding whitespace is removed from every name and blank entries are
    dropped, so an unset option yields [] (a plain split would yield [''])
    and "a, b" yields ['a', 'b'].
    """
    raw_value = self.get("metering", "metering_metrics", "")
    stripped_names = (name.strip() for name in raw_value.split(','))
    return [name for name in stripped_names if name]

def get_instance_type_script(self):
    """
    :returns the entries of the comma-separated "instance_type_script"
    option of the "metering" section, as a list

    Whitespace around entries is removed and blank entries are dropped, so
    an unset option yields an empty (falsy) list instead of [''] and callers
    that test the result for truthiness do not see a script where none is
    configured.
    """
    raw_value = self.get("metering", "instance_type_script", "")
    stripped_entries = (entry.strip() for entry in raw_value.split(','))
    return [entry for entry in stripped_entries if entry]

def get_provider_type(self):
    """
    :returns the "host_provider_type" option of the "metering" section,
    None when the option is not set
    """
    provider_type = self.get("metering", "host_provider_type", None)
    return provider_type

def ams_monitor_log_file(self):
"""
:returns the log file
@@ -47,7 +47,7 @@ def __init__(self, config, stop_handler):
self.application_metric_map = ApplicationMetricMap(hostinfo.get_hostname(),
hostinfo.get_ip_address())
self.event_queue = Queue(config.get_max_queue_size())
self.metric_collector = MetricsCollector(self.event_queue, self.application_metric_map, hostinfo)
self.metric_collector = MetricsCollector(self.event_queue, self.application_metric_map, hostinfo, config)
self.sleep_interval = config.get_collector_sleep_interval()
self._stop_handler = stop_handler
self.initialize_events_cache()
@@ -0,0 +1,76 @@
#!/usr/bin/env python

'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

import logging
import subprocess
import sys

logger = logging.getLogger()

class HostInstanceTypeProvider:
    """
    Determines the instance type of this host by running an external script.

    The script is chosen from the configured provider type ('google',
    'microsoft' or 'xen', matched case-insensitively); when the provider is
    not known, the custom script from the configuration is used instead.
    When no script yields a value the instance type is DEFAULT_INSTANCE_TYPE.
    """

    DEFAULT_INSTANCE_TYPE = "custom"
    KNOWN_PROVIDER_SCRIPT_PREFIX = "instance_type_provider_"
    # Kept as a class attribute so the name still exists on the class; every
    # instance shadows it with its own mapping in __init__ instead of
    # mutating this shared dict.
    KNOWN_PROVIDER_SCRIPTS = dict()

    def __init__(self, config):
        """
        Resolve the instance type once, at construction time.

        :param config: object providing get_config_dir(), get_provider_type()
        and get_instance_type_script()
        """
        config_dir = config.get_config_dir()
        self.KNOWN_PROVIDER_SCRIPTS = {
            'google': self._provider_script_path(config_dir, "gce"),
            'microsoft': self._provider_script_path(config_dir, "azure"),
            'xen': self._provider_script_path(config_dir, "ec2"),
        }

        self.provider_type = config.get_provider_type()
        logger.info("Provider type {0}".format(self.provider_type))

        script = self.get_script_for_provider(self.provider_type)
        logger.info("Script for provider {0}".format(script))

        self.instance_type_script = config.get_instance_type_script()
        logger.info("Custom Instance Type Script {0}".format(self.instance_type_script))

        # A known provider wins over the custom script.
        if script:
            self.instance_type = self.get_instance_type_from_script(script)
        elif self.instance_type_script:
            self.instance_type = self.get_instance_type_from_script(self.instance_type_script)
        else:
            self.instance_type = self.DEFAULT_INSTANCE_TYPE
        logger.info("Instance type {0}".format(self.instance_type))

    @classmethod
    def _provider_script_path(cls, config_dir, provider_suffix):
        """Build the path of a bundled provider script inside config_dir."""
        import os
        # os.path.join gives the same result whether or not config_dir ends
        # with a separator; plain concatenation needs the trailing one.
        return os.path.join(config_dir, cls.KNOWN_PROVIDER_SCRIPT_PREFIX + provider_suffix)

    @staticmethod
    def _candidate_scripts(script):
        """Normalize a script path, or a list of them, to a list of non-blank paths."""
        if not script:
            return []
        if isinstance(script, (list, tuple)):
            # NOTE(review): the configuration hands over a comma-split list;
            # entries are treated as alternative scripts tried in order -
            # confirm that this is the intended meaning.
            return [entry.strip() for entry in script if entry and entry.strip()]
        return [script]

    def get_instance_type(self):
        """
        :returns the instance type resolved at construction time
        """
        return self.instance_type

    def get_script_for_provider(self, provider_type):
        """
        :param provider_type: provider name, matched case-insensitively
        :returns the script path registered for the provider, None when the
        provider is empty or not known
        """
        if not provider_type:
            return None
        return self.KNOWN_PROVIDER_SCRIPTS.get(str(provider_type).lower())

    def get_instance_type_from_script(self, script):
        """
        Run the script and use its trimmed standard output as instance type.

        :param script: a script path, or a list of script paths to try in order
        :returns the output of the first script that exits with 0 and prints
        something; DEFAULT_INSTANCE_TYPE otherwise. Failures to run a script
        are logged and never raised.
        """
        for candidate in self._candidate_scripts(script):
            try:
                process = subprocess.Popen([candidate], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                out, err = process.communicate()
            except Exception:
                # Best effort: a missing or non-executable script must not
                # prevent the monitor from starting.
                logger.warning("Unexpected error while retrieving instance_type: '{0}'".format(sys.exc_info()))
                continue
            if not isinstance(out, str):
                # Python 3 pipes deliver bytes; callers concatenate the
                # instance type with text.
                out = out.decode("utf-8", "replace")
            instance_type = out.strip()
            if 0 == process.returncode and instance_type:
                logger.info("Read instance_type '{0}' using script '{1}'".format(instance_type, candidate))
                return instance_type
        return self.DEFAULT_INSTANCE_TYPE
@@ -0,0 +1,63 @@
#!/usr/bin/env python

'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

import logging
import time
import json
from instance_type_provider import HostInstanceTypeProvider

logger = logging.getLogger()

class MeteringMetricHandler:
    """
    Builds the metering and instance type metrics of this host.

    Metering metric keys have the form
    hostname~instance_type~start_ts~metric_name~value~lastKnownAliveTime
    where start_ts is the time (in milliseconds) this handler was created.
    """

    METERING_ALIVE_TIME_METRIC_SUFFIX = "lastKnownAliveTime"

    def __init__(self, config):
        """
        Read the metering settings; the instance type is resolved only when
        metering is enabled.

        :param config: object providing the metering, hostname and instance
        id getters used below
        """
        self.appId = config.get_metering_appId()
        self.instance_type_metric_appId = self.appId + "_instance_type"
        self.metering_enabled = config.is_metering_enabled()
        self.hostname = config.get_hostname_config()
        self.instance_id = config.get_instanceid()
        self.metering_metric_list = config.get_metering_metrics()
        self.start_ts = int(round(time.time() * 1000))
        # Always defined, so the handler can be queried safely when metering
        # is disabled.
        self.instance_type_provider = None
        self.instance_type = None
        self.metering_metric_key_prefix = None
        if self.metering_enabled:
            logger.info("Metering started with: appId = {0}, metering_metric_list = {1}, start time key = {2}"
                        .format(self.appId, self.metering_metric_list, self.start_ts))
            self.instance_type_provider = HostInstanceTypeProvider(config)
            self.instance_type = self.instance_type_provider.instance_type
            self.metering_metric_key_prefix = self.hostname + "~" + self.instance_type + "~" + str(self.start_ts)

    # Metering Metrics
    def get_metering_metrics(self, metrics):
        """
        :param metrics: mapping of metric name to current value
        :returns a dict with one entry per metric that is in the configured
        metering metric list: the metering key mapped to the current time in
        milliseconds. Empty when metering is disabled.
        """
        if not self.metering_enabled or not metrics:
            return {}

        curr_time = int(round(time.time() * 1000))
        tracked_names = set(self.metering_metric_list)
        metering_metrics = {}
        # items() instead of iteritems() so the loop runs on Python 2 and 3.
        for metric_name, value in metrics.items():
            if metric_name not in tracked_names:
                continue
            end_time_metric_key = "~".join((self.metering_metric_key_prefix, metric_name, str(value),
                                            self.METERING_ALIVE_TIME_METRIC_SUFFIX))
            metering_metrics[end_time_metric_key] = curr_time
        return metering_metrics

    # Instance Type Metrics
    def get_instance_type_metrics(self):
        """
        :returns {instance_type: 1}, or an empty dict when metering is
        disabled (no instance type is resolved in that case)
        """
        if not self.metering_enabled:
            return {}
        return {self.instance_type: 1}

0 comments on commit 011e39c

Please sign in to comment.