
[SPARK-13640][SQL] Synchronize ScalaReflection.mirror method. #11487

Closed
wants to merge 8 commits

Conversation

ueshin
Member

@ueshin ueshin commented Mar 3, 2016

What changes were proposed in this pull request?

The ScalaReflection.mirror method should be synchronized when the Scala version is 2.10, because universe.runtimeMirror is not thread-safe.
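A minimal sketch of the fix's shape (the lock object's name here is illustrative; Spark's actual code uses its own internal lock):

```scala
import scala.reflect.runtime.universe

object SynchronizedMirror {
  // Illustrative lock; Spark guards reflection calls with an internal lock object.
  private val reflectionLock = new Object

  // Creating a runtime mirror touches the shared reflection symbol table,
  // which is not thread-safe in Scala 2.10, so serialize the call.
  def mirror: universe.Mirror = reflectionLock.synchronized {
    universe.runtimeMirror(Thread.currentThread().getContextClassLoader)
  }
}
```

In Scala 2.11 the reflection runtime is thread-safe, so the lock is redundant there but harmless.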

How was this patch tested?

I added a test to ScalaReflectionSuite that checks the thread safety of the ScalaReflection.mirror method; without this patch it throws the following exception in Scala 2.10:

[info] - thread safety of mirror *** FAILED *** (49 milliseconds)
[info]   java.lang.UnsupportedOperationException: tail of empty list
[info]   at scala.collection.immutable.Nil$.tail(List.scala:339)
[info]   at scala.collection.immutable.Nil$.tail(List.scala:334)
[info]   at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172)
[info]   at scala.reflect.internal.Symbols$Symbol.unsafeTypeParams(Symbols.scala:1477)
[info]   at scala.reflect.internal.Symbols$TypeSymbol.tpe(Symbols.scala:2777)
[info]   at scala.reflect.internal.Mirrors$RootsBase.init(Mirrors.scala:235)
[info]   at scala.reflect.runtime.JavaMirrors$class.createMirror(JavaMirrors.scala:34)
[info]   at scala.reflect.runtime.JavaMirrors$class.runtimeMirror(JavaMirrors.scala:61)
[info]   at scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:12)
[info]   at scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:12)
[info]   at org.apache.spark.sql.catalyst.ScalaReflection$.mirror(ScalaReflection.scala:36)
[info]   at org.apache.spark.sql.catalyst.ScalaReflectionSuite$$anonfun$12$$anonfun$apply$mcV$sp$1$$anonfun$apply$1$$anonfun$apply$2.apply(ScalaReflectionSuite.scala:256)
[info]   at org.apache.spark.sql.catalyst.ScalaReflectionSuite$$anonfun$12$$anonfun$apply$mcV$sp$1$$anonfun$apply$1$$anonfun$apply$2.apply(ScalaReflectionSuite.scala:252)
[info]   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
[info]   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
[info]   at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
[info]   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[info]   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[info]   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[info]   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Note that the test passes when the Scala version is 2.11.
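The thread-safety test described above can be sketched as follows: hammer the synchronized mirror method from many threads and fail if any call throws. Object and pool names are illustrative, not the suite's actual code:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.reflect.runtime.universe

object MirrorThreadSafetySketch {
  private val reflectionLock = new Object

  def mirror: universe.Mirror = reflectionLock.synchronized {
    universe.runtimeMirror(Thread.currentThread().getContextClassLoader)
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(8)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    // Call mirror concurrently; on Scala 2.10 without the lock this can
    // corrupt the symbol table and throw "tail of empty list".
    val futures = (0 until 100).map(_ => Future(mirror))
    futures.foreach(f => Await.result(f, 10.seconds))
    pool.shutdown()
    println("no concurrency failures")
  }
}
```

Because the race is intermittent, the real suite repeats the exercise many times rather than relying on a single run.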

@rxin
Contributor

rxin commented Mar 3, 2016

cc @marmbrus

@ueshin is back!

@SparkQA

SparkQA commented Mar 3, 2016

Test build #52375 has finished for PR 11487 at commit 389d644.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Member

srowen commented Mar 3, 2016

I assume it still works in 2.11, and is at worst redundant but doesn't hurt much. Is this definitely the right lock? A comment explaining why it is correct would help, since it isn't obvious.

@sarutak
Member

sarutak commented Mar 3, 2016

This PR can solve the issue mentioned in SPARK-10719, right?

@sarutak
Member

sarutak commented Mar 3, 2016

@zsxwing who reported SPARK-10719.

@SparkQA

SparkQA commented Mar 3, 2016

Test build #52397 has finished for PR 11487 at commit cee5896.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented Mar 3, 2016

SPARK-10719 is different. I don't think this one will resolve SPARK-10719, since that issue is in compiler-generated code which never calls org.apache.spark.sql.catalyst.ScalaReflection.mirror. You can check the stack trace I reported in the ticket.

@ueshin
Member Author

ueshin commented Mar 3, 2016

Jenkins, retest this please

@SparkQA

SparkQA commented Mar 3, 2016

Test build #52412 has finished for PR 11487 at commit cee5896.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member Author

ueshin commented Mar 4, 2016

Jenkins, retest this please

@SparkQA

SparkQA commented Mar 4, 2016

Test build #52441 has finished for PR 11487 at commit cee5896.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


private def testThreadSafetyFor(name: String)(exec: () => Any) = {
  test(s"thread safety of ${name}") {
    for (_ <- 0 until 100) {
srowen (Member)

(0 until 100).foreach? Probably just a matter of taste, but then it's symmetrical with your inner loop.
You can import java.net.URLClassLoader.
It doesn't really seem like you need a method here; it took a moment to see there was a test in here.

Maybe it's obvious to you, but why do all these classes/methods need to be tested separately?

And is this locking still safe in 2.11?

ueshin (Member Author)

@srowen Thank you for your comment.

(0 until 100).foreach?

I repeated the test 100 times here because it checks thread safety; the race sometimes happens and sometimes doesn't.

You can import java.net.URLClassLoader.

I'll modify it to use an import.

It doesn't really seem like you need a method here; it took a moment to see there was a test in here.

I'll move the test out of the method.

Maybe it's obvious to you but why do all these classes/methods need to be tested separately?

The methods are public, i.e. they can be called from multiple threads, so I thought they also needed to be tested.
But I wonder whether some of them could be removed?

And is this locking still safe in 2.11?

Yes, reflection in Scala 2.11 is thread-safe.
If we drop support for Scala 2.10, the locking in ScalaReflection will no longer be needed.

srowen (Member)

Yes, I understand repeating it 100 times, but your idiom for iterating n times differs on two nearby lines.

I actually meant the opposite -- remove the helper method but run tests in a loop still -- but leave it.

My question was more like, does the extra locking cause trouble in 2.11? I know you said 2.11 doesn't need this change per se.

@SparkQA

SparkQA commented Mar 7, 2016

Test build #52542 has finished for PR 11487 at commit 7ac9648.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 7, 2016

Test build #52541 has finished for PR 11487 at commit 0055fd1.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@ueshin
Member Author

ueshin commented Mar 7, 2016

@srowen

My question was more like, does the extra locking cause trouble in 2.11? I know you said 2.11 doesn't need this change per se.

I think the extra locking causes a small performance penalty while building the logical plan in the driver process, but it is much smaller than the execution time.

@SparkQA

SparkQA commented Mar 7, 2016

Test build #52562 has finished for PR 11487 at commit 63b1797.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Member

srowen commented Mar 8, 2016

OK, that seems reasonable. I was more wondering if we risk some deadlock in 2.11, but the tests are also potentially testing for that problem as well.

@srowen
Member

srowen commented Mar 9, 2016

Merged to master

@asfgit asfgit closed this in 2c5af7d Mar 9, 2016
@ueshin
Member Author

ueshin commented Mar 9, 2016

@srowen Thank you for merging this and your review.

@timotta

timotta commented Mar 21, 2016

Will this fix be applied to the 1.6 version?

@srowen
Member

srowen commented Mar 21, 2016

Could be; I had not done so, to be conservative, since I wasn't entirely sure of the implications, and it sounded like a rare issue. But if you have a reasonable argument that it warrants back-porting, I think that's possible.

@zsxwing
Member

zsxwing commented Mar 21, 2016

@timotta FYI, this one doesn't resolve SPARK-10719. If you want to use DataFrame in multiple threads with Scala 2.10, you may still encounter SPARK-10719. Since the Scala community doesn't want to fix SI-6240 for 2.10, I don't think there is anything to do in Spark. Maybe the best option is to use Scala 2.11.

@timotta

timotta commented Mar 21, 2016

@zsxwing I could use Scala 2.11, but if it's not a Spark issue, Spark should be distributed with Scala 2.11 by default instead of 2.10, since this bug exists.

@timotta

timotta commented Mar 21, 2016

@zsxwing I think the issue I'm looking to fix is this one: https://issues.apache.org/jira/browse/SPARK-13640

@zsxwing
Member

zsxwing commented Mar 21, 2016

@zsxwing I could use Scala 2.11, but if it's not a Spark issue, Spark should be distributed with Scala 2.11 by default instead of 2.10, since this bug exists.

Spark 2.0.0 will use 2.11 by default. However, we cannot do it for 1.6 as it's only a maintenance branch.

@zsxwing I think the issue I'm looking to fix is this one: https://issues.apache.org/jira/browse/SPARK-13640

I meant the fix for SPARK-13640 doesn't fix all race conditions in DataFrame.

roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes apache#11487 from ueshin/issues/SPARK-13640.
@squito
Contributor

squito commented Apr 6, 2016

@ueshin Have you found this test to pass consistently on 2.10? It seems we don't actually run these tests on 2.10 at all anymore, but when I run locally, say with

build/sbt -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Dscala-2.10 -Pscala-2.10
test-only *.ScalaReflectionSuite

it sometimes works, but sometimes fails with a variety of failures like:

[info] - SPARK-13640: thread safety of dataTypeFor *** FAILED *** (1 second, 546 milliseconds)
[info]   scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown during a parallel computation: java.lang.UnsupportedOperationException: tail of empty list
[info] scala.collection.immutable.Nil$.tail(List.scala:339)
[info] scala.collection.immutable.Nil$.tail(List.scala:334)
[info] scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172)
[info] scala.reflect.internal.Symbols$Symbol.unsafeTypeParams(Symbols.scala:1477)
[info] scala.reflect.internal.Symbols$TypeSymbol.tpe(Symbols.scala:2777)
[info] scala.reflect.internal.Mirrors$Roots$RootPackage$.<init>(Mirrors.scala:268)
[info] scala.reflect.internal.Mirrors$Roots.RootPackage$lzycompute(Mirrors.scala:267)
[info] scala.reflect.internal.Mirrors$Roots.RootPackage(Mirrors.scala:267)
[info] scala.reflect.internal.Mirrors$Roots.RootPackage(Mirrors.scala:245)
[info] scala.reflect.internal.Mirrors$RootsBase.init(Mirrors.scala:238)
...

or

[info] - SPARK-13640: thread safety of constructorFor *** FAILED *** (8 seconds, 59 milliseconds)
[info]   scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving package catalyst
[info]   at scala.reflect.internal.Symbols$TypeSymbol.tpe(Symbols.scala:2768)
[info]   at scala.reflect.internal.transform.Erasure$class.rebindInnerClass(Erasure.scala:72)
[info]   at scala.reflect.internal.transform.Transforms$$anonfun$3$$anon$1.rebindInnerClass(Transforms.scala:27)
[info]   at scala.reflect.internal.transform.Erasure$ErasureMap.eraseNormalClassRef(Erasure.scala:114)
[info]   at scala.reflect.internal.transform.Erasure$ErasureMap.apply(Erasure.scala:132)
[info]   at scala.reflect.internal.transform.Transforms$class.transformedType(Transforms.scala:39)
[info]   at scala.reflect.internal.SymbolTable.transformedType(SymbolTable.scala:13)
[info]   at scala.reflect.internal.Types$TypeApiImpl.erasure(Types.scala:306)
[info]   at scala.reflect.internal.Types$TypeApiImpl.erasure(Types.scala:298)
[info]   at org.apache.spark.sql.catalyst.ScalaReflection$class.getClassNameFromType(ScalaReflection.scala:775)
...

It seems like a pretty rare race -- any attempt I make to add any logging, or even a try/catch, makes it seem to disappear (at least, less than 1 in 10 runs after any changes). Maybe this is just the same as SPARK-10719, and there isn't anything better we can do here?

@ueshin
Member Author

ueshin commented Apr 7, 2016

@squito Thank you for your report.
I ran the test on 2.10 many times while preparing this PR, but I did not see these failures.
The first one seems to be the same as SPARK-10719, but the second could be fixed by synchronizing the getClassNameFromType method.
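That second fix would take the same shape as the mirror change: guard the erasure-based name lookup with the same lock. A hedged sketch follows; the method name matches the one discussed, but the body and lock are illustrative, not Spark's exact implementation:

```scala
import scala.reflect.runtime.universe._

object ErasureNameSketch {
  // Illustrative lock standing in for Spark's internal reflection lock.
  private val reflectionLock = new Object

  // Type.erasure walks shared compiler structures, so in Scala 2.10 it
  // needs the same serialization as runtimeMirror.
  def getClassNameFromType(tpe: Type): String = reflectionLock.synchronized {
    tpe.erasure.typeSymbol.asClass.fullName
  }
}
```

Without the lock, concurrent callers can hit races such as the CyclicReference failure shown above.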
