
Commit 3b9b462

Merge branch 'branch-1.5' of github.com:apache/spark into concurrent-sql-executions-1.5

Conflicts:
	core/src/test/scala/org/apache/spark/ThreadingSuite.scala
Andrew Or committed Sep 14, 2015
2 parents 4435db7 + 5db51f9 commit 3b9b462
Showing 6 changed files with 43 additions and 26 deletions.
@@ -368,7 +368,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
         val peakExecutionMemory = validTasks.map { case TaskUIData(info, _, _) =>
           info.accumulables
             .find { acc => acc.name == InternalAccumulator.PEAK_EXECUTION_MEMORY }
-            .map { acc => acc.value.toLong }
+            .map { acc => acc.update.getOrElse("0").toLong }
             .getOrElse(0L)
             .toDouble
         }
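The change above feeds the stage-page quantile calculation from the accumulator's per-task update instead of its running total. As a rough illustration of the difference, here is a minimal Scala sketch using a simplified stand-in for AccumulableInfo (the field names mirror the real class; everything else is made up for the example):

    // Simplified stand-in for org.apache.spark.scheduler.AccumulableInfo:
    // `update` is the contribution of a single task, `value` is the running
    // total after that task finished.
    case class AccInfo(name: String, update: Option[String], value: String)

    object PeakMemoryExample {
      def main(args: Array[String]): Unit = {
        // Two tasks that each peaked at 10 bytes; the running total keeps growing.
        val afterTask1 = AccInfo("peakExecutionMemory", Some("10"), "10")
        val afterTask2 = AccInfo("peakExecutionMemory", Some("10"), "20")

        // Reading `value` (old behaviour) reports cumulative 10 and 20.
        val cumulative = Seq(afterTask1, afterTask2).map(_.value.toLong)
        // Reading `update` (fixed behaviour) reports per-task 10 and 10.
        val perTask = Seq(afterTask1, afterTask2).map(_.update.getOrElse("0").toLong)

        println(s"cumulative = $cumulative, per-task = $perTask")
      }
    }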
core/src/test/scala/org/apache/spark/ThreadingSuite.scala (6 changes: 3 additions & 3 deletions)
@@ -147,12 +147,12 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
       }.start()
     }
     sem.acquire(2)
+    throwable.foreach { t => throw improveStackTrace(t) }
     if (ThreadingSuiteState.failed.get()) {
       logError("Waited 1 second without seeing runningThreads = 4 (it was " +
         ThreadingSuiteState.runningThreads.get() + "); failing test")
       fail("One or more threads didn't see runningThreads = 4")
     }
-    throwable.foreach { t => throw improveStackTrace(t) }
   }

   test("set local properties in different thread") {
@@ -178,8 +178,8 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
     threads.foreach(_.start())

     sem.acquire(5)
-    assert(sc.getLocalProperty("test") === null)
     throwable.foreach { t => throw improveStackTrace(t) }
+    assert(sc.getLocalProperty("test") === null)
   }

   test("set and get local properties in parent-children thread") {
@@ -207,9 +207,9 @@
     threads.foreach(_.start())

     sem.acquire(5)
+    throwable.foreach { t => throw improveStackTrace(t) }
     assert(sc.getLocalProperty("test") === "parent")
     assert(sc.getLocalProperty("Foo") === null)
-    throwable.foreach { t => throw improveStackTrace(t) }
   }

   test("mutation in parent local property does not affect child (SPARK-10563)") {
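Each ThreadingSuite hunk moves the rethrow of a captured worker-thread exception ahead of the subsequent checks, so a failure inside a spawned thread is reported before any secondary assertion failure. A minimal sketch of that capture-then-rethrow pattern, assuming a plain Thread and Semaphore (the suite's improveStackTrace helper is omitted):

    import java.util.concurrent.Semaphore

    object RethrowWorkerFailureExample {
      def main(args: Array[String]): Unit = {
        val sem = new Semaphore(0)
        var throwable: Option[Throwable] = None
        var sawFlag = false

        new Thread {
          override def run(): Unit = {
            try {
              sawFlag = true // the "work"; any exception thrown here is captured below
            } catch {
              case t: Throwable => throwable = Some(t)
            } finally {
              sem.release()
            }
          }
        }.start()

        sem.acquire()
        // Surface a worker-thread failure before asserting on shared state, so the
        // test reports the root cause instead of a secondary assertion error.
        throwable.foreach { t => throw t }
        assert(sawFlag, "worker thread did not run")
      }
    }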
core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala (29 changes: 22 additions & 7 deletions)
@@ -23,7 +23,7 @@ import scala.xml.Node

 import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}

-import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite, Success}
+import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.jobs.{JobProgressListener, StagePage, StagesTab}
@@ -47,6 +47,14 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
     assert(html3.contains(targetString))
   }

+  test("SPARK-10543: peak execution memory should be per-task rather than cumulative") {
+    val unsafeConf = "spark.sql.unsafe.enabled"
+    val conf = new SparkConf(false).set(unsafeConf, "true")
+    val html = renderStagePage(conf).toString().toLowerCase
+    // verify min/25/50/75/max show task value not cumulative values
+    assert(html.contains("<td>10.0 b</td>" * 5))
+  }
+
   /**
    * Render a stage page started with the given conf and return the HTML.
    * This also runs a dummy stage to populate the page with useful content.
@@ -67,12 +75,19 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {

     // Simulate a stage in job progress listener
     val stageInfo = new StageInfo(0, 0, "dummy", 1, Seq.empty, Seq.empty, "details")
-    val taskInfo = new TaskInfo(0, 0, 0, 0, "0", "localhost", TaskLocality.ANY, false)
-    jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
-    jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
-    taskInfo.markSuccessful()
-    jobListener.onTaskEnd(
-      SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, TaskMetrics.empty))
+    // Simulate two tasks to test PEAK_EXECUTION_MEMORY correctness
+    (1 to 2).foreach {
+      taskId =>
+        val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false)
+        val peakExecutionMemory = 10
+        taskInfo.accumulables += new AccumulableInfo(0, InternalAccumulator.PEAK_EXECUTION_MEMORY,
+          Some(peakExecutionMemory.toString), (peakExecutionMemory * taskId).toString, true)
+        jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
+        jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
+        taskInfo.markSuccessful()
+        jobListener.onTaskEnd(
+          SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, TaskMetrics.empty))
+    }
     jobListener.onStageCompleted(SparkListenerStageCompleted(stageInfo))
     page.render(request)
   }
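The two simulated tasks above share the same per-task update (10) while their cumulative accumulator values differ (10 and 20), so the min/25/50/75/max row can only read "10.0 b" five times if the page uses the updates. A small worked sketch of that arithmetic, using a naive quantile helper rather than Spark's Distribution class:

    object PerTaskQuantilesExample {
      def main(args: Array[String]): Unit = {
        // Accumulator updates of the two simulated tasks (per-task peaks).
        val perTaskUpdates = Seq(10.0, 10.0)
        // Cumulative accumulator values after each task, for contrast.
        val cumulativeValues = Seq(10.0, 20.0)

        // Naive min/25/50/75/max picker; Spark's Distribution class interpolates
        // slightly differently, but the point is the same.
        def quantiles(data: Seq[Double]): Seq[Double] = {
          val sorted = data.sorted
          Seq(0.0, 0.25, 0.5, 0.75, 1.0).map { p =>
            sorted(math.min((p * sorted.length).toInt, sorted.length - 1))
          }
        }

        println(quantiles(perTaskUpdates))   // List(10.0, 10.0, 10.0, 10.0, 10.0) -> "10.0 b" x 5
        println(quantiles(cumulativeValues)) // mixes 10.0 and 20.0, which the test would reject
      }
    }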
@@ -35,7 +35,8 @@ object Main extends Logging {
   s.processArguments(List("-Yrepl-class-based",
     "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
     "-classpath", getAddedJars.mkString(File.pathSeparator)), true)
-  val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf))
+  // the creation of SecurityManager has to be lazy so SPARK_YARN_MODE is set if needed
+  lazy val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf))
   var sparkContext: SparkContext = _
   var sqlContext: SQLContext = _
   var interp = new SparkILoop // this is a public var because tests reset it.
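Making classServer a lazy val defers constructing the HttpServer, and with it the SecurityManager, until the field is first read, by which time SPARK_YARN_MODE can have been set. A minimal sketch of that deferral, with a made-up flag standing in for the environment variable:

    object LazyInitExample {
      // Stands in for SPARK_YARN_MODE being set in the environment.
      var yarnModeConfigured = false

      // With a plain `val` this would be computed while the object initialises,
      // before main has a chance to flip the flag; `lazy val` waits for first use.
      lazy val securitySnapshot: Boolean = yarnModeConfigured

      def main(args: Array[String]): Unit = {
        yarnModeConfigured = true      // configuration happens before the first read
        println(securitySnapshot)      // prints true; an eager val would have captured false
      }
    }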
@@ -42,6 +42,7 @@ object DateTimeUtils {
   final val SECONDS_PER_DAY = 60 * 60 * 24L
   final val MICROS_PER_SECOND = 1000L * 1000L
   final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L
+  final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY

   final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L

@@ -190,13 +191,14 @@

   /**
    * Returns Julian day and nanoseconds in a day from the number of microseconds
+   *
+   * Note: support timestamp since 4717 BC (without negative nanoseconds, compatible with Hive).
    */
   def toJulianDay(us: SQLTimestamp): (Int, Long) = {
-    val seconds = us / MICROS_PER_SECOND
-    val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH
-    val secondsInDay = seconds % SECONDS_PER_DAY
-    val nanos = (us % MICROS_PER_SECOND) * 1000L
-    (day.toInt, secondsInDay * NANOS_PER_SECOND + nanos)
+    val julian_us = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
+    val day = julian_us / MICROS_PER_DAY
+    val micros = julian_us % MICROS_PER_DAY
+    (day.toInt, micros * 1000L)
   }

   /**
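The rewritten toJulianDay shifts the microsecond count into the Julian epoch before dividing, so pre-1970 timestamps (negative microseconds since the Unix epoch) no longer yield a negative in-day remainder. A standalone sketch contrasting the two formulas; the constants mirror DateTimeUtils, and 2440588 is assumed here to be the value of JULIAN_DAY_OF_EPOCH (the Julian day number of 1970-01-01):

    object JulianDayExample {
      final val MICROS_PER_SECOND = 1000L * 1000L
      final val SECONDS_PER_DAY = 60 * 60 * 24L
      final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY
      // Assumed value of DateTimeUtils.JULIAN_DAY_OF_EPOCH.
      final val JULIAN_DAY_OF_EPOCH = 2440588L

      def main(args: Array[String]): Unit = {
        // A pre-epoch timestamp: microseconds since 1970 are negative (roughly mid-1900).
        val us = -2195000000000000L

        // Old formula: divide the raw negative value; Java/Scala `%` keeps the sign
        // of the dividend, so the in-day component comes out negative.
        val oldSeconds = us / MICROS_PER_SECOND
        val oldNanos = (oldSeconds % SECONDS_PER_DAY) * MICROS_PER_SECOND * 1000L +
          (us % MICROS_PER_SECOND) * 1000L

        // New formula: shift into the Julian epoch first, so the remainder falls
        // in [0, MICROS_PER_DAY) and the nanosecond component is non-negative.
        val julianUs = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
        val newDay = julianUs / MICROS_PER_DAY
        val newNanos = (julianUs % MICROS_PER_DAY) * 1000L

        println(s"old nanos: $oldNanos, new day: $newDay, new nanos: $newNanos")
      }
    }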
@@ -52,15 +52,14 @@ class DateTimeUtilsSuite extends SparkFunSuite {
     assert(ns === 0)
     assert(fromJulianDay(d, ns) == 0L)

-    val t = Timestamp.valueOf("2015-06-11 10:10:10.100")
-    val (d1, ns1) = toJulianDay(fromJavaTimestamp(t))
-    val t1 = toJavaTimestamp(fromJulianDay(d1, ns1))
-    assert(t.equals(t1))
-
-    val t2 = Timestamp.valueOf("2015-06-11 20:10:10.100")
-    val (d2, ns2) = toJulianDay(fromJavaTimestamp(t2))
-    val t22 = toJavaTimestamp(fromJulianDay(d2, ns2))
-    assert(t2.equals(t22))
+    Seq(Timestamp.valueOf("2015-06-11 10:10:10.100"),
+      Timestamp.valueOf("2015-06-11 20:10:10.100"),
+      Timestamp.valueOf("1900-06-11 20:10:10.100")).foreach { t =>
+      val (d, ns) = toJulianDay(fromJavaTimestamp(t))
+      assert(ns > 0)
+      val t1 = toJavaTimestamp(fromJulianDay(d, ns))
+      assert(t.equals(t1))
+    }
   }

   test("SPARK-6785: java date conversion before and after epoch") {
