Skip to content

Commit

Permalink
Rework per-worker CommandsProcessedMetrics to reduce duplicate code
Browse files Browse the repository at this point in the history
By combining enum_kinds and num_enum, we can change the worker command
representation to be an array of counters & values by which we increment
them. That reduces the number of names we have to duplicate in code, and
requires one fewer match statement on Command variants.

It also gets rid of some metrics that were represented but never got
incremented.
  • Loading branch information
antifuchs committed Aug 4, 2021
1 parent f8fcd5d commit 30b1298
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 174 deletions.
21 changes: 17 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ dataflow-types = { path = "../dataflow-types" }
dec = { version = "0.4.5", features = ["serde"] }
differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow.git" }
dogsdogsdogs = { git = "https://github.com/TimelyDataflow/differential-dataflow.git" }
enum-iterator = "0.7.0"
enum-kinds = "0.5.0"
num_enum = "0.5.3"
expr = { path = "../expr" }
flate2 = "1.0.20"
futures = "0.3.16"
Expand Down Expand Up @@ -48,6 +51,7 @@ rusoto_s3 = "0.47.0"
rusoto_sqs = "0.47.0"
serde = { version = "1.0.127", features = ["derive"] }
serde_json = "1.0.66"
serde_variant = "0.1.0"
tempfile = "3.2.0"
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false, features = ["bincode"] }
tokio = { version = "1.9.0", features = ["fs", "rt", "sync"] }
Expand Down
14 changes: 13 additions & 1 deletion src/dataflow/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ use differential_dataflow::operators::arrange::arrangement::Arrange;
use differential_dataflow::trace::cursor::Cursor;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::Collection;
use enum_iterator::IntoEnumIterator;
use enum_kinds::EnumKind;
use num_enum::IntoPrimitive;
use ore::metrics::MetricsRegistry;
use serde::{Deserialize, Serialize};
use timely::communication::initialize::WorkerGuards;
Expand Down Expand Up @@ -59,8 +62,17 @@ mod metrics;
/// back to the coordinator.
static TS_BINDING_FEEDBACK_INTERVAL_MS: u128 = 1_000;

// A dirty hack to allow us to derive EnumKind:

/// Explicit instructions for timely dataflow workers.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize, EnumKind)]
#[enum_kind(
CommandKind,
derive(Serialize, IntoPrimitive, IntoEnumIterator),
repr(usize),
serde(rename_all = "snake_case"),
doc = "The kind of command that was received"
)]
pub enum Command {
/// Create a sequence of dataflows.
///
Expand Down
194 changes: 25 additions & 169 deletions src/dataflow/src/server/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
Expand All @@ -9,13 +8,15 @@

//! Prometheus metrics for our interactive dataflow server

use enum_iterator::IntoEnumIterator;
use ore::metrics::MetricsRegistry;
use ore::{
metric,
metrics::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec},
};
use serde_variant::to_variant_name;

use super::{Command, PendingPeek};
use super::{Command, CommandKind, PendingPeek};

#[derive(Clone)]
pub(super) struct ServerMetrics {
Expand Down Expand Up @@ -84,7 +85,7 @@ impl WorkerMetrics {

/// Observe that we have executed a command. Must be paired with [`WorkerMetrics::observe_command_finish`]
pub(super) fn observe_command(&mut self, command: &Command) {
self.commands_processed.observe(command)
self.commands_processed.observe(command.into());
}

/// Observe that we have executed a command
Expand All @@ -96,183 +97,38 @@ impl WorkerMetrics {
/// Count of how many metrics we have processed for a given command type
#[derive(Debug)]
struct CommandsProcessedMetrics {
create_dataflows_int: i32,
create_dataflows: IntCounter,
drop_sources_int: i32,
drop_sources: IntCounter,
drop_sinks_int: i32,
drop_sinks: IntCounter,
drop_indexes_int: i32,
drop_indexes: IntCounter,
peek_int: i32,
peek: IntCounter,
cancel_peek_int: i32,
cancel_peek: IntCounter,
create_local_input_int: i32,
create_local_input: IntCounter,
insert_int: i32,
insert: IntCounter,
allow_compaction_int: i32,
allow_compaction: IntCounter,
durability_frontier_updates_int: i32,
durability_frontier_updates: IntCounter,
append_log_int: i32,
append_log: IntCounter,
add_source_timestamping_int: i32,
add_source_timestamping: IntCounter,
advance_source_timestamp_int: i32,
advance_source_timestamp: IntCounter,
drop_source_timestamping_int: i32,
drop_source_timestamping: IntCounter,
enable_feedback_int: i32,
enable_feedback: IntCounter,
enable_logging_int: i32,
enable_logging: IntCounter,
shutdown_int: i32,
shutdown: IntCounter,
advance_all_local_inputs_int: i32,
advance_all_local_inputs: IntCounter,
cache: Vec<i64>,
counters: Vec<IntCounter>,
}

impl CommandsProcessedMetrics {
fn new(worker: &str, commands_processed_metric: &IntCounterVec) -> CommandsProcessedMetrics {
CommandsProcessedMetrics {
create_dataflows_int: 0,
create_dataflows: commands_processed_metric
.with_label_values(&[worker, "create_dataflows"]),
drop_sources_int: 0,
drop_sources: commands_processed_metric.with_label_values(&[worker, "drop_sources"]),
drop_sinks_int: 0,
drop_sinks: commands_processed_metric.with_label_values(&[worker, "drop_sinks"]),
drop_indexes_int: 0,
drop_indexes: commands_processed_metric.with_label_values(&[worker, "drop_indexes"]),
peek_int: 0,
peek: commands_processed_metric.with_label_values(&[worker, "peek"]),
cancel_peek_int: 0,
cancel_peek: commands_processed_metric.with_label_values(&[worker, "cancel_peek"]),
create_local_input_int: 0,
create_local_input: commands_processed_metric
.with_label_values(&[worker, "create_local_input"]),
insert_int: 0,
insert: commands_processed_metric.with_label_values(&[worker, "insert"]),
allow_compaction_int: 0,
allow_compaction: commands_processed_metric
.with_label_values(&[worker, "allow_compaction"]),
durability_frontier_updates_int: 0,
durability_frontier_updates: commands_processed_metric
.with_label_values(&[worker, "durability_frontier_updates"]),
append_log_int: 0,
append_log: commands_processed_metric.with_label_values(&[worker, "append_log"]),
add_source_timestamping_int: 0,
add_source_timestamping: commands_processed_metric
.with_label_values(&[worker, "add_source_timestamping"]),
advance_source_timestamp_int: 0,
advance_source_timestamp: commands_processed_metric
.with_label_values(&[worker, "advance_source_timestamp"]),
drop_source_timestamping_int: 0,
drop_source_timestamping: commands_processed_metric
.with_label_values(&[worker, "drop_source_timestamping"]),
enable_feedback_int: 0,
enable_feedback: commands_processed_metric
.with_label_values(&[worker, "enable_feedback"]),
enable_logging_int: 0,
enable_logging: commands_processed_metric
.with_label_values(&[worker, "enable_logging"]),
shutdown_int: 0,
shutdown: commands_processed_metric.with_label_values(&[worker, "shutdown"]),
advance_all_local_inputs_int: 0,
advance_all_local_inputs: commands_processed_metric
.with_label_values(&[worker, "advance_all_local_inputs"]),
cache: CommandKind::into_enum_iter().map(|_| 0).collect(),
counters: CommandKind::into_enum_iter()
.map(|kind| {
commands_processed_metric
.with_label_values(&[worker, Self::seq_command_label(kind)])
})
.collect(),
}
}

fn observe(&mut self, command: &Command) {
match command {
Command::CreateDataflows(..) => self.create_dataflows_int += 1,
Command::DropSources(..) => self.drop_sources_int += 1,
Command::DropSinks(..) => self.drop_sinks_int += 1,
Command::DropIndexes(..) => self.drop_indexes_int += 1,
Command::Peek { .. } => self.peek_int += 1,
Command::CancelPeek { .. } => self.cancel_peek_int += 1,
Command::Insert { .. } => self.insert_int += 1,
Command::AllowCompaction(..) => self.allow_compaction_int += 1,
Command::DurabilityFrontierUpdates(..) => self.durability_frontier_updates_int += 1,
Command::AddSourceTimestamping { .. } => self.add_source_timestamping_int += 1,
Command::AdvanceSourceTimestamp { .. } => self.advance_source_timestamp_int += 1,
Command::DropSourceTimestamping { .. } => self.drop_source_timestamping_int += 1,
Command::EnableLogging(_) => self.enable_logging_int += 1,
Command::Shutdown { .. } => self.shutdown_int += 1,
Command::AdvanceAllLocalInputs { .. } => self.advance_all_local_inputs_int += 1,
}
/// Returns the metric label for a command kind — the variant's
/// serde name (snake_case via `serde(rename_all = "snake_case")` on
/// the generated `CommandKind` enum).
fn seq_command_label(command: CommandKind) -> &'static str {
to_variant_name(&command).expect("Failed to convert enum with only unit variants")
}

/// Record one occurrence of `command` in the in-memory cache; the
/// cached counts are flushed to the Prometheus counters by `finish`.
fn observe(&mut self, command: CommandKind) {
// `CommandKind` derives `IntoPrimitive` with `repr(usize)`, so the
// discriminant indexes `cache`/`counters`, which were both built in
// `CommandKind::into_enum_iter()` order.
let idx: usize = command.into();
self.cache[idx] += 1;
}

fn finish(&mut self) {
if self.create_dataflows_int > 0 {
self.create_dataflows
.inc_by(self.create_dataflows_int as i64);
self.create_dataflows_int = 0;
}
if self.drop_sources_int > 0 {
self.drop_sources.inc_by(self.drop_sources_int as i64);
self.drop_sources_int = 0;
}
if self.drop_sinks_int > 0 {
self.drop_sinks.inc_by(self.drop_sinks_int as i64);
self.drop_sinks_int = 0;
}
if self.drop_indexes_int > 0 {
self.drop_indexes.inc_by(self.drop_indexes_int as i64);
self.drop_indexes_int = 0;
}
if self.peek_int > 0 {
self.peek.inc_by(self.peek_int as i64);
self.peek_int = 0;
}
if self.cancel_peek_int > 0 {
self.cancel_peek.inc_by(self.cancel_peek_int as i64);
self.cancel_peek_int = 0;
}
if self.insert_int > 0 {
self.insert.inc_by(self.insert_int as i64);
self.insert_int = 0;
}
if self.allow_compaction_int > 0 {
self.allow_compaction
.inc_by(self.allow_compaction_int as i64);
self.allow_compaction_int = 0;
}
if self.durability_frontier_updates_int > 0 {
self.durability_frontier_updates
.inc_by(self.durability_frontier_updates_int as i64);
self.durability_frontier_updates_int = 0;
}
if self.add_source_timestamping_int > 0 {
self.add_source_timestamping
.inc_by(self.add_source_timestamping_int as i64);
self.add_source_timestamping_int = 0;
}
if self.advance_source_timestamp_int > 0 {
self.advance_source_timestamp
.inc_by(self.advance_source_timestamp_int as i64);
self.advance_source_timestamp_int = 0;
}
if self.drop_source_timestamping_int > 0 {
self.drop_source_timestamping
.inc_by(self.drop_source_timestamping_int as i64);
self.drop_source_timestamping_int = 0;
}
if self.enable_feedback_int > 0 {
self.enable_feedback.inc_by(self.enable_feedback_int as i64);
self.enable_feedback_int = 0;
}
if self.shutdown_int > 0 {
self.shutdown.inc_by(self.shutdown_int as i64);
self.shutdown_int = 0;
}
if self.advance_all_local_inputs_int > 0 {
self.advance_all_local_inputs
.inc_by(self.advance_all_local_inputs_int as i64);
self.advance_all_local_inputs_int = 0;
for (cache_entry, counter) in self.cache.iter_mut().zip(self.counters.iter()) {
if *cache_entry != 0 {
counter.inc_by(*cache_entry);
}
*cache_entry = 0;
}
}
}

0 comments on commit 30b1298

Please sign in to comment.