
Commit

Merge pull request #132 from yaacov/use-cadvisor-job-for-prometheus-metrics

Improve Prometheus metrics collection
(cherry picked from commit 63c6681)

https://bugzilla.redhat.com/show_bug.cgi?id=1530734
Mooli Tayer authored and simaishi committed Jan 3, 2018
1 parent 1a9a787 commit d226f86
Showing 14 changed files with 413 additions and 575 deletions.
@@ -38,92 +38,12 @@ def ts_values
@ts_values.select { |_, v| @metrics.all? { |k| v.key?(k) } }
end

private

CPU_NANOSECONDS = 1e09

def target_name
"#{@target.class.name.demodulize}(#{@target.id})"
end

def validate_target
raise TargetValidationError, "ems not defined" unless @ext_management_system
raise TargetValidationWarning, "no associated node" unless @node_hardware

raise TargetValidationError, "cores not defined" unless @node_cores.to_i > 0
raise TargetValidationError, "memory not defined" unless @node_memory.to_i > 0
end

def fetch_counters_rate(resource)
compute_derivative(fetch_counters_data(resource))
end

def process_cpu_counters_rate(counters_rate)
@metrics |= ['cpu_usage_rate_average'] unless counters_rate.empty?
sec_cpu_time = @node_cores * CPU_NANOSECONDS
counters_rate.each do |x|
interval = (x['end'] - x['start']) / 1.in_milliseconds
timestamp = Time.at(x['start'] / 1.in_milliseconds).utc
avg_usage = (x['min'] * 100.0) / (sec_cpu_time * interval)
@ts_values[timestamp]['cpu_usage_rate_average'] = avg_usage
end
end

def process_mem_gauges_data(gauges_data)
@metrics |= ['mem_usage_absolute_average'] unless gauges_data.empty?
gauges_data.each do |x|
timestamp = Time.at(x['start'] / 1.in_milliseconds).utc
avg_usage = (x['min'] / 1.megabytes) * 100.0 / @node_memory
@ts_values[timestamp]['mem_usage_absolute_average'] = avg_usage
end
end

def process_net_counters_rate(counters_rate)
@metrics |= ['net_usage_rate_average'] unless counters_rate.empty?
counters_rate.each do |x|
interval = (x['end'] - x['start']) / 1.in_milliseconds
timestamp = Time.at(x['start'] / 1.in_milliseconds).utc
avg_usage_kb = x['min'] / (1.kilobyte.to_f * interval)
@ts_values[timestamp]['net_usage_rate_average'] = avg_usage_kb
end
end

def compute_summation(data)
ts_data = Hash.new { |h, k| h[k] = [] }

data.flatten.each { |x| ts_data[x['start']] << x }
ts_data.delete_if { |_k, v| v.length != data.length }

ts_data.keys.sort.map do |k|
ts_data[k].inject do |sum, n|
# Add min, median, max, percentile95th, etc. if needed
{
'start' => k,
'end' => [sum['end'], n['end']].max,
'min' => sum['min'] + n['min']
}
end
end
end

def compute_derivative(counters)
counters.each_cons(2).map do |prv, n|
      # Add min, median, max, percentile95th, etc. if needed
# time window:
# 00:00 01:00
# ^ (sample start time) ^ (next sample start time)
# ^ (sample min/max/avg value) ^ (next sample min/max/avg value)
# ^ (real sample time) ^ (real sample time) ^ (real sample time)
# ^ (real sample value) ^ (real sample value) ^ (real sample value)
# we use:
# (T = start of window timestamp, V = min value of window samples)
# because the min value is the value of the sample closest to start of window.
{
'start' => prv['start'],
'end' => n['start'],
'min' => n['min'] - prv['min']
}
end
end
end
end
@@ -78,5 +78,85 @@ def sort_and_normalize(data)
norm_data = (data.sort_by { |x| x['start'] }).slice(0..-2)
norm_data.reject { |x| x.values.include?('NaN') || x['empty'] == true }
end

private

CPU_NANOSECONDS = 1e09

def target_name
"#{@target.class.name.demodulize}(#{@target.id})"
end

def fetch_counters_rate(resource)
compute_derivative(fetch_counters_data(resource))
end

def process_cpu_counters_rate(counters_rate)
@metrics |= ['cpu_usage_rate_average'] unless counters_rate.empty?
sec_cpu_time = @node_cores * CPU_NANOSECONDS
counters_rate.each do |x|
interval = (x['end'] - x['start']) / 1.in_milliseconds
timestamp = Time.at(x['start'] / 1.in_milliseconds).utc
avg_usage = (x['min'] * 100.0) / (sec_cpu_time * interval)
@ts_values[timestamp]['cpu_usage_rate_average'] = avg_usage
end
end

def process_mem_gauges_data(gauges_data)
@metrics |= ['mem_usage_absolute_average'] unless gauges_data.empty?
gauges_data.each do |x|
timestamp = Time.at(x['start'] / 1.in_milliseconds).utc
avg_usage = (x['min'] / 1.megabytes) * 100.0 / @node_memory
@ts_values[timestamp]['mem_usage_absolute_average'] = avg_usage
end
end

def process_net_counters_rate(counters_rate)
@metrics |= ['net_usage_rate_average'] unless counters_rate.empty?
counters_rate.each do |x|
interval = (x['end'] - x['start']) / 1.in_milliseconds
timestamp = Time.at(x['start'] / 1.in_milliseconds).utc
avg_usage_kb = x['min'] / (1.kilobyte.to_f * interval)
@ts_values[timestamp]['net_usage_rate_average'] = avg_usage_kb
end
end

def compute_summation(data)
ts_data = Hash.new { |h, k| h[k] = [] }

data.flatten.each { |x| ts_data[x['start']] << x }
ts_data.delete_if { |_k, v| v.length != data.length }

ts_data.keys.sort.map do |k|
ts_data[k].inject do |sum, n|
# Add min, median, max, percentile95th, etc. if needed
{
'start' => k,
'end' => [sum['end'], n['end']].max,
'min' => sum['min'] + n['min']
}
end
end
end
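
  # Illustration with made-up per-container counter series (timestamps in
  # milliseconds, 'min' holding each window's counter value): the summation
  # step keeps only timestamps present in every series and adds the values.
  series = [
    [{ 'start' => 0,      'end' => 60_000,  'min' => 10 },
     { 'start' => 60_000, 'end' => 120_000, 'min' => 25 }],
    [{ 'start' => 0,      'end' => 60_000,  'min' => 4 }]
  ]
  ts_data = Hash.new { |h, k| h[k] = [] }
  series.flatten.each { |x| ts_data[x['start']] << x }
  ts_data.delete_if { |_k, v| v.length != series.length }
  summed = ts_data.keys.sort.map do |k|
    ts_data[k].inject do |sum, n|
      { 'start' => k, 'end' => [sum['end'], n['end']].max, 'min' => sum['min'] + n['min'] }
    end
  end
  # summed => [{ 'start' => 0, 'end' => 60_000, 'min' => 14 }]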

def compute_derivative(counters)
counters.each_cons(2).map do |prv, n|
      # Add min, median, max, percentile95th, etc. if needed
# time window:
# 00:00 01:00
# ^ (sample start time) ^ (next sample start time)
# ^ (sample min/max/avg value) ^ (next sample min/max/avg value)
# ^ (real sample time) ^ (real sample time) ^ (real sample time)
# ^ (real sample value) ^ (real sample value) ^ (real sample value)
# we use:
# (T = start of window timestamp, V = min value of window samples)
# because the min value is the value of the sample closest to start of window.
{
'start' => prv['start'],
'end' => n['start'],
'min' => n['min'] - prv['min']
}
end
end
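
  # Illustration with made-up cumulative readings: each_cons(2) turns a
  # monotonically increasing counter into per-window increases keyed by the
  # start of the earlier window, matching the comment above.
  counters = [
    { 'start' => 0,       'end' => 60_000,  'min' => 100 },
    { 'start' => 60_000,  'end' => 120_000, 'min' => 160 },
    { 'start' => 120_000, 'end' => 180_000, 'min' => 250 }
  ]
  rates = counters.each_cons(2).map do |prv, n|
    { 'start' => prv['start'], 'end' => n['start'], 'min' => n['min'] - prv['min'] }
  end
  # rates => [{ 'start' => 0,      'end' => 60_000,  'min' => 60 },
  #           { 'start' => 60_000, 'end' => 120_000, 'min' => 90 }]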
end
end
@@ -3,69 +3,83 @@ class PrometheusCaptureContext
include ManageIQ::Providers::Kubernetes::ContainerManager::MetricsCapture::PrometheusClientMixin
include CaptureContextMixin

def collect_node_metrics
    # TODO: This function should be replaced to use utilization and rate endpoints

# prometheus field is in sec, multiply by 1e9, sec to ns
    # FIXME: we must update these labels to the 3.7 labeling scheme and make sure they're unique (using type, id, and namespace labeling)
cpu_resid = "sum(container_cpu_usage_seconds_total{container_name=\"\",id=\"/\",instance=\"#{@target.name}\",job=\"kubernetes-cadvisor\"}) * 1e9"
process_cpu_counters_rate(fetch_counters_rate(cpu_resid))
# NOTE: when calculating rate of net_usage and cpu_usage counters we use
# Prometheus rate function, AVG_OVER is used for range vector size.
  # value of AVG_OVER is set to 2m, allowing for non-aligned or missing
# scrapes.
#
# rate(v range-vector) calculates the per-second average rate of increase
# of the time series in the range vector. Breaks in monotonicity (such as
# counter resets due to target restarts) are automatically adjusted for.
# Also, the calculation extrapolates to the ends of the time range,
# allowing for missed scrapes or imperfect alignment of scrape cycles with
# the range's time period.
AVG_OVER = "2m".freeze

# prometheus field is in bytes
    # FIXME: we must update these labels to the 3.7 labeling scheme and make sure they're unique (using type, id, and namespace labeling)
mem_resid = "sum(container_memory_usage_bytes{container_name=\"\",id=\"/\",instance=\"#{@target.name}\",job=\"kubernetes-cadvisor\"})"
process_mem_gauges_data(fetch_counters_data(mem_resid))

# prometheus field is in bytes
    # FIXME: we must update these labels to the 3.7 labeling scheme and make sure they're unique (using type, id, and namespace labeling)
net_resid_rx = "sum(container_network_receive_bytes_total{container_name=\"\",id=\"/\",instance=\"#{@target.name}\",job=\"kubernetes-cadvisor\",interface=~\"eth.*\"})"
net_resid_tx = "sum(container_network_transmit_bytes_total{container_name=\"\",id=\"/\",instance=\"#{@target.name}\",job=\"kubernetes-cadvisor\",interface=~\"eth.*\"})"

net_counters = [fetch_counters_rate(net_resid_tx),
fetch_counters_rate(net_resid_rx)]
def collect_node_metrics
# set node labels
labels = labels_to_s(
:container_name => "",
:id => "/",
:instance => @target.name,
)

process_net_counters_rate(compute_summation(net_counters))
@metrics = %w(cpu_usage_rate_average mem_usage_absolute_average net_usage_rate_average)
collect_metrics_for_labels(labels)
end

def collect_container_metrics
    # TODO: This function should be replaced to use utilization and rate endpoints

    # FIXME: we must update these labels to the 3.7 labeling scheme and make sure they're unique (using type, id, and namespace labeling)
cpu_resid = "sum(container_cpu_usage_seconds_total{container_name=\"#{@target.name}\",job=\"kubernetes-cadvisor\"}) * 1e9"
process_cpu_counters_rate(fetch_counters_rate(cpu_resid))
# set container labels
labels = labels_to_s(
:container_name => @target.name,
:pod_name => @target.container_group.name,
:namespace => @target.container_project.name,
)

    # FIXME: we must update these labels to the 3.7 labeling scheme and make sure they're unique (using type, id, and namespace labeling)
mem_resid = "sum(container_memory_usage_bytes{container_name=\"#{@target.name}\",job=\"kubernetes-cadvisor\"})"
process_mem_gauges_data(fetch_counters_data(mem_resid))
@metrics = %w(cpu_usage_rate_average mem_usage_absolute_average)
collect_metrics_for_labels(labels)
end

def collect_group_metrics
    # TODO: This function should be replaced to use utilization and rate endpoints
# set pod labels
    # NOTE: pod_name="X" will yield metrics for all the containers
    #        belonging to pod "X" as well as the internal POD container
    #        (OpenShift's equivalent of the Kubernetes 'pause' pod)

labels = labels_to_s(
:pod_name => @target.name,
:namespace => @target.container_project.name,
)

cpu_counters = @target.containers.collect do |c|
      # FIXME: we must update these labels to the 3.7 labeling scheme and make sure they're unique (using type, id, and namespace labeling)
cpu_resid = "sum(container_cpu_usage_seconds_total{container_name=\"#{c.name}\",job=\"kubernetes-cadvisor\"}) * 1e9"
fetch_counters_rate(cpu_resid)
end
process_cpu_counters_rate(compute_summation(cpu_counters))
@metrics = %w(cpu_usage_rate_average mem_usage_absolute_average net_usage_rate_average)
collect_metrics_for_labels(labels)
end

mem_gauges = @target.containers.collect do |c|
      # FIXME: we must update these labels to the 3.7 labeling scheme and make sure they're unique (using type, id, and namespace labeling)
mem_resid = "sum(container_memory_usage_bytes{container_name=\"#{c.name}\",job=\"kubernetes-cadvisor\"})"
fetch_counters_data(mem_resid)
end
process_mem_gauges_data(compute_summation(mem_gauges))
def collect_metrics_for_labels(labels)
    # prometheus field is in cores used per sec
    # miq field is in pct of node cpu
    #
    # rate is the "usage per sec" reading averaged over the AVG_OVER window
cpu_resid = "sum(rate(container_cpu_usage_seconds_total{#{labels}}[#{AVG_OVER}]))"
fetch_counters_data(cpu_resid, 'cpu_usage_rate_average', @node_cores / 100.0)

# prometheus field is in bytes, @node_memory is in mb
# miq field is in pct of node memory
mem_resid = "sum(container_memory_usage_bytes{#{labels}})"
fetch_counters_data(mem_resid, 'mem_usage_absolute_average', @node_memory * 1e6 / 100.0)

    # FIXME: we must update these labels to the 3.7 labeling scheme and make sure they're unique (using type, id, and namespace labeling)
net_resid_rx = "sum(container_network_receive_bytes_total{container_name=\"POD\",pod_name=\"#{@target.name}\",job=\"kubernetes-cadvisor\",interface=~\"eth.*\"})"
net_resid_tx = "sum(container_network_transmit_bytes_total{container_name=\"POD\",pod_name=\"#{@target.name}\",job=\"kubernetes-cadvisor\",interface=~\"eth.*\"})"
# prometheus field is in bytes
    # miq field is in kb ( / 1000 )
if @metrics.include?('net_usage_rate_average')
net_resid = "sum(rate(container_network_receive_bytes_total{#{labels},interface=~\"eth.*\"}[#{AVG_OVER}])) + " \
"sum(rate(container_network_transmit_bytes_total{#{labels},interface=~\"eth.*\"}[#{AVG_OVER}]))"
fetch_counters_data(net_resid, 'net_usage_rate_average', 1000.0)
end

net_counters = [fetch_counters_rate(net_resid_tx),
fetch_counters_rate(net_resid_rx)]
process_net_counters_rate(compute_summation(net_counters))
@ts_values
end
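
  # Illustration with a hypothetical node name: once labels_to_s is
  # interpolated and AVG_OVER (2m) is applied, the CPU query above renders
  # roughly as the PromQL below; the memory and network queries follow the
  # same pattern.
  labels = { :container_name => "", :id => "/", :instance => "node-1.example.com",
             :job => "kubernetes-cadvisor" }.sort.map { |k, v| "#{k}=\"#{v}\"" }.join(',')
  cpu_query = "sum(rate(container_cpu_usage_seconds_total{#{labels}}[2m]))"
  # cpu_query =>
  #   sum(rate(container_cpu_usage_seconds_total{container_name="",id="/",
  #     instance="node-1.example.com",job="kubernetes-cadvisor"}[2m]))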

def fetch_counters_data(resource)
def fetch_counters_data(resource, metric_title, conversion_factor = 1)
start_sec = (@starts / 1_000) - @interval
end_sec = @ends ? (@ends / 1_000).to_i : Time.now.utc.to_i

@@ -76,13 +90,15 @@ def fetch_counters_data(resource)
:start => start_sec.to_i,
:end => end_sec,
:step => "#{@interval}s"
)
),
metric_title,
conversion_factor
)
rescue StandardError => e
raise CollectionFailure, "#{e.class.name}: #{e.message}"
end

def sort_and_normalize(response)
def sort_and_normalize(response, metric_title, conversion_factor)
response = JSON.parse(response.body)

if response["status"] == "error"
@@ -97,12 +113,10 @@ def sort_and_normalize(response)
      # prometheus gives the time of the last reading:
      # divide and multiply to convert the time to the start of the interval window
start_sec = (x[0] / @interval).to_i * @interval
timekey = Time.at(start_sec).utc
value = x[1].to_f / conversion_factor.to_f

{
"start" => start_sec.to_i.in_milliseconds,
"end" => (start_sec.to_i + @interval.to_i).in_milliseconds,
"min" => x[1].to_f
}
@ts_values[timekey][metric_title] = value
end
end
end
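
A worked example of the normalization above, under assumed values (a 4-core node with 8192 MB of memory and a 60 s capture interval); the numbers are illustrative only:

  interval       = 60
  node_cores     = 4
  node_memory_mb = 8_192

  # a Prometheus sample taken 30 s into a window is snapped back to the window start
  window_start = (1_514_764_830.0 / interval).to_i * interval    # => 1514764800

  # rate(container_cpu_usage_seconds_total...) is in cores used per second;
  # dividing by (cores / 100.0) converts it to a percentage of the node's CPU
  cpu_pct = 0.5 / (node_cores / 100.0)                           # => 12.5

  # container_memory_usage_bytes is in bytes; @node_memory is in MB
  mem_pct = 2_000_000_000 / (node_memory_mb * 1e6 / 100.0)       # => ~24.4

  # network rates are bytes per second; MIQ stores KB per second
  net_kb  = 125_000 / 1_000.0                                    # => 125.0
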
@@ -53,6 +53,10 @@ def prometheus_options
}
end

def labels_to_s(labels, job = "kubernetes-cadvisor")
labels.merge(:job => job).compact.sort.map { |k, v| "#{k}=\"#{v}\"" }.join(',')
end
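
  # Illustration with hypothetical names: keys are sorted, nil values dropped,
  # and the cadvisor job label appended, producing a PromQL label-selector body:
  #
  #   labels_to_s(:container_name => "frontend", :namespace => "demo", :pod_name => "frontend-1-abcde")
  #   # => 'container_name="frontend",job="kubernetes-cadvisor",namespace="demo",pod_name="frontend-1-abcde"'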

def prometheus_try_connect
begin
response = prometheus_client.get("query", :query => "ALL")
14 changes: 7 additions & 7 deletions manageiq-providers-kubernetes.gemspec
@@ -13,12 +13,12 @@ Gem::Specification.new do |s|

s.files = Dir["{app,config,lib}/**/*"]

s.add_runtime_dependency "hawkular-client", "~> 4.1"
s.add_runtime_dependency "image-inspector-client", "~>1.0.3"
s.add_runtime_dependency "kubeclient", "~>2.4.0"
s.add_runtime_dependency "prometheus-alert-buffer-client", "~>0.2.0"
s.add_runtime_dependency("hawkular-client", "~> 4.1")
s.add_runtime_dependency("image-inspector-client", "~>1.0.3")
s.add_runtime_dependency("kubeclient", "~>2.4.0")
s.add_runtime_dependency("prometheus-alert-buffer-client", "~> 0.2.0")

s.add_development_dependency "codeclimate-test-reporter", "~> 1.0.0"
s.add_development_dependency "recursive-open-struct", "~> 1.0.0"
s.add_development_dependency "simplecov"
s.add_development_dependency("codeclimate-test-reporter", "~> 1.0.0")
s.add_development_dependency("recursive-open-struct", "~> 1.0.0")
s.add_development_dependency("simplecov")
end
