
Commit

Merge branch 'master' of github.com:apache/spark into concurrent-sql-executions

Conflicts:
	core/src/test/scala/org/apache/spark/ThreadingSuite.scala
Andrew Or committed Sep 14, 2015
2 parents fce3819 + 7b6c856 commit 75a8d90
Showing 28 changed files with 813 additions and 170 deletions.
@@ -368,7 +368,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val peakExecutionMemory = validTasks.map { case TaskUIData(info, _, _) =>
info.accumulables
.find { acc => acc.name == InternalAccumulator.PEAK_EXECUTION_MEMORY }
.map { acc => acc.value.toLong }
.map { acc => acc.update.getOrElse("0").toLong }
.getOrElse(0L)
.toDouble
}
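The StagePage hunk above switches the peak execution memory quantiles from the accumulator's `value` (a running total across tasks) to its per-task `update`. A toy sketch of that distinction, using a simplified stand-in rather than Spark's actual `AccumulableInfo`:

// Toy stand-in (not Spark's class): `update` is the per-task delta reported at
// task end, `value` is the running total across all tasks seen so far.
case class AccInfo(name: String, update: Option[String], value: String)

object PeakMemoryQuantilesSketch extends App {
  // Two tasks that each peaked at 10 bytes; the running total ends at 20.
  val perTaskAccums = Seq(
    AccInfo("peakExecutionMemory", update = Some("10"), value = "10"),
    AccInfo("peakExecutionMemory", update = Some("10"), value = "20"))

  // What the patched StagePage reads: per-task peaks, suitable for quantiles.
  val perTask = perTaskAccums.map(_.update.getOrElse("0").toLong)   // Seq(10, 10)

  // What the old code read: cumulative totals, which inflate later tasks.
  val cumulative = perTaskAccums.map(_.value.toLong)                // Seq(10, 20)

  println(s"per-task peaks: $perTask, cumulative values: $cumulative")
}

The StagePageSuite test further down exercises exactly this case: two tasks each report an update of 10 while the accumulated value grows to 20, and the rendered page is expected to show 10.0 B at every displayed quantile.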
6 changes: 3 additions & 3 deletions core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -147,12 +147,12 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
}.start()
}
sem.acquire(2)
throwable.foreach { t => throw improveStackTrace(t) }
if (ThreadingSuiteState.failed.get()) {
logError("Waited 1 second without seeing runningThreads = 4 (it was " +
ThreadingSuiteState.runningThreads.get() + "); failing test")
fail("One or more threads didn't see runningThreads = 4")
}
throwable.foreach { t => throw improveStackTrace(t) }
}

test("set local properties in different thread") {
@@ -178,8 +178,8 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
threads.foreach(_.start())

sem.acquire(5)
assert(sc.getLocalProperty("test") === null)
throwable.foreach { t => throw improveStackTrace(t) }
assert(sc.getLocalProperty("test") === null)
}

test("set and get local properties in parent-children thread") {
@@ -207,9 +207,9 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
threads.foreach(_.start())

sem.acquire(5)
throwable.foreach { t => throw improveStackTrace(t) }
assert(sc.getLocalProperty("test") === "parent")
assert(sc.getLocalProperty("Foo") === null)
throwable.foreach { t => throw improveStackTrace(t) }
}

test("mutation in parent local property does not affect child (SPARK-10563)") {
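The ThreadingSuite hunks above all follow the same pattern: exceptions thrown on worker threads are captured into the shared `throwable` and rethrown in the test thread, and the hunks converge on doing that rethrow before the assertions run, so the original worker failure surfaces instead of a secondary assertion error. A toy sketch of the pattern (not Spark code; names are illustrative):

// Illustrative only: capture worker-thread failures and rethrow them in the
// main thread before asserting on the worker's side effects.
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

object RethrowWorkerFailuresSketch extends App {
  val throwable = new AtomicReference[Option[Throwable]](None)
  val done = new CountDownLatch(1)
  @volatile var result = 0

  val worker = new Thread(new Runnable {
    override def run(): Unit = {
      try {
        result = 21 * 2                          // the "work"; failures are captured below
      } catch {
        case t: Throwable => throwable.set(Some(t))
      } finally {
        done.countDown()
      }
    }
  })
  worker.start()
  done.await()

  // Rethrow a worker failure first, so the root cause is what gets reported ...
  throwable.get().foreach { t => throw t }
  // ... and only then assert on state the worker was supposed to produce.
  assert(result == 42)
}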
29 changes: 22 additions & 7 deletions core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -23,7 +23,7 @@ import scala.xml.Node

import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}

import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite, Success}
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.ui.jobs.{JobProgressListener, StagePage, StagesTab}
@@ -47,6 +47,14 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
assert(html3.contains(targetString))
}

test("SPARK-10543: peak execution memory should be per-task rather than cumulative") {
val unsafeConf = "spark.sql.unsafe.enabled"
val conf = new SparkConf(false).set(unsafeConf, "true")
val html = renderStagePage(conf).toString().toLowerCase
// verify min/25/50/75/max show task value not cumulative values
assert(html.contains("<td>10.0 b</td>" * 5))
}

/**
* Render a stage page started with the given conf and return the HTML.
* This also runs a dummy stage to populate the page with useful content.
@@ -67,12 +67,19 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {

// Simulate a stage in job progress listener
val stageInfo = new StageInfo(0, 0, "dummy", 1, Seq.empty, Seq.empty, "details")
val taskInfo = new TaskInfo(0, 0, 0, 0, "0", "localhost", TaskLocality.ANY, false)
jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
taskInfo.markSuccessful()
jobListener.onTaskEnd(
SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, TaskMetrics.empty))
// Simulate two tasks to test PEAK_EXECUTION_MEMORY correctness
(1 to 2).foreach {
taskId =>
val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false)
val peakExecutionMemory = 10
taskInfo.accumulables += new AccumulableInfo(0, InternalAccumulator.PEAK_EXECUTION_MEMORY,
Some(peakExecutionMemory.toString), (peakExecutionMemory * taskId).toString, true)
jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
taskInfo.markSuccessful()
jobListener.onTaskEnd(
SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, TaskMetrics.empty))
}
jobListener.onStageCompleted(SparkListenerStageCompleted(stageInfo))
page.render(request)
}
@@ -35,7 +35,8 @@ object Main extends Logging {
s.processArguments(List("-Yrepl-class-based",
"-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
"-classpath", getAddedJars.mkString(File.pathSeparator)), true)
val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf))
// the creation of SecurityManager has to be lazy so SPARK_YARN_MODE is set if needed
lazy val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf))
var sparkContext: SparkContext = _
var sqlContext: SQLContext = _
var interp = new SparkILoop // this is a public var because tests reset it.
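The repl change above only wraps `classServer` in `lazy val`: per the inline comment, the `SecurityManager` should not be constructed until first use, by which point SPARK_YARN_MODE can have been set. A minimal illustration of that `lazy val` behaviour, with made-up names:

// Made-up names; only the lazy-initialization behaviour is the point.
object LazyInitSketch extends App {
  var yarnModeConfigured = false

  // With a plain `val` the right-hand side would run immediately and fail the
  // check below; with `lazy val` it runs on first access instead.
  lazy val securityService: String = {
    require(yarnModeConfigured, "environment must be configured before construction")
    "security-service"
  }

  yarnModeConfigured = true     // setup that happens after the declaration
  println(securityService)      // constructed here, after setup, so the require passes
}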
@@ -42,6 +42,7 @@ object DateTimeUtils {
final val SECONDS_PER_DAY = 60 * 60 * 24L
final val MICROS_PER_SECOND = 1000L * 1000L
final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L
final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY

final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L

@@ -190,13 +191,14 @@ object DateTimeUtils {

/**
* Returns Julian day and nanoseconds in a day from the number of microseconds
*
* Note: support timestamp since 4717 BC (without negative nanoseconds, compatible with Hive).
*/
def toJulianDay(us: SQLTimestamp): (Int, Long) = {
val seconds = us / MICROS_PER_SECOND
val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH
val secondsInDay = seconds % SECONDS_PER_DAY
val nanos = (us % MICROS_PER_SECOND) * 1000L
(day.toInt, secondsInDay * NANOS_PER_SECOND + nanos)
val julian_us = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
val day = julian_us / MICROS_PER_DAY
val micros = julian_us % MICROS_PER_DAY
(day.toInt, micros * 1000L)
}

/**
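The rewritten `toJulianDay` shifts the whole microsecond count to a Julian-day origin before splitting it, so pre-epoch timestamps no longer produce a negative nanosecond component. A standalone sketch of the old versus new arithmetic (the constants mirror those in `DateTimeUtils`; this is not the Spark code itself):

// Standalone comparison of the old and new toJulianDay arithmetic.
object JulianDaySketch extends App {
  val MICROS_PER_SECOND = 1000L * 1000L
  val SECONDS_PER_DAY = 60 * 60 * 24L
  val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L
  val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY
  val JULIAN_DAY_OF_EPOCH = 2440588 // Julian day number of 1970-01-01

  // Old: splits relative to the Unix epoch, so negative inputs yield negative nanos.
  def oldToJulianDay(us: Long): (Int, Long) = {
    val seconds = us / MICROS_PER_SECOND
    val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH
    val secondsInDay = seconds % SECONDS_PER_DAY
    val nanos = (us % MICROS_PER_SECOND) * 1000L
    (day.toInt, secondsInDay * NANOS_PER_SECOND + nanos)
  }

  // New: shifts to a Julian-day origin first, so both components stay
  // non-negative for any timestamp after 4717 BC.
  def newToJulianDay(us: Long): (Int, Long) = {
    val julianUs = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
    ((julianUs / MICROS_PER_DAY).toInt, (julianUs % MICROS_PER_DAY) * 1000L)
  }

  val oneSecondBeforeEpoch = -1L * MICROS_PER_SECOND // 1969-12-31 23:59:59 UTC
  println(oldToJulianDay(oneSecondBeforeEpoch)) // (2440588, -1000000000): negative nanos
  println(newToJulianDay(oneSecondBeforeEpoch)) // (2440587, 86399000000000): last second of the previous day
}

The DateTimeUtilsSuite hunk that follows adds a pre-epoch timestamp (1900-06-11) and asserts `ns > 0` for exactly this reason.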
@@ -52,15 +52,14 @@ class DateTimeUtilsSuite extends SparkFunSuite {
assert(ns === 0)
assert(fromJulianDay(d, ns) == 0L)

val t = Timestamp.valueOf("2015-06-11 10:10:10.100")
val (d1, ns1) = toJulianDay(fromJavaTimestamp(t))
val t1 = toJavaTimestamp(fromJulianDay(d1, ns1))
assert(t.equals(t1))

val t2 = Timestamp.valueOf("2015-06-11 20:10:10.100")
val (d2, ns2) = toJulianDay(fromJavaTimestamp(t2))
val t22 = toJavaTimestamp(fromJulianDay(d2, ns2))
assert(t2.equals(t22))
Seq(Timestamp.valueOf("2015-06-11 10:10:10.100"),
Timestamp.valueOf("2015-06-11 20:10:10.100"),
Timestamp.valueOf("1900-06-11 20:10:10.100")).foreach { t =>
val (d, ns) = toJulianDay(fromJavaTimestamp(t))
assert(ns > 0)
val t1 = toJavaTimestamp(fromJulianDay(d, ns))
assert(t.equals(t1))
}
}

test("SPARK-6785: java date conversion before and after epoch") {
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, SQLExecution}
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.sources.HadoopFsRelation
@@ -114,7 +114,7 @@ private[sql] object DataFrame {
@Experimental
class DataFrame private[sql](
@transient val sqlContext: SQLContext,
@DeveloperApi @transient val queryExecution: SQLContext#QueryExecution) extends Serializable {
@DeveloperApi @transient val queryExecution: QueryExecution) extends Serializable {

// Note for Spark contributors: if adding or updating any action in `DataFrame`, please make sure
// you wrap it with `withNewExecutionId` if this actions doesn't call other action.
138 changes: 14 additions & 124 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -38,6 +38,10 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, _}
import org.apache.spark.sql.execution.{Filter, _}
import org.apache.spark.sql.{execution => sparkexecution}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
@@ -188,9 +192,11 @@ class SQLContext(@transient val sparkContext: SparkContext)

protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)

protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))
protected[sql] def executeSql(sql: String):
org.apache.spark.sql.execution.QueryExecution = executePlan(parseSql(sql))

protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan)
protected[sql] def executePlan(plan: LogicalPlan) =
new sparkexecution.QueryExecution(this, plan)

@transient
protected[sql] val tlSession = new ThreadLocal[SQLSession]() {
@@ -781,77 +787,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
}.toArray
}

protected[sql] class SparkPlanner extends SparkStrategies {
val sparkContext: SparkContext = self.sparkContext

val sqlContext: SQLContext = self

def codegenEnabled: Boolean = self.conf.codegenEnabled

def unsafeEnabled: Boolean = self.conf.unsafeEnabled

def numPartitions: Int = self.conf.numShufflePartitions

def strategies: Seq[Strategy] =
experimental.extraStrategies ++ (
DataSourceStrategy ::
DDLStrategy ::
TakeOrderedAndProject ::
HashAggregation ::
Aggregation ::
LeftSemiJoin ::
EquiJoinSelection ::
InMemoryScans ::
BasicOperators ::
CartesianProduct ::
BroadcastNestedLoopJoin :: Nil)

/**
* Used to build table scan operators where complex projection and filtering are done using
* separate physical operators. This function returns the given scan operator with Project and
* Filter nodes added only when needed. For example, a Project operator is only used when the
* final desired output requires complex expressions to be evaluated or when columns can be
* further eliminated out after filtering has been done.
*
* The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized
* away by the filter pushdown optimization.
*
* The required attributes for both filtering and expression evaluation are passed to the
* provided `scanBuilder` function so that it can avoid unnecessary column materialization.
*/
def pruneFilterProject(
projectList: Seq[NamedExpression],
filterPredicates: Seq[Expression],
prunePushedDownFilters: Seq[Expression] => Seq[Expression],
scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {

val projectSet = AttributeSet(projectList.flatMap(_.references))
val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
val filterCondition =
prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And)

// Right now we still use a projection even if the only evaluation is applying an alias
// to a column. Since this is a no-op, it could be avoided. However, using this
// optimization with the current implementation would change the output schema.
// TODO: Decouple final output schema from expression evaluation so this copy can be
// avoided safely.

if (AttributeSet(projectList.map(_.toAttribute)) == projectSet &&
filterSet.subsetOf(projectSet)) {
// When it is possible to just use column pruning to get the right projection and
// when the columns of this projection are enough to evaluate all filter conditions,
// just do a scan followed by a filter, with no extra project.
val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]])
filterCondition.map(Filter(_, scan)).getOrElse(scan)
} else {
val scan = scanBuilder((projectSet ++ filterSet).toSeq)
Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
}
}
}
@deprecated("use org.apache.spark.sql.SparkPlanner", "1.6.0")
protected[sql] class SparkPlanner extends sparkexecution.SparkPlanner(this)

@transient
protected[sql] val planner = new SparkPlanner
protected[sql] val planner: sparkexecution.SparkPlanner = new sparkexecution.SparkPlanner(this)

@transient
protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[InternalRow], 1)
@@ -898,59 +838,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected[sql] lazy val conf: SQLConf = new SQLConf
}

/**
* :: DeveloperApi ::
* The primary workflow for executing relational queries using Spark. Designed to allow easy
* access to the intermediate phases of query execution for developers.
*/
@DeveloperApi
protected[sql] class QueryExecution(val logical: LogicalPlan) {
def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed)

lazy val analyzed: LogicalPlan = analyzer.execute(logical)
lazy val withCachedData: LogicalPlan = {
assertAnalyzed()
cacheManager.useCachedData(analyzed)
}
lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)

// TODO: Don't just pick the first one...
lazy val sparkPlan: SparkPlan = {
SparkPlan.currentContext.set(self)
planner.plan(optimizedPlan).next()
}
// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)

/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: Throwable => e.toString }

def simpleString: String =
s"""== Physical Plan ==
|${stringOrError(executedPlan)}
""".stripMargin.trim

override def toString: String = {
def output =
analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")

s"""== Parsed Logical Plan ==
|${stringOrError(logical)}
|== Analyzed Logical Plan ==
|${stringOrError(output)}
|${stringOrError(analyzed)}
|== Optimized Logical Plan ==
|${stringOrError(optimizedPlan)}
|== Physical Plan ==
|${stringOrError(executedPlan)}
|Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
""".stripMargin.trim
}
}
@deprecated("use org.apache.spark.sql.QueryExecution", "1.6.0")
protected[sql] class QueryExecution(logical: LogicalPlan)
extends sparkexecution.QueryExecution(this, logical)

/**
* Parses the data type in our internal string representation. The data type string should
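The two SQLContext hunks above follow the same refactoring pattern: the real `SparkPlanner` and `QueryExecution` implementations move into `org.apache.spark.sql.execution`, and the old nested classes remain only as deprecated subclasses so existing callers keep compiling. A minimal sketch of that shape, with hypothetical names:

// Hypothetical names; only the deprecate-and-delegate shape is the point.
class StandaloneQueryExecution(context: AnyRef, logical: String) {
  def simpleString: String = s"plan for: $logical"
}

class LegacyContext { self =>
  // The nested class stays source-compatible but is now just a thin shim
  // over the standalone implementation.
  @deprecated("use StandaloneQueryExecution directly", "1.6.0")
  protected class QueryExecution(logical: String)
    extends StandaloneQueryExecution(self, logical)
}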
