Skip to content

Commit

Permalink
feat: Suspend Broken Tenants & Delete Broken Clients (#177)
Browse files Browse the repository at this point in the history
  • Loading branch information
HarryET committed Aug 9, 2023
1 parent f51c7c1 commit a8e1aa7
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 14 deletions.
5 changes: 5 additions & 0 deletions migrations/1691518766_add-suspension.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
alter table public.tenants
add suspended bool not null default false;

alter table public.tenants
add suspended_reason text;
1 change: 0 additions & 1 deletion migrations/new.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
#!/bin/bash
DESCRIPTION=$1
touch "./$(date +%s)_$DESCRIPTION.sql"
27 changes: 27 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub enum Error {
#[error(transparent)]
Fcm(#[from] fcm::FcmError),

#[error("FCM Responded with an error")]
FcmResponse(fcm::ErrorReason),

#[error(transparent)]
Io(#[from] std::io::Error),

Expand Down Expand Up @@ -171,6 +174,12 @@ pub enum Error {

#[error("invalid apns creds")]
BadApnsCredentials,

#[error("client deleted due to invalid device token")]
ClientDeleted,

#[error("tenant suspended due to invalid configuration")]
TenantSuspended,
}

impl IntoResponse for Error {
Expand Down Expand Up @@ -213,6 +222,12 @@ impl IntoResponse for Error {
message: e.to_string(),
}
], vec![]),
Error::FcmResponse(e) => crate::handlers::Response::new_failure(StatusCode::INTERNAL_SERVER_ERROR, vec![
ResponseError {
name: "fcm_response".to_string(),
message: format!("{:?}", e)
}
], vec![]),
Error::BadFcmApiKey => crate::handlers::Response::new_failure(StatusCode::BAD_REQUEST, vec![
ResponseError {
name: "bad_fcm_api_key".to_string(),
Expand Down Expand Up @@ -525,6 +540,18 @@ impl IntoResponse for Error {
location: ErrorLocation::Path,
}
]),
Error::ClientDeleted => crate::handlers::Response::new_failure(StatusCode::ACCEPTED, vec![
ResponseError {
name: "client_deleted".to_string(),
message: "Request Accepted, client deleted due to invalid token".to_string(),
},
], vec![]),
Error::TenantSuspended => crate::handlers::Response::new_failure(StatusCode::ACCEPTED, vec![
ResponseError {
name: "tenant_suspended".to_string(),
message: "Request Accepted, tenant suspended due to invalid configuration".to_string(),
},
], vec![]),
e => {
warn!("Error does not have response clause, {:?}", e);

Expand Down
4 changes: 4 additions & 0 deletions src/handlers/get_tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub struct GetTenantResponse {
enabled_providers: Vec<String>,
apns_topic: Option<String>,
apns_type: Option<ApnsType>,
suspended: bool,
suspended_reason: Option<String>,
}

pub async fn handler(
Expand Down Expand Up @@ -64,6 +66,8 @@ pub async fn handler(
enabled_providers: tenant.providers().iter().map(Into::into).collect(),
apns_topic: None,
apns_type: None,
suspended: tenant.suspended,
suspended_reason: tenant.suspended_reason,
};

if providers.contains(&ProviderKind::Apns) {
Expand Down
66 changes: 62 additions & 4 deletions src/handlers/push_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,10 @@ pub async fn handler_internal(
"fetched tenant"
);

if tenant.suspended {
return Err((Error::TenantSuspended, analytics.clone()));
}

let mut provider = tenant
.provider(&client.push_type)
.map_err(|e| (e, analytics.clone()))?;
Expand All @@ -358,10 +362,64 @@ pub async fn handler_internal(
"fetched provider"
);

provider
.send_notification(client.token, body.payload)
.await
.map_err(|e| (e, analytics.clone()))?;
match provider.send_notification(client.token, body.payload).await {
Ok(_) => Ok(()),
Err(error) => match error {
Error::BadDeviceToken => {
state
.client_store
.delete_client(&tenant_id, &id)
.await
.map_err(|e| (Error::Store(e), analytics.clone()))?;
increment_counter!(state.metrics, client_suspensions);
warn!(
%request_id,
%tenant_id,
client_id = %id,
notification_id = %notification.id,
push_type = client.push_type.as_str(),
"client has been deleted due to a bad device token"
);
Err(Error::ClientDeleted)
}
Error::BadApnsCredentials => {
state
.tenant_store
.suspend_tenant(&tenant_id, "Invalid APNS Credentials")
.await
.map_err(|e| (e, analytics.clone()))?;
increment_counter!(state.metrics, tenant_suspensions);
warn!(
%request_id,
%tenant_id,
client_id = %id,
notification_id = %notification.id,
push_type = client.push_type.as_str(),
"tenant has been suspended due to invalid provider credentials"
);
Err(Error::TenantSuspended)
}
Error::BadFcmApiKey => {
state
.tenant_store
.suspend_tenant(&tenant_id, "Invalid FCM Credentials")
.await
.map_err(|e| (e, analytics.clone()))?;
increment_counter!(state.metrics, tenant_suspensions);
warn!(
%request_id,
%tenant_id,
client_id = %id,
notification_id = %notification.id,
push_type = client.push_type.as_str(),
"tenant has been suspended due to invalid provider credentials"
);
Err(Error::TenantSuspended)
}
e => Err(e),
},
}
.map_err(|e| (e, analytics.clone()))?;

info!(
%request_id,
Expand Down
8 changes: 7 additions & 1 deletion src/handlers/update_apns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,19 @@ pub async fn handler(

// ---- handler
if let Some(auth) = apns_updates.auth {
let _new_tenant = state
let new_tenant = state
.tenant_store
.update_tenant_apns_auth(&id, auth)
.await?;

increment_counter!(state.metrics, tenant_apns_updates);

if new_tenant.suspended {
// If suspended, it can be restored now because valid credentials have been
// provided
state.tenant_store.unsuspend_tenant(&new_tenant.id).await?;
}

return Ok(Json(UpdateTenantApnsResponse { success: true }));
}

Expand Down
8 changes: 7 additions & 1 deletion src/handlers/update_fcm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,17 @@ pub async fn handler(
fcm_api_key: body.api_key,
};

let _new_tenant = state
let new_tenant = state
.tenant_store
.update_tenant_fcm(&id, update_body)
.await?;

if new_tenant.suspended {
// If suspended, it can be restored now because valid credentials have been
// provided
state.tenant_store.unsuspend_tenant(&new_tenant.id).await?;
}

increment_counter!(state.metrics, tenant_fcm_updates);

Ok(Json(UpdateTenantFcmResponse { success: true }))
Expand Down
17 changes: 16 additions & 1 deletion src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub struct Metrics {

pub tenant_apns_updates: Counter<u64>,
pub tenant_fcm_updates: Counter<u64>,

pub tenant_suspensions: Counter<u64>,
pub client_suspensions: Counter<u64>,
}

impl Metrics {
Expand Down Expand Up @@ -80,7 +83,17 @@ impl Metrics {

let tenant_fcm_updates_counter = meter
.u64_counter("tenant_fcm_updates")
.with_description("The number of times tenants have updated their APNS")
.with_description("The number of times tenants have updated their FCM")
.init();

let tenant_suspensions_counter = meter
.u64_counter("tenant_suspensions")
.with_description("The number of tenants that have been suspended")
.init();

let client_suspensions_counter = meter
.u64_counter("client_suspensions")
.with_description("The number of clients that have been suspended")
.init();

Ok(Metrics {
Expand All @@ -92,6 +105,8 @@ impl Metrics {
registered_tenants: tenants_counter,
tenant_apns_updates: tenant_apns_updates_counter,
tenant_fcm_updates: tenant_fcm_updates_counter,
tenant_suspensions: tenant_suspensions_counter,
client_suspensions: client_suspensions_counter,
})
}

Expand Down
33 changes: 27 additions & 6 deletions src/providers/fcm.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use {
crate::{
blob::DecryptedPayloadBlob,
error::Error,
handlers::push_message::MessagePayload,
providers::PushProvider,
},
async_trait::async_trait,
fcm::{MessageBuilder, NotificationBuilder},
fcm::{ErrorReason, FcmError, FcmResponse, MessageBuilder, NotificationBuilder},
std::fmt::{Debug, Formatter},
tracing::span,
};
Expand Down Expand Up @@ -36,12 +37,12 @@ impl PushProvider for FcmProvider {

let mut message_builder = MessageBuilder::new(self.api_key.as_str(), token.as_str());

if payload.is_encrypted() {
let result = if payload.is_encrypted() {
message_builder.data(&payload)?;

let fcm_message = message_builder.finalize();

let _ = self.client.send(fcm_message).await?;
self.client.send(fcm_message).await
} else {
let blob = DecryptedPayloadBlob::from_base64_encoded(payload.clone().blob)?;

Expand All @@ -55,10 +56,30 @@ impl PushProvider for FcmProvider {

let fcm_message = message_builder.finalize();

let _ = self.client.send(fcm_message).await?;
self.client.send(fcm_message).await
};

match result {
Ok(val) => {
let FcmResponse { error, .. } = val;
if let Some(error) = error {
match error {
ErrorReason::MissingRegistration
| ErrorReason::InvalidRegistration
| ErrorReason::NotRegistered => Err(Error::BadDeviceToken),
ErrorReason::InvalidApnsCredential => Err(Error::BadApnsCredentials),
e => Err(Error::FcmResponse(e)),
}
} else {
// Note: No Errors in the response, this request was good
Ok(())
}
}
Err(e) => match e {
FcmError::Unauthorized => Err(Error::BadFcmApiKey),
e => Err(Error::Fcm(e)),
},
}

Ok(())
}
}

Expand Down
42 changes: 42 additions & 0 deletions src/stores/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ pub struct Tenant {
pub apns_key_id: Option<String>,
pub apns_team_id: Option<String>,

// Suspension
pub suspended: bool,
pub suspended_reason: Option<String>,

pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
Expand Down Expand Up @@ -261,6 +265,8 @@ pub trait TenantStore {
id: &str,
params: TenantApnsUpdateAuth,
) -> Result<Tenant>;
async fn suspend_tenant(&self, id: &str, reason: &str) -> Result<()>;
async fn unsuspend_tenant(&self, id: &str) -> Result<()>;
}

#[async_trait]
Expand Down Expand Up @@ -373,6 +379,32 @@ impl TenantStore for PgPool {

Ok(res)
}

async fn suspend_tenant(&self, id: &str, reason: &str) -> Result<()> {
let mut query_builder = sqlx::QueryBuilder::new(
"UPDATE public.tenants SET suspended = true, suspended_reason =",
);
query_builder.push_bind(reason);
query_builder.push(" WHERE id = ");
query_builder.push_bind(id);
let query = query_builder.build();

self.execute(query).await?;

Ok(())
}

async fn unsuspend_tenant(&self, id: &str) -> Result<()> {
let mut query_builder = sqlx::QueryBuilder::new(
"UPDATE public.tenants SET suspended = false, suspended_reason = null WHERE id = ",
);
query_builder.push_bind(id);
let query = query_builder.build();

self.execute(query).await?;

Ok(())
}
}

#[cfg(not(feature = "multitenant"))]
Expand All @@ -391,6 +423,8 @@ impl DefaultTenantStore {
apns_pkcs8_pem: config.apns_pkcs8_pem.clone(),
apns_key_id: config.apns_key_id.clone(),
apns_team_id: config.apns_team_id.clone(),
suspended: false,
suspended_reason: None,
created_at: Default::default(),
updated_at: Default::default(),
}))
Expand Down Expand Up @@ -435,4 +469,12 @@ impl TenantStore for DefaultTenantStore {
) -> Result<Tenant> {
panic!("Shouldn't have run in single tenant mode")
}

async fn suspend_tenant(&self, _id: &str, _reason: &str) -> Result<()> {
panic!("Shouldn't have run in single tenant mode")
}

async fn unsuspend_tenant(&self, _id: &str) -> Result<()> {
panic!("Shouldn't have run in single tenant mode")
}
}

0 comments on commit a8e1aa7

Please sign in to comment.