Skip to content

Commit

Permalink
Add configuration version metrics (#504)
Browse files Browse the repository at this point in the history
### What

This PR adds a metric representing the configuration version that a
running ndc-postgres instance uses.

This will help us track adoption of new versions and inform deprecation
timelines and documentation.

### How

Rather than extending the existing metrics in the query_engine crate we
make a new Metrics struct in the ndc_postgres_configuration crate. This
is to prevent the query_engine metrics from accumulating unrelated cruft
and reduce the complexity of the module graph.
  • Loading branch information
plcplc committed Jun 21, 2024
1 parent 53565d6 commit 7fa6c56
Show file tree
Hide file tree
Showing 14 changed files with 140 additions and 62 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/configuration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ workspace = true
query-engine-metadata = { path = "../query-engine/metadata" }

anyhow = { workspace = true }
prometheus = {workspace = true }
schemars = { workspace = true, features = ["smol_str", "preserve_order"] }
serde = { workspace = true }
serde_json = { workspace = true, features = ["raw_value"] }
Expand Down
3 changes: 2 additions & 1 deletion crates/configuration/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::error::{
use crate::values::{IsolationLevel, PoolSettings};
use crate::version3;
use crate::version4;
use crate::VersionTag;
use schemars::{gen::SchemaSettings, schema::RootSchema};

pub fn generate_latest_schema() -> RootSchema {
Expand Down Expand Up @@ -60,12 +61,12 @@ impl ParsedConfiguration {
#[derive(Debug)]
pub struct Configuration {
pub metadata: metadata::Metadata,
pub configuration_version_tag: VersionTag,
pub pool_settings: PoolSettings,
pub connection_uri: String,
pub isolation_level: IsolationLevel,
pub mutations_version: Option<metadata::mutations::MutationsVersion>,
}

pub async fn introspect(
input: ParsedConfiguration,
environment: impl Environment,
Expand Down
10 changes: 10 additions & 0 deletions crates/configuration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ mod values;

pub mod environment;
pub mod error;
pub mod metrics;

mod version3;
mod version4;

Expand All @@ -13,6 +15,14 @@ pub use configuration::{
};
pub use values::{ConnectionUri, IsolationLevel, PoolSettings, Secret};

pub use metrics::Metrics;

#[derive(Debug, Copy, Clone)]
pub enum VersionTag {
Version3,
Version4,
}

#[cfg(test)]
pub mod tests;

Expand Down
61 changes: 61 additions & 0 deletions crates/configuration/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//! Metrics setup and update for our connector.

use prometheus::{IntGauge, Registry};

use crate::VersionTag;

/// The collection of configuration-related metrics exposed through the `/metrics` endpoint.
#[derive(Debug, Clone)]
pub struct Metrics {
configuration_version_3: IntGauge,
configuration_version_4: IntGauge,
}

impl Metrics {
/// Set up counters and gauges used to produce Prometheus metrics
pub fn initialize(metrics_registry: &mut Registry) -> Result<Self, prometheus::Error> {
let configuration_version_3 = add_int_gauge_metric(
metrics_registry,
"ndc_postgres_configuration_version_3",
"Get whether configuration version 3 is used",
)?;

let configuration_version_4 = add_int_gauge_metric(
metrics_registry,
"ndc_postgres_configuration_version_4",
"Get whether configuration version 4 is used",
)?;

Ok(Self {
configuration_version_3,
configuration_version_4,
})
}

/// Set the configuration version used by this connector instance.
pub fn set_configuration_version(&self, version: VersionTag) {
match version {
VersionTag::Version3 => self.configuration_version_3.set(1),
VersionTag::Version4 => self.configuration_version_4.set(1),
}
}
}

/// Create a new int gauge metric and register it with the provided Prometheus Registry
fn add_int_gauge_metric(
metrics_registry: &mut Registry,
metric_name: &str,
metric_description: &str,
) -> Result<IntGauge, prometheus::Error> {
let int_gauge = IntGauge::with_opts(prometheus::Opts::new(metric_name, metric_description))?;
register_collector(metrics_registry, int_gauge)
}

/// Register a new collector with the registry, and returns it for later use.
fn register_collector<Collector: prometheus::core::Collector + std::clone::Clone + 'static>(
metrics_registry: &mut Registry,
collector: Collector,
) -> Result<Collector, prometheus::Error> {
metrics_registry.register(Box::new(collector.clone()))?;
Ok(collector)
}
2 changes: 2 additions & 0 deletions crates/configuration/src/version3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::error::{
MakeRuntimeConfigurationError, ParseConfigurationError, WriteParsedConfigurationError,
};
use crate::values::{ConnectionUri, Secret};
use crate::VersionTag;

#[cfg(test)]
mod tests;
Expand Down Expand Up @@ -551,6 +552,7 @@ pub fn make_runtime_configuration(
connection_uri,
isolation_level: configuration.connection_settings.isolation_level,
mutations_version: convert_mutations_version(configuration.mutations_version),
configuration_version_tag: VersionTag::Version3,
})
}

Expand Down
2 changes: 2 additions & 0 deletions crates/configuration/src/version4/to_runtime_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use super::ParsedConfiguration;
use crate::environment::Environment;
use crate::error::MakeRuntimeConfigurationError;
use crate::values::{ConnectionUri, Secret};
use crate::VersionTag;

/// Convert the parsed configuration metadata to internal engine metadata
/// That can be used by the connector at runtime.
Expand All @@ -30,6 +31,7 @@ pub fn make_runtime_configuration(
connection_uri,
isolation_level: parsed_config.connection_settings.isolation_level,
mutations_version: convert_mutations_version(parsed_config.mutations_version),
configuration_version_tag: VersionTag::Version4,
})
}

Expand Down
3 changes: 2 additions & 1 deletion crates/connectors/ndc-postgres/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Connector for Postgres {
_configuration: &Self::Configuration,
state: &Self::State,
) -> Result<(), connector::FetchMetricsError> {
state.metrics.update_pool_metrics(&state.pool);
state.query_metrics.update_pool_metrics(&state.pool);
Ok(())
}

Expand Down Expand Up @@ -292,6 +292,7 @@ impl<Env: Environment + Send + Sync> ConnectorSetup for PostgresSetup<Env> {
&configuration.connection_uri,
&configuration.pool_settings,
metrics,
configuration.configuration_version_tag,
)
.instrument(info_span!("Initialise state"))
.await
Expand Down
12 changes: 6 additions & 6 deletions crates/connectors/ndc-postgres/src/mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub async fn mutation(
state: &state::State,
request: models::MutationRequest,
) -> Result<JsonResponse<models::MutationResponse>, connector::MutationError> {
let timer = state.metrics.time_mutation_total();
let timer = state.query_metrics.time_mutation_total();

// See https://docs.rs/tracing/0.1.29/tracing/span/struct.Span.html#in-asynchronous-code
let result = async move {
Expand All @@ -41,7 +41,7 @@ pub async fn mutation(

let plan = async {
plan_mutation(configuration, state, request).map_err(|err| {
record::translation_error(&err, &state.metrics);
record::translation_error(&err, &state.query_metrics);
convert::translation_error_to_mutation_error(&err)
})
}
Expand All @@ -50,14 +50,14 @@ pub async fn mutation(

let result = async {
execute_mutation(state, plan).await.map_err(|err| {
record::execution_error(&err, &state.metrics);
record::execution_error(&err, &state.query_metrics);
convert::execution_error_to_mutation_error(err)
})
}
.instrument(info_span!("Execute mutation"))
.await?;

state.metrics.record_successful_mutation();
state.query_metrics.record_successful_mutation();
Ok(result)
}
.instrument(info_span!("/mutation"))
Expand All @@ -75,7 +75,7 @@ fn plan_mutation(
sql::execution_plan::ExecutionPlan<sql::execution_plan::Mutations>,
translation::error::Error,
> {
let timer = state.metrics.time_mutation_plan();
let timer = state.query_metrics.time_mutation_plan();
let mutations = request
.operations
.into_iter()
Expand All @@ -101,7 +101,7 @@ async fn execute_mutation(
query_engine_execution::mutation::execute(
&state.pool,
&state.database_info,
&state.metrics,
&state.query_metrics,
plan,
)
.await
Expand Down
8 changes: 4 additions & 4 deletions crates/connectors/ndc-postgres/src/mutation/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub async fn explain(
.instrument(info_span!("Plan mutation"))
.await
.map_err(|err| {
record::translation_error(&err, &state.metrics);
record::translation_error(&err, &state.query_metrics);
convert::translation_error_to_explain_error(&err)
})?;

Expand All @@ -46,19 +46,19 @@ pub async fn explain(
query_engine_execution::mutation::explain(
&state.pool,
&state.database_info,
&state.metrics,
&state.query_metrics,
plan,
)
.await
.map_err(|err| {
record::execution_error(&err, &state.metrics);
record::execution_error(&err, &state.query_metrics);
convert::execution_error_to_explain_error(err)
})
}
.instrument(info_span!("Explain mutation"))
.await?;

state.metrics.record_successful_explain();
state.query_metrics.record_successful_explain();

let details: BTreeMap<String, String> = results
.into_iter()
Expand Down
21 changes: 13 additions & 8 deletions crates/connectors/ndc-postgres/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub async fn query(
state: &state::State,
query_request: models::QueryRequest,
) -> Result<JsonResponse<models::QueryResponse>, connector::QueryError> {
let timer = state.metrics.time_query_total();
let timer = state.query_metrics.time_query_total();

// See https://docs.rs/tracing/0.1.29/tracing/span/struct.Span.html#in-asynchronous-code
let result = async move {
Expand All @@ -40,7 +40,7 @@ pub async fn query(

let plan = async {
plan_query(configuration, state, query_request).map_err(|err| {
record::translation_error(&err, &state.metrics);
record::translation_error(&err, &state.query_metrics);
convert::translation_error_to_query_error(&err)
})
}
Expand All @@ -49,14 +49,14 @@ pub async fn query(

let result = async {
execute_query(state, plan).await.map_err(|err| {
record::execution_error(&err, &state.metrics);
record::execution_error(&err, &state.query_metrics);
convert::execution_error_to_query_error(err)
})
}
.instrument(info_span!("Execute query"))
.await?;

state.metrics.record_successful_query();
state.query_metrics.record_successful_query();
Ok(result)
}
.instrument(info_span!("/query"))
Expand All @@ -71,7 +71,7 @@ fn plan_query(
query_request: models::QueryRequest,
) -> Result<sql::execution_plan::ExecutionPlan<sql::execution_plan::Query>, translation::error::Error>
{
let timer = state.metrics.time_query_plan();
let timer = state.query_metrics.time_query_plan();
let result = translation::query::translate(&configuration.metadata, query_request);
timer.complete_with(result)
}
Expand All @@ -80,7 +80,12 @@ async fn execute_query(
state: &state::State,
plan: sql::execution_plan::ExecutionPlan<sql::execution_plan::Query>,
) -> Result<JsonResponse<models::QueryResponse>, query_engine_execution::error::Error> {
query_engine_execution::query::execute(&state.pool, &state.database_info, &state.metrics, plan)
.await
.map(JsonResponse::Serialized)
query_engine_execution::query::execute(
&state.pool,
&state.database_info,
&state.query_metrics,
plan,
)
.await
.map(JsonResponse::Serialized)
}
8 changes: 4 additions & 4 deletions crates/connectors/ndc-postgres/src/query/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub async fn explain(
// Compile the query.
let plan = async {
super::plan_query(configuration, state, query_request).map_err(|err| {
record::translation_error(&err, &state.metrics);
record::translation_error(&err, &state.query_metrics);
convert::translation_error_to_explain_error(&err)
})
}
Expand All @@ -45,19 +45,19 @@ pub async fn explain(
query_engine_execution::query::explain(
&state.pool,
&state.database_info,
&state.metrics,
&state.query_metrics,
plan,
)
.await
.map_err(|err| {
record::execution_error(&err, &state.metrics);
record::execution_error(&err, &state.query_metrics);
convert::execution_error_to_explain_error(err)
})
}
.instrument(info_span!("Explain query"))
.await?;

state.metrics.record_successful_explain();
state.query_metrics.record_successful_explain();

let details =
BTreeMap::from_iter([("SQL Query".into(), query), ("Execution Plan".into(), plan)]);
Expand Down
Loading

0 comments on commit 7fa6c56

Please sign in to comment.