Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {

def cudfEnableTableScan: Boolean = getConf(CUDF_ENABLE_TABLE_SCAN)

/** Returns the boolean value configured for `CUDF_ENABLE_VALIDATION`. */
def cudfEnableValidation: Boolean = getConf(CUDF_ENABLE_VALIDATION)

def orcUseColumnNames: Boolean = getConf(ORC_USE_COLUMN_NAMES)

def parquetUseColumnNames: Boolean = getConf(PARQUET_USE_COLUMN_NAMES)
Expand Down Expand Up @@ -624,6 +626,14 @@ object VeloxConfig extends ConfigRegistry {
.booleanConf
.createWithDefault(false)

// Boolean config entry built with buildStaticConf under the key below; defaults to true.
// Its value is read through VeloxConfig.cudfEnableValidation.
// NOTE(review): the doc string below is duplicated verbatim in docs/velox-configuration.md;
// keep the two in sync if the wording is ever changed.
val CUDF_ENABLE_VALIDATION =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.cudf.enableValidation")
.doc(
"Heuristics you can apply to validate a cuDF/GPU plan and only offload when " +
"the entire stage can be fully and profitably executed on GPU")
.booleanConf
.createWithDefault(true)

val MEMORY_DUMP_ON_EXIT =
buildConf("spark.gluten.monitor.memoryDumpOnExit")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package org.apache.gluten.extension

import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
import org.apache.gluten.execution.{CudfTag, LeafTransformSupport, WholeStageTransformer}
import org.apache.gluten.cudf.VeloxCudfPlanValidatorJniWrapper
import org.apache.gluten.execution.{CudfTag, LeafTransformSupport, TransformSupport, WholeStageTransformer}

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
Expand All @@ -37,7 +38,22 @@ case class CudfNodeValidationRule(glutenConf: GlutenConfig) extends Rule[SparkPl
case _: LeafTransformSupport => true
case _ => false
}.isDefined
transformer.setTagValue(CudfTag.CudfTag, !hasLeaf)
if (!hasLeaf && VeloxConfig.get.cudfEnableValidation) {
if (
VeloxCudfPlanValidatorJniWrapper.validate(
transformer.substraitPlan.toProtobuf.toByteArray)
) {
transformer.foreach {
case _: LeafTransformSupport =>
case t: TransformSupport =>
t.setTagValue(CudfTag.CudfTag, true)
case _ =>
}
transformer.setTagValue(CudfTag.CudfTag, true)
}
} else {
transformer.setTagValue(CudfTag.CudfTag, !hasLeaf)
}
Comment on lines +41 to +56
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A dumb question, after this PR, what happens if user sets spark.gluten.sql.columnar.backend.velox.cudf.enableValidation=false but the plan is actually invalid for cuDF?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With validation disabled, Gluten will not check whether the plan is fully offloaded to the GPU, which may introduce extra cuDF-to-Velox format conversions. Disabling it is not recommended.

} else {
transformer.setTagValue(CudfTag.CudfTag, true)
}
Expand Down
17 changes: 13 additions & 4 deletions cpp/velox/cudf/CudfPlanValidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,21 @@
#include "velox/core/PlanNode.h"
#include "velox/exec/Task.h"
#include "velox/exec/TableScan.h"
#include "velox/experimental/cudf/exec/NvtxHelper.h"
#include "velox/experimental/cudf/exec/ToCudf.h"

using namespace facebook;

namespace gluten {

namespace {

// Returns true when `op` can be dynamic_cast to velox::cudf_velox::NvtxHelper, false otherwise
// (including when `op` is null, since dynamic_cast of a null pointer yields null).
// Presumably every cuDF-backed Velox operator mixes in NvtxHelper, which is why it is used as
// the marker type here -- TODO confirm against the Velox cuDF operator definitions.
// NOTE(review): the parameter is spelled `exec::Operator` while other code in this file spells
// `velox::exec::TableScan`; confirm which using-directive brings `exec` into scope.
bool isCudfOperator(const exec::Operator* op) {
return dynamic_cast<const velox::cudf_velox::NvtxHelper*>(op) != nullptr;
}

}

bool CudfPlanValidator::validate(const ::substrait::Plan& substraitPlan) {
auto veloxMemoryPool = gluten::defaultLeafVeloxMemoryPool();
std::vector<::substrait::ReadRel_LocalFiles> localFiles;
Expand Down Expand Up @@ -64,10 +74,9 @@ bool CudfPlanValidator::validate(const ::substrait::Plan& substraitPlan) {
if (dynamic_cast<const velox::exec::TableScan*>(op) != nullptr) {
continue;
}
// TODO: wait for PR https://github.com/facebookincubator/velox/pull/13341
// if (cudf_velox::isCudfOperator(op)) {
// continue;
// }
if (isCudfOperator(op)) {
continue;
}
if (dynamic_cast<const ValueStream*>(op) != nullptr) {
continue;
}
Expand Down
7 changes: 4 additions & 3 deletions docs/velox-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ nav_order: 16

## Gluten Velox backend configurations

| Key | Default | Description |
| Key | Default | Description |
|----------------------------------------------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| spark.gluten.sql.columnar.backend.velox.IOThreads | &lt;undefined&gt; | The Size of the IO thread pool in the Connector. This thread pool is used for split preloading and DirectBufferedInput. By default, the value is the same as the maximum task slots per Spark executor. |
| spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver | 2 | The split preload per task |
Expand All @@ -23,6 +23,7 @@ nav_order: 16
| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | 0 | Set prefetch cache min pct for velox file scan |
| spark.gluten.sql.columnar.backend.velox.checkUsageLeak | true | Enable check memory usage leak. |
| spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan | false | Enable cudf table scan |
| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation | true | Heuristics you can apply to validate a cuDF/GPU plan and only offload when the entire stage can be fully and profitably executed on GPU |
| spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent | 50 | The initial percent of GPU memory to allocate for memory resource for one thread. |
| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource | async | GPU RMM memory resource. |
| spark.gluten.sql.columnar.backend.velox.directorySizeGuess | 32KB | Deprecated, rename to spark.gluten.sql.columnar.backend.velox.footerEstimatedSize |
Expand All @@ -48,8 +49,8 @@ nav_order: 16
| spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks | true | Whether to allow memory capacity transfer between memory pools from different tasks. |
| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages | false | Use explicit huge pages for Velox memory allocation. |
| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled | true | Enable velox orc scan. If disabled, vanilla spark orc scan will be used. |
| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames | true | Maps table field names to file field names using names, not indices for ORC files. If this is set to false Gluten will fallback to vanilla Spark if it does not support all column types present in any of the schemas of the tables being read, at this time unsupported types include TimestampNTZ and Char. |
| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames | true | Maps table field names to file field names using names, not indices for Parquet files. If this is set to false Gluten will fallback to vanilla Spark if it does not support all column types present in any of the schemas of the tables being read, at this time unsupported types include TimestampNTZ and Char. |
| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames | true | Maps table field names to file field names using names, not indices for ORC files. |
| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames | true | Maps table field names to file field names using names, not indices for Parquet files. |
| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups | 1 | Set the prefetch row groups for velox file scan |
| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled | false | Enable query tracing flag. |
| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs | 3600000ms | The max time in ms to wait for memory reclaim. |
Expand Down