Skip to content

Commit

Permalink
Make changes needed to have all host allocations go through a single …
Browse files Browse the repository at this point in the history
…pluggable API

Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
  • Loading branch information
revans2 committed Feb 12, 2024
1 parent 23ca28a commit a6c6d95
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 22 deletions.
4 changes: 2 additions & 2 deletions src/main/cpp/src/NativeParquetJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_ParquetFooter_getNumCol
}

JNIEXPORT jobject JNICALL Java_com_nvidia_spark_rapids_jni_ParquetFooter_serializeThriftFile(
JNIEnv* env, jclass, jlong handle, jobject host_memory_allocator)
JNIEnv* env, jclass, jlong handle)
{
CUDF_FUNC_RANGE();
try {
Expand All @@ -807,7 +807,7 @@ JNIEXPORT jobject JNICALL Java_com_nvidia_spark_rapids_jni_ParquetFooter_seriali
transportOut->getBuffer(&buf_ptr, &buf_size);

// 12 extra is for the MAGIC thrift_footer length MAGIC
jobject ret = cudf::jni::allocate_host_buffer(env, buf_size + 12, false, host_memory_allocator);
jobject ret = cudf::jni::allocate_host_buffer(env, buf_size + 12, false);
uint8_t* ret_addr = reinterpret_cast<uint8_t*>(cudf::jni::get_host_buffer_address(env, ret));
ret_addr[0] = 'P';
ret_addr[1] = 'A';
Expand Down
4 changes: 2 additions & 2 deletions src/main/cpp/src/SparkResourceAdaptorJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1758,9 +1758,9 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
auto const tid = static_cast<long>(pthread_self());
auto const thread = threads.find(tid);
if (thread != threads.end()) {
log_status("DEALLOC", tid, thread->second.task_id, thread->second.state);
log_status(is_for_cpu? "CPU_DEALLOC" : "DEALLOC", tid, thread->second.task_id, thread->second.state);
} else {
log_status("DEALLOC", tid, -2, thread_state::UNKNOWN);
log_status(is_for_cpu? "CPU_DEALLOC" : "DEALLOC", tid, -2, thread_state::UNKNOWN);
}

for (auto& [thread_id, t_state] : threads) {
Expand Down
9 changes: 2 additions & 7 deletions src/main/java/com/nvidia/spark/rapids/jni/ParquetFooter.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,8 @@ private ParquetFooter(long handle) {
* footer file. This will include the MAGIC PAR1 at the beginning and end and also the
* length of the footer just before the PAR1 at the end.
*/
public HostMemoryBuffer serializeThriftFile(HostMemoryAllocator hostMemoryAllocator) {
return serializeThriftFile(nativeHandle, hostMemoryAllocator);
}

public HostMemoryBuffer serializeThriftFile() {
return serializeThriftFile(DefaultHostMemoryAllocator.get());
return serializeThriftFile(nativeHandle);
}

/**
Expand Down Expand Up @@ -236,6 +232,5 @@ private static native long readAndFilter(long address, long length,

private static native int getNumColumns(long nativeHandle);

private static native HostMemoryBuffer serializeThriftFile(long nativeHandle,
HostMemoryAllocator hostMemoryAllocator);
private static native HostMemoryBuffer serializeThriftFile(long nativeHandle);
}
61 changes: 52 additions & 9 deletions src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,18 +23,25 @@
import ai.rapids.cudf.RmmException;
import ai.rapids.cudf.RmmTrackingResourceAdaptor;

import java.util.Arrays;
import java.util.Map;

/**
* Initialize RMM in ways that are specific to Spark.
*/
public class RmmSpark {

public enum OomInjectionType {
CPU_OR_GPU,
CPU,
GPU;
CPU_OR_GPU(0),
CPU(1),
GPU(2);

private final int id;

OomInjectionType(int id) {
this.id = id;
}

public int getId() {
return id;
}
}

private static volatile SparkResourceAdaptor sra = null;
Expand Down Expand Up @@ -436,6 +443,7 @@ public static void forceRetryOOM(long threadId) {
forceRetryOOM(threadId, 1);
}

//TODO remove this API once we know no one is calling it.
/**
* Force the thread with the given ID to throw a GpuRetryOOM or CpuRetryOOM on their next
* allocation attempt, depending on the type of allocation being done.
Expand All @@ -454,8 +462,25 @@ public static void forceRetryOOM(long threadId, int numOOMs, int oomMode, int sk
}
}

/**
 * Force the thread with the given ID to throw a GpuRetryOOM or CpuRetryOOM on its next
 * matching allocation attempt, depending on the requested injection type.
 * @param threadId the ID of the thread to throw the exception (not java thread id).
 * @param numOOMs the number of times the retry OOM should be thrown
 * @param oom the type of OOM to inject
 * @param skipCount the number of times a matching allocation is skipped before injecting the first OOM
 * @throws IllegalStateException if RMM has not been configured with a resource adaptor
 */
public static void forceRetryOOM(long threadId, int numOOMs, OomInjectionType oom, int skipCount) {
  synchronized (Rmm.class) {
    // Guard first: injection is only possible while a live SparkResourceAdaptor is installed.
    if (sra == null || !sra.isOpen()) {
      throw new IllegalStateException("RMM has not been configured for OOM injection");
    }
    sra.forceRetryOOM(threadId, numOOMs, oom, skipCount);
  }
}

public static void forceRetryOOM(long threadId, int numOOMs) {
forceRetryOOM(threadId, numOOMs, OomInjectionType.CPU_OR_GPU.ordinal(), 0);
forceRetryOOM(threadId, numOOMs, OomInjectionType.CPU_OR_GPU, 0);
}

/**
Expand All @@ -467,6 +492,7 @@ public static void forceSplitAndRetryOOM(long threadId) {
forceSplitAndRetryOOM(threadId, 1);
}

// TODO remove this API once no one calls it
/**
* Force the thread with the given ID to throw a GpuSplitAndRetryOOM or CpuSplitAndRetryOOM
* on their next allocation attempt, depending on the allocation being done.
Expand All @@ -485,8 +511,25 @@ public static void forceSplitAndRetryOOM(long threadId, int numOOMs, int oomMode
}
}

/**
 * Force the thread with the given ID to throw a GpuSplitAndRetryOOM or CpuSplitAndRetryOOM
 * on its next matching allocation attempt, depending on the requested injection type.
 * @param threadId the ID of the thread to throw the exception (not java thread id).
 * @param numOOMs the number of times the split-and-retry OOM should be thrown
 * @param oom the type of OOM to inject
 * @param skipCount the number of times a matching allocation is skipped before injecting the first OOM
 * @throws IllegalStateException if RMM has not been configured with a resource adaptor
 */
public static void forceSplitAndRetryOOM(long threadId, int numOOMs, OomInjectionType oom, int skipCount) {
  synchronized (Rmm.class) {
    // No adaptor (or a closed one) means there is nothing to inject into.
    if (sra == null || !sra.isOpen()) {
      throw new IllegalStateException("RMM has not been configured for OOM injection");
    }
    sra.forceSplitAndRetryOOM(threadId, numOOMs, oom, skipCount);
  }
}

public static void forceSplitAndRetryOOM(long threadId, int numOOMs) {
forceSplitAndRetryOOM(threadId, numOOMs, OomInjectionType.CPU_OR_GPU.ordinal(), 0);
forceSplitAndRetryOOM(threadId, numOOMs, OomInjectionType.CPU_OR_GPU, 0);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -184,6 +184,19 @@ public void doneWaitingOnPool(long threadId) {
doneWaitingOnPool(getHandle(), threadId);
}

/**
 * Force the thread with the given ID to throw a GpuRetryOOM or CpuRetryOOM on its next
 * matching allocation attempt, depending on the requested injection type.
 * @param threadId the ID of the thread to throw the exception (not java thread id).
 * @param numOOMs the number of times the retry OOM should be thrown
 * @param oom the type of OOM to inject
 * @param skipCount the number of times a matching allocation is skipped before injecting the first OOM
 */
public void forceRetryOOM(long threadId, int numOOMs, OomInjectionType oom, int skipCount) {
// Validate before crossing the JNI boundary so bad arguments fail fast in Java.
validateOOMInjectionParams(numOOMs, oom.getId(), skipCount);
forceRetryOOM(getHandle(), threadId, numOOMs, oom.getId(), skipCount);
}

// TODO this should be removed once we move the tests over to use the enum.
/**
* Force the thread with the given ID to throw a GpuRetryOOM on their next allocation attempt.
* @param threadId the ID of the thread to throw the exception (not java thread id).
Expand All @@ -203,6 +216,19 @@ private void validateOOMInjectionParams(int numOOMs, int oomMode, int skipCount)
"non-negative oomMode<" + OomInjectionType.values().length + " expected: actual=" + oomMode;
}

/**
 * Force the thread with the given ID to throw a GpuSplitAndRetryOOM or CpuSplitAndRetryOOM
 * on its next matching allocation attempt, depending on the requested injection type.
 * @param threadId the ID of the thread to throw the exception (not java thread id).
 * @param numOOMs the number of times the split-and-retry OOM should be thrown
 * @param oom the type of OOM to inject
 * @param skipCount the number of times a matching allocation is skipped before injecting the first OOM
 */
public void forceSplitAndRetryOOM(long threadId, int numOOMs, OomInjectionType oom, int skipCount) {
// Validate before crossing the JNI boundary so bad arguments fail fast in Java.
validateOOMInjectionParams(numOOMs, oom.getId(), skipCount);
forceSplitAndRetryOOM(getHandle(), threadId, numOOMs, oom.getId(), skipCount);
}

// TODO this should be removed once we move the tests over to use the enum.
/**
* Force the thread with the given ID to throw a GpuSplitAndRetryOOM on their next allocation attempt.
* @param threadId the ID of the thread to throw the exception (not java thread id).
Expand Down Expand Up @@ -251,7 +277,6 @@ public long getAndResetComputeTimeLostToRetry(long taskId) {
return getAndResetComputeTimeLostToRetry(getHandle(), taskId);
}


/**
* Called before doing an allocation on the CPU. This could throw an injected exception to help
* with testing.
Expand Down

0 comments on commit a6c6d95

Please sign in to comment.