From 5a7eac8dbda2fe3f2f9fb03be0548835e329014d Mon Sep 17 00:00:00 2001
From: Jacek Laskowski
Date: Tue, 23 May 2017 16:58:52 +0200
Subject: [PATCH 1/9] [DOCS][MINOR] Scaladoc fixes (aka typo hunting)

---
 .../spark/sql/catalyst/ScalaReflection.scala  |  6 +++--
 .../sql/catalyst/analysis/Analyzer.scala      |  5 ++--
 .../catalyst/encoders/ExpressionEncoder.scala |  2 +-
 .../expressions/codegen/CodeGenerator.scala   |  2 +-
 .../codegen/GenerateUnsafeProjection.scala    |  6 ++---
 .../expressions/windowExpressions.scala       |  8 +++----
 .../sql/catalyst/planning/QueryPlanner.scala  | 14 +++++++----
 .../scala/org/apache/spark/sql/Column.scala   |  4 ++--
 .../spark/sql/RelationalGroupedDataset.scala  | 24 +++++++++----------
 .../spark/sql/execution/SparkPlan.scala       |  8 +++----
 .../sql/execution/WholeStageCodegenExec.scala | 14 +++++------
 .../apache/spark/sql/expressions/Window.scala |  4 ++--
 .../org/apache/spark/sql/functions.scala      |  4 ++--
 .../internal/BaseSessionStateBuilder.scala    |  2 +-
 .../spark/sql/internal/SessionState.scala     |  2 +-
 .../apache/spark/sql/sources/interfaces.scala |  2 +-
 16 files changed, 56 insertions(+), 51 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 6d1d019cc4743..87130532c89bc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -88,8 +88,10 @@ object ScalaReflection extends ScalaReflection {
 }

 /**
- * Given a type `T` this function constructs and ObjectType that holds a class of type
- * Array[T]. Special handling is performed for primitive types to map them back to their raw
+ * Given a type `T` this function constructs `ObjectType` that holds a class of type
+ * `Array[T]`.
+ *
+ * Special handling is performed for primitive types to map them back to their raw
 * JVM form instead of the Scala Array that handles auto boxing.
 */
 private def arrayClassFor(tpe: `Type`): ObjectType = ScalaReflectionLock.synchronized {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d58b8acefdade..bda22eebbd878 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -85,8 +85,7 @@ object AnalysisContext {
 /**
 * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
- * [[UnresolvedRelation]]s into fully typed objects using information in a
- * [[SessionCatalog]] and a [[FunctionRegistry]].
+ * [[UnresolvedRelation]]s into fully typed objects using information in a [[SessionCatalog]].
 */
 class Analyzer(
 catalog: SessionCatalog,
@@ -1882,7 +1881,7 @@ class Analyzer(
 * `[Sum(_w0) OVER (PARTITION BY _w1 ORDER BY _w2)]` and the second returned value will be
 * [col1, col2 + col3 as _w0, col4 as _w1, col5 as _w2].
 *
- * @return (seq of expressions containing at lease one window expressions,
+ * @return (seq of expressions containing at least one window expression,
 * seq of non-window expressions)
 */
 private def extract(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index ec003cdc17b89..cc4d38ea1f95c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -235,7 +235,7 @@ case class ExpressionEncoder[T](
 assert(serializer.flatMap { ser =>
 val boundRefs = ser.collect { case b: BoundReference => b }
 assert(boundRefs.nonEmpty,
- "each serializer expression should contains at least one `BoundReference`")
+ "each serializer expression should contain at least one `BoundReference`")
 boundRefs
 }.distinct.length <= 1, "all serializer expressions must use the same BoundReference.")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index f8da78b5f5e3e..fd9780245fcfb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -800,7 +800,7 @@ class CodegenContext {
 /**
 * Generates code for expressions. If doSubexpressionElimination is true, subexpression
- * elimination will be performed. Subexpression elimination assumes that the code will for each
+ * elimination will be performed. Subexpression elimination assumes that the code for each
 * expression will be combined in the `expressions` order.
 */
 def generateExpressions(expressions: Seq[Expression],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
index b358102d914bd..efbbc038bd33b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
@@ -23,11 +23,11 @@ import org.apache.spark.sql.types._
 /**
 * Generates a [[Projection]] that returns an [[UnsafeRow]].
 *
- * It generates the code for all the expressions, compute the total length for all the columns
- * (can be accessed via variables), and then copy the data into a scratch buffer space in the
+ * It generates the code for all the expressions, computes the total length for all the columns
+ * (can be accessed via variables), and then copies the data into a scratch buffer space in the
 * form of UnsafeRow (the scratch buffer will grow as needed).
 *
- * Note: The returned UnsafeRow will be pointed to a scratch buffer inside the projection.
+ * @note The returned UnsafeRow will be pointed to a scratch buffer inside the projection.
 */
 object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafeProjection] {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 37190429fc423..88afd43223d1d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -113,7 +113,7 @@ sealed trait FrameType
 * or a [[ValueFollowing]] is used as its [[FrameBoundary]], the value is considered
 * as a physical offset.
 * For example, `ROW BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a 3-row frame,
- * from the row precedes the current row to the row follows the current row.
+ * from the row that precedes the current row to the row that follows the current row.
 */
 case object RowFrame extends FrameType
@@ -126,7 +126,7 @@ case object RowFrame extends FrameType
 * `RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a frame containing rows whose values
 * `expr` are in the range of [v-1, v+1].
 *
- * If `ORDER BY` clause is not defined, all rows in the partition is considered as peers
+ * If `ORDER BY` clause is not defined, all rows in the partition are considered as peers
 * of the current row.
 */
 case object RangeFrame extends FrameType
@@ -217,11 +217,11 @@ case object UnboundedFollowing extends FrameBoundary {
 }

 /**
- * The trait used to represent the a Window Frame.
+ * Represents a window frame.
 */
 sealed trait WindowFrame

-/** Used as a place holder when a frame specification is not defined. */
+/** Used as a placeholder when a frame specification is not defined. */
 case object UnspecifiedFrame extends WindowFrame

 /** A specified Window Frame. */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index 5f694f44b6e8a..bc41dd0465e34 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
 /**
 * Given a [[LogicalPlan]], returns a list of `PhysicalPlan`s that can
- * be used for execution. If this strategy does not apply to the give logical operation then an
+ * be used for execution. If this strategy does not apply to the given logical operation then an
 * empty list should be returned.
 */
 abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging {
@@ -42,9 +42,10 @@ abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends L
 * Abstract class for transforming [[LogicalPlan]]s into physical plans.
 * Child classes are responsible for specifying a list of [[GenericStrategy]] objects that
 * each of which can return a list of possible physical plan options.
- * If a given strategy is unable to plan all
- * of the remaining operators in the tree, it can call [[planLater]], which returns a placeholder
- * object that will be filled in using other available strategies.
+ * If a given strategy is unable to plan all of the remaining operators in the tree,
+ * it can call [[GenericStrategy#planLater planLater]], which returns a placeholder
+ * object that will be [[collectPlaceholders collected]] and filled in
+ * using other available strategies.
 *
 * TODO: RIGHT NOW ONLY ONE PLAN IS RETURNED EVER...
 * PLAN SPACE EXPLORATION WILL BE IMPLEMENTED LATER.
@@ -93,7 +94,10 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
 pruned
 }

- /** Collects placeholders marked as [[planLater]] by strategy and its [[LogicalPlan]]s */
+ /**
+ * Collects placeholders marked using [[GenericStrategy#planLater planLater]]
+ * by [[strategies]].
+ */
 protected def collectPlaceholders(plan: PhysicalPlan): Seq[(PhysicalPlan, LogicalPlan)]

 /** Prunes bad plans to prevent combinatorial explosion. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index b23ab1fa3514a..7e1f1d83cb3de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -1152,7 +1152,7 @@ class Column(val expr: Expression) extends Logging {
 def bitwiseXOR(other: Any): Column = withExpr { BitwiseXor(expr, lit(other).expr) }

 /**
- * Define a windowing column.
+ * Defines a windowing column.
 *
 * {{{
 * val w = Window.partitionBy("name").orderBy("id")
@@ -1168,7 +1168,7 @@ class Column(val expr: Expression) extends Logging {
 def over(window: expressions.WindowSpec): Column = window.withAggregate(this)

 /**
- * Define a empty analytic clause. In this case the analytic function is applied
+ * Defines an empty analytic clause. In this case the analytic function is applied
 * and presented for all rows in the result set.
 *
 * {{{
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 64755434784a0..485d2768ff92a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -35,12 +35,13 @@ import org.apache.spark.sql.types.NumericType
 import org.apache.spark.sql.types.StructType

 /**
- * A set of methods for aggregations on a `DataFrame`, created by `Dataset.groupBy`.
+ * A set of methods for aggregations on a `DataFrame`, created by [[Dataset#groupBy groupBy]],
+ * [[Dataset#cube cube]] or [[Dataset#rollup rollup]] (and also [[pivot]]).
 *
- * The main method is the agg function, which has multiple variants. This class also contains
- * convenience some first order statistics such as mean, sum for convenience.
+ * The main method is the [[agg]] function, which has multiple variants. This class also contains
+ * some first-order statistics such as [[mean]], [[sum]] for convenience.
 *
- * This class was named `GroupedData` in Spark 1.x.
+ * @note This class was named `GroupedData` in Spark 1.x.
 *
 * @since 2.0.0
 */
@@ -297,8 +298,9 @@ class RelationalGroupedDataset protected[sql](
 }

 /**
- * Pivots a column of the current `DataFrame` and perform the specified aggregation.
- * There are two versions of pivot function: one that requires the caller to specify the list
+ * Pivots a column of the current `DataFrame` and performs the specified aggregation.
+ *
+ * There are two versions of `pivot` function: one that requires the caller to specify the list
 * of distinct values to pivot on, and one that does not. The latter is more concise but less
 * efficient, because Spark needs to first compute the list of distinct values internally.
 *
@@ -337,7 +339,7 @@ class RelationalGroupedDataset protected[sql](
 }

 /**
- * Pivots a column of the current `DataFrame` and perform the specified aggregation.
+ * Pivots a column of the current `DataFrame` and performs the specified aggregation.
 * There are two versions of pivot function: one that requires the caller to specify the list
 * of distinct values to pivot on, and one that does not. The latter is more concise but less
 * efficient, because Spark needs to first compute the list of distinct values internally.
 *
@@ -369,7 +371,9 @@ class RelationalGroupedDataset protected[sql](
 }

 /**
- * Pivots a column of the current `DataFrame` and perform the specified aggregation.
+ * (Java-specific) Pivots a column of the current `DataFrame` and performs the specified
+ * aggregation.
+ *
 * There are two versions of pivot function: one that requires the caller to specify the list
 * of distinct values to pivot on, and one that does not. The latter is more concise but less
 * efficient, because Spark needs to first compute the list of distinct values internally.
 *
@@ -433,10 +437,6 @@ class RelationalGroupedDataset protected[sql](
 }
 }
-
-/**
- * Companion object for GroupedData.
- */
 private[sql] object RelationalGroupedDataset {

 def apply(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index c4ed96640eb19..fead315b25248 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -72,24 +72,24 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 }

 /**
- * Return all metadata that describes more details of this SparkPlan.
+ * Returns all metadata that describes more details of this SparkPlan.
 */
 def metadata: Map[String, String] = Map.empty

 /**
- * Return all metrics containing metrics of this SparkPlan.
+ * Returns all metrics containing metrics of this SparkPlan.
 */
 def metrics: Map[String, SQLMetric] = Map.empty

 /**
- * Reset all the metrics.
+ * Resets all the metrics.
 */
 def resetMetrics(): Unit = {
 metrics.valuesIterator.foreach(_.reset())
 }

 /**
- * Return a LongSQLMetric according to the name.
+ * Returns a [[SQLMetric]] according to the name.
 */
 def longMetric(name: String): SQLMetric = metrics(name)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index c1e1a631c677e..ac30b11557adb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -70,7 +70,7 @@ trait CodegenSupport extends SparkPlan {
 /**
 * Returns all the RDDs of InternalRow which generates the input rows.
 *
- * Note: right now we support up to two RDDs.
+ * @note Right now we support up to two RDDs
 */
 def inputRDDs(): Seq[RDD[InternalRow]]
@@ -227,7 +227,7 @@ trait CodegenSupport extends SparkPlan {

 /**
- * InputAdapter is used to hide a SparkPlan from a subtree that support codegen.
+ * InputAdapter is used to hide a SparkPlan from a subtree that supports codegen.
 *
 * This is the leaf node of a tree with WholeStageCodegen that is used to generate code
 * that consumes an RDD iterator of InternalRow.
@@ -282,10 +282,10 @@ object WholeStageCodegenExec {
 }

 /**
- * WholeStageCodegen compile a subtree of plans that support codegen together into single Java
+ * WholeStageCodegen compiles a subtree of plans that support codegen together into single Java
 * function.
 *
- * Here is the call graph of to generate Java source (plan A support codegen, but plan B does not):
+ * Here is the call graph of to generate Java source (plan A supports codegen, but plan B does not):
 *
 * WholeStageCodegen Plan A FakeInput Plan B
 * =========================================================================
@@ -304,10 +304,10 @@ object WholeStageCodegenExec {
 * |
 * doConsume() <-------- consume()
 *
- * SparkPlan A should override doProduce() and doConsume().
+ * SparkPlan A should override `doProduce()` and `doConsume()`.
 *
- * doCodeGen() will create a CodeGenContext, which will hold a list of variables for input,
- * used to generated code for BoundReference.
+ * `doCodeGen()` will create a `CodeGenContext`, which will hold a list of variables for input,
+ * used to generated code for [[BoundReference]].
 */
 case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
index 00053485e614c..cd79128d8f375 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
@@ -170,7 +170,7 @@ object Window {
 * and `Window.currentRow` to specify special boundary values, rather than using integral
 * values directly.
 *
- * A range based boundary is based on the actual value of the ORDER BY
+ * A range-based boundary is based on the actual value of the ORDER BY
 * expression(s). An offset is used to alter the value of the ORDER BY expression, for
 * instance if the current order by expression has a value of 10 and the lower bound offset
 * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a
@@ -184,7 +184,7 @@ object Window {
 * val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
 * .toDF("id", "category")
 * val byCategoryOrderedById =
- * Window.partitionBy('category).orderBy('id).rowsBetween(Window.currentRow, 1)
+ * Window.partitionBy('category).orderBy('id).rangeBetween(Window.currentRow, 1)
 * df.withColumn("sum", sum('id) over byCategoryOrderedById).show()
 *
 * +---+--------+---+
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 5edf03666ac22..d0a893d909473 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1265,7 +1265,7 @@ object functions {
 /**
 * Parses the expression string into the column that it represents, similar to
- * DataFrame.selectExpr
+ * [[Dataset#selectExpr]].
 * {{{
 * // get the number of words of each length
 * df.groupBy(expr("length(word)")).count()
@@ -2385,7 +2385,7 @@ object functions {
 def rtrim(e: Column): Column = withExpr { StringTrimRight(e.expr) }

 /**
- * * Return the soundex code for the specified expression.
+ * Returns the soundex code for the specified expression.
 *
 * @group string_funcs
 * @since 1.5.0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 2a801d87b12eb..2532b2ddb72df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -57,7 +57,7 @@ abstract class BaseSessionStateBuilder(
 type NewBuilder = (SparkSession, Option[SessionState]) => BaseSessionStateBuilder

 /**
- * Function that produces a new instance of the SessionStateBuilder. This is used by the
+ * Function that produces a new instance of the `BaseSessionStateBuilder`. This is used by the
 * [[SessionState]]'s clone functionality. Make sure to override this when implementing your own
 * [[SessionStateBuilder]].
 */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 1b341a12fc609..ac013ecf12ce0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -109,7 +109,7 @@ private[sql] object SessionState {
 }

 /**
- * Concrete implementation of a [[SessionStateBuilder]].
+ * Concrete implementation of a [[BaseSessionStateBuilder]].
 */
 @Experimental
 @InterfaceStability.Unstable
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index ff8b15b3ff3ff..7f06698ce5a2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -91,7 +91,7 @@ trait RelationProvider {
 *
 * The difference between a [[RelationProvider]] and a [[SchemaRelationProvider]] is that
 * users need to provide a schema when using a [[SchemaRelationProvider]].
- * A relation provider can inherits both [[RelationProvider]] and [[SchemaRelationProvider]]
+ * A relation provider can inherit both [[RelationProvider]] and [[SchemaRelationProvider]]
 * if it can support both schema inference and user-specified schemas.
 *
 * @since 1.3.0

From 6a95691a1d9a025a01b3c7c1ac08bf8273c88607 Mon Sep 17 00:00:00 2001
From: Jacek Laskowski
Date: Thu, 25 May 2017 09:14:09 +0200
Subject: [PATCH 2/9] More scaladoc fixes + explicit type for internal lazy value

---
 .../execution/window/AggregateProcessor.scala | 17 +++++++++--------
 .../spark/sql/execution/window/WindowExec.scala |  8 +++++---
 2 files changed, 14 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
index c9f5d3b3d92d7..bc141b36e63b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
@@ -26,17 +26,17 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 /**
 * This class prepares and manages the processing of a number of [[AggregateFunction]]s within a
- * single frame. The [[WindowFunctionFrame]] takes care of processing the frame in the correct way,
- * this reduces the processing of a [[AggregateWindowFunction]] to processing the underlying
+ * single frame. The [[WindowFunctionFrame]] takes care of processing the frame in the correct way
+ * that reduces the processing of a [[AggregateWindowFunction]] to processing the underlying
 * [[AggregateFunction]]. All [[AggregateFunction]]s are processed in [[Complete]] mode.
 *
 * [[SizeBasedWindowFunction]]s are initialized in a slightly different way. These functions
- * require the size of the partition processed, this value is exposed to them when the processor is
- * constructed.
+ * require the size of the partition processed and this value is exposed to them when
+ * the processor is constructed.
 *
 * Processing of distinct aggregates is currently not supported.
 *
- * The implementation is split into an object which takes care of construction, and a the actual
+ * The implementation is split into an object which takes care of construction, and the actual
 * processor class.
 */
 private[window] object AggregateProcessor {
@@ -90,7 +90,7 @@ private[window] object AggregateProcessor {
 updateExpressions ++= noOps
 evaluateExpressions += imperative
 case other =>
- sys.error(s"Unsupported Aggregate Function: $other")
+ sys.error(s"Unsupported aggregate function: $other")
 }

 // Create the projections.
@@ -154,6 +154,7 @@ private[window] final class AggregateProcessor(
 }

 /** Evaluate buffer. */
- def evaluate(target: InternalRow): Unit =
- evaluateProjection.target(target)(buffer)
+ def evaluate(target: InternalRow): Unit = {
+ evaluateProjection.target(target)(buffer)
+ }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 950a6794a74a3..7089c1ae33ee0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -153,10 +153,12 @@ case class WindowExec(
 }

 /**
- * Collection containing an entry for each window frame to process. Each entry contains a frames'
- * WindowExpressions and factory function for the WindowFrameFunction.
+ * Collection containing an entry for each window frame to process. Each entry contains a frame's
+ * [[WindowExpression]]s and factory function for the WindowFrameFunction.
 */
- private[this] lazy val windowFrameExpressionFactoryPairs = {
+ private[this] lazy val windowFrameExpressionFactoryPairs:
+ Seq[(mutable.Buffer[WindowExpression], InternalRow => WindowFunctionFrame)] =
+ {
 type FrameKey = (String, FrameType, Option[Int], Option[Int])
 type ExpressionBuffer = mutable.Buffer[Expression]
 val framedFunctions = mutable.Map.empty[FrameKey, (ExpressionBuffer, ExpressionBuffer)]

From ee9d00605c86e27eb8eac0c5dfe822d2401eeac4 Mon Sep 17 00:00:00 2001
From: Jacek Laskowski
Date: Thu, 25 May 2017 09:34:17 +0200
Subject: [PATCH 3/9] Fix types for windowFrameExpressionFactoryPairs

---
 .../apache/spark/sql/execution/window/WindowExec.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 7089c1ae33ee0..a31c1cc2f25da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -161,13 +161,16 @@ case class WindowExec(
 {
 type FrameKey = (String, FrameType, Option[Int], Option[Int])
 type ExpressionBuffer = mutable.Buffer[Expression]
- val framedFunctions = mutable.Map.empty[FrameKey, (ExpressionBuffer, ExpressionBuffer)]
+ type WindowExpressionBuffer = mutable.Buffer[WindowExpression]
+ val framedFunctions = mutable.Map.empty[FrameKey, (WindowExpressionBuffer, ExpressionBuffer)]

 // Add a function and its function to the map for a given frame.
- def collect(tpe: String, fr: SpecifiedWindowFrame, e: Expression, fn: Expression): Unit = {
+ def collect(
+ tpe: String, fr: SpecifiedWindowFrame, e: WindowExpression, fn: Expression): Unit =
+ {
 val key = (tpe, fr.frameType, FrameBoundary(fr.frameStart), FrameBoundary(fr.frameEnd))
 val (es, fns) = framedFunctions.getOrElseUpdate(
- key, (ArrayBuffer.empty[Expression], ArrayBuffer.empty[Expression]))
+ key, (ArrayBuffer.empty[WindowExpression], ArrayBuffer.empty[Expression]))
 es += e
 fns += fn
 }

From 9479cccb9d21fb3054878587770288c4ea77e262 Mon Sep 17 00:00:00 2001
From: Jacek Laskowski
Date: Thu, 25 May 2017 09:34:46 +0200
Subject: [PATCH 4/9] Another typo

---
 .../scala/org/apache/spark/sql/expressions/WindowSpec.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
index 6279d48c94de5..f653890f6c7ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
@@ -137,7 +137,7 @@ class WindowSpec private[sql](
 * and `Window.currentRow` to specify special boundary values, rather than using integral
 * values directly.
 *
- * A range based boundary is based on the actual value of the ORDER BY
+ * A range-based boundary is based on the actual value of the ORDER BY
 * expression(s). An offset is used to alter the value of the ORDER BY expression, for
 * instance if the current order by expression has a value of 10 and the lower bound offset
 * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a

From 10104f2b2043bfd95828a62a8c8207fd32fa0987 Mon Sep 17 00:00:00 2001
From: Jacek Laskowski
Date: Thu, 25 May 2017 09:50:33 +0200
Subject: [PATCH 5/9] After review

---
 .../spark/sql/RelationalGroupedDataset.scala |  6 ++---
 .../spark/sql/execution/SparkPlan.scala | 26 ++++++++++---------
 2 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 485d2768ff92a..147b549964913 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -36,10 +36,10 @@ import org.apache.spark.sql.types.StructType

 /**
 * A set of methods for aggregations on a `DataFrame`, created by [[Dataset#groupBy groupBy]],
- * [[Dataset#cube cube]] or [[Dataset#rollup rollup]] (and also [[pivot]]).
+ * [[Dataset#cube cube]] or [[Dataset#rollup rollup]] (and also `pivot`).
 *
- * The main method is the [[agg]] function, which has multiple variants. This class also contains
- * some first-order statistics such as [[mean]], [[sum]] for convenience.
+ * The main method is the `agg` function, which has multiple variants. This class also contains
+ * some first-order statistics such as `mean`, `sum` for convenience.
 *
 * @note This class was named `GroupedData` in Spark 1.x.
 *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index fead315b25248..db975614c961a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -72,12 +72,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 }

 /**
- * Returns all metadata that describes more details of this SparkPlan.
+ * @return Metadata that describes more details of this SparkPlan.
 */
 def metadata: Map[String, String] = Map.empty

 /**
- * Returns all metrics containing metrics of this SparkPlan.
+ * @return All metrics containing metrics of this SparkPlan.
 */
 def metrics: Map[String, SQLMetric] = Map.empty
@@ -89,7 +89,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 }

 /**
- * Returns a [[SQLMetric]] according to the name.
+ * @return [[SQLMetric]] for the `name`.
 */
 def longMetric(name: String): SQLMetric = metrics(name)
@@ -128,7 +128,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 }

 /**
- * Execute a query after preparing the query and adding query plan information to created RDDs
+ * Executes a query after preparing the query and adding query plan information to created RDDs
 * for visualization.
 */
 protected final def executeQuery[T](query: => T): T = {
@@ -176,7 +176,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 private var prepared = false

 /**
- * Prepare a SparkPlan for execution. It's idempotent.
+ * Prepares this SparkPlan for execution. It's idempotent.
 */
 final def prepare(): Unit = {
 // doPrepare() may depend on it's children, we should call prepare() on all the children first.
@@ -195,22 +195,24 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 * `execute` of SparkPlan. This is helpful if we want to set up some state before executing the
 * query, e.g., `BroadcastHashJoin` uses it to broadcast asynchronously.
 *
- * Note: the prepare method has already walked down the tree, so the implementation doesn't need
- * to call children's prepare methods.
+ * @note `prepare` method has already walked down the tree, so the implementation doesn't have
+ * to call children's `prepare` methods.
 *
 * This will only be called once, protected by `this`.
 */
 protected def doPrepare(): Unit = {}

 /**
+ * Produces the result of the query as an `RDD[InternalRow]`
+ *
 * Overridden by concrete implementations of SparkPlan.
- * Produces the result of the query as an RDD[InternalRow]
 */
 protected def doExecute(): RDD[InternalRow]

 /**
- * Overridden by concrete implementations of SparkPlan.
 * Produces the result of the query as a broadcast variable.
+ *
+ * Overridden by concrete implementations of SparkPlan.
 */
 protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
 throw new UnsupportedOperationException(s"$nodeName does not implement doExecuteBroadcast")
@@ -245,7 +247,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 }

 /**
- * Decode the byte arrays back to UnsafeRows and put them into buffer.
+ * Decodes the byte arrays back to UnsafeRows and put them into buffer.
 */
 private def decodeUnsafeRows(bytes: Array[Byte]): Iterator[InternalRow] = {
 val nFields = schema.length
@@ -284,7 +286,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 /**
 * Runs this query returning the result as an iterator of InternalRow.
 *
- * Note: this will trigger multiple jobs (one for each partition).
+ * @note Triggers multiple jobs (one for each partition).
 */
 def executeToIterator(): Iterator[InternalRow] = {
 getByteArrayRdd().toLocalIterator.flatMap(decodeUnsafeRows)
@@ -301,7 +303,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 /**
 * Runs this query returning the first `n` rows as an array.
 *
- * This is modeled after RDD.take but never runs any job locally on the driver.
+ * This is modeled after `RDD.take` but never runs any job locally on the driver.
 */
 def executeTake(n: Int): Array[InternalRow] = {
 if (n == 0) {

From d853b9740733e0695f8667d1631b47e5ef2d986f Mon Sep 17 00:00:00 2001
From: Jacek Laskowski
Date: Sat, 27 May 2017 20:21:31 +0200
Subject: [PATCH 6/9] After review (reverting changes)

---
 .../spark/sql/execution/window/WindowExec.scala | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index a31c1cc2f25da..23071855e0dd1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -156,21 +156,17 @@ case class WindowExec(
 * Collection containing an entry for each window frame to process. Each entry contains a frame's
 * [[WindowExpression]]s and factory function for the WindowFrameFunction.
 */
- private[this] lazy val windowFrameExpressionFactoryPairs:
- Seq[(mutable.Buffer[WindowExpression], InternalRow => WindowFunctionFrame)] =
- {
+ private[this] lazy val windowFrameExpressionFactoryPairs = {
 type FrameKey = (String, FrameType, Option[Int], Option[Int])
 type ExpressionBuffer = mutable.Buffer[Expression]
 type WindowExpressionBuffer = mutable.Buffer[WindowExpression]
- val framedFunctions = mutable.Map.empty[FrameKey, (WindowExpressionBuffer, ExpressionBuffer)]
+ val framedFunctions = mutable.Map.empty[FrameKey, (ExpressionBuffer, ExpressionBuffer)]

 // Add a function and its function to the map for a given frame.
- def collect(
- tpe: String, fr: SpecifiedWindowFrame, e: WindowExpression, fn: Expression): Unit =
- {
+ def collect(tpe: String, fr: SpecifiedWindowFrame, e: Expression, fn: Expression): Unit = {
 val key = (tpe, fr.frameType, FrameBoundary(fr.frameStart), FrameBoundary(fr.frameEnd))
 val (es, fns) = framedFunctions.getOrElseUpdate(
- key, (ArrayBuffer.empty[WindowExpression], ArrayBuffer.empty[Expression]))
+ key, (ArrayBuffer.empty[Expression], ArrayBuffer.empty[Expression]))
 es += e
 fns += fn
 }

From a95cba00510c0fcb152b67d29c1d1453fd09ee01 Mon Sep 17 00:00:00 2001
From: Jacek Laskowski
Date: Sun, 28 May 2017 22:01:29 +0200
Subject: [PATCH 7/9] Scaladoc

---
 .../apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index cc4d38ea1f95c..efc2882f0a3d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -208,7 +208,8 @@ object ExpressionEncoder {
 }

 /**
- * A generic encoder for JVM objects.
+ * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer`
+ * and a `deserializer`.
 *
 * @param schema The schema after converting `T` to a Spark SQL row.
 * @param serializer A set of expressions, one for each top-level field that can be used to

From 379e775ca8fad55f59b5d9f765ef662b866f3224 Mon Sep 17 00:00:00 2001
From: Jacek Laskowski
Date: Mon, 29 May 2017 11:20:00 +0200
Subject: [PATCH 8/9] Another typo hunted

---
 .../scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 2109c1c23b706..df66f9a082aee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -519,7 +519,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
 protected def innerChildren: Seq[TreeNode[_]] = Seq.empty

 /**
- * Appends the string represent of this node and its children to the given StringBuilder.
+ * Appends the string representation of this node and its children to the given StringBuilder.
 *
 * The `i`-th element in `lastChildren` indicates whether the ancestor of the current node at
 * depth `i + 1` is the last child of its own parent node. The depth of the root node is 0, and

From a79fa0a0caf027ae363ee8348d25734266765ab2 Mon Sep 17 00:00:00 2001
From: Jacek Laskowski
Date: Tue, 30 May 2017 19:51:06 +0200
Subject: [PATCH 9/9] Reverting non-scaladoc changes

---
 .../scala/org/apache/spark/sql/execution/window/WindowExec.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 23071855e0dd1..1820cb0ef540b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -159,7 +159,6 @@ case class WindowExec(
 private[this] lazy val windowFrameExpressionFactoryPairs = {
 type FrameKey = (String, FrameType, Option[Int], Option[Int])
 type ExpressionBuffer = mutable.Buffer[Expression]
- type WindowExpressionBuffer = mutable.Buffer[WindowExpression]
 val framedFunctions = mutable.Map.empty[FrameKey, (ExpressionBuffer, ExpressionBuffer)]

 // Add a function and its function to the map for a given frame.