Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 163 additions & 0 deletions core/src/main/java/org/apache/datafusion/CacheManagerOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

import java.time.Duration;

import org.apache.datafusion.protobuf.CacheManagerOptionsProto;
import org.apache.datafusion.protobuf.ListFilesCacheOptionsProto;

/**
* Configuration for DataFusion's built-in {@code CacheManager}. Pass an instance to {@link
* SessionContextBuilder#cacheManager(CacheManagerOptions)} to turn on any of the three
* upstream-provided caches at session construction time.
*
* <p>The three caches are independent; calling one setter does not affect the others. A setter that
* is never called leaves the upstream default in place — so existing callers that don't touch this
* builder see no behavioral change.
*
* <ul>
* <li>{@link Builder#fileMetadataCache(long)} — caches file-embedded metadata (e.g. parquet
* footers / page metadata). Backed by {@code DefaultFilesMetadataCache} with the supplied
* byte cap.
* <li>{@link Builder#listFilesCache(long, Duration)} — caches results of object-store {@code
* list} operations. Backed by {@code DefaultListFilesCache} with the supplied byte cap and
* optional TTL ({@code null} = infinite).
* <li>{@link Builder#fileStatisticsCache(boolean)} — caches per-file row counts and column
* statistics. Backed by {@code DefaultFileStatisticsCache}; no bytewise cap exists upstream.
* </ul>
*
* <p>This v1 surface configures the <em>built-in</em> cache implementations. Plugging custom Java
* cache implementations through a JNI upcall path is intentionally out of scope — every cache
* lookup is a hot path during scans, and routing them through Java would defeat the cache.
*/
public final class CacheManagerOptions {

private final Long fileMetadataCacheMaxBytes;
private final ListFilesCacheConfig listFilesCache;
private final Boolean fileStatisticsCacheEnabled;

private CacheManagerOptions(Builder b) {
this.fileMetadataCacheMaxBytes = b.fileMetadataCacheMaxBytes;
this.listFilesCache = b.listFilesCache;
this.fileStatisticsCacheEnabled = b.fileStatisticsCacheEnabled;
}

/** Begin building a {@link CacheManagerOptions} instance. */
public static Builder builder() {
return new Builder();
}

CacheManagerOptionsProto toProto() {
CacheManagerOptionsProto.Builder b = CacheManagerOptionsProto.newBuilder();
if (fileMetadataCacheMaxBytes != null) {
b.setFileMetadataCacheMaxBytes(fileMetadataCacheMaxBytes);
}
if (listFilesCache != null) {
ListFilesCacheOptionsProto.Builder lb = ListFilesCacheOptionsProto.newBuilder();
if (listFilesCache.maxBytes != null) {
lb.setMaxBytes(listFilesCache.maxBytes);
}
if (listFilesCache.ttlMillis != null) {
lb.setTtlMillis(listFilesCache.ttlMillis);
}
b.setListFilesCache(lb.build());
}
if (fileStatisticsCacheEnabled != null) {
b.setFileStatisticsCacheEnabled(fileStatisticsCacheEnabled);
}
return b.build();
}

/** Internal carrier — `null`-vs-set is the source of truth for "user called this setter". */
private static final class ListFilesCacheConfig {
final Long maxBytes;
final Long ttlMillis;

ListFilesCacheConfig(Long maxBytes, Long ttlMillis) {
this.maxBytes = maxBytes;
this.ttlMillis = ttlMillis;
}
}

/** Builder for {@link CacheManagerOptions}. */
public static final class Builder {
private Long fileMetadataCacheMaxBytes;
private ListFilesCacheConfig listFilesCache;
private Boolean fileStatisticsCacheEnabled;

private Builder() {}

/**
* Enable the file-embedded metadata cache (parquet footers, page metadata) with the given byte
* cap. The cap is the budget the upstream {@code DefaultFilesMetadataCache} uses to evict
* entries; {@code 0} is legal and means "construct the cache but with a 0-byte budget"
* (effectively disabled but observable in stats).
*
* @throws IllegalArgumentException if {@code maxBytes} is negative.
*/
public Builder fileMetadataCache(long maxBytes) {
if (maxBytes < 0) {
throw new IllegalArgumentException(
"fileMetadataCache maxBytes must be non-negative, got " + maxBytes);
}
this.fileMetadataCacheMaxBytes = maxBytes;
return this;
}

/**
* Enable the list-files cache with the given byte cap and TTL. Pass {@code null} for {@code
* ttl} to use upstream's "no expiration" semantics (entries are evicted only by capacity
* pressure).
*
* @throws IllegalArgumentException if {@code maxBytes} is negative or {@code ttl} is negative.
*/
public Builder listFilesCache(long maxBytes, Duration ttl) {
if (maxBytes < 0) {
throw new IllegalArgumentException(
"listFilesCache maxBytes must be non-negative, got " + maxBytes);
}
Long ttlMillis = null;
if (ttl != null) {
if (ttl.isNegative()) {
throw new IllegalArgumentException("listFilesCache ttl must be non-negative, got " + ttl);
}
ttlMillis = ttl.toMillis();
}
this.listFilesCache = new ListFilesCacheConfig(maxBytes, ttlMillis);
return this;
}

/**
* Enable the file-statistics cache (per-file row counts / column statistics). When {@code
* enabled} is {@code true}, the upstream {@code DefaultFileStatisticsCache} is installed. When
* {@code false}, the slot is explicitly set to disabled in the wire format — the same end-state
* as never calling this setter, but distinguishable for testing.
*/
public Builder fileStatisticsCache(boolean enabled) {
this.fileStatisticsCacheEnabled = enabled;
return this;
}

public CacheManagerOptions build() {
return new CacheManagerOptions(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public final class SessionContextBuilder {
private Long memoryLimitBytes;
private Double memoryLimitFraction;
private String tempDirectory;
private CacheManagerOptions cacheManager;
private final LinkedHashMap<String, String> options = new LinkedHashMap<>();
private final List<ObjectStoreOptions> objectStores = new ArrayList<>();

Expand Down Expand Up @@ -171,6 +172,25 @@ public SessionContextBuilder setOptions(Map<String, String> entries) {
return this;
}

/**
* Configure DataFusion's built-in {@code CacheManager} for the new context. Build the {@link
* CacheManagerOptions} via {@link CacheManagerOptions#builder()}; each cache slot is independent,
* so leaving a setter unset keeps the upstream default in place.
*
* <p>Calling this setter twice replaces the previous configuration — there is no incremental
* merge between calls. If you need a different cache configuration, build a new {@code
* CacheManagerOptions} from scratch.
*
* @throws IllegalArgumentException if {@code options} is {@code null}.
*/
public SessionContextBuilder cacheManager(CacheManagerOptions options) {
if (options == null) {
throw new IllegalArgumentException("cacheManager options must be non-null");
}
this.cacheManager = options;
return this;
}

/**
* Register an {@code object_store::ObjectStore} backend on the new context's {@code RuntimeEnv}.
* Build {@link ObjectStoreOptions} via the per-backend factories ({@link ObjectStoreOptions#s3},
Expand Down Expand Up @@ -229,6 +249,9 @@ byte[] toBytes() {
if (tempDirectory != null) {
b.setTempDirectory(tempDirectory);
}
if (cacheManager != null) {
b.setCacheManager(cacheManager.toProto());
}
for (Map.Entry<String, String> e : options.entrySet()) {
b.addOptions(ConfigOption.newBuilder().setKey(e.getKey()).setValue(e.getValue()).build());
}
Expand Down
177 changes: 177 additions & 0 deletions core/src/test/java/org/apache/datafusion/CacheManagerOptionsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Duration;

import org.apache.datafusion.protobuf.CacheManagerOptionsProto;
import org.apache.datafusion.protobuf.SessionOptions;
import org.junit.jupiter.api.Test;

class CacheManagerOptionsTest {

@Test
void allThreeCachesRoundTripThroughProto() throws Exception {
byte[] bytes =
SessionContext.builder()
.cacheManager(
CacheManagerOptions.builder()
.fileMetadataCache(64L << 20)
.listFilesCache(8L << 20, Duration.ofMinutes(5))
.fileStatisticsCache(true)
.build())
.toBytes();

SessionOptions parsed = SessionOptions.parseFrom(bytes);
assertTrue(parsed.hasCacheManager());
CacheManagerOptionsProto cm = parsed.getCacheManager();

assertTrue(cm.hasFileMetadataCacheMaxBytes());
assertEquals(64L << 20, cm.getFileMetadataCacheMaxBytes());

assertTrue(cm.hasListFilesCache());
assertTrue(cm.getListFilesCache().hasMaxBytes());
assertEquals(8L << 20, cm.getListFilesCache().getMaxBytes());
assertTrue(cm.getListFilesCache().hasTtlMillis());
assertEquals(Duration.ofMinutes(5).toMillis(), cm.getListFilesCache().getTtlMillis());

assertTrue(cm.hasFileStatisticsCacheEnabled());
assertTrue(cm.getFileStatisticsCacheEnabled());
}

@Test
void unsetSettersAreAbsentInProto() throws Exception {
byte[] bytes =
SessionContext.builder()
.cacheManager(CacheManagerOptions.builder().fileMetadataCache(64L << 20).build())
.toBytes();
SessionOptions parsed = SessionOptions.parseFrom(bytes);
CacheManagerOptionsProto cm = parsed.getCacheManager();

assertTrue(cm.hasFileMetadataCacheMaxBytes());
// The other two travel as unset, not as zero/empty -- the Rust side
// distinguishes "leave upstream default in place" from "explicitly off".
assertFalse(cm.hasListFilesCache());
assertFalse(cm.hasFileStatisticsCacheEnabled());
}

@Test
void listFilesCacheNullTtlIsUnsetOnTheWire() throws Exception {
byte[] bytes =
SessionContext.builder()
.cacheManager(CacheManagerOptions.builder().listFilesCache(8L << 20, null).build())
.toBytes();
SessionOptions parsed = SessionOptions.parseFrom(bytes);
CacheManagerOptionsProto cm = parsed.getCacheManager();
assertTrue(cm.hasListFilesCache());
assertTrue(cm.getListFilesCache().hasMaxBytes());
assertEquals(8L << 20, cm.getListFilesCache().getMaxBytes());
// ttl_millis is the channel for "infinite" (None on the Rust side) --
// it has to travel as unset, not as 0.
assertFalse(cm.getListFilesCache().hasTtlMillis());
}

@Test
void fileMetadataCacheZeroIsAccepted() throws Exception {
// Upstream allows 0 (the cache is constructed with a 0-byte budget).
// Java mirrors that contract.
byte[] bytes =
SessionContext.builder()
.cacheManager(CacheManagerOptions.builder().fileMetadataCache(0).build())
.toBytes();
SessionOptions parsed = SessionOptions.parseFrom(bytes);
CacheManagerOptionsProto cm = parsed.getCacheManager();
assertTrue(cm.hasFileMetadataCacheMaxBytes());
assertEquals(0L, cm.getFileMetadataCacheMaxBytes());
}

@Test
void fileStatisticsCacheFalseTravelsExplicitlyOnTheWire() throws Exception {
// `fileStatisticsCache(false)` is a different end-state from "never
// called this setter" only on the wire -- the Rust side treats both as
// "leave the slot None" today, but distinguishing the two on the wire
// lets us assert the bool actually round-tripped.
byte[] bytes =
SessionContext.builder()
.cacheManager(CacheManagerOptions.builder().fileStatisticsCache(false).build())
.toBytes();
SessionOptions parsed = SessionOptions.parseFrom(bytes);
CacheManagerOptionsProto cm = parsed.getCacheManager();
assertTrue(cm.hasFileStatisticsCacheEnabled());
assertFalse(cm.getFileStatisticsCacheEnabled());
}

@Test
void unsetCacheManagerIsAbsentInProto() throws Exception {
// Sanity check that builders that never call .cacheManager(...) emit no
// cache_manager field at all -- the existing-callers-see-no-change
// contract relies on this.
byte[] bytes = SessionContext.builder().batchSize(8192).toBytes();
SessionOptions parsed = SessionOptions.parseFrom(bytes);
assertFalse(parsed.hasCacheManager());
}

@Test
void rejectsNegativeFileMetadataCacheMaxBytes() {
CacheManagerOptions.Builder b = CacheManagerOptions.builder();
assertThrows(IllegalArgumentException.class, () -> b.fileMetadataCache(-1));
}

@Test
void rejectsNegativeListFilesCacheMaxBytes() {
CacheManagerOptions.Builder b = CacheManagerOptions.builder();
assertThrows(IllegalArgumentException.class, () -> b.listFilesCache(-1, null));
}

@Test
void rejectsNegativeListFilesCacheTtl() {
CacheManagerOptions.Builder b = CacheManagerOptions.builder();
assertThrows(IllegalArgumentException.class, () -> b.listFilesCache(0, Duration.ofMillis(-1)));
}

@Test
void cacheManagerRejectsNull() {
SessionContextBuilder b = SessionContext.builder();
assertThrows(IllegalArgumentException.class, () -> b.cacheManager(null));
}

@Test
void cacheManagerSetterReplacesPreviousValue() throws Exception {
// Setter replaces; doesn't merge. Calling fileMetadataCache(64) then
// listFilesCache(8) on a fresh builder must NOT carry the file-metadata
// setting forward.
byte[] bytes =
SessionContext.builder()
.cacheManager(CacheManagerOptions.builder().fileMetadataCache(64L << 20).build())
.cacheManager(CacheManagerOptions.builder().listFilesCache(8L << 20, null).build())
.toBytes();
SessionOptions parsed = SessionOptions.parseFrom(bytes);
CacheManagerOptionsProto cm = parsed.getCacheManager();
// Only list-files survives -- the second .cacheManager(...) call replaced
// the first.
assertFalse(cm.hasFileMetadataCacheMaxBytes());
assertTrue(cm.hasListFilesCache());
}
}
Loading