diff --git a/src/stores/client.rs b/src/stores/client.rs index 8aefa7a..6ba5e47 100644 --- a/src/stores/client.rs +++ b/src/stores/client.rs @@ -47,57 +47,144 @@ impl ClientStore for sqlx::PgPool { client.token ); - let mut transaction = self.begin().await?; + #[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)] + pub struct ClientSelect { + pub id: String, + pub device_token: String, + pub tenant_id: String, + } - // Statement for locking based on the client id to prevent an issue #230 - // and locking based on the token to prevent an issue #292 let start = Instant::now(); - sqlx::query( - "SELECT - pg_advisory_xact_lock(abs(hashtext($1::text))), - pg_advisory_xact_lock(abs(hashtext($2::text)))", - ) - .bind(id) - .bind(client.token.clone()) - .execute(&mut transaction) - .await?; + let mut transaction = self.begin().await?; if let Some(metrics) = metrics { - metrics.postgres_query("create_client_pg_advisory_xact_lock", start); + metrics.postgres_query("create_client_begin", start); } + let query = " + SELECT * + FROM public.clients + WHERE id = $1 + OR device_token = $2 + FOR UPDATE + "; let start = Instant::now(); - sqlx::query("DELETE FROM public.clients WHERE id = $1 OR device_token = $2") + let existing_client = sqlx::query_as::(query) .bind(id) .bind(client.token.clone()) - .execute(&mut transaction) - .await?; + .fetch_one(&mut transaction) + .await + .map(Some) + .or_else(|e| match e { + sqlx::Error::RowNotFound => Ok(None), + e => Err(e), + })?; if let Some(metrics) = metrics { metrics.postgres_query("create_client_delete", start); } - let start = Instant::now(); - let mut insert_query = sqlx::QueryBuilder::new( - "INSERT INTO public.clients (id, tenant_id, push_type, device_token, always_raw)", - ); - insert_query.push_values( - vec![( - id, - tenant_id, - client.push_type, - client.token, - client.always_raw, - )], - |mut b, client| { - b.push_bind(client.0) - .push_bind(client.1) - .push_bind(client.2) - .push_bind(client.3) - .push_bind(client.4); - }, - ); - insert_query.build().execute(&mut transaction).await?; - if let Some(metrics) = metrics { - metrics.postgres_query("create_client_insert", start); + if let Some(existing_client) = existing_client { + if existing_client.id == id && existing_client.device_token != client.token { + let query = " + UPDATE public.clients + SET device_token = $2, + push_type = $3, + always_raw = $4, + tenant_id = $5 + WHERE id = $1 + "; + let start = Instant::now(); + sqlx::query(query) + .bind(id) + .bind(client.token) + .bind(client.push_type) + .bind(client.always_raw) + .bind(tenant_id) + .execute(&mut transaction) + .await?; + if let Some(metrics) = metrics { + metrics.postgres_query("create_client_update_device_token", start); + } + } else if existing_client.device_token == client.token && existing_client.id != id { + let query = " + DELETE FROM public.notifications + WHERE client_id = $1 + AND tenant_id = $2 + "; + let start = Instant::now(); + sqlx::query(query) + .bind(existing_client.id) + .bind(existing_client.tenant_id) + .execute(&mut transaction) + .await?; + if let Some(metrics) = metrics { + metrics.postgres_query("create_client_delete_notifications", start); + } + + let query = " + UPDATE public.clients + SET id = $2, + push_type = $3, + always_raw = $4, + tenant_id = $5 + WHERE device_token = $1 + "; + let start = Instant::now(); + sqlx::query(query) + .bind(client.token) + .bind(id) + .bind(client.push_type) + .bind(client.always_raw) + .bind(tenant_id) + .execute(&mut transaction) + .await?; + if let Some(metrics) = metrics { + metrics.postgres_query("create_client_update_id", start); + } + } else { + let query = " + UPDATE public.clients + SET push_type = $2, + always_raw = $3, + tenant_id = $4 + WHERE id = $1 + "; + let start = Instant::now(); + sqlx::query(query) + .bind(id) + .bind(client.push_type) + .bind(client.always_raw) + .bind(tenant_id) + .execute(&mut transaction) + .await?; + if let Some(metrics) = metrics { + metrics.postgres_query("create_client_update_id", start); + } + } + } else { + let start = Instant::now(); + let mut insert_query = sqlx::QueryBuilder::new( + "INSERT INTO public.clients (id, tenant_id, push_type, device_token, always_raw)", + ); + insert_query.push_values( + vec![( + id, + tenant_id, + client.push_type, + client.token, + client.always_raw, + )], + |mut b, client| { + b.push_bind(client.0) + .push_bind(client.1) + .push_bind(client.2) + .push_bind(client.3) + .push_bind(client.4); + }, + ); + insert_query.build().execute(&mut transaction).await?; + if let Some(metrics) = metrics { + metrics.postgres_query("create_client_insert", start); + } } let start = Instant::now();