-
Notifications
You must be signed in to change notification settings - Fork 357
/
instances_stats_reporter.rb
163 lines (139 loc) · 5.55 KB
/
instances_stats_reporter.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
require 'logcache/client'
require 'cloud_controller/diego/reporters/reporter_mixins'
module VCAP::CloudController
module Diego
class InstancesStatsReporter
include ReporterMixins
# Stores the collaborators used to build instance stats.
#
# @param bbs_instances_client client queried for desired and actual LRPs
# @param logstats_client client queried for container metrics
def initialize(bbs_instances_client, logstats_client)
  @logstats_client = logstats_client
  @bbs_instances_client = bbs_instances_client
end
# Builds the per-instance state and stats for a process.
#
# @param process the process whose instances are reported; its guid, name,
#   uris, instances and file_descriptors are read here
# @return [Array] a two-element array `[result, warnings]`: `result` is a Hash
#   keyed by instance index, `warnings` holds the log cache error message when
#   fetching metrics failed and is empty otherwise
# @raise [CloudController::Errors::ApiError] re-raised untouched when its name
#   is 'ServiceUnavailable'
# @raise [CloudController::Errors::InstancesUnavailable] wraps any other
#   StandardError, keeping the original message and backtrace
def stats_for_app(process)
  result = {}

  # Read the clock once so the uptime calculation and the reported timestamp
  # describe the same instant.
  now = Time.now
  current_time = now.to_f
  formatted_current_time = now.to_datetime.rfc3339

  logger.debug('stats_for_app.fetching_container_metrics', process_guid: process.guid)
  desired_lrp = bbs_instances_client.desired_lrp_instance(process)

  log_cache_data, log_cache_errors = envelopes(desired_lrp, process)
  stats = formatted_process_stats(log_cache_data, formatted_current_time)
  quota_stats = formatted_quota_stats(log_cache_data)

  bbs_instances_client.lrp_instances(process).each do |actual_lrp|
    index = actual_lrp.actual_lrp_key.index
    # Skip actual LRPs whose index is outside the desired instance count.
    next unless index < process.instances

    net_info = actual_lrp.actual_lrp_net_info
    info = {
      state: LrpStateTranslator.translate_lrp_state(actual_lrp),
      isolation_segment: desired_lrp.PlacementTags.first,
      stats: {
        name: process.name,
        uris: process.uris,
        host: net_info.address,
        port: get_default_port(net_info),
        net_info: net_info.to_h,
        # `since` is subtracted from the current time expressed in nanoseconds.
        uptime: nanoseconds_to_seconds((current_time * 1e9) - actual_lrp.since),
        fds_quota: process.file_descriptors
      }.merge(metrics_data_for_instance(stats, quota_stats, log_cache_errors, formatted_current_time, index))
    }
    info[:details] = actual_lrp.placement_error if actual_lrp.placement_error.present?
    # :routable is always set; it is nil unless optional_routable is truthy.
    info[:routable] = (actual_lrp.routable if actual_lrp.optional_routable)

    result[index] = info
  end

  # NOTE(review): presumably adds entries for indices that had no actual LRP;
  # the helper is defined outside this file section.
  fill_unreported_instances_with_down_instances(result, process, flat: false)

  warnings = [log_cache_errors].compact
  [result, warnings]
rescue CloudController::Errors::NoRunningInstances => e
  logger.info('stats_for_app.error', error: e.to_s)
  [fill_unreported_instances_with_down_instances({}, process, flat: false), []]
rescue StandardError => e
  logger.error('stats_for_app.error', error: e.to_s)
  raise e if e.is_a?(CloudController::Errors::ApiError) && e.name == 'ServiceUnavailable'

  exception = CloudController::Errors::InstancesUnavailable.new(e.message)
  exception.set_backtrace(e.backtrace)
  raise exception
end
# Everything below this marker is private to the reporter.
private
# Reader for the BBS instances client assigned in the constructor.
attr_reader :bbs_instances_client
# Produces the quota and usage portion of one instance's stats hash.
#
# @param stats per-index usage lookup
# @param quota_stats per-index lookup of objects exposing the quota readers
# @param log_cache_errors blank when metrics were fetched successfully
# @param formatted_current_time timestamp used for placeholder usage
# @param index the instance index to look up
# @return [Hash] with :mem_quota, :disk_quota, :log_rate_limit and :usage
def metrics_data_for_instance(stats, quota_stats, log_cache_errors, formatted_current_time, index)
  # When metrics could not be fetched, report unknown quotas and empty usage.
  unless log_cache_errors.blank?
    return { mem_quota: nil, disk_quota: nil, log_rate_limit: nil, usage: {} }
  end

  instance_quota = quota_stats[index]
  instance_usage = stats[index] || missing_process_stats(formatted_current_time)

  {
    mem_quota: instance_quota&.memory_bytes_quota,
    disk_quota: instance_quota&.disk_bytes_quota,
    log_rate_limit: instance_quota&.log_rate_limit,
    usage: instance_usage
  }
end
# Placeholder usage for an instance without reported metrics.
#
# @param formatted_current_time value stored under :time
# @return [Hash] :time plus :cpu, :mem, :disk and :log_rate, all zero
def missing_process_stats(formatted_current_time)
  placeholder = { time: formatted_current_time }
  %i[cpu mem disk log_rate].each { |metric| placeholder[metric] = 0 }
  placeholder
end
# Maps each envelope's instance index to its converted usage metrics.
#
# @param log_cache_data collection of envelopes responding to instance_index
# @param formatted_current_time timestamp forwarded to the conversion
# @return [Hash] instance index => converted metrics; a later envelope for the
#   same index replaces an earlier one
def formatted_process_stats(log_cache_data, formatted_current_time)
  log_cache_data.each_with_object({}) do |envelope, usage_by_index|
    usage_by_index[envelope.instance_index] = converted_container_metrics(envelope, formatted_current_time)
  end
end
# Indexes the envelopes by instance index so quotas can be looked up per instance.
#
# @param log_cache_data collection of envelopes responding to instance_index
# @return [Hash] instance index => envelope
def formatted_quota_stats(log_cache_data)
  log_cache_data.index_by { |envelope| envelope.instance_index }
end
# Fetches container metrics for a process from the logstats client.
#
# When the desired LRP carries a 'process_id' metric tag, metrics are requested
# for the app guid and filtered down to envelopes tagged with this process's
# guid; otherwise they are requested for the process guid and left unfiltered.
#
# @param desired_lrp object whose metric_tags are inspected
# @param process object providing guid and app_guid
# @return [Array] `[metrics, nil]` on success, or
#   `[[], 'Stats server temporarily unavailable.']` when the client raises
#   GRPC::BadStatus or CloudController::Errors::ApiError
def envelopes(desired_lrp, process)
  source_guid, filter =
    if desired_lrp.metric_tags['process_id']
      matches_process = lambda do |envelope|
        envelope.tags.any? { |tag, tag_value| tag == 'process_id' && tag_value == process.guid }
      end
      [process.app_guid, matches_process]
    else
      [process.guid, ->(_envelope) { true }]
    end

  metrics = @logstats_client.container_metrics(
    source_guid: source_guid,
    auth_token: VCAP::CloudController::SecurityContext.auth_token,
    logcache_filter: filter
  )
  [metrics, nil]
rescue GRPC::BadStatus, CloudController::Errors::ApiError => e
  logger.error('stats_for_app.error', error: e.message, backtrace: e.backtrace.join("\n"))
  [[], 'Stats server temporarily unavailable.']
end
# Returns the Steno logger for this reporter, creating it on first use.
#
# @return the logger obtained from Steno for 'cc.diego.instances_reporter'
def logger
  return @logger if @logger

  @logger = Steno.logger('cc.diego.instances_reporter')
end
# Converts one container metrics envelope into the usage hash reported per instance.
#
# If any of cpu_percentage, memory_bytes, disk_bytes or log_rate is nil, the
# zeroed placeholder from missing_process_stats is returned instead.
#
# @param container_metrics object exposing cpu_percentage, memory_bytes,
#   disk_bytes and log_rate
# @param formatted_current_time timestamp stored under :time
# @return [Hash] :time, :cpu (cpu_percentage divided by 100), :mem, :disk, :log_rate
def converted_container_metrics(container_metrics, formatted_current_time)
  cpu = container_metrics.cpu_percentage
  mem = container_metrics.memory_bytes
  disk = container_metrics.disk_bytes
  log_rate = container_metrics.log_rate

  return missing_process_stats(formatted_current_time) if [cpu, mem, disk, log_rate].any?(&:nil?)

  {
    time: formatted_current_time,
    # fdiv always divides as a Float, so an Integer percentage such as 50
    # yields 0.5 rather than being truncated to 0 by integer division.
    cpu: cpu.fdiv(100),
    mem: mem,
    disk: disk,
    log_rate: log_rate
  }
end
# Looks up the host port mapped to the default application container port.
#
# @param net_info object whose ports are mappings exposing container_port
#   and host_port
# @return the host_port of the first mapping whose container_port equals
#   DEFAULT_APP_PORT, or 0 when no mapping matches
def get_default_port(net_info)
  default_mapping = net_info.ports.find { |port_mapping| port_mapping.container_port == DEFAULT_APP_PORT }
  default_mapping ? default_mapping.host_port : 0
end
end
end
end