[SPARK-3349][SQL] Output partitioning of limit should not be inherited from child

This resolves https://issues.apache.org/jira/browse/SPARK-3349

Author: Eric Liang <ekl@google.com>

Closes #2262 from ericl/spark-3349 and squashes the following commits:

3e1b05c [Eric Liang] add regression test
ac32723 [Eric Liang] make limit/takeOrdered output SinglePartition
Eric Liang authored and marmbrus committed Sep 8, 2014
1 parent 08ce188 commit 7db5339
Showing 2 changed files with 20 additions and 1 deletion.
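
Why the one-line overrides below matter: Limit and TakeOrdered collect their result into a single partition, yet before this patch they inherited outputPartitioning from their child, so the planner could assume the surviving rows were still partitioned by the join key and skip a shuffle that is actually needed (the scenario the regression test exercises). The following is a minimal, self-contained sketch of that idea; Partitioning, PhysicalOp, BuggyLimit, FixedLimit, and PartitioningDemo are simplified stand-ins invented for illustration, not Spark's actual Catalyst classes.

// Minimal sketch; the types here are simplified stand-ins, not Spark's Catalyst classes.
sealed trait Partitioning
case object SinglePartition extends Partitioning
case class HashPartitioning(columns: Seq[String], numPartitions: Int) extends Partitioning

trait PhysicalOp {
  def children: Seq[PhysicalOp]
  def outputPartitioning: Partitioning
}

// Before the patch: Limit reported whatever partitioning its child had, even though
// its own result is collected into one partition.
case class BuggyLimit(limit: Int, child: PhysicalOp) extends PhysicalOp {
  def children = Seq(child)
  def outputPartitioning = child.outputPartitioning
}

// After the patch: Limit always reports SinglePartition.
case class FixedLimit(limit: Int, child: PhysicalOp) extends PhysicalOp {
  def children = Seq(child)
  def outputPartitioning = SinglePartition
}

object PartitioningDemo extends App {
  val hashPartitionedScan = new PhysicalOp {
    def children = Nil
    def outputPartitioning = HashPartitioning(Seq("n"), numPartitions = 200)
  }
  // The buggy operator claims a hash partitioning it no longer has; the fixed one does not.
  println(BuggyLimit(2, hashPartitionedScan).outputPartitioning)  // HashPartitioning(List(n),200)
  println(FixedLimit(2, hashPartitionedScan).outputPartitioning)  // SinglePartition
}

Running the demo prints HashPartitioning for the buggy version and SinglePartition for the fixed one, which mirrors the two overrides added in the diff below.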
@@ -27,7 +27,7 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, SinglePartition, UnspecifiedDistribution}
 import org.apache.spark.util.MutablePair

 /**
@@ -100,6 +100,7 @@ case class Limit(limit: Int, child: SparkPlan)
   private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]

   override def output = child.output
+  override def outputPartitioning = SinglePartition

   /**
    * A custom implementation modeled after the take function on RDDs but which never runs any job
@@ -173,6 +174,7 @@ case class Limit(limit: Int, child: SparkPlan)
 case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) extends UnaryNode {

   override def output = child.output
+  override def outputPartitioning = SinglePartition

   val ordering = new RowOrdering(sortOrder, child.output)

17 changes: 17 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -359,6 +359,23 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       (null, null, 6, "F") :: Nil)
   }

+  test("SPARK-3349 partitioning after limit") {
+    sql("SELECT DISTINCT n FROM lowerCaseData ORDER BY n DESC")
+      .limit(2)
+      .registerTempTable("subset1")
+    sql("SELECT DISTINCT n FROM lowerCaseData")
+      .limit(2)
+      .registerTempTable("subset2")
+    checkAnswer(
+      sql("SELECT * FROM lowerCaseData INNER JOIN subset1 ON subset1.n = lowerCaseData.n"),
+      (3, "c", 3) ::
+      (4, "d", 4) :: Nil)
+    checkAnswer(
+      sql("SELECT * FROM lowerCaseData INNER JOIN subset2 ON subset2.n = lowerCaseData.n"),
+      (1, "a", 1) ::
+      (2, "b", 2) :: Nil)
+  }
+
   test("mixed-case keywords") {
     checkAnswer(
       sql(
