-
Notifications
You must be signed in to change notification settings - Fork 33
/
mod.rs
94 lines (78 loc) · 2.82 KB
/
mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
//! Metrics endpoint over a Unix-domain socket.
use actix::prelude::*;
use failure::{Fallible, ResultExt};
use std::os::unix::net as std_net;
use tokio::net as tokio_net;
/// Filesystem path of the Unix-domain socket on which metrics are exposed.
///
/// Any pre-existing file at this path is removed (best-effort) right before
/// the service binds its listener to it.
static SOCKET_PATH: &str = "/run/zincati/public/metrics.promsock";
/// Metrics exposition service.
///
/// Actor that answers every incoming connection on the metrics socket with
/// the text-encoded content of the default Prometheus registry and then
/// closes the connection.
pub struct MetricsService {
/// Standard-library listener bound to the metrics socket. It is cloned and
/// converted into an async listener when the actor starts.
listener: std_net::UnixListener,
}
impl MetricsService {
    /// Build the service by binding a listener to the metrics socket path.
    ///
    /// A leftover file at the socket path is removed first. That removal is
    /// best-effort: its outcome (including "file not found") is ignored, and
    /// only the subsequent bind decides whether this function succeeds.
    ///
    /// # Errors
    ///
    /// Returns an error carrying the context `failed to bind metrics service`
    /// when the listener cannot be bound to the socket path.
    pub fn bind_socket() -> Fallible<Self> {
        // Best-effort cleanup of a stale socket; the result is intentionally discarded.
        drop(std::fs::remove_file(SOCKET_PATH));

        let bound = std_net::UnixListener::bind(SOCKET_PATH);
        let listener = bound.context("failed to bind metrics service")?;
        Ok(Self { listener })
    }

    /// Collect all metrics from the default Prometheus registry and serialize
    /// them with the text encoder.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the encoder.
    fn prometheus_text_encode() -> Fallible<Vec<u8>> {
        use prometheus::{Encoder, TextEncoder};

        let mut encoded = Vec::new();
        TextEncoder::new().encode(&prometheus::gather(), &mut encoded)?;
        Ok(encoded)
    }
}
/// Incoming Unix-domain socket connection.
///
/// One value is produced for every client accepted on the metrics listener
/// and is delivered to the service actor as a stream item.
struct Connection {
/// Async stream connected to the accepted client.
stream: tokio_net::UnixStream,
}
/// Declares `Connection` as an actor message.
impl Message for Connection {
/// Handling a connection yields no reply value.
type Result = ();
}
impl Actor for MetricsService {
    type Context = Context<Self>;

    /// Start accepting connections on the metrics socket.
    ///
    /// The bound standard-library listener is cloned and converted into an
    /// async listener. An endless stream of accepted [`Connection`]s is then
    /// registered on the actor context, so each one reaches the actor's
    /// stream handler.
    ///
    /// Failed `accept()` calls never terminate the stream; they are retried.
    /// The first failure in each run of consecutive failures is logged at
    /// warning level so that a persistent problem is visible without
    /// flooding the log.
    ///
    /// # Panics
    ///
    /// Panics if the listener cannot be cloned or cannot be converted into an
    /// async listener.
    fn started(&mut self, ctx: &mut Context<Self>) {
        let listener = self
            .listener
            .try_clone()
            .expect("failed to clone metrics listener");
        let async_listener = tokio_net::UnixListener::from_std(listener)
            .expect("failed to create async metrics listener");

        // This uses manual stream unfolding in order to keep the async listener
        // alive for the whole duration of the stream.
        let connections = futures::stream::unfold(async_listener, |mut l| async move {
            // Set once a failure has been reported while waiting for the next
            // connection, so a persistently failing `accept()` logs only once.
            let mut failure_logged = false;
            loop {
                let next = l.accept().await;
                match next {
                    Ok((stream, _addr)) => {
                        let conn = Connection { stream };
                        break Some((conn, l));
                    }
                    Err(err) => {
                        if !failure_logged {
                            log::warn!("failed to accept metrics connection: {}", err);
                            failure_logged = true;
                        }
                        // Keep retrying: the stream must never end on accept errors.
                    }
                }
            }
        });
        ctx.add_stream(connections);

        log::debug!(
            "started metrics service on Unix-domain socket '{}'",
            SOCKET_PATH
        );
    }
}
impl actix::io::WriteHandler<std::io::Error> for MetricsService {
fn error(&mut self, _err: std::io::Error, _ctx: &mut Self::Context) -> Running {
actix::Running::Continue
}
fn finished(&mut self, _ctx: &mut Self::Context) {}
}
impl StreamHandler<Connection> for MetricsService {
fn handle(&mut self, item: Connection, ctx: &mut Context<MetricsService>) {
let mut wr = actix::io::Writer::new(item.stream, ctx);
if let Ok(metrics) = MetricsService::prometheus_text_encode() {
wr.write(&metrics);
}
wr.close();
}
}