Skip to content

Commit

Permalink
Make metrics modular & report from a per-server registry (#7493)
Browse files Browse the repository at this point in the history
This PR takes the work in https://github.com/MaterializeInc/materialize/compare/main...benesch:metrics?expand=1, expands it, and applies it to all parts of the system. Some highlights:

* More ergonomic creation / registration of metrics via type inference
* It's no longer necessary to import the prometheus crate at all (it's all re-exported nicely from the ore crate), unless a crate needs to read from the registry.
* This allows setting up metrics per server object, so you can make assertions about the state of a metric in unit/integration tests.
  • Loading branch information
antifuchs authored Jul 27, 2021
1 parent 1907a56 commit 480aa30
Show file tree
Hide file tree
Showing 50 changed files with 1,691 additions and 632 deletions.
16 changes: 2 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 13 additions & 3 deletions src/coord/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use futures::future::{self, FutureExt, TryFutureExt};
use futures::stream::{self, StreamExt};
use itertools::Itertools;
use lazy_static::lazy_static;
use ore::metrics::MetricsRegistry;
use rand::Rng;
use repr::adt::numeric;
use timely::communication::WorkerGuards;
Expand Down Expand Up @@ -186,6 +187,7 @@ pub struct Config<'a> {
pub experimental_mode: bool,
pub safe_mode: bool,
pub build_info: &'static BuildInfo,
pub metrics_registry: MetricsRegistry,
}

/// Glues the external world to the Timely workers.
Expand Down Expand Up @@ -3307,6 +3309,7 @@ pub async fn serve(
experimental_mode,
safe_mode,
build_info,
metrics_registry,
}: Config<'_>,
) -> Result<(Handle, Client), CoordError> {
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -3341,6 +3344,7 @@ pub async fn serve(
timely_worker,
experimental_mode,
now: system_time,
metrics_registry: metrics_registry.clone(),
})
.map_err(|s| CoordError::Unstructured(anyhow!("{}", s)))?;

Expand All @@ -3354,7 +3358,7 @@ pub async fn serve(
let mut scraper = Scraper::new(
granularity,
retain_readings_for,
::prometheus::default_registry(),
metrics_registry.clone(),
rx,
internal_cmd_tx.clone(),
);
Expand All @@ -3375,8 +3379,12 @@ pub async fn serve(
// Spawn timestamper after any fallible operations so that if bootstrap fails we still
// tell it to shut down.
let (ts_tx, ts_rx) = std::sync::mpsc::channel();
let mut timestamper =
Timestamper::new(Duration::from_millis(10), internal_cmd_tx.clone(), ts_rx);
let mut timestamper = Timestamper::new(
Duration::from_millis(10),
internal_cmd_tx.clone(),
ts_rx,
&metrics_registry,
);
let executor = TokioHandle::current();
let timestamper_thread_handle = thread::Builder::new()
.name("timestamper".to_string())
Expand Down Expand Up @@ -3472,6 +3480,7 @@ pub async fn serve(

pub fn serve_debug(
catalog_path: &Path,
metrics_registry: MetricsRegistry,
) -> (
JoinOnDropHandle<()>,
Client,
Expand Down Expand Up @@ -3505,6 +3514,7 @@ pub fn serve_debug(
timely_worker: timely::WorkerConfig::default(),
experimental_mode: true,
now: get_debug_timestamp,
metrics_registry,
})
.unwrap();

Expand Down
11 changes: 6 additions & 5 deletions src/coord/src/coord/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use std::{
};

use chrono::NaiveDateTime;
use prometheus::{proto::MetricType, Registry};
use ore::metrics::MetricsRegistry;
use prometheus::proto::MetricType;
use repr::{Datum, Diff, Row, Timestamp};
use tokio::sync::mpsc::UnboundedSender;

Expand All @@ -31,10 +32,10 @@ use super::{

/// Scrapes the prometheus registry at a regular interval and submits a batch of metric data to a
/// logging worker, to be inserted into a table.
pub struct Scraper<'a> {
pub struct Scraper {
interval: Duration,
retain_for: u64,
registry: &'a Registry,
registry: MetricsRegistry,
command_rx: std::sync::mpsc::Receiver<ScraperMessage>,
internal_tx: UnboundedSender<super::Message>,
}
Expand Down Expand Up @@ -131,11 +132,11 @@ fn metric_family_metadata(family: &prometheus::proto::MetricFamily) -> Row {
])
}

impl<'a> Scraper<'a> {
impl Scraper {
pub fn new(
interval: Duration,
retain_for: Duration,
registry: &'a Registry,
registry: MetricsRegistry,
command_rx: std::sync::mpsc::Receiver<ScraperMessage>,
internal_tx: UnboundedSender<super::Message>,
) -> Self {
Expand Down
48 changes: 38 additions & 10 deletions src/coord/src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use lazy_static::lazy_static;
use log::{debug, error, info, log_enabled, warn};
use mz_avro::schema::Schema;
use mz_avro::types::Value;
use prometheus::{register_int_gauge_vec, IntGaugeVec};
use ore::metric;
use ore::metrics::{IntGaugeVec, MetricsRegistry};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::message::Message;
use rdkafka::ClientConfig;
Expand Down Expand Up @@ -85,12 +86,6 @@ lazy_static! {
"connect.name": "io.debezium.connector.common.TransactionMetadataValue"
}"#.parse().unwrap()
};

static ref MAX_AVAILABLE_OFFSET: IntGaugeVec = register_int_gauge_vec!(
"mz_kafka_partition_offset_max",
"The high watermark for a partition, the maximum offset that we could hope to ingest",
&["topic", "source_id", "partition_id"]
).unwrap();
}

#[derive(Debug)]
Expand Down Expand Up @@ -255,6 +250,26 @@ pub struct Timestamper {

/// Frequency at which thread should run
timestamp_frequency: Duration,

/// Metrics that the timestamper reports.
metrics: Metrics,
}

/// Handles to the metrics that the timestamper reports.
///
/// `Clone` is derived; the timestamper clones this value to hand a copy to the
/// `rt_kafka_meta` thread it spawns.
#[derive(Clone)]
struct Metrics {
/// Gauge vector registered as `mz_kafka_partition_offset_max`, labeled by
/// `topic`, `source_id` and `partition_id`: the high watermark for a partition,
/// i.e. the maximum offset that we could hope to ingest.
max_available_offset: IntGaugeVec,
}

impl Metrics {
    /// Registers the timestamper's metrics with `registry` and returns the
    /// handles needed to update them.
    fn register_with(registry: &MetricsRegistry) -> Self {
        // The gauge's concrete type is inferred from the `max_available_offset`
        // field it is stored into below.
        let max_available_offset = registry.register(metric!(
            name: "mz_kafka_partition_offset_max",
            help: "The high watermark for a partition, the maximum offset that we could hope to ingest",
            var_labels: ["topic", "source_id", "partition_id"],
        ));

        Self {
            max_available_offset,
        }
    }
}

/// Implements the byo timestamping logic
Expand Down Expand Up @@ -466,6 +481,7 @@ impl Timestamper {
frequency: Duration,
tx: mpsc::UnboundedSender<coord::Message>,
rx: std::sync::mpsc::Receiver<TimestampMessage>,
registry: &MetricsRegistry,
) -> Self {
info!(
"Starting Timestamping Thread. Frequency: {} ms.",
Expand All @@ -478,6 +494,7 @@ impl Timestamper {
tx,
rx,
timestamp_frequency: frequency,
metrics: Metrics::register_with(registry),
}
}

Expand Down Expand Up @@ -718,8 +735,14 @@ impl Timestamper {
.name("rt_kafka_meta".to_string())
.spawn({
let connector = connector.clone();
let metrics = self.metrics.clone();
move || {
rt_kafka_metadata_fetch_loop(connector, consumer, metadata_refresh_frequency)
rt_kafka_metadata_fetch_loop(
connector,
consumer,
metadata_refresh_frequency,
&metrics,
)
}
})
.unwrap();
Expand Down Expand Up @@ -861,7 +884,12 @@ impl Timestamper {
}
}

fn rt_kafka_metadata_fetch_loop(c: RtKafkaConnector, consumer: BaseConsumer, wait: Duration) {
fn rt_kafka_metadata_fetch_loop(
c: RtKafkaConnector,
consumer: BaseConsumer,
wait: Duration,
metrics: &Metrics,
) {
debug!(
"Starting realtime Kafka thread for {} (source {})",
&c.topic, &c.id
Expand Down Expand Up @@ -924,7 +952,7 @@ fn rt_kafka_metadata_fetch_loop(c: RtKafkaConnector, consumer: BaseConsumer, wai
for pid in 0..current_partition_count {
match consumer.fetch_watermarks(&c.topic, pid, Duration::from_secs(30)) {
Ok((_low, high)) => {
let max_offset = MAX_AVAILABLE_OFFSET.with_label_values(&[
let max_offset = metrics.max_available_offset.with_label_values(&[
&c.topic,
&c.id.to_string(),
&pid.to_string(),
Expand Down
6 changes: 5 additions & 1 deletion src/coordtest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::future::FutureExt;
use ore::metrics::MetricsRegistry;
use tempfile::{NamedTempFile, TempDir};
use tokio::sync::mpsc;

Expand Down Expand Up @@ -86,13 +87,15 @@ pub struct CoordTest {
uppers: HashMap<GlobalId, Timestamp>,
timestamp: Arc<Mutex<u64>>,
_verbose: bool,
_metrics_registry: MetricsRegistry,
}

impl CoordTest {
pub async fn new() -> anyhow::Result<Self> {
let catalog_file = NamedTempFile::new()?;
let metrics_registry = MetricsRegistry::new();
let (handle, client, coord_feedback_tx, dataflow_feedback_rx, timestamp) =
coord::serve_debug(catalog_file.path());
coord::serve_debug(catalog_file.path(), metrics_registry.clone());
let coordtest = CoordTest {
_handle: handle,
client: Some(client),
Expand All @@ -103,6 +106,7 @@ impl CoordTest {
uppers: HashMap::new(),
_verbose: std::env::var_os("COORDTEST_VERBOSE").is_some(),
timestamp,
_metrics_registry: metrics_registry,
};
Ok(coordtest)
}
Expand Down
1 change: 0 additions & 1 deletion src/dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pdqselect = "0.1.0"
postgres-protocol = { git = "https://github.com/MaterializeInc/rust-postgres", branch = "mz-0.7.2" }
postgres-util = { path = "../postgres-util" }
prometheus = { git = "https://github.com/MaterializeInc/rust-prometheus.git", default-features = false }
prometheus-static-metric = { git = "https://github.com/MaterializeInc/rust-prometheus.git" }
pubnub-hyper = { git = "https://github.com/MaterializeInc/pubnub-rust", default-features = false }
rand = "0.8.4"
rdkafka = { git = "https://github.com/fede1024/rust-rdkafka.git", features = ["cmake-build", "ssl-vendored", "gssapi-vendored", "libz-static", "zstd"] }
Expand Down
Loading

0 comments on commit 480aa30

Please sign in to comment.