diff --git a/Makefile b/Makefile
index ca894d3..10e225e 100644
--- a/Makefile
+++ b/Makefile
@@ -15,13 +15,20 @@
# specific language governing permissions and limitations
# under the License.
-.PHONY: all native jvm test clean tpch-data
+.PHONY: all native native-runtime-metrics jvm test clean tpch-data
all: native jvm
native:
cd native && cargo build
+# Build the native crate with the `runtime-metrics` Cargo feature enabled.
+# Requires `--cfg tokio_unstable` because tokio-metrics gates its API there.
+# Default `make native` does not pull this in; callers who need
+# SessionContext.runtimeStats() pick this target explicitly.
+native-runtime-metrics:
+ cd native && RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics
+
jvm:
./mvnw package -DskipTests
diff --git a/core/src/main/java/org/apache/datafusion/MemoryUsage.java b/core/src/main/java/org/apache/datafusion/MemoryUsage.java
new file mode 100644
index 0000000..e4864d7
--- /dev/null
+++ b/core/src/main/java/org/apache/datafusion/MemoryUsage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datafusion;
+
+/**
+ * Snapshot of session-wide memory usage from {@link SessionContext#memoryUsage()}. Returns the
+ * total bytes currently reserved against the session's {@code MemoryPool} and the peak observed
+ * since the session was created.
+ *
+ *
Multi-tenant attribution: place each tenant in its own {@link SessionContext}. Sessions in
+ * DataFusion are cheap; one per tenant is the standard pattern. Within a single session, the
+ * snapshot is the sum across all in-flight queries' operator reservations -- see {@link
+ * SessionContext#memoryUsage()} for the precise definition of what is and is not counted.
+ *
+ *
Instances are immutable.
+ */
+public final class MemoryUsage {
+ private final long currentBytes;
+ private final long peakBytes;
+
+ public MemoryUsage(long currentBytes, long peakBytes) {
+ this.currentBytes = currentBytes;
+ this.peakBytes = peakBytes;
+ }
+
+ /** Bytes currently reserved against this session's {@code MemoryPool}. */
+ public long currentBytes() {
+ return currentBytes;
+ }
+
+ /**
+ * Maximum value of {@link #currentBytes()} observed since the session was created. Monotonic:
+ * never decreases for a given session.
+ */
+ public long peakBytes() {
+ return peakBytes;
+ }
+
+ @Override
+ public String toString() {
+ return "MemoryUsage{currentBytes=" + currentBytes + ", peakBytes=" + peakBytes + "}";
+ }
+}
diff --git a/core/src/main/java/org/apache/datafusion/RuntimeStats.java b/core/src/main/java/org/apache/datafusion/RuntimeStats.java
new file mode 100644
index 0000000..b80795b
--- /dev/null
+++ b/core/src/main/java/org/apache/datafusion/RuntimeStats.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datafusion;
+
+/**
+ * Snapshot of the underlying Tokio runtime from {@link SessionContext#runtimeStats()}. Mirrors the
+ * subset of {@code tokio_metrics::RuntimeMetrics} fields that node-level dashboards typically
+ * surface (worker count, busy time, queue depth, etc.).
+ *
+ *
Instances are immutable. The snapshot is process-wide rather than per-session because the JNI
+ * library drives a single shared multi-threaded Tokio runtime; gating on the {@code SessionContext}
+ * only ensures the caller still has a live handle.
+ *
+ *
Requires the {@code runtime-metrics} Cargo feature on the native crate, plus {@code
+ * RUSTFLAGS="--cfg tokio_unstable"} at build time.
+ */
+public final class RuntimeStats {
+ private final int numWorkers;
+ private final long liveTasksCount;
+ private final long globalQueueDepth;
+ private final long elapsedNanos;
+ private final long totalBusyNanos;
+ private final long totalParkCount;
+ private final long totalPollsCount;
+ private final long totalNoopCount;
+ private final long totalStealCount;
+ private final long totalLocalScheduleCount;
+ private final long totalOverflowCount;
+
+ public RuntimeStats(
+ int numWorkers,
+ long liveTasksCount,
+ long globalQueueDepth,
+ long elapsedNanos,
+ long totalBusyNanos,
+ long totalParkCount,
+ long totalPollsCount,
+ long totalNoopCount,
+ long totalStealCount,
+ long totalLocalScheduleCount,
+ long totalOverflowCount) {
+ this.numWorkers = numWorkers;
+ this.liveTasksCount = liveTasksCount;
+ this.globalQueueDepth = globalQueueDepth;
+ this.elapsedNanos = elapsedNanos;
+ this.totalBusyNanos = totalBusyNanos;
+ this.totalParkCount = totalParkCount;
+ this.totalPollsCount = totalPollsCount;
+ this.totalNoopCount = totalNoopCount;
+ this.totalStealCount = totalStealCount;
+ this.totalLocalScheduleCount = totalLocalScheduleCount;
+ this.totalOverflowCount = totalOverflowCount;
+ }
+
+ /** Number of OS worker threads driving the multi-threaded Tokio runtime. */
+ public int numWorkers() {
+ return numWorkers;
+ }
+
+ /** Number of tasks currently scheduled (alive) on the runtime. */
+ public long liveTasksCount() {
+ return liveTasksCount;
+ }
+
+ /** Tasks waiting in the global injection queue, not yet picked up by a worker. */
+ public long globalQueueDepth() {
+ return globalQueueDepth;
+ }
+
+ /** Wall-clock nanoseconds covered by this snapshot's interval (since runtime start). */
+ public long elapsedNanos() {
+ return elapsedNanos;
+ }
+
+ /** Total nanoseconds workers spent doing work (sum across workers). */
+ public long totalBusyNanos() {
+ return totalBusyNanos;
+ }
+
+ /** Times a worker has parked itself (gone idle waiting for work). */
+ public long totalParkCount() {
+ return totalParkCount;
+ }
+
+ /** Total task polls completed across workers. */
+ public long totalPollsCount() {
+ return totalPollsCount;
+ }
+
+ /** Times a worker unparked but found no work (false wakeup). */
+ public long totalNoopCount() {
+ return totalNoopCount;
+ }
+
+ /** Times a worker successfully stole tasks from another worker. */
+ public long totalStealCount() {
+ return totalStealCount;
+ }
+
+ /** Tasks scheduled into a worker's local queue. */
+ public long totalLocalScheduleCount() {
+ return totalLocalScheduleCount;
+ }
+
+ /** Times a worker's local queue overflowed and pushed tasks to the injector. */
+ public long totalOverflowCount() {
+ return totalOverflowCount;
+ }
+
+ @Override
+ public String toString() {
+ return "RuntimeStats{numWorkers="
+ + numWorkers
+ + ", liveTasksCount="
+ + liveTasksCount
+ + ", globalQueueDepth="
+ + globalQueueDepth
+ + ", elapsedNanos="
+ + elapsedNanos
+ + ", totalBusyNanos="
+ + totalBusyNanos
+ + ", totalParkCount="
+ + totalParkCount
+ + ", totalPollsCount="
+ + totalPollsCount
+ + ", totalNoopCount="
+ + totalNoopCount
+ + ", totalStealCount="
+ + totalStealCount
+ + ", totalLocalScheduleCount="
+ + totalLocalScheduleCount
+ + ", totalOverflowCount="
+ + totalOverflowCount
+ + "}";
+ }
+}
diff --git a/core/src/main/java/org/apache/datafusion/SessionContext.java b/core/src/main/java/org/apache/datafusion/SessionContext.java
index 674341a..6486a51 100644
--- a/core/src/main/java/org/apache/datafusion/SessionContext.java
+++ b/core/src/main/java/org/apache/datafusion/SessionContext.java
@@ -101,6 +101,73 @@ public DataFrame fromProto(byte[] planBytes) {
return new DataFrame(dfHandle);
}
+ /**
+ * Snapshot the session's memory pool: bytes currently held and the peak observed since this
+ * session was created. Thread-safe; can be polled while queries run.
+ *
+ *
For multi-tenant attribution, place each tenant in its own {@link SessionContext}. Within a
+ * single session, the snapshot is the sum across all in-flight queries -- there is no
+ * per-DataFrame breakdown today.
+ *
+ *
The session's {@code MemoryPool} is wrapped transparently with a tracking adapter at
+ * construction time; the wrapper layers on top of whatever pool {@link
+ * SessionContextBuilder#memoryLimit(long, double)} produced (or DataFusion's default unbounded
+ * pool) and does not change pool semantics (limits, eviction, spilling).
+ *
+ *
What this counts: bytes reserved against the {@code MemoryPool} -- operator state for
+ * sorts, hash joins, aggregates, repartition buffers, and anything else that uses DataFusion's
+ * {@code MemoryReservation} machinery during execution.
+ *
+ *
What this does not count: memory held outside the pool, including record-batch
+ * buffers materialised by {@link DataFrame#cache()} (stored in an in-memory {@code MemTable} as
+ * plain {@code Vec} with no reservation), record-batch buffers that have crossed the
+ * FFI boundary into Arrow's Java allocator, and JVM-side allocations. Operator-level reservations
+ * are released as the plan unwinds, so a query that runs to completion typically returns {@code
+ * currentBytes} to ~0 even if the result set is large.
+ *
+ * @throws IllegalStateException if this context is closed.
+ * @throws RuntimeException if the native side has not registered a tracker for this handle
+ * (should not happen in practice -- tracker registration is done by the constructor).
+ */
+ public MemoryUsage memoryUsage() {
+ if (nativeHandle == 0) {
+ throw new IllegalStateException("SessionContext is closed");
+ }
+ long[] values = memoryUsageNative(nativeHandle);
+ return new MemoryUsage(values[0], values[1]);
+ }
+
+ /**
+ * Snapshot operational counters from the underlying Tokio runtime: worker count, busy time, queue
+ * depth, etc. Thread-safe; can be polled while queries run.
+ *
+ * The runtime is process-wide rather than per-session because the JNI library drives a single
+ * shared multi-threaded Tokio runtime. The {@link SessionContext} handle is checked only to
+ * ensure the caller still has a live session; the values returned are not session-specific.
+ *
+ *
Requires the {@code runtime-metrics} Cargo feature on the native crate (off by default).
+ * Rebuild with:
+ *
+ *
{@code
+ * RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics
+ * }
+ *
+ * If invoked against a native binary built without the feature, this method throws {@link
+ * RuntimeException} with a message pointing at the rebuild command.
+ *
+ * @throws IllegalStateException if this context is closed.
+ * @throws RuntimeException if the native crate was built without the {@code runtime-metrics}
+ * feature.
+ */
+ public RuntimeStats runtimeStats() {
+ if (nativeHandle == 0) {
+ throw new IllegalStateException("SessionContext is closed");
+ }
+ long[] s = runtimeStatsNative(nativeHandle);
+ return new RuntimeStats(
+ (int) s[0], s[1], s[2], s[3], s[4], s[5], s[6], s[7], s[8], s[9], s[10]);
+ }
+
/**
* Return the Arrow {@link Schema} of a registered table. Transferred via Arrow IPC; no {@link
* org.apache.arrow.memory.BufferAllocator} is required because a schema carries no buffer data.
@@ -524,6 +591,10 @@ public void close() {
private static native String getOptionNative(long handle, String key);
+ private static native long[] memoryUsageNative(long handle);
+
+ private static native long[] runtimeStatsNative(long handle);
+
private static native void registerParquetWithOptions(
long handle, String name, String path, byte[] optionsBytes, byte[] schemaIpcBytes);
diff --git a/core/src/test/java/org/apache/datafusion/SessionContextMemoryUsageTest.java b/core/src/test/java/org/apache/datafusion/SessionContextMemoryUsageTest.java
new file mode 100644
index 0000000..7a67ea0
--- /dev/null
+++ b/core/src/test/java/org/apache/datafusion/SessionContextMemoryUsageTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datafusion;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.junit.jupiter.api.Test;
+
+class SessionContextMemoryUsageTest {
+
+ /** A query with enough rows to drive a non-trivial sort, so peak memory is measurable. */
+ private static final String HEAVY_SQL =
+ "SELECT a, b FROM "
+ + " (SELECT i AS a, i * 7 AS b FROM "
+ + " (SELECT * FROM "
+ + " generate_series(1, 200000) AS t(i))) "
+ + "ORDER BY b DESC";
+
+ @Test
+ void snapshotIsZeroBeforeAnyQuery() {
+ try (SessionContext ctx = new SessionContext()) {
+ MemoryUsage usage = ctx.memoryUsage();
+ assertEquals(0L, usage.currentBytes());
+ assertEquals(0L, usage.peakBytes());
+ }
+ }
+
+ @Test
+ void peakIncreasesAfterMemoryHeavyQuery() throws Exception {
+ try (BufferAllocator allocator = new RootAllocator();
+ SessionContext ctx = new SessionContext()) {
+ try (DataFrame df = ctx.sql(HEAVY_SQL);
+ ArrowReader reader = df.collect(allocator)) {
+ // Drain the result so the sort actually runs and allocates buffers.
+ while (reader.loadNextBatch()) {
+ assertNotNull(reader.getVectorSchemaRoot());
+ }
+ }
+ MemoryUsage usage = ctx.memoryUsage();
+ assertTrue(usage.peakBytes() > 0L, () -> "peakBytes was " + usage.peakBytes());
+ assertTrue(
+ usage.peakBytes() >= usage.currentBytes(),
+ () -> "peak " + usage.peakBytes() + " < current " + usage.currentBytes());
+ }
+ }
+
+ @Test
+ void peakStaysAfterCurrentDrops() throws Exception {
+ try (BufferAllocator allocator = new RootAllocator();
+ SessionContext ctx = new SessionContext()) {
+ try (DataFrame df = ctx.sql(HEAVY_SQL);
+ ArrowReader reader = df.collect(allocator)) {
+ while (reader.loadNextBatch()) {
+ assertNotNull(reader.getVectorSchemaRoot());
+ }
+ }
+ MemoryUsage afterRun = ctx.memoryUsage();
+ // After the query is fully drained and closed, current should be ~0
+ // (DataFusion shrinks reservations on drop) but peak should remain.
+ assertEquals(0L, afterRun.currentBytes());
+ assertTrue(afterRun.peakBytes() > 0L);
+ }
+ }
+
+ @Test
+ void pollableFromAnotherThread() throws Exception {
+ try (BufferAllocator allocator = new RootAllocator();
+ SessionContext ctx = new SessionContext()) {
+ AtomicBoolean done = new AtomicBoolean(false);
+ AtomicLong observedPeak = new AtomicLong(0L);
+ AtomicReference error = new AtomicReference<>();
+
+ Thread poller =
+ new Thread(
+ () -> {
+ try {
+ while (!done.get()) {
+ long p = ctx.memoryUsage().peakBytes();
+ long prev = observedPeak.get();
+ if (p > prev) {
+ observedPeak.compareAndSet(prev, p);
+ }
+ Thread.yield();
+ }
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ });
+ poller.start();
+ try {
+ try (DataFrame df = ctx.sql(HEAVY_SQL);
+ ArrowReader reader = df.collect(allocator)) {
+ while (reader.loadNextBatch()) {
+ assertNotNull(reader.getVectorSchemaRoot());
+ }
+ }
+ } finally {
+ done.set(true);
+ poller.join();
+ }
+ assertNotNull(observedPeak);
+ // The poller should have observed at least one non-zero peak.
+ assertTrue(
+ observedPeak.get() > 0L,
+ () -> "concurrent poller never saw non-zero peak; error=" + error.get());
+ // And no exceptions on the polling thread.
+ assertEquals(null, error.get());
+ }
+ }
+
+ @Test
+ void closedContextThrows() {
+ SessionContext ctx = new SessionContext();
+ ctx.close();
+ assertThrows(IllegalStateException.class, ctx::memoryUsage);
+ }
+}
diff --git a/core/src/test/java/org/apache/datafusion/SessionContextRuntimeStatsTest.java b/core/src/test/java/org/apache/datafusion/SessionContextRuntimeStatsTest.java
new file mode 100644
index 0000000..120d179
--- /dev/null
+++ b/core/src/test/java/org/apache/datafusion/SessionContextRuntimeStatsTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datafusion;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link SessionContext#runtimeStats()}. The {@code runtime-metrics} Cargo feature is off
+ * by default in {@code native/Cargo.toml}; if the native crate was built without it (and without
+ * {@code RUSTFLAGS="--cfg tokio_unstable"}), every test here is skipped via {@link
+ * #checkFeatureEnabled}. Run
+ *
+ * {@code
+ * (cd native && RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics)
+ * }
+ *
+ * before {@code ./mvnw test} to exercise this class.
+ */
+class SessionContextRuntimeStatsTest {
+
+ @BeforeAll
+ static void checkFeatureEnabled() {
+ try (SessionContext ctx = new SessionContext()) {
+ ctx.runtimeStats();
+ } catch (RuntimeException e) {
+ String msg = e.getMessage() == null ? "" : e.getMessage();
+ Assumptions.assumeFalse(
+ msg.contains("runtime-metrics` Cargo feature"),
+ "datafusion-jni built without the `runtime-metrics` feature; skipping runtime-stats"
+ + " tests");
+ }
+ }
+
+ @Test
+ void fieldsAreNonNegative() {
+ try (SessionContext ctx = new SessionContext()) {
+ RuntimeStats s = ctx.runtimeStats();
+ assertTrue(s.numWorkers() > 0, () -> "numWorkers=" + s.numWorkers());
+ assertTrue(s.liveTasksCount() >= 0, () -> "liveTasksCount=" + s.liveTasksCount());
+ assertTrue(s.globalQueueDepth() >= 0, () -> "globalQueueDepth=" + s.globalQueueDepth());
+ assertTrue(s.elapsedNanos() >= 0L, () -> "elapsedNanos=" + s.elapsedNanos());
+ assertTrue(s.totalBusyNanos() >= 0L, () -> "totalBusyNanos=" + s.totalBusyNanos());
+ assertTrue(s.totalParkCount() >= 0L);
+ assertTrue(s.totalPollsCount() >= 0L);
+ assertTrue(s.totalNoopCount() >= 0L);
+ assertTrue(s.totalStealCount() >= 0L);
+ assertTrue(s.totalLocalScheduleCount() >= 0L);
+ assertTrue(s.totalOverflowCount() >= 0L);
+ }
+ }
+
+ @Test
+ void numWorkersMatchesRuntimeShape() {
+ try (SessionContext ctx = new SessionContext()) {
+ // The shared multi-threaded Tokio runtime defaults to one worker per CPU.
+ // We don't assert the exact count -- runners differ -- but it must be >= 1.
+ assertTrue(ctx.runtimeStats().numWorkers() >= 1);
+ }
+ }
+
+ @Test
+ void elapsedIncreasesAcrossSnapshots() throws Exception {
+ try (BufferAllocator allocator = new RootAllocator();
+ SessionContext ctx = new SessionContext()) {
+ // The strict guarantee from `tokio_metrics::RuntimeMonitor::intervals()`
+ // is that each delta covers the wall-clock since the previous probe.
+ // `total_busy_duration` and `total_polls_count` only advance if work
+ // *actually runs on a Tokio worker thread* during the interval -- and
+ // for small in-memory DataFusion queries the future may complete
+ // synchronously on the JNI thread (which is not a worker), so neither
+ // metric is a reliable monotonic signal in tests. `elapsed` is, since
+ // it's pure wall-clock between probes.
+ RuntimeStats before = ctx.runtimeStats();
+ for (int i = 0; i < 5; i++) {
+ try (DataFrame df =
+ ctx.sql(
+ "SELECT i FROM "
+ + " (SELECT * FROM generate_series(1, 50000) AS t(i)) "
+ + "ORDER BY i DESC");
+ ArrowReader reader = df.collect(allocator)) {
+ while (reader.loadNextBatch()) {
+ assertNotNull(reader.getVectorSchemaRoot());
+ }
+ }
+ }
+ RuntimeStats after = ctx.runtimeStats();
+ assertTrue(
+ after.elapsedNanos() > before.elapsedNanos(),
+ () ->
+ "after.elapsedNanos="
+ + after.elapsedNanos()
+ + " not greater than before="
+ + before.elapsedNanos());
+ }
+ }
+
+ @Test
+ void closedContextThrows() {
+ SessionContext ctx = new SessionContext();
+ ctx.close();
+ assertThrows(IllegalStateException.class, ctx::runtimeStats);
+ }
+}
diff --git a/native/Cargo.lock b/native/Cargo.lock
index d171c8a..919801f 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -1294,6 +1294,7 @@ dependencies = [
"prost-build",
"protoc-bin-vendored",
"tokio",
+ "tokio-metrics",
"url",
]
@@ -3530,6 +3531,18 @@ dependencies = [
"syn",
]
+[[package]]
+name = "tokio-metrics"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d9e81d53caf955549b1dec7af4ac2149e94cc25ed97b4a545151140281e2f528"
+dependencies = [
+ "futures-util",
+ "pin-project-lite",
+ "tokio",
+ "tokio-stream",
+]
+
[[package]]
name = "tokio-rustls"
version = "0.26.4"
diff --git a/native/Cargo.toml b/native/Cargo.toml
index 42fab85..4dee54a 100644
--- a/native/Cargo.toml
+++ b/native/Cargo.toml
@@ -36,12 +36,17 @@ jni = "0.21"
object_store = { version = "0.13", default-features = false }
prost = "0.14"
tokio = { version = "1", features = ["rt-multi-thread"] }
+# Tokio runtime metrics. Optional + cfg-gated: this crate's API surface lives
+# behind `--cfg tokio_unstable`, so enabling the `runtime-metrics` feature also
+# requires the caller to set `RUSTFLAGS="--cfg tokio_unstable"` at build time.
+tokio-metrics = { version = "0.5", optional = true }
url = "2"
[features]
-# Enable all three cloud backends by default so `make test` exercises them.
-# Downstream builds that strip a feature get a clear runtime error if a
-# caller tries to register an unsupported backend.
+# Object-store backends are on by default so `make test` exercises them;
+# downstream builds that strip a feature get a clear runtime error if a caller
+# tries to register an unsupported backend. `runtime-metrics` is opt-in because
+# it requires the `tokio_unstable` cfg flag at build time (see below).
default = [
"object-store-aws",
"object-store-gcp",
@@ -50,6 +55,17 @@ default = [
object-store-aws = ["object_store/aws"]
object-store-gcp = ["object_store/gcp"]
object-store-http = ["object_store/http"]
+# Pulls in `tokio-metrics` so `SessionContext.runtimeStats()` returns real
+# numbers from the underlying Tokio runtime. Off by default because
+# `tokio-metrics` only compiles when `--cfg tokio_unstable` is set, and we
+# don't want to force that flag onto every build. Enable explicitly:
+#
+# RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics
+#
+# The JNI handler returns a clear "feature not enabled" error from the JVM if
+# invoked in a build that compiled the feature off, so the Java surface is
+# unchanged either way.
+runtime-metrics = ["dep:tokio-metrics"]
[build-dependencies]
prost-build = "0.14"
diff --git a/native/src/lib.rs b/native/src/lib.rs
index cebcb22..8bc8846 100644
--- a/native/src/lib.rs
+++ b/native/src/lib.rs
@@ -21,8 +21,10 @@ mod csv;
mod errors;
mod jni_util;
mod json;
+mod memory;
mod object_store;
mod proto;
+mod runtime_metrics;
mod schema;
mod table_provider;
mod udf;
@@ -80,7 +82,36 @@ pub(crate) fn jvm() -> &'static JavaVM {
pub(crate) fn runtime() -> &'static Runtime {
static RT: OnceLock = OnceLock::new();
- RT.get_or_init(|| Runtime::new().expect("failed to create Tokio runtime"))
+ RT.get_or_init(|| {
+ let rt = Runtime::new().expect("failed to create Tokio runtime");
+ // Eagerly install the runtime-metrics accumulator (no-op when the
+ // `runtime-metrics` Cargo feature is off). Initialising here -- not
+ // lazily on the first `runtimeStats()` call -- means the
+ // RuntimeMonitor's sampling baseline coincides with runtime start, so
+ // poll/park/busy totals reflect activity from the first query onward
+ // rather than from the first observation.
+ crate::runtime_metrics::init(rt.handle());
+ rt
+ })
+}
+
+/// Wrap the (already-built) `RuntimeEnvBuilder`'s memory pool with a
+/// `TrackingMemoryPool` so `Java_..._memoryUsage` can report current/peak
+/// bytes for this session. Registers the tracker in the process-wide map
+/// keyed by the JNI handle. Idempotent: if the builder didn't set a pool,
+/// fills in DataFusion's default first so the wrap-and-register has the
+/// same behavior either way.
+fn install_memory_tracker(
+ builder: &mut RuntimeEnvBuilder,
+) -> std::sync::Arc {
+ use datafusion::execution::memory_pool::UnboundedMemoryPool;
+ let inner: std::sync::Arc = builder
+ .memory_pool
+ .take()
+ .unwrap_or_else(|| std::sync::Arc::new(UnboundedMemoryPool::default()));
+ let tracker = std::sync::Arc::new(crate::memory::TrackingMemoryPool::new(inner));
+ builder.memory_pool = Some(tracker.clone());
+ tracker
}
#[no_mangle]
@@ -89,8 +120,13 @@ pub extern "system" fn Java_org_apache_datafusion_SessionContext_createSessionCo
_class: JClass<'local>,
) -> jlong {
try_unwrap_or_throw(&mut env, 0, |_env| -> JniResult {
- let ctx = SessionContext::new();
- Ok(Box::into_raw(Box::new(ctx)) as jlong)
+ let mut runtime_builder = RuntimeEnvBuilder::new();
+ let tracker = install_memory_tracker(&mut runtime_builder);
+ let runtime_env = runtime_builder.build()?;
+ let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), Arc::new(runtime_env));
+ let handle = Box::into_raw(Box::new(ctx)) as jlong;
+ crate::memory::register(handle, tracker);
+ Ok(handle)
})
}
@@ -157,6 +193,13 @@ pub extern "system" fn Java_org_apache_datafusion_SessionContext_createSessionCo
config.options_mut().set(&opt.key, &opt.value)?;
}
+ // Wrap the configured pool (default `UnboundedMemoryPool` or whatever
+ // `with_memory_limit` produced) with a tracker so `memoryUsage()` can
+ // report current/peak bytes for this session. The tracker is
+ // transparent to query execution -- it only intercepts grow/shrink to
+ // update two atomics.
+ let tracker = install_memory_tracker(&mut runtime_builder);
+
let runtime_env = runtime_builder.build()?;
let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime_env));
@@ -166,7 +209,9 @@ pub extern "system" fn Java_org_apache_datafusion_SessionContext_createSessionCo
// via try_unwrap_or_throw.
crate::object_store::apply_registrations(&ctx, &opts.object_stores)?;
- Ok(Box::into_raw(Box::new(ctx)) as jlong)
+ let handle = Box::into_raw(Box::new(ctx)) as jlong;
+ crate::memory::register(handle, tracker);
+ Ok(handle)
})
}
@@ -707,6 +752,10 @@ pub extern "system" fn Java_org_apache_datafusion_SessionContext_closeSessionCon
) {
try_unwrap_or_throw(&mut env, (), |_env| -> JniResult<()> {
if handle != 0 {
+ // Drop our side-table entry first so the tracker Arc's last
+ // strong reference is the SessionContext->RuntimeEnv chain, then
+ // drop the SessionContext itself.
+ crate::memory::unregister(handle);
unsafe {
drop(Box::from_raw(handle as *mut SessionContext));
}
@@ -715,6 +764,59 @@ pub extern "system" fn Java_org_apache_datafusion_SessionContext_closeSessionCon
})
}
+#[no_mangle]
+pub extern "system" fn Java_org_apache_datafusion_SessionContext_memoryUsageNative<'local>(
+ mut env: JNIEnv<'local>,
+ _class: JClass<'local>,
+ handle: jlong,
+) -> jni::sys::jlongArray {
+ try_unwrap_or_throw(
+ &mut env,
+ std::ptr::null_mut(),
+ |env| -> JniResult {
+ if handle == 0 {
+ return Err("SessionContext handle is null".into());
+ }
+ // Look up the tracker by JNI handle. Should always succeed because
+ // the ctor inserts before publishing the handle to Java.
+ let tracker = crate::memory::lookup(handle)
+ .ok_or("memory tracker not registered for this SessionContext")?;
+ // `snapshot()` returns a consistent (current, peak) pair where
+ // peak is always >= current even if a concurrent record_grow is
+ // in-flight. Callers see no transient `current > peak`.
+ let (current, peak) = tracker.snapshot();
+ let values: [jlong; 2] = [current as jlong, peak as jlong];
+ let arr = env.new_long_array(values.len() as jni::sys::jsize)?;
+ env.set_long_array_region(&arr, 0, &values)?;
+ Ok(arr.into_raw())
+ },
+ )
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_apache_datafusion_SessionContext_runtimeStatsNative<'local>(
+ mut env: JNIEnv<'local>,
+ _class: JClass<'local>,
+ handle: jlong,
+) -> jni::sys::jlongArray {
+ try_unwrap_or_throw(
+ &mut env,
+ std::ptr::null_mut(),
+ |env| -> JniResult {
+ if handle == 0 {
+ return Err("SessionContext handle is null".into());
+ }
+ // Runtime stats are process-global -- the JNI library drives one
+ // shared multi-threaded runtime -- but we still gate on the
+ // SessionContext handle so callers can't ask after close().
+ let stats = crate::runtime_metrics::runtime_stats()?;
+ let arr = env.new_long_array(stats.len() as jni::sys::jsize)?;
+ env.set_long_array_region(&arr, 0, &stats)?;
+ Ok(arr.into_raw())
+ },
+ )
+}
+
fn with_parquet_options(
env: &mut JNIEnv,
options_bytes: JByteArray,
diff --git a/native/src/memory.rs b/native/src/memory.rs
new file mode 100644
index 0000000..e105299
--- /dev/null
+++ b/native/src/memory.rs
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Session-wide memory accounting wrapper around any `MemoryPool`.
+
+use std::collections::HashMap;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{Arc, Mutex, OnceLock};
+
+use datafusion::common::Result;
+use datafusion::execution::memory_pool::{MemoryLimit, MemoryPool, MemoryReservation};
+use jni::sys::jlong;
+
+/// Wraps an inner `MemoryPool` with two counters: total bytes currently held
+/// and the peak observed since the wrapper was constructed. Bookkeeping is two
+/// atomic loads/stores per `grow`/`shrink` and adds no contention because the
+/// inner pool already serializes its own state.
+#[derive(Debug)]
+pub struct TrackingMemoryPool {
+ inner: Arc,
+ current_bytes: AtomicU64,
+ peak_bytes: AtomicU64,
+}
+
+impl TrackingMemoryPool {
+ pub fn new(inner: Arc) -> Self {
+ Self {
+ inner,
+ current_bytes: AtomicU64::new(0),
+ peak_bytes: AtomicU64::new(0),
+ }
+ }
+
+ // Note: there are intentionally no separate `current_bytes()` /
+ // `peak_bytes()` accessors. Reading them independently allows a torn
+ // observation where `current > peak` while a concurrent `record_grow` is
+ // mid-flight. Callers must use `snapshot()` instead, which clamps.
+
+ /// Returns `(current_bytes, peak_bytes)` as a consistent snapshot. Two
+ /// invariants are maintained:
+ ///
+ /// 1. **Within a snapshot:** `peak >= current`. This matters when a
+ /// concurrent `record_grow` is in-flight between its `fetch_add` of
+ /// `current_bytes` and its `fetch_max` of `peak_bytes` -- a naive
+ /// snapshot could observe the higher current but the stale peak.
+ /// 2. **Across snapshots:** `peak` is monotonically non-decreasing for
+ /// every observer. If reader R1 ever sees peak = X, no later reader R2
+ /// (on any thread) sees peak < X.
+ ///
+ /// The read-side clamp `max(stored_peak, observed_current)` alone gives
+ /// (1) but not (2): it's possible for R1 to *synthesize* a peak of X
+ /// from a transiently-high current, and then for a `record_shrink` plus
+ /// the slow path of `record_grow`'s missing `fetch_max` to leave the
+ /// stored peak briefly below X while R2 polls. R2 would observe a
+ /// smaller current and a smaller stored peak -- non-monotonic.
+ ///
+ /// The fix is to persist the clamped value back into `peak_bytes` via
+ /// `fetch_max` *before* returning. Doing so guarantees every snapshot's
+ /// returned peak is also reflected in the global atomic, so any
+ /// subsequent observer's `peak_bytes.load()` is at least as high as the
+ /// largest peak any earlier observer returned.
+ ///
+ /// Both atomics are `Relaxed`-ordered; we tolerate slightly stale values
+ /// in exchange for cheaper writes.
+ pub fn snapshot(&self) -> (u64, u64) {
+ // Order matters on the load side too: read `peak` first, then
+ // `current`. If a `record_grow` is in flight, this order ensures
+ // that whenever we observe a fresh `current` we also observe at
+ // least the corresponding (or older) `peak` -- so the eventual
+ // clamp `max(peak, current)` produces the right value.
+ let peak_observed = self.peak_bytes.load(Ordering::Relaxed);
+ let current = self.current_bytes.load(Ordering::Relaxed);
+ let clamped_peak = peak_observed.max(current);
+ // Persist the clamped peak so future observers can never see a
+ // smaller peak than the one we just returned. fetch_max is
+ // monotonic, so this is a no-op when another thread (or this very
+ // thread on a previous call) already pushed peak past clamped_peak.
+ let stored_peak = self.peak_bytes.fetch_max(clamped_peak, Ordering::Relaxed);
+ (current, stored_peak.max(clamped_peak))
+ }
+
+ fn record_grow(&self, additional: usize) {
+ // We bump `current` first to avoid double-counting in `peak` if a
+ // concurrent reader sees a stale current and a fresh peak. The
+ // snapshot path's read-side `max(peak, current)` clamp closes the
+ // window where `current` is up-to-date but `peak` is not.
+ let now = self
+ .current_bytes
+ .fetch_add(additional as u64, Ordering::Relaxed)
+ .saturating_add(additional as u64);
+ // fetch_max is monotonic so concurrent grows still produce the right peak.
+ self.peak_bytes.fetch_max(now, Ordering::Relaxed);
+ }
+
+ fn record_shrink(&self, shrink: usize) {
+ // Saturating: if the inner pool ever calls shrink with a value larger
+ // than the running total we'd rather report 0 than wrap around.
+ let prev = self
+ .current_bytes
+ .fetch_sub(shrink as u64, Ordering::Relaxed);
+ if prev < shrink as u64 {
+ // Restore to zero. Concurrent grows that interleaved are fine --
+ // they'll just see a transient under-count.
+ self.current_bytes.store(0, Ordering::Relaxed);
+ }
+ }
+}
+
+impl MemoryPool for TrackingMemoryPool {
+ fn register(&self, consumer: &datafusion::execution::memory_pool::MemoryConsumer) {
+ self.inner.register(consumer);
+ }
+
+ fn unregister(&self, consumer: &datafusion::execution::memory_pool::MemoryConsumer) {
+ self.inner.unregister(consumer);
+ }
+
+ fn grow(&self, reservation: &MemoryReservation, additional: usize) {
+ self.inner.grow(reservation, additional);
+ self.record_grow(additional);
+ }
+
+ fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
+ self.inner.shrink(reservation, shrink);
+ self.record_shrink(shrink);
+ }
+
+ fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
+ self.inner.try_grow(reservation, additional)?;
+ self.record_grow(additional);
+ Ok(())
+ }
+
+ fn reserved(&self) -> usize {
+ self.inner.reserved()
+ }
+
+ fn memory_limit(&self) -> MemoryLimit {
+ self.inner.memory_limit()
+ }
+}
+
+/// Process-wide registry mapping a `SessionContext`'s native handle to its
+/// `TrackingMemoryPool`. Populated at session creation, drained at session
+/// close. We can't downcast `Arc` back to the concrete type
+/// without an `Any` bound (which the trait does not require), so instead we
+/// keep a parallel `Arc` indexed by the JNI handle Java already threads
+/// through every call.
+fn registry() -> &'static Mutex>> {
+ static REGISTRY: OnceLock>>> = OnceLock::new();
+ REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
+}
+
+pub fn register(handle: jlong, pool: Arc) {
+ registry()
+ .lock()
+ .expect("memory registry lock poisoned")
+ .insert(handle, pool);
+}
+
+pub fn lookup(handle: jlong) -> Option> {
+ registry()
+ .lock()
+ .expect("memory registry lock poisoned")
+ .get(&handle)
+ .cloned()
+}
+
+pub fn unregister(handle: jlong) {
+ registry()
+ .lock()
+ .expect("memory registry lock poisoned")
+ .remove(&handle);
+}
diff --git a/native/src/runtime_metrics.rs b/native/src/runtime_metrics.rs
new file mode 100644
index 0000000..e69410e
--- /dev/null
+++ b/native/src/runtime_metrics.rs
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tokio runtime metrics, gated behind the `runtime-metrics` Cargo feature.
+//!
+//! `tokio-metrics` only compiles when `--cfg tokio_unstable` is set, so the
+//! whole module is conditional on the feature flag. The feature-off variant
+//! still exists; `runtime_stats()` returns an `Err` whose message points at
+//! the rebuild command. The Java surface (`SessionContext.runtimeStats()`)
+//! is therefore stable across both build configurations.
+//!
+//! Field layout returned across the FFI boundary (in this exact order, all
+//! `i64`):
+//! 0 numWorkers
+//! 1 liveTasksCount
+//! 2 globalQueueDepth
+//! 3 elapsedNanos
+//! 4 totalBusyNanos
+//! 5 totalParkCount
+//! 6 totalPollsCount
+//! 7 totalNoopCount
+//! 8 totalStealCount
+//! 9 totalLocalScheduleCount
+//! 10 totalOverflowCount
+
+#[cfg(not(feature = "runtime-metrics"))]
+use crate::errors::JniResult;
+
+/// Number of i64 values in the snapshot array; kept here so the Java side and
+/// the feature-off stub agree on the layout.
+pub const STATS_FIELD_COUNT: usize = 11;
+
+#[cfg(feature = "runtime-metrics")]
+mod imp {
+ use std::sync::{Mutex, OnceLock};
+ use std::time::Duration;
+ use tokio_metrics::{RuntimeIntervals, RuntimeMonitor};
+
+ use super::STATS_FIELD_COUNT;
+ use crate::errors::JniResult;
+
+ /// `RuntimeMonitor::intervals().next()` returns *delta* metrics covering
+ /// the period since the previous call (or, on the very first call, since
+ /// the iterator was constructed). To present the documented
+ /// "totals since runtime start" semantic to Java callers, we own the
+ /// iterator process-wide and accumulate every field that's documented as
+ /// monotonic.
+ ///
+ /// Snapshot fields (`workers_count`, `live_tasks_count`,
+ /// `global_queue_depth`) are point-in-time and pass through unchanged.
+ struct RuntimeAccumulator {
+ intervals: RuntimeIntervals,
+ elapsed: Duration,
+ total_busy: Duration,
+ total_park_count: u64,
+ total_polls_count: u64,
+ total_noop_count: u64,
+ total_steal_count: u64,
+ total_local_schedule_count: u64,
+ total_overflow_count: u64,
+ }
+
+ static ACC: OnceLock> = OnceLock::new();
+
+ fn accumulator() -> &'static Mutex {
+ // Warm `crate::runtime()` first so the shared Tokio runtime exists
+ // and its OnceLock initializer has already populated ACC via
+ // `runtime_metrics::init`. We must NOT call `runtime()` from inside
+ // `ACC.get_or_init`: that would acquire ACC's once-init slot, then
+ // (on the very first call) recurse through `RT.get_or_init` ->
+ // `init()` -> `ACC.get_or_init`, which is same-thread reentrancy on
+ // the same OnceLock and deadlocks indefinitely.
+ let _ = crate::runtime();
+ // After `runtime()` returns, ACC is guaranteed populated. The
+ // defensive `get_or_init` covers a future build path that constructs
+ // the runtime without going through `crate::runtime` (none today);
+ // it is safe to invoke from here because no init lock is held.
+ ACC.get_or_init(build_accumulator)
+ }
+
+ fn build_accumulator() -> Mutex {
+ let handle = crate::runtime().handle().clone();
+ let monitor = RuntimeMonitor::new(&handle);
+ Mutex::new(RuntimeAccumulator {
+ intervals: monitor.intervals(),
+ elapsed: Duration::ZERO,
+ total_busy: Duration::ZERO,
+ total_park_count: 0,
+ total_polls_count: 0,
+ total_noop_count: 0,
+ total_steal_count: 0,
+ total_local_schedule_count: 0,
+ total_overflow_count: 0,
+ })
+ }
+
+ /// Eagerly construct the runtime-metrics accumulator. Called from
+ /// `crate::runtime()` immediately after the shared Tokio runtime is
+ /// created so the accumulator's interval baseline matches runtime start.
+ /// Without this, `RuntimeMonitor` would only start sampling at the first
+ /// `runtimeStats()` call, silently dropping every poll/busy nanosecond
+ /// that happened before that.
+ pub fn init(handle: &tokio::runtime::Handle) {
+ ACC.get_or_init(|| {
+ let monitor = RuntimeMonitor::new(handle);
+ Mutex::new(RuntimeAccumulator {
+ intervals: monitor.intervals(),
+ elapsed: Duration::ZERO,
+ total_busy: Duration::ZERO,
+ total_park_count: 0,
+ total_polls_count: 0,
+ total_noop_count: 0,
+ total_steal_count: 0,
+ total_local_schedule_count: 0,
+ total_overflow_count: 0,
+ })
+ });
+ }
+
+ pub fn runtime_stats() -> JniResult<[i64; STATS_FIELD_COUNT]> {
+ let mut acc = accumulator()
+ .lock()
+ .map_err(|_| "runtime-metrics accumulator lock poisoned")?;
+ let delta = acc
+ .intervals
+ .next()
+ .ok_or("tokio-metrics RuntimeMonitor returned no interval")?;
+ // Workers count and the point-in-time fields are not deltas; they are
+ // the runtime's current state. Pass through directly.
+ let workers_count = delta.workers_count;
+ let live_tasks_count = delta.live_tasks_count;
+ let global_queue_depth = delta.global_queue_depth;
+ // Accumulate the deltas.
+ acc.elapsed = acc.elapsed.saturating_add(delta.elapsed);
+ acc.total_busy = acc.total_busy.saturating_add(delta.total_busy_duration);
+ acc.total_park_count = acc.total_park_count.saturating_add(delta.total_park_count);
+ acc.total_polls_count = acc
+ .total_polls_count
+ .saturating_add(delta.total_polls_count);
+ acc.total_noop_count = acc.total_noop_count.saturating_add(delta.total_noop_count);
+ acc.total_steal_count = acc
+ .total_steal_count
+ .saturating_add(delta.total_steal_count);
+ acc.total_local_schedule_count = acc
+ .total_local_schedule_count
+ .saturating_add(delta.total_local_schedule_count);
+ acc.total_overflow_count = acc
+ .total_overflow_count
+ .saturating_add(delta.total_overflow_count);
+
+ Ok([
+ workers_count as i64,
+ live_tasks_count as i64,
+ global_queue_depth as i64,
+ i128_to_i64_saturating(acc.elapsed.as_nanos() as i128),
+ i128_to_i64_saturating(acc.total_busy.as_nanos() as i128),
+ acc.total_park_count as i64,
+ acc.total_polls_count as i64,
+ acc.total_noop_count as i64,
+ acc.total_steal_count as i64,
+ acc.total_local_schedule_count as i64,
+ acc.total_overflow_count as i64,
+ ])
+ }
+
+ fn i128_to_i64_saturating(v: i128) -> i64 {
+ v.clamp(i64::MIN as i128, i64::MAX as i128) as i64
+ }
+}
+
+#[cfg(feature = "runtime-metrics")]
+pub use imp::{init, runtime_stats};
+
+/// No-op when the `runtime-metrics` feature is off: there is no monitor to
+/// initialise. Kept as a regular function (rather than a `cfg` annotation at
+/// every call site) so the runtime construction path stays unconditional.
+#[cfg(not(feature = "runtime-metrics"))]
+pub fn init(_handle: &tokio::runtime::Handle) {}
+
+#[cfg(not(feature = "runtime-metrics"))]
+pub fn runtime_stats() -> JniResult<[i64; STATS_FIELD_COUNT]> {
+ Err(
+ "datafusion-jni was built without the `runtime-metrics` Cargo feature; \
+ rebuild the native crate with \
+ `RUSTFLAGS=\"--cfg tokio_unstable\" cargo build --features runtime-metrics` \
+ to enable SessionContext.runtimeStats"
+ .into(),
+ )
+}