Skip to content

Commit

Permalink
ref: Box::pin all the moka init Futures
Browse files Browse the repository at this point in the history
We can extract this from #1010, and this should help with moka-rs/moka#212 even if we do not (yet) switch to more widespread moka usage.
  • Loading branch information
Swatinem committed Feb 2, 2023
1 parent 5f67bb7 commit 529e136
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 93 deletions.
34 changes: 16 additions & 18 deletions crates/symbolicator-service/src/services/download/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,23 @@ impl GcsDownloader {
/// If the cache contains a valid token, then this token is returned. Otherwise, a new token is
/// requested from GCS and stored in the cache.
async fn get_token(&self, source_key: &Arc<GcsSourceKey>) -> CacheEntry<Arc<GcsToken>> {
let init = Box::pin(async {
let token = gcs::request_new_token(&self.client, source_key).await;
metric!(counter("source.gcs.token.requests") += 1);
token.map(Arc::new).map_err(CacheError::from)
});
let replace_if = |entry: &CacheEntry<Arc<GcsToken>>| match entry {
Ok(token) => {
let is_expired = token.is_expired();
if !is_expired {
metric!(counter("source.gcs.token.cached") += 1);
}
is_expired
}
Err(_) => true,
};
self.token_cache
.get_with_if(
source_key.clone(),
async {
let token = gcs::request_new_token(&self.client, source_key).await;
metric!(counter("source.gcs.token.requests") += 1);
token.map(Arc::new).map_err(CacheError::from)
},
|entry| match entry {
Ok(token) => {
let is_expired = token.is_expired();
if !is_expired {
metric!(counter("source.gcs.token.cached") += 1);
}
is_expired
}
Err(_) => true,
},
)
.get_with_if(source_key.clone(), init, replace_if)
.await
}

Expand Down
49 changes: 24 additions & 25 deletions crates/symbolicator-service/src/services/download/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,32 +58,31 @@ impl S3Downloader {
metric!(counter("source.s3.client.cached") += 1);
}

self.client_cache
.get_with_by_ref(key, async {
metric!(counter("source.s3.client.create") += 1);

tracing::debug!(
"Using AWS credentials provider: {:?}",
key.aws_credentials_provider
);
Arc::new(match key.aws_credentials_provider {
AwsCredentialsProvider::Container => {
let provider = LazyCachingCredentialsProvider::builder()
.load(aws_config::ecs::EcsCredentialsProvider::builder().build())
.build();
self.create_s3_client(provider, &key.region).await
}
AwsCredentialsProvider::Static => {
let provider = Credentials::from_keys(
key.access_key.clone(),
key.secret_key.clone(),
None,
);
self.create_s3_client(provider, &key.region).await
}
})
let init = Box::pin(async {
metric!(counter("source.s3.client.create") += 1);

tracing::debug!(
"Using AWS credentials provider: {:?}",
key.aws_credentials_provider
);
Arc::new(match key.aws_credentials_provider {
AwsCredentialsProvider::Container => {
let provider = LazyCachingCredentialsProvider::builder()
.load(aws_config::ecs::EcsCredentialsProvider::builder().build())
.build();
self.create_s3_client(provider, &key.region).await
}
AwsCredentialsProvider::Static => {
let provider = Credentials::from_keys(
key.access_key.clone(),
key.secret_key.clone(),
None,
);
self.create_s3_client(provider, &key.region).await
}
})
.await
});
self.client_cache.get_with_by_ref(key, init).await
}

async fn create_s3_client(
Expand Down
6 changes: 3 additions & 3 deletions crates/symbolicator-service/src/services/download/sentry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl SentryDownloader {
/// If there are cached search results this skips the actual search.
async fn cached_sentry_search(&self, query: SearchQuery) -> CacheEntry<Vec<SearchResult>> {
let query_ = query.clone();
let init_future = async {
let init = Box::pin(async {
tracing::debug!(
"Fetching list of Sentry debug files from {}",
&query_.index_url
Expand All @@ -112,9 +112,9 @@ impl SentryDownloader {
CancelOnDrop::new(self.runtime.spawn(future.bind_hub(sentry::Hub::current())));

future.await.map_err(|_| CacheError::InternalError)?
};
});
self.index_cache
.get_with_if(query, init_future, |entry| entry.is_err())
.get_with_if(query, init, |entry| entry.is_err())
.await
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,37 +173,36 @@ impl SymbolicatorSymbolProvider {
/// Fetches CFI for the given module, parses it into a `SymbolFile`, and stores it internally.
async fn load_cfi_module(&self, module: &(dyn Module + Sync)) -> FetchedCfiCache {
let key = LookupKey::new(module);
self.cficaches
.get_with_by_ref(&key, async {
let sources = self.sources.clone();
let scope = self.scope.clone();

let identifier = ObjectId {
code_id: key.code_id.clone(),
code_file: Some(module.code_file().into_owned()),
debug_id: key.debug_id,
debug_file: module
.debug_file()
.map(|debug_file| debug_file.into_owned()),
debug_checksum: None,
let load = Box::pin(async {
let sources = self.sources.clone();
let scope = self.scope.clone();

let identifier = ObjectId {
code_id: key.code_id.clone(),
code_file: Some(module.code_file().into_owned()),
debug_id: key.debug_id,
debug_file: module
.debug_file()
.map(|debug_file| debug_file.into_owned()),
debug_checksum: None,
object_type: self.object_type,
};

self.cficache_actor
.fetch(FetchCfiCache {
object_type: self.object_type,
};

self.cficache_actor
.fetch(FetchCfiCache {
object_type: self.object_type,
identifier,
sources,
scope,
})
// NOTE: this `bind_hub` is important!
// `load_cfi_module` is being called concurrently from `rust-minidump` via
// `join_all`. We do need proper isolation of any async task that might
// manipulate any Sentry scope.
.bind_hub(Hub::new_from_top(Hub::current()))
.await
})
.await
identifier,
sources,
scope,
})
// NOTE: this `bind_hub` is important!
// `load_cfi_module` is being called concurrently from `rust-minidump` via
// `join_all`. We do need proper isolation of any async task that might
// manipulate any Sentry scope.
.bind_hub(Hub::new_from_top(Hub::current()))
.await
});
self.cficaches.get_with_by_ref(&key, load).await
}
}

Expand Down
26 changes: 9 additions & 17 deletions crates/symbolicator-service/src/services/symcaches/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,20 +443,19 @@ mod tests {

// Create the symcache for the first time. Since the bcsymbolmap is not available, names in the
// symcache will be obfuscated.
let owned_symcache = symcache_actor
let symcache = symcache_actor
.fetch(fetch_symcache.clone())
.await
.cache
.ok()
.unwrap();

let symcache = owned_symcache.get();
let sl = symcache.lookup(0x5a75).next().unwrap();
let sl = symcache.get().lookup(0x5a75).next().unwrap();
assert_eq!(
sl.file().unwrap().full_path(),
"__hidden#41_/__hidden#41_/__hidden#42_"
);
assert_eq!(sl.function().name(), "__hidden#0_");
drop(symcache);

// Copy the plist and bcsymbolmap to the temporary symbol directory so that the SymCacheActor can find them.
fs::copy(
Expand All @@ -472,37 +471,30 @@ mod tests {
.unwrap();

// Create the symcache for the second time. Even though the bcsymbolmap is now available, its absence should
// still be cached and the SymcacheActor should make no attempt to download it. Therefore, the names should
// still be cached and the SymCacheActor should make no attempt to download it. Therefore, the names should
// be obfuscated like before.
let owned_symcache = symcache_actor
let symcache = symcache_actor
.fetch(fetch_symcache.clone())
.await
.cache
.ok()
.unwrap();

let symcache = owned_symcache.get();
let sl = symcache.lookup(0x5a75).next().unwrap();
let sl = symcache.get().lookup(0x5a75).next().unwrap();
assert_eq!(
sl.file().unwrap().full_path(),
"__hidden#41_/__hidden#41_/__hidden#42_"
);
assert_eq!(sl.function().name(), "__hidden#0_");
drop(symcache);

// Sleep long enough for the negative cache entry to become invalid.
std::thread::sleep(TIMEOUT);

// Create the symcache for the third time. This time, the bcsymbolmap is downloaded and the names in the
// symcache are unobfuscated.
let owned_symcache = symcache_actor
.fetch(fetch_symcache.clone())
.await
.cache
.ok()
.unwrap();
let symcache = symcache_actor.fetch(fetch_symcache).await.cache.unwrap();

let symcache = owned_symcache.get();
let sl = symcache.lookup(0x5a75).next().unwrap();
let sl = symcache.get().lookup(0x5a75).next().unwrap();
assert_eq!(
sl.file().unwrap().full_path(),
"/Users/philipphofmann/git-repos/sentry-cocoa/Sources/Sentry/SentryMessage.m"
Expand Down

0 comments on commit 529e136

Please sign in to comment.