diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala
index 98969f60c2b43..e975f3b219aeb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala
@@ -72,8 +72,6 @@ case class WindowGroupLimitExec(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val numOutputRows = longMetric("numOutputRows")
-
     val evaluatorFactory =
       new WindowGroupLimitEvaluatorFactory(
         partitionSpec,
@@ -81,14 +79,14 @@ case class WindowGroupLimitExec(
         rankLikeFunction,
         limit,
         child.output,
-        numOutputRows)
+        longMetric("numOutputRows"))
 
     if (conf.usePartitionEvaluator) {
       child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
     } else {
-      child.execute().mapPartitionsInternal { iter =>
+      child.execute().mapPartitionsWithIndexInternal { (index, rowIterator) =>
         val evaluator = evaluatorFactory.createEvaluator()
-        evaluator.eval(0, iter)
+        evaluator.eval(index, rowIterator)
       }
     }
   }
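
For context, a minimal standalone sketch of the behavior the fix relies on: `mapPartitionsWithIndex` hands each task its real partition index, which is the value the patched code forwards as `evaluator.eval(index, rowIterator)` instead of a hard-coded `0`. This sketch is not part of the patch; it assumes a local SparkSession and uses the public `RDD.mapPartitionsWithIndex`, which the internal `mapPartitionsWithIndexInternal` mirrors. `PartitionIndexSketch` is a hypothetical name chosen for illustration.

import org.apache.spark.sql.SparkSession

// Hypothetical example, not part of the patch: tags every element with the
// index of the partition that produced it, showing that the callback receives
// a distinct, real index per partition rather than a fixed 0.
object PartitionIndexSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("partition-index-sketch")
      .getOrCreate()

    // Four partitions, so the callback below runs with index 0 through 3.
    val rdd = spark.sparkContext.parallelize(1 to 8, numSlices = 4)

    val tagged = rdd.mapPartitionsWithIndex { (index, iter) =>
      iter.map(x => s"partition=$index value=$x")
    }
    tagged.collect().foreach(println)

    spark.stop()
  }
}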