
[SPARK-4180] [Core] Prevent creation of multiple active SparkContexts #3121

Closed
JoshRosen wants to merge 18 commits

Conversation

JoshRosen
Contributor

This patch adds error-detection logic to throw an exception when attempting to create multiple active SparkContexts in the same JVM, since this is currently unsupported and has been known to cause confusing behavior (see SPARK-2243 for more details).

The solution implemented here is only a partial fix. A complete fix would have the following properties:

  1. Only one SparkContext may ever be under construction at any given time.
  2. Once a SparkContext has been successfully constructed, any subsequent construction attempts should fail until the active SparkContext is stopped.
  3. If the SparkContext constructor throws an exception, then all resources created in the constructor should be cleaned up (SPARK-4194).
  4. If a user attempts to create a SparkContext but the creation fails, then the user should be able to create new SparkContexts.

This PR only provides 2) and 4); we should be able to provide all of these properties, but the correct fix will involve larger changes to SparkContext's construction / initialization, so we'll target it for a different Spark release.

The correct solution:

I think that the correct way to do this would be to move the construction of SparkContext's dependencies into a static method in the SparkContext companion object. Specifically, we could make the default SparkContext constructor private and change it to accept a SparkContextDependencies object that contains all of SparkContext's dependencies (e.g. DAGScheduler, ContextCleaner, etc.). Secondary constructors could call a method on the SparkContext companion object to create the SparkContextDependencies and pass the result to the primary SparkContext constructor. For example:

class SparkContext private (deps: SparkContextDependencies) {
  def this(conf: SparkConf) {
    this(SparkContext.getDeps(conf))
  }
}

object SparkContext {
  private[spark] def getDeps(conf: SparkConf): SparkContextDependencies = synchronized {
    if (anotherSparkContextIsActive) { throw new Exception(...) }
    var dagScheduler: DAGScheduler = null
    try {
      dagScheduler = new DAGScheduler(...)
      [...]
    } catch {
      case e: Exception =>
        Option(dagScheduler).foreach(_.stop())
        [...]
    }
    SparkContextDependencies(dagScheduler, ...)
  }
}

This gives us mutual exclusion and ensures that any resources created during the failed SparkContext initialization are properly cleaned up.

This indirection is necessary to maintain binary compatibility. In retrospect, it would have been nice if SparkContext had no private constructors and could only be created through builder / factory methods on its companion object, since this buys us lots of flexibility and makes dependency injection easier.

Alternative solutions:

As an alternative solution, we could refactor SparkContext's primary constructor to perform all object creation in a giant try-finally block. Unfortunately, this will require us to turn a bunch of vals into vars so that they can be assigned from the try block. If we still want vals, we could wrap each val in its own try block (since the try block can return a value), but this will lead to extremely messy code and won't guard against the introduction of future code which doesn't properly handle failures.
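
To make the trade-off concrete, here's a minimal, self-contained sketch (toy code, not Spark's; Component and Example are made-up names) of what per-val try blocks look like:

class Component {
  def stop(): Unit = ()
}

class Example {
  // A Scala `try` is an expression, so it can initialize a val...
  val first: Component =
    try {
      new Component()
    } catch {
      case e: Exception => throw e
    }

  // ...but every later field then needs its own catch block that tears down
  // everything created before it.
  val second: Component =
    try {
      new Component()
    } catch {
      case e: Exception =>
        first.stop()
        throw e
    }
}

Even in this toy example the cleanup logic is duplicated and easy to get wrong, which is the messiness described above.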

The more complex approach outlined above gives us some nice dependency injection benefits, so I think that might be preferable to a var-ification.

This PR's solution:

  • At the start of the constructor, check whether some other SparkContext is active; if so, throw an exception.
  • If another SparkContext might be under construction (or has thrown an exception during construction), allow the new SparkContext to begin construction but log a warning (since resources might have been leaked from a failed creation attempt).
  • At the end of the SparkContext constructor, check whether some other SparkContext constructor has raced and successfully created an active context. If so, throw an exception.

This guarantees that no two SparkContexts will ever be active and exposed to users (since we check at the very end of the constructor). If two threads race to construct SparkContexts, then one of them will win and another will throw an exception.

This exception can be turned into a warning by setting spark.driver.allowMultipleContexts = true. The exception is disabled in unit tests, since there are some suites (such as Hive) that may require more significant refactoring to clean up their SparkContexts. I've made a few changes to other suites' test fixtures to properly clean up SparkContexts so that the unit test logs contain fewer warnings.
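
For reference, here's a rough sketch of the begin/end-of-constructor bookkeeping described above; the object and method names are illustrative simplifications rather than the exact ones in the patch:

// Sketch only: track the active context and any context under construction.
object ContextRegistry {
  private val lock = new Object
  private var activeContext: Option[AnyRef] = None
  private var contextBeingConstructed: Option[AnyRef] = None

  /** Called at the start of the SparkContext constructor. */
  def markPartiallyConstructed(sc: AnyRef, allowMultipleContexts: Boolean): Unit =
    lock.synchronized {
      if (activeContext.isDefined) {
        fail("Another SparkContext is already active in this JVM (see SPARK-2243).", allowMultipleContexts)
      } else if (contextBeingConstructed.isDefined) {
        // A racing constructor may be in progress or may have failed and leaked resources.
        println("WARN: another SparkContext may be under construction in this JVM.")
      }
      contextBeingConstructed = Some(sc)
    }

  /** Called at the very end of the constructor, once construction has fully succeeded. */
  def setActiveContext(sc: AnyRef, allowMultipleContexts: Boolean): Unit =
    lock.synchronized {
      if (activeContext.isDefined) {
        fail("A racing thread finished constructing an active SparkContext first.", allowMultipleContexts)
      }
      contextBeingConstructed = None
      activeContext = Some(sc)
    }

  /** Called from stop() so that a new context may be created afterwards. */
  def clearActiveContext(): Unit = lock.synchronized {
    activeContext = None
  }

  private def fail(msg: String, allowMultipleContexts: Boolean): Unit = {
    if (allowMultipleContexts) println(s"WARN: $msg") else throw new IllegalStateException(msg)
  }
}

With the escape hatch described above, a user would opt out of the hard failure via new SparkConf().set("spark.driver.allowMultipleContexts", "true").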

@JoshRosen
Contributor Author

/cc @tdas @pwendell @aarondav

I'd like to target this for Spark 1.1.1 and Spark 1.2.0.

I should add user-facing documentation to explain that only one SparkContext may be active per JVM; I'll push an extra commit to do that now.

@JoshRosen JoshRosen changed the title [SPARK-4180] Prevent creation of multiple active SparkContexts [SPARK-4180] [Core] Prevent creation of multiple active SparkContexts Nov 5, 2014
@SparkQA

SparkQA commented Nov 5, 2014

Test build #22959 has started for PR 3121 at commit afaa7e3.

  • This patch merges cleanly.

@JoshRosen
Contributor Author

Oh, and /cc @mateiz and @andrewor14, too.

@SparkQA

SparkQA commented Nov 5, 2014

Test build #22960 has started for PR 3121 at commit 918e878.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 6, 2014

Test build #22959 has finished for PR 3121 at commit afaa7e3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22959/
Test FAILed.

@SparkQA

SparkQA commented Nov 6, 2014

Test build #22960 has finished for PR 3121 at commit 918e878.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22960/
Test FAILed.

@JoshRosen
Contributor Author

This failed a bunch of tests because Spark Streaming's BasicOperationsSuite was missing some cleanup and because there was a bug in StreamingContext.stop() where it wouldn't stop the SparkContext if the streaming context had not been started yet.

@SparkQA

SparkQA commented Nov 6, 2014

Test build #22975 has started for PR 3121 at commit 06c5c54.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 6, 2014

Test build #22975 has finished for PR 3121 at commit 06c5c54.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22975/
Test FAILed.

@JoshRosen
Contributor Author

@tdas It looks like a change that I made to StreamingContext.stop() legitimately broke one of the tests:

[info] - stop before start and start after stop *** FAILED *** (1 second, 163 milliseconds)
[info]   org.apache.spark.SparkException: StreamingContext has already been stopped

Here's the test code:

  test("stop before start and start after stop") {
    ssc = new StreamingContext(master, appName, batchDuration)
    addInputStream(ssc).register()
    ssc.stop()  // stop before start should not throw exception
    ssc.start()  // <---- This is the line that's throwing the exception
    ssc.stop()
    intercept[SparkException] {
      ssc.start() // start after stop should throw exception
    }
  }

The issue is that the old code would let you call ssc.stop() on a StreamingContext that hadn't been started and would then let you call start() on it. I don't like the old semantics, since calling start() after stop() should always be an error. Can I change this, or do I have to retain the old and confusing semantics?
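
For clarity, the semantics I'm arguing for look roughly like this (a toy sketch, not the real StreamingContext code):

// Stricter lifecycle: stop() before start() is allowed, but start() after stop() always fails.
class ToyStreamingContext {
  private sealed trait State
  private case object Initialized extends State
  private case object Started extends State
  private case object Stopped extends State

  private var state: State = Initialized

  def start(): Unit = state match {
    case Initialized => state = Started
    case Started     => // already running; treat as a no-op
    case Stopped     =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }

  def stop(): Unit = {
    state = Stopped
  }
}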

@JoshRosen
Contributor Author

Here's another example of bad test fixture design: Spark SQL's TestSQLContext is a global SQLContext that wraps its own SparkContext, so it can't easily be started / stopped and is shared across multiple tests / suites:

/** A SQLContext that can be used for local testing. */
object TestSQLContext
  extends SQLContext(
    new SparkContext(
      "local[2]",
      "TestSQLContext",
      new SparkConf().set("spark.sql.testkey", "true"))) {

  /** Fewer partitions to speed up testing. */
  override private[spark] def numShufflePartitions: Int =
    getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
}

I'm going to refactor this into a proper testing trait, SharedLocalSQLContext, that manages setup / teardown using BeforeAndAfterAll.
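
Roughly what I have in mind (a sketch assuming ScalaTest's BeforeAndAfterAll; the details may change in the actual refactoring):

import org.scalatest.{BeforeAndAfterAll, Suite}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

trait SharedLocalSQLContext extends BeforeAndAfterAll { self: Suite =>
  @transient protected var sparkContext: SparkContext = _
  @transient protected var sqlContext: SQLContext = _

  override protected def beforeAll(): Unit = {
    super.beforeAll()
    val conf = new SparkConf().set("spark.sql.testkey", "true")
    sparkContext = new SparkContext("local[2]", "TestSQLContext", conf)
    sqlContext = new SQLContext(sparkContext)
  }

  override protected def afterAll(): Unit = {
    try {
      // Stop the context so the next suite can create its own.
      if (sparkContext != null) sparkContext.stop()
    } finally {
      super.afterAll()
    }
  }
}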

@JoshRosen
Contributor Author

This SQL situation is going to be a huge mess to fix, and I fear that I'll wind up having to touch nearly every line of test code (or at least a substantial fraction of it).

Part of the problem is that there are also global TestJsonData, TestData, and ParquetTestData classes that hold RDDs created using TestSQLContext. These classes' fields are statically imported into most of the test suites, so there are tons of references that would need to be updated. There are a few code smells that suggest that this approach might have caused other problems, such as these lines at the top of some test suites to ensure that the test data is initialized:

class CachedTableSuite extends QueryTest {
  TestData // Load test tables.

class InMemoryColumnarQuerySuite extends QueryTest {
  // Make sure the tables are loaded.
  TestData

I want to avoid rewriting the bulk of the actual test logic. One approach is to leave these as global objects but convert their fields to defs and add an initialize() method for reconstructing the global objects' states when a new SQLContext is created. This isolates most of the changes to those test data classes. We could create a test suite mixin that calls these initialize() methods from beforeAll(). If we combine this with a few small changes to remove the TestSQLContext global object and replace it by a SharedSQLContext trait, then I think we can enable cleanup of the SQL tests' SparkContext with fairly minimal impact to the actual test code itself.

Perhaps this design using global objects that are shared across test suites was motivated by performance concerns, since it might be expensive to create a bunch of tables. I think that the right approach to addressing test performance is through coarse-grained parallelization by running individual test suites in separate JVMs, since global shared state can be confusing. Also, I think the performance impact might be fairly minimal: we'd only be re-initializing once per suite, not once per test.

@marmbrus, do you have any feedback here? Is there a cleaner way to enable cleanup of the SparkContext that the SQL tests create?

@JoshRosen
Contributor Author

To be more concrete, I'm suggesting something like this:

object TestData {

  /**
   * Initialize TestData using the given SQLContext. This will re-create all SchemaRDDs and tables
   * using that context.
   */
  def init(sqlContext: SQLContext) {
    initMethods.foreach(m => m(sqlContext))
  }

  /** A sequence of functions that are invoked when `init()` is called. */
  private val initMethods = mutable.Buffer[SQLContext => Unit]()

  /**
   * Register a block of code to be called when TestData is initialized with a new SQLContext.
   */
  private def onInit(block: SQLContext => Unit) {
    initMethods += block
  }

  def testData = _testData
  private var _testData: SchemaRDD = null
  onInit { sqlContext =>
    _testData = sqlContext.sparkContext.parallelize(
      (1 to 100).map(i => TestData(i, i.toString))).toSchemaRDD
    testData.registerTempTable("testData")
  }

  case class LargeAndSmallInts(a: Int, b: Int)
  def largeAndSmallInts = _largeAndSmallInts
  private var _largeAndSmallInts: SchemaRDD = null
  onInit { sqlContext =>
    ...
  }

  [...]

This whole onInit thing is a way to co-locate the fields, case classes, and initialization code fragments. From clients' perspectives, the public vals have become getter defs, but everything else stays the same.

@aarondav
Contributor

aarondav commented Nov 6, 2014

Here is the prior discussion on the issue you brought up with StreamingContext: #3053 (comment)

Note that the result was the reversion of the change and the addition of this check:
https://github.com/apache/spark/pull/3053/files#diff-e144dbee130ed84f9465853ddce65f8eR49
which could be removed if your change is used.

@marmbrus
Contributor

marmbrus commented Nov 6, 2014

@JoshRosen I don't think the situation is quite as dire as you suggest (every line of test code?). We can add logic to QueryTest and the other base test classes that creates a SQLContext per suite with whatever SparkContext you want. We can then turn the data objects into traits that are mixed into the test cases that need them. As long as some SQLContext is in scope and the required tables are added to that context during the constructor, I don't anticipate any major problems.

Hive is going to be another story. The whole reason for this singleton context pattern is that we have problems initializing more than one HiveContext in a single JVM. If you try to do that, all DDL operations fail with a mysterious "Database default does not exist" error. We have never been able to figure out what sort of global state Hive relies on (though admittedly it has not been a very high priority, since a global context with a robust .reset() has worked pretty well so far).

@JoshRosen
Contributor Author

The one subtlety with mixing in *Data classes is that the Maven test runner might instantiate multiple test suites before running any of them, so in general it's unsafe to create resources that need cleanup in the constructor of the test suite itself; that's why I'd like to do this setup in a beforeAll method. You can't statically import methods/fields from a var, though, so we can't just instantiate a *Data class in beforeAll and use it in all of the tests without rewriting every line to access the fields through the class instance.

One alternative is to have the mixin trait be "resettable" by exposing defs in its public API (basically, what I've proposed above).
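
Concretely, the mixin could look something like this (building on the TestData.init() sketch above; the names are illustrative):

import org.scalatest.{BeforeAndAfterAll, Suite}

import org.apache.spark.sql.SQLContext

// Each suite supplies its own SQLContext; the shared tables are (re)built against it.
trait SharedTestData extends BeforeAndAfterAll { self: Suite =>
  protected def sqlContext: SQLContext

  override protected def beforeAll(): Unit = {
    super.beforeAll()
    TestData.init(sqlContext)  // re-register the shared tables for this suite's context
  }
}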

}
val exception = new SparkException(s"$errMsg $errDetails")
if (conf.getBoolean("spark.driver.disableMultipleSparkContextsErrorChecking", false)) {
logWarning("Multiple SparkContext error detection is disabled!", exception)
Contributor

More like "Multiple SparkContexts detected in the same JVM!". We're not trying to warn people that they disabled the config.

@JoshRosen
Contributor Author

@aarondav I just noticed another test failure which was due to the same problem that you saw in #3053: some of our tests throw exceptions when creating SparkContexts and then never clean up those resources, which triggers the "cannot create a new SparkContext if a previous attempt failed" limitation that this commit introduces.

I'd rather not refactor a bunch of test code in SQL, so I'm probably going to just add the "turn this error into a warning" flag to our test Java options; we can do the final cleanup / refactoring later.
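
For the SBT side, that would be roughly the following (a sketch; the actual build change may differ):

// Pass the escape-hatch flag to every forked test JVM so suites that leak
// SparkContexts produce warnings instead of hard failures.
javaOptions in Test += "-Dspark.driver.allowMultipleContexts=true"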

@JoshRosen
Contributor Author

I've made another pass which I think should address this last round of review feedback. Thanks for all of the careful review and commentary so far.

@SparkQA

SparkQA commented Nov 15, 2014

Test build #23418 has started for PR 3121 at commit c0987d3.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 15, 2014

Test build #23418 has finished for PR 3121 at commit c0987d3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23418/
Test PASSed.

@@ -57,12 +57,27 @@ import org.apache.spark.util._
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
* Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
* creating a new one. This limitation will eventually be removed; see SPARK-2243 for more details.
Contributor

I'd say "may eventually be removed".

@pwendell
Contributor

This is a nice solution and LGTM. Made only minor comments.

@SparkQA

SparkQA commented Nov 17, 2014

Test build #23457 has started for PR 3121 at commit 23c7123.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 17, 2014

Test build #23457 has finished for PR 3121 at commit 23c7123.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23457/
Test PASSed.

@pwendell
Contributor

LGTM

@pwendell
Contributor

Thanks Josh - I'm gonna pull this in.

@pwendell
Contributor

Hey Josh - I've put this into branch 1.2. Since this is a nontrivial patch I'm sort of hesitant to throw it directly into branch-1.1 right now. Could we wait and backport this after 1.1.1 ships?

asfgit pushed a commit that referenced this pull request Nov 17, 2014

Author: Josh Rosen <joshrosen@databricks.com>

Closes #3121 from JoshRosen/SPARK-4180 and squashes the following commits:

23c7123 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4180
d38251b [Josh Rosen] Address latest round of feedback.
c0987d3 [Josh Rosen] Accept boolean instead of SparkConf in methods.
85a424a [Josh Rosen] Incorporate more review feedback.
372d0d3 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4180
f5bb78c [Josh Rosen] Update mvn build, too.
d809cb4 [Josh Rosen] Improve handling of failed SparkContext creation attempts.
79a7e6f [Josh Rosen] Fix commented out test
a1cba65 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4180
7ba6db8 [Josh Rosen] Add utility to set system properties in tests.
4629d5c [Josh Rosen] Set spark.driver.allowMultipleContexts=true in tests.
ed17e14 [Josh Rosen] Address review feedback; expose hack workaround for existing unit tests.
1c66070 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4180
06c5c54 [Josh Rosen] Add / improve SparkContext cleanup in streaming BasicOperationsSuite
d0437eb [Josh Rosen] StreamingContext.stop() should stop SparkContext even if StreamingContext has not been started yet.
c4d35a2 [Josh Rosen] Log long form of creation site to aid debugging.
918e878 [Josh Rosen] Document "one SparkContext per JVM" limitation.
afaa7e3 [Josh Rosen] [SPARK-4180] Prevent creations of multiple active SparkContexts.

(cherry picked from commit 0f3ceb5)
Signed-off-by: Patrick Wendell <pwendell@gmail.com>
@asfgit asfgit closed this in 0f3ceb5 Nov 17, 2014
@JoshRosen
Contributor Author

Sure; I've removed 1.1.1 as a target version and added 1.1.2.
