Skip to content

Commit

Permalink
re-introduce cache healing when we see an invalid cache entry (#1707)
Browse files Browse the repository at this point in the history
This PR introduces more robust cache healing when `uv` fails to
deserialize an existing cache entry.

("Cache healing" in this context means that if `uv` fails to
deserialize a cache entry, then it will automatically invalidate that
entry and re-generate the data, typically by sending an HTTP request.)

Prior to some optimizations I made around deserialization, we were
already doing this. After those optimizations, deserializing a cache
policy and the payload were split into two steps. While deserializing
a cache policy retained its cache healing behavior, deserializing the
payload did not. This became an issue when #1556 landed, which changed
one of our `rkyv` data types. This in turn made our internal types
incompatible with existing cache entries. One could work around this
by clearing `uv`'s cache with `uv clean`, but we should just do it
automatically on an entry-by-entry basis.

This does technically introduce a new cost by pessimistically cloning
the HTTP request so that we can re-send it if necessary (see the commit
messages for the knot pushing me toward this approach). So I re-ran my
favorite ad-hoc benchmark:

```
$ hyperfine -w10 --runs 50 "uv-main pip compile --cache-dir ~/astral/tmp/cache-main ~/astral/tmp/reqs/home-assistant-reduced.in -o /dev/null" "uv-test pip compile --cache-dir ~/astral/tmp/cache-test ~/astral/tmp/reqs/home-assistant-reduced.in -o /dev/null" ; A bart
Benchmark 1: uv-main pip compile --cache-dir ~/astral/tmp/cache-main ~/astral/tmp/reqs/home-assistant-reduced.in -o /dev/null
  Time (mean ± σ):     114.4 ms ±   3.2 ms    [User: 149.4 ms, System: 221.5 ms]
  Range (min … max):   106.7 ms … 122.0 ms    50 runs

Benchmark 2: uv-test pip compile --cache-dir ~/astral/tmp/cache-test ~/astral/tmp/reqs/home-assistant-reduced.in -o /dev/null
  Time (mean ± σ):     114.0 ms ±   3.0 ms    [User: 146.0 ms, System: 223.3 ms]
  Range (min … max):   105.3 ms … 121.4 ms    50 runs

Summary
  uv-test pip compile --cache-dir ~/astral/tmp/cache-test ~/astral/tmp/reqs/home-assistant-reduced.in -o /dev/null ran
    1.00 ± 0.04 times faster than uv-main pip compile --cache-dir ~/astral/tmp/cache-main ~/astral/tmp/reqs/home-assistant-reduced.in -o /dev/null
```

Which is about what I expected.

We should endeavor to have a better testing strategy for these kinds of
bugs, but I think it might be a little tricky to do. I created
#1699 to track that.

Fixes #1571
  • Loading branch information
BurntSushi committed Feb 19, 2024
1 parent b76efc6 commit cd1f619
Showing 1 changed file with 96 additions and 33 deletions.
129 changes: 96 additions & 33 deletions crates/uv-client/src/cached_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,30 @@ impl CachedClient {
Callback: FnOnce(Response) -> CallbackReturn,
CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
{
let fresh_req = req.try_clone().expect("HTTP request must be cloneable");
let cached_response = match Self::read_cache(cache_entry).await {
Some(cached) => self.send_cached(req, cache_control, cached).boxed().await?,
None => {
debug!("No cache entry for: {}", req.url());
self.fresh_request(req).await?
let (response, cache_policy) = self.fresh_request(req).await?;
CachedResponse::ModifiedOrNew {
response,
cache_policy,
}
}
};
match cached_response {
CachedResponse::FreshCache(cached) => Ok(Payload::from_aligned_bytes(cached.data)?),
CachedResponse::FreshCache(cached) => match Payload::from_aligned_bytes(cached.data) {
Ok(payload) => Ok(payload),
Err(err) => {
warn!(
"Broken fresh cache entry (for payload) at {}, removing: {err}",
cache_entry.path().display()
);
self.resend_and_heal_cache(fresh_req, cache_entry, response_callback)
.await
}
},
CachedResponse::NotModified { cached, new_policy } => {
let refresh_cache =
info_span!("refresh_cache", file = %cache_entry.path().display());
Expand All @@ -245,7 +260,18 @@ impl CachedClient {
write_atomic(cache_entry.path(), data_with_cache_policy_bytes)
.await
.map_err(ErrorKind::CacheWrite)?;
Ok(Payload::from_aligned_bytes(cached.data)?)
match Payload::from_aligned_bytes(cached.data) {
Ok(payload) => Ok(payload),
Err(err) => {
warn!(
"Broken fresh cache entry after revalidation \
(for payload) at {}, removing: {err}",
cache_entry.path().display()
);
self.resend_and_heal_cache(fresh_req, cache_entry, response_callback)
.await
}
}
}
.instrument(refresh_cache)
.await
Expand All @@ -254,31 +280,62 @@ impl CachedClient {
response,
cache_policy,
} => {
let new_cache = info_span!("new_cache", file = %cache_entry.path().display());
let data = response_callback(response)
.boxed()
self.run_response_callback(cache_entry, cache_policy, response, response_callback)
.await
.map_err(|err| CachedClientError::Callback(err))?;
let Some(cache_policy) = cache_policy else {
return Ok(data.into_target());
};
async {
fs_err::tokio::create_dir_all(cache_entry.dir())
.await
.map_err(ErrorKind::CacheWrite)?;
let data_with_cache_policy_bytes =
DataWithCachePolicy::serialize(&cache_policy, &data.to_bytes()?)?;
write_atomic(cache_entry.path(), data_with_cache_policy_bytes)
.await
.map_err(ErrorKind::CacheWrite)?;
Ok(data.into_target())
}
.instrument(new_cache)
.await
}
}
}

/// Removes the (presumed corrupt) cache entry, re-sends the original
/// request and rebuilds the entry from the fresh response.
///
/// This is the "healing" half of cache healing: it is called after a
/// cached payload failed to deserialize. `req` must be a clone of the
/// original request, taken before that request was consumed.
///
/// # Errors
///
/// Returns an error if the fresh request fails, if the callback fails,
/// or if writing the rebuilt cache entry fails.
async fn resend_and_heal_cache<Payload: Cacheable, CallBackError, Callback, CallbackReturn>(
    &self,
    req: Request,
    cache_entry: &CacheEntry,
    response_callback: Callback,
) -> Result<Payload::Target, CachedClientError<CallBackError>>
where
    Callback: FnOnce(Response) -> CallbackReturn,
    CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
{
    // Best-effort removal: if the file is already gone (or cannot be
    // removed), the subsequent atomic write will replace it anyway.
    // Note: `remove_file` takes `impl AsRef<Path>`, so no extra borrow
    // of `cache_entry.path()` is needed.
    let _ = fs_err::tokio::remove_file(cache_entry.path()).await;
    let (response, cache_policy) = self.fresh_request(req).await?;
    self.run_response_callback(cache_entry, cache_policy, response, response_callback)
        .await
}

/// Runs the user-supplied `response_callback` on a fresh `response`
/// and, when a storable `cache_policy` is available, persists the
/// resulting payload together with that policy to `cache_entry`.
///
/// Returns the callback's converted target value. When `cache_policy`
/// is `None`, the result is returned without touching the cache.
///
/// # Errors
///
/// Returns an error if the callback fails, if serialization fails, or
/// if creating the cache directory / writing the entry fails.
async fn run_response_callback<Payload: Cacheable, CallBackError, Callback, CallbackReturn>(
    &self,
    cache_entry: &CacheEntry,
    cache_policy: Option<Box<CachePolicy>>,
    response: Response,
    response_callback: Callback,
) -> Result<Payload::Target, CachedClientError<CallBackError>>
where
    Callback: FnOnce(Response) -> CallbackReturn,
    CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
{
    let new_cache = info_span!("new_cache", file = %cache_entry.path().display());
    // `redundant_closure`: pass the enum constructor directly instead
    // of wrapping it in `|err| CachedClientError::Callback(err)`.
    let data = response_callback(response)
        .boxed()
        .await
        .map_err(CachedClientError::Callback)?;
    // A response without a storable cache policy is returned as-is and
    // never written to disk.
    let Some(cache_policy) = cache_policy else {
        return Ok(data.into_target());
    };
    async {
        fs_err::tokio::create_dir_all(cache_entry.dir())
            .await
            .map_err(ErrorKind::CacheWrite)?;
        let data_with_cache_policy_bytes =
            DataWithCachePolicy::serialize(&cache_policy, &data.to_bytes()?)?;
        // Atomic write: readers observe either the old entry or the
        // complete new one, never a partial file.
        write_atomic(cache_entry.path(), data_with_cache_policy_bytes)
            .await
            .map_err(ErrorKind::CacheWrite)?;
        Ok(data.into_target())
    }
    .instrument(new_cache)
    .await
}

async fn read_cache(cache_entry: &CacheEntry) -> Option<DataWithCachePolicy> {
let span = info_span!("read_and_parse_cache", file = %cache_entry.path().display());
match span
Expand All @@ -288,7 +345,7 @@ impl CachedClient {
Ok(data) => Some(data),
Err(err) => {
warn!(
"Broken cache entry at {}, removing: {err}",
"Broken cache policy entry at {}, removing: {err}",
cache_entry.path().display()
);
let _ = fs_err::tokio::remove_file(&cache_entry.path()).await;
Expand Down Expand Up @@ -339,7 +396,11 @@ impl CachedClient {
"Cached request doesn't match current request for: {}",
req.url()
);
self.fresh_request(req).await?
let (response, cache_policy) = self.fresh_request(req).await?;
CachedResponse::ModifiedOrNew {
response,
cache_policy,
}
}
})
}
Expand Down Expand Up @@ -385,7 +446,10 @@ impl CachedClient {
}

#[instrument(skip_all, fields(url = req.url().as_str()))]
async fn fresh_request(&self, req: Request) -> Result<CachedResponse, Error> {
async fn fresh_request(
&self,
req: Request,
) -> Result<(Response, Option<Box<CachePolicy>>), Error> {
trace!("Sending fresh {} request for {}", req.method(), req.url());
let cache_policy_builder = CachePolicyBuilder::new(&req);
let response = self
Expand All @@ -396,13 +460,12 @@ impl CachedClient {
.error_for_status()
.map_err(ErrorKind::RequestError)?;
let cache_policy = cache_policy_builder.build(&response);
Ok(CachedResponse::ModifiedOrNew {
response,
cache_policy: cache_policy
.to_archived()
.is_storable()
.then(|| Box::new(cache_policy)),
})
let cache_policy = if cache_policy.to_archived().is_storable() {
Some(Box::new(cache_policy))
} else {
None
};
Ok((response, cache_policy))
}
}

Expand Down

0 comments on commit cd1f619

Please sign in to comment.