Update host memory allocation to use new Raw alloc APIs #10412

Draft: wants to merge 12 commits into base branch-24.06
@@ -19,6 +19,7 @@ package com.nvidia.spark.rapids
import ai.rapids.cudf.{NvtxColor, NvtxRange}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder
+ import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
import com.nvidia.spark.rapids.shims.{GpuTypeShims, ShimUnaryExecNode}

import org.apache.spark.TaskContext
@@ -634,7 +635,11 @@ class RowToColumnarIterator(
while (rowIter.hasNext &&
(rowCount == 0 || rowCount < targetRows && byteCount < targetSizeBytes)) {
val row = rowIter.next()
- byteCount += converters.convert(row, builders)
+ withRetryNoSplit {
+   // Sadly there is no good way to make the converters spillable in-between each use
+   // https://github.com/NVIDIA/spark-rapids/issues/8887
+   byteCount += converters.convert(row, builders)
+ }
rowCount += 1
}

@@ -655,6 +660,9 @@ class RowToColumnarIterator(
val ret = withResource(new NvtxWithMetrics("RowToColumnar", NvtxColor.GREEN,
opTime)) { _ =>
RmmRapidsRetryIterator.withRetryNoSplit[ColumnarBatch] {
+ // Same problem here; ideally we will need a way to make the builder spillable before
+ // going down this path.
+ // https://github.com/NVIDIA/spark-rapids/issues/8887
builders.tryBuild(rowCount)
}
}
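For context on the retry wrapper added above: withRetryNoSplit re-executes its block whenever the retry framework signals an OOM, so the block has to be safe to run more than once. Below is a minimal sketch of that pattern, not code from this PR; the helper name, the object name, and the buffer size are illustrative, and it assumes the spark-rapids and cudf classes are on the classpath.

```scala
import ai.rapids.cudf.HostMemoryBuffer
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit

object RetryAllocExample {
  // Hypothetical helper: run a host allocation under the retry framework so a
  // CPU OOM (real, or injected by RmmSpark as in the test suites below) is
  // retried instead of failing the task. The block may execute several times,
  // so it must not contain side effects that cannot be repeated.
  def allocateHostWithRetry(bytes: Long): HostMemoryBuffer =
    withRetryNoSplit[HostMemoryBuffer] {
      HostMemoryBuffer.allocate(bytes)
    }

  def main(args: Array[String]): Unit = {
    withResource(allocateHostWithRetry(1024L * 1024)) { buf =>
      println(s"allocated ${buf.getLength} bytes of host memory")
    }
  }
}
```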
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -82,12 +82,12 @@ private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with L
synchronized {
currentNonPinnedAllocated += amount
}
- Some(HostMemoryBuffer.allocate(amount, false))
+ Some(HostMemoryBuffer.allocateRaw(amount))
} else {
synchronized {
if ((currentNonPinnedAllocated + amount) <= nonPinnedLimit) {
currentNonPinnedAllocated += amount
- Some(HostMemoryBuffer.allocate(amount, false))
+ Some(HostMemoryBuffer.allocateRaw(amount))
} else {
None
}
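This hunk is the substantive change named in the PR title: HostAlloc already enforces its own pinned and non-pinned limits, so it switches from HostMemoryBuffer.allocate(amount, false) to the new HostMemoryBuffer.allocateRaw(amount), which presumably returns a plain, non-pinned allocation without any additional pool tracking. The sketch below is a simplified, self-contained version of that bookkeeping, not the real HostAlloc; the class and method names are hypothetical.

```scala
import ai.rapids.cudf.HostMemoryBuffer

// Hypothetical simplification of HostAlloc's non-pinned path: the limit
// accounting lives here, so the buffer itself comes from the raw allocator
// rather than allocate(amount, preferPinned = false).
class NonPinnedHostTracker(nonPinnedLimit: Long) {
  private var currentNonPinnedAllocated = 0L

  def tryAlloc(amount: Long): Option[HostMemoryBuffer] = synchronized {
    if (currentNonPinnedAllocated + amount <= nonPinnedLimit) {
      currentNonPinnedAllocated += amount
      Some(HostMemoryBuffer.allocateRaw(amount))
    } else {
      None // caller can spill, retry, or fall back
    }
  }

  def free(amount: Long): Unit = synchronized {
    currentNonPinnedAllocated -= amount
  }
}
```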
@@ -61,7 +61,7 @@ class BatchWithPartitionDataSuite extends RmmSparkRetrySuiteBase with SparkQuery
val resultBatchIter = BatchWithPartitionDataUtils.addPartitionValuesToBatch(valueBatch,
Array(1), partValues.take(1), partSchema, maxGpuColumnSizeBytes)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
withResource(resultBatchIter) { _ =>
assertThrows[GpuSplitAndRetryOOM] {
resultBatchIter.next()
@@ -87,7 +87,7 @@ class BatchWithPartitionDataSuite extends RmmSparkRetrySuiteBase with SparkQuery
partRows, partValues, partSchema, maxGpuColumnSizeBytes)
withResource(resultBatchIter) { _ =>
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
// Assert that the final count of rows matches expected batch
// We also need to close each batch coming from `resultBatchIter`.
val rowCounts = resultBatchIter.map(withResource(_){_.numRows()}).sum
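The remaining test changes in this PR are all the same mechanical update: the RmmSpark fault-injection methods now take the OomInjectionType enum value directly instead of its Int ordinal, presumably following a signature change in the spark-rapids-jni dependency this PR picks up. A short sketch of the before/after call shape; the package path is assumed from the usual spark-rapids-jni layout, and the call must run on a thread registered with RmmSpark (the retry suites handle that via RmmSparkRetrySuiteBase).

```scala
import com.nvidia.spark.rapids.jni.RmmSpark

object OomInjectionExample {
  def injectOneGpuRetryOom(): Unit = {
    // Old call shape (removed in this PR): pass the enum's ordinal as an Int.
    //   RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
    //     RmmSpark.OomInjectionType.GPU.ordinal, 0)
    // New call shape (added): pass the OomInjectionType value itself, so an
    // invalid injection type is a compile error rather than an arbitrary Int.
    RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
      RmmSpark.OomInjectionType.GPU, 0)
  }
}
```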
@@ -41,7 +41,7 @@ class ColumnToRowIteratorRetrySuite extends RmmSparkRetrySuiteBase
Iterator(buildBatch),
NoopMetric, NoopMetric, NoopMetric, NoopMetric)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
var numRows = 0
aCol2RowIter.foreach { _ =>
numRows += 1
@@ -55,7 +55,7 @@ class ColumnToRowIteratorRetrySuite extends RmmSparkRetrySuiteBase
Iterator(buildBatch),
NoopMetric, NoopMetric, NoopMetric, NoopMetric)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
var numRows = 0
aCol2RowIter.foreach { _ =>
numRows += 1
@@ -30,7 +30,7 @@ class CsvScanRetrySuite extends RmmSparkRetrySuiteBase
StructField("b", IntegerType))))
val opts = CSVOptions.builder().hasHeader(false)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
val table = CSVPartitionReader.readToTable(bufferer, cudfSchema, NoopMetric,
opts, "CSV", null)
table.close()
@@ -57,7 +57,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric)
// this forces a retry on the copy of the host column to a device column
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
withResource(myIter.next()) { devBatch =>
withResource(buildBatch()) { expected =>
TestUtils.compareBatches(expected, devBatch)
@@ -79,7 +79,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
// we mock things this way due to code generation issues with mockito.
// when we add a table we have
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 3,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
rapidsBufferSpy = spy(res.asInstanceOf[RapidsBuffer])
rapidsBufferSpy
}
@@ -93,7 +93,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
ctriter, schema, TargetSize(Int.MaxValue),
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric))
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 2,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
assertResult(0)(getAndResetNumRetryThrowCurrentTask)
withResource(myIter.next()) { devBatch =>
withResource(buildBatch()) { expected =>
@@ -122,7 +122,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
// we mock things this way due to code generation issues with mockito.
// when we add a table we have
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 3,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
rapidsBufferSpy = spy(res.asInstanceOf[RapidsBuffer])
// at this point we have created a buffer in the Spill Framework
// lets spill it
@@ -139,7 +139,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
ctriter, schema, TargetSize(Int.MaxValue),
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric))
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 2,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
assertResult(0)(getAndResetNumRetryThrowCurrentTask)
withResource(myIter.next()) { devBatch =>
withResource(buildBatch()) { expected =>
@@ -169,7 +169,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
ctriter, schema, TargetSize(1),
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
assertThrows[GpuSplitAndRetryOOM] {
myIter.next()
}
@@ -191,7 +191,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
assert(ctriter.hasNext)
// this forces a retry on the allocation of the combined offsets/data buffer
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.CPU.ordinal, 0)
+ RmmSpark.OomInjectionType.CPU, 0)
withResource(myIter.next()) { devBatch =>
withResource(buildBatch()) { expected =>
TestUtils.compareBatches(expected, devBatch)
@@ -217,7 +217,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
assert(ctriter.hasNext)
// this forces a split retry on the allocation of the combined offsets/data buffer
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.CPU.ordinal, 0)
+ RmmSpark.OomInjectionType.CPU, 0)
withResource(myIter.next()) { devBatch =>
withResource(buildBatch()) { expected =>
TestUtils.compareBatches(expected, devBatch)
@@ -228,4 +228,4 @@
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
}
}
}
}
@@ -239,11 +239,11 @@ class GpuCoalesceBatchesRetrySuite
def injectError(injectRetry: Int, injectSplitAndRetry: Int): Unit = {
if (injectRetry > 0) {
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, injectRetry,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
}
if (injectSplitAndRetry > 0) {
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, injectSplitAndRetry,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
}
}

@@ -212,7 +212,7 @@ class GpuSortRetrySuite extends RmmSparkRetrySuiteBase with MockitoSugar
gpuSorter,
singleBatch = false)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 2,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
while (eachBatchIter.hasNext) {
var pos = 0
var curValue = 0
@@ -236,7 +236,7 @@ class GpuSortRetrySuite extends RmmSparkRetrySuiteBase with MockitoSugar
gpuSorter,
singleBatch = false)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
assertThrows[GpuSplitAndRetryOOM] {
eachBatchIter.next()
}
@@ -117,7 +117,7 @@ class HashAggregateRetrySuite
test("computeAndAggregate reduction with retry") {
val reductionBatch = buildReductionBatch()
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
val result = doReduction(reductionBatch)
withResource(result) { spillable =>
withResource(spillable.getColumnarBatch) { cb =>
@@ -135,7 +135,7 @@ class HashAggregateRetrySuite
test("computeAndAggregate reduction with two retries") {
val reductionBatch = buildReductionBatch()
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 2,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
val result = doReduction(reductionBatch)
withResource(result) { spillable =>
withResource(spillable.getColumnarBatch) { cb =>
@@ -165,7 +165,7 @@ class HashAggregateRetrySuite
test("computeAndAggregate group by with retry") {
val groupByBatch = buildGroupByBatch()
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
val result = doGroupBy(groupByBatch)
withResource(result) { spillable =>
withResource(spillable.getColumnarBatch) { cb =>
@@ -199,7 +199,7 @@ class HashAggregateRetrySuite
test("computeAndAggregate reduction with split and retry") {
val reductionBatch = buildReductionBatch()
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
val result = doReduction(reductionBatch)
withResource(result) { spillable =>
withResource(spillable.getColumnarBatch) { cb =>
@@ -218,7 +218,7 @@ class HashAggregateRetrySuite
test("computeAndAggregate group by with split retry") {
val groupByBatch = buildGroupByBatch()
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
val result = doGroupBy(groupByBatch)
withResource(result) { spillable =>
withResource(spillable.getColumnarBatch) { cb =>
@@ -254,7 +254,7 @@ class HashAggregateRetrySuite
val groupByBatch = buildGroupByBatch()
// we force a split because that would cause us to compute two aggs
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
val result = doGroupBy(groupByBatch, forceMerge = true)
withResource(result) { spillable =>
withResource(spillable.getColumnarBatch) { cb =>
@@ -47,7 +47,7 @@ class HostColumnToGpuRetrySuite extends RmmSparkRetrySuiteBase
builder.copyColumnar(arrowColumn, 0, NUM_ROWS)
}
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
RmmRapidsRetryIterator.withRetryNoSplit[ColumnarBatch] {
builder.tryBuild(NUM_ROWS)
}
@@ -31,7 +31,7 @@ class JsonScanRetrySuite extends RmmSparkRetrySuiteBase
StructField("b", IntegerType))))
val opts = JSONOptions.builder().withLines(true).build()
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
val table = JsonPartitionReader.readToTable(bufferer, cudfSchema, NoopMetric,
opts, "JSON", null)
table.close()
@@ -54,7 +54,7 @@ class LimitRetrySuite extends RmmSparkRetrySuiteBase
var curValue = offset
var pos = 0
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
assert(topNIter.hasNext)
withResource(topNIter.next()) { scb =>
withResource(scb.getColumnarBatch()) { cb =>
@@ -83,7 +83,7 @@ class LimitRetrySuite extends RmmSparkRetrySuiteBase
var leftRows = if (limit > totalRows) totalRows - offset else limit - offset
var curValue = offset
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
while(limitIter.hasNext) {
var pos = 0
withResource(limitIter.next()) { cb =>
@@ -84,7 +84,7 @@ class NonDeterministicRetrySuite extends RmmSparkRetrySuiteBase
closeOnExcept(sb) { _ =>
if (forceRetry) {
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
}
}
boundProjectList.projectAndCloseWithRetrySingleBatch(sb)
@@ -119,7 +119,7 @@ class NonDeterministicRetrySuite extends RmmSparkRetrySuiteBase
val cb = buildBatch()
if (forceRetry) {
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
}
val batchSeq = GpuFilter.filterAndClose(cb, boundCondition,
NoopMetric, NoopMetric, NoopMetric).toSeq
@@ -74,7 +74,7 @@ class ProjectExprSuite extends SparkQueryCompareTestSuite
val sb = buildProjectBatch()

RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
val result = GpuProjectExec.projectAndCloseWithRetrySingleBatch(sb, Seq(expr))
withResource(result) { cb =>
assertResult(4)(cb.numRows)
@@ -106,7 +106,7 @@ class ProjectExprSuite extends SparkQueryCompareTestSuite
val sb = buildProjectBatch()

RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
val result = tp.projectAndCloseWithRetrySingleBatch(sb)
withResource(result) { cb =>
assertResult(4)(cb.numRows)
@@ -141,7 +141,7 @@ class ProjectExprSuite extends SparkQueryCompareTestSuite
when(mockPlan.output).thenReturn(Seq(a, b))
val ast = GpuProjectAstExec(List(expr.asInstanceOf[Expression]), mockPlan)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
withResource(sb) { sb =>
withResource(ast.buildRetryableAstIterator(Seq(sb.getColumnarBatch).iterator)) { result =>
withResource(result.next()) { cb =>
@@ -52,7 +52,7 @@ class RangeRetrySuite extends RmmSparkRetrySuiteBase
test("GPU range iterator with split and retry OOM") {
val rangeIter = new GpuRangeIterator(start, end, step, maxRows, null, NoopMetric)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
// It should produce two batches, and rows numbers are
// 10 (=20/2) after retry, and
// 15 (25-10), the remaining ones.
@@ -29,7 +29,7 @@ class RowToColumnarIteratorRetrySuite extends RmmSparkRetrySuiteBase
val row2ColIter = new RowToColumnarIterator(
rowIter, schema, RequireSingleBatch, new GpuRowToColumnConverter(schema))
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
Arm.withResource(row2ColIter.next()) { batch =>
assertResult(10)(batch.numRows())
}
@@ -40,7 +40,7 @@ class RowToColumnarIteratorRetrySuite extends RmmSparkRetrySuiteBase
val row2ColIter = new RowToColumnarIterator(
rowIter, schema, RequireSingleBatch, new GpuRowToColumnConverter(schema))
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
assertThrows[GpuSplitAndRetryOOM] {
row2ColIter.next()
}
@@ -41,7 +41,7 @@ class ShufflePartitionerRetrySuite extends RmmSparkRetrySuiteBase
// batch will be closed within columnarEvalAny
val batch = buildBatch
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
var ret: Array[(ColumnarBatch, Int)] = null
try {
ret = rrp.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]
@@ -70,7 +70,7 @@ class ShufflePartitionerRetrySuite extends RmmSparkRetrySuiteBase
// batch will be closed within columnarEvalAny
val batch = buildBatch
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
- RmmSpark.OomInjectionType.GPU.ordinal, 0)
+ RmmSpark.OomInjectionType.GPU, 0)
var ret: Array[(ColumnarBatch, Int)] = null
try {
ret = rp.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]