diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 3b91d0c77d907..91af7d570caaa 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -72,6 +72,7 @@ itertools = "0.9.0" lru = "0.6.5" serde = { version = "1.0", features = ["rc"] } serde_derive = "1.0" +moka = "0.8.2" tracing = "0.1.25" tracing-futures = { version = "0.2.5", features = ["tokio", "tokio-executor"] } diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index 639d30146ba9a..8e4f5ca960d14 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -19,7 +19,7 @@ use std::fmt; use std::fs::File; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::task::{Context, Poll}; use std::{any::Any, convert::TryInto}; @@ -52,6 +52,8 @@ use parquet::file::{ use fmt::Debug; use parquet::arrow::{ArrowReader, ParquetFileArrowReader}; +use std::fmt::Formatter; +use std::time::Duration; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio_stream::wrappers::ReceiverStream; @@ -59,6 +61,7 @@ use tokio_stream::wrappers::ReceiverStream; use crate::datasource::datasource::{ColumnStatistics, Statistics}; use async_trait::async_trait; use futures::stream::{Stream, StreamExt}; +use moka::sync::Cache; use parquet::file::metadata::ParquetMetaData; use super::SQLMetric; @@ -135,48 +138,6 @@ pub trait ParquetMetadataCache: Debug + Sync + Send { (*metadata).clone(), )) } - - /// Returns a copy of the cache stats. - fn stats(&self) -> ParquetMetadataCacheStats; -} - -/// Stats for ParquetMetadataCache. -#[derive(Clone, Debug)] -pub struct ParquetMetadataCacheStats { - hits: u64, - misses: u64, -} - -impl ParquetMetadataCacheStats { - /// Returns a new ParquetMetadataCacheStats - pub fn new() -> Self { - ParquetMetadataCacheStats { hits: 0, misses: 0 } - } - - /// Returns the number of cache reads. - pub fn reads(&self) -> u64 { - self.hits + self.misses - } - - /// Returns the number of cache hits. - pub fn hits(&self) -> u64 { - self.hits - } - - /// Returns the numbere of cache misses. - pub fn misses(&self) -> u64 { - self.misses - } - - /// Increments the number of cache hits. - pub fn hit(&mut self) { - self.hits += 1; - } - - /// Increments the number of cache misses. - pub fn miss(&mut self) { - self.misses += 1; - } } /// Default MetadataCache, does not cache anything @@ -194,59 +155,45 @@ impl ParquetMetadataCache for NoopParquetMetadataCache { fn metadata(&self, _key: &str, file: &File) -> Result> { Ok(Arc::new(footer::parse_metadata(file)?)) } - - fn stats(&self) -> ParquetMetadataCacheStats { - ParquetMetadataCacheStats::new() - } } /// LruMetadataCache, caches parquet metadata. -#[derive(Debug)] pub struct LruParquetMetadataCache { - data: Mutex, -} - -#[derive(Debug)] -struct LruParquetMetadataCacheData { - cache: lru::LruCache>, - stats: ParquetMetadataCacheStats, + cache: Cache>, } impl LruParquetMetadataCache { /// Creates a new LruMetadataCache - pub fn new(metadata_cache_capacity: usize) -> Arc { + pub fn new(max_capacity: u64, time_to_idle: Duration) -> Arc { Arc::new(LruParquetMetadataCache { - data: Mutex::new(LruParquetMetadataCacheData { - cache: lru::LruCache::new(metadata_cache_capacity), - stats: ParquetMetadataCacheStats::new(), - }), + cache: moka::sync::Cache::builder() + .weigher(|_, value: &Arc| value.metadata_size()) + .max_capacity(max_capacity) + .time_to_idle(time_to_idle) + .build(), }) } } +impl Debug for LruParquetMetadataCache { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("LruParquetMetadataCache") + .field("cache", &"") + .finish() + } +} + impl ParquetMetadataCache for LruParquetMetadataCache { fn metadata(&self, key: &str, file: &File) -> Result> { - { - let mut data = self.data.lock().unwrap(); - let metadata = data.cache.get(&key.to_string()); - if let Some(metadata) = metadata { - let result = Ok(metadata.clone()); - data.stats.hit(); - return result; - } else { - data.stats.miss(); + let k = key.to_string(); + match self.cache.get(&k) { + Some(metadata) => Ok(metadata), + None => { + let metadata = Arc::new(footer::parse_metadata(file)?); + self.cache.insert(k, metadata.clone()); + Ok(metadata) } } - let metadata = Arc::new(footer::parse_metadata(file)?); - { - let mut data = self.data.lock().unwrap(); - data.cache.put(key.to_string(), metadata.clone()); - } - Ok(metadata) - } - - fn stats(&self) -> ParquetMetadataCacheStats { - self.data.lock().unwrap().stats.clone() } }