[SPARK-8914][SQL] Remove RDDApi
As rxin suggested in #7298, we should consider removing `RDDApi`.

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #7302 from sarutak/remove-rddapi and squashes the following commits:

e495d35 [Kousuke Saruta] Fixed mima
cb7ebb9 [Kousuke Saruta] Removed overriding RDDApi
sarutak authored and rxin committed Jul 9, 2015
1 parent f472b8c commit 2a4f88b
Showing 3 changed files with 24 additions and 87 deletions.
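
The change is source-compatible for callers: every method DataFrame previously inherited from RDDApi is now declared directly on DataFrame, so user code keeps compiling as-is. A minimal caller-side sketch (hypothetical usage, not part of this patch; assumes a Spark 1.4-era SQLContext named sqlContext):

    import org.apache.spark.sql.DataFrame

    // range() produces a DataFrame with a single LongType column named "id".
    val df: DataFrame = sqlContext.range(0, 100)
    // map() still returns an RDD, exactly as it did through RDDApi.
    val doubled = df.map(row => row.getLong(0) * 2)
    println(doubled.count())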
5 changes: 5 additions & 0 deletions project/MimaExcludes.scala
@@ -70,7 +70,12 @@ object MimaExcludes {
"org.apache.spark.mllib.linalg.Matrix.numNonzeros"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.linalg.Matrix.numActives")
+) ++ Seq(
+  // SPARK-8914 Remove RDDApi
+  ProblemFilters.exclude[MissingClassProblem](
+    "org.apache.spark.sql.RDDApi")
+)

case v if v.startsWith("1.4") =>
Seq(
MimaBuild.excludeSparkPackage("deploy"),
39 changes: 19 additions & 20 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -115,8 +115,7 @@ private[sql] object DataFrame {
@Experimental
class DataFrame private[sql](
@transient val sqlContext: SQLContext,
-@DeveloperApi @transient val queryExecution: SQLContext#QueryExecution)
-  extends RDDApi[Row] with Serializable {
+@DeveloperApi @transient val queryExecution: SQLContext#QueryExecution) extends Serializable {

/**
* A constructor that automatically analyzes the logical plan.
@@ -1320,29 +1319,29 @@ class DataFrame private[sql](
* @group action
* @since 1.3.0
*/
-override def first(): Row = head()
+def first(): Row = head()

/**
* Returns a new RDD by applying a function to all rows of this DataFrame.
* @group rdd
* @since 1.3.0
*/
-override def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f)
+def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f)

/**
* Returns a new RDD by first applying a function to all rows of this [[DataFrame]],
* and then flattening the results.
* @group rdd
* @since 1.3.0
*/
-override def flatMap[R: ClassTag](f: Row => TraversableOnce[R]): RDD[R] = rdd.flatMap(f)
+def flatMap[R: ClassTag](f: Row => TraversableOnce[R]): RDD[R] = rdd.flatMap(f)

/**
* Returns a new RDD by applying a function to each partition of this DataFrame.
* @group rdd
* @since 1.3.0
*/
-override def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = {
+def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = {
rdd.mapPartitions(f)
}

@@ -1351,49 +1350,49 @@ class DataFrame private[sql](
* @group rdd
* @since 1.3.0
*/
-override def foreach(f: Row => Unit): Unit = rdd.foreach(f)
+def foreach(f: Row => Unit): Unit = rdd.foreach(f)

/**
* Applies a function f to each partition of this [[DataFrame]].
* @group rdd
* @since 1.3.0
*/
-override def foreachPartition(f: Iterator[Row] => Unit): Unit = rdd.foreachPartition(f)
+def foreachPartition(f: Iterator[Row] => Unit): Unit = rdd.foreachPartition(f)

/**
* Returns the first `n` rows in the [[DataFrame]].
* @group action
* @since 1.3.0
*/
-override def take(n: Int): Array[Row] = head(n)
+def take(n: Int): Array[Row] = head(n)

/**
* Returns an array that contains all of [[Row]]s in this [[DataFrame]].
* @group action
* @since 1.3.0
*/
-override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()
+def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()

/**
* Returns a Java list that contains all of [[Row]]s in this [[DataFrame]].
* @group action
* @since 1.3.0
*/
-override def collectAsList(): java.util.List[Row] = java.util.Arrays.asList(rdd.collect() : _*)
+def collectAsList(): java.util.List[Row] = java.util.Arrays.asList(rdd.collect() : _*)

/**
* Returns the number of rows in the [[DataFrame]].
* @group action
* @since 1.3.0
*/
-override def count(): Long = groupBy().count().collect().head.getLong(0)
+def count(): Long = groupBy().count().collect().head.getLong(0)

/**
* Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
* @group rdd
* @since 1.3.0
*/
-override def repartition(numPartitions: Int): DataFrame = {
+def repartition(numPartitions: Int): DataFrame = {
Repartition(numPartitions, shuffle = true, logicalPlan)
}

@@ -1405,7 +1404,7 @@
* @group rdd
* @since 1.4.0
*/
-override def coalesce(numPartitions: Int): DataFrame = {
+def coalesce(numPartitions: Int): DataFrame = {
Repartition(numPartitions, shuffle = false, logicalPlan)
}

@@ -1415,13 +1414,13 @@
* @group dfops
* @since 1.3.0
*/
-override def distinct(): DataFrame = dropDuplicates()
+def distinct(): DataFrame = dropDuplicates()

/**
* @group basic
* @since 1.3.0
*/
-override def persist(): this.type = {
+def persist(): this.type = {
sqlContext.cacheManager.cacheQuery(this)
this
}
@@ -1430,13 +1429,13 @@
* @group basic
* @since 1.3.0
*/
-override def cache(): this.type = persist()
+def cache(): this.type = persist()

/**
* @group basic
* @since 1.3.0
*/
-override def persist(newLevel: StorageLevel): this.type = {
+def persist(newLevel: StorageLevel): this.type = {
sqlContext.cacheManager.cacheQuery(this, None, newLevel)
this
}
@@ -1445,7 +1444,7 @@
* @group basic
* @since 1.3.0
*/
-override def unpersist(blocking: Boolean): this.type = {
+def unpersist(blocking: Boolean): this.type = {
sqlContext.cacheManager.tryUncacheQuery(this, blocking)
this
}
@@ -1454,7 +1453,7 @@
* @group basic
* @since 1.3.0
*/
-override def unpersist(): this.type = unpersist(blocking = false)
+def unpersist(): this.type = unpersist(blocking = false)

/////////////////////////////////////////////////////////////////////////////
// I/O
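
For context, a hypothetical caching round trip through the methods above (illustrative only, not from the patch; assumes a registered table named "users" and the same sqlContext):

    val users = sqlContext.table("users")
    users.cache()            // alias for persist() at the default storage level
    println(users.count())   // the first action materializes the cached plan
    users.unpersist()        // same as unpersist(blocking = false)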
67 changes: 0 additions & 67 deletions sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala

This file was deleted.
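
The deleted body is collapsed in this view. Reconstructed from the `override` modifiers dropped in DataFrame.scala above — a sketch, not the verbatim source — the trait was an internal forwarder interface along these lines:

    // Reconstruction inferred from the removed overrides; not the exact deleted code.
    package org.apache.spark.sql

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    private[sql] trait RDDApi[T] {
      def cache(): this.type
      def persist(): this.type
      def persist(newLevel: StorageLevel): this.type
      def unpersist(): this.type
      def unpersist(blocking: Boolean): this.type
      def map[R: ClassTag](f: T => R): RDD[R]
      def flatMap[R: ClassTag](f: T => TraversableOnce[R]): RDD[R]
      def mapPartitions[R: ClassTag](f: Iterator[T] => Iterator[R]): RDD[R]
      def foreach(f: T => Unit): Unit
      def foreachPartition(f: Iterator[T] => Unit): Unit
      def take(n: Int): Array[T]
      def collect(): Array[T]
      def collectAsList(): java.util.List[T]
      def count(): Long
      def first(): T
      def repartition(numPartitions: Int): DataFrame
      def coalesce(numPartitions: Int): DataFrame
      def distinct(): DataFrame
    }

Because the trait was private[sql], user source code could not reference it; its removal surfaces only in binary-compatibility checks, which is presumably why the single MissingClassProblem exclusion above suffices.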
