From 7bca3c9840c895fc17b464874f0ec47082aecb71 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Thu, 9 Oct 2025 22:40:05 -0400 Subject: [PATCH 1/9] feat: Support periodic reload for api key --- crates/dogstatsd/src/api_key.rs | 123 +++++++++++++++++++++++++++----- 1 file changed, 105 insertions(+), 18 deletions(-) diff --git a/crates/dogstatsd/src/api_key.rs b/crates/dogstatsd/src/api_key.rs index 06118e0..08ac0ea 100644 --- a/crates/dogstatsd/src/api_key.rs +++ b/crates/dogstatsd/src/api_key.rs @@ -1,7 +1,8 @@ use std::fmt::Debug; use std::sync::Arc; +use std::time::{Duration, Instant}; use std::{future::Future, pin::Pin}; -use tokio::sync::OnceCell; +use tokio::sync::RwLock; pub type ApiKeyResolverFn = Arc Pin> + Send>> + Send + Sync>; @@ -11,7 +12,11 @@ pub enum ApiKeyFactory { Static(String), Dynamic { resolver_fn: ApiKeyResolverFn, - api_key: Arc>>, + // How often to reload the api key. If None, the api key will only be loaded once. + // Reload checks only happen on reads of the api key. + reload_interval: Option, + api_key: Arc>>, + last_reload_time: Arc>>, }, } @@ -22,24 +27,70 @@ impl ApiKeyFactory { } /// Create a new `ApiKeyFactory` with a dynamic API key resolver function. - pub fn new_from_resolver(resolver_fn: ApiKeyResolverFn) -> Self { + pub fn new_from_resolver( + resolver_fn: ApiKeyResolverFn, + reload_interval: Option, + ) -> Self { Self::Dynamic { resolver_fn, - api_key: Arc::new(OnceCell::new()), + reload_interval, + api_key: Arc::new(RwLock::new(None)), + last_reload_time: Arc::new(RwLock::new(None)), } } - pub async fn get_api_key(&self) -> Option<&str> { + pub async fn get_api_key(&self) -> Option { match self { - Self::Static(api_key) => Some(api_key), + Self::Static(api_key) => Some(api_key.clone()), Self::Dynamic { resolver_fn, api_key, - } => api_key - .get_or_init(|| async { (resolver_fn)().await }) - .await - .as_ref() - .map(|s| s.as_str()), + last_reload_time, + .. + } => { + // Check if reload is needed without acquiring write lock for api_key. + // If no, return the api key directly. If yes, acquire the write lock and reload the api key. + if self.should_load_api_key().await { + let mut api_key_write = api_key.write().await; + + // Double-check: verify reload is still needed after acquiring lock + // This prevents duplicate reloads from multiple threads + if self.should_load_api_key().await { + let api_key_value = (resolver_fn)().await; + *api_key_write = api_key_value.clone(); + *last_reload_time.write().await = Some(Instant::now()); + } + } + + api_key.read().await.clone() + } + } + } + + async fn should_load_api_key(&self) -> bool { + match self { + Self::Static(_) => false, + Self::Dynamic { + reload_interval, + last_reload_time, + .. + } => { + match *last_reload_time.read().await { + // Initial load + None => true, + // Not initial load + Some(last_reload_time) => { + match *reload_interval { + // User's configuration says do not reload + None => false, + // Reload only if it has been longer than reload interval since last reload + Some(reload_interval) => { + Instant::now() > last_reload_time + reload_interval + } + } + } + } + } } } } @@ -57,15 +108,51 @@ pub mod tests { #[tokio::test] async fn test_new() { let api_key_factory = ApiKeyFactory::new("mock-api-key"); - assert_eq!(api_key_factory.get_api_key().await, Some("mock-api-key")); + assert_eq!(api_key_factory.get_api_key().await, Some("mock-api-key".to_string())); + } + + #[tokio::test] + async fn test_resolver_no_reload() { + let api_key_factory = Arc::new(ApiKeyFactory::new_from_resolver( + Arc::new(move || { + let api_key = "mock-api-key".to_string(); + Box::pin(async move { Some(api_key) }) + }), + None, + )); + assert_eq!( + api_key_factory.get_api_key().await, + Some("mock-api-key".to_string()), + ); } #[tokio::test] - async fn test_new_from_resolver() { - let api_key_factory = Arc::new(ApiKeyFactory::new_from_resolver(Arc::new(move || { - let api_key = "mock-api-key".to_string(); - Box::pin(async move { Some(api_key) }) - }))); - assert_eq!(api_key_factory.get_api_key().await, Some("mock-api-key")); + async fn test_resolver_with_reload() { + let counter = Arc::new(RwLock::new(0)); + let counter_clone = counter.clone(); + + // Return different api keys on each call + let api_key_factory = Arc::new(ApiKeyFactory::new_from_resolver( + Arc::new(move || { + let counter = counter_clone.clone(); + Box::pin(async move { + let mut count = counter.write().await; + *count += 1; + Some(format!("mock-api-key-{}", *count)) + }) + }), + Some(Duration::from_millis(1)), + )); + + // First call - should return "mock-api-key-1" + let first_key = api_key_factory.get_api_key().await; + assert_eq!(first_key, Some("mock-api-key-1".to_string())); + + // Sleep for 1 millisecond to allow reload + tokio::time::sleep(Duration::from_millis(1)).await; + + // Second call - should return "mock-api-key-2" (after reload) + let second_key = api_key_factory.get_api_key().await; + assert_eq!(second_key, Some("mock-api-key-2".to_string())); } } From 16ac95b69c1228632fc813cb4c9cd74c7fa89bab Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Thu, 9 Oct 2025 23:49:49 -0400 Subject: [PATCH 2/9] fmt --- crates/dogstatsd/src/api_key.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/crates/dogstatsd/src/api_key.rs b/crates/dogstatsd/src/api_key.rs index 08ac0ea..a881397 100644 --- a/crates/dogstatsd/src/api_key.rs +++ b/crates/dogstatsd/src/api_key.rs @@ -108,7 +108,10 @@ pub mod tests { #[tokio::test] async fn test_new() { let api_key_factory = ApiKeyFactory::new("mock-api-key"); - assert_eq!(api_key_factory.get_api_key().await, Some("mock-api-key".to_string())); + assert_eq!( + api_key_factory.get_api_key().await, + Some("mock-api-key".to_string()) + ); } #[tokio::test] @@ -130,7 +133,7 @@ pub mod tests { async fn test_resolver_with_reload() { let counter = Arc::new(RwLock::new(0)); let counter_clone = counter.clone(); - + // Return different api keys on each call let api_key_factory = Arc::new(ApiKeyFactory::new_from_resolver( Arc::new(move || { @@ -143,14 +146,14 @@ pub mod tests { }), Some(Duration::from_millis(1)), )); - + // First call - should return "mock-api-key-1" let first_key = api_key_factory.get_api_key().await; assert_eq!(first_key, Some("mock-api-key-1".to_string())); - + // Sleep for 1 millisecond to allow reload tokio::time::sleep(Duration::from_millis(1)).await; - + // Second call - should return "mock-api-key-2" (after reload) let second_key = api_key_factory.get_api_key().await; assert_eq!(second_key, Some("mock-api-key-2".to_string())); From 4fa1585011006d7a75d45aa5ac2f89ebd68d7ec9 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 10 Oct 2025 16:52:16 -0400 Subject: [PATCH 3/9] Add Acquire/Release for load lock --- crates/dogstatsd/src/api_key.rs | 53 ++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/crates/dogstatsd/src/api_key.rs b/crates/dogstatsd/src/api_key.rs index a881397..fdf2f64 100644 --- a/crates/dogstatsd/src/api_key.rs +++ b/crates/dogstatsd/src/api_key.rs @@ -1,5 +1,8 @@ use std::fmt::Debug; -use std::sync::Arc; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; use std::time::{Duration, Instant}; use std::{future::Future, pin::Pin}; use tokio::sync::RwLock; @@ -7,7 +10,12 @@ use tokio::sync::RwLock; pub type ApiKeyResolverFn = Arc Pin> + Send>> + Send + Sync>; -#[derive(Clone)] +#[derive(Default)] +pub struct ApiKeyState { + api_key: Option, + last_load_time: Option, +} + pub enum ApiKeyFactory { Static(String), Dynamic { @@ -15,8 +23,9 @@ pub enum ApiKeyFactory { // How often to reload the api key. If None, the api key will only be loaded once. // Reload checks only happen on reads of the api key. reload_interval: Option, - api_key: Arc>>, - last_reload_time: Arc>>, + api_key_state: Arc>, + // Whether the api key is currently being loaded. A lock to avoid concurrent loads. + loading_api_key: AtomicBool, }, } @@ -34,8 +43,8 @@ impl ApiKeyFactory { Self::Dynamic { resolver_fn, reload_interval, - api_key: Arc::new(RwLock::new(None)), - last_reload_time: Arc::new(RwLock::new(None)), + api_key_state: Arc::new(RwLock::new(ApiKeyState::default())), + loading_api_key: AtomicBool::new(false), } } @@ -44,25 +53,29 @@ impl ApiKeyFactory { Self::Static(api_key) => Some(api_key.clone()), Self::Dynamic { resolver_fn, - api_key, - last_reload_time, + api_key_state, + loading_api_key, .. } => { - // Check if reload is needed without acquiring write lock for api_key. - // If no, return the api key directly. If yes, acquire the write lock and reload the api key. - if self.should_load_api_key().await { - let mut api_key_write = api_key.write().await; - + if self.should_load_api_key().await + && loading_api_key + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { // Double-check: verify reload is still needed after acquiring lock // This prevents duplicate reloads from multiple threads if self.should_load_api_key().await { let api_key_value = (resolver_fn)().await; - *api_key_write = api_key_value.clone(); - *last_reload_time.write().await = Some(Instant::now()); + *api_key_state.write().await = ApiKeyState { + api_key: api_key_value.clone(), + last_load_time: Some(Instant::now()), + }; } + + loading_api_key.store(false, Ordering::Release); } - api_key.read().await.clone() + api_key_state.read().await.api_key.clone() } } } @@ -72,20 +85,20 @@ impl ApiKeyFactory { Self::Static(_) => false, Self::Dynamic { reload_interval, - last_reload_time, + api_key_state, .. } => { - match *last_reload_time.read().await { + match api_key_state.read().await.last_load_time { // Initial load None => true, // Not initial load - Some(last_reload_time) => { + Some(last_load_time) => { match *reload_interval { // User's configuration says do not reload None => false, // Reload only if it has been longer than reload interval since last reload Some(reload_interval) => { - Instant::now() > last_reload_time + reload_interval + Instant::now() > last_load_time + reload_interval } } } From e5962d78a87336f24adaaf9ba1543cb84c7eb464 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 10 Oct 2025 16:55:45 -0400 Subject: [PATCH 4/9] Change some reload words to load --- crates/dogstatsd/src/api_key.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/dogstatsd/src/api_key.rs b/crates/dogstatsd/src/api_key.rs index fdf2f64..8f00287 100644 --- a/crates/dogstatsd/src/api_key.rs +++ b/crates/dogstatsd/src/api_key.rs @@ -62,8 +62,8 @@ impl ApiKeyFactory { .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) .is_ok() { - // Double-check: verify reload is still needed after acquiring lock - // This prevents duplicate reloads from multiple threads + // Double-check: verify load is still needed after acquiring lock + // This prevents duplicate loads from multiple threads if self.should_load_api_key().await { let api_key_value = (resolver_fn)().await; *api_key_state.write().await = ApiKeyState { @@ -96,7 +96,7 @@ impl ApiKeyFactory { match *reload_interval { // User's configuration says do not reload None => false, - // Reload only if it has been longer than reload interval since last reload + // Reload only if it has been longer than reload interval since last load Some(reload_interval) => { Instant::now() > last_load_time + reload_interval } From 34e75543eab509227d2c573d7b3922ee697b8f0b Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 10 Oct 2025 17:18:49 -0400 Subject: [PATCH 5/9] Handle the case of waiting for another thread for initial load --- crates/dogstatsd/src/api_key.rs | 39 +++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/crates/dogstatsd/src/api_key.rs b/crates/dogstatsd/src/api_key.rs index 8f00287..71b48ea 100644 --- a/crates/dogstatsd/src/api_key.rs +++ b/crates/dogstatsd/src/api_key.rs @@ -57,22 +57,33 @@ impl ApiKeyFactory { loading_api_key, .. } => { - if self.should_load_api_key().await - && loading_api_key + if self.should_load_api_key().await { + // Try to acquire the loading lock. + if (loading_api_key .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) - .is_ok() - { - // Double-check: verify load is still needed after acquiring lock - // This prevents duplicate loads from multiple threads - if self.should_load_api_key().await { - let api_key_value = (resolver_fn)().await; - *api_key_state.write().await = ApiKeyState { - api_key: api_key_value.clone(), - last_load_time: Some(Instant::now()), - }; - } + .is_ok()) + { + // Acquired the loading lock. + // Double-check: verify load is still needed after acquiring lock + // This prevents duplicate loads from multiple threads + if self.should_load_api_key().await { + let api_key_value = (resolver_fn)().await; + *api_key_state.write().await = ApiKeyState { + api_key: api_key_value.clone(), + last_load_time: Some(Instant::now()), + }; + } - loading_api_key.store(false, Ordering::Release); + loading_api_key.store(false, Ordering::Release); + } else { + // Failed to acquire the loading lock, which means another thread is doing the load. + // If there is an old api key, break out and return it. + // (We assume the old api key will still be valid for a while.) + // If there is no old api key, wait for another thread to complete the initial load. + while api_key_state.read().await.last_load_time.is_none() { + tokio::task::yield_now().await; + } + } } api_key_state.read().await.api_key.clone() From ca6e0f51ddf7a49b23bc1dcf797bce47d1b361cb Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 10 Oct 2025 17:33:06 -0400 Subject: [PATCH 6/9] Add comment --- crates/dogstatsd/src/api_key.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/dogstatsd/src/api_key.rs b/crates/dogstatsd/src/api_key.rs index 71b48ea..6f7d71c 100644 --- a/crates/dogstatsd/src/api_key.rs +++ b/crates/dogstatsd/src/api_key.rs @@ -80,6 +80,8 @@ impl ApiKeyFactory { // If there is an old api key, break out and return it. // (We assume the old api key will still be valid for a while.) // If there is no old api key, wait for another thread to complete the initial load. + // We check last_load_time instead of api_key because if we check api_key and + // the resolver function returns None, this thread would wait forever. while api_key_state.read().await.last_load_time.is_none() { tokio::task::yield_now().await; } From 2cc14333c24b07c04f6cd6fa27a5bb9d5e12b607 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 10 Oct 2025 21:10:37 -0400 Subject: [PATCH 7/9] fmt --- crates/dogstatsd/src/api_key.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/dogstatsd/src/api_key.rs b/crates/dogstatsd/src/api_key.rs index 6f7d71c..523683a 100644 --- a/crates/dogstatsd/src/api_key.rs +++ b/crates/dogstatsd/src/api_key.rs @@ -80,7 +80,7 @@ impl ApiKeyFactory { // If there is an old api key, break out and return it. // (We assume the old api key will still be valid for a while.) // If there is no old api key, wait for another thread to complete the initial load. - // We check last_load_time instead of api_key because if we check api_key and + // We check last_load_time instead of api_key because if we check api_key and // the resolver function returns None, this thread would wait forever. while api_key_state.read().await.last_load_time.is_none() { tokio::task::yield_now().await; From 6ae512a5f663f380d1a830b62b471f937a341297 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Tue, 14 Oct 2025 10:21:10 -0400 Subject: [PATCH 8/9] Log reload interval when creating ApiKeyFactory --- crates/dogstatsd/src/api_key.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/dogstatsd/src/api_key.rs b/crates/dogstatsd/src/api_key.rs index 523683a..286fe0c 100644 --- a/crates/dogstatsd/src/api_key.rs +++ b/crates/dogstatsd/src/api_key.rs @@ -6,6 +6,7 @@ use std::sync::{ use std::time::{Duration, Instant}; use std::{future::Future, pin::Pin}; use tokio::sync::RwLock; +use tracing::debug; pub type ApiKeyResolverFn = Arc Pin> + Send>> + Send + Sync>; @@ -40,6 +41,9 @@ impl ApiKeyFactory { resolver_fn: ApiKeyResolverFn, reload_interval: Option, ) -> Self { + if let Some(reload_interval) = reload_interval { + debug!("Creating ApiKeyFactory with reload interval: {:?}", reload_interval); + } Self::Dynamic { resolver_fn, reload_interval, From bb7aa47ca80f27e06ae48f5867918dd9e300b8fc Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Tue, 14 Oct 2025 11:28:33 -0400 Subject: [PATCH 9/9] fmt --- crates/dogstatsd/src/api_key.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/dogstatsd/src/api_key.rs b/crates/dogstatsd/src/api_key.rs index 286fe0c..f724b42 100644 --- a/crates/dogstatsd/src/api_key.rs +++ b/crates/dogstatsd/src/api_key.rs @@ -42,7 +42,10 @@ impl ApiKeyFactory { reload_interval: Option, ) -> Self { if let Some(reload_interval) = reload_interval { - debug!("Creating ApiKeyFactory with reload interval: {:?}", reload_interval); + debug!( + "Creating ApiKeyFactory with reload interval: {:?}", + reload_interval + ); } Self::Dynamic { resolver_fn,