Reduce the amount of duplicate code in per-dataflow-worker metrics #7690

Merged
21 changes: 17 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions src/dataflow/Cargo.toml
@@ -19,6 +19,8 @@ dataflow-types = { path = "../dataflow-types" }
dec = { version = "0.4.5", features = ["serde"] }
differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow.git" }
dogsdogsdogs = { git = "https://github.com/TimelyDataflow/differential-dataflow.git" }
+enum-iterator = "0.7.0"
+enum-kinds = "0.5.0"
expr = { path = "../expr" }
flate2 = "1.0.20"
futures = "0.3.16"
@@ -30,6 +32,7 @@ kafka-util = { path = "../kafka-util" }
lazy_static = "1.4.0"
log = "0.4.13"
mz-avro = { path = "../avro", features = ["snappy"] }
+num_enum = "0.5.3"
ore = { path = "../ore" }
pdqselect = "0.1.0"
persist = { path = "../persist" }
@@ -48,6 +51,7 @@ rusoto_s3 = "0.47.0"
rusoto_sqs = "0.47.0"
serde = { version = "1.0.127", features = ["derive"] }
serde_json = "1.0.66"
+serde_variant = "0.1.0"
tempfile = "3.2.0"
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false, features = ["bincode"] }
tokio = { version = "1.9.0", features = ["fs", "rt", "sync"] }
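Each of the four new crates covers one piece of the deduplication: `enum-kinds` derives a fieldless mirror enum from `Command`, `enum-iterator` iterates over that mirror's variants, `num_enum` converts a variant into its `usize` discriminant, and `serde_variant` turns a variant into its serde-renamed name for use as a metric label. A rough, self-contained sketch of those capabilities on a hypothetical two-variant enum (`DemoKind` is illustrative, not from the PR):

```rust
use enum_iterator::IntoEnumIterator; // variant iteration (0.7 API)
use num_enum::IntoPrimitive; // variant -> usize discriminant
use serde::Serialize;
use serde_variant::to_variant_name; // variant -> serde-renamed &str

#[derive(Serialize, IntoPrimitive, IntoEnumIterator, Clone, Copy)]
#[repr(usize)]
#[serde(rename_all = "snake_case")]
enum DemoKind {
    CreateDataflows,
    Shutdown,
}

fn main() {
    for kind in DemoKind::into_enum_iter() {
        let idx: usize = kind.into();
        let label = to_variant_name(&kind).unwrap();
        println!("{} -> {}", idx, label); // 0 -> create_dataflows, 1 -> shutdown
    }
}
```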
12 changes: 11 additions & 1 deletion src/dataflow/src/server.rs
@@ -18,6 +18,9 @@ use differential_dataflow::operators::arrange::arrangement::Arrange;
use differential_dataflow::trace::cursor::Cursor;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::Collection;
+use enum_iterator::IntoEnumIterator;
+use enum_kinds::EnumKind;
+use num_enum::IntoPrimitive;
use ore::metrics::MetricsRegistry;
use serde::{Deserialize, Serialize};
use timely::communication::initialize::WorkerGuards;
@@ -60,7 +63,14 @@ mod metrics;
static TS_BINDING_FEEDBACK_INTERVAL_MS: u128 = 1_000;

/// Explicit instructions for timely dataflow workers.
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize, EnumKind)]
+#[enum_kind(
+CommandKind,
+derive(Serialize, IntoPrimitive, IntoEnumIterator),
+repr(usize),
+serde(rename_all = "snake_case"),
+doc = "The kind of command that was received"
+)]
pub enum Command {
/// Create a sequence of dataflows.
///
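For context, `EnumKind` generates a data-less twin of `Command` named `CommandKind`, with one unit variant per `Command` variant, plus `From` conversions from `Command` (by value and by reference) into the kind; the attributes listed inside `#[enum_kind(...)]` are applied to the generated enum. A minimal sketch of the same pattern on a hypothetical stand-in enum:

```rust
use enum_iterator::IntoEnumIterator;
use enum_kinds::EnumKind;
use num_enum::IntoPrimitive;
use serde::Serialize;

// `DemoCommand` stands in for the real `Command`, which has many more variants.
#[derive(EnumKind)]
#[enum_kind(
    DemoCommandKind,
    derive(Serialize, IntoPrimitive, IntoEnumIterator),
    repr(usize),
    serde(rename_all = "snake_case")
)]
enum DemoCommand {
    CreateDataflows(Vec<String>),
    Shutdown { graceful: bool },
}

fn main() {
    let cmd = DemoCommand::Shutdown { graceful: true };
    // enum-kinds also generates `From<&DemoCommand> for DemoCommandKind`,
    // which is what makes `command.into()` work in `observe_command` below.
    let kind: DemoCommandKind = (&cmd).into();
    let idx: usize = kind.into();
    assert_eq!(idx, 1); // discriminants follow declaration order under repr(usize)
}
```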
193 changes: 25 additions & 168 deletions src/dataflow/src/server/metrics.rs
@@ -9,13 +9,15 @@

//! Prometheus metrics for our interactive dataflow server

+use enum_iterator::IntoEnumIterator;
use ore::metrics::MetricsRegistry;
use ore::{
metric,
metrics::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec},
};
+use serde_variant::to_variant_name;

-use super::{Command, PendingPeek};
+use super::{Command, CommandKind, PendingPeek};

#[derive(Clone)]
pub(super) struct ServerMetrics {
@@ -84,7 +86,7 @@ impl WorkerMetrics {

/// Observe that we have executed a command. Must be paired with [`WorkerMetrics::observe_command_finish`]
pub(super) fn observe_command(&mut self, command: &Command) {
-self.commands_processed.observe(command)
+self.commands_processed.observe(command.into());
}

/// Observe that we have executed a command
@@ -96,183 +98,38 @@
/// Count of how many metrics we have processed for a given command type
#[derive(Debug)]
struct CommandsProcessedMetrics {
-create_dataflows_int: i32,
-create_dataflows: IntCounter,
-drop_sources_int: i32,
-drop_sources: IntCounter,
-drop_sinks_int: i32,
-drop_sinks: IntCounter,
-drop_indexes_int: i32,
-drop_indexes: IntCounter,
-peek_int: i32,
-peek: IntCounter,
-cancel_peek_int: i32,
-cancel_peek: IntCounter,
-create_local_input_int: i32,
-create_local_input: IntCounter,
-insert_int: i32,
-insert: IntCounter,
-allow_compaction_int: i32,
-allow_compaction: IntCounter,
-durability_frontier_updates_int: i32,
-durability_frontier_updates: IntCounter,
-append_log_int: i32,
-append_log: IntCounter,
-add_source_timestamping_int: i32,
-add_source_timestamping: IntCounter,
-advance_source_timestamp_int: i32,
-advance_source_timestamp: IntCounter,
-drop_source_timestamping_int: i32,
-drop_source_timestamping: IntCounter,
-enable_feedback_int: i32,
-enable_feedback: IntCounter,
-enable_logging_int: i32,
-enable_logging: IntCounter,
-shutdown_int: i32,
-shutdown: IntCounter,
-advance_all_local_inputs_int: i32,
-advance_all_local_inputs: IntCounter,
+cache: Vec<i64>,
+counters: Vec<IntCounter>,
}

impl CommandsProcessedMetrics {
fn new(worker: &str, commands_processed_metric: &IntCounterVec) -> CommandsProcessedMetrics {
CommandsProcessedMetrics {
-create_dataflows_int: 0,
-create_dataflows: commands_processed_metric
-.with_label_values(&[worker, "create_dataflows"]),
-drop_sources_int: 0,
-drop_sources: commands_processed_metric.with_label_values(&[worker, "drop_sources"]),
-drop_sinks_int: 0,
-drop_sinks: commands_processed_metric.with_label_values(&[worker, "drop_sinks"]),
-drop_indexes_int: 0,
-drop_indexes: commands_processed_metric.with_label_values(&[worker, "drop_indexes"]),
-peek_int: 0,
-peek: commands_processed_metric.with_label_values(&[worker, "peek"]),
-cancel_peek_int: 0,
-cancel_peek: commands_processed_metric.with_label_values(&[worker, "cancel_peek"]),
-create_local_input_int: 0,
-create_local_input: commands_processed_metric
-.with_label_values(&[worker, "create_local_input"]),
-insert_int: 0,
-insert: commands_processed_metric.with_label_values(&[worker, "insert"]),
-allow_compaction_int: 0,
-allow_compaction: commands_processed_metric
-.with_label_values(&[worker, "allow_compaction"]),
-durability_frontier_updates_int: 0,
-durability_frontier_updates: commands_processed_metric
-.with_label_values(&[worker, "durability_frontier_updates"]),
-append_log_int: 0,
-append_log: commands_processed_metric.with_label_values(&[worker, "append_log"]),
-add_source_timestamping_int: 0,
-add_source_timestamping: commands_processed_metric
-.with_label_values(&[worker, "add_source_timestamping"]),
-advance_source_timestamp_int: 0,
-advance_source_timestamp: commands_processed_metric
-.with_label_values(&[worker, "advance_source_timestamp"]),
-drop_source_timestamping_int: 0,
-drop_source_timestamping: commands_processed_metric
-.with_label_values(&[worker, "drop_source_timestamping"]),
-enable_feedback_int: 0,
-enable_feedback: commands_processed_metric
-.with_label_values(&[worker, "enable_feedback"]),
-enable_logging_int: 0,
-enable_logging: commands_processed_metric
-.with_label_values(&[worker, "enable_logging"]),
-shutdown_int: 0,
-shutdown: commands_processed_metric.with_label_values(&[worker, "shutdown"]),
-advance_all_local_inputs_int: 0,
-advance_all_local_inputs: commands_processed_metric
-.with_label_values(&[worker, "advance_all_local_inputs"]),
+cache: CommandKind::into_enum_iter().map(|_| 0).collect(),
+counters: CommandKind::into_enum_iter()
+.map(|kind| {
+commands_processed_metric
+.with_label_values(&[worker, Self::seq_command_label(kind)])
+})
+.collect(),
}
}
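The two vectors are built from the same iteration, so slot `i` of `cache` and slot `i` of `counters` both belong to the variant with discriminant `i`, and the metric label for each slot is the variant's snake_case serde name. A sketch of the constructor against the plain `prometheus` crate (the PR itself goes through the `ore` metrics wrappers), reusing `DemoCommandKind` from the sketch above:

```rust
use enum_iterator::IntoEnumIterator;
use prometheus::{IntCounter, IntCounterVec};
use serde_variant::to_variant_name;

struct DemoMetrics {
    cache: Vec<i64>,           // per-variant pending increments
    counters: Vec<IntCounter>, // per-variant Prometheus counters
}

impl DemoMetrics {
    fn new(worker: &str, commands_processed: &IntCounterVec) -> DemoMetrics {
        DemoMetrics {
            // One zeroed slot per variant, in discriminant order.
            cache: DemoCommandKind::into_enum_iter().map(|_| 0).collect(),
            counters: DemoCommandKind::into_enum_iter()
                .map(|kind| {
                    commands_processed
                        .with_label_values(&[worker, to_variant_name(&kind).unwrap()])
                })
                .collect(),
        }
    }
}
```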

-fn observe(&mut self, command: &Command) {
-match command {
-Command::CreateDataflows(..) => self.create_dataflows_int += 1,
-Command::DropSources(..) => self.drop_sources_int += 1,
-Command::DropSinks(..) => self.drop_sinks_int += 1,
-Command::DropIndexes(..) => self.drop_indexes_int += 1,
-Command::Peek { .. } => self.peek_int += 1,
-Command::CancelPeek { .. } => self.cancel_peek_int += 1,
-Command::Insert { .. } => self.insert_int += 1,
-Command::AllowCompaction(..) => self.allow_compaction_int += 1,
-Command::DurabilityFrontierUpdates(..) => self.durability_frontier_updates_int += 1,
-Command::AddSourceTimestamping { .. } => self.add_source_timestamping_int += 1,
-Command::AdvanceSourceTimestamp { .. } => self.advance_source_timestamp_int += 1,
-Command::DropSourceTimestamping { .. } => self.drop_source_timestamping_int += 1,
-Command::EnableLogging(_) => self.enable_logging_int += 1,
-Command::Shutdown { .. } => self.shutdown_int += 1,
-Command::AdvanceAllLocalInputs { .. } => self.advance_all_local_inputs_int += 1,
-}
+fn seq_command_label(command: CommandKind) -> &'static str {
+to_variant_name(&command).expect("Failed to convert enum with only unit variants")
+}
+
+fn observe(&mut self, command: CommandKind) {
+let idx: usize = command.into();
+self.cache[idx] += 1;
}
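Worth noting why the cache exists at all: the Prometheus counters are shared atomics, so the hot path only bumps a plain `i64` in the worker-local vector, and the atomic counters are touched once per batch in `finish`. The observe side, continuing the sketch (same hypothetical `DemoMetrics`):

```rust
impl DemoMetrics {
    // Hot path: index by discriminant, bump a non-atomic local slot.
    fn observe(&mut self, command: &DemoCommand) {
        let idx: usize = DemoCommandKind::from(command).into();
        self.cache[idx] += 1;
    }
}
```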

fn finish(&mut self) {
-if self.create_dataflows_int > 0 {
-self.create_dataflows
-.inc_by(self.create_dataflows_int as i64);
-self.create_dataflows_int = 0;
-}
-if self.drop_sources_int > 0 {
-self.drop_sources.inc_by(self.drop_sources_int as i64);
-self.drop_sources_int = 0;
-}
-if self.drop_sinks_int > 0 {
-self.drop_sinks.inc_by(self.drop_sinks_int as i64);
-self.drop_sinks_int = 0;
-}
-if self.drop_indexes_int > 0 {
-self.drop_indexes.inc_by(self.drop_indexes_int as i64);
-self.drop_indexes_int = 0;
-}
-if self.peek_int > 0 {
-self.peek.inc_by(self.peek_int as i64);
-self.peek_int = 0;
-}
-if self.cancel_peek_int > 0 {
-self.cancel_peek.inc_by(self.cancel_peek_int as i64);
-self.cancel_peek_int = 0;
-}
-if self.insert_int > 0 {
-self.insert.inc_by(self.insert_int as i64);
-self.insert_int = 0;
-}
-if self.allow_compaction_int > 0 {
-self.allow_compaction
-.inc_by(self.allow_compaction_int as i64);
-self.allow_compaction_int = 0;
-}
-if self.durability_frontier_updates_int > 0 {
-self.durability_frontier_updates
-.inc_by(self.durability_frontier_updates_int as i64);
-self.durability_frontier_updates_int = 0;
-}
-if self.add_source_timestamping_int > 0 {
-self.add_source_timestamping
-.inc_by(self.add_source_timestamping_int as i64);
-self.add_source_timestamping_int = 0;
-}
-if self.advance_source_timestamp_int > 0 {
-self.advance_source_timestamp
-.inc_by(self.advance_source_timestamp_int as i64);
-self.advance_source_timestamp_int = 0;
-}
-if self.drop_source_timestamping_int > 0 {
-self.drop_source_timestamping
-.inc_by(self.drop_source_timestamping_int as i64);
-self.drop_source_timestamping_int = 0;
-}
-if self.enable_feedback_int > 0 {
-self.enable_feedback.inc_by(self.enable_feedback_int as i64);
-self.enable_feedback_int = 0;
-}
-if self.shutdown_int > 0 {
-self.shutdown.inc_by(self.shutdown_int as i64);
-self.shutdown_int = 0;
-}
-if self.advance_all_local_inputs_int > 0 {
-self.advance_all_local_inputs
-.inc_by(self.advance_all_local_inputs_int as i64);
-self.advance_all_local_inputs_int = 0;
+for (cache_entry, counter) in self.cache.iter_mut().zip(self.counters.iter()) {
+if *cache_entry != 0 {
+counter.inc_by(*cache_entry);
+}
+*cache_entry = 0;
}
}
}
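The flush loop replaces fifteen hand-written `if` blocks, and it also closes a gap visible in the removed code: the old `finish` appears never to have flushed `enable_logging_int`, and fields like `create_local_input_int` and `append_log_int` had no corresponding `observe` match arm at all; with the derived `CommandKind`, every variant gets a slot automatically. Finishing the sketch with a small usage example (the `commands_processed` metric name and labels are illustrative):

```rust
impl DemoMetrics {
    // Flush batched observations into the shared Prometheus counters.
    fn finish(&mut self) {
        for (slot, counter) in self.cache.iter_mut().zip(self.counters.iter()) {
            if *slot != 0 {
                // Recent `prometheus` releases take u64; the PR's wrapper takes i64.
                counter.inc_by(*slot as u64);
            }
            *slot = 0;
        }
    }
}

fn main() {
    let registry = prometheus::Registry::new();
    let commands_processed = prometheus::IntCounterVec::new(
        prometheus::Opts::new("commands_processed", "Commands processed, by worker and kind"),
        &["worker", "command"],
    )
    .unwrap();
    registry
        .register(Box::new(commands_processed.clone()))
        .unwrap();

    let mut metrics = DemoMetrics::new("0", &commands_processed);
    metrics.observe(&DemoCommand::Shutdown { graceful: true });
    metrics.finish(); // commands_processed{worker="0",command="shutdown"} == 1
}
```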