From 3247739c0643ca25d9651ff8060fc8af2b2f3e3f Mon Sep 17 00:00:00 2001 From: Lantao Jin Date: Thu, 21 May 2026 09:39:03 +0000 Subject: [PATCH] feat(metrics): add SessionContext.memoryUsage and runtimeStats --- Makefile | 9 +- .../org/apache/datafusion/MemoryUsage.java | 60 ++++++ .../org/apache/datafusion/RuntimeStats.java | 153 +++++++++++++ .../org/apache/datafusion/SessionContext.java | 71 ++++++ .../SessionContextMemoryUsageTest.java | 144 +++++++++++++ .../SessionContextRuntimeStatsTest.java | 129 +++++++++++ native/Cargo.lock | 13 ++ native/Cargo.toml | 22 +- native/src/lib.rs | 110 +++++++++- native/src/memory.rs | 188 ++++++++++++++++ native/src/runtime_metrics.rs | 203 ++++++++++++++++++ 11 files changed, 1094 insertions(+), 8 deletions(-) create mode 100644 core/src/main/java/org/apache/datafusion/MemoryUsage.java create mode 100644 core/src/main/java/org/apache/datafusion/RuntimeStats.java create mode 100644 core/src/test/java/org/apache/datafusion/SessionContextMemoryUsageTest.java create mode 100644 core/src/test/java/org/apache/datafusion/SessionContextRuntimeStatsTest.java create mode 100644 native/src/memory.rs create mode 100644 native/src/runtime_metrics.rs diff --git a/Makefile b/Makefile index ca894d3..10e225e 100644 --- a/Makefile +++ b/Makefile @@ -15,13 +15,20 @@ # specific language governing permissions and limitations # under the License. -.PHONY: all native jvm test clean tpch-data +.PHONY: all native native-runtime-metrics jvm test clean tpch-data all: native jvm native: cd native && cargo build +# Build the native crate with the `runtime-metrics` Cargo feature enabled. +# Requires `--cfg tokio_unstable` because tokio-metrics gates its API there. +# Default `make native` does not pull this in; callers who need +# SessionContext.runtimeStats() pick this target explicitly. +native-runtime-metrics: + cd native && RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics + jvm: ./mvnw package -DskipTests diff --git a/core/src/main/java/org/apache/datafusion/MemoryUsage.java b/core/src/main/java/org/apache/datafusion/MemoryUsage.java new file mode 100644 index 0000000..e4864d7 --- /dev/null +++ b/core/src/main/java/org/apache/datafusion/MemoryUsage.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datafusion; + +/** + * Snapshot of session-wide memory usage from {@link SessionContext#memoryUsage()}. Returns the + * total bytes currently reserved against the session's {@code MemoryPool} and the peak observed + * since the session was created. + * + *

Multi-tenant attribution: place each tenant in its own {@link SessionContext}. Sessions in + * DataFusion are cheap; one per tenant is the standard pattern. Within a single session, the + * snapshot is the sum across all in-flight queries' operator reservations -- see {@link + * SessionContext#memoryUsage()} for the precise definition of what is and is not counted. + * + *

Instances are immutable. + */ +public final class MemoryUsage { + private final long currentBytes; + private final long peakBytes; + + public MemoryUsage(long currentBytes, long peakBytes) { + this.currentBytes = currentBytes; + this.peakBytes = peakBytes; + } + + /** Bytes currently reserved against this session's {@code MemoryPool}. */ + public long currentBytes() { + return currentBytes; + } + + /** + * Maximum value of {@link #currentBytes()} observed since the session was created. Monotonic: + * never decreases for a given session. + */ + public long peakBytes() { + return peakBytes; + } + + @Override + public String toString() { + return "MemoryUsage{currentBytes=" + currentBytes + ", peakBytes=" + peakBytes + "}"; + } +} diff --git a/core/src/main/java/org/apache/datafusion/RuntimeStats.java b/core/src/main/java/org/apache/datafusion/RuntimeStats.java new file mode 100644 index 0000000..b80795b --- /dev/null +++ b/core/src/main/java/org/apache/datafusion/RuntimeStats.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datafusion; + +/** + * Snapshot of the underlying Tokio runtime from {@link SessionContext#runtimeStats()}. Mirrors the + * subset of {@code tokio_metrics::RuntimeMetrics} fields that node-level dashboards typically + * surface (worker count, busy time, queue depth, etc.). + * + *

Instances are immutable. The snapshot is process-wide rather than per-session because the JNI + * library drives a single shared multi-threaded Tokio runtime; gating on the {@code SessionContext} + * only ensures the caller still has a live handle. + * + *

Requires the {@code runtime-metrics} Cargo feature on the native crate, plus {@code + * RUSTFLAGS="--cfg tokio_unstable"} at build time. + */ +public final class RuntimeStats { + private final int numWorkers; + private final long liveTasksCount; + private final long globalQueueDepth; + private final long elapsedNanos; + private final long totalBusyNanos; + private final long totalParkCount; + private final long totalPollsCount; + private final long totalNoopCount; + private final long totalStealCount; + private final long totalLocalScheduleCount; + private final long totalOverflowCount; + + public RuntimeStats( + int numWorkers, + long liveTasksCount, + long globalQueueDepth, + long elapsedNanos, + long totalBusyNanos, + long totalParkCount, + long totalPollsCount, + long totalNoopCount, + long totalStealCount, + long totalLocalScheduleCount, + long totalOverflowCount) { + this.numWorkers = numWorkers; + this.liveTasksCount = liveTasksCount; + this.globalQueueDepth = globalQueueDepth; + this.elapsedNanos = elapsedNanos; + this.totalBusyNanos = totalBusyNanos; + this.totalParkCount = totalParkCount; + this.totalPollsCount = totalPollsCount; + this.totalNoopCount = totalNoopCount; + this.totalStealCount = totalStealCount; + this.totalLocalScheduleCount = totalLocalScheduleCount; + this.totalOverflowCount = totalOverflowCount; + } + + /** Number of OS worker threads driving the multi-threaded Tokio runtime. */ + public int numWorkers() { + return numWorkers; + } + + /** Number of tasks currently scheduled (alive) on the runtime. */ + public long liveTasksCount() { + return liveTasksCount; + } + + /** Tasks waiting in the global injection queue, not yet picked up by a worker. */ + public long globalQueueDepth() { + return globalQueueDepth; + } + + /** Wall-clock nanoseconds covered by this snapshot's interval (since runtime start). */ + public long elapsedNanos() { + return elapsedNanos; + } + + /** Total nanoseconds workers spent doing work (sum across workers). */ + public long totalBusyNanos() { + return totalBusyNanos; + } + + /** Times a worker has parked itself (gone idle waiting for work). */ + public long totalParkCount() { + return totalParkCount; + } + + /** Total task polls completed across workers. */ + public long totalPollsCount() { + return totalPollsCount; + } + + /** Times a worker unparked but found no work (false wakeup). */ + public long totalNoopCount() { + return totalNoopCount; + } + + /** Times a worker successfully stole tasks from another worker. */ + public long totalStealCount() { + return totalStealCount; + } + + /** Tasks scheduled into a worker's local queue. */ + public long totalLocalScheduleCount() { + return totalLocalScheduleCount; + } + + /** Times a worker's local queue overflowed and pushed tasks to the injector. */ + public long totalOverflowCount() { + return totalOverflowCount; + } + + @Override + public String toString() { + return "RuntimeStats{numWorkers=" + + numWorkers + + ", liveTasksCount=" + + liveTasksCount + + ", globalQueueDepth=" + + globalQueueDepth + + ", elapsedNanos=" + + elapsedNanos + + ", totalBusyNanos=" + + totalBusyNanos + + ", totalParkCount=" + + totalParkCount + + ", totalPollsCount=" + + totalPollsCount + + ", totalNoopCount=" + + totalNoopCount + + ", totalStealCount=" + + totalStealCount + + ", totalLocalScheduleCount=" + + totalLocalScheduleCount + + ", totalOverflowCount=" + + totalOverflowCount + + "}"; + } +} diff --git a/core/src/main/java/org/apache/datafusion/SessionContext.java b/core/src/main/java/org/apache/datafusion/SessionContext.java index 674341a..6486a51 100644 --- a/core/src/main/java/org/apache/datafusion/SessionContext.java +++ b/core/src/main/java/org/apache/datafusion/SessionContext.java @@ -101,6 +101,73 @@ public DataFrame fromProto(byte[] planBytes) { return new DataFrame(dfHandle); } + /** + * Snapshot the session's memory pool: bytes currently held and the peak observed since this + * session was created. Thread-safe; can be polled while queries run. + * + *

For multi-tenant attribution, place each tenant in its own {@link SessionContext}. Within a + * single session, the snapshot is the sum across all in-flight queries -- there is no + * per-DataFrame breakdown today. + * + *

The session's {@code MemoryPool} is wrapped transparently with a tracking adapter at + * construction time; the wrapper layers on top of whatever pool {@link + * SessionContextBuilder#memoryLimit(long, double)} produced (or DataFusion's default unbounded + * pool) and does not change pool semantics (limits, eviction, spilling). + * + *

What this counts: bytes reserved against the {@code MemoryPool} -- operator state for + * sorts, hash joins, aggregates, repartition buffers, and anything else that uses DataFusion's + * {@code MemoryReservation} machinery during execution. + * + *

What this does not count: memory held outside the pool, including record-batch + * buffers materialised by {@link DataFrame#cache()} (stored in an in-memory {@code MemTable} as + * plain {@code Vec} with no reservation), record-batch buffers that have crossed the + * FFI boundary into Arrow's Java allocator, and JVM-side allocations. Operator-level reservations + * are released as the plan unwinds, so a query that runs to completion typically returns {@code + * currentBytes} to ~0 even if the result set is large. + * + * @throws IllegalStateException if this context is closed. + * @throws RuntimeException if the native side has not registered a tracker for this handle + * (should not happen in practice -- tracker registration is done by the constructor). + */ + public MemoryUsage memoryUsage() { + if (nativeHandle == 0) { + throw new IllegalStateException("SessionContext is closed"); + } + long[] values = memoryUsageNative(nativeHandle); + return new MemoryUsage(values[0], values[1]); + } + + /** + * Snapshot operational counters from the underlying Tokio runtime: worker count, busy time, queue + * depth, etc. Thread-safe; can be polled while queries run. + * + *

The runtime is process-wide rather than per-session because the JNI library drives a single + * shared multi-threaded Tokio runtime. The {@link SessionContext} handle is checked only to + * ensure the caller still has a live session; the values returned are not session-specific. + * + *

Requires the {@code runtime-metrics} Cargo feature on the native crate (off by default). + * Rebuild with: + * + *

{@code
+   * RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics
+   * }
+ * + *

If invoked against a native binary built without the feature, this method throws {@link + * RuntimeException} with a message pointing at the rebuild command. + * + * @throws IllegalStateException if this context is closed. + * @throws RuntimeException if the native crate was built without the {@code runtime-metrics} + * feature. + */ + public RuntimeStats runtimeStats() { + if (nativeHandle == 0) { + throw new IllegalStateException("SessionContext is closed"); + } + long[] s = runtimeStatsNative(nativeHandle); + return new RuntimeStats( + (int) s[0], s[1], s[2], s[3], s[4], s[5], s[6], s[7], s[8], s[9], s[10]); + } + /** * Return the Arrow {@link Schema} of a registered table. Transferred via Arrow IPC; no {@link * org.apache.arrow.memory.BufferAllocator} is required because a schema carries no buffer data. @@ -524,6 +591,10 @@ public void close() { private static native String getOptionNative(long handle, String key); + private static native long[] memoryUsageNative(long handle); + + private static native long[] runtimeStatsNative(long handle); + private static native void registerParquetWithOptions( long handle, String name, String path, byte[] optionsBytes, byte[] schemaIpcBytes); diff --git a/core/src/test/java/org/apache/datafusion/SessionContextMemoryUsageTest.java b/core/src/test/java/org/apache/datafusion/SessionContextMemoryUsageTest.java new file mode 100644 index 0000000..7a67ea0 --- /dev/null +++ b/core/src/test/java/org/apache/datafusion/SessionContextMemoryUsageTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datafusion; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.junit.jupiter.api.Test; + +class SessionContextMemoryUsageTest { + + /** A query with enough rows to drive a non-trivial sort, so peak memory is measurable. */ + private static final String HEAVY_SQL = + "SELECT a, b FROM " + + " (SELECT i AS a, i * 7 AS b FROM " + + " (SELECT * FROM " + + " generate_series(1, 200000) AS t(i))) " + + "ORDER BY b DESC"; + + @Test + void snapshotIsZeroBeforeAnyQuery() { + try (SessionContext ctx = new SessionContext()) { + MemoryUsage usage = ctx.memoryUsage(); + assertEquals(0L, usage.currentBytes()); + assertEquals(0L, usage.peakBytes()); + } + } + + @Test + void peakIncreasesAfterMemoryHeavyQuery() throws Exception { + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = new SessionContext()) { + try (DataFrame df = ctx.sql(HEAVY_SQL); + ArrowReader reader = df.collect(allocator)) { + // Drain the result so the sort actually runs and allocates buffers. + while (reader.loadNextBatch()) { + assertNotNull(reader.getVectorSchemaRoot()); + } + } + MemoryUsage usage = ctx.memoryUsage(); + assertTrue(usage.peakBytes() > 0L, () -> "peakBytes was " + usage.peakBytes()); + assertTrue( + usage.peakBytes() >= usage.currentBytes(), + () -> "peak " + usage.peakBytes() + " < current " + usage.currentBytes()); + } + } + + @Test + void peakStaysAfterCurrentDrops() throws Exception { + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = new SessionContext()) { + try (DataFrame df = ctx.sql(HEAVY_SQL); + ArrowReader reader = df.collect(allocator)) { + while (reader.loadNextBatch()) { + assertNotNull(reader.getVectorSchemaRoot()); + } + } + MemoryUsage afterRun = ctx.memoryUsage(); + // After the query is fully drained and closed, current should be ~0 + // (DataFusion shrinks reservations on drop) but peak should remain. + assertEquals(0L, afterRun.currentBytes()); + assertTrue(afterRun.peakBytes() > 0L); + } + } + + @Test + void pollableFromAnotherThread() throws Exception { + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = new SessionContext()) { + AtomicBoolean done = new AtomicBoolean(false); + AtomicLong observedPeak = new AtomicLong(0L); + AtomicReference error = new AtomicReference<>(); + + Thread poller = + new Thread( + () -> { + try { + while (!done.get()) { + long p = ctx.memoryUsage().peakBytes(); + long prev = observedPeak.get(); + if (p > prev) { + observedPeak.compareAndSet(prev, p); + } + Thread.yield(); + } + } catch (Throwable t) { + error.set(t); + } + }); + poller.start(); + try { + try (DataFrame df = ctx.sql(HEAVY_SQL); + ArrowReader reader = df.collect(allocator)) { + while (reader.loadNextBatch()) { + assertNotNull(reader.getVectorSchemaRoot()); + } + } + } finally { + done.set(true); + poller.join(); + } + assertNotNull(observedPeak); + // The poller should have observed at least one non-zero peak. + assertTrue( + observedPeak.get() > 0L, + () -> "concurrent poller never saw non-zero peak; error=" + error.get()); + // And no exceptions on the polling thread. + assertEquals(null, error.get()); + } + } + + @Test + void closedContextThrows() { + SessionContext ctx = new SessionContext(); + ctx.close(); + assertThrows(IllegalStateException.class, ctx::memoryUsage); + } +} diff --git a/core/src/test/java/org/apache/datafusion/SessionContextRuntimeStatsTest.java b/core/src/test/java/org/apache/datafusion/SessionContextRuntimeStatsTest.java new file mode 100644 index 0000000..120d179 --- /dev/null +++ b/core/src/test/java/org/apache/datafusion/SessionContextRuntimeStatsTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datafusion; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link SessionContext#runtimeStats()}. The {@code runtime-metrics} Cargo feature is off + * by default in {@code native/Cargo.toml}; if the native crate was built without it (and without + * {@code RUSTFLAGS="--cfg tokio_unstable"}), every test here is skipped via {@link + * #checkFeatureEnabled}. Run + * + *

{@code
+ * (cd native && RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics)
+ * }
+ * + * before {@code ./mvnw test} to exercise this class. + */ +class SessionContextRuntimeStatsTest { + + @BeforeAll + static void checkFeatureEnabled() { + try (SessionContext ctx = new SessionContext()) { + ctx.runtimeStats(); + } catch (RuntimeException e) { + String msg = e.getMessage() == null ? "" : e.getMessage(); + Assumptions.assumeFalse( + msg.contains("runtime-metrics` Cargo feature"), + "datafusion-jni built without the `runtime-metrics` feature; skipping runtime-stats" + + " tests"); + } + } + + @Test + void fieldsAreNonNegative() { + try (SessionContext ctx = new SessionContext()) { + RuntimeStats s = ctx.runtimeStats(); + assertTrue(s.numWorkers() > 0, () -> "numWorkers=" + s.numWorkers()); + assertTrue(s.liveTasksCount() >= 0, () -> "liveTasksCount=" + s.liveTasksCount()); + assertTrue(s.globalQueueDepth() >= 0, () -> "globalQueueDepth=" + s.globalQueueDepth()); + assertTrue(s.elapsedNanos() >= 0L, () -> "elapsedNanos=" + s.elapsedNanos()); + assertTrue(s.totalBusyNanos() >= 0L, () -> "totalBusyNanos=" + s.totalBusyNanos()); + assertTrue(s.totalParkCount() >= 0L); + assertTrue(s.totalPollsCount() >= 0L); + assertTrue(s.totalNoopCount() >= 0L); + assertTrue(s.totalStealCount() >= 0L); + assertTrue(s.totalLocalScheduleCount() >= 0L); + assertTrue(s.totalOverflowCount() >= 0L); + } + } + + @Test + void numWorkersMatchesRuntimeShape() { + try (SessionContext ctx = new SessionContext()) { + // The shared multi-threaded Tokio runtime defaults to one worker per CPU. + // We don't assert the exact count -- runners differ -- but it must be >= 1. + assertTrue(ctx.runtimeStats().numWorkers() >= 1); + } + } + + @Test + void elapsedIncreasesAcrossSnapshots() throws Exception { + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = new SessionContext()) { + // The strict guarantee from `tokio_metrics::RuntimeMonitor::intervals()` + // is that each delta covers the wall-clock since the previous probe. + // `total_busy_duration` and `total_polls_count` only advance if work + // *actually runs on a Tokio worker thread* during the interval -- and + // for small in-memory DataFusion queries the future may complete + // synchronously on the JNI thread (which is not a worker), so neither + // metric is a reliable monotonic signal in tests. `elapsed` is, since + // it's pure wall-clock between probes. + RuntimeStats before = ctx.runtimeStats(); + for (int i = 0; i < 5; i++) { + try (DataFrame df = + ctx.sql( + "SELECT i FROM " + + " (SELECT * FROM generate_series(1, 50000) AS t(i)) " + + "ORDER BY i DESC"); + ArrowReader reader = df.collect(allocator)) { + while (reader.loadNextBatch()) { + assertNotNull(reader.getVectorSchemaRoot()); + } + } + } + RuntimeStats after = ctx.runtimeStats(); + assertTrue( + after.elapsedNanos() > before.elapsedNanos(), + () -> + "after.elapsedNanos=" + + after.elapsedNanos() + + " not greater than before=" + + before.elapsedNanos()); + } + } + + @Test + void closedContextThrows() { + SessionContext ctx = new SessionContext(); + ctx.close(); + assertThrows(IllegalStateException.class, ctx::runtimeStats); + } +} diff --git a/native/Cargo.lock b/native/Cargo.lock index d171c8a..919801f 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -1294,6 +1294,7 @@ dependencies = [ "prost-build", "protoc-bin-vendored", "tokio", + "tokio-metrics", "url", ] @@ -3530,6 +3531,18 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-metrics" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9e81d53caf955549b1dec7af4ac2149e94cc25ed97b4a545151140281e2f528" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-rustls" version = "0.26.4" diff --git a/native/Cargo.toml b/native/Cargo.toml index 42fab85..4dee54a 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -36,12 +36,17 @@ jni = "0.21" object_store = { version = "0.13", default-features = false } prost = "0.14" tokio = { version = "1", features = ["rt-multi-thread"] } +# Tokio runtime metrics. Optional + cfg-gated: this crate's API surface lives +# behind `--cfg tokio_unstable`, so enabling the `runtime-metrics` feature also +# requires the caller to set `RUSTFLAGS="--cfg tokio_unstable"` at build time. +tokio-metrics = { version = "0.5", optional = true } url = "2" [features] -# Enable all three cloud backends by default so `make test` exercises them. -# Downstream builds that strip a feature get a clear runtime error if a -# caller tries to register an unsupported backend. +# Object-store backends are on by default so `make test` exercises them; +# downstream builds that strip a feature get a clear runtime error if a caller +# tries to register an unsupported backend. `runtime-metrics` is opt-in because +# it requires the `tokio_unstable` cfg flag at build time (see below). default = [ "object-store-aws", "object-store-gcp", @@ -50,6 +55,17 @@ default = [ object-store-aws = ["object_store/aws"] object-store-gcp = ["object_store/gcp"] object-store-http = ["object_store/http"] +# Pulls in `tokio-metrics` so `SessionContext.runtimeStats()` returns real +# numbers from the underlying Tokio runtime. Off by default because +# `tokio-metrics` only compiles when `--cfg tokio_unstable` is set, and we +# don't want to force that flag onto every build. Enable explicitly: +# +# RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics +# +# The JNI handler returns a clear "feature not enabled" error from the JVM if +# invoked in a build that compiled the feature off, so the Java surface is +# unchanged either way. +runtime-metrics = ["dep:tokio-metrics"] [build-dependencies] prost-build = "0.14" diff --git a/native/src/lib.rs b/native/src/lib.rs index cebcb22..8bc8846 100644 --- a/native/src/lib.rs +++ b/native/src/lib.rs @@ -21,8 +21,10 @@ mod csv; mod errors; mod jni_util; mod json; +mod memory; mod object_store; mod proto; +mod runtime_metrics; mod schema; mod table_provider; mod udf; @@ -80,7 +82,36 @@ pub(crate) fn jvm() -> &'static JavaVM { pub(crate) fn runtime() -> &'static Runtime { static RT: OnceLock = OnceLock::new(); - RT.get_or_init(|| Runtime::new().expect("failed to create Tokio runtime")) + RT.get_or_init(|| { + let rt = Runtime::new().expect("failed to create Tokio runtime"); + // Eagerly install the runtime-metrics accumulator (no-op when the + // `runtime-metrics` Cargo feature is off). Initialising here -- not + // lazily on the first `runtimeStats()` call -- means the + // RuntimeMonitor's sampling baseline coincides with runtime start, so + // poll/park/busy totals reflect activity from the first query onward + // rather than from the first observation. + crate::runtime_metrics::init(rt.handle()); + rt + }) +} + +/// Wrap the (already-built) `RuntimeEnvBuilder`'s memory pool with a +/// `TrackingMemoryPool` so `Java_..._memoryUsage` can report current/peak +/// bytes for this session. Registers the tracker in the process-wide map +/// keyed by the JNI handle. Idempotent: if the builder didn't set a pool, +/// fills in DataFusion's default first so the wrap-and-register has the +/// same behavior either way. +fn install_memory_tracker( + builder: &mut RuntimeEnvBuilder, +) -> std::sync::Arc { + use datafusion::execution::memory_pool::UnboundedMemoryPool; + let inner: std::sync::Arc = builder + .memory_pool + .take() + .unwrap_or_else(|| std::sync::Arc::new(UnboundedMemoryPool::default())); + let tracker = std::sync::Arc::new(crate::memory::TrackingMemoryPool::new(inner)); + builder.memory_pool = Some(tracker.clone()); + tracker } #[no_mangle] @@ -89,8 +120,13 @@ pub extern "system" fn Java_org_apache_datafusion_SessionContext_createSessionCo _class: JClass<'local>, ) -> jlong { try_unwrap_or_throw(&mut env, 0, |_env| -> JniResult { - let ctx = SessionContext::new(); - Ok(Box::into_raw(Box::new(ctx)) as jlong) + let mut runtime_builder = RuntimeEnvBuilder::new(); + let tracker = install_memory_tracker(&mut runtime_builder); + let runtime_env = runtime_builder.build()?; + let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), Arc::new(runtime_env)); + let handle = Box::into_raw(Box::new(ctx)) as jlong; + crate::memory::register(handle, tracker); + Ok(handle) }) } @@ -157,6 +193,13 @@ pub extern "system" fn Java_org_apache_datafusion_SessionContext_createSessionCo config.options_mut().set(&opt.key, &opt.value)?; } + // Wrap the configured pool (default `UnboundedMemoryPool` or whatever + // `with_memory_limit` produced) with a tracker so `memoryUsage()` can + // report current/peak bytes for this session. The tracker is + // transparent to query execution -- it only intercepts grow/shrink to + // update two atomics. + let tracker = install_memory_tracker(&mut runtime_builder); + let runtime_env = runtime_builder.build()?; let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime_env)); @@ -166,7 +209,9 @@ pub extern "system" fn Java_org_apache_datafusion_SessionContext_createSessionCo // via try_unwrap_or_throw. crate::object_store::apply_registrations(&ctx, &opts.object_stores)?; - Ok(Box::into_raw(Box::new(ctx)) as jlong) + let handle = Box::into_raw(Box::new(ctx)) as jlong; + crate::memory::register(handle, tracker); + Ok(handle) }) } @@ -707,6 +752,10 @@ pub extern "system" fn Java_org_apache_datafusion_SessionContext_closeSessionCon ) { try_unwrap_or_throw(&mut env, (), |_env| -> JniResult<()> { if handle != 0 { + // Drop our side-table entry first so the tracker Arc's last + // strong reference is the SessionContext->RuntimeEnv chain, then + // drop the SessionContext itself. + crate::memory::unregister(handle); unsafe { drop(Box::from_raw(handle as *mut SessionContext)); } @@ -715,6 +764,59 @@ pub extern "system" fn Java_org_apache_datafusion_SessionContext_closeSessionCon }) } +#[no_mangle] +pub extern "system" fn Java_org_apache_datafusion_SessionContext_memoryUsageNative<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + handle: jlong, +) -> jni::sys::jlongArray { + try_unwrap_or_throw( + &mut env, + std::ptr::null_mut(), + |env| -> JniResult { + if handle == 0 { + return Err("SessionContext handle is null".into()); + } + // Look up the tracker by JNI handle. Should always succeed because + // the ctor inserts before publishing the handle to Java. + let tracker = crate::memory::lookup(handle) + .ok_or("memory tracker not registered for this SessionContext")?; + // `snapshot()` returns a consistent (current, peak) pair where + // peak is always >= current even if a concurrent record_grow is + // in-flight. Callers see no transient `current > peak`. + let (current, peak) = tracker.snapshot(); + let values: [jlong; 2] = [current as jlong, peak as jlong]; + let arr = env.new_long_array(values.len() as jni::sys::jsize)?; + env.set_long_array_region(&arr, 0, &values)?; + Ok(arr.into_raw()) + }, + ) +} + +#[no_mangle] +pub extern "system" fn Java_org_apache_datafusion_SessionContext_runtimeStatsNative<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + handle: jlong, +) -> jni::sys::jlongArray { + try_unwrap_or_throw( + &mut env, + std::ptr::null_mut(), + |env| -> JniResult { + if handle == 0 { + return Err("SessionContext handle is null".into()); + } + // Runtime stats are process-global -- the JNI library drives one + // shared multi-threaded runtime -- but we still gate on the + // SessionContext handle so callers can't ask after close(). + let stats = crate::runtime_metrics::runtime_stats()?; + let arr = env.new_long_array(stats.len() as jni::sys::jsize)?; + env.set_long_array_region(&arr, 0, &stats)?; + Ok(arr.into_raw()) + }, + ) +} + fn with_parquet_options( env: &mut JNIEnv, options_bytes: JByteArray, diff --git a/native/src/memory.rs b/native/src/memory.rs new file mode 100644 index 0000000..e105299 --- /dev/null +++ b/native/src/memory.rs @@ -0,0 +1,188 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Session-wide memory accounting wrapper around any `MemoryPool`. + +use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex, OnceLock}; + +use datafusion::common::Result; +use datafusion::execution::memory_pool::{MemoryLimit, MemoryPool, MemoryReservation}; +use jni::sys::jlong; + +/// Wraps an inner `MemoryPool` with two counters: total bytes currently held +/// and the peak observed since the wrapper was constructed. Bookkeeping is two +/// atomic loads/stores per `grow`/`shrink` and adds no contention because the +/// inner pool already serializes its own state. +#[derive(Debug)] +pub struct TrackingMemoryPool { + inner: Arc, + current_bytes: AtomicU64, + peak_bytes: AtomicU64, +} + +impl TrackingMemoryPool { + pub fn new(inner: Arc) -> Self { + Self { + inner, + current_bytes: AtomicU64::new(0), + peak_bytes: AtomicU64::new(0), + } + } + + // Note: there are intentionally no separate `current_bytes()` / + // `peak_bytes()` accessors. Reading them independently allows a torn + // observation where `current > peak` while a concurrent `record_grow` is + // mid-flight. Callers must use `snapshot()` instead, which clamps. + + /// Returns `(current_bytes, peak_bytes)` as a consistent snapshot. Two + /// invariants are maintained: + /// + /// 1. **Within a snapshot:** `peak >= current`. This matters when a + /// concurrent `record_grow` is in-flight between its `fetch_add` of + /// `current_bytes` and its `fetch_max` of `peak_bytes` -- a naive + /// snapshot could observe the higher current but the stale peak. + /// 2. **Across snapshots:** `peak` is monotonically non-decreasing for + /// every observer. If reader R1 ever sees peak = X, no later reader R2 + /// (on any thread) sees peak < X. + /// + /// The read-side clamp `max(stored_peak, observed_current)` alone gives + /// (1) but not (2): it's possible for R1 to *synthesize* a peak of X + /// from a transiently-high current, and then for a `record_shrink` plus + /// the slow path of `record_grow`'s missing `fetch_max` to leave the + /// stored peak briefly below X while R2 polls. R2 would observe a + /// smaller current and a smaller stored peak -- non-monotonic. + /// + /// The fix is to persist the clamped value back into `peak_bytes` via + /// `fetch_max` *before* returning. Doing so guarantees every snapshot's + /// returned peak is also reflected in the global atomic, so any + /// subsequent observer's `peak_bytes.load()` is at least as high as the + /// largest peak any earlier observer returned. + /// + /// Both atomics are `Relaxed`-ordered; we tolerate slightly stale values + /// in exchange for cheaper writes. + pub fn snapshot(&self) -> (u64, u64) { + // Order matters on the load side too: read `peak` first, then + // `current`. If a `record_grow` is in flight, this order ensures + // that whenever we observe a fresh `current` we also observe at + // least the corresponding (or older) `peak` -- so the eventual + // clamp `max(peak, current)` produces the right value. + let peak_observed = self.peak_bytes.load(Ordering::Relaxed); + let current = self.current_bytes.load(Ordering::Relaxed); + let clamped_peak = peak_observed.max(current); + // Persist the clamped peak so future observers can never see a + // smaller peak than the one we just returned. fetch_max is + // monotonic, so this is a no-op when another thread (or this very + // thread on a previous call) already pushed peak past clamped_peak. + let stored_peak = self.peak_bytes.fetch_max(clamped_peak, Ordering::Relaxed); + (current, stored_peak.max(clamped_peak)) + } + + fn record_grow(&self, additional: usize) { + // We bump `current` first to avoid double-counting in `peak` if a + // concurrent reader sees a stale current and a fresh peak. The + // snapshot path's read-side `max(peak, current)` clamp closes the + // window where `current` is up-to-date but `peak` is not. + let now = self + .current_bytes + .fetch_add(additional as u64, Ordering::Relaxed) + .saturating_add(additional as u64); + // fetch_max is monotonic so concurrent grows still produce the right peak. + self.peak_bytes.fetch_max(now, Ordering::Relaxed); + } + + fn record_shrink(&self, shrink: usize) { + // Saturating: if the inner pool ever calls shrink with a value larger + // than the running total we'd rather report 0 than wrap around. + let prev = self + .current_bytes + .fetch_sub(shrink as u64, Ordering::Relaxed); + if prev < shrink as u64 { + // Restore to zero. Concurrent grows that interleaved are fine -- + // they'll just see a transient under-count. + self.current_bytes.store(0, Ordering::Relaxed); + } + } +} + +impl MemoryPool for TrackingMemoryPool { + fn register(&self, consumer: &datafusion::execution::memory_pool::MemoryConsumer) { + self.inner.register(consumer); + } + + fn unregister(&self, consumer: &datafusion::execution::memory_pool::MemoryConsumer) { + self.inner.unregister(consumer); + } + + fn grow(&self, reservation: &MemoryReservation, additional: usize) { + self.inner.grow(reservation, additional); + self.record_grow(additional); + } + + fn shrink(&self, reservation: &MemoryReservation, shrink: usize) { + self.inner.shrink(reservation, shrink); + self.record_shrink(shrink); + } + + fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> { + self.inner.try_grow(reservation, additional)?; + self.record_grow(additional); + Ok(()) + } + + fn reserved(&self) -> usize { + self.inner.reserved() + } + + fn memory_limit(&self) -> MemoryLimit { + self.inner.memory_limit() + } +} + +/// Process-wide registry mapping a `SessionContext`'s native handle to its +/// `TrackingMemoryPool`. Populated at session creation, drained at session +/// close. We can't downcast `Arc` back to the concrete type +/// without an `Any` bound (which the trait does not require), so instead we +/// keep a parallel `Arc` indexed by the JNI handle Java already threads +/// through every call. +fn registry() -> &'static Mutex>> { + static REGISTRY: OnceLock>>> = OnceLock::new(); + REGISTRY.get_or_init(|| Mutex::new(HashMap::new())) +} + +pub fn register(handle: jlong, pool: Arc) { + registry() + .lock() + .expect("memory registry lock poisoned") + .insert(handle, pool); +} + +pub fn lookup(handle: jlong) -> Option> { + registry() + .lock() + .expect("memory registry lock poisoned") + .get(&handle) + .cloned() +} + +pub fn unregister(handle: jlong) { + registry() + .lock() + .expect("memory registry lock poisoned") + .remove(&handle); +} diff --git a/native/src/runtime_metrics.rs b/native/src/runtime_metrics.rs new file mode 100644 index 0000000..e69410e --- /dev/null +++ b/native/src/runtime_metrics.rs @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tokio runtime metrics, gated behind the `runtime-metrics` Cargo feature. +//! +//! `tokio-metrics` only compiles when `--cfg tokio_unstable` is set, so the +//! whole module is conditional on the feature flag. The feature-off variant +//! still exists; `runtime_stats()` returns an `Err` whose message points at +//! the rebuild command. The Java surface (`SessionContext.runtimeStats()`) +//! is therefore stable across both build configurations. +//! +//! Field layout returned across the FFI boundary (in this exact order, all +//! `i64`): +//! 0 numWorkers +//! 1 liveTasksCount +//! 2 globalQueueDepth +//! 3 elapsedNanos +//! 4 totalBusyNanos +//! 5 totalParkCount +//! 6 totalPollsCount +//! 7 totalNoopCount +//! 8 totalStealCount +//! 9 totalLocalScheduleCount +//! 10 totalOverflowCount + +#[cfg(not(feature = "runtime-metrics"))] +use crate::errors::JniResult; + +/// Number of i64 values in the snapshot array; kept here so the Java side and +/// the feature-off stub agree on the layout. +pub const STATS_FIELD_COUNT: usize = 11; + +#[cfg(feature = "runtime-metrics")] +mod imp { + use std::sync::{Mutex, OnceLock}; + use std::time::Duration; + use tokio_metrics::{RuntimeIntervals, RuntimeMonitor}; + + use super::STATS_FIELD_COUNT; + use crate::errors::JniResult; + + /// `RuntimeMonitor::intervals().next()` returns *delta* metrics covering + /// the period since the previous call (or, on the very first call, since + /// the iterator was constructed). To present the documented + /// "totals since runtime start" semantic to Java callers, we own the + /// iterator process-wide and accumulate every field that's documented as + /// monotonic. + /// + /// Snapshot fields (`workers_count`, `live_tasks_count`, + /// `global_queue_depth`) are point-in-time and pass through unchanged. + struct RuntimeAccumulator { + intervals: RuntimeIntervals, + elapsed: Duration, + total_busy: Duration, + total_park_count: u64, + total_polls_count: u64, + total_noop_count: u64, + total_steal_count: u64, + total_local_schedule_count: u64, + total_overflow_count: u64, + } + + static ACC: OnceLock> = OnceLock::new(); + + fn accumulator() -> &'static Mutex { + // Warm `crate::runtime()` first so the shared Tokio runtime exists + // and its OnceLock initializer has already populated ACC via + // `runtime_metrics::init`. We must NOT call `runtime()` from inside + // `ACC.get_or_init`: that would acquire ACC's once-init slot, then + // (on the very first call) recurse through `RT.get_or_init` -> + // `init()` -> `ACC.get_or_init`, which is same-thread reentrancy on + // the same OnceLock and deadlocks indefinitely. + let _ = crate::runtime(); + // After `runtime()` returns, ACC is guaranteed populated. The + // defensive `get_or_init` covers a future build path that constructs + // the runtime without going through `crate::runtime` (none today); + // it is safe to invoke from here because no init lock is held. + ACC.get_or_init(build_accumulator) + } + + fn build_accumulator() -> Mutex { + let handle = crate::runtime().handle().clone(); + let monitor = RuntimeMonitor::new(&handle); + Mutex::new(RuntimeAccumulator { + intervals: monitor.intervals(), + elapsed: Duration::ZERO, + total_busy: Duration::ZERO, + total_park_count: 0, + total_polls_count: 0, + total_noop_count: 0, + total_steal_count: 0, + total_local_schedule_count: 0, + total_overflow_count: 0, + }) + } + + /// Eagerly construct the runtime-metrics accumulator. Called from + /// `crate::runtime()` immediately after the shared Tokio runtime is + /// created so the accumulator's interval baseline matches runtime start. + /// Without this, `RuntimeMonitor` would only start sampling at the first + /// `runtimeStats()` call, silently dropping every poll/busy nanosecond + /// that happened before that. + pub fn init(handle: &tokio::runtime::Handle) { + ACC.get_or_init(|| { + let monitor = RuntimeMonitor::new(handle); + Mutex::new(RuntimeAccumulator { + intervals: monitor.intervals(), + elapsed: Duration::ZERO, + total_busy: Duration::ZERO, + total_park_count: 0, + total_polls_count: 0, + total_noop_count: 0, + total_steal_count: 0, + total_local_schedule_count: 0, + total_overflow_count: 0, + }) + }); + } + + pub fn runtime_stats() -> JniResult<[i64; STATS_FIELD_COUNT]> { + let mut acc = accumulator() + .lock() + .map_err(|_| "runtime-metrics accumulator lock poisoned")?; + let delta = acc + .intervals + .next() + .ok_or("tokio-metrics RuntimeMonitor returned no interval")?; + // Workers count and the point-in-time fields are not deltas; they are + // the runtime's current state. Pass through directly. + let workers_count = delta.workers_count; + let live_tasks_count = delta.live_tasks_count; + let global_queue_depth = delta.global_queue_depth; + // Accumulate the deltas. + acc.elapsed = acc.elapsed.saturating_add(delta.elapsed); + acc.total_busy = acc.total_busy.saturating_add(delta.total_busy_duration); + acc.total_park_count = acc.total_park_count.saturating_add(delta.total_park_count); + acc.total_polls_count = acc + .total_polls_count + .saturating_add(delta.total_polls_count); + acc.total_noop_count = acc.total_noop_count.saturating_add(delta.total_noop_count); + acc.total_steal_count = acc + .total_steal_count + .saturating_add(delta.total_steal_count); + acc.total_local_schedule_count = acc + .total_local_schedule_count + .saturating_add(delta.total_local_schedule_count); + acc.total_overflow_count = acc + .total_overflow_count + .saturating_add(delta.total_overflow_count); + + Ok([ + workers_count as i64, + live_tasks_count as i64, + global_queue_depth as i64, + i128_to_i64_saturating(acc.elapsed.as_nanos() as i128), + i128_to_i64_saturating(acc.total_busy.as_nanos() as i128), + acc.total_park_count as i64, + acc.total_polls_count as i64, + acc.total_noop_count as i64, + acc.total_steal_count as i64, + acc.total_local_schedule_count as i64, + acc.total_overflow_count as i64, + ]) + } + + fn i128_to_i64_saturating(v: i128) -> i64 { + v.clamp(i64::MIN as i128, i64::MAX as i128) as i64 + } +} + +#[cfg(feature = "runtime-metrics")] +pub use imp::{init, runtime_stats}; + +/// No-op when the `runtime-metrics` feature is off: there is no monitor to +/// initialise. Kept as a regular function (rather than a `cfg` annotation at +/// every call site) so the runtime construction path stays unconditional. +#[cfg(not(feature = "runtime-metrics"))] +pub fn init(_handle: &tokio::runtime::Handle) {} + +#[cfg(not(feature = "runtime-metrics"))] +pub fn runtime_stats() -> JniResult<[i64; STATS_FIELD_COUNT]> { + Err( + "datafusion-jni was built without the `runtime-metrics` Cargo feature; \ + rebuild the native crate with \ + `RUSTFLAGS=\"--cfg tokio_unstable\" cargo build --features runtime-metrics` \ + to enable SessionContext.runtimeStats" + .into(), + ) +}