[SPARK-11113][SQL] Remove DeveloperApi annotation from private classes.
rxin committed Oct 14, 2015
1 parent 615cc85 commit c479385
Showing 29 changed files with 22 additions and 153 deletions.
@@ -17,7 +17,6 @@

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.rules
import org.apache.spark.util.Utils

@@ -40,10 +39,8 @@ package object codegen {
}

/**
* :: DeveloperApi ::
* Dumps the bytecode from a class to the screen using javap.
*/
@DeveloperApi
object DumpByteCode {
import scala.sys.process._
val dumpDirectory = Utils.createTempDir()
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution

import java.util.HashMap

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
@@ -28,7 +27,6 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.metric.SQLMetrics

/**
* :: DeveloperApi ::
* Groups input data by `groupingExpressions` and computes the `aggregateExpressions` for each
* group.
*
@@ -38,7 +36,6 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
* @param aggregateExpressions expressions that are computed for each group.
* @param child the input data source.
*/
@DeveloperApi
case class Aggregate(
partial: Boolean,
groupingExpressions: Seq[Expression],
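
For context, this operator backs the grouping/aggregation path of the DataFrame API. A minimal sketch, assuming an existing SQLContext named sqlContext (the column names are illustrative, and on 1.5 the planner may pick the newer Tungsten-based aggregate operators instead of this one):

import org.apache.spark.sql.functions.sum

// A small illustrative DataFrame with a grouping column and a value column.
val salaries = sqlContext.createDataFrame(Seq(
  ("eng", 100), ("eng", 120), ("sales", 90)
)).toDF("dept", "salary")

// groupBy/agg is what ends up as an aggregate operator in the physical plan:
// groupingExpressions = [dept], aggregateExpressions = [sum(salary)].
val totals = salaries.groupBy("dept").agg(sum("salary"))
totals.explain()
totals.show()
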
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution

import java.util.Random

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.hash.HashShuffleManager
@@ -33,13 +33,10 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.util.MutablePair
import org.apache.spark._

/**
* :: DeveloperApi ::
* Performs a shuffle that will result in the desired `newPartitioning`.
*/
@DeveloperApi
case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {

override def nodeName: String = if (tungstenMode) "TungstenExchange" else "Exchange"
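
As background, the planner inserts an Exchange whenever a child's output must be re-shuffled to satisfy the required partitioning. A minimal sketch, assuming DataFrames df and other that both have a "key" column (the names are illustrative):

// Aggregating or joining by key forces a shuffle; explain() should show an
// Exchange node (rendered as TungstenExchange when the Tungsten path is on,
// per the nodeName override above).
val byKey = df.groupBy("key").count()
byKey.explain()

val joined = df.join(other, "key")
joined.explain()
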
@@ -17,7 +17,6 @@

package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
@@ -27,10 +26,7 @@ import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{Row, SQLContext}

/**
* :: DeveloperApi ::
*/
@DeveloperApi

object RDDConversions {
def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
data.mapPartitions { iterator =>
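
For reference, a sketch of the code path this object serves: building a DataFrame from an RDD of case classes (Products) converts each element to an InternalRow via productToRowRdd. Assuming an existing SparkContext sc and SQLContext sqlContext:

case class Person(name: String, age: Int)

val people = sc.parallelize(Seq(Person("Ada", 36), Person("Grace", 45)))

// createDataFrame on an RDD[Product] relies on the row conversion above.
val df = sqlContext.createDataFrame(people)
df.printSchema()
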
@@ -17,7 +17,6 @@

package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
@@ -32,7 +31,6 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartit
* @param output The output Schema
* @param child Child operator
*/
@DeveloperApi
case class Expand(
projections: Seq[Seq[Expression]],
output: Seq[Attribute],
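
As an illustration, Expand emits several output rows per input row, one per projection; cube and rollup style aggregations are typically planned this way. A hedged sketch, assuming a DataFrame named orders with columns city, category and amount:

import org.apache.spark.sql.functions.sum

// cube aggregates over every combination of the grouping columns, which is
// implemented by expanding each input row once per grouping set.
val cubed = orders.cube("city", "category").agg(sum("amount"))
cubed.explain()   // the physical plan should contain an Expand node
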
@@ -17,7 +17,6 @@

package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -35,7 +34,6 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In
}

/**
* :: DeveloperApi ::
* Applies a [[Generator]] to a stream of input rows, combining the
* output of each into a new stream of rows. This operation is similar to a `flatMap` in functional
* programming with one important additional feature, which allows the input rows to be joined with
@@ -48,7 +46,6 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In
* @param output the output attributes of this node, which are constructed in the analysis phase
*               and cannot be changed, as the parent node is already bound to them.
*/
@DeveloperApi
case class Generate(
generator: Generator,
join: Boolean,
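
For context, Generate is the physical counterpart of generator expressions such as explode. A minimal sketch, assuming an existing SQLContext named sqlContext (the implicits import provides the $ column syntax):

import org.apache.spark.sql.functions.explode
import sqlContext.implicits._

val docs = Seq((1, Seq("spark", "sql")), (2, Seq("catalyst"))).toDF("id", "words")

// explode turns each array element into its own row; keeping the id column
// alongside corresponds to joining the generated rows with their input row.
val exploded = docs.select($"id", explode($"words").as("word"))
exploded.show()
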
@@ -18,8 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute


@@ -17,18 +17,15 @@

package org.apache.spark.sql.execution

import org.apache.spark.annotation.{Experimental, DeveloperApi}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{InternalRow, optimizer}
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
* :: DeveloperApi ::
* The primary workflow for executing relational queries using Spark. Designed to allow easy
* access to the intermediate phases of query execution for developers.
*/
@DeveloperApi
class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
val analyzer = sqlContext.analyzer
val optimizer = sqlContext.optimizer
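
For context, every DataFrame carries a QueryExecution that exposes the intermediate phases described above. A small sketch, assuming an existing DataFrame df:

// explain(true) prints the parsed, analyzed, optimized and physical plans.
df.explain(extended = true)

// The same phases are reachable programmatically:
val qe = df.queryExecution
println(qe.analyzed)       // resolved logical plan
println(qe.optimizedPlan)  // logical plan after the optimizer rules
println(qe.executedPlan)   // the physical SparkPlan that will actually run
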
@@ -21,7 +21,6 @@ import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.DataType

private class ShuffledRowRDDPartition(val idx: Int) extends Partition {
override val index: Int = idx
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
@@ -32,17 +31,16 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric}
import org.apache.spark.sql.types.DataType

object SparkPlan {
protected[sql] val currentContext = new ThreadLocal[SQLContext]()
}

/**
* :: DeveloperApi ::
* The base class for physical operators.
*/
@DeveloperApi
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {

/**
@@ -17,19 +17,14 @@

package org.apache.spark.sql.execution

import java.util

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.rdd.RDD
import org.apache.spark.util.collection.CompactBuffer
import scala.collection.mutable

/**
* :: DeveloperApi ::
* This class calculates and outputs (windowed) aggregates over the rows in a single (sorted)
* partition. The aggregates are calculated for each row in the group. Special processing
* instructions, frames, are used to calculate these aggregates. Frames are processed in the order
@@ -76,7 +71,6 @@ import scala.collection.mutable
* Entire Partition, Sliding, Growing & Shrinking. Boundary evaluation is also delegated to a pair
* of specialized classes: [[RowBoundOrdering]] & [[RangeBoundOrdering]].
*/
@DeveloperApi
case class Window(
projectList: Seq[Attribute],
windowExpression: Seq[NamedExpression],
@@ -229,7 +223,7 @@ case class Window(
// function result buffer.
val framedWindowExprs = windowExprs.groupBy(_.windowSpec.frameSpecification)
val factories = Array.ofDim[() => WindowFunctionFrame](framedWindowExprs.size)
val unboundExpressions = mutable.Buffer.empty[Expression]
val unboundExpressions = scala.collection.mutable.Buffer.empty[Expression]
framedWindowExprs.zipWithIndex.foreach {
case ((frame, unboundFrameExpressions), index) =>
// Track the ordinal.
@@ -529,7 +523,7 @@ private[execution] final class SlidingWindowFunctionFrame(
private[this] var inputLowIndex = 0

/** Buffer used for storing prepared input for the window functions. */
private[this] val buffer = new util.ArrayDeque[Array[AnyRef]]
private[this] val buffer = new java.util.ArrayDeque[Array[AnyRef]]

/** Index of the row we are currently writing. */
private[this] var outputIndex = 0
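
For context, this operator executes the DataFrame window-function API. A hedged sketch, assuming a DataFrame named events with columns k, ts and v, and a Hive-enabled context (which window functions generally require in this Spark version):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg

// Partition the rows by k and order them by ts within each partition.
val w = Window.partitionBy("k").orderBy("ts")

// A running average over the window; the physical plan should contain
// the Window operator described above.
val result = events.select(
  events("k"), events("ts"), events("v"),
  avg(events("v")).over(w).as("running_avg"))
result.explain()
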
@@ -17,7 +17,6 @@

package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD, ShuffledRDD}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.sql.catalyst.InternalRow
@@ -28,10 +27,7 @@ import org.apache.spark.util.MutablePair
import org.apache.spark.util.random.PoissonSampler
import org.apache.spark.{HashPartitioner, SparkEnv}

/**
* :: DeveloperApi ::
*/
@DeveloperApi

case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
override def output: Seq[Attribute] = projectList.map(_.toAttribute)

@@ -90,10 +86,6 @@ case class TungstenProject(projectList: Seq[NamedExpression], child: SparkPlan)
}


/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output

@@ -125,16 +117,15 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
}
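
To illustrate, Project and Filter are the physical forms of select and where/filter. A minimal sketch, assuming an existing SQLContext named sqlContext and a DataFrame df with columns a and b:

import sqlContext.implicits._

// select is planned as Project (or TungstenProject), where/filter as Filter.
val projected = df.select($"a", ($"b" * 2).as("b2")).where($"a" > 0)
projected.explain()
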

/**
* :: DeveloperApi ::
* Sample the dataset.
*
* @param lowerBound Lower-bound of the sampling probability (usually 0.0)
* @param upperBound Upper-bound of the sampling probability. The expected fraction sampled
* will be ub - lb.
* @param withReplacement Whether to sample with replacement.
* @param seed the random seed
* @param child the SparkPlan
*/
@DeveloperApi
case class Sample(
lowerBound: Double,
upperBound: Double,
@@ -165,9 +156,8 @@ case class Sample(
}
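
For context, this operator backs DataFrame.sample. A minimal sketch, assuming an existing DataFrame df:

// Without replacement, each row is kept with probability ub - lb (here 0.1);
// a fixed seed makes the sample reproducible.
val tenPercent = df.sample(withReplacement = false, fraction = 0.1, seed = 42L)
println(tenPercent.count())
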

/**
* :: DeveloperApi ::
* Union two plans, without a distinct. This is UNION ALL in SQL.
*/
@DeveloperApi
case class Union(children: Seq[SparkPlan]) extends SparkPlan {
// TODO: attributes output by union should be distinct for nullability purposes
override def output: Seq[Attribute] = children.head.output
@@ -179,14 +169,12 @@ case class Union(children: Seq[SparkPlan]) extends SparkPlan {
}
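
As background, this is the operator behind unionAll (UNION ALL in SQL), which concatenates its children without deduplicating. A minimal sketch, assuming two DataFrames df1 and df2 with the same schema:

// Unlike UNION (distinct), UNION ALL keeps duplicate rows.
val combined = df1.unionAll(df2)
combined.explain()
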

/**
* :: DeveloperApi ::
* Take the first limit elements. Note that the implementation is different depending on whether
* this is a terminal operator or not. If it is terminal and is invoked using executeCollect,
* this operator uses something similar to Spark's take method on the Spark driver. If it is not
* terminal or is invoked using execute, we first take the limit on each partition, and then
* repartition all the data to a single partition to compute the global limit.
*/
@DeveloperApi
case class Limit(limit: Int, child: SparkPlan)
extends UnaryNode {
// TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan:
@@ -219,14 +207,12 @@ case class Limit(limit: Int, child: SparkPlan)
}
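
To make the two code paths described above concrete, a minimal sketch, assuming an existing DataFrame df:

// Terminal: collect() on a limit uses the take-like fast path on the driver.
val firstTen = df.limit(10).collect()

// Non-terminal: an operator above the limit (here the count aggregate) forces
// the per-partition limit followed by a shuffle to a single partition.
val n = df.limit(10).count()
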

/**
* :: DeveloperApi ::
* Take the first limit elements as defined by the sortOrder, and do projection if needed.
* This is logically equivalent to having a [[Limit]] operator after a [[Sort]] operator,
* or having a [[Project]] operator between them.
* This could have been named TopK, but Spark's top operator does the opposite in ordering
* so we name it TakeOrdered to avoid confusion.
*/
@DeveloperApi
case class TakeOrderedAndProject(
limit: Int,
sortOrder: Seq[SortOrder],
@@ -271,13 +257,11 @@ case class TakeOrderedAndProject(
}
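
For reference, a sort followed by a small limit (optionally with a projection) is planned as this operator instead of a full sort. A minimal sketch, assuming an existing SQLContext named sqlContext and a DataFrame scores with columns name and score:

import sqlContext.implicits._

// Top 3 rows by score.
val top3 = scores.orderBy($"score".desc).limit(3)
top3.explain()   // should show TakeOrderedAndProject rather than Sort plus Limit
top3.show()
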

/**
* :: DeveloperApi ::
* Return a new RDD that has exactly `numPartitions` partitions.
* Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
* if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
* the 100 new partitions will claim 10 of the current partitions.
*/
@DeveloperApi
case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output

@@ -294,11 +278,9 @@ case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode {
}
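
For context, this operator backs DataFrame.coalesce. A minimal sketch, assuming an existing DataFrame df with many partitions:

// Narrow dependency: no shuffle; each of the 100 output partitions claims a
// group of the existing partitions.
val fewer = df.coalesce(100)
println(fewer.rdd.partitions.length)
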

/**
* :: DeveloperApi ::
* Returns a table with the elements from left that are not in right using
* the built-in spark subtract function.
*/
@DeveloperApi
case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = left.output

@@ -308,11 +290,9 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
}
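
For context, this is the operator behind DataFrame.except. A minimal sketch, assuming two DataFrames df1 and df2 with the same schema:

// Rows of df1 that do not appear in df2.
val onlyInLeft = df1.except(df2)
onlyInLeft.explain()
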

/**
* :: DeveloperApi ::
* Returns the rows in left that also appear in right using the built in spark
* intersection function.
*/
@DeveloperApi
case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = children.head.output

@@ -322,12 +302,10 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
}
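
Similarly, this is the operator behind DataFrame.intersect. A minimal sketch, assuming two DataFrames df1 and df2 with the same schema:

// Rows that appear in both df1 and df2.
val inBoth = df1.intersect(df2)
inBoth.explain()
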

/**
* :: DeveloperApi ::
* A plan node that does nothing but lie about the output of its child. Used to splice a
* (hopefully structurally equivalent) tree from a different optimization sequence into an already
* resolved tree.
*/
@DeveloperApi
case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
def children: Seq[SparkPlan] = child :: Nil
