
[SPARK-9144] Remove DAGScheduler.runLocallyWithinThread and spark.localExecution.enabled #7484

Closed
wants to merge 4 commits

Conversation

@JoshRosen (Contributor)

Spark has an option called spark.localExecution.enabled; according to the docs:

Enables Spark to run certain jobs, such as first() or take() on the driver, without sending tasks to the cluster. This can make certain jobs execute very quickly, but may require shipping a whole partition of data to the driver.

This feature ends up adding quite a bit of complexity to DAGScheduler, especially in the runLocallyWithinThread method, but as far as I know nobody uses this feature (I searched the mailing list and haven't seen any recent mentions of the configuration or stack traces involving the runLocally method). As a step towards scheduler complexity reduction, I propose that we remove this feature and all code related to it for Spark 1.5.
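For context, here is a minimal sketch of how the flag was turned on before this change. The config key is the real one under discussion, but the surrounding setup is illustrative and requires a Spark 1.x runtime on the classpath:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative setup only; requires a Spark 1.x runtime on the classpath.
val conf = new SparkConf()
  .setAppName("local-execution-demo")
  .setMaster("local[2]")
  .set("spark.localExecution.enabled", "true") // the flag this PR removes

val sc = new SparkContext(conf)

// With the flag on, single-partition actions such as first() and take()
// could run directly on the driver (DAGScheduler.runLocallyWithinThread)
// instead of being scheduled as tasks on executors.
val firstFive = sc.parallelize(1 to 100, numSlices = 4).take(5)
```

With the flag off (the default since Spark 1.1), the same `take(5)` goes through the normal task-scheduling path.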

@JoshRosen (Contributor, Author)

Ping @tdas, who helped come up with the idea of removing this after we noticed that some messiness in error-handling in runLocallyWithinThread led to a problem in #7385.

Also, ping @kayousterhout or @markhamstra to see if they can think of any reasons why we should save this feature.

} finally {
taskContext.markTaskCompleted()
TaskContext.unset()
// Note: this memory freeing logic is duplicated in Executor.run(); when changing this,
@JoshRosen (Contributor, Author), inline review comment:

Happy to be removing this duplication.

@markhamstra (Contributor)

If nobody is going to miss it, I'd be happy to be rid of localExecution. But this PR should really be "Deprecate DAGScheduler.runLocallyWithinThread and spark.localExecution.enabled" since the actual removal can't occur until later.

@JoshRosen (Contributor, Author)

Fair enough RE: deferring the actual removal to 1.6. I can re-work this to only deprecate the setting and the public methods that expose the allowLocal flag.
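A deprecation-only version of the change might look roughly like the sketch below. The object and method names here are hypothetical stand-ins, not Spark's actual API; the point is just the `@deprecated` annotation plus ignoring the flag:

```scala
// Hypothetical stand-in for a public API that exposed an allowLocal flag.
object SchedulerApi {

  @deprecated("Local execution is being removed; allowLocal is ignored.", "1.5.0")
  def runJob(partitions: Seq[Int], allowLocal: Boolean): Seq[Int] =
    runJob(partitions) // delegate; the flag no longer changes behavior

  // New flag-free overload that always uses the normal scheduling path.
  def runJob(partitions: Seq[Int]): Seq[Int] =
    partitions.map(_ * 2) // placeholder "work" for the sketch
}
```

Callers that still pass `allowLocal` keep compiling (with a deprecation warning) while behaving identically to the flag-free path, which is what lets the actual removal wait until a later release.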

@@ -98,8 +98,7 @@ class KafkaRDD[
val res = context.runJob(
this,
(tc: TaskContext, it: Iterator[R]) => it.take(parts(tc.partitionId)).toArray,
parts.keys.toArray,
allowLocal = true)
Contributor, inline review comment:

This is okay.
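Once the flag is removed outright (as eventually happened), call sites like the KafkaRDD one above would simply drop the argument. A sketch of the resulting call, not the exact final diff:

```scala
// Fragment only; assumes the surrounding KafkaRDD context (context, parts, R).
val res = context.runJob(
  this,
  (tc: TaskContext, it: Iterator[R]) => it.take(parts(tc.partitionId)).toArray,
  parts.keys.toArray) // allowLocal = true argument removed
```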

@SparkQA

SparkQA commented Jul 18, 2015

Test build #37675 has finished for PR 7484 at commit eec39fa.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class NaiveBayes(override val uid: String)
    • class PCA(JavaEstimator, HasInputCol, HasOutputCol):
    • class PCAModel(JavaModel):
    • abstract class Expression extends TreeNode[Expression] with Product
    • trait Generator extends Expression
    • trait NamedExpression extends Expression
    • abstract class Attribute extends LeafExpression with NamedExpression
    • abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging with Product
    • abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Product with Serializable

@rxin (Contributor)

rxin commented Jul 18, 2015

I think the idea is to run stuff like take much faster. Why do we want to remove this?

@JoshRosen (Contributor, Author)

@rxin, it turns out that this optimization for local actions is guarded behind a feature flag which is off by default. Although this path gets tested in DAGSchedulerSuite, I think it's somewhat unlikely that it ends up getting used in most production deployments.

@JoshRosen (Contributor, Author)

Also, there is a bit of messiness in how runLocallyWithinThread handles exceptions, which led to a flaky test in one of my other patches. So I wanted to see whether we could remove that codepath entirely rather than go through the work of producing and reviewing fixes for a rare corner case in such a rarely-used feature.

@mateiz (Contributor)

mateiz commented Jul 22, 2015

FYI, this used to be on by default (and not flagged) until August last year, when this commit turned it off: http://mail-archives.apache.org/mod_mbox/spark-commits/201408.mbox/%3C9f2f6315e068441787bf791864573776@git.apache.org%3E. It doesn't seem horrible to keep it off forever since it created some problems before. The main place it helped was if you call first(), take() etc on a dataset when working interactively, but maybe sending one task isn't that bad.

@mateiz (Contributor)

mateiz commented Jul 22, 2015

BTW it would also be nice to test the difference this makes before deciding, though the optimization only helps in somewhat limited cases (e.g. it won't help much if you do a shuffle).

rxin added a commit to rxin/spark that referenced this pull request Jul 22, 2015
[SPARK-9144] Remove DAGScheduler.runLocallyWithinThread and spark.localExecution.enabled

Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
	core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@rxin (Contributor)

rxin commented Jul 22, 2015

I think the main time when this can help a lot is when you are connecting to a busy cluster, and in that case, it can take a while to get something scheduled.

If the cluster is idle, it takes just a few ms to launch a task, and as a result users won't be able to tell the difference at all.

@rxin (Contributor)

rxin commented Jul 22, 2015

I submitted #7585 to bring this up to date. We can merge that one.

asfgit pushed a commit that referenced this pull request Jul 23, 2015
…alExecution.enabled

Spark has an option called spark.localExecution.enabled; according to the docs:

> Enables Spark to run certain jobs, such as first() or take() on the driver, without sending tasks to the cluster. This can make certain jobs execute very quickly, but may require shipping a whole partition of data to the driver.

This feature ends up adding quite a bit of complexity to DAGScheduler, especially in the runLocallyWithinThread method, but as far as I know nobody uses this feature (I searched the mailing list and haven't seen any recent mentions of the configuration or stack traces involving the runLocally method). As a step towards scheduler complexity reduction, I propose that we remove this feature and all code related to it for Spark 1.5.

This pull request simply brings #7484 up to date.

Author: Josh Rosen <joshrosen@databricks.com>
Author: Reynold Xin <rxin@databricks.com>

Closes #7585 from rxin/remove-local-exec and squashes the following commits:

84bd10e [Reynold Xin] Python fix.
1d9739a [Reynold Xin] Merge pull request #7484 from JoshRosen/remove-localexecution
eec39fa [Josh Rosen] Remove allowLocal(); deprecate user-facing uses of it.
b0835dc [Josh Rosen] Remove local execution code in DAGScheduler
8975d96 [Josh Rosen] Remove local execution tests.
ffa8c9b [Josh Rosen] Remove documentation for configuration
@JoshRosen (Contributor, Author)

Closing since this was done in #7585.

@JoshRosen JoshRosen closed this Jul 23, 2015
@JoshRosen JoshRosen deleted the remove-localexecution branch January 14, 2016 03:05
6 participants