Make changes needed to have all host allocations go through a single plugable API #1778

Draft · wants to merge 3 commits into base: branch-24.06
6 changes: 3 additions & 3 deletions src/main/cpp/src/NativeParquetJni.cpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -791,7 +791,7 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_ParquetFooter_getNumCol
}

JNIEXPORT jobject JNICALL Java_com_nvidia_spark_rapids_jni_ParquetFooter_serializeThriftFile(
JNIEnv* env, jclass, jlong handle, jobject host_memory_allocator)
JNIEnv* env, jclass, jlong handle)
{
CUDF_FUNC_RANGE();
try {
@@ -807,7 +807,7 @@ JNIEXPORT jobject JNICALL Java_com_nvidia_spark_rapids_jni_ParquetFooter_seriali
transportOut->getBuffer(&buf_ptr, &buf_size);

// 12 extra is for the MAGIC thrift_footer length MAGIC
jobject ret = cudf::jni::allocate_host_buffer(env, buf_size + 12, false, host_memory_allocator);
jobject ret = cudf::jni::allocate_host_buffer(env, buf_size + 12, false);
uint8_t* ret_addr = reinterpret_cast<uint8_t*>(cudf::jni::get_host_buffer_address(env, ret));
ret_addr[0] = 'P';
ret_addr[1] = 'A';
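With the jobject parameter gone, allocate_host_buffer resolves the host allocator itself, so the one pluggable point left is the Java-side HostMemoryAllocator. As a rough illustration only, a custom allocator could look like the sketch below; the interface shape (allocate(long, boolean) and allocate(long)) and the delegation to HostMemoryBuffer.allocate are assumptions about the cudf Java API, not something this diff defines.

```java
import java.util.concurrent.atomic.AtomicLong;

import ai.rapids.cudf.HostMemoryAllocator;
import ai.rapids.cudf.HostMemoryBuffer;

// Hypothetical allocator that counts requested bytes before delegating to the default
// cudf host buffer allocation. Sketch only; not part of this PR.
public class CountingHostMemoryAllocator implements HostMemoryAllocator {
  private final AtomicLong totalRequested = new AtomicLong();

  @Override
  public HostMemoryBuffer allocate(long amount, boolean preferPinned) {
    totalRequested.addAndGet(amount);
    return HostMemoryBuffer.allocate(amount, preferPinned);
  }

  @Override
  public HostMemoryBuffer allocate(long amount) {
    return allocate(amount, false);
  }

  public long getTotalRequested() {
    return totalRequested.get();
  }
}
```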
5 changes: 3 additions & 2 deletions src/main/cpp/src/SparkResourceAdaptorJni.cpp
@@ -1758,9 +1758,10 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
auto const tid = static_cast<long>(pthread_self());
auto const thread = threads.find(tid);
if (thread != threads.end()) {
log_status("DEALLOC", tid, thread->second.task_id, thread->second.state);
log_status(
is_for_cpu ? "CPU_DEALLOC" : "DEALLOC", tid, thread->second.task_id, thread->second.state);
} else {
log_status("DEALLOC", tid, -2, thread_state::UNKNOWN);
log_status(is_for_cpu ? "CPU_DEALLOC" : "DEALLOC", tid, -2, thread_state::UNKNOWN);
}

for (auto& [thread_id, t_state] : threads) {
11 changes: 3 additions & 8 deletions src/main/java/com/nvidia/spark/rapids/jni/ParquetFooter.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -103,12 +103,8 @@ private ParquetFooter(long handle) {
* footer file. This will include the MAGIC PAR1 at the beginning and end and also the
* length of the footer just before the PAR1 at the end.
*/
public HostMemoryBuffer serializeThriftFile(HostMemoryAllocator hostMemoryAllocator) {
return serializeThriftFile(nativeHandle, hostMemoryAllocator);
}

public HostMemoryBuffer serializeThriftFile() {
return serializeThriftFile(DefaultHostMemoryAllocator.get());
return serializeThriftFile(nativeHandle);
}

/**
@@ -236,6 +232,5 @@ private static native long readAndFilter(long address, long length,

private static native int getNumColumns(long nativeHandle);

private static native HostMemoryBuffer serializeThriftFile(long nativeHandle,
HostMemoryAllocator hostMemoryAllocator);
private static native HostMemoryBuffer serializeThriftFile(long nativeHandle);
}
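On the caller side, the serialized footer is now obtained without passing any allocator; the buffer comes from whatever allocator the JNI layer is configured with. A minimal sketch of such a caller, assuming the HostMemoryBuffer getLength/getBytes accessors from the cudf Java API and a footer obtained elsewhere:

```java
import java.io.IOException;
import java.io.OutputStream;

import ai.rapids.cudf.HostMemoryBuffer;
import com.nvidia.spark.rapids.jni.ParquetFooter;

public class FooterSerializeExample {
  // Sketch only: `footer` is assumed to come from the footer read/filter path elsewhere in
  // the plugin; the point is simply that no HostMemoryAllocator argument is passed any more.
  static void writeSerializedFooter(ParquetFooter footer, OutputStream out) throws IOException {
    try (HostMemoryBuffer buf = footer.serializeThriftFile()) {
      // The buffer holds MAGIC + serialized thrift footer + footer length + MAGIC.
      byte[] bytes = new byte[(int) buf.getLength()];
      buf.getBytes(bytes, 0, 0, bytes.length);
      out.write(bytes);
    }
  }
}
```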
61 changes: 52 additions & 9 deletions src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,18 +23,25 @@
import ai.rapids.cudf.RmmException;
import ai.rapids.cudf.RmmTrackingResourceAdaptor;

import java.util.Arrays;
import java.util.Map;

/**
* Initialize RMM in ways that are specific to Spark.
*/
public class RmmSpark {

public enum OomInjectionType {
CPU_OR_GPU,
CPU,
GPU;
CPU_OR_GPU(0),
CPU(1),
GPU(2);

private final int id;

OomInjectionType(int id) {
this.id = id;
}

public int getId() {
return id;
}
}

private static volatile SparkResourceAdaptor sra = null;
@@ -436,6 +443,7 @@ public static void forceRetryOOM(long threadId) {
forceRetryOOM(threadId, 1);
}

//TODO remove this API once we know no one is calling it.
/**
* Force the thread with the given ID to throw a GpuRetryOOM or CpuRetryOOM on their next
* allocation attempt, depending on the type of allocation being done.
@@ -454,8 +462,25 @@ public static void forceRetryOOM(long threadId, int numOOMs, int oomMode, int sk
}
}

/**
* Force the thread with the given ID to throw a GpuRetryOOM or CpuRetryOOM on their next
* allocation attempt, depending on the type of OOM injected.
* @param threadId the ID of the thread to throw the exception (not java thread id).
* @param numOOMs the number of times the OOM should be thrown
* @param oom the type of OOM to inject
* @param skipCount the number of times a matching allocation is skipped before injecting the first OOM
*/
public static void forceRetryOOM(long threadId, int numOOMs, OomInjectionType oom, int skipCount) {
synchronized (Rmm.class) {
if (sra != null && sra.isOpen()) {
sra.forceRetryOOM(threadId, numOOMs, oom, skipCount);
} else {
throw new IllegalStateException("RMM has not been configured for OOM injection");
}
}
}

public static void forceRetryOOM(long threadId, int numOOMs) {
forceRetryOOM(threadId, numOOMs, OomInjectionType.CPU_OR_GPU.ordinal(), 0);
forceRetryOOM(threadId, numOOMs, OomInjectionType.CPU_OR_GPU, 0);
}

/**
@@ -467,6 +492,7 @@ public static void forceSplitAndRetryOOM(long threadId) {
forceSplitAndRetryOOM(threadId, 1);
}

// TODO remove this API once no one calls it
/**
* Force the thread with the given ID to throw a GpuSplitAndRetryOOM or CpuSplitAndRetryOOM
* on their next allocation attempt, depending on the allocation being done.
@@ -485,8 +511,25 @@ public static void forceSplitAndRetryOOM(long threadId, int numOOMs, int oomMode
}
}

/**
* Force the thread with the given ID to throw a GpuSplitAndRetryOOM or CpuSplitAndRetryOOM on
* their next allocation attempt, depending on the type of OOM injected.
* @param threadId the ID of the thread to throw the exception (not java thread id).
* @param numOOMs the number of times the OOM should be thrown
* @param oom the type of OOM to inject
* @param skipCount the number of times a matching allocation is skipped before injecting the first OOM
*/
public static void forceSplitAndRetryOOM(long threadId, int numOOMs, OomInjectionType oom, int skipCount) {
synchronized (Rmm.class) {
if (sra != null && sra.isOpen()) {
sra.forceSplitAndRetryOOM(threadId, numOOMs, oom, skipCount);
} else {
throw new IllegalStateException("RMM has not been configured for OOM injection");
}
}
}

public static void forceSplitAndRetryOOM(long threadId, int numOOMs) {
forceSplitAndRetryOOM(threadId, numOOMs, OomInjectionType.CPU_OR_GPU.ordinal(), 0);
forceSplitAndRetryOOM(threadId, numOOMs, OomInjectionType.CPU_OR_GPU, 0);
}

/**
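Taken together, the enum-based overloads let a test name the injection target directly instead of passing a raw ordinal. A minimal sketch of how they might be driven, assuming RmmSpark has already been configured with a SparkResourceAdaptor (otherwise, as shown above, the calls throw IllegalStateException) and that threadId is the thread id the resource adaptor tracks:

```java
import com.nvidia.spark.rapids.jni.RmmSpark;
import com.nvidia.spark.rapids.jni.RmmSpark.OomInjectionType;

public class OomInjectionExample {
  // Sketch only: threadId is the thread id tracked by the resource adaptor, not the Java thread id.
  static void injectFailures(long threadId) {
    // Throw one retry OOM on the next matching CPU (host) allocation.
    RmmSpark.forceRetryOOM(threadId, 1, OomInjectionType.CPU, 0);

    // Skip the first two matching GPU allocations, then throw a split-and-retry OOM three times.
    RmmSpark.forceSplitAndRetryOOM(threadId, 3, OomInjectionType.GPU, 2);
  }
}
```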
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -184,6 +184,19 @@ public void doneWaitingOnPool(long threadId) {
doneWaitingOnPool(getHandle(), threadId);
}

/**
* Force the thread with the given ID to throw a GpuRetryOOM or CpuRetryOOM on their next
* allocation attempt, depending on the type of OOM injected.
* @param threadId the ID of the thread to throw the exception (not java thread id).
* @param numOOMs the number of times the OOM should be thrown
* @param oom the type of OOM to inject
* @param skipCount the number of times a matching allocation is skipped before injecting the first OOM
*/
public void forceRetryOOM(long threadId, int numOOMs, OomInjectionType oom, int skipCount) {
validateOOMInjectionParams(numOOMs, oom.getId(), skipCount);
forceRetryOOM(getHandle(), threadId, numOOMs, oom.getId(), skipCount);
}

// TODO this should be removed once we move the tests over to use the enum.
/**
* Force the thread with the given ID to throw a GpuRetryOOM on their next allocation attempt.
* @param threadId the ID of the thread to throw the exception (not java thread id).
@@ -203,6 +216,19 @@ private void validateOOMInjectionParams(int numOOMs, int oomMode, int skipCount)
"non-negative oomMode<" + OomInjectionType.values().length + " expected: actual=" + oomMode;
}

/**
* Force the thread with the given ID to throw a GpuSplitAndRetryOOM or CpuSplitAndRetryOOM on
* their next allocation attempt, depending on the type of OOM injected.
* @param threadId the ID of the thread to throw the exception (not java thread id).
* @param numOOMs the number of times the OOM should be thrown
* @param oom the type of OOM to inject
* @param skipCount the number of times a matching allocation is skipped before injecting the first OOM
*/
public void forceSplitAndRetryOOM(long threadId, int numOOMs, OomInjectionType oom, int skipCount) {
validateOOMInjectionParams(numOOMs, oom.getId(), skipCount);
forceSplitAndRetryOOM(getHandle(), threadId, numOOMs, oom.getId(), skipCount);
}

// TODO this should be removed once we move the tests over to use the enum.
/**
* Force the thread with the given ID to throw a GpuSplitAndRetryOOM on their next allocation attempt.
* @param threadId the ID of the thread to throw the exception (not java thread id).
@@ -251,7 +277,6 @@ public long getAndResetComputeTimeLostToRetry(long taskId) {
return getAndResetComputeTimeLostToRetry(getHandle(), taskId);
}


/**
* Called before doing an allocation on the CPU. This could throw an injected exception to help
* with testing.
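For reference, the explicit ids added to OomInjectionType line up with the integer oomMode that the existing native entry points and validateOOMInjectionParams expect, so the enum-based and ordinal-based overloads stay interchangeable. A small sketch of that invariant:

```java
import com.nvidia.spark.rapids.jni.RmmSpark.OomInjectionType;

public class OomInjectionIdCheck {
  public static void main(String[] args) {
    // CPU_OR_GPU -> 0, CPU -> 1, GPU -> 2: each id matches its ordinal and stays below
    // values().length, which is the bound validateOOMInjectionParams checks against.
    for (OomInjectionType t : OomInjectionType.values()) {
      assert t.getId() == t.ordinal();
      assert t.getId() < OomInjectionType.values().length;
    }
  }
}
```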