From 7ad5db5176c868413719492011ccfdb0f0d81d27 Mon Sep 17 00:00:00 2001 From: Lantao Jin Date: Thu, 21 May 2026 02:49:50 +0000 Subject: [PATCH] feat(session): configure DataFusion's built-in CacheManager from Java --- .../datafusion/CacheManagerOptions.java | 163 ++++++++++++++++ .../datafusion/SessionContextBuilder.java | 23 +++ .../datafusion/CacheManagerOptionsTest.java | 177 ++++++++++++++++++ .../SessionContextCacheManagerTest.java | 126 +++++++++++++ native/build.rs | 1 + native/src/cache_manager.rs | 84 +++++++++ native/src/lib.rs | 7 + proto/cache_manager_options.proto | 61 ++++++ proto/session_options.proto | 5 + 9 files changed, 647 insertions(+) create mode 100644 core/src/main/java/org/apache/datafusion/CacheManagerOptions.java create mode 100644 core/src/test/java/org/apache/datafusion/CacheManagerOptionsTest.java create mode 100644 core/src/test/java/org/apache/datafusion/SessionContextCacheManagerTest.java create mode 100644 native/src/cache_manager.rs create mode 100644 proto/cache_manager_options.proto diff --git a/core/src/main/java/org/apache/datafusion/CacheManagerOptions.java b/core/src/main/java/org/apache/datafusion/CacheManagerOptions.java new file mode 100644 index 0000000..f217287 --- /dev/null +++ b/core/src/main/java/org/apache/datafusion/CacheManagerOptions.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datafusion; + +import java.time.Duration; + +import org.apache.datafusion.protobuf.CacheManagerOptionsProto; +import org.apache.datafusion.protobuf.ListFilesCacheOptionsProto; + +/** + * Configuration for DataFusion's built-in {@code CacheManager}. Pass an instance to {@link + * SessionContextBuilder#cacheManager(CacheManagerOptions)} to turn on any of the three + * upstream-provided caches at session construction time. + * + *

The three caches are independent; calling one setter does not affect the others. A setter that + * is never called leaves the upstream default in place — so existing callers that don't touch this + * builder see no behavioral change. + * + *

+ * + *

This v1 surface configures the built-in cache implementations. Plugging custom Java + * cache implementations through a JNI upcall path is intentionally out of scope — every cache + * lookup is a hot path during scans, and routing them through Java would defeat the cache. + */ +public final class CacheManagerOptions { + + private final Long fileMetadataCacheMaxBytes; + private final ListFilesCacheConfig listFilesCache; + private final Boolean fileStatisticsCacheEnabled; + + private CacheManagerOptions(Builder b) { + this.fileMetadataCacheMaxBytes = b.fileMetadataCacheMaxBytes; + this.listFilesCache = b.listFilesCache; + this.fileStatisticsCacheEnabled = b.fileStatisticsCacheEnabled; + } + + /** Begin building a {@link CacheManagerOptions} instance. */ + public static Builder builder() { + return new Builder(); + } + + CacheManagerOptionsProto toProto() { + CacheManagerOptionsProto.Builder b = CacheManagerOptionsProto.newBuilder(); + if (fileMetadataCacheMaxBytes != null) { + b.setFileMetadataCacheMaxBytes(fileMetadataCacheMaxBytes); + } + if (listFilesCache != null) { + ListFilesCacheOptionsProto.Builder lb = ListFilesCacheOptionsProto.newBuilder(); + if (listFilesCache.maxBytes != null) { + lb.setMaxBytes(listFilesCache.maxBytes); + } + if (listFilesCache.ttlMillis != null) { + lb.setTtlMillis(listFilesCache.ttlMillis); + } + b.setListFilesCache(lb.build()); + } + if (fileStatisticsCacheEnabled != null) { + b.setFileStatisticsCacheEnabled(fileStatisticsCacheEnabled); + } + return b.build(); + } + + /** Internal carrier — `null`-vs-set is the source of truth for "user called this setter". */ + private static final class ListFilesCacheConfig { + final Long maxBytes; + final Long ttlMillis; + + ListFilesCacheConfig(Long maxBytes, Long ttlMillis) { + this.maxBytes = maxBytes; + this.ttlMillis = ttlMillis; + } + } + + /** Builder for {@link CacheManagerOptions}. */ + public static final class Builder { + private Long fileMetadataCacheMaxBytes; + private ListFilesCacheConfig listFilesCache; + private Boolean fileStatisticsCacheEnabled; + + private Builder() {} + + /** + * Enable the file-embedded metadata cache (parquet footers, page metadata) with the given byte + * cap. The cap is the budget the upstream {@code DefaultFilesMetadataCache} uses to evict + * entries; {@code 0} is legal and means "construct the cache but with a 0-byte budget" + * (effectively disabled but observable in stats). + * + * @throws IllegalArgumentException if {@code maxBytes} is negative. + */ + public Builder fileMetadataCache(long maxBytes) { + if (maxBytes < 0) { + throw new IllegalArgumentException( + "fileMetadataCache maxBytes must be non-negative, got " + maxBytes); + } + this.fileMetadataCacheMaxBytes = maxBytes; + return this; + } + + /** + * Enable the list-files cache with the given byte cap and TTL. Pass {@code null} for {@code + * ttl} to use upstream's "no expiration" semantics (entries are evicted only by capacity + * pressure). + * + * @throws IllegalArgumentException if {@code maxBytes} is negative or {@code ttl} is negative. + */ + public Builder listFilesCache(long maxBytes, Duration ttl) { + if (maxBytes < 0) { + throw new IllegalArgumentException( + "listFilesCache maxBytes must be non-negative, got " + maxBytes); + } + Long ttlMillis = null; + if (ttl != null) { + if (ttl.isNegative()) { + throw new IllegalArgumentException("listFilesCache ttl must be non-negative, got " + ttl); + } + ttlMillis = ttl.toMillis(); + } + this.listFilesCache = new ListFilesCacheConfig(maxBytes, ttlMillis); + return this; + } + + /** + * Enable the file-statistics cache (per-file row counts / column statistics). When {@code + * enabled} is {@code true}, the upstream {@code DefaultFileStatisticsCache} is installed. When + * {@code false}, the slot is explicitly set to disabled in the wire format — the same end-state + * as never calling this setter, but distinguishable for testing. + */ + public Builder fileStatisticsCache(boolean enabled) { + this.fileStatisticsCacheEnabled = enabled; + return this; + } + + public CacheManagerOptions build() { + return new CacheManagerOptions(this); + } + } +} diff --git a/core/src/main/java/org/apache/datafusion/SessionContextBuilder.java b/core/src/main/java/org/apache/datafusion/SessionContextBuilder.java index f34e24f..25ec5b9 100644 --- a/core/src/main/java/org/apache/datafusion/SessionContextBuilder.java +++ b/core/src/main/java/org/apache/datafusion/SessionContextBuilder.java @@ -38,6 +38,7 @@ public final class SessionContextBuilder { private Long memoryLimitBytes; private Double memoryLimitFraction; private String tempDirectory; + private CacheManagerOptions cacheManager; private final LinkedHashMap options = new LinkedHashMap<>(); SessionContextBuilder() {} @@ -168,6 +169,25 @@ public SessionContextBuilder setOptions(Map entries) { return this; } + /** + * Configure DataFusion's built-in {@code CacheManager} for the new context. Build the {@link + * CacheManagerOptions} via {@link CacheManagerOptions#builder()}; each cache slot is independent, + * so leaving a setter unset keeps the upstream default in place. + * + *

Calling this setter twice replaces the previous configuration — there is no incremental + * merge between calls. If you need a different cache configuration, build a new {@code + * CacheManagerOptions} from scratch. + * + * @throws IllegalArgumentException if {@code options} is {@code null}. + */ + public SessionContextBuilder cacheManager(CacheManagerOptions options) { + if (options == null) { + throw new IllegalArgumentException("cacheManager options must be non-null"); + } + this.cacheManager = options; + return this; + } + /** * Construct a {@link SessionContext} with the configured options. * @@ -201,6 +221,9 @@ byte[] toBytes() { if (tempDirectory != null) { b.setTempDirectory(tempDirectory); } + if (cacheManager != null) { + b.setCacheManager(cacheManager.toProto()); + } for (Map.Entry e : options.entrySet()) { b.addOptions(ConfigOption.newBuilder().setKey(e.getKey()).setValue(e.getValue()).build()); } diff --git a/core/src/test/java/org/apache/datafusion/CacheManagerOptionsTest.java b/core/src/test/java/org/apache/datafusion/CacheManagerOptionsTest.java new file mode 100644 index 0000000..66417bc --- /dev/null +++ b/core/src/test/java/org/apache/datafusion/CacheManagerOptionsTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datafusion; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; + +import org.apache.datafusion.protobuf.CacheManagerOptionsProto; +import org.apache.datafusion.protobuf.SessionOptions; +import org.junit.jupiter.api.Test; + +class CacheManagerOptionsTest { + + @Test + void allThreeCachesRoundTripThroughProto() throws Exception { + byte[] bytes = + SessionContext.builder() + .cacheManager( + CacheManagerOptions.builder() + .fileMetadataCache(64L << 20) + .listFilesCache(8L << 20, Duration.ofMinutes(5)) + .fileStatisticsCache(true) + .build()) + .toBytes(); + + SessionOptions parsed = SessionOptions.parseFrom(bytes); + assertTrue(parsed.hasCacheManager()); + CacheManagerOptionsProto cm = parsed.getCacheManager(); + + assertTrue(cm.hasFileMetadataCacheMaxBytes()); + assertEquals(64L << 20, cm.getFileMetadataCacheMaxBytes()); + + assertTrue(cm.hasListFilesCache()); + assertTrue(cm.getListFilesCache().hasMaxBytes()); + assertEquals(8L << 20, cm.getListFilesCache().getMaxBytes()); + assertTrue(cm.getListFilesCache().hasTtlMillis()); + assertEquals(Duration.ofMinutes(5).toMillis(), cm.getListFilesCache().getTtlMillis()); + + assertTrue(cm.hasFileStatisticsCacheEnabled()); + assertTrue(cm.getFileStatisticsCacheEnabled()); + } + + @Test + void unsetSettersAreAbsentInProto() throws Exception { + byte[] bytes = + SessionContext.builder() + .cacheManager(CacheManagerOptions.builder().fileMetadataCache(64L << 20).build()) + .toBytes(); + SessionOptions parsed = SessionOptions.parseFrom(bytes); + CacheManagerOptionsProto cm = parsed.getCacheManager(); + + assertTrue(cm.hasFileMetadataCacheMaxBytes()); + // The other two travel as unset, not as zero/empty -- the Rust side + // distinguishes "leave upstream default in place" from "explicitly off". + assertFalse(cm.hasListFilesCache()); + assertFalse(cm.hasFileStatisticsCacheEnabled()); + } + + @Test + void listFilesCacheNullTtlIsUnsetOnTheWire() throws Exception { + byte[] bytes = + SessionContext.builder() + .cacheManager(CacheManagerOptions.builder().listFilesCache(8L << 20, null).build()) + .toBytes(); + SessionOptions parsed = SessionOptions.parseFrom(bytes); + CacheManagerOptionsProto cm = parsed.getCacheManager(); + assertTrue(cm.hasListFilesCache()); + assertTrue(cm.getListFilesCache().hasMaxBytes()); + assertEquals(8L << 20, cm.getListFilesCache().getMaxBytes()); + // ttl_millis is the channel for "infinite" (None on the Rust side) -- + // it has to travel as unset, not as 0. + assertFalse(cm.getListFilesCache().hasTtlMillis()); + } + + @Test + void fileMetadataCacheZeroIsAccepted() throws Exception { + // Upstream allows 0 (the cache is constructed with a 0-byte budget). + // Java mirrors that contract. + byte[] bytes = + SessionContext.builder() + .cacheManager(CacheManagerOptions.builder().fileMetadataCache(0).build()) + .toBytes(); + SessionOptions parsed = SessionOptions.parseFrom(bytes); + CacheManagerOptionsProto cm = parsed.getCacheManager(); + assertTrue(cm.hasFileMetadataCacheMaxBytes()); + assertEquals(0L, cm.getFileMetadataCacheMaxBytes()); + } + + @Test + void fileStatisticsCacheFalseTravelsExplicitlyOnTheWire() throws Exception { + // `fileStatisticsCache(false)` is a different end-state from "never + // called this setter" only on the wire -- the Rust side treats both as + // "leave the slot None" today, but distinguishing the two on the wire + // lets us assert the bool actually round-tripped. + byte[] bytes = + SessionContext.builder() + .cacheManager(CacheManagerOptions.builder().fileStatisticsCache(false).build()) + .toBytes(); + SessionOptions parsed = SessionOptions.parseFrom(bytes); + CacheManagerOptionsProto cm = parsed.getCacheManager(); + assertTrue(cm.hasFileStatisticsCacheEnabled()); + assertFalse(cm.getFileStatisticsCacheEnabled()); + } + + @Test + void unsetCacheManagerIsAbsentInProto() throws Exception { + // Sanity check that builders that never call .cacheManager(...) emit no + // cache_manager field at all -- the existing-callers-see-no-change + // contract relies on this. + byte[] bytes = SessionContext.builder().batchSize(8192).toBytes(); + SessionOptions parsed = SessionOptions.parseFrom(bytes); + assertFalse(parsed.hasCacheManager()); + } + + @Test + void rejectsNegativeFileMetadataCacheMaxBytes() { + CacheManagerOptions.Builder b = CacheManagerOptions.builder(); + assertThrows(IllegalArgumentException.class, () -> b.fileMetadataCache(-1)); + } + + @Test + void rejectsNegativeListFilesCacheMaxBytes() { + CacheManagerOptions.Builder b = CacheManagerOptions.builder(); + assertThrows(IllegalArgumentException.class, () -> b.listFilesCache(-1, null)); + } + + @Test + void rejectsNegativeListFilesCacheTtl() { + CacheManagerOptions.Builder b = CacheManagerOptions.builder(); + assertThrows(IllegalArgumentException.class, () -> b.listFilesCache(0, Duration.ofMillis(-1))); + } + + @Test + void cacheManagerRejectsNull() { + SessionContextBuilder b = SessionContext.builder(); + assertThrows(IllegalArgumentException.class, () -> b.cacheManager(null)); + } + + @Test + void cacheManagerSetterReplacesPreviousValue() throws Exception { + // Setter replaces; doesn't merge. Calling fileMetadataCache(64) then + // listFilesCache(8) on a fresh builder must NOT carry the file-metadata + // setting forward. + byte[] bytes = + SessionContext.builder() + .cacheManager(CacheManagerOptions.builder().fileMetadataCache(64L << 20).build()) + .cacheManager(CacheManagerOptions.builder().listFilesCache(8L << 20, null).build()) + .toBytes(); + SessionOptions parsed = SessionOptions.parseFrom(bytes); + CacheManagerOptionsProto cm = parsed.getCacheManager(); + // Only list-files survives -- the second .cacheManager(...) call replaced + // the first. + assertFalse(cm.hasFileMetadataCacheMaxBytes()); + assertTrue(cm.hasListFilesCache()); + } +} diff --git a/core/src/test/java/org/apache/datafusion/SessionContextCacheManagerTest.java b/core/src/test/java/org/apache/datafusion/SessionContextCacheManagerTest.java new file mode 100644 index 0000000..619e469 --- /dev/null +++ b/core/src/test/java/org/apache/datafusion/SessionContextCacheManagerTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datafusion; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.junit.jupiter.api.Test; + +class SessionContextCacheManagerTest { + + @Test + void fileMetadataCacheBuildsAndContextIsUsable() throws Exception { + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = + SessionContext.builder() + .cacheManager(CacheManagerOptions.builder().fileMetadataCache(64L << 20).build()) + .build(); + DataFrame df = ctx.sql("SELECT 1"); + ArrowReader reader = df.collect(allocator)) { + assertTrue(reader.loadNextBatch()); + assertEquals(1, reader.getVectorSchemaRoot().getRowCount()); + } + } + + @Test + void listFilesCacheWithFiniteTtlBuilds() { + try (SessionContext ctx = + SessionContext.builder() + .cacheManager( + CacheManagerOptions.builder() + .listFilesCache(8L << 20, Duration.ofMinutes(5)) + .build()) + .build()) { + assertNotNull(ctx); + } + } + + @Test + void listFilesCacheWithInfiniteTtlBuilds() { + // null TTL must reach the Rust side as None (infinite). Construction + // succeeds; the only failure mode would be the JNI layer mistranslating + // null as "TTL=0" and the upstream constructor rejecting it. + try (SessionContext ctx = + SessionContext.builder() + .cacheManager(CacheManagerOptions.builder().listFilesCache(8L << 20, null).build()) + .build()) { + assertNotNull(ctx); + } + } + + @Test + void fileStatisticsCacheBuilds() { + try (SessionContext ctx = + SessionContext.builder() + .cacheManager(CacheManagerOptions.builder().fileStatisticsCache(true).build()) + .build()) { + assertNotNull(ctx); + } + } + + @Test + void allThreeCachesTogetherBuildAndContextIsUsable() throws Exception { + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = + SessionContext.builder() + .cacheManager( + CacheManagerOptions.builder() + .fileMetadataCache(64L << 20) + .listFilesCache(8L << 20, Duration.ofMinutes(5)) + .fileStatisticsCache(true) + .build()) + .build(); + DataFrame df = ctx.sql("SELECT 1"); + ArrowReader reader = df.collect(allocator)) { + assertTrue(reader.loadNextBatch()); + assertEquals(1, reader.getVectorSchemaRoot().getRowCount()); + } + } + + @Test + void emptyCacheManagerOptionsIsHarmlessNoop() { + // builder().build() with nothing set is on the wire as "all three slots + // unset". The Rust side treats that as "skip with_cache_manager + // entirely" and the context construction succeeds. + try (SessionContext ctx = + SessionContext.builder().cacheManager(CacheManagerOptions.builder().build()).build()) { + assertNotNull(ctx); + } + } + + @Test + void fileMetadataCacheZeroIsHarmless() { + // Upstream allows a 0-byte cap (cache constructed but evicts every + // insert). Java should not reject it. + try (SessionContext ctx = + SessionContext.builder() + .cacheManager(CacheManagerOptions.builder().fileMetadataCache(0).build()) + .build()) { + assertNotNull(ctx); + } + } +} diff --git a/native/build.rs b/native/build.rs index d80f0aa..14df8f9 100644 --- a/native/build.rs +++ b/native/build.rs @@ -18,6 +18,7 @@ fn main() { const PROTOS: &[&str] = &[ "../proto/session_options.proto", + "../proto/cache_manager_options.proto", "../proto/file_compression_type.proto", "../proto/arrow_read_options.proto", "../proto/avro_read_options.proto", diff --git a/native/src/cache_manager.rs b/native/src/cache_manager.rs new file mode 100644 index 0000000..3b9e286 --- /dev/null +++ b/native/src/cache_manager.rs @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Translate the [`CacheManagerOptionsProto`] message into a +//! [`CacheManagerConfig`] for [`RuntimeEnvBuilder::with_cache_manager`]. +//! +//! Each of the three caches is independent; an unset proto field leaves the +//! corresponding upstream default in place (no cache for list-files / stats, +//! a `DefaultFilesMetadataCache` with the default limit for file metadata). +//! When a setter *is* present, the JNI layer always installs a fresh +//! `Default*Cache` impl -- the v1 contract is "configure the built-in +//! caches", not "swap in a custom one". + +use std::sync::Arc; +use std::time::Duration; + +use datafusion::execution::cache::cache_manager::CacheManagerConfig; +use datafusion::execution::cache::cache_unit::{ + DefaultFileStatisticsCache, DefaultFilesMetadataCache, +}; +use datafusion::execution::cache::DefaultListFilesCache; + +use crate::errors::JniResult; +use crate::proto_gen::CacheManagerOptionsProto; + +/// Build a [`CacheManagerConfig`] from the proto. Returns `Ok(None)` if the +/// caller did not set any cache-manager field, so the JNI layer can skip the +/// `with_cache_manager(...)` call entirely and let upstream's own +/// `RuntimeEnvBuilder` defaults apply. +pub(crate) fn build_config( + opts: &CacheManagerOptionsProto, +) -> JniResult> { + // Treat "all three slots unset" as "caller didn't pass a cache_manager + // message at all" -- avoids overriding upstream's cache-manager-default + // with a freshly-constructed-but-empty config that happens to be + // equivalent today but might diverge on a future DataFusion release. + if opts.file_metadata_cache_max_bytes.is_none() + && opts.list_files_cache.is_none() + && opts.file_statistics_cache_enabled.is_none() + { + return Ok(None); + } + + let mut config = CacheManagerConfig::default(); + + if let Some(max_bytes) = opts.file_metadata_cache_max_bytes { + let max = max_bytes as usize; + config.file_metadata_cache = Some(Arc::new(DefaultFilesMetadataCache::new(max))); + config.metadata_cache_limit = max; + } + + if let Some(lfc) = &opts.list_files_cache { + // `max_bytes` unset → use upstream's documented default + // (currently 1 MiB; pulled from CacheManagerConfig::default() + // rather than re-declared here so we track upstream automatically). + let default_limit = CacheManagerConfig::default().list_files_cache_limit; + let max = lfc.max_bytes.map(|v| v as usize).unwrap_or(default_limit); + let ttl = lfc.ttl_millis.map(Duration::from_millis); + + config.list_files_cache = Some(Arc::new(DefaultListFilesCache::new(max, ttl))); + config.list_files_cache_limit = max; + config.list_files_cache_ttl = ttl; + } + + if opts.file_statistics_cache_enabled.unwrap_or(false) { + config.table_files_statistics_cache = Some(Arc::new(DefaultFileStatisticsCache::default())); + } + + Ok(Some(config)) +} diff --git a/native/src/lib.rs b/native/src/lib.rs index a235cd3..bfe533e 100644 --- a/native/src/lib.rs +++ b/native/src/lib.rs @@ -17,6 +17,7 @@ mod arrow; mod avro; +mod cache_manager; mod csv; mod errors; mod jni_util; @@ -127,6 +128,12 @@ pub extern "system" fn Java_org_apache_datafusion_SessionContext_createSessionCo runtime_builder = runtime_builder.with_temp_file_path(PathBuf::from(dir)); } + if let Some(cm_opts) = opts.cache_manager.as_ref() { + if let Some(cm_config) = crate::cache_manager::build_config(cm_opts)? { + runtime_builder = runtime_builder.with_cache_manager(cm_config); + } + } + // datafusion.runtime.* keys live on RuntimeEnv (separate object from // SessionConfig) and round-tripping them through getOption/setOption // has subtle correctness pitfalls (lazy default-tempdir creation, diff --git a/proto/cache_manager_options.proto b/proto/cache_manager_options.proto new file mode 100644 index 0000000..208f0bd --- /dev/null +++ b/proto/cache_manager_options.proto @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax = "proto3"; + +package datafusion_java; + +option java_package = "org.apache.datafusion.protobuf"; +option java_multiple_files = true; + +// Configuration for DataFusion's built-in CacheManager (see +// `datafusion::execution::cache::cache_manager::CacheManagerConfig`). +// Each of the three caches is independent; unset fields leave the +// upstream default in place. When a typed setter is used, the JNI +// layer installs the corresponding `Default*Cache` implementation. +// +// The Java public class is `CacheManagerOptions`; the proto message +// gets a `Proto` suffix to avoid a name clash on the Java side, the +// same convention used for `ParquetReadOptionsProto` etc. +message CacheManagerOptionsProto { + // file_metadata_cache: when set, install `DefaultFilesMetadataCache` + // configured with this byte cap and raise `metadata_cache_limit` to + // match. Unset = leave the upstream default in place. Zero is a + // legal value upstream and means "construct the cache but with a + // 0-byte budget" (effectively disabled but observable in stats). + optional uint64 file_metadata_cache_max_bytes = 1; + + // list_files_cache: when set, install `DefaultListFilesCache`. The + // inner message's own fields preserve unset-ness so the caller can + // enable the cache with all-defaults by sending an empty + // ListFilesCacheOptionsProto. + optional ListFilesCacheOptionsProto list_files_cache = 2; + + // file_statistics_cache: when true, install + // `DefaultFileStatisticsCache`. When false or unset, leave the slot + // `None` (disabled). No bytewise cap because upstream's + // `DefaultFileStatisticsCache` doesn't expose one. + optional bool file_statistics_cache_enabled = 3; +} + +message ListFilesCacheOptionsProto { + // Memory cap. Unset = upstream default + // (`DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT`, currently 1 MiB). + optional uint64 max_bytes = 1; + // TTL in milliseconds. Unset = upstream's `None` (infinite TTL). + optional uint64 ttl_millis = 2; +} diff --git a/proto/session_options.proto b/proto/session_options.proto index 2c9e629..5c4cfe2 100644 --- a/proto/session_options.proto +++ b/proto/session_options.proto @@ -19,6 +19,8 @@ syntax = "proto3"; package datafusion_java; +import "cache_manager_options.proto"; + option java_package = "org.apache.datafusion.protobuf"; option java_multiple_files = true; @@ -50,6 +52,9 @@ message SessionOptions { optional MemoryLimit memory_limit = 5; optional string temp_directory = 6; repeated ConfigOption options = 7; + // Configuration for DataFusion's built-in CacheManager. Unset = leave + // the upstream default in place (matches existing callers). + optional CacheManagerOptionsProto cache_manager = 8; } message ConfigOption {