Skip to content

Commit

Permalink
feat(decrypted_notify): update message payload and pass raw message (#…
Browse files Browse the repository at this point in the history
…281)

Co-authored-by: Chris Smith <1979423+chris13524@users.noreply.github.com>
Co-authored-by: Chris Smith <chris@walletconnect.com>
  • Loading branch information
3 people committed Nov 21, 2023
1 parent 16a377a commit 3990b70
Show file tree
Hide file tree
Showing 19 changed files with 380 additions and 221 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fcm = "0.9"
ed25519-dalek = "2.0.0-rc.2"

# JWT Authentication
relay_rpc = { git = "https://github.com/WalletConnect/WalletConnectRust.git", rev = "ced99e7"}
relay_rpc = { git = "https://github.com/WalletConnect/WalletConnectRust.git", rev = "v0.23.0"}
jsonwebtoken = "8.1"
data-encoding = "2.3"

Expand Down
1 change: 1 addition & 0 deletions src/analytics/client_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ pub struct ClientInfo {
pub project_id: Arc<str>,
pub client_id: Arc<str>,
pub push_provider: Arc<str>,
pub always_raw: bool,
pub registered_at: chrono::NaiveDateTime,
}
6 changes: 4 additions & 2 deletions src/analytics/message_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ pub struct MessageInfo {
pub client_id: Arc<str>,
pub topic: Arc<str>,
pub push_provider: Arc<str>,
pub encrypted: bool,
pub flags: u32,
pub always_raw: Option<bool>,
pub tag: Option<u32>,
pub encrypted: Option<bool>,
pub flags: Option<u32>,
pub status: u16,
pub response_message: Option<Arc<str>>,
pub received_at: chrono::NaiveDateTime,
Expand Down
2 changes: 1 addition & 1 deletion src/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl DecryptedPayloadBlob {
Ok(serde_json::from_str(&blob_string)?)
}

pub fn from_base64_encoded(blob_string: String) -> Result<DecryptedPayloadBlob> {
pub fn from_base64_encoded(blob_string: &str) -> Result<DecryptedPayloadBlob> {
let blob_decoded = base64::engine::general_purpose::STANDARD.decode(blob_string)?;
Ok(serde_json::from_slice(&blob_decoded)?)
}
Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ pub enum Error {
InternalServerError,

#[error(transparent)]
JwtError(#[from] relay_rpc::auth::JwtVerificationError),
JwtError(#[from] relay_rpc::jwt::JwtError),

#[error("the provided authentication does not authenticate the request")]
InvalidAuthentication,
Expand Down
17 changes: 13 additions & 4 deletions src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use {
Json,
},
hyper::StatusCode,
relay_rpc::{auth::Jwt, domain::ClientId},
relay_rpc::{
domain::ClientId,
jwt::{JwtBasicClaims, VerifyableClaims},
},
serde_json::{json, Value},
std::{collections::HashSet, string::ToString},
tracing::info,
Expand Down Expand Up @@ -53,12 +56,18 @@ where
{
return if let Some(auth_header) = headers.get(axum::http::header::AUTHORIZATION) {
let header_str = auth_header.to_str()?;
let client_id = Jwt(header_str.to_string())
.decode(&HashSet::from([aud.to_string()]))

let claims = JwtBasicClaims::try_from_str(header_str).map_err(|e| {
info!("Invalid claims: {:?}", e);
e
})?;
claims
.verify_basic(&HashSet::from([aud.to_string()]), None)
.map_err(|e| {
info!("Invalid claims: {:?}", e);
info!("Failed to verify_basic: {:?}", e);
e
})?;
let client_id: ClientId = claims.iss.into();
Ok(check(Some(client_id)))
} else {
// Note: Authentication is not required right now to ensure that this is a
Expand Down
99 changes: 64 additions & 35 deletions src/handlers/push_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use axum_client_ip::SecureClientIp;
use {
crate::{
analytics::message_info::MessageInfo,
blob::ENCRYPTED_FLAG,
error::{
Error,
Error::{ClientNotFound, Store},
Expand All @@ -12,7 +11,7 @@ use {
increment_counter,
log::prelude::*,
middleware::validate_signature::RequireValidSignature,
providers::{Provider, PushProvider},
providers::{LegacyPushMessage, Provider, PushMessage, PushProvider, RawPushMessage},
request_id::get_req_id,
state::AppState,
stores::StoreError,
Expand All @@ -28,23 +27,14 @@ use {
tracing::instrument,
};

#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct MessagePayload {
pub topic: String,
pub flags: u32,
pub blob: String,
}

impl MessagePayload {
pub fn is_encrypted(&self) -> bool {
(self.flags & ENCRYPTED_FLAG) == ENCRYPTED_FLAG
}
}

#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct PushMessageBody {
pub id: String,
pub payload: MessagePayload,
#[serde(flatten)]
pub raw: Option<RawPushMessage>,

// Legacy (deprecating) fields
#[serde(flatten)]
pub legacy: Option<LegacyPushMessage>,
}

pub async fn handler(
Expand Down Expand Up @@ -133,19 +123,13 @@ pub async fn handler(
Ok(response)
}

#[instrument(name = "push_message_internal", skip_all, fields(tenant_id = tenant_id, client_id = client_id, id = body.id))]
#[instrument(name = "push_message_internal", skip_all, fields(tenant_id = tenant_id, client_id = client_id))]
pub async fn handler_internal(
Path((tenant_id, client_id)): Path<(String, String)>,
StateExtractor(state): StateExtractor<Arc<AppState>>,
headers: HeaderMap,
RequireValidSignature(Json(body)): RequireValidSignature<Json<PushMessageBody>>,
) -> Result<(axum::response::Response, Option<MessageInfo>), (Error, Option<MessageInfo>)> {
#[cfg(feature = "analytics")]
let topic: Arc<str> = body.payload.clone().topic.into();

#[cfg(feature = "analytics")]
let (flags, encrypted) = (body.payload.clone().flags, body.payload.is_encrypted());

let client = match state.client_store.get_client(&tenant_id, &client_id).await {
Ok(c) => Ok(c),
Err(StoreError::NotFound(_, _)) => Err(ClientNotFound),
Expand All @@ -156,16 +140,32 @@ pub async fn handler_internal(
e,
#[cfg(feature = "analytics")]
Some(MessageInfo {
msg_id: body.id.clone().into(),
msg_id: body
.raw
.as_ref()
.map(|msg| relay_rpc::rpc::msg_id::get_message_id(&msg.message).into())
.unwrap_or(
body.legacy
.as_ref()
.map(|msg| msg.id.clone())
.unwrap_or("error: no message id".to_owned().into()),
),
region: None,
country: None,
continent: None,
project_id: tenant_id.clone().into(),
client_id: client_id.clone().into(),
topic: topic.clone(),
topic: body.raw.as_ref().map(|m| m.topic.clone()).unwrap_or(
body.legacy
.as_ref()
.map(|m| m.payload.topic.clone())
.unwrap_or("error: no topic".to_owned().into()),
),
push_provider: "unknown".into(),
encrypted,
flags,
always_raw: None,
tag: body.raw.as_ref().map(|m| m.tag),
encrypted: body.legacy.as_ref().map(|m| m.payload.is_encrypted()),
flags: body.legacy.as_ref().map(|m| m.payload.flags),
status: 0,
response_message: None,
received_at: wc::analytics::time::now(),
Expand All @@ -175,18 +175,47 @@ pub async fn handler_internal(
)
})?;

let cloned_body = body.clone();
let push_message = if client.always_raw {
if let Some(body) = body.raw {
PushMessage::RawPushMessage(body)
} else {
return Err((
Error::EmptyField("missing topic, tag, or message field".to_string()),
None,
));
}
} else {
#[allow(clippy::collapsible_else_if)]
if let Some(body) = body.legacy {
PushMessage::LegacyPushMessage(body)
} else {
return Err((
Error::EmptyField("missing id or payload field".to_string()),
None,
));
}
};

let message_id = push_message.message_id();

#[cfg(feature = "analytics")]
let mut analytics = Some(MessageInfo {
msg_id: body.id.clone().into(),
msg_id: message_id.clone(),
region: None,
country: None,
continent: None,
project_id: tenant_id.clone().into(),
client_id: client_id.clone().into(),
topic,
topic: push_message.topic(),
push_provider: client.push_type.as_str().into(),
encrypted,
flags,
always_raw: Some(client.always_raw),
tag: cloned_body.raw.as_ref().map(|m| m.tag),
encrypted: cloned_body
.legacy
.as_ref()
.map(|m| m.payload.is_encrypted()),
flags: cloned_body.legacy.as_ref().map(|m| m.payload.flags),
status: 0,
response_message: None,
received_at: wc::analytics::time::now(),
Expand Down Expand Up @@ -266,7 +295,7 @@ pub async fn handler_internal(

if let Ok(notification) = state
.notification_store
.get_notification(&body.id, &client_id, &tenant_id)
.get_notification(&message_id, &client_id, &tenant_id)
.await
{
warn!(
Expand Down Expand Up @@ -294,7 +323,7 @@ pub async fn handler_internal(

let notification = state
.notification_store
.create_or_update_notification(&body.id, &tenant_id, &client_id, &body.payload)
.create_or_update_notification(&message_id, &tenant_id, &client_id, &cloned_body)
.await
.tap_err(|e| warn!("error create_or_update_notification: {e:?}"))
.map_err(|e| (Error::Store(e), analytics.clone()))?;
Expand Down Expand Up @@ -365,7 +394,7 @@ pub async fn handler_internal(
"fetched provider"
);

match provider.send_notification(client.token, body.payload).await {
match provider.send_notification(client.token, push_message).await {
Ok(()) => Ok(()),
Err(error) => {
warn!("error sending notification: {error:?}");
Expand Down
4 changes: 3 additions & 1 deletion src/handlers/register_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,14 @@ pub async fn handler(
.trim_start_matches(DECENTRALIZED_IDENTIFIER_PREFIX)
.to_owned();

let always_raw = body.always_raw.unwrap_or(false);
state
.client_store
.create_client(&tenant_id, &client_id, Client {
tenant_id: tenant_id.clone(),
push_type,
token: body.token,
always_raw: body.always_raw.unwrap_or(false),
always_raw,
})
.await?;

Expand Down Expand Up @@ -130,6 +131,7 @@ pub async fn handler(
project_id: tenant_id.into(),
client_id: client_id.into(),
push_provider: body.push_type.as_str().into(),
always_raw,
registered_at: wc::analytics::time::now(),
};

Expand Down
Loading

0 comments on commit 3990b70

Please sign in to comment.