
[SPARK-4087] use broadcast for task only when task is large enough #2933

Closed
wants to merge 6 commits

Conversation

@davies (Contributor) commented Oct 24, 2014

Using broadcast for small tasks has no benefit and can even cause regressions (several extra RPCs). There are also some stability issues with broadcast, so we should broadcast tasks only when the serialized tasks are large enough (larger than 8 KB by default; this may change in the future).

In practice, most tasks are small, so this should improve stability for most use cases, especially for tests, which start and stop the context multiple times.
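
A minimal sketch of the proposed rule, for illustration only (the helper name shouldBroadcastTask is invented here; the actual change lives in DAGScheduler):

import org.apache.spark.SparkContext

// Hypothetical helper showing the decision this PR proposes: broadcast the
// serialized task bytes only when they exceed the configured threshold
// (spark.scheduler.broadcastTaskMinSize, 8 KB by default).
def shouldBroadcastTask(sc: SparkContext, serializedTask: Array[Byte]): Boolean = {
  val minSizeBytes =
    sc.getConf.getInt("spark.scheduler.broadcastTaskMinSize", 8) * 1024
  serializedTask.length > minSizeBytes
}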

@SparkQA commented Oct 24, 2014

Test build #22161 has started for PR 2933 at commit 4ca3caa.

  • This patch merges cleanly.

@SparkQA commented Oct 24, 2014

Test build #22161 has finished for PR 2933 at commit 4ca3caa.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class LogInfo(startTime: Long, endTime: Long, path: String)

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22161/

@JoshRosen (Contributor)

@ScrapCodes It looks like the Scala style checks failed due to a line that contained 104 characters, but the scalastyle output didn't list the actual cause of the failure:

Scalastyle checks failed at following occurrences:
java.lang.RuntimeException: exists error
    at scala.sys.package$.error(package.scala:27)
    at scala.Predef$.error(Predef.scala:142)
[error] (core/*:scalastyle) exists error

Any idea why it's not displaying the cause of the failure?

@SparkQA commented Oct 24, 2014

Test build #418 has started for PR 2933 at commit 4ca3caa.

  • This patch merges cleanly.

@SparkQA commented Oct 24, 2014

Test build #22169 has started for PR 2933 at commit 28a9409.

  • This patch merges cleanly.

@SparkQA commented Oct 24, 2014

Test build #418 has finished for PR 2933 at commit 4ca3caa.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ReconnectWorker(masterUrl: String) extends DeployMessage
    • throw new SparkException("Failed to load class to register with Kryo", e)
    • class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])])
    • raise TypeError("Cannot convert a Row class into dict")
    • class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean)
    • class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean)
    • case class LogInfo(startTime: Long, endTime: Long, path: String)

@SparkQA commented Oct 24, 2014

Test build #22169 has finished for PR 2933 at commit 28a9409.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22169/

@JoshRosen (Contributor)

Do you mind opening a JIRA for this? You can link it to the other broadcast optimization one.

@@ -124,6 +123,10 @@ class DAGScheduler(
/** If enabled, we may run certain actions like take() and first() locally. */
private val localExecutionEnabled = sc.getConf.getBoolean("spark.localExecution.enabled", false)

/** Broadcast the serialized tasks only when they are bigger than it */
private val broadcastTaskMinSize =
sc.getConf.getInt("spark.scheduler.broadcastTaskMinSize", 8) * 1024
Contributor

I think that the serialized task ends up being sent in an Akka message, so there could be problems if a user configures this to be higher than the capacity of the Akka frame.
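
For illustration only, a guard of the kind this concern suggests might look like the sketch below (the check itself is an assumption, not part of the patch; spark.akka.frameSize was configured in MB at the time):

import org.apache.spark.SparkConf

// Hypothetical sanity check (not in the patch): the direct-send path ships the
// serialized task inside an Akka message, so the broadcast threshold should
// stay below the Akka frame size, or task launch can fail silently.
def checkBroadcastThreshold(conf: SparkConf): Unit = {
  val minSizeBytes = conf.getInt("spark.scheduler.broadcastTaskMinSize", 8) * 1024
  val frameSizeBytes = conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
  require(minSizeBytes < frameSizeBytes,
    s"spark.scheduler.broadcastTaskMinSize ($minSizeBytes bytes) must be " +
      s"smaller than spark.akka.frameSize ($frameSizeBytes bytes)")
}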

Contributor Author

As discussed offline, users take that risk if they change it to unreasonable values.

Contributor

Perhaps call this broadcastTaskMinSizeKB? Should we document this flag? Either way, there should be some mention that jobs will literally stop working silently if you set this close to the Akka frame size. It is not obvious that this is sent via Akka.
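
For reference, a user would set the flag like this (the value 64 is a hypothetical example; units are KB, per the getInt(...) * 1024 in the diff above):

import org.apache.spark.SparkConf

// Hypothetical example: broadcast only closures larger than 64 KB.
// Values approaching spark.akka.frameSize would silently break task launch.
val conf = new SparkConf().set("spark.scheduler.broadcastTaskMinSize", "64")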

Contributor Author

I think it's better to keep this internal; it's a tradeoff between the 1.0 and 1.1 behavior, and most users don't need to touch it.

We can document it later if users really need it.

Contributor Author

If a user changes the Akka frame size to a small value, jobs will also stop working silently, even without this patch.

I think we should have good default values for these and assume that users know the risks when they change configs; it's not easy to guarantee consistency between all possible values of all the configs.

@davies changed the title from "use broadcast for task only when task is large enough" to "[SPARK-4087] use broadcast for task only when task is large enough" on Oct 24, 2014
@@ -69,6 +70,10 @@ private[spark] class Stage(
var resultOfJob: Option[ActiveJob] = None
var pendingTasks = new HashSet[Task[_]]

/** This is used to track the life cycle of broadcast,
Contributor

Super-minor style nit, but I think our usual style is to not place comments on the first /** line.
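
For example, the comment in the diff above would usually be written as:

/**
 * This is used to track the life cycle of broadcast, ...
 */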

@SparkQA commented Oct 25, 2014

Test build #427 has started for PR 2933 at commit 28a9409.

  • This patch merges cleanly.

@SparkQA commented Oct 25, 2014

Test build #22196 has started for PR 2933 at commit 7dfe41e.

  • This patch merges cleanly.

@SparkQA commented Oct 25, 2014

Test build #427 has finished for PR 2933 at commit 28a9409.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@witgo (Contributor) commented Oct 25, 2014

@JoshRosen
#2846 fixes the scalastyle bug.

@SparkQA commented Oct 25, 2014

Test build #22196 timed out for PR 2933 at commit 7dfe41e after a configured wait of 120m.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22196/

@SparkQA commented Oct 25, 2014

Test build #450 has started for PR 2933 at commit 7dfe41e.

  • This patch merges cleanly.

@SparkQA commented Oct 25, 2014

Test build #450 has finished for PR 2933 at commit 7dfe41e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 25, 2014

Test build #22223 has started for PR 2933 at commit f3e2081.

  • This patch merges cleanly.

@SparkQA commented Oct 25, 2014

Test build #22223 has finished for PR 2933 at commit f3e2081.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22223/

@JoshRosen (Contributor)

I've been thinking about this some more, and I wonder about the motivation for this change: how much of a performance benefit does this buy us for typical workloads? This (and the other TorrentBroadcast inlining patch) adds extra code paths and complexity, but does it buy us measurable performance benefits? I'm concerned about adding extra branches to already-complicated code.

@davies (Contributor, Author) commented Oct 26, 2014

@JoshRosen The motivation is not performance, it's stability. Sending tasks to executors is a critical part of Spark and should be as stable as possible. Using broadcast to send tasks adds a lot of runtime complexity, and it has actually introduced problems for us (problems we did not have in 1.0). The motivation of this patch is to remove the complexity of broadcast in most cases, using it only when broadcast brings a performance benefit (when the task is large enough). In the future, maybe we could increase broadcastTaskMinSizeKB to 100 or even more.

This adds some code complexity (not much), but it actually simplifies the runtime behavior. It will also yield some performance gain (no RPCs or caching at all).

@pwendell (Contributor)

I find this a little bit hacky. If the broadcast implementation has bugs or performance issues, we should just fix them, and it will stabilize over time like any other new feature we add. Having a mode where we might do one thing or the other will make debugging and measuring things trickier. And we'll expose a configuration option that it seems we will ultimately want to remove.

IMO this would only be justified if we had a well-documented performance issue that we felt we simply couldn't solve within the broadcast architecture; then this would give people a latch to avoid broadcasting.

@davies (Contributor, Author) commented Oct 26, 2014

Broadcast (especially TorrentBroadcast) is designed for large objects; using it to send out small shared variables is like using a tank to shoot a mosquito. It was not a good approach to begin with, and it makes simple things complicated.

The motivation for broadcasting tasks is to fix performance for BIG closures; it should not bring any regression for the other case (small closures), which is more common and more important in daily usage. To fix the regressions (performance or stability), we may need to introduce even more complicated logic (such as embedding the broadcast or piggybacking small blocks).

@pwendell (Contributor)

I don't see fundamentally why the broadcast mechanism can't be done as efficiently as task launching itself. Do you have a reproducible workload where this caused a performance regression and we couldn't optimize the broadcast sufficiently?

@davies (Contributor, Author) commented Oct 26, 2014

The motivation is not about performance, it's about stability.

We have been fighting a task-deserialization failure for days (it fails in TorrentBroadcast), and it cannot be reproduced easily. I hope we can fix it before the 1.2 release.

@JoshRosen (Contributor)

We have been fighting a task-deserialization failure for days (it fails in TorrentBroadcast)

I thought we had fixed this issue; can you point me to new occurrences of it?

@davies (Contributor, Author) commented Oct 27, 2014

@JoshRosen I think we still have it (in tests tonight):

[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 11, localhost): java.io.IOException: unexpected exception type
[info]         java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
[info]         java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
[info]         java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
[info]         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
[info]         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
[info]         java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
[info]         java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
[info]         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
[info]         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
[info]         java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
[info]         org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
[info]         org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
[info]         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
[info]         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[info]         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[info]         java.lang.Thread.run(Thread.java:745)
[info] Driver stacktrace:
[info]   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1192)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1181)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1180)
[info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[info]   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1180)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:695)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:695)
[info]   at scala.Option.foreach(Option.scala:236)
[info]   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:695)
[info]   at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1398)
[info]   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
[info]   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
[info]   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
[info]   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
[info]   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
[info]   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[info]   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[info]   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[info]   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I think this exception is triggered in TorrentBroadcast.

@JoshRosen (Contributor)

This is really strange; I thought that the "unexpected exception type" error would have been addressed by #2932.

@JoshRosen (Contributor)

Can you point me to the commit that produced that stacktrace?

@davies (Contributor, Author) commented Oct 27, 2014

@JoshRosen @pwendell The test branch (internal) did not have that commit. That commit only improved the logging; it did not solve the stability problem.

@SparkQA commented Oct 28, 2014

Test build #486 has started for PR 2933 at commit f3e2081.

  • This patch merges cleanly.

@SparkQA commented Oct 28, 2014

Test build #486 has finished for PR 2933 at commit f3e2081.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@SparkQA commented Nov 2, 2014

Test build #22746 has started for PR 2933 at commit aab61a8.

  • This patch merges cleanly.

@SparkQA commented Nov 2, 2014

Test build #22746 has finished for PR 2933 at commit aab61a8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class DecimalType(DataType):
    • case class UnscaledValue(child: Expression) extends UnaryExpression
    • case class MakeDecimal(child: Expression, precision: Int, scale: Int) extends UnaryExpression
    • case class MutableLiteral(var value: Any, dataType: DataType, nullable: Boolean = true)
    • case class PrecisionInfo(precision: Int, scale: Int)
    • case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalType
    • final class Decimal extends Ordered[Decimal] with Serializable
    • trait DecimalIsConflicted extends Numeric[Decimal]

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22746/

@squito (Contributor) commented Nov 10, 2014

I agree with @pwendell. It seems like the right thing to do is just fix Broadcast... and if we can't, then wouldn't you also want to turn off Broadcast even for big closures?

@JoshRosen (Contributor)

What's the status on this PR / JIRA? As far as I know, it seems that TorrentBroadcast has been more stable lately, so if the only motivation here was stability then I think we might be able to close this.

@davies (Contributor, Author) commented Dec 16, 2014

Closing this now.

@davies closed this Dec 16, 2014