Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;

import org.apache.datafusion.protobuf.ConfigOption;
import org.apache.datafusion.protobuf.DiskManagerOptions;
import org.apache.datafusion.protobuf.MemoryLimit;
import org.apache.datafusion.protobuf.SessionOptions;

Expand All @@ -40,6 +41,8 @@ public final class SessionContextBuilder {
private Long memoryLimitBytes;
private Double memoryLimitFraction;
private String tempDirectory;
private boolean spillDisabled;
private Long maxTempDirectorySize;
private final LinkedHashMap<String, String> options = new LinkedHashMap<>();
private final List<ObjectStoreOptions> objectStores = new ArrayList<>();

Expand Down Expand Up @@ -93,6 +96,39 @@ public SessionContextBuilder tempDirectory(String path) {
return this;
}

/**
 * Turns off spilling to disk for the session being built. A query that would otherwise spill
 * fails with a {@link RuntimeException} carrying DataFusion's "resources exhausted" message
 * instead of writing temporary files, which suits memory-only execution profiles and hosts
 * without writable disk.
 *
 * <p>Cannot be combined with {@link #tempDirectory(String)}: {@link #build()} throws when both
 * were configured. Combining with {@link #maxTempDirectorySize(long)} is permitted, but the cap
 * has no effect because there is no directory to cap.
 *
 * @return this builder, for chaining.
 */
public SessionContextBuilder disableSpill() {
  // Only the flag is recorded here; the conflict with tempDirectory is checked in build().
  spillDisabled = true;
  return this;
}

/**
 * Limits the total number of bytes that spill files may occupy under {@link
 * #tempDirectory(String)}. This is a 1:1 mirror of upstream {@code
 * RuntimeEnvBuilder::with_max_temp_directory_size}: when the limit is exceeded, a query that
 * needs additional spill space fails with a {@link RuntimeException} carrying DataFusion's
 * "resources exhausted" message. May be set together with {@link #disableSpill()}, in which case
 * it is a no-op.
 *
 * <p>A value of {@code 0} is valid; upstream documents zero as legal and equivalent to "no spill
 * allowed". Negative values are refused.
 *
 * @param bytes the cap in bytes; must be zero or positive.
 * @return this builder, for chaining.
 * @throws IllegalArgumentException if {@code bytes} is negative.
 */
public SessionContextBuilder maxTempDirectorySize(long bytes) {
  if (bytes >= 0) {
    maxTempDirectorySize = bytes;
    return this;
  }
  throw new IllegalArgumentException("maxTempDirectorySize must be non-negative, got " + bytes);
}

/**
* Set an arbitrary {@code datafusion.*} config option by string key. Mirrors DataFusion's {@code
* ConfigOptions::set(key, value)} API — see the DataFusion configuration reference for the full
Expand Down Expand Up @@ -199,9 +235,15 @@ public SessionContextBuilder registerObjectStore(ObjectStoreOptions options) {
/**
 * Builds a {@link SessionContext} from the options collected on this builder.
 *
 * <p>The builder state is validated first, then serialized via {@code toBytes()} and handed to
 * the {@link SessionContext} constructor.
 *
 * @return the newly constructed context.
 * @throws IllegalStateException if both {@link #disableSpill()} and {@link
 *     #tempDirectory(String)} were called; asking for a spill directory while disabling spill is
 *     contradictory.
 * @throws RuntimeException if the native side fails to construct the context.
 */
public SessionContext build() {
  boolean hasTempDirectory = tempDirectory != null;
  if (hasTempDirectory && spillDisabled) {
    throw new IllegalStateException(
        "disableSpill() is mutually exclusive with tempDirectory(...)");
  }
  byte[] serializedOptions = toBytes();
  return new SessionContext(serializedOptions);
}

Expand Down Expand Up @@ -229,6 +271,18 @@ byte[] toBytes() {
if (tempDirectory != null) {
b.setTempDirectory(tempDirectory);
}
DiskManagerOptions.Builder dm = null;
if (spillDisabled) {
dm = DiskManagerOptions.newBuilder().setDisabled(true);
}
if (maxTempDirectorySize != null) {
dm =
(dm != null ? dm : DiskManagerOptions.newBuilder())
.setMaxTempDirectorySize(maxTempDirectorySize);
}
if (dm != null) {
b.setDiskManager(dm.build());
}
for (Map.Entry<String, String> e : options.entrySet()) {
b.addOptions(ConfigOption.newBuilder().setKey(e.getKey()).setValue(e.getValue()).build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,94 @@ void getOptionRejectsRuntimeKeys() {
"expected runtime-key error, got: " + thrown.getMessage());
}
}

// -------------------------------------------------------------------------
// disk_manager surface (disable, size cap)
// -------------------------------------------------------------------------

@Test
void disableSpillRoundTripsThroughProto() throws Exception {
  // Serialize a builder that only disables spill, then decode the wire bytes and check that the
  // disk_manager message is present with its disabled flag set.
  SessionContextBuilder builder = SessionContext.builder().disableSpill();
  SessionOptions decoded = SessionOptions.parseFrom(builder.toBytes());
  assertTrue(decoded.hasDiskManager());
  assertTrue(decoded.getDiskManager().getDisabled());
}

@Test
void disableSpillProducesUsableContext() throws Exception {
  // A trivial SELECT 1 never needs to spill, so a spill-disabled context must still run it.
  try (BufferAllocator alloc = new RootAllocator();
      SessionContext context = SessionContext.builder().disableSpill().build();
      DataFrame frame = context.sql("SELECT 1");
      ArrowReader batches = frame.collect(alloc)) {
    boolean hasBatch = batches.loadNextBatch();
    assertTrue(hasBatch);
  }
}

@Test
void disableSpillAndTempDirectoryConflictThrowsAtBuild() {
  // Both setters are accepted individually; the contradiction is only reported by build().
  SessionContextBuilder conflicting =
      SessionContext.builder().disableSpill().tempDirectory("/tmp/x");
  assertThrows(IllegalStateException.class, () -> conflicting.build());
}

@Test
void maxTempDirectorySizeRoundTripsThroughProto() throws Exception {
  // 20 GiB: a value above Integer.MAX_VALUE, so the check exercises the full 64-bit field.
  long cap = 20L << 30;
  SessionOptions decoded =
      SessionOptions.parseFrom(SessionContext.builder().maxTempDirectorySize(cap).toBytes());
  assertTrue(decoded.hasDiskManager());
  assertTrue(decoded.getDiskManager().hasMaxTempDirectorySize());
  assertEquals(cap, decoded.getDiskManager().getMaxTempDirectorySize());
}

@Test
void maxTempDirectorySizeRejectsNegative() {
  // The setter itself validates its argument; no build() call is needed to see the failure.
  SessionContextBuilder builder = SessionContext.builder();
  assertThrows(
      IllegalArgumentException.class,
      () -> {
        builder.maxTempDirectorySize(-1);
      });
}

@Test
void maxTempDirectorySizeZeroBuildsCleanly() throws Exception {
  // Zero is a legal cap upstream ("no spill allowed") and the Java setter accepts it too.
  // Confirm the context still constructs and that SELECT 1, which never spills, returns a batch.
  try (BufferAllocator alloc = new RootAllocator();
      SessionContext context = SessionContext.builder().maxTempDirectorySize(0).build();
      DataFrame frame = context.sql("SELECT 1");
      ArrowReader batches = frame.collect(alloc)) {
    boolean hasBatch = batches.loadNextBatch();
    assertTrue(hasBatch);
  }
}

@Test
void disableSpillCombinesWithMaxTempDirectorySize() throws Exception {
  // The cap is a no-op when spill is disabled (no directory to cap), but
  // setting both must not throw -- callers may have code that always
  // configures the cap and conditionally disables spill.
  long cap = 1L << 30;
  byte[] bytes = SessionContext.builder().disableSpill().maxTempDirectorySize(cap).toBytes();
  SessionOptions parsed = SessionOptions.parseFrom(bytes);
  // Both settings travel in the same disk_manager message, so check the message is present
  // before inspecting its fields.
  assertTrue(parsed.hasDiskManager());
  assertTrue(parsed.getDiskManager().getDisabled());
  assertTrue(parsed.getDiskManager().hasMaxTempDirectorySize());
  // Presence alone would not catch a serializer that wrote the wrong cap when both options are
  // set, so pin the value as well.
  assertEquals(cap, parsed.getDiskManager().getMaxTempDirectorySize());
}

@Test
void diskManagerFieldIsAbsentWhenNothingSet() throws Exception {
  // Backward-compatibility contract: when none of the disk_manager setters are used, the
  // serialized options must not contain a disk_manager field at all.
  SessionContextBuilder untouched = SessionContext.builder().batchSize(8192);
  SessionOptions decoded = SessionOptions.parseFrom(untouched.toBytes());
  assertFalse(decoded.hasDiskManager());
}

@Test
void tempDirectoryStaysOnLegacyField() throws Exception {
  // tempDirectory(String) keeps writing the pre-existing SessionOptions.temp_directory field
  // and must not create a disk_manager message, so builders that use only this setter produce
  // the same wire shape as before.
  String spillDir = "/tmp/df";
  SessionContextBuilder builder = SessionContext.builder().tempDirectory(spillDir);
  SessionOptions decoded = SessionOptions.parseFrom(builder.toBytes());
  assertTrue(decoded.hasTempDirectory());
  assertEquals(spillDir, decoded.getTempDirectory());
  assertFalse(decoded.hasDiskManager());
}
}
17 changes: 17 additions & 0 deletions native/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use datafusion::config::TableParquetOptions;
use datafusion::dataframe::DataFrame;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::DataFusionError;
use datafusion::execution::disk_manager::{DiskManagerBuilder, DiskManagerMode};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::logical_expr::{ScalarUDF, Signature};
Expand Down Expand Up @@ -128,6 +129,22 @@ pub extern "system" fn Java_org_apache_datafusion_SessionContext_createSessionCo
if let Some(dir) = opts.temp_directory {
runtime_builder = runtime_builder.with_temp_file_path(PathBuf::from(dir));
}
// disk_manager carries the disable / size-cap surface added on top of
// the legacy temp_directory field. Java-side builder enforces that
// disabled and tempDirectory aren't both set; the Rust layer doesn't
// re-validate because there's no path that produces a contradictory
// mode here -- with_disk_manager_builder(Disabled) wholesale
// replaces any prior with_temp_file_path call, and that's the
// semantics callers using the typed Java setters can already see.
if let Some(dm) = opts.disk_manager.as_ref() {
if dm.disabled() {
let builder = DiskManagerBuilder::default().with_mode(DiskManagerMode::Disabled);
runtime_builder = runtime_builder.with_disk_manager_builder(builder);
}
if let Some(size) = dm.max_temp_directory_size {
runtime_builder = runtime_builder.with_max_temp_directory_size(size);
}
}

// datafusion.runtime.* keys live on RuntimeEnv (separate object from
// SessionConfig) and round-tripping them through getOption/setOption
Expand Down
19 changes: 19 additions & 0 deletions proto/session_options.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,25 @@ message SessionOptions {
// later one wins (matching upstream's `RuntimeEnv::register_object_store`
// semantics).
repeated ObjectStoreRegistration object_stores = 8;
// Disk-manager configuration (disable spill, size cap). Unset fields
// leave the upstream `RuntimeEnvBuilder` default in place. Slot 9 is
// reserved for the pending CacheManagerOptions field; merge order with
// that PR determines whether disk_manager stays at 10 or compacts to 9.
optional DiskManagerOptions disk_manager = 10;
}

// Disk-manager configuration carried inside `SessionOptions.disk_manager`.
// The two fields are independent on the wire: either, both, or neither may
// be present. Mutual exclusion of `disabled` against the legacy
// `SessionOptions.temp_directory` field (slot 6) is enforced at the Java
// builder level (`SessionContextBuilder.build()` throws when both are set);
// the wire format itself does not forbid the combination.
message DiskManagerOptions {
// Disable spill entirely (`DiskManagerMode::Disabled`). Unset or false
// leaves the disk-manager mode untouched on the native side.
optional bool disabled = 1;
// Cap on bytes used by spill files. Mirrors upstream
// `RuntimeEnvBuilder::with_max_temp_directory_size` 1:1. Zero is a
// legal value upstream (means "no spill allowed"); the Java setter
// mirrors that contract and rejects negative values before they reach
// this unsigned field.
optional uint64 max_temp_directory_size = 2;
}

message ConfigOption {
Expand Down