From e883e00a5c41fa75def091da792edd6cfa08fafa Mon Sep 17 00:00:00 2001
From: Yanjie Gao
Date: Fri, 20 Jun 2014 13:39:44 +0800
Subject: [PATCH] Spark SQL basicOperator add distinct operator

Return a new SparkPlan containing the distinct elements.

---
 .../spark/sql/execution/basicOperators.scala | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 8969794c69933..85ac6f31985b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -204,3 +204,18 @@ case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
   override def execute() = rdd
 }
+import org.apache.spark.SparkContext._
+
+/**
+ * :: DeveloperApi ::
+ * Returns a new SparkPlan that yields the distinct rows of the child plan's output.
+ */
+@DeveloperApi
+case class Distinct(child: SparkPlan)(@transient sc: SparkContext) extends UnaryNode {
+  override def output = child.output
+
+  // Deduplicate by keying each row to itself and keeping one value per key.
+  // NOTE(review): rows from child.execute() may be reused mutable buffers — confirm whether a copy is needed before shuffling.
+  override def execute() = {
+    child.execute().map(row => (row, null)).reduceByKey((first, _) => first).map(_._1)
+  }
+}