Optimizing Expand+Aggregate in SQLs with many count distinct [WIP] #10798

Open · wants to merge 2 commits into branch-24.08 (showing changes from 1 commit)
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala

@@ -20,6 +20,7 @@ import scala.util.Random

 import ai.rapids.cudf.NvtxColor
 import com.nvidia.spark.rapids.Arm.withResource
+import com.nvidia.spark.rapids.GpuExpressionsUtils.NullVecCache
 import com.nvidia.spark.rapids.GpuMetric._
 import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
 import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
@@ -54,7 +55,9 @@ class GpuExpandExecMeta(
     val projections = gpuProjections.map(_.map(_.convertToGpu()))
     GpuExpandExec(projections, expand.output, childPlans.head.convertIfNeeded())(
       useTieredProject = conf.isTieredProjectEnabled,
-      preprojectEnabled = conf.isExpandPreprojectEnabled)
+      preprojectEnabled = conf.isExpandPreprojectEnabled,
+      cacheNullMaxCount = conf.expandCachingNullVecMaxCount,
+      coalesceAfter = conf.isCoalesceAfterExpandEnabled)
   }
 }

@@ -72,11 +75,17 @@ case class GpuExpandExec(
    output: Seq[Attribute],
    child: SparkPlan)(
    useTieredProject: Boolean = false,
-    preprojectEnabled: Boolean = false) extends ShimUnaryExecNode with GpuExec {
+    preprojectEnabled: Boolean = false,
+    cacheNullMaxCount: Int = 0,
+    override val coalesceAfter: Boolean = true
+) extends ShimUnaryExecNode with GpuExec {

Collaborator: Please add scaladoc documentation for the new arguments.

   override def otherCopyArgs: Seq[AnyRef] = Seq[AnyRef](
     useTieredProject.asInstanceOf[java.lang.Boolean],
-    preprojectEnabled.asInstanceOf[java.lang.Boolean])
+    preprojectEnabled.asInstanceOf[java.lang.Boolean],
+    cacheNullMaxCount.asInstanceOf[java.lang.Integer],
+    coalesceAfter.asInstanceOf[java.lang.Boolean]
+  )

   private val PRE_PROJECT_TIME = "preprojectTime"
   override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
@@ -127,7 +136,7 @@ case class GpuExpandExec(
     }

     child.executeColumnar().mapPartitions { it =>
-      new GpuExpandIterator(boundProjections, metricsMap, preprojectIter(it))
+      new GpuExpandIterator(boundProjections, metricsMap, preprojectIter(it), cacheNullMaxCount)
     }
   }
@@ -191,7 +200,8 @@ case class GpuExpandExec(
 class GpuExpandIterator(
    boundProjections: Seq[GpuTieredProject],
    metrics: Map[String, GpuMetric],
-    it: Iterator[ColumnarBatch])
+    it: Iterator[ColumnarBatch],
+    cacheNullMaxCount: Int)
   extends Iterator[ColumnarBatch] {

   private var sb: Option[SpillableColumnarBatch] = None
@@ -206,9 +216,20 @@ class GpuExpandIterator(
   Option(TaskContext.get()).foreach { tc =>
     onTaskCompletion(tc) {
       sb.foreach(_.close())
+
+      if (cacheNullMaxCount > 0) {
+        import scala.collection.JavaConverters._
+        GpuExpressionsUtils.cachedNullVectors.get().values().asScala.foreach(_.close())
+        GpuExpressionsUtils.cachedNullVectors.get().clear()
+      }
     }
   }

+  if (cacheNullMaxCount > 0 && GpuExpressionsUtils.cachedNullVectors.get() == null) {
+    GpuExpressionsUtils.cachedNullVectors.set(new NullVecCache(cacheNullMaxCount))
+  }
+
   override def hasNext: Boolean = sb.isDefined || it.hasNext

   override def next(): ColumnarBatch = {
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressionsUtils.scala

@@ -21,11 +21,12 @@ import com.nvidia.spark.Retryable
 import com.nvidia.spark.rapids.Arm.{withResource, withResourceIfAllowed}
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.shims.{ShimBinaryExpression, ShimExpression, ShimTernaryExpression, ShimUnaryExpression}
+import java.util

 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.types.{DataType, StringType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.unsafe.types.UTF8String

@@ -52,6 +53,40 @@ object GpuExpressionsUtils {
       "implemented and should have been disabled")
   }

+  // This cache is only for ExpandExec, which generates a lot of null vectors.
+  case class NullVecKey(d: DataType, n: Int)
+
+  class NullVecCache(private val maxNulls: Int)

Collaborator: The data stored in the cache needs to be spillable in some form. Eventually it would be nice to make it so instead of spilling we can just delete the value from the cache, but in the short term we need to make sure that everything stored in the cache is spillable.

It would also be really nice to have a timeout of some kind. If an entry is unused for a specific amount of time, it should be deleted to avoid adding more memory pressure to the system.

+    extends util.LinkedHashMap[NullVecKey, GpuColumnVector](100, 0.75f, true) {

Collaborator: I really don't understand why we are extending a map instead of wrapping it, or even better using some other cache data structure built for this type of use case.

If we wrapped it, then we could get true LRU functionality and be able to reset the priority on a read. It would also let us not need to override remove so that it throws; that API would just not exist.

+    private var totalNulls: Long = 0L
+
+    override def clear(): Unit = {
+      super.clear()
+      totalNulls = 0
+    }
+
+    override def put(key: NullVecKey, v: GpuColumnVector): GpuColumnVector = {
+      if (v.getRowCount > maxNulls) {

Collaborator: Do we really want to throw if a vector has too many rows to be cached? I think it would be better if it would skip the cache.

Also, the user needs to set the number of rows they expect not per column, but overall for all null columns that they might cache (that's how I understand maxNulls). I do not know how to set that number.

Could we maybe configure this via a byte limit? I.e. "cache up to X MB worth of null vectors"; that seems easier to reason about, and we can tie this to memory limits. That said, in addition to maxNulls being a sum total of rows, it's also per concurrently executing task. The MB limit would be similar: "cache up to X MB worth of null vectors for this task". I am also having a hard time coming up with a good number here.

Ideally this is tied to the spill framework. The spill framework has no limits: if we made all these spillable columns, we'd be allowed to fill GPU memory with spillable nulls. I do not know if it makes sense to support limits for categories of objects in that framework, seeing this work.

Collaborator: @revans2 @jlowe would be good to get your take on this.

Collaborator: I agree. We do not want to fail if the cache is configured badly. We want to run, but with possibly reduced performance. I would say that when this happens we clear the cache and return the GpuColumnVector.

+        throw new IllegalStateException(s"spark.rapids.sql.expandCachingNullVec.maxNulls " +
+          s"($maxNulls) is set too small to hold a single vector with ${v.getRowCount} rows.")
+      }
+      val iter = entrySet().iterator()
+      while (iter.hasNext && totalNulls > maxNulls - v.getRowCount) {

Collaborator: I am confused why we are going off of maxNulls and not the size of the data being cached. We have potentially multiple orders of magnitude difference in the amount of memory used to cache something: 100 BOOL8 null values would take up 113 bytes, but 100 DECIMAL128 null values would take up 1613 bytes. It is not that hard to figure out the size of the data needed, even for nested types. If you need help with this please let me know.

+        val entry = iter.next()
+        iter.remove()
+        totalNulls -= entry.getValue.getRowCount
+      }
+
+      val ret = super.put(key, v)
+      totalNulls += v.getRowCount
+      ret
+    }
+
+    override def remove(key: Any): GpuColumnVector = throw new UnsupportedOperationException()
+  }

+  val cachedNullVectors = new ThreadLocal[NullVecCache]()

Collaborator: Why are we using a thread local for this? This makes the GPU memory used dependent on the number of task threads in Spark. I really would prefer to have a set size in bytes that is for the entire cache.
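
To make the review suggestions above concrete, here is a minimal sketch (not part of this PR) of a cache that wraps the LinkedHashMap instead of extending it, budgets by bytes instead of row count, and skips oversized vectors instead of throwing. The class name is invented, and sizing via getDeviceMemorySize on the wrapped cudf column is an assumption; spillability and timeouts remain open questions:

// Sketch only: wrap the map, budget by bytes, evict least-recently-used first.
class NullVecCacheByBytes(maxBytes: Long) {
  // accessOrder = true keeps iteration order LRU-first, so a get() refreshes
  // an entry's priority without any override tricks.
  private val map = new util.LinkedHashMap[NullVecKey, GpuColumnVector](16, 0.75f, true)
  private var totalBytes = 0L

  // Assumed sizing call: cudf columns report their device-memory footprint.
  private def sizeOf(v: GpuColumnVector): Long = v.getBase.getDeviceMemorySize

  def get(key: NullVecKey): Option[GpuColumnVector] = Option(map.get(key))

  // Vectors that can never fit the budget simply skip the cache.
  def offer(key: NullVecKey, v: GpuColumnVector): Unit = {
    val size = sizeOf(v)
    if (size > maxBytes) return
    val it = map.entrySet().iterator()
    while (it.hasNext && totalBytes + size > maxBytes) {
      val evicted = it.next() // head of iteration order is the LRU entry
      it.remove()
      totalBytes -= sizeOf(evicted.getValue)
      evicted.getValue.close()
    }
    map.put(key, v.incRefCount()) // the cache holds its own reference
    totalBytes += size
  }

  def clear(): Unit = {
    import scala.collection.JavaConverters._
    map.values().asScala.foreach(_.close())
    map.clear()
    totalBytes = 0L
  }
}

A single process-wide instance of something shaped like this would also address the thread-local sizing concern, at the cost of synchronizing get/offer across tasks.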


  /**
   * Tries to resolve a `GpuColumnVector` from a Scala `Any`.
   *
@@ -73,7 +108,18 @@
   def resolveColumnVector(any: Any, numRows: Int): GpuColumnVector = {
     withResourceIfAllowed(any) {
       case c: GpuColumnVector => c.incRefCount()
-      case s: GpuScalar => GpuColumnVector.from(s, numRows, s.dataType)
+      case s: GpuScalar =>
+        if (!s.isValid && cachedNullVectors.get() != null) {
+          if (!cachedNullVectors.get.containsKey(NullVecKey.apply(s.dataType, numRows))) {
+            cachedNullVectors.get.put(NullVecKey.apply(s.dataType, numRows),
+              GpuColumnVector.from(s, numRows, s.dataType))
+          }
+
+          val ret = cachedNullVectors.get().get(NullVecKey.apply(s.dataType, numRows))
+          ret.incRefCount()
+        } else {
+          GpuColumnVector.from(s, numRows, s.dataType)
+        }

Collaborator: nit, suggested change:
-            cachedNullVectors.get.put(NullVecKey.apply(s.dataType, numRows),
+            cachedNullVectors.get.put(NullVecKey(s.dataType, numRows),

Collaborator: Also, could we create the key once and pass it both to containsKey and put?

Collaborator: If we go with a single cache, or even evicting data on spill, this will need to change because it would introduce a race condition. Could we please have the API either return an optional value, or not have separate get and put APIs, but have it be a factory API where we just ask it to hand us back a null column, and it does it.

       case other =>
         throw new IllegalArgumentException(s"Cannot resolve a ColumnVector from the value:" +
           s" $other. Please convert it to a GpuScalar or a GpuColumnVector before returning.")
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala

@@ -360,6 +360,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
       case _: GpuDataSourceScanExec => true
       case _: DataSourceV2ScanExecBase => true
       case _: RDDScanExec => true // just in case an RDD was reading in data
+      case _: ExpandExec => true
       case _ => false
     }

18 changes: 18 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1148,6 +1148,20 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
     .booleanConf
     .createWithDefault(true)

+  val ENABLE_COALESCE_AFTER_EXPAND = conf("spark.rapids.sql.coalesceAfterExpand.enabled")
+    .doc("When set to false, disables the coalesce after GPU Expand.")
+    .internal()
+    .booleanConf
+    .createWithDefault(false)

+  val EXPAND_CACHING_NULL_VEC_MAX_NULL_COUNT =
+    conf("spark.rapids.sql.expandCachingNullVec.maxNulls")
+      .doc("Max number of null scalars in null vectors to cache for GPU Expand. " +
+        "If the number of null scalars exceeds this value, the null vectors will not be " +
+        "cached. The value has to be positive for caching to be enabled.")
+      .internal().integerConf

Collaborator: What is the plan here? Both of these features/configs are disabled by default. Is the plan to work with some customers and see if enabling them helps, and then after that possibly enable them by default?

+      .createWithDefault(0)
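
Both knobs are internal and off by default. As a usage sketch for experiments (the values are illustrative assumptions, not recommendations), they could be set like this; note that, as the review discussion above reads it, maxNulls is a total row count across all cached null vectors per task thread:

// Sketch: enabling the null-vector cache for a many-count-distinct workload.
val spark = org.apache.spark.sql.SparkSession.builder()
  .config("spark.rapids.sql.expandCachingNullVec.maxNulls", "4000000") // > 0 enables caching
  .config("spark.rapids.sql.coalesceAfterExpand.enabled", "false")     // this patch's default
  .getOrCreate()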

   val ENABLE_ORC_FLOAT_TYPES_TO_STRING =
     conf("spark.rapids.sql.format.orc.floatTypesToString.enable")
       .doc("When reading an ORC file, the source data schemas(schemas of ORC file) may differ " +

@@ -2628,6 +2642,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

   lazy val isExpandPreprojectEnabled: Boolean = get(ENABLE_EXPAND_PREPROJECT)

+  lazy val isCoalesceAfterExpandEnabled: Boolean = get(ENABLE_COALESCE_AFTER_EXPAND)
+
+  lazy val expandCachingNullVecMaxCount: Int = get(EXPAND_CACHING_NULL_VEC_MAX_NULL_COUNT)

   lazy val multiThreadReadNumThreads: Int = {
     // Use the largest value set among all the options.
     val deprecatedConfs = Seq(