@@ -58,7 +58,13 @@ object RemoveRedundantProjects extends Rule[SparkPlan] {
          p.mapChildren(removeProject(_, false))
        }
      case op: TakeOrderedAndProjectExec =>
-       op.mapChildren(removeProject(_, false))
+       // The planner turns Limit + Sort into TakeOrderedAndProjectExec, which adds an additional
+       // Project that does not exist in the logical plan. We shouldn't use this additional Project
+       // to optimize out other Projects; otherwise, when AQE turns the physical plan back into a
+       // logical plan, we lose the Project and may mess up the output column order. So column
+       // ordering is required if AQE is enabled and projectList is the same as the child output.
+       val requireColOrdering = conf.adaptiveExecutionEnabled && op.projectList == op.child.output
+       op.mapChildren(removeProject(_, requireColOrdering))
      case a: BaseAggregateExec =>
        // BaseAggregateExec require specific column ordering when mode is Final or PartialMerge.
        // See comments in BaseAggregateExec inputAttributes method.
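For context, a hedged sketch of the planner behavior the new comment relies on. Pasted into spark-shell (which provides a `spark` session), it shows that a LIMIT over an ORDER BY is planned as a single TakeOrderedAndProjectExec node, whose pass-through project list has no counterpart in the logical plan:

import org.apache.spark.sql.execution.TakeOrderedAndProjectExec

// ORDER BY + LIMIT collapses into one TakeOrderedAndProjectExec node.
val q = spark.range(100).toDF("id").orderBy("id").limit(5)

// sparkPlan is the physical plan before AQE re-optimization wraps it.
val takeOrdered = q.queryExecution.sparkPlan.collect {
  case t: TakeOrderedAndProjectExec => t
}
assert(takeOrdered.nonEmpty,
  "expected LIMIT over ORDER BY to collapse into TakeOrderedAndProjectExec")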
@@ -30,7 +30,7 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnionExec}
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ENSURE_RE
 import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec, ShuffledJoin, SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLExecutionStart}
+import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
@@ -2803,6 +2804,26 @@ test("SPARK-44040: Fix compute stats when AggregateExec nodes above QueryStageEx
     val unionDF = aggDf1.union(aggDf2)
     checkAnswer(unionDF.select("id").distinct, Seq(Row(null)))
   }
+
+  test("SPARK-50258: Fix output column order changed issue after AQE optimization") {
+    withTable("t") {
+      sql("SELECT course, year, earnings FROM courseSales").write.saveAsTable("t")
+      val df = sql(
+        """
+          |SELECT year, course, earnings, SUM(earnings) OVER (ORDER BY year, course) AS balance
+          |FROM t ORDER BY year, course
+          |LIMIT 100
+          |""".stripMargin)
+      df.collect()
+
+      val plan = df.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+      assert(plan.inputPlan.isInstanceOf[TakeOrderedAndProjectExec])
+      assert(plan.finalPhysicalPlan.isInstanceOf[WindowExec])
+      plan.inputPlan.output.zip(plan.finalPhysicalPlan.output).foreach { case (o1, o2) =>
+        assert(o1.semanticEquals(o2), "Different output column order after AQE optimization")
+      }
+    }
+  }
 }

 /**
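To see what the new test verifies end to end outside the test harness, here is a hedged spark-shell sketch of the same scenario. The inline rows are a stand-in for the suite's courseSales fixture, and the column-order check uses the public QueryExecution.analyzed/executedPlan accessors rather than casting to AdaptiveSparkPlanExec:

import spark.implicits._

// Inline stand-in for the suite's courseSales fixture.
Seq(("dotNET", 2012, 10000), ("Java", 2012, 20000), ("dotNET", 2013, 48000))
  .toDF("course", "year", "earnings")
  .createOrReplaceTempView("t")

// AQE is on by default since Spark 3.2; set it explicitly for clarity.
spark.conf.set("spark.sql.adaptive.enabled", "true")
val df = spark.sql(
  """SELECT year, course, earnings, SUM(earnings) OVER (ORDER BY year, course) AS balance
    |FROM t ORDER BY year, course
    |LIMIT 100""".stripMargin)
df.collect()

// With the fix, the executed plan keeps the analyzed schema's column order;
// before it, AQE's physical-to-logical round trip could reorder these columns.
val analyzedCols = df.queryExecution.analyzed.output.map(_.name)
val executedCols = df.queryExecution.executedPlan.output.map(_.name)
assert(analyzedCols == executedCols, s"$analyzedCols vs $executedCols")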