From c6b88bc66c20c0c66bde11c195f2c0937f46d380 Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Sat, 8 Oct 2022 13:59:57 +0200 Subject: [PATCH] Distributed tracing (spans) support, scope propagation and `SlackClient.run_in_session` implementation (#151) * Auxiliary SlackClient.run_in_session implementation with docs and examples * Docs updates * Span/context propagation --- Cargo.toml | 7 +- docs/src/SUMMARY.md | 1 + docs/src/observability-tracing.md | 32 ++++++++ docs/src/web-api.md | 24 ++++++ examples/client.rs | 24 +++++- examples/client_with_tracing.rs | 45 +++++++++++ src/api/webhook.rs | 16 ++-- src/client.rs | 120 +++++++++++++++++------------- src/hyper_tokio/connector.rs | 113 ++++++++++++++++------------ 9 files changed, 274 insertions(+), 108 deletions(-) create mode 100644 docs/src/observability-tracing.md create mode 100644 examples/client_with_tracing.rs diff --git a/Cargo.toml b/Cargo.toml index a5e3320a..f5551374 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "slack-morphism" -version = "1.2.2" +version = "1.3.0" authors = ["Abdulla Abdurakhmanov "] edition = "2021" license = "Apache-2.0" @@ -72,6 +72,11 @@ name = "client" path = "examples/client.rs" required-features = ["hyper"] +[[example]] +name = "client_with_tracing" +path = "examples/client_with_tracing.rs" +required-features = ["hyper"] + [[example]] name = "events_api_server" path = "examples/events_api_server.rs" diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index e07a7858..db6ad2d5 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -8,6 +8,7 @@ - [Send webhook messages](./send-webhooks-messages.md) - [Hyper connection types and proxy support](./hyper-connections-types.md) - [Rate control and retries](./rate-control-and-retries.md) + - [Observability and tracing](./observability-tracing.md) - [Events API](./events-api.md) - [Hyper-based](./events-api-hyper.md) - [Axum-based](./events-api-axum.md) diff --git a/docs/src/observability-tracing.md b/docs/src/observability-tracing.md new file mode 100644 index 00000000..405c7628 --- /dev/null +++ b/docs/src/observability-tracing.md @@ -0,0 +1,32 @@ +# Observability and tracing + +The library uses popular `tracing` crate for logs and distributed traces (spans). +To improve observability for your specific cases, additionally to the fields provided by library, you can inject your own trace fields: + +```rust,noplaypen +use slack_morphism::prelude::*; +use tracing::*; + +// While Team ID is optional but still useful for tracing and rate control purposes +let token: SlackApiToken = + SlackApiToken::new(token_value).with_team_id(config_env_var("SLACK_TEST_TEAM_ID")?.into()); + +// Sessions are lightweight and basically just a reference to client and token +let my_custom_span = span!(Level::DEBUG, "My scope", my_scope_attr = "my-scope-value"); +debug!("Testing tracing abilities"); + +client + .run_in_session(&token, |session| async move { + let test: SlackApiTestResponse = session + .api_test(&SlackApiTestRequest::new().with_foo("Test".into())) + .await?; + println!("{:#?}", test); + + let auth_test = session.auth_test().await?; + println!("{:#?}", auth_test); + + Ok(()) + }) + .instrument(my_custom_span.or_current()) + .await +``` diff --git a/docs/src/web-api.md b/docs/src/web-api.md index cbe856f7..e55f5de6 100644 --- a/docs/src/web-api.md +++ b/docs/src/web-api.md @@ -52,3 +52,27 @@ async fn example() -> Result<(), Box> { Ok(()) } ``` + +Note that `session` is just an auxiliary lightweight structure that stores references to the token and the client +to make easier to have series of calls for the same token. +It doesn't make any network calls. There is no need to store it. + +Another option is to use `session` is to use function `run_in_session`: + +```rust,noplaypen + // Sessions are lightweight and basically just a reference to client and token + client + .run_in_session(&token, |session| async move { + let test: SlackApiTestResponse = session + .api_test(&SlackApiTestRequest::new().with_foo("Test".into())) + .await?; + + println!("{:#?}", test); + + let auth_test = session.auth_test().await?; + println!("{:#?}", auth_test); + + Ok(()) + }) + .await?; +``` diff --git a/examples/client.rs b/examples/client.rs index 2950d1d5..8f9211b9 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -9,7 +9,7 @@ use url::Url; use futures::stream::BoxStream; use futures::TryStreamExt; -async fn test_client() -> Result<(), Box> { +async fn test_simple_api_calls() -> Result<(), Box> { let client = SlackClient::new(SlackClientHyperConnector::new()); let token_value: SlackApiTokenValue = config_env_var("SLACK_TEST_TOKEN")?.into(); let token: SlackApiToken = SlackApiToken::new(token_value); @@ -27,6 +27,15 @@ async fn test_client() -> Result<(), Box> { let auth_test = session.auth_test().await?; println!("{:#?}", auth_test); + Ok(()) +} + +async fn test_post_message() -> Result<(), Box> { + let client = SlackClient::new(SlackClientHyperConnector::new()); + let token_value: SlackApiTokenValue = config_env_var("SLACK_TEST_TOKEN")?.into(); + let token: SlackApiToken = SlackApiToken::new(token_value); + let session = client.open_session(&token); + let message = WelcomeMessageTemplateParams::new("".into()); let post_chat_req = @@ -35,6 +44,15 @@ async fn test_client() -> Result<(), Box> { let post_chat_resp = session.chat_post_message(&post_chat_req).await?; println!("post chat resp: {:#?}", &post_chat_resp); + Ok(()) +} + +async fn test_scrolling_user_list() -> Result<(), Box> { + let client = SlackClient::new(SlackClientHyperConnector::new()); + let token_value: SlackApiTokenValue = config_env_var("SLACK_TEST_TOKEN")?.into(); + let token: SlackApiToken = SlackApiToken::new(token_value); + let session = client.open_session(&token); + let scroller_req: SlackApiUsersListRequest = SlackApiUsersListRequest::new().with_limit(1); let scroller = scroller_req.scroller(); @@ -156,7 +174,9 @@ async fn main() -> Result<(), Box> { .finish(); tracing::subscriber::set_global_default(subscriber)?; - test_client().await?; + test_simple_api_calls().await?; + test_post_message().await?; + test_scrolling_user_list().await?; Ok(()) } diff --git a/examples/client_with_tracing.rs b/examples/client_with_tracing.rs new file mode 100644 index 00000000..240750c2 --- /dev/null +++ b/examples/client_with_tracing.rs @@ -0,0 +1,45 @@ +use slack_morphism::prelude::*; +use tracing::*; + +async fn test_simple_api_calls_as_predicate() -> Result<(), Box> +{ + let client = SlackClient::new(SlackClientHyperConnector::new()); + let token_value: SlackApiTokenValue = config_env_var("SLACK_TEST_TOKEN")?.into(); + let token: SlackApiToken = + SlackApiToken::new(token_value).with_team_id(config_env_var("SLACK_TEST_TEAM_ID")?.into()); // While Team ID is optional but still useful for tracing and rate control purposes + + // Sessions are lightweight and basically just a reference to client and token + let my_custom_span = span!(Level::DEBUG, "My scope", my_scope_attr = "my-scope-value"); + debug!("Testing tracing abilities"); + + client + .run_in_session(&token, |session| async move { + let test: SlackApiTestResponse = session + .api_test(&SlackApiTestRequest::new().with_foo("Test".into())) + .await?; + println!("{:#?}", test); + + let auth_test = session.auth_test().await?; + println!("{:#?}", auth_test); + + Ok(()) + }) + .instrument(my_custom_span.or_current()) + .await +} + +pub fn config_env_var(name: &str) -> Result { + std::env::var(name).map_err(|e| format!("{}: {}", name, e)) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let subscriber = tracing_subscriber::fmt() + .with_env_filter("client_with_tracing=debug,slack_morphism=debug") + .finish(); + tracing::subscriber::set_global_default(subscriber)?; + + test_simple_api_calls_as_predicate().await?; + + Ok(()) +} diff --git a/src/api/webhook.rs b/src/api/webhook.rs index c12a68ae..8b198aee 100644 --- a/src/api/webhook.rs +++ b/src/api/webhook.rs @@ -13,6 +13,7 @@ use crate::ratectl::*; use crate::SlackClient; use crate::{ClientResult, SlackClientHttpConnector}; use rvstruct::ValueStruct; +use tracing::*; impl SlackClient where @@ -26,14 +27,17 @@ where incoming_webhook_url: &Url, req: &SlackApiPostWebhookMessageRequest, ) -> ClientResult { + let http_webhook_span = span!(Level::DEBUG, "Slack WebHook"); + + let context = crate::SlackClientApiCallContext { + rate_control_params: Some(&POST_WEBHOOK_SPECIAL_LIMIT_RATE_CTL), + token: None, + tracing_span: &http_webhook_span, + }; + self.http_api .connector - .http_post_uri( - incoming_webhook_url.clone(), - req, - None, - Some(&POST_WEBHOOK_SPECIAL_LIMIT_RATE_CTL), - ) + .http_post_uri(incoming_webhook_url.clone(), req, context) .await } diff --git a/src/client.rs b/src/client.rs index f8bd73c2..ffe3819e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,5 @@ use serde::{Deserialize, Serialize}; +use std::future::Future; use std::sync::Arc; use crate::token::*; @@ -8,6 +9,8 @@ use crate::models::*; use crate::ratectl::SlackApiMethodRateControlConfig; use futures_util::future::BoxFuture; use lazy_static::*; +use rvstruct::ValueStruct; +use tracing::*; use url::Url; #[derive(Debug)] @@ -41,14 +44,21 @@ where { client: &'a SlackClient, token: &'a SlackApiToken, + pub span: Span, +} + +#[derive(Debug, Clone)] +pub struct SlackClientApiCallContext<'a> { + pub rate_control_params: Option<&'a SlackApiMethodRateControlConfig>, + pub token: Option<&'a SlackApiToken>, + pub tracing_span: &'a Span, } pub trait SlackClientHttpConnector { fn http_get_uri<'a, RS>( &'a self, full_uri: Url, - token: Option<&'a SlackApiToken>, - rate_control_params: Option<&'a SlackApiMethodRateControlConfig>, + context: SlackClientApiCallContext<'a>, ) -> BoxFuture<'a, ClientResult> where RS: for<'de> serde::de::Deserialize<'de> + Send + 'a + 'a + Send; @@ -62,12 +72,11 @@ pub trait SlackClientHttpConnector { where RS: for<'de> serde::de::Deserialize<'de> + Send + 'a + 'a + Send; - fn http_get_token<'a, 'p, RS, PT, TS>( + fn http_get<'a, 'p, RS, PT, TS>( &'a self, method_relative_uri: &str, params: &'p PT, - token: Option<&'a SlackApiToken>, - rate_control_params: Option<&'a SlackApiMethodRateControlConfig>, + context: SlackClientApiCallContext<'a>, ) -> BoxFuture<'a, ClientResult> where RS: for<'de> serde::de::Deserialize<'de> + Send + 'a, @@ -79,39 +88,24 @@ pub trait SlackClientHttpConnector { params, ); - self.http_get_uri(full_uri, token, rate_control_params) - } - - fn http_get<'a, 'p, RS, PT, TS>( - &'a self, - method_relative_uri: &str, - params: &'p PT, - ) -> BoxFuture<'a, ClientResult> - where - RS: for<'de> serde::de::Deserialize<'de> + Send + 'a, - PT: std::iter::IntoIterator)> + Clone, - TS: std::string::ToString + 'p + 'a + Send, - { - self.http_get_token(method_relative_uri, params, None, None) + self.http_get_uri(full_uri, context) } fn http_post_uri<'a, RQ, RS>( &'a self, full_uri: Url, request_body: &'a RQ, - token: Option<&'a SlackApiToken>, - rate_control_params: Option<&'a SlackApiMethodRateControlConfig>, + context: SlackClientApiCallContext<'a>, ) -> BoxFuture<'a, ClientResult> where RQ: serde::ser::Serialize + Send + Sync, RS: for<'de> serde::de::Deserialize<'de> + Send + 'a + Send + 'a; - fn http_post_token<'a, RQ, RS>( + fn http_post<'a, RQ, RS>( &'a self, method_relative_uri: &str, request: &'a RQ, - token: Option<&'a SlackApiToken>, - rate_control_params: Option<&'a SlackApiMethodRateControlConfig>, + context: SlackClientApiCallContext<'a>, ) -> BoxFuture<'a, ClientResult> where RQ: serde::ser::Serialize + Send + Sync, @@ -121,19 +115,7 @@ pub trait SlackClientHttpConnector { &SlackClientHttpApiUri::create_method_uri_path(method_relative_uri), ); - self.http_post_uri(full_uri, request, token, rate_control_params) - } - - fn http_post<'a, RQ, RS>( - &'a self, - method_relative_uri: &str, - request: &'a RQ, - ) -> BoxFuture<'a, ClientResult> - where - RQ: serde::ser::Serialize + Send + Sync, - RS: for<'de> serde::de::Deserialize<'de> + Send + 'a, - { - self.http_post_token(method_relative_uri, request, None, None) + self.http_post_uri(full_uri, request, context) } } @@ -147,7 +129,7 @@ pub type AnyStdResult = std::result::Result, - // apps.manifest.validate returns validation errors in `errors` field with `ok: false`. + // Slack may return validation errors in `errors` field with `ok: false` for some methods (such as `apps.manifest.validate`. pub errors: Option>, pub warnings: Option>, } @@ -211,13 +193,33 @@ where } pub fn open_session<'a>(&'a self, token: &'a SlackApiToken) -> SlackClientSession<'a, SCHC> { + let http_session_span = span!( + Level::DEBUG, + "Slack API request", + "/slack/team_id" = token + .team_id + .as_ref() + .map(|team_id| team_id.value().as_str()) + .unwrap_or_else(|| "-") + ); + let http_session_api = SlackClientHttpSessionApi { client: self, token, + span: http_session_span, }; SlackClientSession { http_session_api } } + + pub async fn run_in_session<'a, FN, F, T>(&'a self, token: &'a SlackApiToken, pred: FN) -> T + where + FN: Fn(SlackClientSession<'a, SCHC>) -> F, + F: Future, + { + let session = self.open_session(token); + pred(session).await + } } impl<'a, SCHC> SlackClientHttpSessionApi<'a, SCHC> @@ -232,10 +234,16 @@ where where RS: for<'de> serde::de::Deserialize<'de> + Send, { + let context = SlackClientApiCallContext { + rate_control_params, + token: Some(self.token), + tracing_span: &self.span, + }; + self.client .http_api .connector - .http_get_uri(full_uri, Some(self.token), rate_control_params) + .http_get_uri(full_uri, context) .await } @@ -250,15 +258,16 @@ where PT: std::iter::IntoIterator)> + Clone, TS: std::string::ToString + 'p + Send, { + let context = SlackClientApiCallContext { + rate_control_params, + token: Some(self.token), + tracing_span: &self.span, + }; + self.client .http_api .connector - .http_get_token( - method_relative_uri, - params, - Some(self.token), - rate_control_params, - ) + .http_get(method_relative_uri, params, context) .await } @@ -272,15 +281,16 @@ where RQ: serde::ser::Serialize + Send + Sync, RS: for<'de> serde::de::Deserialize<'de> + Send, { + let context = SlackClientApiCallContext { + rate_control_params, + token: Some(self.token), + tracing_span: &self.span, + }; + self.client .http_api .connector - .http_post_token( - method_relative_uri, - &request, - Some(self.token), - rate_control_params, - ) + .http_post(method_relative_uri, &request, context) .await } @@ -294,10 +304,16 @@ where RQ: serde::ser::Serialize + Send + Sync, RS: for<'de> serde::de::Deserialize<'de> + Send, { + let context = SlackClientApiCallContext { + rate_control_params, + token: Some(self.token), + tracing_span: &self.span, + }; + self.client .http_api .connector - .http_post_uri(full_uri, &request, Some(self.token), rate_control_params) + .http_post_uri(full_uri, &request, context) .await } } diff --git a/src/hyper_tokio/connector.rs b/src/hyper_tokio/connector.rs index 03afd344..57830dd6 100644 --- a/src/hyper_tokio/connector.rs +++ b/src/hyper_tokio/connector.rs @@ -11,7 +11,7 @@ use hyper_rustls::HttpsConnector; use rvstruct::ValueStruct; use crate::prelude::hyper_ext::HyperExtensions; -use crate::ratectl::{SlackApiMethodRateControlConfig, SlackApiRateControlConfig}; +use crate::ratectl::SlackApiRateControlConfig; use std::sync::Arc; use std::time::Duration; use tracing::*; @@ -61,16 +61,22 @@ impl SlackClientHyperConnec } } - async fn send_http_request(&self, request: Request) -> ClientResult + async fn send_http_request<'a, RS>( + &'a self, + request: Request, + context: SlackClientApiCallContext<'a>, + ) -> ClientResult where RS: for<'de> serde::de::Deserialize<'de>, { let uri_str = request.uri().to_string(); - debug!( - slack_uri = uri_str.as_str(), - "Sending HTTP request to {}", - request.uri() - ); + context.tracing_span.in_scope(|| { + debug!( + slack_uri = uri_str.as_str(), + "Sending HTTP request to {}", + request.uri() + ); + }); let http_res = self.hyper_connector.request(request).await?; let http_status = http_res.status(); @@ -81,12 +87,14 @@ impl SlackClientHyperConnec response_mime.type_() == mime::APPLICATION && response_mime.subtype() == mime::JSON }); - debug!( - slack_uri = uri_str.as_str(), - slack_http_status = http_status.as_u16(), - "Received HTTP response {}", - http_status - ); + context.tracing_span.in_scope(|| { + debug!( + slack_uri = uri_str.as_str(), + slack_http_status = http_status.as_u16(), + "Received HTTP response {}", + http_status + ); + }); match http_status { StatusCode::OK if http_content_is_json => { @@ -148,8 +156,7 @@ impl SlackClientHyperConnec async fn send_rate_controlled_request<'a, R, RS>( &'a self, request: R, - token: Option<&'a SlackApiToken>, - rate_control_params: Option<&'a SlackApiMethodRateControlConfig>, + context: SlackClientApiCallContext<'a>, delayed: Option, retried: usize, ) -> ClientResult @@ -157,38 +164,39 @@ impl SlackClientHyperConnec R: Fn() -> ClientResult> + Send + Sync, RS: for<'de> serde::de::Deserialize<'de> + Send, { - match (self.tokio_rate_controller.as_ref(), rate_control_params) { + match ( + self.tokio_rate_controller.as_ref(), + context.rate_control_params, + ) { (Some(rate_controller), maybe_method_rate_params) => { rate_controller .throttle_delay( maybe_method_rate_params, - token.and_then(|t| t.team_id.clone()), + context.token.and_then(|t| t.team_id.clone()), delayed, ) .await; self.retry_request_if_needed( rate_controller.clone(), - self.send_http_request(request()?).await, + self.send_http_request(request()?, context.clone()).await, retried, request, - token, - rate_control_params, + context, ) .await } - (None, _) => self.send_http_request(request()?).await, + (None, _) => self.send_http_request(request()?, context).await, } } - async fn retry_request_if_needed( - &self, + async fn retry_request_if_needed<'a, R, RS>( + &'a self, rate_controller: Arc, result: ClientResult, retried: usize, request: R, - token: Option<&SlackApiToken>, - rate_control_params: Option<&SlackApiMethodRateControlConfig>, + context: SlackClientApiCallContext<'a>, ) -> ClientResult where R: Fn() -> ClientResult> + Send + Sync, @@ -198,17 +206,18 @@ impl SlackClientHyperConnec Err(err) => match rate_controller.config.max_retries { Some(max_retries) if max_retries > retried => match err { SlackClientError::RateLimitError(ref rate_error) => { - debug!( - "Rate limit error received: {}. Retrying: {}/{}", - rate_error, - retried + 1, - max_retries - ); + context.tracing_span.in_scope(|| { + debug!( + "Rate limit error received: {}. Retrying: {}/{}", + rate_error, + retried + 1, + max_retries + ); + }); self.send_rate_controlled_request( request, - token, - rate_control_params, + context, rate_error.retry_after, retried + 1, ) @@ -229,12 +238,12 @@ impl SlackClientHttpConnect fn http_get_uri<'a, RS>( &'a self, full_uri: Url, - token: Option<&'a SlackApiToken>, - rate_control_params: Option<&'a SlackApiMethodRateControlConfig>, + context: SlackClientApiCallContext<'a>, ) -> BoxFuture<'a, ClientResult> where RS: for<'de> serde::de::Deserialize<'de> + Send + 'a + Send, { + let context_token = context.token; async move { let body = self .send_rate_controlled_request( @@ -244,13 +253,14 @@ impl SlackClientHttpConnect hyper::http::Method::GET, ); - let http_request = - HyperExtensions::setup_token_auth_header(base_http_request, token); + let http_request = HyperExtensions::setup_token_auth_header( + base_http_request, + context_token, + ); http_request.body(Body::empty()).map_err(|e| e.into()) }, - token, - rate_control_params, + context, None, 0, ) @@ -271,6 +281,14 @@ impl SlackClientHttpConnect RS: for<'de> serde::de::Deserialize<'de> + Send + 'a + 'a + Send, { async move { + let http_oauth_span = span!(Level::DEBUG, "Slack OAuth Get"); + + let context = crate::SlackClientApiCallContext { + rate_control_params: None, + token: None, + tracing_span: &http_oauth_span, + }; + self.send_rate_controlled_request( || { HyperExtensions::setup_basic_auth_header( @@ -284,8 +302,7 @@ impl SlackClientHttpConnect .body(Body::empty()) .map_err(|e| e.into()) }, - None, - None, + context, None, 0, ) @@ -298,13 +315,14 @@ impl SlackClientHttpConnect &'a self, full_uri: Url, request_body: &'a RQ, - token: Option<&'a SlackApiToken>, - rate_control_params: Option<&'a SlackApiMethodRateControlConfig>, + context: SlackClientApiCallContext<'a>, ) -> BoxFuture<'a, ClientResult> where RQ: serde::ser::Serialize + Send + Sync, RS: for<'de> serde::de::Deserialize<'de> + Send + 'a + Send + 'a, { + let context_token = context.token; + async move { let post_json = serde_json::to_string(&request_body).map_err(|err| map_serde_error(err, None))?; @@ -318,15 +336,16 @@ impl SlackClientHttpConnect ) .header("content-type", "application/json; charset=utf-8"); - let http_request = - HyperExtensions::setup_token_auth_header(base_http_request, token); + let http_request = HyperExtensions::setup_token_auth_header( + base_http_request, + context_token, + ); http_request .body(post_json.clone().into()) .map_err(|e| e.into()) }, - token, - rate_control_params, + context, None, 0, )