diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 47bff0c730b8a..cac376608be29 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -27,7 +27,7 @@ import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution} +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, SinglePartition, UnspecifiedDistribution} import org.apache.spark.util.MutablePair /** @@ -100,6 +100,7 @@ case class Limit(limit: Int, child: SparkPlan) private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] override def output = child.output + override def outputPartitioning = SinglePartition /** * A custom implementation modeled after the take function on RDDs but which never runs any job @@ -173,6 +174,7 @@ case class Limit(limit: Int, child: SparkPlan) case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) extends UnaryNode { override def output = child.output + override def outputPartitioning = SinglePartition val ordering = new RowOrdering(sortOrder, child.output) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 1ac205937714c..e8fbc28d0ad60 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -359,6 +359,23 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { (null, null, 6, "F") :: Nil) } + test("SPARK-3349 partitioning after limit") { + sql("SELECT DISTINCT n FROM lowerCaseData ORDER BY n DESC") + .limit(2) + .registerTempTable("subset1") + sql("SELECT DISTINCT n FROM lowerCaseData") + .limit(2) + .registerTempTable("subset2") + checkAnswer( + sql("SELECT * FROM lowerCaseData INNER JOIN subset1 ON subset1.n = lowerCaseData.n"), + (3, "c", 3) :: + (4, "d", 4) :: Nil) + checkAnswer( + sql("SELECT * FROM lowerCaseData INNER JOIN subset2 ON subset2.n = lowerCaseData.n"), + (1, "a", 1) :: + (2, "b", 2) :: Nil) + } + test("mixed-case keywords") { checkAnswer( sql(