diff --git a/core/src/main/java/org/apache/datafusion/SessionContextBuilder.java b/core/src/main/java/org/apache/datafusion/SessionContextBuilder.java index 10a5735..b60d980 100644 --- a/core/src/main/java/org/apache/datafusion/SessionContextBuilder.java +++ b/core/src/main/java/org/apache/datafusion/SessionContextBuilder.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.datafusion.protobuf.ConfigOption; +import org.apache.datafusion.protobuf.DiskManagerOptions; import org.apache.datafusion.protobuf.MemoryLimit; import org.apache.datafusion.protobuf.SessionOptions; @@ -40,6 +41,8 @@ public final class SessionContextBuilder { private Long memoryLimitBytes; private Double memoryLimitFraction; private String tempDirectory; + private boolean spillDisabled; + private Long maxTempDirectorySize; private final LinkedHashMap options = new LinkedHashMap<>(); private final List objectStores = new ArrayList<>(); @@ -93,6 +96,39 @@ public SessionContextBuilder tempDirectory(String path) { return this; } + /** + * Disable on-disk spill entirely. Queries that need spill fail with a {@link RuntimeException} + * carrying DataFusion's "resources exhausted" message rather than going to disk; useful for + * memory-only execution profiles or environments without writable disk. + * + *

Mutually exclusive with {@link #tempDirectory(String)} — the combination throws at {@link + * #build()} time. {@link #maxTempDirectorySize(long)} is allowed alongside this setter but is a + * no-op (no directory to cap). + */ + public SessionContextBuilder disableSpill() { + this.spillDisabled = true; + return this; + } + + /** + * Cap the cumulative bytes used by spill files under {@link #tempDirectory(String)}. Mirrors + * upstream {@code RuntimeEnvBuilder::with_max_temp_directory_size} 1:1. Once exceeded, queries + * that need more spill space fail with a {@link RuntimeException} carrying DataFusion's + * "resources exhausted" message. Combinable with {@link #disableSpill()} but a no-op there. + * + *

{@code 0} is accepted — upstream documents zero as legal and equivalent to "no spill + * allowed". Negative values are rejected. + * + * @throws IllegalArgumentException if {@code bytes} is negative. + */ + public SessionContextBuilder maxTempDirectorySize(long bytes) { + if (bytes < 0) { + throw new IllegalArgumentException("maxTempDirectorySize must be non-negative, got " + bytes); + } + this.maxTempDirectorySize = bytes; + return this; + } + /** * Set an arbitrary {@code datafusion.*} config option by string key. Mirrors DataFusion's {@code * ConfigOptions::set(key, value)} API — see the DataFusion configuration reference for the full @@ -199,9 +235,15 @@ public SessionContextBuilder registerObjectStore(ObjectStoreOptions options) { /** * Construct a {@link SessionContext} with the configured options. * + * @throws IllegalStateException if {@link #disableSpill()} was called alongside {@link + * #tempDirectory(String)} — the combination is contradictory. * @throws RuntimeException if the native side fails to construct the context. */ public SessionContext build() { + if (spillDisabled && tempDirectory != null) { + throw new IllegalStateException( + "disableSpill() is mutually exclusive with tempDirectory(...)"); + } return new SessionContext(toBytes()); } @@ -229,6 +271,18 @@ byte[] toBytes() { if (tempDirectory != null) { b.setTempDirectory(tempDirectory); } + DiskManagerOptions.Builder dm = null; + if (spillDisabled) { + dm = DiskManagerOptions.newBuilder().setDisabled(true); + } + if (maxTempDirectorySize != null) { + dm = + (dm != null ? dm : DiskManagerOptions.newBuilder()) + .setMaxTempDirectorySize(maxTempDirectorySize); + } + if (dm != null) { + b.setDiskManager(dm.build()); + } for (Map.Entry e : options.entrySet()) { b.addOptions(ConfigOption.newBuilder().setKey(e.getKey()).setValue(e.getValue()).build()); } diff --git a/core/src/test/java/org/apache/datafusion/SessionContextBuilderTest.java b/core/src/test/java/org/apache/datafusion/SessionContextBuilderTest.java index 27693b3..f828e97 100644 --- a/core/src/test/java/org/apache/datafusion/SessionContextBuilderTest.java +++ b/core/src/test/java/org/apache/datafusion/SessionContextBuilderTest.java @@ -347,4 +347,94 @@ void getOptionRejectsRuntimeKeys() { "expected runtime-key error, got: " + thrown.getMessage()); } } + + // ------------------------------------------------------------------------- + // disk_manager surface (disable, size cap) + // ------------------------------------------------------------------------- + + @Test + void disableSpillRoundTripsThroughProto() throws Exception { + byte[] bytes = SessionContext.builder().disableSpill().toBytes(); + SessionOptions parsed = SessionOptions.parseFrom(bytes); + assertTrue(parsed.hasDiskManager()); + assertTrue(parsed.getDiskManager().getDisabled()); + } + + @Test + void disableSpillProducesUsableContext() throws Exception { + // SELECT 1 doesn't spill, so disabling spill must not break it. + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = SessionContext.builder().disableSpill().build(); + DataFrame df = ctx.sql("SELECT 1"); + ArrowReader reader = df.collect(allocator)) { + assertTrue(reader.loadNextBatch()); + } + } + + @Test + void disableSpillAndTempDirectoryConflictThrowsAtBuild() { + SessionContextBuilder b = SessionContext.builder().disableSpill().tempDirectory("/tmp/x"); + assertThrows(IllegalStateException.class, b::build); + } + + @Test + void maxTempDirectorySizeRoundTripsThroughProto() throws Exception { + byte[] bytes = SessionContext.builder().maxTempDirectorySize(20L << 30).toBytes(); + SessionOptions parsed = SessionOptions.parseFrom(bytes); + assertTrue(parsed.hasDiskManager()); + assertTrue(parsed.getDiskManager().hasMaxTempDirectorySize()); + assertEquals(20L << 30, parsed.getDiskManager().getMaxTempDirectorySize()); + } + + @Test + void maxTempDirectorySizeRejectsNegative() { + SessionContextBuilder b = SessionContext.builder(); + assertThrows(IllegalArgumentException.class, () -> b.maxTempDirectorySize(-1)); + } + + @Test + void maxTempDirectorySizeZeroBuildsCleanly() throws Exception { + // Upstream allows 0 -- "no spill allowed" -- so the Java setter mirrors + // that. Sanity-check that the context constructs and SELECT 1 (which + // doesn't spill) still works. + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = SessionContext.builder().maxTempDirectorySize(0).build(); + DataFrame df = ctx.sql("SELECT 1"); + ArrowReader reader = df.collect(allocator)) { + assertTrue(reader.loadNextBatch()); + } + } + + @Test + void disableSpillCombinesWithMaxTempDirectorySize() throws Exception { + // The cap is a no-op when spill is disabled (no directory to cap), but + // setting both must not throw -- callers may have code that always + // configures the cap and conditionally disables spill. + byte[] bytes = SessionContext.builder().disableSpill().maxTempDirectorySize(1L << 30).toBytes(); + SessionOptions parsed = SessionOptions.parseFrom(bytes); + assertTrue(parsed.getDiskManager().getDisabled()); + assertTrue(parsed.getDiskManager().hasMaxTempDirectorySize()); + } + + @Test + void diskManagerFieldIsAbsentWhenNothingSet() throws Exception { + // Existing-callers-see-no-change contract: a builder that doesn't touch + // any of the new disk_manager setters produces no disk_manager field on + // the wire. + byte[] bytes = SessionContext.builder().batchSize(8192).toBytes(); + SessionOptions parsed = SessionOptions.parseFrom(bytes); + assertFalse(parsed.hasDiskManager()); + } + + @Test + void tempDirectoryStaysOnLegacyField() throws Exception { + // tempDirectory(String) writes the existing SessionOptions.temp_directory + // field, not disk_manager -- bytes identical to pre-PR behaviour for + // builders that touch only that setter. + byte[] bytes = SessionContext.builder().tempDirectory("/tmp/df").toBytes(); + SessionOptions parsed = SessionOptions.parseFrom(bytes); + assertTrue(parsed.hasTempDirectory()); + assertEquals("/tmp/df", parsed.getTempDirectory()); + assertFalse(parsed.hasDiskManager()); + } } diff --git a/native/src/lib.rs b/native/src/lib.rs index cebcb22..f684efb 100644 --- a/native/src/lib.rs +++ b/native/src/lib.rs @@ -46,6 +46,7 @@ use datafusion::config::TableParquetOptions; use datafusion::dataframe::DataFrame; use datafusion::dataframe::DataFrameWriteOptions; use datafusion::error::DataFusionError; +use datafusion::execution::disk_manager::{DiskManagerBuilder, DiskManagerMode}; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::SendableRecordBatchStream; use datafusion::logical_expr::{ScalarUDF, Signature}; @@ -128,6 +129,22 @@ pub extern "system" fn Java_org_apache_datafusion_SessionContext_createSessionCo if let Some(dir) = opts.temp_directory { runtime_builder = runtime_builder.with_temp_file_path(PathBuf::from(dir)); } + // disk_manager carries the disable / size-cap surface added on top of + // the legacy temp_directory field. Java-side builder enforces that + // disabled and tempDirectory aren't both set; the Rust layer doesn't + // re-validate because there's no path that produces a contradictory + // mode here -- with_disk_manager_builder(Disabled) wholesale + // replaces any prior with_temp_file_path call, and that's the + // semantics callers using the typed Java setters can already see. + if let Some(dm) = opts.disk_manager.as_ref() { + if dm.disabled() { + let builder = DiskManagerBuilder::default().with_mode(DiskManagerMode::Disabled); + runtime_builder = runtime_builder.with_disk_manager_builder(builder); + } + if let Some(size) = dm.max_temp_directory_size { + runtime_builder = runtime_builder.with_max_temp_directory_size(size); + } + } // datafusion.runtime.* keys live on RuntimeEnv (separate object from // SessionConfig) and round-tripping them through getOption/setOption diff --git a/proto/session_options.proto b/proto/session_options.proto index 330fe9b..3af7b73 100644 --- a/proto/session_options.proto +++ b/proto/session_options.proto @@ -57,6 +57,25 @@ message SessionOptions { // later one wins (matching upstream's `RuntimeEnv::register_object_store` // semantics). repeated ObjectStoreRegistration object_stores = 8; + // Disk-manager configuration (disable spill, size cap). Unset fields + // leave the upstream `RuntimeEnvBuilder` default in place. Slot 9 is + // reserved for the pending CacheManagerOptions field; merge order with + // that PR determines whether disk_manager stays at 10 or compacts to 9. + optional DiskManagerOptions disk_manager = 10; +} + +// Disk-manager configuration. The two fields are independent on the wire; +// mutual-exclusion of `disabled` against the legacy +// `SessionOptions.temp_directory` field (slot 6) is enforced at the Java +// builder level. +message DiskManagerOptions { + // Disable spill entirely (`DiskManagerMode::Disabled`). + optional bool disabled = 1; + // Cap on bytes used by spill files. Mirrors upstream + // `RuntimeEnvBuilder::with_max_temp_directory_size` 1:1. Zero is a + // legal value upstream (means "no spill allowed"); the Java setter + // mirrors that contract. + optional uint64 max_temp_directory_size = 2; } message ConfigOption {