Skip to content

Commit

Permalink
[SPARK-44340][SQL][FOLLOWUP][3.5] Set partition index correctly for W…
Browse files Browse the repository at this point in the history
…indowGroupLimitExec

### What changes were proposed in this pull request?
This is a followup of #41899, to set the partition index correctly even if it's not used for now. It also contains a few code cleanup.

### Why are the changes needed?
future-proof

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
existing tests

Closes #42233 from beliefer/SPARK-44340_followup_3.5.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
beliefer authored and cloud-fan committed Jul 31, 2023
1 parent 47224b3 commit ee3ddf8
Showing 1 changed file with 3 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,21 @@ case class WindowGroupLimitExec(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")

val evaluatorFactory =
new WindowGroupLimitEvaluatorFactory(
partitionSpec,
orderSpec,
rankLikeFunction,
limit,
child.output,
numOutputRows)
longMetric("numOutputRows"))

if (conf.usePartitionEvaluator) {
child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
} else {
child.execute().mapPartitionsInternal { iter =>
child.execute().mapPartitionsWithIndexInternal { (index, rowIterator) =>
val evaluator = evaluatorFactory.createEvaluator()
evaluator.eval(0, iter)
evaluator.eval(index, rowIterator)
}
}
}
Expand Down

0 comments on commit ee3ddf8

Please sign in to comment.