Is your feature request related to a problem or challenge?
`FileMetadataCache` (introduced in #16971, default-enabled per the #17000 epic) does not coordinate between concurrent callers that miss on the same `Path`. The `fetch_metadata` path in `datafusion/datasource-parquet/src/metadata.rs` is:

1. `cache.get(path)`: return if hit and `is_valid_for`
2. On miss: drive `ParquetMetaDataPushDecoder` (one or more S3 range reads)
3. `cache.put(path, entry)`

If N concurrent queries against the same file arrive while the cache is cold for that key, all N independently do step 2, issuing N S3 metadata round-trips for what should be one. The first to finish calls `put`; the rest finish their own loads and overwrite its entry. The net effect on production hot-pod workloads (many concurrent queries against the same parquet file right after a rolling deploy, or during any cold-cache window) is a thundering herd of duplicate S3 metadata fetches.
The current `DefaultFilesMetadataCache` is a `Mutex<HashMap + LRU>` with sync `get`/`put` (`file_metadata_cache.rs`); there is no in-flight tracking, and the `CacheAccessor` trait signature (sync `get`/`put`) cannot express load-through semantics.
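
To make the race concrete, here is a minimal std-only simulation of the current get / load-on-miss / put pattern. It is a sketch under stated assumptions, not DataFusion code: threads stand in for concurrent queries, a `Mutex<HashMap>` stands in for `DefaultFilesMetadataCache`, and `Entry`, `load_metadata`, and `cold_cache_race` are hypothetical names. A `Barrier` forces the worst-case interleaving in which every caller misses before any `put` lands.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

/// Hypothetical stand-in for decoded parquet metadata (just footer bytes).
type Entry = Vec<u8>;

/// Hypothetical stand-in for the S3 range reads; counts every invocation.
fn load_metadata(loads: &AtomicUsize) -> Entry {
    loads.fetch_add(1, Ordering::SeqCst);
    vec![0u8; 8]
}

/// Simulates `n` concurrent queries against one cold key using the
/// get / load-on-miss / put pattern, and returns how many loads ran.
fn cold_cache_race(n: usize) -> usize {
    // Same shape as the current cache: a mutex-guarded map with sync get/put.
    let cache: Arc<Mutex<HashMap<String, Entry>>> = Arc::new(Mutex::new(HashMap::new()));
    let loads = Arc::new(AtomicUsize::new(0));
    // Forces the worst case: every caller finishes step 1 (get)
    // before any caller reaches step 3 (put).
    let barrier = Arc::new(Barrier::new(n));

    let handles: Vec<_> = (0..n)
        .map(|_| {
            let (cache, loads, barrier) = (cache.clone(), loads.clone(), barrier.clone());
            thread::spawn(move || {
                // Step 1: get. The lock is released at the end of this statement.
                let hit = cache.lock().unwrap().get("file.parquet").cloned();
                barrier.wait();
                // Step 2: every caller that missed runs its own load.
                let entry = hit.unwrap_or_else(|| load_metadata(&loads));
                // Step 3: put. Later writers silently overwrite earlier ones.
                cache.lock().unwrap().insert("file.parquet".to_string(), entry);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    loads.load(Ordering::SeqCst)
}

fn main() {
    println!("loads for 8 concurrent queries: {}", cold_cache_race(8));
}
```

With that interleaving the loader runs once per caller, so `cold_cache_race(8)` reports 8 loads for what should be one. Real timing is usually less adversarial, but nothing in the pattern prevents it.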
Describe the solution you'd like
Add a default-implemented async method on `FileMetadataCache` (and an analogous one on `FileStatisticsCache`) that lets implementors provide singleflight / load-through semantics:
```rust
pub trait FileMetadataCache: CacheAccessor<Path, CachedFileMetadataEntry> {
    // ... existing methods unchanged ...

    /// Load-through with coalescing semantics: if multiple concurrent callers
    /// miss the same key, an implementor may run `load` only once and share
    /// the result with the other waiters.
    ///
    /// The default implementation does NOT coalesce; it preserves current
    /// behavior so existing implementors do not need to change. Implementors
    /// backed by `moka::future::Cache::try_get_with`, a `dashmap` of
    /// `tokio::sync::broadcast` senders, etc. should override this.
    async fn get_or_try_load<F, Fut>(
        &self,
        key: &Path,
        current_meta: &ObjectMeta,
        load: F,
    ) -> Result<CachedFileMetadataEntry>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<CachedFileMetadataEntry>> + Send,
    {
        if let Some(cached) = self.get(key) {
            if cached.is_valid_for(current_meta) {
                return Ok(cached);
            }
        }
        let entry = load().await?;
        self.put(key, entry.clone());
        Ok(entry)
    }
}
```
Then `fetch_metadata` in `datafusion/datasource-parquet/src/metadata.rs` switches from the explicit `get` / `is_valid_for` / `put` dance to a single `get_or_try_load` call. Behavior is unchanged for the default implementation; deployments with a custom cache (e.g. moka-backed) that overrides the method get singleflight automatically.
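
To check the claim that the default method preserves current behavior, here is a simplified model with no DataFusion or `object_store` dependencies. It is a sketch under stated assumptions: `MetadataCache`, `MapCache`, `Entry`, `lookup`, and the mtime-based `is_valid_for` are hypothetical stand-ins, the `Send` bounds are dropped because everything runs on one thread, `block_on` is a toy executor in place of tokio, and `async fn` in traits needs Rust 1.75 or later.

```rust
use std::cell::Cell;
use std::collections::HashMap;
use std::future::Future;
use std::pin::pin;
use std::sync::Mutex;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Hypothetical entry: records the file's last-modified time so it can be
/// checked against current object metadata, like `is_valid_for`.
#[derive(Clone, Debug, PartialEq)]
struct Entry {
    mtime: u64,
}

impl Entry {
    fn is_valid_for(&self, current_mtime: u64) -> bool {
        self.mtime == current_mtime
    }
}

/// Simplified stand-in for `FileMetadataCache`: sync get/put plus the
/// proposed default-implemented async method.
trait MetadataCache {
    fn get(&self, key: &str) -> Option<Entry>;
    fn put(&self, key: &str, value: Entry);

    /// Default, non-coalescing load-through: the same get / validate /
    /// load / put sequence that callers perform by hand today.
    async fn get_or_try_load<F, Fut>(&self, key: &str, current_mtime: u64, load: F) -> Result<Entry, String>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = Result<Entry, String>>,
    {
        if let Some(cached) = self.get(key) {
            if cached.is_valid_for(current_mtime) {
                return Ok(cached);
            }
        }
        let entry = load().await?;
        self.put(key, entry.clone());
        Ok(entry)
    }
}

/// Mutex-guarded map, mirroring the shape of the default cache (no LRU).
#[derive(Default)]
struct MapCache {
    map: Mutex<HashMap<String, Entry>>,
}

impl MetadataCache for MapCache {
    fn get(&self, key: &str) -> Option<Entry> {
        self.map.lock().unwrap().get(key).cloned()
    }
    fn put(&self, key: &str, value: Entry) {
        self.map.lock().unwrap().insert(key.to_string(), value);
    }
}

/// Toy executor: polls until ready. Adequate only because nothing here ever
/// returns `Pending`; real code would run on tokio.
fn block_on<F: Future>(fut: F) -> F::Output {
    fn noop_raw_waker() -> RawWaker {
        fn clone(_: *const ()) -> RawWaker {
            noop_raw_waker()
        }
        fn noop(_: *const ()) {}
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    // SAFETY: every vtable function is a no-op that ignores the data pointer.
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Runs one lookup through the default method; reports whether the loader ran.
fn lookup(cache: &MapCache, mtime: u64) -> (Entry, bool) {
    let ran = Cell::new(false);
    let ran_ref = &ran;
    let entry = block_on(cache.get_or_try_load("file.parquet", mtime, move || async move {
        ran_ref.set(true);
        Ok::<Entry, String>(Entry { mtime })
    }))
    .unwrap();
    (entry, ran.get())
}

fn main() {
    let cache = MapCache::default();
    println!("cold miss, loader ran: {}", lookup(&cache, 1).1);
    println!("valid hit, loader ran: {}", lookup(&cache, 1).1);
    let (entry, ran) = lookup(&cache, 2);
    println!("stale entry, loader ran: {ran}, new mtime: {}", entry.mtime);
}
```

A valid hit skips the loader and a stale entry triggers a reload, matching the hand-written sequence. Because `get` and `put` are still separate calls, concurrent misses are not coalesced; that only changes when an implementor overrides the method.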
Describe alternatives I've considered
- **Implementor-side workaround.** Wrap `DefaultFilesMetadataCache` in a custom struct that adds its own pending-load map. Possible, but it means every deployment that cares re-implements the same dedup logic.
- **Sync-only with `block_on`.** Trying to express singleflight on a sync trait by `block_on`-ing inside `get`/`put` blocks a tokio worker thread per cache miss, which is a liveness (potential deadlock) hazard on small runtimes.
- **Wait for #18405 (Decoupling Cache and Eviction Strategies).** That issue addresses storage / eviction pluggability, not the load-through gap. The two are complementary; this proposal is a smaller, more focused addition.
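
The pending-load map behind the implementor-side workaround (or behind an overriding `get_or_try_load`) can be sketched with std primitives. This is a synchronous analog under stated assumptions: threads stand in for async tasks, `CoalescingCache`, `Pending`, `get_or_load`, and `concurrent_loads` are hypothetical names, and validation, eviction, and error handling are omitted (a leader that panics here would leave its followers waiting forever). An async version would use async-aware primitives instead, or delegate to moka's `try_get_with`.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for decoded parquet metadata (just footer bytes).
type Entry = Vec<u8>;

/// One in-flight load; followers block on `ready` until `result` is filled.
#[derive(Default)]
struct Pending {
    result: Mutex<Option<Entry>>,
    ready: Condvar,
}

/// Completed entries plus in-flight loads, kept under one lock so that
/// "miss, then register as leader" is a single atomic step.
#[derive(Default)]
struct CoalescingCache {
    state: Mutex<(HashMap<String, Entry>, HashMap<String, Arc<Pending>>)>,
}

impl CoalescingCache {
    fn get_or_load(&self, key: &str, load: impl FnOnce() -> Entry) -> Entry {
        let (pending, is_leader) = {
            let mut guard = self.state.lock().unwrap();
            let (done, in_flight) = &mut *guard;
            if let Some(entry) = done.get(key) {
                return entry.clone();
            }
            match in_flight.get(key) {
                Some(p) => (Arc::clone(p), false),
                None => {
                    let p = Arc::new(Pending::default());
                    in_flight.insert(key.to_string(), Arc::clone(&p));
                    (p, true)
                }
            }
        };
        if is_leader {
            // Only the leader runs the expensive load, outside the cache lock.
            let entry = load();
            {
                let mut guard = self.state.lock().unwrap();
                let (done, in_flight) = &mut *guard;
                done.insert(key.to_string(), entry.clone());
                in_flight.remove(key);
            }
            // Publish the result and wake every follower.
            *pending.result.lock().unwrap() = Some(entry.clone());
            pending.ready.notify_all();
            entry
        } else {
            // Followers wait for the leader's result instead of loading again.
            let mut slot = pending.result.lock().unwrap();
            while slot.is_none() {
                slot = pending.ready.wait(slot).unwrap();
            }
            slot.clone().expect("leader published a result")
        }
    }
}

/// Runs `n` concurrent callers against one cold key; returns how many loads ran.
fn concurrent_loads(n: usize) -> usize {
    let cache = Arc::new(CoalescingCache::default());
    let loads = Arc::new(AtomicUsize::new(0));
    let barrier = Arc::new(Barrier::new(n));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let (cache, loads, barrier) = (cache.clone(), loads.clone(), barrier.clone());
            thread::spawn(move || {
                barrier.wait();
                cache.get_or_load("file.parquet", || {
                    loads.fetch_add(1, Ordering::SeqCst);
                    // A slow load widens the window in which followers pile up.
                    thread::sleep(Duration::from_millis(20));
                    vec![0u8; 8]
                })
            })
        })
        .collect();
    for handle in handles {
        assert_eq!(handle.join().unwrap().len(), 8);
    }
    loads.load(Ordering::SeqCst)
}

fn main() {
    println!("loads for 8 concurrent callers: {}", concurrent_loads(8));
}
```

The first caller to miss registers itself as leader under the cache lock; every later caller either finds the finished entry or waits on the leader's `Pending` slot, so `concurrent_loads(8)` runs the loader exactly once regardless of scheduling.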
Additional context
I'm working on adopting this cache in a deployment that today uses a moka-backed metadata cache with `try_get_with` (singleflight). For our workload (many concurrent queries against the same hot parquet file from a small pool of pods), losing the dedup is a real regression: a single cold-cache file becomes N concurrent S3 metadata reads for N concurrent queries.
Happy to send the PR if the API shape looks reasonable. Tagging @nuno-faria and @alamb since they're closest to this code path.