diff --git a/core/src/main/scala/org/apache/spark/annotation/AlphaComponent.java b/core/src/main/java/org/apache/spark/annotation/AlphaComponent.java
similarity index 100%
rename from core/src/main/scala/org/apache/spark/annotation/AlphaComponent.java
rename to core/src/main/java/org/apache/spark/annotation/AlphaComponent.java
diff --git a/core/src/main/scala/org/apache/spark/annotation/DeveloperApi.java b/core/src/main/java/org/apache/spark/annotation/DeveloperApi.java
similarity index 100%
rename from core/src/main/scala/org/apache/spark/annotation/DeveloperApi.java
rename to core/src/main/java/org/apache/spark/annotation/DeveloperApi.java
diff --git a/core/src/main/scala/org/apache/spark/annotation/Experimental.java b/core/src/main/java/org/apache/spark/annotation/Experimental.java
similarity index 100%
rename from core/src/main/scala/org/apache/spark/annotation/Experimental.java
rename to core/src/main/java/org/apache/spark/annotation/Experimental.java
diff --git a/core/src/main/scala/org/apache/spark/annotation/Private.java b/core/src/main/java/org/apache/spark/annotation/Private.java
similarity index 100%
rename from core/src/main/scala/org/apache/spark/annotation/Private.java
rename to core/src/main/java/org/apache/spark/annotation/Private.java
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 4adc6596ba21c..2b71f55b7bb4f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -368,7 +368,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val peakExecutionMemory = validTasks.map { case TaskUIData(info, _, _) =>
info.accumulables
.find { acc => acc.name == InternalAccumulator.PEAK_EXECUTION_MEMORY }
- .map { acc => acc.value.toLong }
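+          // Use the per-task update rather than the accumulated value (SPARK-10543).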
+ .map { acc => acc.update.getOrElse("0").toLong }
.getOrElse(0L)
.toDouble
}
diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
index 93cba322d8521..54c131cdae367 100644
--- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -147,12 +147,12 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
}.start()
}
sem.acquire(2)
+ throwable.foreach { t => throw improveStackTrace(t) }
if (ThreadingSuiteState.failed.get()) {
logError("Waited 1 second without seeing runningThreads = 4 (it was " +
ThreadingSuiteState.runningThreads.get() + "); failing test")
fail("One or more threads didn't see runningThreads = 4")
}
- throwable.foreach { t => throw improveStackTrace(t) }
}
test("set local properties in different thread") {
@@ -178,8 +178,8 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
threads.foreach(_.start())
sem.acquire(5)
- assert(sc.getLocalProperty("test") === null)
throwable.foreach { t => throw improveStackTrace(t) }
+ assert(sc.getLocalProperty("test") === null)
}
test("set and get local properties in parent-children thread") {
@@ -207,9 +207,9 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
threads.foreach(_.start())
sem.acquire(5)
+ throwable.foreach { t => throw improveStackTrace(t) }
assert(sc.getLocalProperty("test") === "parent")
assert(sc.getLocalProperty("Foo") === null)
- throwable.foreach { t => throw improveStackTrace(t) }
}
test("mutation in parent local property does not affect child (SPARK-10563)") {
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 3388c6dca81f1..86699e7f56953 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -23,7 +23,7 @@ import scala.xml.Node
import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite, Success}
+import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.ui.jobs.{JobProgressListener, StagePage, StagesTab}
@@ -47,6 +47,14 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
assert(html3.contains(targetString))
}
+ test("SPARK-10543: peak execution memory should be per-task rather than cumulative") {
+ val unsafeConf = "spark.sql.unsafe.enabled"
+ val conf = new SparkConf(false).set(unsafeConf, "true")
+ val html = renderStagePage(conf).toString().toLowerCase
+    // verify that min/25th/50th/75th/max show the per-task value, not the cumulative one
+    assert(html.contains("<td>10.0 b</td>" * 5))
+ }
+
/**
* Render a stage page started with the given conf and return the HTML.
* This also runs a dummy stage to populate the page with useful content.
@@ -67,12 +75,19 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
// Simulate a stage in job progress listener
val stageInfo = new StageInfo(0, 0, "dummy", 1, Seq.empty, Seq.empty, "details")
- val taskInfo = new TaskInfo(0, 0, 0, 0, "0", "localhost", TaskLocality.ANY, false)
- jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
- jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
- taskInfo.markSuccessful()
- jobListener.onTaskEnd(
- SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, TaskMetrics.empty))
+ // Simulate two tasks to test PEAK_EXECUTION_MEMORY correctness
+ (1 to 2).foreach {
+ taskId =>
+ val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false)
+ val peakExecutionMemory = 10
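+        // The update holds the per-task peak; the value simulates the cumulative total.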
+ taskInfo.accumulables += new AccumulableInfo(0, InternalAccumulator.PEAK_EXECUTION_MEMORY,
+ Some(peakExecutionMemory.toString), (peakExecutionMemory * taskId).toString, true)
+ jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
+ jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
+ taskInfo.markSuccessful()
+ jobListener.onTaskEnd(
+ SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, TaskMetrics.empty))
+ }
jobListener.onStageCompleted(SparkListenerStageCompleted(stageInfo))
page.render(request)
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java b/graphx/src/main/java/org/apache/spark/graphx/TripletFields.java
similarity index 100%
rename from graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java
rename to graphx/src/main/java/org/apache/spark/graphx/TripletFields.java
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeActiveness.java b/graphx/src/main/java/org/apache/spark/graphx/impl/EdgeActiveness.java
similarity index 100%
rename from graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeActiveness.java
rename to graphx/src/main/java/org/apache/spark/graphx/impl/EdgeActiveness.java
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index be31eb2eda546..627148df80c11 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -35,7 +35,8 @@ object Main extends Logging {
s.processArguments(List("-Yrepl-class-based",
"-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
"-classpath", getAddedJars.mkString(File.pathSeparator)), true)
- val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf))
+    // Create the SecurityManager lazily so that SPARK_YARN_MODE can be set first if needed
+ lazy val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf))
var sparkContext: SparkContext = _
var sqlContext: SQLContext = _
var interp = new SparkILoop // this is a public var because tests reset it.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/SQLUserDefinedType.java b/sql/catalyst/src/main/java/org/apache/spark/sql/types/SQLUserDefinedType.java
similarity index 100%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/types/SQLUserDefinedType.java
rename to sql/catalyst/src/main/java/org/apache/spark/sql/types/SQLUserDefinedType.java
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index d652fce3fd9b6..687ca000d12bb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -42,6 +42,7 @@ object DateTimeUtils {
final val SECONDS_PER_DAY = 60 * 60 * 24L
final val MICROS_PER_SECOND = 1000L * 1000L
final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L
+ final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY
final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L
@@ -190,13 +191,14 @@ object DateTimeUtils {
/**
* Returns Julian day and nanoseconds in a day from the number of microseconds
+ *
+   * Note: supports timestamps since 4717 BC (no negative nanoseconds), matching Hive's behavior.
*/
def toJulianDay(us: SQLTimestamp): (Int, Long) = {
- val seconds = us / MICROS_PER_SECOND
- val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH
- val secondsInDay = seconds % SECONDS_PER_DAY
- val nanos = (us % MICROS_PER_SECOND) * 1000L
- (day.toInt, secondsInDay * NANOS_PER_SECOND + nanos)
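+    // Shift the epoch to the Julian day origin so day and micros are non-negative.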
+ val julian_us = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
+ val day = julian_us / MICROS_PER_DAY
+ val micros = julian_us % MICROS_PER_DAY
+ (day.toInt, micros * 1000L)
}
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 1596bb79fa94b..6b9a11f0ff743 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -52,15 +52,14 @@ class DateTimeUtilsSuite extends SparkFunSuite {
assert(ns === 0)
assert(fromJulianDay(d, ns) == 0L)
- val t = Timestamp.valueOf("2015-06-11 10:10:10.100")
- val (d1, ns1) = toJulianDay(fromJavaTimestamp(t))
- val t1 = toJavaTimestamp(fromJulianDay(d1, ns1))
- assert(t.equals(t1))
-
- val t2 = Timestamp.valueOf("2015-06-11 20:10:10.100")
- val (d2, ns2) = toJulianDay(fromJavaTimestamp(t2))
- val t22 = toJavaTimestamp(fromJulianDay(d2, ns2))
- assert(t2.equals(t22))
+ Seq(Timestamp.valueOf("2015-06-11 10:10:10.100"),
+ Timestamp.valueOf("2015-06-11 20:10:10.100"),
+ Timestamp.valueOf("1900-06-11 20:10:10.100")).foreach { t =>
+ val (d, ns) = toJulianDay(fromJavaTimestamp(t))
+ assert(ns > 0)
+ val t1 = toJavaTimestamp(fromJulianDay(d, ns))
+ assert(t.equals(t1))
+ }
}
test("SPARK-6785: java date conversion before and after epoch") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 1a687b2374f14..3e61123c145cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
-import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, SQLExecution}
+import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.sources.HadoopFsRelation
@@ -114,7 +114,7 @@ private[sql] object DataFrame {
@Experimental
class DataFrame private[sql](
@transient val sqlContext: SQLContext,
- @DeveloperApi @transient val queryExecution: SQLContext#QueryExecution) extends Serializable {
+ @DeveloperApi @transient val queryExecution: QueryExecution) extends Serializable {
// Note for Spark contributors: if adding or updating any action in `DataFrame`, please make sure
// you wrap it with `withNewExecutionId` if this actions doesn't call other action.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 4e8414af50b44..e3fdd782e6ff6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -38,6 +38,10 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, _}
+import org.apache.spark.sql.{execution => sparkexecution}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
@@ -188,9 +192,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)
- protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))
+ protected[sql] def executeSql(sql: String):
+ org.apache.spark.sql.execution.QueryExecution = executePlan(parseSql(sql))
- protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan)
+ protected[sql] def executePlan(plan: LogicalPlan) =
+ new sparkexecution.QueryExecution(this, plan)
@transient
protected[sql] val tlSession = new ThreadLocal[SQLSession]() {
@@ -781,77 +787,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
}.toArray
}
- protected[sql] class SparkPlanner extends SparkStrategies {
- val sparkContext: SparkContext = self.sparkContext
-
- val sqlContext: SQLContext = self
-
- def codegenEnabled: Boolean = self.conf.codegenEnabled
-
- def unsafeEnabled: Boolean = self.conf.unsafeEnabled
-
- def numPartitions: Int = self.conf.numShufflePartitions
-
- def strategies: Seq[Strategy] =
- experimental.extraStrategies ++ (
- DataSourceStrategy ::
- DDLStrategy ::
- TakeOrderedAndProject ::
- HashAggregation ::
- Aggregation ::
- LeftSemiJoin ::
- EquiJoinSelection ::
- InMemoryScans ::
- BasicOperators ::
- CartesianProduct ::
- BroadcastNestedLoopJoin :: Nil)
-
- /**
- * Used to build table scan operators where complex projection and filtering are done using
- * separate physical operators. This function returns the given scan operator with Project and
- * Filter nodes added only when needed. For example, a Project operator is only used when the
- * final desired output requires complex expressions to be evaluated or when columns can be
- * further eliminated out after filtering has been done.
- *
- * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized
- * away by the filter pushdown optimization.
- *
- * The required attributes for both filtering and expression evaluation are passed to the
- * provided `scanBuilder` function so that it can avoid unnecessary column materialization.
- */
- def pruneFilterProject(
- projectList: Seq[NamedExpression],
- filterPredicates: Seq[Expression],
- prunePushedDownFilters: Seq[Expression] => Seq[Expression],
- scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {
-
- val projectSet = AttributeSet(projectList.flatMap(_.references))
- val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
- val filterCondition =
- prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And)
-
- // Right now we still use a projection even if the only evaluation is applying an alias
- // to a column. Since this is a no-op, it could be avoided. However, using this
- // optimization with the current implementation would change the output schema.
- // TODO: Decouple final output schema from expression evaluation so this copy can be
- // avoided safely.
-
- if (AttributeSet(projectList.map(_.toAttribute)) == projectSet &&
- filterSet.subsetOf(projectSet)) {
- // When it is possible to just use column pruning to get the right projection and
- // when the columns of this projection are enough to evaluate all filter conditions,
- // just do a scan followed by a filter, with no extra project.
- val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]])
- filterCondition.map(Filter(_, scan)).getOrElse(scan)
- } else {
- val scan = scanBuilder((projectSet ++ filterSet).toSeq)
- Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
- }
- }
- }
+ @deprecated("use org.apache.spark.sql.SparkPlanner", "1.6.0")
+ protected[sql] class SparkPlanner extends sparkexecution.SparkPlanner(this)
@transient
- protected[sql] val planner = new SparkPlanner
+ protected[sql] val planner: sparkexecution.SparkPlanner = new sparkexecution.SparkPlanner(this)
@transient
protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[InternalRow], 1)
@@ -898,59 +838,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected[sql] lazy val conf: SQLConf = new SQLConf
}
- /**
- * :: DeveloperApi ::
- * The primary workflow for executing relational queries using Spark. Designed to allow easy
- * access to the intermediate phases of query execution for developers.
- */
- @DeveloperApi
- protected[sql] class QueryExecution(val logical: LogicalPlan) {
- def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed)
-
- lazy val analyzed: LogicalPlan = analyzer.execute(logical)
- lazy val withCachedData: LogicalPlan = {
- assertAnalyzed()
- cacheManager.useCachedData(analyzed)
- }
- lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)
-
- // TODO: Don't just pick the first one...
- lazy val sparkPlan: SparkPlan = {
- SparkPlan.currentContext.set(self)
- planner.plan(optimizedPlan).next()
- }
- // executedPlan should not be used to initialize any SparkPlan. It should be
- // only used for execution.
- lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
-
- /** Internal version of the RDD. Avoids copies and has no schema */
- lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
-
- protected def stringOrError[A](f: => A): String =
- try f.toString catch { case e: Throwable => e.toString }
-
- def simpleString: String =
- s"""== Physical Plan ==
- |${stringOrError(executedPlan)}
- """.stripMargin.trim
-
- override def toString: String = {
- def output =
- analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")
-
- s"""== Parsed Logical Plan ==
- |${stringOrError(logical)}
- |== Analyzed Logical Plan ==
- |${stringOrError(output)}
- |${stringOrError(analyzed)}
- |== Optimized Logical Plan ==
- |${stringOrError(optimizedPlan)}
- |== Physical Plan ==
- |${stringOrError(executedPlan)}
- |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
- """.stripMargin.trim
- }
- }
+ @deprecated("use org.apache.spark.sql.QueryExecution", "1.6.0")
+ protected[sql] class QueryExecution(logical: LogicalPlan)
+ extends sparkexecution.QueryExecution(this, logical)
/**
* Parses the data type in our internal string representation. The data type string should
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
new file mode 100644
index 0000000000000..7bb4133a29059
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.{Experimental, DeveloperApi}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, optimizer}
+import org.apache.spark.sql.{SQLContext, Row}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+ * :: DeveloperApi ::
+ * The primary workflow for executing relational queries using Spark. Designed to allow easy
+ * access to the intermediate phases of query execution for developers.
+ */
+@DeveloperApi
+class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
+ val analyzer = sqlContext.analyzer
+ val optimizer = sqlContext.optimizer
+ val planner = sqlContext.planner
+ val cacheManager = sqlContext.cacheManager
+ val prepareForExecution = sqlContext.prepareForExecution
+
+ def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed)
+
+ lazy val analyzed: LogicalPlan = analyzer.execute(logical)
+ lazy val withCachedData: LogicalPlan = {
+ assertAnalyzed()
+ cacheManager.useCachedData(analyzed)
+ }
+ lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)
+
+ // TODO: Don't just pick the first one...
+ lazy val sparkPlan: SparkPlan = {
+ SparkPlan.currentContext.set(sqlContext)
+ planner.plan(optimizedPlan).next()
+ }
+ // executedPlan should not be used to initialize any SparkPlan. It should be
+ // only used for execution.
+ lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
+
+ /** Internal version of the RDD. Avoids copies and has no schema */
+ lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
+
+ protected def stringOrError[A](f: => A): String =
+ try f.toString catch { case e: Throwable => e.toString }
+
+ def simpleString: String =
+ s"""== Physical Plan ==
+ |${stringOrError(executedPlan)}
+ """.stripMargin.trim
+
+
+ override def toString: String = {
+ def output =
+ analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")
+
+ s"""== Parsed Logical Plan ==
+ |${stringOrError(logical)}
+ |== Analyzed Logical Plan ==
+ |${stringOrError(output)}
+ |${stringOrError(analyzed)}
+ |== Optimized Logical Plan ==
+ |${stringOrError(optimizedPlan)}
+ |== Physical Plan ==
+ |${stringOrError(executedPlan)}
+ |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
+ """.stripMargin.trim
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index cee58218a885b..1422e15549c94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -37,7 +37,7 @@ private[sql] object SQLExecution {
* we can connect them with an execution.
*/
def withNewExecutionId[T](
- sqlContext: SQLContext, queryExecution: SQLContext#QueryExecution)(body: => T): T = {
+ sqlContext: SQLContext, queryExecution: QueryExecution)(body: => T): T = {
val sc = sqlContext.sparkContext
val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
if (oldExecutionId == null) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
new file mode 100644
index 0000000000000..b346f43faebe2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+@Experimental
+class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
+ val sparkContext: SparkContext = sqlContext.sparkContext
+
+ def codegenEnabled: Boolean = sqlContext.conf.codegenEnabled
+
+ def unsafeEnabled: Boolean = sqlContext.conf.unsafeEnabled
+
+ def numPartitions: Int = sqlContext.conf.numShufflePartitions
+
+ def strategies: Seq[Strategy] =
+ sqlContext.experimental.extraStrategies ++ (
+ DataSourceStrategy ::
+ DDLStrategy ::
+ TakeOrderedAndProject ::
+ HashAggregation ::
+ Aggregation ::
+ LeftSemiJoin ::
+ EquiJoinSelection ::
+ InMemoryScans ::
+ BasicOperators ::
+ CartesianProduct ::
+ BroadcastNestedLoopJoin :: Nil)
+
+ /**
+ * Used to build table scan operators where complex projection and filtering are done using
+ * separate physical operators. This function returns the given scan operator with Project and
+ * Filter nodes added only when needed. For example, a Project operator is only used when the
+ * final desired output requires complex expressions to be evaluated or when columns can be
+ * further eliminated out after filtering has been done.
+ *
+ * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized
+ * away by the filter pushdown optimization.
+ *
+ * The required attributes for both filtering and expression evaluation are passed to the
+ * provided `scanBuilder` function so that it can avoid unnecessary column materialization.
+ */
+ def pruneFilterProject(
+ projectList: Seq[NamedExpression],
+ filterPredicates: Seq[Expression],
+ prunePushedDownFilters: Seq[Expression] => Seq[Expression],
+ scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {
+
+ val projectSet = AttributeSet(projectList.flatMap(_.references))
+ val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
+ val filterCondition =
+ prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And)
+
+ // Right now we still use a projection even if the only evaluation is applying an alias
+ // to a column. Since this is a no-op, it could be avoided. However, using this
+ // optimization with the current implementation would change the output schema.
+ // TODO: Decouple final output schema from expression evaluation so this copy can be
+ // avoided safely.
+
+ if (AttributeSet(projectList.map(_.toAttribute)) == projectSet &&
+ filterSet.subsetOf(projectSet)) {
+ // When it is possible to just use column pruning to get the right projection and
+ // when the columns of this projection are enough to evaluate all filter conditions,
+ // just do a scan followed by a filter, with no extra project.
+ val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]])
+ filterCondition.map(Filter(_, scan)).getOrElse(scan)
+ } else {
+ val scan = scanBuilder((projectSet ++ filterSet).toSeq)
+ Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
+ }
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 4572d5efc92bb..5e40d77689045 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.{SQLContext, Strategy, execution}
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
- self: SQLContext#SparkPlanner =>
+ self: SparkPlanner =>
object LeftSemiJoin extends Strategy with PredicateHelper {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala
new file mode 100644
index 0000000000000..2aff156d18b54
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala
@@ -0,0 +1,60 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Projection}
+
+case class ExpandNode(
+ conf: SQLConf,
+ projections: Seq[Seq[Expression]],
+ output: Seq[Attribute],
+ child: LocalNode) extends UnaryLocalNode(conf) {
+
+ assert(projections.size > 0)
+
+ private[this] var result: InternalRow = _
+ private[this] var idx: Int = _
+ private[this] var input: InternalRow = _
+ private[this] var groups: Array[Projection] = _
+
+ override def open(): Unit = {
+ child.open()
+ groups = projections.map(ee => newProjection(ee, child.output)).toArray
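+    // Start past the last projection so the first next() call fetches a child row.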
+ idx = groups.length
+ }
+
+ override def next(): Boolean = {
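+    // Emit one projection of the current row per call; once all projections have
+    // been emitted, pull the next row from the child.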
+ if (idx >= groups.length) {
+ if (child.next()) {
+ input = child.fetch()
+ idx = 0
+ } else {
+ return false
+ }
+ }
+ result = groups(idx)(input)
+ idx += 1
+ true
+ }
+
+ override def fetch(): InternalRow = result
+
+ override def close(): Unit = child.close()
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
index e540ef8555eb6..9840080e16953 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
@@ -23,7 +23,7 @@ import org.apache.spark.Logging
import org.apache.spark.sql.{SQLConf, Row}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
+import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.types.StructType
@@ -69,6 +69,18 @@ abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging
*/
def close(): Unit
+ /** Specifies whether this operator outputs UnsafeRows */
+ def outputsUnsafeRows: Boolean = false
+
+ /** Specifies whether this operator is capable of processing UnsafeRows */
+ def canProcessUnsafeRows: Boolean = false
+
+ /**
+ * Specifies whether this operator is capable of processing Java-object-based Rows (i.e. rows
+ * that are not UnsafeRows).
+ */
+ def canProcessSafeRows: Boolean = true
+
/**
* Returns the content through the [[Iterator]] interface.
*/
@@ -91,6 +103,28 @@ abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging
result
}
+ protected def newProjection(
+ expressions: Seq[Expression],
+ inputSchema: Seq[Attribute]): Projection = {
+ log.debug(
+ s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
+ if (codegenEnabled) {
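+      // Outside of tests, fall back to the interpreted projection if codegen fails.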
+ try {
+ GenerateProjection.generate(expressions, inputSchema)
+ } catch {
+ case NonFatal(e) =>
+ if (isTesting) {
+ throw e
+ } else {
+            log.error("Failed to generate projection, falling back to interpreted", e)
+ new InterpretedProjection(expressions, inputSchema)
+ }
+ }
+ } else {
+ new InterpretedProjection(expressions, inputSchema)
+ }
+ }
+
protected def newMutableProjection(
expressions: Seq[Expression],
inputSchema: Seq[Attribute]): () => MutableProjection = {
@@ -113,6 +147,25 @@ abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging
}
}
+ protected def newPredicate(
+ expression: Expression,
+ inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
+ if (codegenEnabled) {
+ try {
+ GeneratePredicate.generate(expression, inputSchema)
+ } catch {
+ case NonFatal(e) =>
+ if (isTesting) {
+ throw e
+ } else {
+            log.error("Failed to generate predicate, falling back to interpreted", e)
+ InterpretedPredicate.create(expression, inputSchema)
+ }
+ }
+ } else {
+ InterpretedPredicate.create(expression, inputSchema)
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
new file mode 100644
index 0000000000000..7321fc66b4dde
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.{FullOuter, RightOuter, LeftOuter, JoinType}
+import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
+import org.apache.spark.util.collection.{BitSet, CompactBuffer}
+
+case class NestedLoopJoinNode(
+ conf: SQLConf,
+ left: LocalNode,
+ right: LocalNode,
+ buildSide: BuildSide,
+ joinType: JoinType,
+ condition: Option[Expression]) extends BinaryLocalNode(conf) {
+
+ override def output: Seq[Attribute] = {
+ joinType match {
+ case LeftOuter =>
+ left.output ++ right.output.map(_.withNullability(true))
+ case RightOuter =>
+ left.output.map(_.withNullability(true)) ++ right.output
+ case FullOuter =>
+ left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
+ case x =>
+ throw new IllegalArgumentException(
+ s"NestedLoopJoin should not take $x as the JoinType")
+ }
+ }
+
+ private[this] def genResultProjection: InternalRow => InternalRow = {
+ if (outputsUnsafeRows) {
+ UnsafeProjection.create(schema)
+ } else {
+ identity[InternalRow]
+ }
+ }
+
+ private[this] var currentRow: InternalRow = _
+
+ private[this] var iterator: Iterator[InternalRow] = _
+
+ override def open(): Unit = {
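+    // Materialize the build side fully in memory, then stream the other side and
+    // join each streamed row against every build row.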
+ val (streamed, build) = buildSide match {
+ case BuildRight => (left, right)
+ case BuildLeft => (right, left)
+ }
+ build.open()
+ val buildRelation = new CompactBuffer[InternalRow]
+ while (build.next()) {
+ buildRelation += build.fetch().copy()
+ }
+ build.close()
+
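+    // Bind the join condition (default: always true) against left.output ++ right.output.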
+ val boundCondition =
+ newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
+
+ val leftNulls = new GenericMutableRow(left.output.size)
+ val rightNulls = new GenericMutableRow(right.output.size)
+ val joinedRow = new JoinedRow
+ val matchedBuildTuples = new BitSet(buildRelation.size)
+ val resultProj = genResultProjection
+ streamed.open()
+
+ // streamedRowMatches also contains null rows if using outer join
+ val streamedRowMatches: Iterator[InternalRow] = streamed.asIterator.flatMap { streamedRow =>
+ val matchedRows = new CompactBuffer[InternalRow]
+
+ var i = 0
+ var streamRowMatched = false
+
+ // Scan the build relation to look for matches for each streamed row
+ while (i < buildRelation.size) {
+ val buildRow = buildRelation(i)
+ buildSide match {
+ case BuildRight => joinedRow(streamedRow, buildRow)
+ case BuildLeft => joinedRow(buildRow, streamedRow)
+ }
+ if (boundCondition(joinedRow)) {
+ matchedRows += resultProj(joinedRow).copy()
+ streamRowMatched = true
+ matchedBuildTuples.set(i)
+ }
+ i += 1
+ }
+
+ // If this row had no matches and we're using outer join, join it with the null rows
+ if (!streamRowMatched) {
+ (joinType, buildSide) match {
+ case (LeftOuter | FullOuter, BuildRight) =>
+ matchedRows += resultProj(joinedRow(streamedRow, rightNulls)).copy()
+ case (RightOuter | FullOuter, BuildLeft) =>
+ matchedRows += resultProj(joinedRow(leftNulls, streamedRow)).copy()
+ case _ =>
+ }
+ }
+
+ matchedRows.iterator
+ }
+
+ // If we're using outer join, find rows on the build side that didn't match anything
+ // and join them with the null row
+ lazy val unmatchedBuildRows: Iterator[InternalRow] = {
+ var i = 0
+ buildRelation.filter { row =>
+ val r = !matchedBuildTuples.get(i)
+ i += 1
+ r
+ }.iterator
+ }
+ iterator = (joinType, buildSide) match {
+ case (RightOuter | FullOuter, BuildRight) =>
+ streamedRowMatches ++
+ unmatchedBuildRows.map { buildRow => resultProj(joinedRow(leftNulls, buildRow)) }
+ case (LeftOuter | FullOuter, BuildLeft) =>
+ streamedRowMatches ++
+ unmatchedBuildRows.map { buildRow => resultProj(joinedRow(buildRow, rightNulls)) }
+ case _ => streamedRowMatches
+ }
+ }
+
+ override def next(): Boolean = {
+ if (iterator.hasNext) {
+ currentRow = iterator.next()
+ true
+ } else {
+ false
+ }
+ }
+
+ override def fetch(): InternalRow = currentRow
+
+ override def close(): Unit = {
+ left.close()
+ right.close()
+ }
+
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala
new file mode 100644
index 0000000000000..cfa7f3f6dcb97
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala
@@ -0,0 +1,51 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.local
+
+class ExpandNodeSuite extends LocalNodeTest {
+
+ import testImplicits._
+
+ test("expand") {
+ val input = Seq((1, 1), (2, 2), (3, 3), (4, 4), (5, 5)).toDF("key", "value")
+ checkAnswer(
+ input,
+ node =>
+ ExpandNode(conf, Seq(
+ Seq(
+ input.col("key") + input.col("value"), input.col("key") - input.col("value")
+ ).map(_.expr),
+ Seq(
+ input.col("key") * input.col("value"), input.col("key") / input.col("value")
+ ).map(_.expr)
+ ), node.output, node),
+ Seq(
+ (2, 0),
+ (1, 1),
+ (4, 0),
+ (4, 1),
+ (6, 0),
+ (9, 1),
+ (8, 0),
+ (16, 1),
+ (10, 0),
+ (25, 1)
+ ).toDF().collect()
+ )
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
index 43b6f06aead88..78d891351f4a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
@@ -24,20 +24,6 @@ class HashJoinNodeSuite extends LocalNodeTest {
import testImplicits._
- private def wrapForUnsafe(
- f: (LocalNode, LocalNode) => LocalNode): (LocalNode, LocalNode) => LocalNode = {
- if (conf.unsafeEnabled) {
- (left: LocalNode, right: LocalNode) => {
- val _left = ConvertToUnsafeNode(conf, left)
- val _right = ConvertToUnsafeNode(conf, right)
- val r = f(_left, _right)
- ConvertToSafeNode(conf, r)
- }
- } else {
- f
- }
- }
-
def joinSuite(suiteName: String, confPairs: (String, String)*): Unit = {
test(s"$suiteName: inner join with one match per row") {
withSQLConf(confPairs: _*) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
index b95d4ea7f8f2a..86dd28064cc6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
@@ -27,6 +27,20 @@ class LocalNodeTest extends SparkFunSuite with SharedSQLContext {
def conf: SQLConf = sqlContext.conf
+ protected def wrapForUnsafe(
+ f: (LocalNode, LocalNode) => LocalNode): (LocalNode, LocalNode) => LocalNode = {
+ if (conf.unsafeEnabled) {
+ (left: LocalNode, right: LocalNode) => {
+ val _left = ConvertToUnsafeNode(conf, left)
+ val _right = ConvertToUnsafeNode(conf, right)
+ val r = f(_left, _right)
+ ConvertToSafeNode(conf, r)
+ }
+ } else {
+ f
+ }
+ }
+
/**
* Runs the LocalNode and makes sure the answer matches the expected result.
* @param input the input data to be used.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
new file mode 100644
index 0000000000000..b1ef26ba82f16
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
@@ -0,0 +1,239 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.catalyst.plans.{FullOuter, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
+
+class NestedLoopJoinNodeSuite extends LocalNodeTest {
+
+ import testImplicits._
+
+ private def joinSuite(
+ suiteName: String, buildSide: BuildSide, confPairs: (String, String)*): Unit = {
+ test(s"$suiteName: left outer join") {
+ withSQLConf(confPairs: _*) {
+ checkAnswer2(
+ upperCaseData,
+ lowerCaseData,
+ wrapForUnsafe(
+ (node1, node2) => NestedLoopJoinNode(
+ conf,
+ node1,
+ node2,
+ buildSide,
+ LeftOuter,
+ Some((upperCaseData.col("N") === lowerCaseData.col("n")).expr))
+ ),
+ upperCaseData.join(lowerCaseData, $"n" === $"N", "left").collect())
+
+ checkAnswer2(
+ upperCaseData,
+ lowerCaseData,
+ wrapForUnsafe(
+ (node1, node2) => NestedLoopJoinNode(
+ conf,
+ node1,
+ node2,
+ buildSide,
+ LeftOuter,
+ Some(
+ (upperCaseData.col("N") === lowerCaseData.col("n") &&
+ lowerCaseData.col("n") > 1).expr))
+ ),
+ upperCaseData.join(lowerCaseData, $"n" === $"N" && $"n" > 1, "left").collect())
+
+ checkAnswer2(
+ upperCaseData,
+ lowerCaseData,
+ wrapForUnsafe(
+ (node1, node2) => NestedLoopJoinNode(
+ conf,
+ node1,
+ node2,
+ buildSide,
+ LeftOuter,
+ Some(
+ (upperCaseData.col("N") === lowerCaseData.col("n") &&
+ upperCaseData.col("N") > 1).expr))
+ ),
+ upperCaseData.join(lowerCaseData, $"n" === $"N" && $"N" > 1, "left").collect())
+
+ checkAnswer2(
+ upperCaseData,
+ lowerCaseData,
+ wrapForUnsafe(
+ (node1, node2) => NestedLoopJoinNode(
+ conf,
+ node1,
+ node2,
+ buildSide,
+ LeftOuter,
+ Some(
+ (upperCaseData.col("N") === lowerCaseData.col("n") &&
+ lowerCaseData.col("l") > upperCaseData.col("L")).expr))
+ ),
+ upperCaseData.join(lowerCaseData, $"n" === $"N" && $"l" > $"L", "left").collect())
+ }
+ }
+
+ test(s"$suiteName: right outer join") {
+ withSQLConf(confPairs: _*) {
+ checkAnswer2(
+ lowerCaseData,
+ upperCaseData,
+ wrapForUnsafe(
+ (node1, node2) => NestedLoopJoinNode(
+ conf,
+ node1,
+ node2,
+ buildSide,
+ RightOuter,
+ Some((lowerCaseData.col("n") === upperCaseData.col("N")).expr))
+ ),
+ lowerCaseData.join(upperCaseData, $"n" === $"N", "right").collect())
+
+ checkAnswer2(
+ lowerCaseData,
+ upperCaseData,
+ wrapForUnsafe(
+ (node1, node2) => NestedLoopJoinNode(
+ conf,
+ node1,
+ node2,
+ buildSide,
+ RightOuter,
+ Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
+ lowerCaseData.col("n") > 1).expr))
+ ),
+ lowerCaseData.join(upperCaseData, $"n" === $"N" && $"n" > 1, "right").collect())
+
+ checkAnswer2(
+ lowerCaseData,
+ upperCaseData,
+ wrapForUnsafe(
+ (node1, node2) => NestedLoopJoinNode(
+ conf,
+ node1,
+ node2,
+ buildSide,
+ RightOuter,
+ Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
+ upperCaseData.col("N") > 1).expr))
+ ),
+ lowerCaseData.join(upperCaseData, $"n" === $"N" && $"N" > 1, "right").collect())
+
+ checkAnswer2(
+ lowerCaseData,
+ upperCaseData,
+ wrapForUnsafe(
+ (node1, node2) => NestedLoopJoinNode(
+ conf,
+ node1,
+ node2,
+ buildSide,
+ RightOuter,
+ Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
+ lowerCaseData.col("l") > upperCaseData.col("L")).expr))
+ ),
+ lowerCaseData.join(upperCaseData, $"n" === $"N" && $"l" > $"L", "right").collect())
+ }
+ }
+
+ test(s"$suiteName: full outer join") {
+ withSQLConf(confPairs: _*) {
+ checkAnswer2(
+ lowerCaseData,
+ upperCaseData,
+ wrapForUnsafe(
+ (node1, node2) => NestedLoopJoinNode(
+ conf,
+ node1,
+ node2,
+ buildSide,
+ FullOuter,
+ Some((lowerCaseData.col("n") === upperCaseData.col("N")).expr))
+ ),
+ lowerCaseData.join(upperCaseData, $"n" === $"N", "full").collect())
+
+ checkAnswer2(
+ lowerCaseData,
+ upperCaseData,
+ wrapForUnsafe(
+ (node1, node2) => NestedLoopJoinNode(
+ conf,
+ node1,
+ node2,
+ buildSide,
+ FullOuter,
+ Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
+ lowerCaseData.col("n") > 1).expr))
+ ),
+ lowerCaseData.join(upperCaseData, $"n" === $"N" && $"n" > 1, "full").collect())
+
+ checkAnswer2(
+ lowerCaseData,
+ upperCaseData,
+ wrapForUnsafe(
+ (node1, node2) => NestedLoopJoinNode(
+ conf,
+ node1,
+ node2,
+ buildSide,
+ FullOuter,
+ Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
+ upperCaseData.col("N") > 1).expr))
+ ),
+ lowerCaseData.join(upperCaseData, $"n" === $"N" && $"N" > 1, "full").collect())
+
+ checkAnswer2(
+ lowerCaseData,
+ upperCaseData,
+ wrapForUnsafe(
+ (node1, node2) => NestedLoopJoinNode(
+ conf,
+ node1,
+ node2,
+ buildSide,
+ FullOuter,
+ Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
+ lowerCaseData.col("l") > upperCaseData.col("L")).expr))
+ ),
+ lowerCaseData.join(upperCaseData, $"n" === $"N" && $"l" > $"L", "full").collect())
+ }
+ }
+ }
+
+ joinSuite(
+ "general-build-left",
+ BuildLeft,
+ SQLConf.CODEGEN_ENABLED.key -> "false", SQLConf.UNSAFE_ENABLED.key -> "false")
+ joinSuite(
+ "general-build-right",
+ BuildRight,
+ SQLConf.CODEGEN_ENABLED.key -> "false", SQLConf.UNSAFE_ENABLED.key -> "false")
+ joinSuite(
+ "tungsten-build-left",
+ BuildLeft,
+ SQLConf.CODEGEN_ENABLED.key -> "true", SQLConf.UNSAFE_ENABLED.key -> "true")
+ joinSuite(
+ "tungsten-build-right",
+ BuildRight,
+ SQLConf.CODEGEN_ENABLED.key -> "true", SQLConf.UNSAFE_ENABLED.key -> "true")
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContextState.java b/streaming/src/main/java/org/apache/spark/streaming/StreamingContextState.java
similarity index 100%
rename from streaming/src/main/scala/org/apache/spark/streaming/StreamingContextState.java
rename to streaming/src/main/java/org/apache/spark/streaming/StreamingContextState.java
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index b08412414aa1c..17d9943c795e3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -105,9 +105,9 @@ class ApplicationMasterArguments(val args: Array[String]) {
| place on the PYTHONPATH for Python apps.
| --args ARGS Arguments to be passed to your application's main class.
| Multiple invocations are possible, each will be passed in order.
- | --num-executors NUM Number of executors to start (Default: 2)
| --executor-cores NUM Number of cores for the executors (Default: 1)
| --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)
+ | --properties-file FILE Path to a custom Spark properties file.
""".stripMargin)
// scalastyle:on println
System.exit(exitCode)