Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,20 @@
# specific language governing permissions and limitations
# under the License.

.PHONY: all native jvm test clean tpch-data
.PHONY: all native native-runtime-metrics jvm test clean tpch-data

# Default target: builds both the native crate and the JVM artifacts.
all: native jvm

# Build the Rust crate under native/ with its default Cargo features.
native:
cd native && cargo build

# Build the native crate with the `runtime-metrics` Cargo feature enabled.
# Requires `--cfg tokio_unstable` because tokio-metrics gates its API there.
# Default `make native` does not pull this in; callers who need
# SessionContext.runtimeStats() pick this target explicitly.
native-runtime-metrics:
cd native && RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics

# Package the JVM side through the Maven wrapper; -DskipTests skips test execution.
jvm:
./mvnw package -DskipTests

Expand Down
60 changes: 60 additions & 0 deletions core/src/main/java/org/apache/datafusion/MemoryUsage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

/**
* Snapshot of session-wide memory usage from {@link SessionContext#memoryUsage()}. Returns the
* total bytes currently reserved against the session's {@code MemoryPool} and the peak observed
* since the session was created.
*
* <p>Multi-tenant attribution: place each tenant in its own {@link SessionContext}. Sessions in
* DataFusion are cheap; one per tenant is the standard pattern. Within a single session, the
* snapshot is the sum across all in-flight queries' operator reservations -- see {@link
* SessionContext#memoryUsage()} for the precise definition of what is and is not counted.
*
* <p>Instances are immutable.
*/
public final class MemoryUsage {
private final long currentBytes;
private final long peakBytes;

public MemoryUsage(long currentBytes, long peakBytes) {
this.currentBytes = currentBytes;
this.peakBytes = peakBytes;
}

/** Bytes currently reserved against this session's {@code MemoryPool}. */
public long currentBytes() {
return currentBytes;
}

/**
* Maximum value of {@link #currentBytes()} observed since the session was created. Monotonic:
* never decreases for a given session.
*/
public long peakBytes() {
return peakBytes;
}

@Override
public String toString() {
return "MemoryUsage{currentBytes=" + currentBytes + ", peakBytes=" + peakBytes + "}";
}
}
153 changes: 153 additions & 0 deletions core/src/main/java/org/apache/datafusion/RuntimeStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

/**
* Snapshot of the underlying Tokio runtime from {@link SessionContext#runtimeStats()}. Mirrors the
* subset of {@code tokio_metrics::RuntimeMetrics} fields that node-level dashboards typically
* surface (worker count, busy time, queue depth, etc.).
*
* <p>Instances are immutable. The snapshot is process-wide rather than per-session because the JNI
* library drives a single shared multi-threaded Tokio runtime; gating on the {@code SessionContext}
* only ensures the caller still has a live handle.
*
* <p>Requires the {@code runtime-metrics} Cargo feature on the native crate, plus {@code
* RUSTFLAGS="--cfg tokio_unstable"} at build time.
*/
public final class RuntimeStats {
private final int numWorkers;
private final long liveTasksCount;
private final long globalQueueDepth;
private final long elapsedNanos;
private final long totalBusyNanos;
private final long totalParkCount;
private final long totalPollsCount;
private final long totalNoopCount;
private final long totalStealCount;
private final long totalLocalScheduleCount;
private final long totalOverflowCount;

public RuntimeStats(
int numWorkers,
long liveTasksCount,
long globalQueueDepth,
long elapsedNanos,
long totalBusyNanos,
long totalParkCount,
long totalPollsCount,
long totalNoopCount,
long totalStealCount,
long totalLocalScheduleCount,
long totalOverflowCount) {
this.numWorkers = numWorkers;
this.liveTasksCount = liveTasksCount;
this.globalQueueDepth = globalQueueDepth;
this.elapsedNanos = elapsedNanos;
this.totalBusyNanos = totalBusyNanos;
this.totalParkCount = totalParkCount;
this.totalPollsCount = totalPollsCount;
this.totalNoopCount = totalNoopCount;
this.totalStealCount = totalStealCount;
this.totalLocalScheduleCount = totalLocalScheduleCount;
this.totalOverflowCount = totalOverflowCount;
}

/** Number of OS worker threads driving the multi-threaded Tokio runtime. */
public int numWorkers() {
return numWorkers;
}

/** Number of tasks currently scheduled (alive) on the runtime. */
public long liveTasksCount() {
return liveTasksCount;
}

/** Tasks waiting in the global injection queue, not yet picked up by a worker. */
public long globalQueueDepth() {
return globalQueueDepth;
}

/** Wall-clock nanoseconds covered by this snapshot's interval (since runtime start). */
public long elapsedNanos() {
return elapsedNanos;
}

/** Total nanoseconds workers spent doing work (sum across workers). */
public long totalBusyNanos() {
return totalBusyNanos;
}

/** Times a worker has parked itself (gone idle waiting for work). */
public long totalParkCount() {
return totalParkCount;
}

/** Total task polls completed across workers. */
public long totalPollsCount() {
return totalPollsCount;
}

/** Times a worker unparked but found no work (false wakeup). */
public long totalNoopCount() {
return totalNoopCount;
}

/** Times a worker successfully stole tasks from another worker. */
public long totalStealCount() {
return totalStealCount;
}

/** Tasks scheduled into a worker's local queue. */
public long totalLocalScheduleCount() {
return totalLocalScheduleCount;
}

/** Times a worker's local queue overflowed and pushed tasks to the injector. */
public long totalOverflowCount() {
return totalOverflowCount;
}

@Override
public String toString() {
return "RuntimeStats{numWorkers="
+ numWorkers
+ ", liveTasksCount="
+ liveTasksCount
+ ", globalQueueDepth="
+ globalQueueDepth
+ ", elapsedNanos="
+ elapsedNanos
+ ", totalBusyNanos="
+ totalBusyNanos
+ ", totalParkCount="
+ totalParkCount
+ ", totalPollsCount="
+ totalPollsCount
+ ", totalNoopCount="
+ totalNoopCount
+ ", totalStealCount="
+ totalStealCount
+ ", totalLocalScheduleCount="
+ totalLocalScheduleCount
+ ", totalOverflowCount="
+ totalOverflowCount
+ "}";
}
}
71 changes: 71 additions & 0 deletions core/src/main/java/org/apache/datafusion/SessionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,73 @@ public DataFrame fromProto(byte[] planBytes) {
return new DataFrame(dfHandle);
}

/**
 * Snapshot the session's memory pool: bytes currently held and the peak observed since this
 * session was created. Thread-safe; can be polled while queries run.
 *
 * <p>For multi-tenant attribution, place each tenant in its own {@link SessionContext}. Within a
 * single session, the snapshot is the sum across all in-flight queries -- there is no
 * per-DataFrame breakdown today.
 *
 * <p>The session's {@code MemoryPool} is wrapped transparently with a tracking adapter at
 * construction time; the wrapper layers on top of whatever pool {@link
 * SessionContextBuilder#memoryLimit(long, double)} produced (or DataFusion's default unbounded
 * pool) and does not change pool semantics (limits, eviction, spilling).
 *
 * <p><b>What this counts:</b> bytes reserved against the {@code MemoryPool} -- operator state for
 * sorts, hash joins, aggregates, repartition buffers, and anything else that uses DataFusion's
 * {@code MemoryReservation} machinery during execution.
 *
 * <p><b>What this does <i>not</i> count:</b> memory held outside the pool, including record-batch
 * buffers materialised by {@link DataFrame#cache()} (stored in an in-memory {@code MemTable} as
 * plain {@code Vec<RecordBatch>} with no reservation), record-batch buffers that have crossed the
 * FFI boundary into Arrow's Java allocator, and JVM-side allocations. Operator-level reservations
 * are released as the plan unwinds, so a query that runs to completion typically returns {@code
 * currentBytes} to ~0 even if the result set is large.
 *
 * @return an immutable snapshot built from the first two values returned by the native call, in
 *     the order {@code [currentBytes, peakBytes]}
 * @throws IllegalStateException if this context is closed, or if the native call returns {@code
 *     null} or fewer than two values.
 * @throws RuntimeException if the native side has not registered a tracker for this handle
 *     (should not happen in practice -- tracker registration is done by the constructor).
 */
public MemoryUsage memoryUsage() {
  if (nativeHandle == 0) {
    throw new IllegalStateException("SessionContext is closed");
  }
  long[] values = memoryUsageNative(nativeHandle);
  // Guard the JNI boundary: a mismatched native build would otherwise surface as a bare
  // NullPointerException / ArrayIndexOutOfBoundsException with no hint about the cause.
  if (values == null || values.length < 2) {
    throw new IllegalStateException(
        "memoryUsageNative returned "
            + (values == null ? "null" : values.length + " value(s)")
            + "; expected [currentBytes, peakBytes]");
  }
  return new MemoryUsage(values[0], values[1]);
}

/**
 * Snapshot operational counters from the underlying Tokio runtime: worker count, busy time, queue
 * depth, etc. Thread-safe; can be polled while queries run.
 *
 * <p>The runtime is process-wide rather than per-session because the JNI library drives a single
 * shared multi-threaded Tokio runtime. The {@link SessionContext} handle is checked only to
 * ensure the caller still has a live session; the values returned are not session-specific.
 *
 * <p>Requires the {@code runtime-metrics} Cargo feature on the native crate (off by default).
 * Rebuild with:
 *
 * <pre>{@code
 * RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics
 * }</pre>
 *
 * <p>If invoked against a native binary built without the feature, this method throws {@link
 * RuntimeException} with a message pointing at the rebuild command.
 *
 * @return an immutable snapshot built from the first eleven values returned by the native call,
 *     taken in the {@link RuntimeStats} constructor's parameter order
 * @throws IllegalStateException if this context is closed, or if the native call returns {@code
 *     null} or fewer than eleven values.
 * @throws ArithmeticException if the reported worker count does not fit in an {@code int}.
 * @throws RuntimeException if the native crate was built without the {@code runtime-metrics}
 *     feature.
 */
public RuntimeStats runtimeStats() {
  if (nativeHandle == 0) {
    throw new IllegalStateException("SessionContext is closed");
  }
  long[] s = runtimeStatsNative(nativeHandle);
  // Guard the JNI boundary: a native library out of step with this class would otherwise fail
  // with a bare NullPointerException / ArrayIndexOutOfBoundsException.
  final int expectedFields = 11;
  if (s == null || s.length < expectedFields) {
    throw new IllegalStateException(
        "runtimeStatsNative returned "
            + (s == null ? "null" : s.length + " value(s)")
            + "; expected at least "
            + expectedFields);
  }
  // toIntExact fails loudly instead of silently truncating an out-of-range worker count.
  return new RuntimeStats(
      Math.toIntExact(s[0]), s[1], s[2], s[3], s[4], s[5], s[6], s[7], s[8], s[9], s[10]);
}

/**
* Return the Arrow {@link Schema} of a registered table. Transferred via Arrow IPC; no {@link
* org.apache.arrow.memory.BufferAllocator} is required because a schema carries no buffer data.
Expand Down Expand Up @@ -524,6 +591,10 @@ public void close() {

/**
 * Native entry point taking a session handle and an option key. NOTE(review): presumably returns
 * the current value of the named session option -- confirm against the native implementation.
 */
private static native String getOptionNative(long handle, String key);

/**
 * Native half of {@link #memoryUsage()}. The Java caller reads element 0 of the returned array as
 * the current byte count and element 1 as the peak byte count.
 */
private static native long[] memoryUsageNative(long handle);

/**
 * Native half of {@link #runtimeStats()}. The Java caller reads elements 0 through 10 of the
 * returned array and passes them, in order, to the {@link RuntimeStats} constructor (element 0 is
 * converted to {@code int} for the worker count).
 */
private static native long[] runtimeStatsNative(long handle);

/**
 * Native entry point for Parquet registration. NOTE(review): presumably registers the Parquet
 * source at {@code path} under {@code name}, with {@code optionsBytes} and {@code schemaIpcBytes}
 * carrying serialized options and an Arrow IPC schema -- confirm against the native side.
 */
private static native void registerParquetWithOptions(
long handle, String name, String path, byte[] optionsBytes, byte[] schemaIpcBytes);

Expand Down
Loading