diff --git a/src/main/cpp/src/NativeParquetJni.cpp b/src/main/cpp/src/NativeParquetJni.cpp index 9c8674e800..fc0a2e4983 100644 --- a/src/main/cpp/src/NativeParquetJni.cpp +++ b/src/main/cpp/src/NativeParquetJni.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -791,7 +791,7 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_ParquetFooter_getNumCol } JNIEXPORT jobject JNICALL Java_com_nvidia_spark_rapids_jni_ParquetFooter_serializeThriftFile( - JNIEnv* env, jclass, jlong handle, jobject host_memory_allocator) + JNIEnv* env, jclass, jlong handle) { CUDF_FUNC_RANGE(); try { @@ -807,7 +807,7 @@ JNIEXPORT jobject JNICALL Java_com_nvidia_spark_rapids_jni_ParquetFooter_seriali transportOut->getBuffer(&buf_ptr, &buf_size); // 12 extra is for the MAGIC thrift_footer length MAGIC - jobject ret = cudf::jni::allocate_host_buffer(env, buf_size + 12, false, host_memory_allocator); + jobject ret = cudf::jni::allocate_host_buffer(env, buf_size + 12, false); uint8_t* ret_addr = reinterpret_cast(cudf::jni::get_host_buffer_address(env, ret)); ret_addr[0] = 'P'; ret_addr[1] = 'A'; diff --git a/src/main/cpp/src/SparkResourceAdaptorJni.cpp b/src/main/cpp/src/SparkResourceAdaptorJni.cpp index 2df1c7ea77..19312ecd4b 100644 --- a/src/main/cpp/src/SparkResourceAdaptorJni.cpp +++ b/src/main/cpp/src/SparkResourceAdaptorJni.cpp @@ -1758,9 +1758,10 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { auto const tid = static_cast(pthread_self()); auto const thread = threads.find(tid); if (thread != threads.end()) { - log_status("DEALLOC", tid, thread->second.task_id, thread->second.state); + log_status( + is_for_cpu ? 
"CPU_DEALLOC" : "DEALLOC", tid, thread->second.task_id, thread->second.state); } else { - log_status("DEALLOC", tid, -2, thread_state::UNKNOWN); + log_status(is_for_cpu ? "CPU_DEALLOC" : "DEALLOC", tid, -2, thread_state::UNKNOWN); } for (auto& [thread_id, t_state] : threads) { diff --git a/src/main/java/com/nvidia/spark/rapids/jni/ParquetFooter.java b/src/main/java/com/nvidia/spark/rapids/jni/ParquetFooter.java index 681a01d81d..5f001b8c01 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/ParquetFooter.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/ParquetFooter.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -103,12 +103,8 @@ private ParquetFooter(long handle) { * footer file. This will include the MAGIC PAR1 at the beginning and end and also the * length of the footer just before the PAR1 at the end. 
*/ - public HostMemoryBuffer serializeThriftFile(HostMemoryAllocator hostMemoryAllocator) { - return serializeThriftFile(nativeHandle, hostMemoryAllocator); - } - public HostMemoryBuffer serializeThriftFile() { - return serializeThriftFile(DefaultHostMemoryAllocator.get()); + return serializeThriftFile(nativeHandle); } /** @@ -236,6 +232,5 @@ private static native long readAndFilter(long address, long length, private static native int getNumColumns(long nativeHandle); - private static native HostMemoryBuffer serializeThriftFile(long nativeHandle, - HostMemoryAllocator hostMemoryAllocator); + private static native HostMemoryBuffer serializeThriftFile(long nativeHandle); } diff --git a/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java b/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java index e171894601..40d50db501 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,18 +23,25 @@ import ai.rapids.cudf.RmmException; import ai.rapids.cudf.RmmTrackingResourceAdaptor; -import java.util.Arrays; -import java.util.Map; - /** * Initialize RMM in ways that are specific to Spark. */ public class RmmSpark { public enum OomInjectionType { - CPU_OR_GPU, - CPU, - GPU; + CPU_OR_GPU(0), + CPU(1), + GPU(2); + + private final int id; + + OomInjectionType(int id) { + this.id = id; + } + + public int getId() { + return id; + } } private static volatile SparkResourceAdaptor sra = null; @@ -436,6 +443,7 @@ public static void forceRetryOOM(long threadId) { forceRetryOOM(threadId, 1); } + //TODO remove this API once we know no one is calling it. 
/** * Force the thread with the given ID to throw a GpuRetryOOM or CpuRetryOOM on their next * allocation attempt, depending on the type of allocation being done. @@ -454,8 +462,25 @@ public static void forceRetryOOM(long threadId, int numOOMs, int oomMode, int sk } } + /** + * Force the thread with the given ID to throw a GpuRetryOOM on their next allocation attempt. + * @param threadId the ID of the thread to throw the exception (not java thread id). + * @param numOOMs the number of times the GpuRetryOOM should be thrown + * @param oom the type of OOM to inject + * @param skipCount the number of times a matching allocation is skipped before injecting the first OOM + */ + public static void forceRetryOOM(long threadId, int numOOMs, OomInjectionType oom, int skipCount) { + synchronized (Rmm.class) { + if (sra != null && sra.isOpen()) { + sra.forceRetryOOM(threadId, numOOMs, oom, skipCount); + } else { + throw new IllegalStateException("RMM has not been configured for OOM injection"); + } + } + } + public static void forceRetryOOM(long threadId, int numOOMs) { - forceRetryOOM(threadId, numOOMs, OomInjectionType.CPU_OR_GPU.ordinal(), 0); + forceRetryOOM(threadId, numOOMs, OomInjectionType.CPU_OR_GPU, 0); } /** @@ -467,6 +492,7 @@ public static void forceSplitAndRetryOOM(long threadId) { forceSplitAndRetryOOM(threadId, 1); } + // TODO remove this API once no one calls it /** * Force the thread with the given ID to throw a GpuSplitAndRetryOOM or CpuSplitAndRetryOOm * on their next allocation attempt, depending on the allocation being done. @@ -485,8 +511,25 @@ public static void forceSplitAndRetryOOM(long threadId, int numOOMs, int oomMode } } + /** + * Force the thread with the given ID to throw a GpuSplitAndRetryOOM on their next allocation attempt. + * @param threadId the ID of the thread to throw the exception (not java thread id). 
+ * @param numOOMs the number of times the GpuSplitAndRetryOOM should be thrown + * @param oom the type of OOM to inject + * @param skipCount the number of times a matching allocation is skipped before injecting the first OOM + */ + public static void forceSplitAndRetryOOM(long threadId, int numOOMs, OomInjectionType oom, int skipCount) { + synchronized (Rmm.class) { + if (sra != null && sra.isOpen()) { + sra.forceSplitAndRetryOOM(threadId, numOOMs, oom, skipCount); + } else { + throw new IllegalStateException("RMM has not been configured for OOM injection"); + } + } + } + public static void forceSplitAndRetryOOM(long threadId, int numOOMs) { - forceSplitAndRetryOOM(threadId, numOOMs, OomInjectionType.CPU_OR_GPU.ordinal(), 0); + forceSplitAndRetryOOM(threadId, numOOMs, OomInjectionType.CPU_OR_GPU, 0); } /** diff --git a/src/main/java/com/nvidia/spark/rapids/jni/SparkResourceAdaptor.java b/src/main/java/com/nvidia/spark/rapids/jni/SparkResourceAdaptor.java index d766c34230..e3fa791bfa 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/SparkResourceAdaptor.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/SparkResourceAdaptor.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -184,6 +184,19 @@ public void doneWaitingOnPool(long threadId) { doneWaitingOnPool(getHandle(), threadId); } + /** + * Force the thread with the given ID to throw a GpuRetryOOM on their next allocation attempt. + * @param threadId the ID of the thread to throw the exception (not java thread id). 
+ * @param numOOMs the number of times the GpuRetryOOM should be thrown + * @param oom the type of OOM to inject + * @param skipCount the number of times a matching allocation is skipped before injecting the first OOM + */ + public void forceRetryOOM(long threadId, int numOOMs, OomInjectionType oom, int skipCount) { + validateOOMInjectionParams(numOOMs, oom.getId(), skipCount); + forceRetryOOM(getHandle(), threadId, numOOMs, oom.getId(), skipCount); + } + + // TODO this should be removed once we move the tests over to use the enum. /** * Force the thread with the given ID to throw a GpuRetryOOM on their next allocation attempt. * @param threadId the ID of the thread to throw the exception (not java thread id). @@ -203,6 +216,19 @@ private void validateOOMInjectionParams(int numOOMs, int oomMode, int skipCount) "non-negative oomMode<" + OomInjectionType.values().length + " expected: actual=" + oomMode; } + /** + * Force the thread with the given ID to throw a GpuSplitAndRetryOOM on their next allocation attempt. + * @param threadId the ID of the thread to throw the exception (not java thread id). + * @param numOOMs the number of times the GpuSplitAndRetryOOM should be thrown + * @param oom the type of OOM to inject + * @param skipCount the number of times a matching allocation is skipped before injecting the first OOM + */ + public void forceSplitAndRetryOOM(long threadId, int numOOMs, OomInjectionType oom, int skipCount) { + validateOOMInjectionParams(numOOMs, oom.getId(), skipCount); + forceSplitAndRetryOOM(getHandle(), threadId, numOOMs, oom.getId(), skipCount); + } + + // TODO this should be removed once we move the tests over to use the enum. /** * Force the thread with the given ID to throw a GpuSplitAndRetryOOM on their next allocation attempt. * @param threadId the ID of the thread to throw the exception (not java thread id). 
@@ -251,7 +277,6 @@ public long getAndResetComputeTimeLostToRetry(long taskId) { return getAndResetComputeTimeLostToRetry(getHandle(), taskId); } - /** * Called before doing an allocation on the CPU. This could throw an injected exception to help * with testing. diff --git a/thirdparty/cudf b/thirdparty/cudf index 630c885001..0a1a6ced95 160000 --- a/thirdparty/cudf +++ b/thirdparty/cudf @@ -1 +1 @@ -Subproject commit 630c885001b679cb16ee997c0249b9c69212f4d1 +Subproject commit 0a1a6ced951db54c0930c0bb16e3133daf319d21