
[SPARK-16139][TEST] Add logging functionality for leaked threads in tests #19893

Closed
wants to merge 21 commits

Conversation

gaborgsomogyi
Contributor

What changes were proposed in this pull request?

Lots of our tests don't properly shut down everything they create and end up leaking lots of threads. For example, TaskSetManagerSuite doesn't stop the extra TaskScheduler and DAGScheduler it creates. There are a couple more instances, e.g. in DAGSchedulerSuite.

This PR adds the ability to print out the list of threads that were not stopped properly after a test suite has executed. The format is the following:

===== FINISHED o.a.s.scheduler.DAGSchedulerSuite: 'task end event should have updated accumulators (SPARK-20342)' =====

...

===== Global thread whitelist loaded with name /thread_whitelist from classpath: rpc-client.*, rpc-server.*, shuffle-client.*, shuffle-server.* =====

ScalaTest-run: 

===== THREADS NOT STOPPED PROPERLY =====

ScalaTest-run: dag-scheduler-event-loop
ScalaTest-run: globalEventExecutor-2-5
ScalaTest-run: 

===== END OF THREAD DUMP =====

ScalaTest-run: 

===== EITHER PUT THREAD NAME INTO THE WHITELIST FILE OR SHUT IT DOWN PROPERLY =====

With the help of this, leaking threads have been identified in TaskSetManagerSuite. My intention is to hunt down and fix such bugs in later PRs.
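For illustration, the core of the mechanism is a before/after diff of running thread names. A minimal sketch of the idea (all names below are illustrative, not the actual patch):

import scala.collection.JavaConverters._

// Sketch: snapshot thread names before a suite runs,
// diff against the running threads afterwards.
object ThreadAuditIdea {
  private var snapshot: Set[String] = Set.empty

  private def runningThreadNames(): Set[String] =
    Thread.getAllStackTraces.keySet().asScala.map(_.getName).toSet

  def preAudit(): Unit = snapshot = runningThreadNames()

  def postAudit(suiteName: String): Unit = {
    val remaining = runningThreadNames().diff(snapshot)
    if (remaining.nonEmpty) {
      println(s"===== POSSIBLE THREAD LEAK IN SUITE $suiteName, " +
        s"thread names: ${remaining.mkString(", ")} =====")
    }
  }
}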

How was this patch tested?

Manual: executed TaskSetManagerSuite and found out where the leaking threads are.
Automated: passed Jenkins.

@gaborgsomogyi
Contributor Author

cc @squito @srowen @HyukjinKwon

@smurakozi
Contributor

Logging the leaked threads in a more grep-friendly format would be nice; you could then easily create a thread leak report.
It would also be nice to see the leaks on the console.

@gaborgsomogyi
Contributor Author

gaborgsomogyi commented Dec 5, 2017

Good point, I've also struggled to collect all the actual problems from the Jenkins build :)
Format changed to the following:

===== FINISHED o.a.s.scheduler.DAGSchedulerSuite: 'task end event should have updated accumulators (SPARK-20342)' =====

...

ScalaTest-run: 

===== Global thread whitelist loaded with name /thread_whitelist from classpath: rpc-client.*, rpc-server.*, shuffle-client.*, shuffle-server.* =====

ScalaTest-run: 

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.scheduler.DAGSchedulerSuite, thread names: dag-scheduler-event-loop, globalEventExecutor-2-6 =====

# Each line contains a new regex string which will be evaluated with matches
# Empty lines or lines starting with # will be skipped

rpc-client.*
Member

If this is just for testing support, I personally think there's no need to create a config file and read it. Hard-coding filtering rules may be just fine. Neutral on this.

Contributor

Agree with Sean here - there's not a really obvious use case for having this independent of the class where it's used. Putting it into the code means that the whitelist feature is self-documenting, and you don't have to go through any indirection to find this file.

Plus I think moving this into SparkFunSuite means you can get rid of the file loading logic in 'object SparkFunSuite'.

@@ -683,7 +683,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val conf = new SparkConf().set("spark.speculation", "true")
sc = new SparkContext("local", "test", conf)

- val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
+ sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
Member

What was this change about? Not shadowing?

Contributor Author

Originally the newly created instance was stored in a local variable, which was never saved into a member field and never freed properly. With this change the afterEach method stops it and frees up the resources.

@gaborgsomogyi
Contributor Author

gaborgsomogyi commented Dec 6, 2017

I have manually gathered statistics about the current state. I've grepped unit-tests.log across the whole build:

bash-3.2$ find . -type f | xargs grep "POSSIBLE THREAD LEAK" | wc -l
370

@srowen
Member

srowen commented Dec 6, 2017

Do any of those leaked threads look like they might be real issues to fix? You could paste the results here, minus anything you know isn't a problem.

@gaborgsomogyi
Contributor Author

I've just started to take a deeper look at it and found some patterns. Namely, we can exclude all netty.* threads, and ForkJoinPool.* threads are most of the time (but not always) created inside Scala by the global ExecutionContext. All in all, I'm far from having a good picture, but I'll exclude these entries.

@gaborgsomogyi
Contributor Author

On the other hand, globalEventExecutor.* and dag-scheduler-event-loop were real issues in the tests I've looked at.

@gaborgsomogyi
Contributor Author

Here is a list but it definitely contains false positives.

SPARK-16139.txt

@SparkQA

SparkQA commented Dec 6, 2017

Test build #4006 has finished for PR 19893 at commit 0d45a5b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor Author

I've taken a look at the failed test, but it seems unrelated.

@@ -52,6 +62,23 @@ abstract class SparkFunSuite
getTestResourceFile(file).getCanonicalPath
}

private def saveThreadNames(): Unit = {
Contributor

Suggest turning this into runningThreadNames(): Set[String], and then you can use this method both in beforeAll() and in printRemainingThreadNames() (line 70). And you can maybe put the whitelist logic here as well.

Contributor Author

Config file removed and refactored as you suggested. It's much simpler now, thanks :)
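For reference, a sketch of what the suggested helper could look like, with the whitelist filtering folded in as proposed (the exact body is assumed, not the final patch):

import scala.collection.JavaConverters._

abstract class AuditedSuiteSketch {
  // Whitelist patterns as discussed elsewhere in this thread
  protected val threadWhiteList: Set[String]

  // Single helper usable from both beforeAll() and the post-audit check
  protected def runningThreadNames(): Set[String] =
    Thread.getAllStackTraces.keySet().asScala.map(_.getName).toSet
      .filterNot(name => threadWhiteList.exists(name.matches(_)))
}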


private def printRemainingThreadNames(): Unit = {
val currentThreadNames = Thread.getAllStackTraces.keySet().map(_.getName).toSet
val whitelistedThreadNames = currentThreadNames.
Contributor

nit: '.' goes on next line

Contributor Author

Moved.


private def printRemainingThreadNames(): Unit = {
val currentThreadNames = Thread.getAllStackTraces.keySet().map(_.getName).toSet
val whitelistedThreadNames = currentThreadNames.
filterNot(s => SparkFunSuite.threadWhiteList.exists(s.matches(_)))
Contributor

style: .filterNot { s =>

Contributor Author

Style applied.

val whitelistedThreadNames = currentThreadNames.
filterNot(s => SparkFunSuite.threadWhiteList.exists(s.matches(_)))
val remainingThreadNames = whitelistedThreadNames.diff(beforeAllTestThreadNames)
if (!remainingThreadNames.isEmpty) {
Contributor

remainingThreadNames.nonEmpty

Contributor Author

Changed.

@@ -72,3 +99,27 @@ abstract class SparkFunSuite
}

}

object SparkFunSuite
extends Logging {
Contributor

move to previous line

Contributor Author

Object removed due to previous review items.

@gaborgsomogyi
Contributor Author

In the meantime I've analysed a couple of cases and found netty-related threads:

  • netty.*
  • globalEventExecutor.*
  • threadDeathWatcher.*

I've added them to the whitelist.
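For reference, a sketch of how these entries could look in the hard-coded whitelist (the patterns are quoted from this thread; the comments are explanatory assumptions, not the patch's wording):

object ThreadWhiteListSketch {
  val threadWhiteList: Set[String] = Set(
    // Netty I/O threads created by RPC and shuffle infrastructure
    "netty.*",
    // Netty's shared GlobalEventExecutor thread, which lingers by design
    "globalEventExecutor.*",
    // Netty's ThreadDeathWatcher daemon thread
    "threadDeathWatcher.*"
  )
}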

import org.apache.spark.internal.Logging
import org.apache.spark.util.AccumulatorContext
import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}

import scala.collection.JavaConversions._
Contributor

Wonder why the style checker didn't complain, but scala.* imports should be in the previous position.

Contributor Author

I've executed Reorganize Imports in IntelliJ. Shouldn't that solve such problems?

Contributor

I don't know what that is.

The import order is described in http://spark.apache.org/contributing.html, section "Imports".

Contributor Author

Thanks for the guidance. I've set up the IntelliJ import organizer as described and fixed the imports with it.

@@ -34,12 +36,24 @@ abstract class SparkFunSuite
with Logging {
// scalastyle:on

val threadWhiteList = Set(
"rpc-client.*", "rpc-server.*", "shuffle-client.*", "shuffle-server.*",
Contributor

It would be nice to add comments explaining why the threads are whitelisted. Without an explanation to the contrary, I don't think any of these should be whitelisted.

Contributor Author

For the new netty-related patterns I've added documentation. Could somebody help me out with rpc and shuffle? All I can see is that, for example, TaskSetManagerSuite.test("TaskSet with no preferences") creates a lot of them, and I don't see any test issue.

Contributor Author

Temporarily removed rpc and shuffle. I'll put them back when proper doc can be written.

Contributor Author

I've done a deep dive into what these threads are and added documentation for each. I'll execute a build with them and let's see the new numbers.

@vanzin
Contributor

vanzin commented Dec 6, 2017

ok to test

val currentThreadNames = runningThreadNames()
val whitelistedThreadNames = currentThreadNames
.filterNot { s => threadWhiteList.exists(s.matches(_)) }
val remainingThreadNames = whitelistedThreadNames.diff(beforeAllTestThreadNames)
Contributor

nit: I think this would be better written as:

val remainingThreadNames = runningThreadNames.diff(beforeAllTestThreadNames)
  .filterNot { s => threadWhiteList.exists(s.matches(_)) }

(although putting the whitelist filtering into runningThreadNames() would still make this more concise).

The reason is that it's not obvious to the reader why you whitelist 'after' threads but not 'before' - clearer to whitelist the diff.

Contributor Author

Compressed.
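Presumably the compressed expression now reads roughly like this (a sketch of the body of the post-audit method, not the exact patch):

val remainingThreadNames = runningThreadNames().diff(beforeAllTestThreadNames)
  .filterNot { s => threadWhiteList.exists(s.matches(_)) }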

@SparkQA

SparkQA commented Dec 6, 2017

Test build #84577 has finished for PR 19893 at commit a35a52f.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 7, 2017

Test build #84574 has finished for PR 19893 at commit 1a64209.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 7, 2017

Test build #84580 has finished for PR 19893 at commit fe6cd0c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 7, 2017

Test build #84606 has finished for PR 19893 at commit 62cb32b.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 7, 2017

Test build #84601 has finished for PR 19893 at commit 2b02d45.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor Author

@squito I meant another JIRA, because it needs deeper analysis and discussion.

@gaborgsomogyi
Contributor Author

gentle ping @jiangxb1987

@SparkQA

SparkQA commented Dec 22, 2017

Test build #85290 has finished for PR 19893 at commit ef00796.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@jiangxb1987 left a comment

LGTM, only some nits. Also cc @cloud-fan

}

private def printRemainingThreadNames(): Unit = {
val suiteName = this.getClass.getName
Contributor

nit:

val shortSuiteName = this.getClass.getName.replaceAll("org.apache.spark", "o.a.s")

Contributor Author

Fixed.

s"thread names: ${remainingThreadNames.mkString(", ")} =====\n")
}
} else {
logWarning(s"\n\n===== THREAD AUDIT POST ACTION CALLED " +
Contributor

nit: remove 's' before the string.

Contributor Author

Fixed.


protected override def beforeAll(): Unit = {
doThreadPreAudit
super.beforeAll
Contributor

nit: super.beforeAll(), and also super.afterAll().

Contributor Author

Fixed.
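Presumably the fixed overrides now read roughly like this (sketch):

protected override def beforeAll(): Unit = {
  doThreadPreAudit()
  super.beforeAll()
}

protected override def afterAll(): Unit = {
  super.afterAll()
  doThreadPostAudit()
}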

@SparkQA

SparkQA commented Dec 22, 2017

Test build #85315 has finished for PR 19893 at commit f7939fa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@vanzin left a comment

In most of the places where you're overriding doThreadAuditInSparkFunSuite, it seems like the code is just not correct, and you can fix it instead of overriding that behavior.

with Logging {
// scalastyle:on

protected val doThreadAuditInSparkFunSuite = true
Contributor

Can we call this just doThreadAudit or enableThreadAudit?

Contributor

Given the way this is being used elsewhere, a better name is probably enableAutoThreadAudit, or something.

Contributor Author

I was thinking about proper naming before. The last suggested one is definitely better: it doesn't name the exact place where the audit happens, but it also doesn't suggest that it's completely turned off.

Contributor Author

Renamed to enableAutoThreadAudit.

/**
* Thread audit for test suites.
*
* Thread audit happens normally in [[SparkFunSuite]] automatically when a new test suite created.
Contributor

You shouldn't describe the behavior of SparkFunSuite here. You should instead document the flag in SparkFunSuite that controls whether this code is triggered.

All the comments in the rest of this class are related to SparkFunSuite and overriding its default behavior, so they're better placed in SparkFunSuite and not here too.

Contributor Author

Good point, moving.

Contributor

I'd just remove this paragraph since this class is independent of SparkFunSuite.


/**
* During [[SparkContext]] creation BlockManager
* creates event loops. One is wrapped inside
Contributor

nit: line wrapped too early.

Contributor Author

Fixed.

protected def doThreadPostAudit(): Unit = printRemainingThreadNames

private def snapshotRunningThreadNames(): Unit = {
threadNamesSnapshot = runningThreadNames
Contributor

nit: call with () if you declare the method with ().

Contributor Author

Fixed.

protected def doThreadPreAudit(): Unit = snapshotRunningThreadNames
protected def doThreadPostAudit(): Unit = printRemainingThreadNames

private def snapshotRunningThreadNames(): Unit = {
Contributor

Can't you just inline this in doThreadPreAudit since it's the only call site and this is a private method?

Contributor Author

Inlined.

threadNamesSnapshot = runningThreadNames
}

private def printRemainingThreadNames(): Unit = {
Contributor

Same reasoning as above. Just inline.

Contributor Author

Inlined.

@@ -39,6 +41,7 @@ class SessionStateSuite extends SparkFunSuite
protected var activeSession: SparkSession = _

override def beforeAll(): Unit = {
doThreadPreAudit()
Contributor

Isn't the problem here that this is not calling super.beforeAll()? If you do that, you don't need to override doThreadAuditInSparkFunSuite nor call doThreadPostAudit below.

Contributor Author

Fixed. My intention was to change test behaviour as little as possible; in this case it doesn't matter.

private var targetAttributes: Seq[Attribute] = _
private var targetPartitionSchema: StructType = _

override def beforeAll(): Unit = {
doThreadPreAudit()
Contributor

Same thing here. This should be calling super.beforeAll().

Contributor Author

Fixed.

override protected val doThreadAuditInSparkFunSuite = false

protected override def beforeAll(): Unit = {
doThreadPreAudit()
Contributor

This looks like the same situation, but because this is a trait, it kinda relies on the suites to call beforeAll and afterAll correctly... if you don't want to audit all suites, you could write a comment explaining the situation.

Contributor Author

It's kind of similar but not the same. Comment added.

override def beforeAll(): Unit = {
// Reuse the singleton session
activeSession = spark
doThreadPreAudit()
Contributor

Same thing. Just call super.beforeAll() correctly.

Contributor Author

Fixed.

@SparkQA

SparkQA commented Jan 5, 2018

Test build #85722 has finished for PR 19893 at commit 0851ef2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

*
* class MyTestSuite extends SparkFunSuite {
*
* override val doThreadAuditInSparkFunSuite = false
Contributor

enableAutoThreadAudit now

Contributor Author

Changed.

* Thread audit happens normally here automatically when a new test suite created.
* The only prerequisite for that is that the test class must extend [[SparkFunSuite]].
*
* There are some test suites which are doing initialization before [[SparkFunSuite#beforeAll]]
Contributor

Better:

"
It is possible to override the default thread audit behavior by setting enableAutoThreadAudit to false and manually calling the audit methods, if desired. For example:

// Code
"

Contributor Author

Changed.


trait SharedSQLContext extends SQLTestUtils with SharedSparkSession {

/**
* Auto thread audit is turned off here intentionally and done manually.
Contributor

I'm still not quite convinced that this is needed.

I still think that any reported leaks here are caused by bugs in the test suites and not because of this. The code you have here is basically the same thing as SparkFunSuite.

For example, if a suite extending this does not call super.beforeAll() but calls super.afterAll(), won't you get false positives in the output?

Contributor Author

It's the same code, but it's meant to solve a different problem (it changes the execution order). Please see the execution order with and without this change, described in my previous post:

As a next step analysed SQL test flow. Here are the steps:

1. SharedSparkSession.beforeAll called which initialise SparkSession and SQLContext
2. SparkFunSuite.beforeAll creates a thread snapshot
3. Test code runs
4. SparkFunSuite.afterAll prints out the possible leaks
5. SharedSparkSession.afterAll stops SparkSession

I'm not sure I understand correctly, but this will not report false positives. The only problem I see here is that it's not going to report SparkSession and SQLContext related leaks.

As you mentioned before, this code should find SparkContext-related threading issues, which applies here as well. This is not fulfilled at the moment, and my proposal is to fix it this way:

1. SparkFunSuite.beforeAll creates a thread snapshot
2. SharedSparkSession.beforeAll called which initialise SparkSession and SQLContext
3. Test code runs
4. SharedSparkSession.afterAll stops SparkSession
5. SparkFunSuite.afterAll prints out the possible leaks

With this change I don't see any false positives or missed threads.
Please share your ideas on this topic.

Your concern fully stands, but this change is not intended to cover the mentioned issue. The problem you mentioned is addressed in ThreadAudit, which prints out the following message in such cases:

THREAD AUDIT POST ACTION CALLED WITHOUT PRE ACTION IN SUITE...
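To make the proposed ordering concrete, a sketch of the overrides (the trait wiring is assumed, not the exact patch):

trait SharedSQLContextSketch extends SparkFunSuite {
  override protected val enableAutoThreadAudit = false

  protected override def beforeAll(): Unit = {
    doThreadPreAudit()  // 1. thread snapshot taken first
    super.beforeAll()   // 2. SparkSession and SQLContext initialized afterwards
  }

  protected override def afterAll(): Unit = {
    super.afterAll()    // 4. SparkSession stopped first
    doThreadPostAudit() // 5. possible leaks printed afterwards
  }
}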

Contributor

I'm not sure I understand your explanation, and I definitely don't understand what's going on from the comment in the code. What I'm asking is for the comment here to explain not what the code is doing, but why it's doing it.

Basically, if instead of the code you have here, you just called super.beforeAll and super.afterAll, without disabling enableAutoThreadAudit, what will break and why? That's what the comment should explain.

Contributor Author

Yeah, now I see your point. Description changed.

@SparkQA

SparkQA commented Jan 8, 2018

Test build #85802 has finished for PR 19893 at commit 87c4852.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 12, 2018

Test build #86035 has finished for PR 19893 at commit 9c9c6ef.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor Author

Checked the failure, but it seems unrelated.

@jiangxb1987
Contributor

retest this please

Contributor

@vanzin left a comment

Have you had a chance to look at the hive tests? There's a whole bunch of reported thread leaks there. Hive tests behave differently from all of the others in that they share a spark session across suites, not just within a suite.

Examples of reported thread leaks:

And a whole bunch of others.


/**
* Suites extending [[SharedSQLContext]] are sharing resources (eg. SparkSession) in their tests.
* Such resources are initialized by the suite before thread audit takes thread snapshot and
Contributor

Sorry but this still does not explain why this is happening. It's just stating that it is.

For example, in SharedSparkSession, there is this code:

  protected override def afterAll(): Unit = {
    super.afterAll()
    if (_spark != null) {
      _spark.sessionState.catalog.reset()
      _spark.stop()
      _spark = null
    }
  }

If you move super.afterAll() to after the session is stopped, won't that solve the problem and avoid this?
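That is, the suggested reordering would look roughly like this (sketch):

  protected override def afterAll(): Unit = {
    if (_spark != null) {
      _spark.sessionState.catalog.reset()
      _spark.stop()
      _spark = null
    }
    super.afterAll()  // thread audit now runs after the session is stopped
  }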

Contributor Author

Your suggestion solves one part of the problem. The other one lies here:

  protected override def beforeAll(): Unit = {
    initializeSession()

    // Ensure we have initialized the context before calling parent code
    super.beforeAll()
  }

The session is initialized before the thread snapshot; this should also happen in the opposite order. Because of the comment in the code, I decided not to change it.

Contributor

@vanzin Jan 12, 2018

Why is the latter a problem? At worst you'll have fewer threads after the suite finishes than when it started, which should be fine, no? The problem is having leaked threads, not the other way around.

Contributor

Ok, I think I see your point. Still, the comment here is confusing. Can't this be done in SharedSparkSession instead, where that initialization happens, so that it's clear what it's talking about?

Contributor

@vanzin Jan 12, 2018

I think this is an easier to understand comment about what's going on here:

  /**
   * Suites extending [[SharedSQLContext]] are sharing resources (eg. SparkSession) in their tests.
   * That trait initializes the spark session in its [[beforeAll()]] implementation before the
   * automatic thread snapshot is performed, so the audit code could fail to report threads leaked
   * by that shared session.
   *
   * The behavior is overridden here to take the snapshot before the spark session is initialized.
   */

Sorry for the noise.

Contributor Author

Much better phrased and more compressed explanation; applied. I agree it would be better to move this functionality into SharedSparkSession, but on the other hand it would go far in terms of the number of modifications: SharedSparkSession would have to extend SparkFunSuite, which I don't think is worth the effort. The other option I see also doesn't help understanding, namely moving the manual thread audit into SparkFunSuite and leaving enableAutoThreadAudit = false in SharedSQLContext, but splitting the functionality that way rarely helps. Ideas?

@gaborgsomogyi
Contributor Author

Regarding Hive, please see my comment from 11 Dec 2017.

@vanzin
Contributor

vanzin commented Jan 12, 2018

Why not disable the thread audit in the hive module? You added that functionality already, should be pretty trivial to use it.

@SparkQA

SparkQA commented Jan 12, 2018

Test build #86050 has finished for PR 19893 at commit 9c9c6ef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor Author

Thread audit disabled in hive.
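Presumably the Hive suites opt out via the flag added earlier, roughly like this (the suite name is hypothetical):

class SomeHiveSuite extends SparkFunSuite {
  // Hive tests share a SparkSession across suites, so per-suite
  // auditing would produce false positives; disable it here.
  override protected val enableAutoThreadAudit = false
}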

@SparkQA

SparkQA commented Jan 13, 2018

Test build #86093 has finished for PR 19893 at commit 56a41df.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
  • * That trait initializes the spark session in its [[beforeAll()]] implementation before the

@SparkQA

SparkQA commented Jan 13, 2018

Test build #86094 has finished for PR 19893 at commit 68d0f3b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor

vanzin commented Jan 16, 2018

Merging to master.

It would be nice to file a separate bug to eventually look at how to do this on the spark-hive module (or maybe it's just not worth the effort).

@asfgit closed this in 12db365 Jan 16, 2018
@gaborgsomogyi
Contributor Author

@vanzin @squito @srowen @jiangxb1987 @henryr
Big thanks to everybody for the constructive comments, learned a lot from them.
I'll take a look at further possibilities like the suggested spark-hive module.

@gaborgsomogyi deleted the SPARK-16139 branch January 17, 2018 13:26