Skip to content

Commit

Permalink
uv-client: make the cache heal itself
Browse files Browse the repository at this point in the history
This change detects payload deserialization errors, emits a (non-user)
warning and sends a fresh request to get new data.

This self-healing existed before some of my zero-copy deserialization
work landed, but my changes ended up losing half of it. The half that
still remains is if the cache policy itself fails to deserialize (since
it is done separately from the payload). This commit now also does it if
the payload fails to deserialize.

Unfortunately, we do clone the request here pessimistically in case we
need to resend it. Indeed, we might end up sending two requests: a
re-validation request and then another fresh request if we discover our
cached data is bad after the revalidation request.

We could try to deserialize the cached data before (possibly) sending
the initial request. But this does extra work (checking the validity of
the cached copy) even if we don't use it. It's hard to know exactly
which approach would be faster, but I went this route because the
request is typically very small.
  • Loading branch information
BurntSushi committed Feb 19, 2024
1 parent 4491969 commit 49ce66d
Showing 1 changed file with 41 additions and 2 deletions.
43 changes: 41 additions & 2 deletions crates/uv-client/src/cached_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ impl CachedClient {
Callback: FnOnce(Response) -> CallbackReturn,
CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
{
let fresh_req = req.try_clone().expect("HTTP request must be cloneable");
let cached_response = match Self::read_cache(cache_entry).await {
Some(cached) => self.send_cached(req, cache_control, cached).boxed().await?,
None => {
Expand All @@ -239,7 +240,17 @@ impl CachedClient {
}
};
match cached_response {
CachedResponse::FreshCache(cached) => Ok(Payload::from_aligned_bytes(cached.data)?),
CachedResponse::FreshCache(cached) => match Payload::from_aligned_bytes(cached.data) {
Ok(payload) => Ok(payload),
Err(err) => {
warn!(
"Broken fresh cache entry (for payload) at {}, removing: {err}",
cache_entry.path().display()
);
self.resend_and_heal_cache(fresh_req, cache_entry, response_callback)
.await
}
},
CachedResponse::NotModified { cached, new_policy } => {
let refresh_cache =
info_span!("refresh_cache", file = %cache_entry.path().display());
Expand All @@ -249,7 +260,19 @@ impl CachedClient {
write_atomic(cache_entry.path(), data_with_cache_policy_bytes)
.await
.map_err(ErrorKind::CacheWrite)?;
Ok(Payload::from_aligned_bytes(cached.data)?)
match Payload::from_aligned_bytes(cached.data) {
Ok(payload) => Ok(payload),
Err(err) => {
warn!(
"Broken fresh cache entry after revalidation \
(for payload) at {}, removing: {err}",
cache_entry.path().display()
);
return self
.resend_and_heal_cache(fresh_req, cache_entry, response_callback)
.await;
}
}
}
.instrument(refresh_cache)
.await
Expand All @@ -264,6 +287,22 @@ impl CachedClient {
}
}

async fn resend_and_heal_cache<Payload: Cacheable, CallBackError, Callback, CallbackReturn>(
&self,
req: Request,
cache_entry: &CacheEntry,
response_callback: Callback,
) -> Result<Payload::Target, CachedClientError<CallBackError>>
where
Callback: FnOnce(Response) -> CallbackReturn,
CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
{
let _ = fs_err::tokio::remove_file(&cache_entry.path()).await;
let (response, cache_policy) = self.fresh_request(req).await?;
self.run_response_callback(cache_entry, cache_policy, response, response_callback)
.await
}

async fn run_response_callback<Payload: Cacheable, CallBackError, Callback, CallbackReturn>(
&self,
cache_entry: &CacheEntry,
Expand Down

0 comments on commit 49ce66d

Please sign in to comment.