Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.gluten.extension.columnar.transition.{InsertTransitions, Remov
import org.apache.gluten.extension.columnar.validator.{Validator, Validators}
import org.apache.gluten.extension.injector.{Injector, SparkInjector}
import org.apache.gluten.extension.injector.GlutenInjector.LegacyInjector
import org.apache.gluten.extension.joinagg.{ImplementJoinAggregate, PushAggregateThroughJoinBatch}
import org.apache.gluten.sql.shims.SparkShimLoader

import org.apache.spark.sql.execution._
Expand Down Expand Up @@ -56,6 +57,8 @@ object VeloxRuleApi {
injector.injectOptimizerRule(CollapseGetJsonObjectExpressionRule.apply)
injector.injectOptimizerRule(RewriteCastFromArray.apply)
injector.injectOptimizerRule(RewriteUnboundedWindow.apply)
injector.injectOptimizerRule(PushAggregateThroughJoinBatch.apply)
injector.injectPlannerStrategy(ImplementJoinAggregate.apply)

if (!BackendsApiManager.getSettings.enableJoinKeysRewrite()) {
injector.injectPlannerStrategy(_ => org.apache.gluten.extension.GlutenJoinKeysCapture())
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,17 @@ class VeloxTPCHV1BhjOffheapSuite extends VeloxTPCHSuite {
}
}

class VeloxTPCHV1BhjPushAggThroughJoinSuite extends VeloxTPCHSuite {
override def subType(): String = "v1-bhj-push-agg-through-join"

override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.sql.sources.useV1SourceList", "parquet")
.set("spark.sql.autoBroadcastJoinThreshold", "30M")
.set(GlutenConfig.PUSH_AGGREGATE_THROUGH_JOIN_ENABLED.key, "true")
}
}

class VeloxTPCHV2Suite extends VeloxTPCHSuite {
override def subType(): String = "v2"

Expand Down
2 changes: 2 additions & 0 deletions docs/Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ nav_order: 15
| spark.gluten.sql.native.writeColumnMetadataExclusionList | comment | Native write files does not support column metadata. Metadata in list would be removed to support native write files. Multiple values separated by commas. |
| spark.gluten.sql.native.writer.enabled | <undefined> | This is config to specify whether to enable the native columnar parquet/orc writer |
| spark.gluten.sql.orc.charType.scan.fallback.enabled | true | Force fallback for orc char type scan. |
| spark.gluten.sql.pushAggregateThroughJoin.enabled | false | Enables the push-aggregate-through-join optimization in Gluten. When enabled, aggregate operators may be pushed below joins during logical optimization and corresponding physical plans may be rewritten to execute the aggregation earlier. |
| spark.gluten.sql.pushAggregateThroughJoin.maxDepth | 2147483647 | Maximum join traversal depth when applying the push-aggregate-through-join optimization. A value of 1 allows pushing an aggregate through a single join; larger values allow the rule to traverse and push through multiple consecutive joins. |
| spark.gluten.sql.removeNativeWriteFilesSortAndProject | true | When true, Gluten will remove the vanilla Spark V1Writes added sort and project for velox backend. |
| spark.gluten.sql.rewrite.dateTimestampComparison | true | Rewrite the comparision between date and timestamp to timestamp comparison.For example `from_unixtime(ts) > date` will be rewritten to `ts > to_unixtime(date)` |
| spark.gluten.sql.scan.fileSchemeValidation.enabled | true | When true, enable file path scheme validation for scan. Validation will fail if file scheme is not supported by registered file systems, which will cause scan operator fall back. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) {
def enableExtendedColumnPruning: Boolean =
getConf(ENABLE_EXTENDED_COLUMN_PRUNING)

def pushAggregateThroughJoinEnabled: Boolean =
getConf(PUSH_AGGREGATE_THROUGH_JOIN_ENABLED)

def pushAggregateThroughJoinMaxDepth: Int =
getConf(PUSH_AGGREGATE_THROUGH_JOIN_MAX_DEPTH)

def forceOrcCharTypeScanFallbackEnabled: Boolean =
getConf(VELOX_FORCE_ORC_CHAR_TYPE_SCAN_FALLBACK)

Expand Down Expand Up @@ -706,6 +712,31 @@ object GlutenConfig extends ConfigRegistry {
.stringConf
.createWithDefault("and,or");

val PUSH_AGGREGATE_THROUGH_JOIN_ENABLED =
buildConf("spark.gluten.sql.pushAggregateThroughJoin.enabled")
.doc(
"Enables the push-aggregate-through-join optimization in Gluten. " +
"When enabled, aggregate operators may be pushed below joins " +
"during logical optimization " +
"and corresponding physical plans may be rewritten to execute " +
"the aggregation earlier."
)
.booleanConf
.createWithDefault(false)

val PUSH_AGGREGATE_THROUGH_JOIN_MAX_DEPTH =
buildConf("spark.gluten.sql.pushAggregateThroughJoin.maxDepth")
.doc(
"Maximum join traversal depth when applying the push-aggregate-through-join " +
"optimization. " +
"A value of 1 allows pushing an aggregate through a single join; larger " +
"values allow the rule " +
"to traverse and push through multiple consecutive joins."
)
.intConf
.checkValue(_ >= 1, "must be greater than or equal to 1.")
.createWithDefault(Int.MaxValue)

val GLUTEN_SOFT_AFFINITY_ENABLED =
buildConf("spark.gluten.soft-affinity.enabled")
.doc("Whether to enable Soft Affinity scheduling.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.gluten.extension.columnar.rewrite

import org.apache.spark.sql.catalyst.expressions.AttributeSet
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan, UnaryExecNode}

/**
Expand All @@ -30,10 +31,16 @@ object ProjectColumnPruning extends RewriteSingleNode {
}
}

private def getReferences(plan: SparkPlan): AttributeSet = {
// SPARK-55979 - aggregate.references is unreliable.
AttributeSet(plan.expressions) -- (plan.producedAttributes -- plan.children.flatMap(
_.outputSet))
}

override def rewrite(plan: SparkPlan): SparkPlan = plan match {
case parent: UnaryExecNode if parent.child.isInstanceOf[ProjectExec] =>
val project = parent.child.asInstanceOf[ProjectExec]
val unusedAttribute = project.outputSet -- (parent.references ++ parent.outputSet)
val unusedAttribute = project.outputSet -- (getReferences(parent) ++ parent.outputSet)

if (unusedAttribute.nonEmpty) {
val newProject = project.copy(projectList = project.projectList.diff(unusedAttribute.toSeq))
Expand Down
Loading
Loading