
SPARK-7729: Executor which has been killed should also be displayed on… #6263

Closed
wants to merge 25 commits

Conversation

archit279thakur commented May 19, 2015

… Executors Tab.

JoshRosen (Contributor) commented Oct 16, 2015

It looks like this PR and #6644 duplicate / overlap with each other.

suyanNone (Contributor) commented Oct 26, 2015

Hi @archit279thakur, would you mind adding logic for a time-based expiry of the lost-executor entries?

archit279thakur (Author) commented Oct 27, 2015

Sure. Should the expiration time be configurable?

suyanNone (Contributor) commented Nov 3, 2015

Yeah, making it configurable looks good.

archit279thakur (Author) commented Nov 4, 2015

@suyanNone Can you please review my second commit?

val localtestconf = new SparkConf().set(StorageStatusListener.TIME_TO_EXPIRE_KILLED_EXECUTOR,"5s")
val listener = new StorageStatusListener(localtestconf)
listener.removedExecutorIdToStorageStatus.put("1", new StorageStatus(null, 50))
Thread.sleep(5500)

squito (Contributor), Nov 4, 2015

you can avoid sleeping by using cache.setTicker
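For reference, Guava supplies the ticker at build time via CacheBuilder.ticker(...) rather than through a setter on the built cache. A minimal sketch of the no-sleep approach (assumes Guava on the classpath; class and object names here are illustrative, not from this PR):

```scala
import java.util.concurrent.TimeUnit

import com.google.common.base.Ticker
import com.google.common.cache.CacheBuilder

// A ticker the test can advance manually (Ticker.read returns nanoseconds).
class ManualTicker extends Ticker {
  private var nanos = 0L
  override def read(): Long = nanos
  def advance(d: Long, unit: TimeUnit): Unit = { nanos += unit.toNanos(d) }
}

object NoSleepExpiryDemo {
  def main(args: Array[String]): Unit = {
    val ticker = new ManualTicker
    val cache = CacheBuilder.newBuilder()
      .expireAfterWrite(5, TimeUnit.SECONDS)
      .ticker(ticker) // supplied at build time, not set on the built cache
      .build[String, String]()
    cache.put("1", "executor-1")
    ticker.advance(6, TimeUnit.SECONDS)
    // The entry expires without any Thread.sleep
    assert(cache.getIfPresent("1") == null)
  }
}
```

Advancing the ticker past the expiry window makes the cache treat the entry as expired immediately, so the test runs in milliseconds.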

archit279thakur (Author), Nov 5, 2015

For that we'd have to set an arbitrary ticker on the main cache, which we would not want. Creating a new cache in the test would not exercise our functionality; it would amount to testing just the Guava cache's code, right? Please correct me if I'm wrong.

squito (Contributor), Nov 5, 2015

I think you can do something in between -- StorageStatusListener can have a private[storage] constructor which takes the ticker, and the public one just defaults it to the system ticker. Yes, you would not be testing exactly the same behavior, but it tests the important parts.

Sleeping isn't the worst thing in this case -- often it leads to flaky tests, though I don't think that would be the case here. Still, 5 seconds is awfully long for this test when it should take a tiny fraction of that, and it adds up over all the tests.
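The suggested shape — a package-private constructor taking the ticker, with the public constructor defaulting to the system clock — can be sketched in isolation like this (all names here are hypothetical, and a simple Clock trait stands in for Guava's Ticker; in the real listener the two-argument constructor would be private[storage], left public here so the sketch is self-contained):

```scala
// Stand-in for Guava's Ticker: a nanosecond clock the test can control.
trait Clock { def nanos(): Long }
object SystemClock extends Clock { def nanos(): Long = System.nanoTime() }

class ExpiringRegistry(ttlNanos: Long, clock: Clock) {
  // Production callers use this constructor, which defaults to the real clock.
  def this(ttlNanos: Long) = this(ttlNanos, SystemClock)

  private var entries = Map.empty[String, Long] // id -> insertion time
  def put(id: String): Unit = { entries += id -> clock.nanos() }
  // Only entries younger than the TTL are visible.
  def live: Seq[String] =
    entries.collect { case (id, t) if clock.nanos() - t < ttlNanos => id }.toSeq
}
```

A test then injects a manual clock and advances it instead of sleeping, which exercises the expiry logic while still running instantly.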

/**
* Test the behavior of StorageStatusListener in response to all relevant events.
*/

squito (Contributor), Nov 4, 2015

Spurious extra doc; I don't think you use SparkConfSuite, and the import of SparkConf is out of order.

import com.google.common.cache.CacheLoader
import org.apache.spark.SparkConf

import scala.collection.JavaConversions._

squito (Contributor), Nov 4, 2015

nit: import ordering

archit279thakur (Author) commented Nov 5, 2015

@squito Can you please review it again?

@@ -17,26 +17,55 @@

package org.apache.spark.storage

import java.util.concurrent.TimeUnit

import scala.collection.JavaConversions.collectionAsScalaIterable

squito (Contributor), Nov 5, 2015

Avoid using JavaConversions; prefer JavaConverters, which forces you to call .asScala and makes the conversion much clearer to future code readers. The convention is to import scala.collection.JavaConverters._
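A minimal illustration of the difference (standalone sketch, not code from this PR):

```scala
import java.util.{ArrayList => JArrayList}

import scala.collection.JavaConverters._

object AsScalaDemo {
  def main(args: Array[String]): Unit = {
    val javaList = new JArrayList[String]()
    javaList.add("executor-1")
    // With JavaConverters the Java-to-Scala boundary is explicit:
    // the .asScala call is visible at the conversion site, whereas
    // JavaConversions would have converted implicitly and invisibly.
    val scalaList = javaList.asScala.toList
    assert(scalaList == List("executor-1"))
  }
}
```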

import scala.collection.mutable
import scala.language.reflectiveCalls

squito (Contributor), Nov 5, 2015

unused?

archit279thakur (Author), Nov 5, 2015

I am getting this warning on compilation (though many others are already being emitted).

[warn] core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala:174: reflective access of structural type member method advance should be enabled
[warn] by making the implicit value scala.language.reflectiveCalls visible.
[warn] This can be achieved by adding the import clause 'import scala.language.reflectiveCalls'
[warn] or by setting the compiler option -language:reflectiveCalls.
[warn] See the Scala docs for value scala.language.reflectiveCalls for a discussion
[warn] why the feature should be explicitly enabled.
[warn] ticker.advance(5, TimeUnit.SECONDS)

That's why I added the import. I checked again and am still getting it.

squito (Contributor), Nov 5, 2015

OK, so that is in StorageStatusListenerSuite, not StorageStatusListener. You're getting the warning because you're calling ticker.advance without having explicitly defined a class or trait with that method, so Scala uses reflection tricks to make the call. If you change the test as I suggested, the warning will disappear and you can remove the import.

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._

import com.google.common.base.Ticker
import com.google.common.cache.CacheBuilder

squito (Contributor), Nov 5, 2015

import ordering -- these go above the o.a.spark imports, with a linebreak in between. See https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports

private[storage] def this(conf: SparkConf, ticker: Ticker) = {
  this(conf)
  this.ticker = ticker
}

squito (Contributor), Nov 5, 2015

You can avoid the var here, though the Scala syntax is a little tricky:

class StorageStatusListener private[storage](conf: SparkConf, ticker: Ticker) {
  def this(conf: SparkConf) = {
    this(conf, Ticker.systemTicker())
  }
  // ...
}

archit279thakur (Author), Nov 5, 2015

Yes, this definitely looks better.


def storageStatusList: Seq[StorageStatus] = synchronized {
executorIdToStorageStatus.values.toSeq
}


def removedExecutorStorageStatusList: Seq[StorageStatus] = synchronized{

squito (Contributor), Nov 5, 2015

nit: space before {

archit279thakur (Author), Nov 5, 2015

Thanks for pointing it out. Corrected.

import org.apache.spark.Success
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.scalatest.FunSuite

import com.google.common.base.Ticker

squito (Contributor), Nov 5, 2015

nit: import ordering & grouping. org.scalatest & com.google go in the same group, above the org.apache.spark group

archit279thakur (Author), Nov 5, 2015

Thanks for pointing it out. Corrected.

archit279thakur (Author) commented Nov 5, 2015

@squito Thanks for your comments. I've incorporated them all and gone through the link.
Please point out anything I missed.

val listener = new StorageStatusListener(localtestconf, ticker)
listener.removedExecutorIdToStorageStatus.put("1", new StorageStatus(null, 50))
ticker.advance(5, TimeUnit.SECONDS)
assert(listener.removedExecutorIdToStorageStatus.asMap.get("1") == null)

squito (Contributor), Nov 5, 2015

This is more complicated than it needs to be -- no need for an atomic (there is only one thread here); you can just use a long. Also, I'd check the removedExecutorStorageStatusList method rather than the cache itself.

    class MyTicker extends Ticker {
      var t = 0L
      override def read(): Long = t
    }
    val ticker = new MyTicker
    val listener = new StorageStatusListener(localtestconf, ticker)
    listener.removedExecutorIdToStorageStatus.put("1", new StorageStatus(null, 50))
    assert(listener.removedExecutorStorageStatusList.nonEmpty)
    ticker.t = 5000000001L // just over 5 seconds, in nanoseconds
    assert(listener.removedExecutorStorageStatusList.isEmpty)
@@ -22,6 +22,7 @@ import org.apache.spark.Success
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.storage._
import org.apache.spark.SparkConf

squito (Contributor), Nov 5, 2015

nit: import ordering -- this should go above the rest of the Spark imports (because class imports sort before package imports). Also, while you're touching this, there should be a blank line between the scalatest and Spark imports.

squito (Contributor) commented Nov 5, 2015

Jenkins, ok to test

squito (Contributor) commented Nov 5, 2015

@archit279thakur Can you also bring this up to date with master and include before & after screenshots?
I'd like for this to also update the JSON endpoints. Finally, I think that as long as we're storing removed executors, we should store the time they were removed.

@suyanNone can you take another look as well?

SparkQA commented Nov 5, 2015

Test build #45141 has finished for PR 6263 at commit 1fdffc5.

  • This patch fails to build.
  • This patch does not merge cleanly.
  • This patch adds no public classes.
@@ -150,4 +157,21 @@ class StorageStatusListenerSuite extends FunSuite {
listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
}

test("Killed Executor Entry removed after configurable time") {
val localtestconf = new SparkConf().set(StorageStatusListener.TIME_TO_EXPIRE_KILLED_EXECUTOR,"5s")

squito (Contributor), Nov 5, 2015

nit: line too long

archit.thakur added 8 commits Nov 5, 2015
…into SPARK-7729

Conflicts:
	core/src/main/scala/org/apache/spark/ui/SparkUI.scala
	core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
	core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
	core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
SparkQA commented Nov 20, 2015

Test build #46414 has finished for PR 6263 at commit 3e23321.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
archit279thakur (Author) commented Nov 20, 2015

@squito
In reply to:
@archit279thakur can you also bring this up to date with master, and include before & after screenshots? I'd like for this to also update the json endpoints. Finally, I think that as long we're storing removed executors, we should store the time they were removed.

Two things:

  1. For that, I'll have to add a new column to the execTable in the UI. Should it be lastStatusChangedTime (with aliveTime or killedTime, depending on the status) or killedTime (with blank values for the alive executors)?
  2. This value would always be greater than currentTime - spark.ui.timeToExpireKilledExecutor. I'm not really sure we provide any useful insight by showing the time at which the executor died.
SparkQA commented Nov 20, 2015

Test build #46416 has finished for PR 6263 at commit b827f8f.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
SparkQA commented Nov 20, 2015

Test build #46417 has finished for PR 6263 at commit 33fc892.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
SparkQA commented Nov 20, 2015

Test build #46418 has finished for PR 6263 at commit 2cf4f71.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.
SparkQA commented Nov 20, 2015

Test build #46422 has finished for PR 6263 at commit e1577dc.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
SparkQA commented Nov 20, 2015

Test build #46423 has finished for PR 6263 at commit 826587f.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
squito (Contributor) commented Nov 24, 2015

Hi @archit279thakur,

Good questions about what to do with the time it was killed. The reason I wanted it included is so the user could put it together with the timeline, see which stages were running when the executor was killed, and use that to debug why the executor was removed. I don't have strong opinions about where it should go in the UI -- I'm willing to believe that it will just lead to too much clutter. @CodingCat, any thoughts? But in either case, it would still be nice to have in the JSON endpoint.

btw, the mima failure is from this:

[error]  * method this(java.lang.String,java.lang.String,Int,Long,Long,Int,Int,Int,Int,Long,Long,Long,Long,Long,scala.collection.Map)Unit in class org.apache.spark.status.api.v1.ExecutorSummary does not have a correspondent in new version
[error]    filter with: ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorSummary.this")

You can add that line to project/MimaExcludes.scala; that constructor is private, so this is a false positive.
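Sketched out, the exclusion would sit alongside the other filters in project/MimaExcludes.scala; the exact surrounding structure of that file varies by release, so treat this as an illustration of where the line goes rather than a verbatim patch:

```scala
// In project/MimaExcludes.scala (fragment; surrounding code elided)
Seq(
  // SPARK-7729: change to the private ExecutorSummary constructor,
  // flagged by MiMa as a false positive
  ProblemFilters.exclude[MissingMethodProblem](
    "org.apache.spark.status.api.v1.ExecutorSummary.this")
)
```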


@DeveloperApi
object StorageStatusListener {
val TIME_TO_EXPIRE_KILLED_EXECUTOR = "spark.ui.timeToExpireKilledExecutor"

CodingCat (Contributor), Nov 25, 2015

Do we really need this class? How about just exposing this string to the end user?

CodingCat (Contributor) commented Nov 25, 2015

@archit279thakur, would you mind uploading some screenshots, so that we have a better sense of the current page structure?

CodingCat (Contributor) commented Nov 25, 2015

In the current version of the patch, we use an expiration time to prevent too many dead executors from appearing in the UI. That brings inconvenient overhead and makes the UI component depend on Guava... Additionally, there are cases where executors fail and restart again and again within a very short period (I hit this in some of my applications when I introduced a bug; I can't remember exactly what happened).

I'm wondering whether we could just cap the maximum number of rows in the table, as we do in many other places (master/worker UI, etc.). Even if we stick with the expiration time, TimeStampedHashMap might be a cleaner solution?
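The capping alternative can be sketched without any Guava dependency (class, method names, and the bound are hypothetical, not from this PR):

```scala
import scala.collection.mutable

// Retain at most `maxRetained` dead executors, dropping the oldest first,
// the way the master/worker UIs bound their tables.
class DeadExecutorLog(maxRetained: Int) {
  private val retained = mutable.Queue.empty[String]

  def executorRemoved(execId: String): Unit = {
    retained.enqueue(execId)
    while (retained.size > maxRetained) {
      retained.dequeue() // evict the oldest entry
    }
  }

  def removedExecutors: Seq[String] = retained.toSeq
}
```

This trades the Guava dependency and clock handling for a single integer bound, at the cost of no longer expiring entries by age.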

CodingCat (Contributor) commented Nov 25, 2015

@squito Regarding the page structure, don't trust my sense of aesthetics :-)

Personally, I'd prefer to separate the page into two sections, one for alive executors and one for dead ones, or to give the user the ability to sort the entries by status.

lianhuiwang (Contributor) commented Dec 1, 2015

@archit279thakur @CodingCat @squito I have created a new PR, #10058. Can you take a look at it? Thanks.

rxin (Contributor) commented Dec 31, 2015

I'm going to close this pull request. If this is still relevant and you are interested in pushing it forward, please open a new pull request. Thanks!

@asfgit asfgit closed this in 7b4452b Dec 31, 2015