SPARK-5548: Fixed a race condition in AkkaUtilsSuite #4343

Closed

jacek-lewandowski
Contributor

Await.result and selection.resolveOne run with the same timeout simultaneously. When the Await.result timeout is reached first, a TimeoutException is thrown. On the other hand, when the selection.resolveOne timeout is reached first, an ActorNotFoundException is thrown. This is an obvious race condition, and the easiest way to fix it is to increase the timeout of one method to make sure the code fails on the other method first.
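
The race described above can be reproduced without Akka. The following is a minimal sketch using only the Scala standard library; `LookupFailed` and `firstFailure` are hypothetical stand-ins (for `akka.actor.ActorNotFound`, called ActorNotFoundException in this discussion, and for the `resolveOne` call inside `Await.result`), not code from this patch.

```scala
import java.util.concurrent.{Executors, TimeUnit, TimeoutException}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

object TimeoutRaceSketch {
  // Hypothetical stand-in for akka.actor.ActorNotFound.
  final class LookupFailed(msg: String) extends RuntimeException(msg)

  // Simulates Await.result(selection.resolveOne(inner), outer) for a lookup
  // that never succeeds: the lookup future fails with LookupFailed after
  // `inner`, while Await.result gives up with TimeoutException after `outer`.
  // Returns whichever failure the caller observes.
  def firstFailure(inner: FiniteDuration, outer: FiniteDuration): Throwable = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    try {
      val lookup = Promise[String]()
      scheduler.schedule(new Runnable {
        def run(): Unit = { lookup.tryFailure(new LookupFailed("lookup timed out")); () }
      }, inner.toMillis, TimeUnit.MILLISECONDS)
      Try(Await.result(lookup.future, outer)).failed.get
    } finally scheduler.shutdownNow()
  }

  def describe(t: Throwable): String = t match {
    case _: TimeoutException => "TimeoutException (from Await.result)"
    case _: LookupFailed     => "LookupFailed (from the lookup future)"
    case other               => other.toString
  }

  def main(args: Array[String]): Unit = {
    val timeout = 200.millis
    // Equal deadlines: either failure may win, which is the race.
    println(describe(firstFailure(timeout, timeout)))
    // Inner deadline doubled, as in this patch: Await.result gives up first.
    println(describe(firstFailure(timeout * 2, timeout)))
  }
}
```

Doubling the inner deadline only orders the two timeouts. It does not help if the lookup future fails early for some other reason.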

@jacek-lewandowski
Contributor Author

Once approved, I'll create another PR for master.

@SparkQA

SparkQA commented Feb 3, 2015

Test build #26675 has started for PR 4343 at commit b9ba47e.

  • This patch merges cleanly.

@@ -371,7 +371,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
 AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
 val timeout = AkkaUtils.lookupTimeout(conf)
 intercept[TimeoutException] {
-slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout * 2), timeout)
Contributor

Could you explain in the PR's comment (= the commit message) how increasing a timeout fixes a race condition, instead of just making it less likely?

@JoshRosen
Contributor

@jacek-lewandowski Since master and branch-1.3 are pretty much in sync at this point, feel free to open your PR directly against master; I can handle backports myself as part of the merge process, since our merge script makes it really easy to do cherry-picks. It's still worth opening separate backport PRs if the branches have diverged significantly, but if they haven't it's generally easier to open PRs against master and have the committer handle the ports. If you'd like a patch to be merged into multiple branches, just leave a comment in the PR description so the committer knows where to merge it.

@SparkQA

SparkQA commented Feb 3, 2015

Test build #26675 has finished for PR 4343 at commit b9ba47e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SimpleFunctionRegistry(val caseSensitive: Boolean) extends FunctionRegistry
    • class StringKeyHashMap[T](normalizer: (String) => String)
    • case class MultiAlias(child: Expression, names: Seq[String])

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26675/

@pwendell
Contributor

pwendell commented Feb 4, 2015

@jacek-lewandowski how about just catching either exception?

@jacek-lewandowski
Contributor Author

@pwendell I think the problem with catching either exception is that ActorNotFoundException is also thrown in other situations. In this particular test we want to prove that actor system A cannot connect to actor system B because A doesn't trust B, and this results in a timeout. This happens at an even lower communication layer than SASL.
On the other hand, ActorNotFoundException is thrown on a timeout, when there is no actor with that name, or when there are authentication problems. These include completely different situations, which occur when we have successfully connected to the other actor system and that remote actor system refuses to return the reference to the requested actor.
Therefore, an ActorNotFoundException that is not related to the timeout should cause the test to fail, because it means that we successfully connected to an untrusted actor system.
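
To illustrate the distinction drawn above, here is a rough sketch of how the possible outcomes of the lookup could be told apart. It uses only the Scala standard library; `LookupFailed`, `Outcome` and `classify` are hypothetical names introduced for illustration (with `LookupFailed` standing in for the ActorNotFoundException), not part of Spark or Akka.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object LookupOutcomeSketch {
  // Hypothetical stand-in for the ActorNotFoundException.
  final class LookupFailed(msg: String) extends RuntimeException(msg)

  sealed trait Outcome
  // Await.result gave up: the only outcome the test accepts.
  case object ConnectionTimedOut extends Outcome
  // The lookup future itself failed: may be a timeout, a missing actor, or an
  // authentication problem, so it cannot prove the connection was blocked.
  case object AmbiguousLookupFailure extends Outcome
  // A reference came back: the untrusted system was reachable.
  case object Resolved extends Outcome

  def classify(lookup: Future[String], atMost: FiniteDuration): Outcome =
    Try(Await.result(lookup, atMost)) match {
      case Failure(_: TimeoutException) => ConnectionTimedOut
      case Failure(_: LookupFailed)     => AmbiguousLookupFailure
      case Failure(other)               => throw other
      case Success(_)                   => Resolved
    }

  def main(args: Array[String]): Unit = {
    println(classify(Promise[String]().future, 100.millis))               // never completes
    println(classify(Future.failed(new LookupFailed("gone")), 100.millis)) // fails at once
    println(classify(Future.successful("actorRef"), 100.millis))           // resolves
  }
}
```

Catching either exception would merge the first two cases, which is the false-negative risk raised here.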

@pwendell
Contributor

pwendell commented Feb 5, 2015

I see, so you are worried about false negatives. For now, let's increase the timeout then.

asfgit pushed a commit that referenced this pull request Feb 5, 2015
`Await.result` and `selection.resolveOne` run with the same timeout simultaneously. When the `Await.result` timeout is reached first, a `TimeoutException` is thrown. On the other hand, when the `selection.resolveOne` timeout is reached first, an `ActorNotFoundException` is thrown. This is an obvious race condition, and the easiest way to fix it is to increase the timeout of one method to make sure the code fails on the other method first.

Author: Jacek Lewandowski <lewandowski.jacek@gmail.com>

Closes #4343 from jacek-lewandowski/SPARK-5548-1.3 and squashes the following commits:

b9ba47e [Jacek Lewandowski] SPARK-5548: Fixed a race condition in AkkaUtilsSuite
@jacek-lewandowski
Contributor Author

@pwendell So is this PR enough (according to what @JoshRosen said), or should I create another one for master?

@pwendell
Contributor

pwendell commented Feb 5, 2015

This one is fine. I can take care of getting it in master.

@pwendell
Contributor

pwendell commented Feb 5, 2015

Actually, can you close this PR? I already merged it, but it doesn't close correctly when someone merges into a topic branch.

@asfgit asfgit closed this in 081ac69 Feb 5, 2015
@dragos
Contributor

dragos commented Feb 12, 2015

The race condition still seems to be there. @jacek-lewandowski I'm not 100% sure, but it doesn't seem related to the timeout this time. The stack trace indicates that the Identity message returned without finding any actor. Does that fall into the real failure category? In that case, there's another race condition in this test.
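
A small sketch of why a longer inner timeout would not help in that case: when the lookup future has already failed, `Await.result` rethrows that failure immediately, however long either timeout is. This uses only the Scala standard library; `LookupFailed` and `awaitFailure` are hypothetical names, with `LookupFailed` standing in for `akka.actor.ActorNotFound`.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object EarlyFailureSketch {
  // Hypothetical stand-in for akka.actor.ActorNotFound.
  final class LookupFailed(msg: String) extends RuntimeException(msg)

  // Awaits a lookup that has already failed and returns the failure seen by
  // the caller. The timeout passed to Await.result plays no role here.
  def awaitFailure(atMost: FiniteDuration): Throwable = {
    val alreadyFailed: Future[String] =
      Future.failed(new LookupFailed("actor not found"))
    Try(Await.result(alreadyFailed, atMost)).failed.get
  }

  def main(args: Array[String]): Unit = {
    val seen = awaitFailure(10.seconds)
    // An intercept[TimeoutException] around this await would fail the test.
    println(seen.isInstanceOf[TimeoutException]) // false
    println(seen.isInstanceOf[LookupFailed])     // true
  }
}
```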

@JoshRosen
Contributor

Yep, looks like it's still here: https://amplab.cs.berkeley.edu/jenkins/view/Spark/job/Spark-1.3-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=centos/93/testReport/junit/org.apache.spark.util/AkkaUtilsSuite/remote_fetch_ssl_on___untrusted_server/

The org.apache.spark.util.AkkaUtilsSuite.remote fetch ssl on - untrusted server test failed with the following exception:

sbt.ForkMain$ForkError: Expected exception java.util.concurrent.TimeoutException to be thrown, but akka.actor.ActorNotFound was thrown.
    at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:496)
    at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
    at org.scalatest.Assertions$class.intercept(Assertions.scala:1004)
    at org.scalatest.FunSuite.intercept(FunSuite.scala:1555)
    at org.apache.spark.util.AkkaUtilsSuite$$anonfun$8.apply$mcV$sp(AkkaUtilsSuite.scala:373)
    at org.apache.spark.util.AkkaUtilsSuite$$anonfun$8.apply(AkkaUtilsSuite.scala:349)
    at org.apache.spark.util.AkkaUtilsSuite$$anonfun$8.apply(AkkaUtilsSuite.scala:349)
    at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
    at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
    at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
    at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
    at org.apache.spark.util.AkkaUtilsSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(AkkaUtilsSuite.scala:37)
    at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255)
    at org.apache.spark.util.AkkaUtilsSuite.runTest(AkkaUtilsSuite.scala:37)
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
    at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
    at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
    at org.scalatest.Suite$class.run(Suite.scala:1424)
    at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
    at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
    at org.apache.spark.util.AkkaUtilsSuite.org$scalatest$BeforeAndAfterAll$$super$run(AkkaUtilsSuite.scala:37)
    at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
    at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
    at org.apache.spark.util.AkkaUtilsSuite.run(AkkaUtilsSuite.scala:37)
    at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462)
    at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671)
    at sbt.ForkMain$Run$2.call(ForkMain.java:294)
    at sbt.ForkMain$Run$2.call(ForkMain.java:284)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: sbt.ForkMain$ForkError: Actor not found for: ActorSelection[Anchor(akka.ssl.tcp://spark@localhost:50364/), Path(/user/MapOutputTracker)]
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
    at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
    at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
    at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
    at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
    at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
    at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
    at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    at akka.actor.ActorCell.terminate(ActorCell.scala:369)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

@jacek-lewandowski
Contributor Author

Here is the new PR #4653
