/
restream.go
101 lines (85 loc) · 3.14 KB
/
restream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package prometheus
import (
"github.com/datarhei/core/monitor/metric"
"github.com/prometheus/client_golang/prometheus"
)
type restreamCollector struct {
core string
collector metric.Reader
ffmpegProcessDesc *prometheus.Desc
ffmpegProcessStatesDesc *prometheus.Desc
ffmpegProcessIODesc *prometheus.Desc
ffmpegStatesDesc *prometheus.Desc
ffmpegStatesTotalDesc *prometheus.Desc
}
func NewRestreamCollector(core string, c metric.Reader) prometheus.Collector {
return &restreamCollector{
core: core,
collector: c,
ffmpegProcessDesc: prometheus.NewDesc(
"ffmpeg_process",
"General stats per process",
[]string{"core", "process", "state", "order", "name"}, nil),
ffmpegProcessStatesDesc: prometheus.NewDesc(
"ffmpeg_process_states",
"Accumulated states per process",
[]string{"core", "process", "state"}, nil),
ffmpegProcessIODesc: prometheus.NewDesc(
"ffmpeg_process_io",
"Stats per input and output of a process",
[]string{"core", "process", "type", "id", "index", "stream", "media", "name"}, nil),
ffmpegStatesDesc: prometheus.NewDesc(
"ffmpeg_states",
"Current process states",
[]string{"core", "state"}, nil),
ffmpegStatesTotalDesc: prometheus.NewDesc(
"ffmpeg_states_total",
"Accumulated process states",
[]string{"core", "state"}, nil),
}
}
func (c *restreamCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- c.ffmpegProcessDesc
ch <- c.ffmpegProcessStatesDesc
ch <- c.ffmpegProcessIODesc
ch <- c.ffmpegStatesDesc
ch <- c.ffmpegStatesTotalDesc
}
func (c *restreamCollector) Collect(ch chan<- prometheus.Metric) {
metrics := c.collector.Collect([]metric.Pattern{
metric.NewPattern("restream_process"),
metric.NewPattern("restream_process_states"),
metric.NewPattern("restream_io"),
metric.NewPattern("ffmpeg_process"),
})
for _, m := range metrics.Values("restream_process") {
ch <- prometheus.MustNewConstMetric(c.ffmpegProcessDesc, prometheus.GaugeValue, m.Val(), c.core, m.L("processid"), m.L("state"), m.L("order"), m.L("name"))
}
for _, m := range metrics.Values("restream_process_states") {
ch <- prometheus.MustNewConstMetric(c.ffmpegProcessStatesDesc, prometheus.GaugeValue, m.Val(), c.core, m.L("processid"), m.L("state"))
}
for _, m := range metrics.Values("restream_io") {
ch <- prometheus.MustNewConstMetric(c.ffmpegProcessIODesc, prometheus.GaugeValue, m.Val(), c.core, m.L("processid"), m.L("type"), m.L("id"), m.L("index"), m.L("stream"), m.L("media"), m.L("name"))
}
states := map[string]float64{
"failed": 0,
"finished": 0,
"finishing": 0,
"killed": 0,
"running": 0,
"starting": 0,
}
for _, processid := range metrics.Labels("restream_process", "processid") {
s := metrics.Value("restream_process", "processid", "^"+processid+"$").L("state")
if _, ok := states[s]; !ok {
continue
}
states[s]++
}
for state, value := range states {
ch <- prometheus.MustNewConstMetric(c.ffmpegStatesDesc, prometheus.GaugeValue, value, c.core, state)
}
for _, m := range metrics.Values("ffmpeg_process") {
ch <- prometheus.MustNewConstMetric(c.ffmpegStatesTotalDesc, prometheus.CounterValue, m.Val(), c.core, m.L("state"))
}
}