-
Notifications
You must be signed in to change notification settings - Fork 4
/
main.rs
168 lines (149 loc) · 5.77 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
mod balance_monitor;
mod config;
use anyhow::{anyhow, Context as _, Result};
use ethcontract::DynTransport;
use prometheus::Encoder as _;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::time::Duration;
use structopt::StructOpt;
use url::Url;
use web3::transports;
use web3::types::U256;
// Command-line options for the service; `main` builds this with
// `Opt::from_args()` and prints it with `{:?}` at startup.
//
// NOTE: the `///` doc comments on the fields below are consumed by structopt
// as the per-flag `--help` text, so editing them changes user-facing output.
// Plain `//` comments such as this one are not part of the help text.
#[derive(Debug, StructOpt)]
#[structopt()]
struct Opt {
// `main` reads this file to a string and deserializes it as TOML into
// `config::Config`.
/// Path to the config file.
#[structopt(long, parse(from_os_str))]
config: PathBuf,
// Passed to `create_transport`, which only accepts the `http` and `https`
// schemes and rejects anything else with an "unknown scheme" error.
/// Url of the ethereum node to communicate with.
#[structopt(long)]
node: Url,
// Address handed to `rouille::start_server` in `main`.
/// Serve the prometheus metrics at this address.
#[structopt(long, default_value = "0.0.0.0:8080")]
bind: SocketAddr,
// Parsed by `duration_from_seconds`, so the flag value must be an unsigned
// whole number of seconds. `main` sleeps for this long after every update
// round.
/// Update the balances in this interval in seconds.
#[structopt(long, default_value = "100", parse(try_from_str = duration_from_seconds))]
update_interval: Duration,
// When set, `main` calls `print_balance` for every balance it retrieves.
/// Print balances to stdout on update.
#[structopt(long)]
print_balances: bool,
}
/// Parses `s` as an unsigned whole number of seconds and wraps it in a
/// [`Duration`].
///
/// Used as the structopt parser for `--update-interval`.
///
/// # Errors
///
/// Returns the underlying [`std::num::ParseIntError`] when `s` is not a valid
/// `u64` (empty, negative, fractional, non-numeric, or out of range).
fn duration_from_seconds(s: &str) -> Result<Duration, std::num::ParseIntError> {
    let seconds: u64 = s.parse()?;
    Ok(Duration::from_secs(seconds))
}
/// Builds a web3 transport for the node at `url`.
///
/// Only the `http` and `https` schemes are supported. On success the event
/// loop handle returned by `transports::Http::new` is passed back alongside
/// the transport, which is wrapped in a `DynTransport`.
///
/// # Errors
///
/// Fails with "unknown scheme: ..." for any other scheme, or with whatever
/// error `transports::Http::new` reports.
fn create_transport(url: &Url) -> Result<(web3::transports::EventLoopHandle, DynTransport)> {
    // TODO: transport with timeouts
    let scheme = url.scheme();
    if scheme != "http" && scheme != "https" {
        return Err(anyhow!("unknown scheme: {}", scheme));
    }
    let (event_loop, http) = transports::Http::new(url.as_str())?;
    Ok((event_loop, DynTransport::new(http)))
}
/// Writes one line to stdout describing the outcome of a balance query.
///
/// `address_name` and `token_name` identify what was queried. `balance` is
/// the query result: the amount is printed on success, the error on failure.
fn print_balance(address_name: &str, token_name: &str, balance: &Result<U256>) {
    let line = match balance {
        Ok(amount) => format!(
            "address {} {} balance is {}",
            address_name, token_name, amount
        ),
        Err(error) => format!(
            "failed to get balance for address {} token {}: {}",
            address_name, token_name, error
        ),
    };
    println!("{}", line);
}
// Copied from ethcontract-rs.
/// Converts a `U256` to an `f64`, losing precision when the value has more
/// significant bits than a double can hold.
pub fn u256_to_f64(value: U256) -> f64 {
    // NOTE: an `f64` (IEEE 754 double precision) carries 53 bits of
    // precision. We keep the top 54 bits of `value` so that the `u64` to
    // `f64` cast below does the rounding for us instead of implementing it
    // by hand. `shift` is the number of low-order bits dropped; it is 0 for
    // values of 54 bits or fewer.
    let shift = value.bits().saturating_sub(54);
    let top_bits = (value >> U256::from(shift)).as_u64();
    // Scale the rounded top bits back up to the original magnitude.
    let scale = 2.0f64.powi(shift as i32);
    (top_bits as f64) * scale
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let opt = Opt::from_args();
println!("Beginning service with configuration parameters {:?}", opt);
let config: config::Config = toml::from_str(&std::fs::read_to_string(opt.config)?)?;
println!("Monitoring accounts {:?}", config);
// web3
let (event_loop_handle, transport) =
create_transport(&opt.node).context("failed to create transport from node uri")?;
event_loop_handle.into_remote();
let web3 = web3::Web3::new(transport);
let monitor = balance_monitor::BalanceMonitor::new(config, web3)?;
// metrics
let balance_metric = prometheus::GaugeVec::new(
prometheus::Opts::new(
"etherbalance_balance",
"The ether or IERC20 balance of an ethereum address.",
),
&["address_name", "token_name", "address", "tag"],
)?;
let last_update_metric = prometheus::Gauge::new(
"etherbalance_last_update",
"Unix time of last update of balances.",
)?;
let registry = prometheus::Registry::new();
registry.register(Box::new(balance_metric.clone()))?;
registry.register(Box::new(last_update_metric.clone()))?;
// http server for metrics
let address = opt.bind;
std::thread::spawn(move || {
let encoder = prometheus::TextEncoder::new();
rouille::start_server(address, move |_request| {
// We always serve the the metrics regardless of path even though
// the readme states the path should be /metrics.
let metric_families = registry.gather();
let mut buffer = vec![];
encoder
.encode(&metric_families, &mut buffer)
.expect("could not encode metrics");
rouille::Response::from_data("text/plain; charset=utf-8", buffer)
});
});
// update balances
let print_balances = opt.print_balances;
loop {
futures::executor::block_on(monitor.do_with_balances(|params| {
if print_balances {
print_balance(params.address_name, params.token_name, ¶ms.balance);
}
match params.balance {
Ok(balance) => balance_metric
.with_label_values(&[
params.address_name,
params.token_name,
&format!("{:#x}", params.address),
params.tag,
])
.set(u256_to_f64(balance)),
Err(err) => println!(
"failed to get balance for address {} token {}: {}",
params.address, params.token_name, err
),
}
}));
match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
Ok(duration) => last_update_metric.set(duration.as_secs_f64()),
Err(err) => println!("system time before epoch: {}", err),
};
// Retrieving the balances takes some time so sleeping for
// update_interval makes us actually update the balances less frequently
// than update_interval. We could be more accurate and sleep the exact
// time needed. In practice it does not matter.
std::thread::sleep(opt.update_interval);
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The example config shipped next to the crate must deserialize into
    /// `config::Config`; the test panics if it does not.
    #[test]
    fn can_parse_example_config() {
        let example = include_str!("../example_config.toml");
        toml::from_str::<config::Config>(example).unwrap();
    }
}