Skip to content

Commit

Permalink
fix: Refactor slack rate limit retrying
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian May committed Aug 4, 2023
1 parent 6121794 commit b958843
Showing 1 changed file with 75 additions and 77 deletions.
152 changes: 75 additions & 77 deletions src/output/slack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::Task;
use anyhow::Error;
use anyhow::Result;
use async_trait::async_trait;
use futures::Future;
use slack_morphism::errors::SlackClientError;
use slack_morphism::prelude::*;
use std::collections::HashMap;
Expand Down Expand Up @@ -200,6 +201,33 @@ struct SlackState {
ts: Option<SlackTs>,
}

async fn retry_slack<Fut, F, R>(f: F) -> Option<R>
where
F: Fn() -> Fut + Send + Sync,
Fut: Future<Output = Result<R, SlackClientError>> + Send,
R: Send,
{
let mut count = 0u32;
loop {
count += 1;
match f().await {
Err(SlackClientError::RateLimitError(rle)) => {
let retry_time = Instant::now() + rle.retry_after.unwrap_or(DEFAULT_RETRY);
sleep_until(retry_time).await;
if count >= MAX_TRIES {
println!("Too many retries posting finished to slack: {rle}");
break None;
};
}
Err(err) => {
println!("Slack error posting finished: {err}");
break None;
}
Ok(result) => break Some(result),
}
}
}

impl SlackState {
fn new() -> Result<SlackState> {
let slack_channel: String = config_env_var("SLACK_CHANNEL")?;
Expand Down Expand Up @@ -232,19 +260,7 @@ impl SlackState {

let client = SlackClient::new(SlackClientHyperConnector::new());
let session = client.open_session(&self.token);

let title = slack_title(state);

let mut installation_blocks = get_installation_blocks(state, &title);
let mut outdated_blocks = get_outdated_blocks(state);

let mut blocks = vec![];
blocks.append(&mut installation_blocks);
blocks.append(&mut outdated_blocks);

let content = SlackMessageContent::new()
.with_blocks(blocks)
.with_text(title);
let content = get_update_content(state);

if let Some(ts) = &self.ts {
let update_req = SlackApiChatUpdateRequest::new(
Expand All @@ -264,6 +280,32 @@ impl SlackState {
Ok(())
}

async fn update_slack_final(&mut self, state: &State) {
let client = SlackClient::new(SlackClientHyperConnector::new());
let session = client.open_session(&self.token);
let content = get_update_content(state);

if let Some(ts) = &self.ts {
let update_req = SlackApiChatUpdateRequest::new(
self.slack_channel.clone().into(),
content,
ts.clone(),
);
let do_post = || async { session.chat_update(&update_req).await };
if let Some(update_response) = retry_slack(do_post).await {
self.ts = Some(update_response.ts);
}
} else {
let post_chat_req =
SlackApiChatPostMessageRequest::new(self.slack_channel.clone().into(), content);

let do_post = || async { session.chat_post_message(&post_chat_req).await };
if let Some(post_chat_response) = retry_slack(do_post).await {
self.ts = Some(post_chat_response.ts);
}
}
}

async fn send_finished(&self, state: &State) {
#[allow(clippy::match_same_arms)]
let data = state
Expand Down Expand Up @@ -325,27 +367,8 @@ impl SlackState {
content,
);

let mut count = 0u32;
loop {
count += 1;
match session.chat_post_message(&post_chat_req).await {
Err(SlackClientError::RateLimitError(err)) => {
let retry_time = Instant::now() + err.retry_after.unwrap_or(DEFAULT_RETRY);
sleep_until(retry_time).await;
if count >= MAX_TRIES {
println!("Too many retries posting finished to slack: {err}");
break;
};
}
Err(err) => {
println!("Slack error posting finished: {err}");
break;
}
Ok(_) => {
break;
}
}
}
let do_post = || async { session.chat_post_message(&post_chat_req).await };
retry_slack(do_post).await;
}
}

Expand Down Expand Up @@ -395,30 +418,26 @@ impl SlackState {
let post_chat_req =
SlackApiChatPostMessageRequest::new(self.slack_channel.clone().into(), content);

let mut count = 0u32;
loop {
count += 1;
match session.chat_post_message(&post_chat_req).await {
Err(SlackClientError::RateLimitError(err)) => {
let retry_time = Instant::now() + err.retry_after.unwrap_or(DEFAULT_RETRY);
sleep_until(retry_time).await;
if count >= MAX_TRIES {
println!("Too many retries posting helm result to slack: {err}");
break;
};
}
Err(err) => {
println!("Slack error posting helm result: {err}");
break;
}
Ok(_) => {
break;
}
}
}
let do_post = || async { session.chat_post_message(&post_chat_req).await };
retry_slack(do_post).await;
}
}

fn get_update_content(state: &State) -> SlackMessageContent {
let title = slack_title(state);

let mut installation_blocks = get_installation_blocks(state, &title);
let mut outdated_blocks = get_outdated_blocks(state);

let mut blocks = vec![];
blocks.append(&mut installation_blocks);
blocks.append(&mut outdated_blocks);

SlackMessageContent::new()
.with_blocks(blocks)
.with_text(title)
}

fn get_installation_blocks(state: &State, title: &str) -> Vec<SlackBlock> {
let status = results_to_string(state);
let status = ["```".to_string(), status, "```".to_string()];
Expand Down Expand Up @@ -474,28 +493,7 @@ async fn update_results(state: &State, slack: &mut SlackState) -> Instant {
}

async fn update_final_results(state: &State, slack: &mut SlackState) {
let mut count = 0u32;
loop {
count += 1;
match slack.update_slack(state).await {
Err(SlackClientError::RateLimitError(err)) => {
let retry_time = Instant::now() + err.retry_after.unwrap_or(DEFAULT_RETRY);
sleep_until(retry_time).await;
if count >= MAX_TRIES {
println!("Too many retries posting final result to slack: {err}");
break;
};
}
Err(err) => {
println!("Slack error posting final result: {err}");
break;
}
Ok(_) => {
break;
}
}
}

slack.update_slack_final(state).await;
slack.send_finished(state).await;
}

Expand Down

0 comments on commit b958843

Please sign in to comment.