From b6fa2641730545068703fb5d50a652dcbf3e8e88 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 10 Jul 2014 18:10:37 +0900 Subject: [PATCH] Add except and intersect methods to SchemaRDD. --- .../org/apache/spark/sql/SchemaRDD.scala | 20 ++++++++++++++++++ .../org/apache/spark/sql/DslQuerySuite.scala | 21 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 8bcfc7c064c2f..0c95b668545f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -256,6 +256,26 @@ class SchemaRDD( def unionAll(otherPlan: SchemaRDD) = new SchemaRDD(sqlContext, Union(logicalPlan, otherPlan.logicalPlan)) + /** + * Performs a relational except on two SchemaRDDs + * + * @param otherPlan the [[SchemaRDD]] that should be excepted from this one. + * + * @group Query + */ + def except(otherPlan: SchemaRDD): SchemaRDD = + new SchemaRDD(sqlContext, Except(logicalPlan, otherPlan.logicalPlan)) + + /** + * Performs a relational intersect on two SchemaRDDs + * + * @param otherPlan the [[SchemaRDD]] that should be intersected with this one. + * + * @group Query + */ + def intersect(otherPlan: SchemaRDD): SchemaRDD = + new SchemaRDD(sqlContext, Intersect(logicalPlan, otherPlan.logicalPlan)) + /** * Filters tuples using a function over the value of the specified column. * diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala index 04ac008682f5f..68dae58728a2a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala @@ -168,4 +168,25 @@ class DslQuerySuite extends QueryTest { test("zero count") { assert(emptyTableData.count() === 0) } + + test("except") { + checkAnswer( + lowerCaseData.except(upperCaseData), + (1, "a") :: + (2, "b") :: + (3, "c") :: + (4, "d") :: Nil) + checkAnswer(lowerCaseData.except(lowerCaseData), Nil) + checkAnswer(upperCaseData.except(upperCaseData), Nil) + } + + test("intersect") { + checkAnswer( + lowerCaseData.intersect(lowerCaseData), + (1, "a") :: + (2, "b") :: + (3, "c") :: + (4, "d") :: Nil) + checkAnswer(lowerCaseData.intersect(upperCaseData), Nil) + } }