
[SPARK-14540][Core] Fix remaining major issues for Scala 2.12 Support #21930

Closed · wants to merge 1 commit

Conversation

@skonto (Contributor) commented Jul 31, 2018

What changes were proposed in this pull request?

This PR addresses issues 2 and 3 in this document.

  • We modified the closure cleaner to identify closures that are implemented via the LambdaMetaFactory mechanism (serializedLambdas) (issue 2); a hedged detection sketch follows the note below.

  • We also fix the issue where overloading resolution for functions fails with Unit adaptation (scala/bug#11016). There are two options for solving the Unit issue: either add () at the end of the closure, or use the trick described in the doc; otherwise overloading resolution does not work (we are not going to eliminate either of the methods here). The compiler tries to adapt the closure to Unit and makes both methods candidates for overloading, but when the overload is polymorphic there is no ambiguity, which is the workaround implemented. This does not look great, but it serves its purpose: we need to support two different uses of addTaskCompletionListener, one that passes a TaskCompletionListener and one that passes a closure that is wrapped with a TaskCompletionListener later on (issue 3). A minimal sketch of the workaround follows this list.
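
A minimal, self-contained sketch of the overloading situation and the polymorphic-result workaround. The names `Ctx`, `Listener` and `Registrar` below are hypothetical stand-ins rather than Spark's actual `TaskContext`/`TaskCompletionListener` API; only the shape of the two overloads matters.

```scala
// Hedged sketch of the scala/bug#11016 workaround; all names are made up.
trait Ctx { def partitionId(): Int }

trait Listener { def onComplete(c: Ctx): Unit }

class Registrar {
  // Overload 1: takes an explicit listener object (a SAM type under 2.12).
  def addListener(l: Listener): Registrar = this

  // Overload 2: takes a closure. If this parameter were `Ctx => Unit`, a call
  // such as `addListener { c => c.partitionId() }` would need Unit adaptation
  // (the body returns Int), and under 2.12 overloading resolution against the
  // SAM overload above then fails. Making the result type a type parameter
  // removes the need for the adaptation, so resolution succeeds.
  def addListener[U](f: Ctx => U): Registrar =
    addListener(new Listener { def onComplete(c: Ctx): Unit = f(c) })
}

// Usage: compiles under both 2.11 and 2.12 without spelling out U.
// new Registrar().addListener { c => c.partitionId() }
```

The alternative mentioned above, appending `()` to the closure body so that it is already `Unit`-typed, sidesteps the adaptation at each call site instead of in the API.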

Note: regarding issue 1 in the doc the plan is:

Do Nothing. Don’t try to fix this as this is only a problem for Java users who would want to use 2.11 binaries. In that case they can cast to MapFunction to be able to utilize lambdas. In Spark 3.0.0 the API should be simplified so that this issue is removed.
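
Returning to the closure-cleaner change for issue 2: the sketch below is a hedged illustration, not the exact code in this PR, of how a closure produced through the LambdaMetaFactory mechanism can be recognized. Serializable lambdas carry a synthetic `writeReplace` method that returns a `java.lang.invoke.SerializedLambda`, and that proxy is what a cleaner can inspect.

```scala
import java.lang.invoke.SerializedLambda

// Hedged sketch: returns the SerializedLambda proxy if `closure` was generated
// as a serializable lambda via LambdaMetafactory, None otherwise.
def serializedLambdaOf(closure: AnyRef): Option[SerializedLambda] = {
  if (closure == null || !closure.getClass.isSynthetic) {
    // LMF-generated lambda classes are marked synthetic, so bail out early here.
    None
  } else {
    try {
      val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
      writeReplace.setAccessible(true)
      writeReplace.invoke(closure) match {
        case lambda: SerializedLambda => Some(lambda)
        case _ => None
      }
    } catch {
      case _: NoSuchMethodException => None // not a serializable lambda
    }
  }
}
```

From the proxy, accessors such as `getImplClass` and `getImplMethodName` point at the method that actually holds the closure body, which is the kind of information a bytecode visitor can then follow.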

How was this patch tested?

This was manually tested:

./build/mvn -DskipTests -Pscala-2.12 clean package
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.serializer.ProactiveClosureSerializationSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.util.ClosureCleanerSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.streaming.DStreamClosureSuite -Dtest=None

@holdensmagicalunicorn

@skonto, thanks! I am a bot who has found some folks who might be able to help with the review: @pwendell, @cloud-fan and @mateiz

@skonto (Contributor Author) commented Jul 31, 2018

@lrytz @retronym @adriaanm @debasishg @srowen @felixcheung fyi, pls review.

@cloud-fan (Contributor)

cc @JoshRosen too

@@ -123,7 +123,7 @@ abstract class TaskContext extends Serializable {
    *
    * Exceptions thrown by the listener will result in failure of the task.
    */
-  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
+  def addTaskCompletionListener[U](f: (TaskContext) => U): TaskContext = {
Contributor:

Do we need to change this? I don't think it is a problem binary-compatibility-wise, but it seems a bit weird since we don't use the result of the function.

Member:

This is to work around scala/bug#11016 right? I'd prefer any solution that doesn't involve changing all the callers, but looks like both workarounds require something to be done. At least I'd document the purpose of U here.

That said, user code can call this right? And it would have to implement a similar change to work with 2.12? that's probably OK in the sense that any user app must make several changes to be compatible with 2.12.

I don't think 2.11 users would find there is a change to the binary API. Would a 2.11 user need to change their calls to specify a type for U with this change? Because it looks like it's not optional, given that Spark code has to change its calls. Is that not a source incompatibility?

If it's not, then, I guess I wonder if you can avoid changing all the calls in Spark?

@skonto (Contributor Author), Jul 31, 2018:

Yes, this covers that bug. So if you build with 2.11 you don't need to specify the type Unit when you make the call (I tried that), since there is no ambiguity and the compiler does not face an overloading issue. So at the source level there shouldn't be an issue. Binary compatibility is also described in the doc.
With 2.12 both addTaskCompletionListener methods end up being SAM types and the Unit adaptation causes this issue. I am not sure we can do anything more here; this is specific to 2.12, otherwise you get compilation errors for that version. @retronym or @lrytz may add more context. I certainly should document this.

Member:

OK, if it's binary- and source-compatible with existing user programs for 2.11 users, that's fine. Bets are off for 2.12 users anyway.

When the release notes are crafted for 2.4, we'll want to mention this JIRA (I'll tag it) and issues like this.

    // the outer closure's parent pointer. This will make `inner2` serializable.
    verifyCleaning(
      inner2, serializableBefore = false, serializableAfter = true, transitive = true)
    if(!ClosureCleanerSuite2.supportsLMFs) {
Member:

Nit: space after "if". scalastyle might flag that. While you're at it, what about flipping the blocks to avoid a negation? just for a tiny bit of clarity.

Contributor Author:

sure.


@SparkQA commented Jul 31, 2018

Test build #93828 has finished for PR 21930 at commit d466a9c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • logDebug(s\" + cloning the object $obj of class $

@skonto (Contributor Author) commented Jul 31, 2018

This patch adds the following public classes (experimental): logDebug(s\" + cloning the object $obj of class $

Is this normal?

@hvanhovell (Contributor)

Yeah I would not worry about it

@felixcheung (Member)

I think that's a binary-incompatible breaking API change, right?
ex. https://github.com/apache/spark/pull/21930/files#diff-2b8f0f66fe5397b169d0f754e99da8d5R64

@skonto (Contributor Author) commented Aug 1, 2018

@felixcheung AFAIK, as stated (by @lrytz) in the doc, it shouldn't be. It can be tested, I guess.

@skonto (Contributor Author) commented Aug 1, 2018

@felixcheung I tested that with this simple program with Scala 2.11. The app jar was built against the officially released artifacts (2.3.1) with Scala 2.11.8:

    val spark = SparkSession
      ...
    spark.sparkContext.makeRDD(1 to 10000).foreachPartition { x =>
      TaskContext.get().addTaskCompletionListener { y: TaskContext =>
        println(s"Finishing...${y.partitionId()}...${x.length}")
      }
      x
    }

I ran this app with the 2.3.1 official distro and also with a distro built from this PR, again with Scala 2.11.6.
I got no errors; in both cases the output is:

Finishing...3...1250
Finishing...0...1250
Finishing...4...1250
Finishing...7...1250
Finishing...5...1250
Finishing...6...1250
Finishing...1...1250
Finishing...2...1250

The definition of binary compatibility states that when you change a class, client code using that class should not break (without re-compiling the client code). I hope I am not missing something here.
I also ran the same app against a distro built with Scala 2.12 from this PR. Again I did not see any issue.

@skonto (Contributor Author) commented Aug 1, 2018

@srowen I updated the PR with the minor fixes.

@SparkQA commented Aug 1, 2018

Test build #93878 has finished for PR 21930 at commit c72362b.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • logDebug(s\" + cloning the object $obj of class $

@srowen (Member) commented Aug 1, 2018

I can imagine it is binary-compatible because the change is just to generic types, that are erased here (no ClassTags or anything).

To be clear about source compatibility, do I have this right?

  • The change to addTaskCompletionListener callers is necessary for 2.12 callers
  • It isn't necessary for 2.11 callers
  • It's necessary for Spark because it compiles against both

@skonto (Contributor Author) commented Aug 1, 2018

@srowen that is my understanding too, and yes, they are erased AFAIK, but just in case (out of curiosity) I tried it...

@SparkQA commented Aug 1, 2018

Test build #93879 has finished for PR 21930 at commit 30d83f9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • logDebug(s\" + cloning the object $obj of class $

@srowen (Member) left a comment:

I think it's mergeable as-is, even, but if you have time for one more pass on some nits, this could also improve a little on the code that was there.

   * @param closure the closure to check.
   */
  private def getSerializedLambda(closure: AnyRef): Option[SerializedLambda] = {
    if (scala.util.Properties.versionString.contains("2.11")) {
Member:

Ah, this part of the diff was collapsed and I didn't see it. I have a few more minor questions/suggestions on the closure cleaner change but would indeed like to get this in for 2.4, and it looks close.

Here, what about storing the result of this in a private field so as not to compute it every time? it might not be a big deal, don't know.

Contributor Author:

sure.
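
A tiny sketch of what the suggested private field could look like (the object and field names are made up, not necessarily what the final commit uses):

```scala
object ScalaVersionCheck {
  // Evaluate the Scala-version check once instead of on every call.
  val isScala2_11: Boolean =
    scala.util.Properties.versionString.contains("2.11")
}
```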

      closure.getClass.isSynthetic &&
        closure
          .getClass
          .getInterfaces.exists{x: Class[_] => x.getName.equals("scala.Serializable") }
Member:

More nits: does (_.getName.equals...) not work?

Contributor Author:

Need to test, I think I had to use it.
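
For reference, the shorthand the reviewer has in mind would look roughly like this; whether type inference accepts it at that call site is exactly what needs testing (the wrapper function name is made up):

```scala
def extendsScalaSerializable(closure: AnyRef): Boolean =
  closure.getClass.getInterfaces.exists(_.getName == "scala.Serializable")
```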


    if (isClosureCandidate) {
      try {
        val res = inspect(closure)
Member:

Inline res? just little stuff that might streamline this a bit, it doesn't matter

logDebug(" + outer objects: " + outerObjects.size)
outerObjects.foreach { o => logDebug(" " + o) }
}
if(lambdaFunc.isEmpty) {
Member:

Nit: space after if

    val declaredMethods = func.getClass.getDeclaredMethods

    if (log.isDebugEnabled) {
      logDebug(" + declared fields: " + declaredFields.size)
Member:

How about using interpolation, while bothering to update these debug statements?

    // If accessed fields is not populated yet, we assume that
    // the closure we are trying to clean is the starting one
    if (accessedFields.isEmpty) {
      logDebug(s" + populating accessed fields because this is the starting closure")
Member:

Doesn't actually need interpolation after all


    // List of outer (class, object) pairs, ordered from outermost to innermost
    // Note that all outer objects but the outermost one (first one in this list) must be closures
    var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse
Member:

I know this was existing code but might be OK to clean this up to val outerPairs = outerClasses.zip(outerObjects).reverse

Contributor Author:

OK, I can do that.

    // Note that all outer objects but the outermost one (first one in this list) must be closures
    var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse
    var parent: AnyRef = null
    if (outerPairs.size > 0) {
Member:

.nonEmpty? same, it was existing code, just seeing if we can scrub it

@skonto (Contributor Author), Aug 1, 2018:

Ok sure.

// closure passed is $anonfun$t$1$adapted while actual code resides in $anonfun$s$1
// visitor will see only $anonfun$s$1$adapted, so we remove the suffix, see
// https://github.com/scala/scala-dev/issues/109
val isTargetMethod = if (targetMethodName.isEmpty) {
Member:

Simplify to isTargetMethod = targetMethodName.isEmpty || ... || ...?

Contributor Author:

ok will give it a shot.

@skonto (Contributor Author) commented Aug 1, 2018

@srowen I think it's OK now. Ready to go.

@SparkQA commented Aug 2, 2018

Test build #93915 has finished for PR 21930 at commit 095aed2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • logDebug(s\" + cloning the object $obj of class $

@SparkQA commented Aug 2, 2018

Test build #93916 has finished for PR 21930 at commit af65ef3.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • logDebug(s\" + cloning the object $obj of class $

@skonto (Contributor Author) commented Aug 2, 2018

@srowen gentle ping.

@srowen (Member) commented Aug 2, 2018

Merged to master

@asfgit closed this in a657369 on Aug 2, 2018
@skonto (Contributor Author) commented Aug 2, 2018

@srowen thanks! So 2.12 will be optional for Spark 2.4? And the major version for Spark 3.0?
What is the plan?

@srowen (Member) commented Aug 2, 2018

Yes we need to also create a 2.12 build of Spark in 2.4. We might still have to label it "beta" as I still kind of suspect there's a corner case lurking here. I can't speak for 3.0, but would assume it would try to support 2.13, and not support 2.11.

@skonto (Contributor Author) commented Aug 2, 2018

Sure, @lrytz can have a second look at this; it also needs to be battle-tested.
