Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions native-engine/auron-jni-bridge/src/jni_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,8 +563,6 @@ pub struct JniBridge<'a> {
pub method_getTaskOnHeapSpillManager_ret: ReturnType,
pub method_isTaskRunning: JStaticMethodID,
pub method_isTaskRunning_ret: ReturnType,
pub method_isDriverSide: JStaticMethodID,
pub method_isDriverSide_ret: ReturnType,
pub method_openFileAsDataInputWrapper: JStaticMethodID,
pub method_openFileAsDataInputWrapper_ret: ReturnType,
pub method_createFileAsDataOutputWrapper: JStaticMethodID,
Expand Down Expand Up @@ -626,7 +624,6 @@ impl<'a> JniBridge<'a> {
method_getTaskOnHeapSpillManager_ret: ReturnType::Object,
method_isTaskRunning: env.get_static_method_id(class, "isTaskRunning", "()Z")?,
method_isTaskRunning_ret: ReturnType::Primitive(Primitive::Boolean),
method_isDriverSide: env.get_static_method_id(class, "isDriverSide", "()Z")?,
method_openFileAsDataInputWrapper: env.get_static_method_id(
class,
"openFileAsDataInputWrapper",
Expand All @@ -639,7 +636,6 @@ impl<'a> JniBridge<'a> {
"(Lorg/apache/hadoop/fs/FileSystem;Ljava/lang/String;)Lorg/apache/auron/hadoop/fs/FSDataOutputWrapper;",
)?,
method_createFileAsDataOutputWrapper_ret: ReturnType::Object,
method_isDriverSide_ret: ReturnType::Primitive(Primitive::Boolean),
method_getDirectMemoryUsed: env.get_static_method_id(
class,
"getDirectMemoryUsed",
Expand Down
3 changes: 2 additions & 1 deletion native-engine/datafusion-ext-plans/src/memmgr/spill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ fn spill_compression_codec() -> &'static str {
}

pub fn try_new_spill(spill_metrics: &SpillMetrics) -> Result<Box<dyn Spill>> {
if !is_jni_bridge_inited() || jni_call_static!(JniBridge.isDriverSide() -> bool)? {
if !is_jni_bridge_inited() {
// is driver
Ok(Box::new(FileSpill::try_new(spill_metrics)?))
} else {
// use on heap spill if on-heap memory is available, otherwise use file spill
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,6 @@ public static boolean isTaskRunning() {
return !tc.isCompleted() && !tc.isInterrupted();
}

public static boolean isDriverSide() {
TaskContext tc = getTaskContext();
return tc == null;
}

public static FSDataInputWrapper openFileAsDataInputWrapper(FileSystem fs, String path) throws Exception {
// the path is a URI string, so we need to convert it to a URI object, ref:
// org.apache.spark.paths.SparkPath.toPath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ class OnHeapSpillManager(taskContext: TaskContext)
*/
@SuppressWarnings(Array("unused"))
def isOnHeapAvailable: Boolean = {
// if driver, tc always null.
if (taskContext == null) {
return false
}
val memoryPool = OnHeapSpillManagerHelper.getOnHeapExecutionMemoryPool
val memoryUsed = memoryPool.memoryUsed
val memoryFree = memoryPool.memoryFree
Expand Down