Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
shahidki31 committed Dec 4, 2018
1 parent 2235967 commit f8cfb54
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 22 deletions.
42 changes: 21 additions & 21 deletions core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.{JobExecutionStatus, SparkConf}
import org.apache.spark.status.api.v1
import org.apache.spark.ui.scope._
import org.apache.spark.util.{Distribution, Utils}
import org.apache.spark.util.kvstore.{InMemoryStore, KVStore, LevelDB}
import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}

/**
* A wrapper around a KVStore that provides methods for accessing the API data stored within.
Expand Down Expand Up @@ -148,18 +148,18 @@ private[spark] class AppStatusStore(
// cheaper for disk stores (avoids deserialization).
val count = {
Utils.tryWithResource(
if (store.isInstanceOf[LevelDB]) {
if (store.isInstanceOf[InMemoryStore]) {
store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(TaskIndexNames.EXEC_RUN_TIME)
.first(0L)
.index(TaskIndexNames.STATUS)
.first("SUCCESS")
.last("SUCCESS")
.closeableIterator()
} else {
store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(TaskIndexNames.STATUS)
.first("SUCCESS")
.last("SUCCESS")
.index(TaskIndexNames.EXEC_RUN_TIME)
.first(0L)
.closeableIterator()
}
) { it =>
Expand Down Expand Up @@ -230,14 +230,26 @@ private[spark] class AppStatusStore(
// stabilize once the stage finishes. It's also slow, especially with disk stores.
val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }

// TODO Summary metrics needs to display all the successful tasks' metrics (SPARK-26119).
// TODO: Summary metrics needs to display all the successful tasks' metrics (SPARK-26119).
// For InMemory case, it is efficient to find using the following code. But for diskStore case
// we need an efficient solution to avoid deserialization time overhead. For that, we need to
// rework on the way indexing works, so that we can index by specific metrics for successful
// and failed tasks differently (would be tricky). Also would require changing the disk store
// version (to invalidate old stores).
def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
if (store.isInstanceOf[LevelDB]) {
if (store.isInstanceOf[InMemoryStore]) {
val quantileTasks = store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(index)
.first(0L)
.asScala
.filter { _.status == "SUCCESS"} // Filter "SUCCESS" tasks
.toIndexedSeq

indices.map { index =>
fn(quantileTasks(index.toInt)).toDouble
}.toIndexedSeq
} else {
Utils.tryWithResource(
store.view(classOf[TaskDataWrapper])
.parent(stageKey)
Expand All @@ -262,18 +274,6 @@ private[spark] class AppStatusStore(
}
}.toIndexedSeq
}
} else {
val quantileTasks = store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(index)
.first(0L)
.asScala
.filter { _.status == "SUCCESS"} // Filter "SUCCESS" tasks
.toIndexedSeq

indices.map { index =>
fn(quantileTasks(index.toInt)).toDouble
}.toIndexedSeq
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class AppStatusStoreSuite extends SparkFunSuite {

test("only successfull task have taskSummary") {
val store = new InMemoryStore()
(0 until 5).foreach { i => store.write(newTaskData(i, "FAILED")) }
(0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) }
val appStore = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles)
assert(appStore.size === 0)
}
Expand Down

0 comments on commit f8cfb54

Please sign in to comment.