-
Notifications
You must be signed in to change notification settings - Fork 151
/
collector.rb
107 lines (92 loc) · 2.95 KB
/
collector.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# frozen_string_literal: true
module PrometheusExporter::Server
class Collector < CollectorBase
# Sets up empty metric/collector stores, picks the JSON serializer, and
# registers the built-in type-specific collectors.
#
# @param json_serializer [Object, nil] handed to
#   PrometheusExporter.detect_json_serializer; the value it returns is kept
#   and later used by #process to parse incoming payloads.
def initialize(json_serializer: nil)
  @process_metrics = []
  @metrics = {}
  @mutex = Mutex.new
  @collectors = {}
  @json_serializer = PrometheusExporter.detect_json_serializer(json_serializer)

  # Collectors are instantiated and registered in listing order; each one is
  # stored under the key returned by its own #type (see register_collector).
  [
    WebCollector,
    ProcessCollector,
    SidekiqCollector,
    SidekiqQueueCollector,
    SidekiqProcessCollector,
    SidekiqStatsCollector,
    DelayedJobCollector,
    PumaCollector,
    HutchCollector,
    UnicornCollector,
    ActiveRecordCollector,
    ShoryukenCollector,
    ResqueCollector,
    GoodJobCollector
  ].each { |collector_class| register_collector(collector_class.new) }
end
# Adds +collector+ to the registry under the key returned by its #type,
# replacing whatever was previously stored under that key.
#
# @param collector [#type] collector to register
# @return [Object] the collector that was stored
def register_collector(collector)
  @collectors.store(collector.type, collector)
end
# Parses a serialized payload with the configured JSON serializer and hands
# the parsed object to #process_hash.
#
# @param str [String] serialized metric payload (passed to the serializer's
#   +parse+)
# @return whatever #process_hash returns for the parsed object
def process(str)
  parsed = @json_serializer.parse(str)
  process_hash(parsed)
end
# Routes one decoded metric payload to its handler while holding the
# collector mutex.
#
# If a collector is registered for obj["type"], the payload is handed to that
# collector's #collect. Otherwise the payload is treated as an ad-hoc metric
# looked up by obj["name"] (and registered on first sight), and the value is
# applied according to obj["prometheus_exporter_action"]: "increment",
# "decrement", or anything else for a plain observation.
#
# @param obj [Hash] decoded payload with string keys; reads "type", "name",
#   "keys", "custom_labels", "prometheus_exporter_action" and "value"
# @return the handler's return value, or nil when the metric could not be
#   registered and the sample was skipped
def process_hash(obj)
  @mutex.synchronize do
    collector = @collectors[obj["type"]]
    next collector.collect(obj) if collector

    metric = @metrics[obj["name"]] || register_metric_unsafe(obj)
    # register_metric_unsafe logs and returns nil when the payload's type is
    # not a known metric kind; skip the sample rather than calling a method
    # on nil (which used to raise NoMethodError).
    next unless metric

    keys = obj["keys"] || {}
    # Explicit keys take precedence over custom labels on conflict.
    keys = obj["custom_labels"].merge(keys) if obj["custom_labels"]

    case obj["prometheus_exporter_action"]
    when "increment" then metric.increment(keys, obj["value"])
    when "decrement" then metric.decrement(keys, obj["value"])
    else metric.observe(obj["value"], keys)
    end
  end
end
# Builds the combined text output for every known metric while holding the
# mutex.
#
# Ad-hoc metrics come first, followed by the (fully flattened) metrics
# reported by each registered collector; every metric contributes its
# #to_prometheus_text, and the pieces are joined with newlines.
#
# @return [String] newline-joined metric text ("" when there are no metrics)
def prometheus_metrics_text
  @mutex.synchronize do
    adhoc = @metrics.values
    collected = @collectors.values.map(&:metrics).flatten
    rendered = (adhoc + collected).map(&:to_prometheus_text)
    rendered.join("\n")
  end
end
# Stores +metric+ under its #name while holding the mutex, replacing any
# metric already stored under that name.
#
# @param metric [#name] metric object to register
# @return [Object] the metric that was stored
def register_metric(metric)
  @mutex.synchronize { @metrics.store(metric.name, metric) }
end
protected
# Builds a metric from a decoded payload and stores it under obj["name"].
#
# Takes no lock itself: the visible caller (#process_hash) invokes it while
# already holding the mutex.
#
# obj["type"] selects the metric class ("gauge", "counter", "summary" or
# "histogram"); obj["opts"] (symbolized) is passed only to summaries and
# histograms.
#
# @param obj [Hash] decoded payload; reads "name", "help", "opts" and "type"
# @return [Object, nil] the stored metric, or nil after logging to STDERR
#   when the type is not one of the four known kinds
def register_metric_unsafe(obj)
  metric_name = obj["name"]
  description = obj["help"]
  options = symbolize_keys(obj["opts"] || {})

  built =
    case obj["type"]
    when "gauge" then PrometheusExporter::Metric::Gauge.new(metric_name, description)
    when "counter" then PrometheusExporter::Metric::Counter.new(metric_name, description)
    when "summary" then PrometheusExporter::Metric::Summary.new(metric_name, description, options)
    when "histogram" then PrometheusExporter::Metric::Histogram.new(metric_name, description, options)
    end

  unless built
    STDERR.puts "failed to register metric #{obj}"
    return nil
  end

  @metrics[metric_name] = built
end
# Returns a new hash with every top-level key converted via #to_sym.
# Values are copied as-is (nested hashes are not converted), and the input
# is not modified.
#
# @param hash [Hash] hash whose keys respond to #to_sym
# @return [Hash] new hash with symbol keys
def symbolize_keys(hash)
  hash.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end
end
end