Skip to content

Commit

Permalink
fix: Postgres query metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
chris13524 committed May 20, 2024
1 parent ccc7875 commit d12d9a6
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/handlers/register_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub async fn handler(
token: body.token,
always_raw,
},
state.metrics.as_ref(),
)
.await?;

Expand Down
35 changes: 34 additions & 1 deletion src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
use wc::metrics::{otel::metrics::Counter, ServiceMetrics};
use {
std::time::Instant,
wc::metrics::{
otel::{
metrics::{Counter, Histogram},
KeyValue,
},
ServiceMetrics,
},
};

#[derive(Clone)]
pub struct Metrics {
Expand All @@ -16,6 +25,9 @@ pub struct Metrics {

pub tenant_suspensions: Counter<u64>,
pub client_suspensions: Counter<u64>,

postgres_queries: Counter<u64>,
postgres_query_latency: Histogram<u64>,
}

impl Default for Metrics {
Expand Down Expand Up @@ -84,6 +96,16 @@ impl Metrics {
.with_description("The number of clients that have been suspended")
.init();

let postgres_queries: Counter<u64> = meter
.u64_counter("postgres_queries")
.with_description("The number of Postgres queries executed")
.init();

let postgres_query_latency: Histogram<u64> = meter
.u64_histogram("postgres_query_latency")
.with_description("The latency Postgres queries")
.init();

Metrics {
registered_clients: clients_counter,
received_notifications: received_notification_counter,
Expand All @@ -96,6 +118,17 @@ impl Metrics {
tenant_fcm_v1_updates: tenant_fcm_v1_updates_counter,
tenant_suspensions: tenant_suspensions_counter,
client_suspensions: client_suspensions_counter,
postgres_queries,
postgres_query_latency,
}
}

pub fn postgres_query(&self, query_name: &'static str, start: Instant) {
let elapsed = start.elapsed();

let attributes = [KeyValue::new("name", query_name)];
self.postgres_queries.add(1, &attributes);
self.postgres_query_latency
.record(elapsed.as_millis() as u64, &attributes);
}
}
37 changes: 34 additions & 3 deletions src/stores/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use {
crate::{
metrics::Metrics,
providers::ProviderKind,
stores::{self, StoreError::NotFound},
},
async_trait::async_trait,
sqlx::Executor,
std::time::Instant,
tracing::{debug, instrument},
};

Expand All @@ -19,15 +21,27 @@ pub struct Client {

#[async_trait]
pub trait ClientStore {
async fn create_client(&self, tenant_id: &str, id: &str, client: Client) -> stores::Result<()>;
async fn create_client(
&self,
tenant_id: &str,
id: &str,
client: Client,
metrics: Option<&Metrics>,
) -> stores::Result<()>;
async fn get_client(&self, tenant_id: &str, id: &str) -> stores::Result<Client>;
async fn delete_client(&self, tenant_id: &str, id: &str) -> stores::Result<()>;
}

#[async_trait]
impl ClientStore for sqlx::PgPool {
#[instrument(skip(self, client))]
async fn create_client(&self, tenant_id: &str, id: &str, client: Client) -> stores::Result<()> {
#[instrument(skip(self, client, metrics))]
async fn create_client(
&self,
tenant_id: &str,
id: &str,
client: Client,
metrics: Option<&Metrics>,
) -> stores::Result<()> {
debug!(
"ClientStore::create_client tenant_id={tenant_id} id={id} token={} with locking",
client.token
Expand All @@ -37,6 +51,7 @@ impl ClientStore for sqlx::PgPool {

// Statement for locking based on the client id to prevent an issue #230
// and locking based on the token to prevent an issue #292
let start = Instant::now();
sqlx::query(
"SELECT
pg_advisory_xact_lock(abs(hashtext($1::text))),
Expand All @@ -46,13 +61,21 @@ impl ClientStore for sqlx::PgPool {
.bind(client.token.clone())
.execute(&mut transaction)
.await?;
if let Some(metrics) = metrics {
metrics.postgres_query("create_client_pg_advisory_xact_lock", start);
}

let start = Instant::now();
sqlx::query("DELETE FROM public.clients WHERE id = $1 OR device_token = $2")
.bind(id)
.bind(client.token.clone())
.execute(&mut transaction)
.await?;
if let Some(metrics) = metrics {
metrics.postgres_query("create_client_delete", start);
}

let start = Instant::now();
let mut insert_query = sqlx::QueryBuilder::new(
"INSERT INTO public.clients (id, tenant_id, push_type, device_token, always_raw)",
);
Expand All @@ -73,7 +96,15 @@ impl ClientStore for sqlx::PgPool {
},
);
insert_query.build().execute(&mut transaction).await?;
if let Some(metrics) = metrics {
metrics.postgres_query("create_client_insert", start);
}

let start = Instant::now();
transaction.commit().await?;
if let Some(metrics) = metrics {
metrics.postgres_query("create_client_commit", start);
}

Ok(())
}
Expand Down
11 changes: 11 additions & 0 deletions tests/functional/stores/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ async fn client_creation(ctx: &mut StoreContext) {
token,
always_raw: false,
},
None,
)
.await
.unwrap();
Expand All @@ -46,6 +47,7 @@ async fn client_creation_fcm(ctx: &mut StoreContext) {
token,
always_raw: false,
},
None,
)
.await
.unwrap();
Expand All @@ -68,6 +70,7 @@ async fn client_creation_apns(ctx: &mut StoreContext) {
token,
always_raw: false,
},
None,
)
.await
.unwrap();
Expand All @@ -92,6 +95,7 @@ async fn client_upsert_token(ctx: &mut StoreContext) {
token: token.clone(),
always_raw: false,
},
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -133,6 +137,7 @@ async fn client_upsert_token(ctx: &mut StoreContext) {
token: updated_token.clone(),
always_raw: true,
},
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -164,6 +169,7 @@ async fn client_upsert_id(ctx: &mut StoreContext) {
token: token.clone(),
always_raw: false,
},
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -205,6 +211,7 @@ async fn client_upsert_id(ctx: &mut StoreContext) {
token: token.clone(),
always_raw: false,
},
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -239,6 +246,7 @@ async fn client_create_same_id_and_token(ctx: &mut StoreContext) {
token: token.clone(),
always_raw: false,
},
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -280,6 +288,7 @@ async fn client_create_same_id_and_token(ctx: &mut StoreContext) {
token: token.clone(),
always_raw: false,
},
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -310,6 +319,7 @@ async fn client_deletion(ctx: &mut StoreContext) {
token,
always_raw: false,
},
None,
)
.await
.unwrap();
Expand All @@ -332,6 +342,7 @@ async fn client_fetch(ctx: &mut StoreContext) {
token: token.clone(),
always_raw: false,
},
None,
)
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions tests/functional/stores/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub async fn create_client(client_store: &ClientStoreArc) -> String {
token,
always_raw: false,
},
None,
)
.await
.expect("failed to create client for notification test");
Expand Down

0 comments on commit d12d9a6

Please sign in to comment.