Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimizing Expand+Aggregate in sqls with many count distinct #10798

Merged
merged 6 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class GpuExpandExecMeta(
/**
 * Builds the GPU version of Expand: converts every projection list and the child
 * plan to their GPU counterparts, and wires in the pre-project and coalesce-after
 * knobs from the plugin configuration.
 */
override def convertToGpu(): GpuExec = {
  val projections = gpuProjections.map(_.map(_.convertToGpu()))
  GpuExpandExec(projections, expand.output, childPlans.head.convertIfNeeded())(
    preprojectEnabled = conf.isExpandPreprojectEnabled,
    coalesceAfter = conf.isCoalesceAfterExpandEnabled)
}
}

Expand All @@ -65,15 +66,21 @@ class GpuExpandExecMeta(
* output the same schema specified by the parameter `output`
* @param output Attribute references to Output
* @param child Child operator
* @param preprojectEnabled Whether to enable pre-project before expanding
* @param coalesceAfter Whether to coalesce the output batches
*/
case class GpuExpandExec(
projections: Seq[Seq[Expression]],
output: Seq[Attribute],
child: SparkPlan)(
preprojectEnabled: Boolean = false) extends ShimUnaryExecNode with GpuExec {
preprojectEnabled: Boolean = false,
binmahone marked this conversation as resolved.
Show resolved Hide resolved
override val coalesceAfter: Boolean = true
) extends ShimUnaryExecNode with GpuExec {

// Extra (curried) constructor arguments that Catalyst must replay when it copies this
// plan node; they are not part of the case-class element list, so list them here.
// The Booleans are boxed to satisfy Seq[AnyRef].
override def otherCopyArgs: Seq[AnyRef] = Seq[AnyRef](
  preprojectEnabled.asInstanceOf[java.lang.Boolean],
  coalesceAfter.asInstanceOf[java.lang.Boolean])

private val PRE_PROJECT_TIME = "preprojectTime"
override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import com.nvidia.spark.rapids.shims.{ShimBinaryExpression, ShimExpression, Shim
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.unsafe.types.UTF8String

Expand All @@ -52,6 +52,20 @@ object GpuExpressionsUtils {
"implemented and should have been disabled")
}

// Cache key for an all-null column vector: its data type plus the number of rows.
case class NullVecKey(d: DataType, n: Int)

// An LRU cache of all-null vectors keyed by (type, row count). accessOrder = true makes
// LinkedHashMap iterate in least-recently-accessed order, but it only evicts when
// removeEldestEntry returns true, so we override it to bound the cache and close the
// evicted vector — otherwise the "LRU" cache would grow without limit and leak GPU
// memory between clear() calls.
class NullVecCache
  extends java.util.LinkedHashMap[NullVecKey, GpuColumnVector](100, 0.75f, true) {

  // Maximum number of cached null vectors before the least-recently-used one is dropped.
  private val maxEntries = 100

  override def removeEldestEntry(
      eldest: java.util.Map.Entry[NullVecKey, GpuColumnVector]): Boolean = {
    val evict = size() > maxEntries
    if (evict) {
      // Only the cache's reference is released here; callers hold their own
      // references via incRefCount, so their vectors stay valid.
      eldest.getValue.close()
    }
    evict
  }

  override def clear(): Unit = {
    // Release the cache's reference on every vector before discarding the entries.
    super.values().forEach(_.close())
    super.clear()
  }
}

// Per-thread cache of all-null vectors so repeated null projections (e.g. from Expand
// with many count-distinct aggregates) can reuse one vector per (type, row count)
// instead of materializing a new one each time. The owner must clear() it before the
// batch row count changes (see the clear() calls in GpuProjectExec.project).
val cachedNullVectors = ThreadLocal.withInitial[NullVecCache](() => new NullVecCache)

/**
* Tries to resolve a `GpuColumnVector` from a Scala `Any`.
*
Expand All @@ -73,7 +87,19 @@ object GpuExpressionsUtils {
/**
 * Tries to resolve a `GpuColumnVector` from `any`, which must already be a
 * `GpuColumnVector` or a `GpuScalar`.
 *
 * Invalid (null) scalars of the same data type and row count always expand to an
 * identical all-null vector, so those are served from the per-thread
 * `cachedNullVectors` instead of being materialized on every call. The returned
 * vector's refcount is incremented either way, so the caller owns a reference and
 * the cache keeps its own until it is cleared.
 *
 * @param any the value to resolve
 * @param numRows the number of rows a scalar should be expanded to
 * @throws IllegalArgumentException if `any` is neither a GpuColumnVector nor a GpuScalar
 */
def resolveColumnVector(any: Any, numRows: Int): GpuColumnVector = {
  withResourceIfAllowed(any) {
    case c: GpuColumnVector => c.incRefCount()
    case s: GpuScalar =>
      if (!s.isValid) {
        val key = NullVecKey(s.dataType, numRows)
        if (!cachedNullVectors.get.containsKey(key)) {
          cachedNullVectors.get.put(key,
            GpuColumnVector.from(s, numRows, s.dataType))
        }
        val ret = cachedNullVectors.get.get(key)
        ret.incRefCount()
      } else {
        GpuColumnVector.from(s, numRows, s.dataType)
      }
    case other =>
      throw new IllegalArgumentException(s"Cannot resolve a ColumnVector from the value:" +
        s" $other. Please convert it to a GpuScalar or a GpuColumnVector before returning.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
case _: GpuDataSourceScanExec => true
case _: DataSourceV2ScanExecBase => true
case _: RDDScanExec => true // just in case an RDD was reading in data
case _: ExpandExec => true
case _ => false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,12 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.booleanConf
.createWithDefault(true)

// Internal flag read via isCoalesceAfterExpandEnabled; GpuExpandExecMeta.convertToGpu
// passes it to GpuExpandExec as coalesceAfter, controlling whether the output batches
// of a GPU Expand are coalesced.
val ENABLE_COALESCE_AFTER_EXPAND = conf("spark.rapids.sql.coalesceAfterExpand.enabled")
.doc("When set to false disables the coalesce after GPU Expand. ")
.internal()
.booleanConf
.createWithDefault(true)

val ENABLE_ORC_FLOAT_TYPES_TO_STRING =
conf("spark.rapids.sql.format.orc.floatTypesToString.enable")
.doc("When reading an ORC file, the source data schemas(schemas of ORC file) may differ " +
Expand Down Expand Up @@ -2846,6 +2852,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isExpandPreprojectEnabled: Boolean = get(ENABLE_EXPAND_PREPROJECT)

// True (the default) unless spark.rapids.sql.coalesceAfterExpand.enabled is set to false.
lazy val isCoalesceAfterExpandEnabled: Boolean = get(ENABLE_COALESCE_AFTER_EXPAND)

lazy val multiThreadReadNumThreads: Int = {
// Use the largest value set among all the options.
val deprecatedConfs = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,23 @@ object GpuProjectExec {
// This can help avoid contiguous splits in some cases when the input data is also contiguous
GpuColumnVector.incRefCounts(cb)
} else {
val newColumns = boundExprs.safeMap(_.columnarEval(cb)).toArray[ColumnVector]
new ColumnarBatch(newColumns, cb.numRows())
try {
// In some cases like Expand, we have a lot of Expressions generating null vectors.
// We can cache the null vectors to avoid creating them every time.
// Since we're attempting to reuse the whole null vector, it is important to be aware
// that the data type and the vector length must both be the same.
// Within project(cb: ColumnarBatch, boundExprs: Seq[Expression]), all output vectors share
// the same vector length, which facilitates the reuse of null vectors.
// When leaving the scope of project(cb: ColumnarBatch, boundExprs: Seq[Expression]),
// the cached null vectors will be cleared because the next ColumnarBatch may have a
// different vector length, and thus the cached vectors could not be reused.
GpuExpressionsUtils.cachedNullVectors.get.clear()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are more ways than just this method to run an expression. I don't trust this to fix it every time. It is probably good enough in practice, but I don't like the precedent it is setting. At a minimum I want to see some comments here explaining what is happening and why. Preferably with a follow-on issue to fix this once we have the ability to delete a buffer from the spill framework instead of spilling it.


val newColumns = boundExprs.safeMap(_.columnarEval(cb)).toArray[ColumnVector]
new ColumnarBatch(newColumns, cb.numRows())
} finally {
GpuExpressionsUtils.cachedNullVectors.get.clear()
}
}
}

Expand Down
Loading