Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-31399][CORE][2.4] Support indylambda Scala closure in ClosureCleaner #28577

Closed

Conversation

rednaxelafx
Copy link
Contributor

This is a backport of #28463 from Apache Spark master/3.0 to 2.4.
Minor adaptation include:

  • Retain the Spark 2.4-specific behavior of skipping the indylambda check when using Scala 2.11
  • Remove unnecessary LMF restrictions in ClosureCleaner tests
  • Address review comments in the original PR from @kiszk

Tested with the default Scala 2.11 build, and also tested ClosureCleaner-related tests in Scala 2.12 build as well:

  • repl: SingletonReplSuite
  • core: ClosureCleanerSuite and ClosureCleanerSuite2

What changes were proposed in this pull request?

This PR proposes to enhance Spark's ClosureCleaner to support "indylambda" style of Scala closures to the same level as the existing implementation for the old (inner class) style ones. The goal is to reach feature parity with the support of the old style Scala closures, with as close to bug-for-bug compatibility as possible.

Specifically, this PR addresses one lacking support for indylambda closures vs the inner class closures:

  • When a closure is declared in a Scala REPL and captures the enclosing REPL line object, such closure should be cleanable (unreferenced fields on the enclosing REPL line object should be cleaned)

This PR maintains the same limitations in the new indylambda closure support as the old inner class closures, in particular the following two:

  • Cleaning is only available for one level of REPL line object. If a closure captures state from a REPL line object further out from the immediate enclosing one, it won't be subject to cleaning. See example below.
  • "Sibling" closures are not handled yet. A "sibling" closure is defined here as a closure that is directly or indirectly referenced by the starting closure, but isn't lexically enclosing. e.g.
    {
      val siblingClosure = (x: Int) => x + this.fieldA   // captures `this`, references `fieldA` on `this`.
      val startingClosure = (y: Int) => y + this.fieldB + siblingClosure(y)  // captures `this` and `siblingClosure`, references `fieldB` on `this`.
    }

The changes are intended to be minimal, with further code cleanups planned in separate PRs.

Jargons:

  • old, inner class style Scala closures, aka delambdafy:inline: default in Scala 2.11 and before
  • new, "indylambda" style Scala closures, aka delambdafy:method: default in Scala 2.12 and later

Why are the changes needed?

There had been previous effortsto extend Spark's ClosureCleaner to support "indylambda" Scala closures, which is necessary for proper Scala 2.12 support. Most notably the work done for SPARK-14540.

But the previous efforts had missed one import scenario: a Scala closure declared in a Scala REPL, and it captures the enclosing this -- a REPL line object. e.g. in a Spark Shell:

:pa
class NotSerializableClass(val x: Int)
val ns = new NotSerializableClass(42)
val topLevelValue = "someValue"
val func = (j: Int) => {
  (1 to j).flatMap { x =>
    (1 to x).map { y => y + topLevelValue }
  }
}
<Ctrl+D>
sc.parallelize(0 to 2).map(func).collect

In this example, func refers to a Scala closure that captures the enclosing this because it needs to access topLevelValue, which is in turn implemented as a field on the enclosing REPL line object.

The existing ClosureCleaner in Spark supports cleaning this case in Scala 2.11-, and this PR brings feature parity to Scala 2.12+.

Note that the existing cleaning logic only supported one level of REPL line object nesting. This PR does not go beyond that. When a closure references state declared a few commands earlier, the cleaning will fail in both Scala 2.11 and Scala 2.12. e.g.

scala> :pa
// Entering paste mode (ctrl-D to finish)

class NotSerializableClass1(val x: Int)
case class Foo(id: String)
val ns = new NotSerializableClass1(42)
val topLevelValue = "someValue"

// Exiting paste mode, now interpreting.

defined class NotSerializableClass1
defined class Foo
ns: NotSerializableClass1 = NotSerializableClass1@615b1baf
topLevelValue: String = someValue

scala> :pa
// Entering paste mode (ctrl-D to finish)

val closure2 = (j: Int) => {
  (1 to j).flatMap { x =>
    (1 to x).map { y => y + topLevelValue } // 2 levels
  }
}

// Exiting paste mode, now interpreting.

closure2: Int => scala.collection.immutable.IndexedSeq[String] = <function1>

scala> sc.parallelize(0 to 2).map(closure2).collect
org.apache.spark.SparkException: Task not serializable
...

in the Scala 2.11 / Spark 2.4.x case:

Caused by: java.io.NotSerializableException: NotSerializableClass1
Serialization stack:
	- object not serializable (class: NotSerializableClass1, value: NotSerializableClass1@615b1baf)
	- field (class: $iw, name: ns, type: class NotSerializableClass1)
	- object (class $iw, $iw@64df3f4b)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@66e6e5e9)
	- field (class: $line14.$read, name: $iw, type: class $iw)
	- object (class $line14.$read, $line14.$read@c310aa3)
	- field (class: $iw, name: $line14$read, type: class $line14.$read)
	- object (class $iw, $iw@79224636)
	- field (class: $iw, name: $outer, type: class $iw)
	- object (class $iw, $iw@636d4cdc)
	- field (class: $anonfun$1, name: $outer, type: class $iw)
	- object (class $anonfun$1, <function1>)

in the Scala 2.12 / Spark 2.4.x case after this PR:

Caused by: java.io.NotSerializableException: NotSerializableClass1
Serialization stack:
	- object not serializable (class: NotSerializableClass1, value: NotSerializableClass1@6f3b4c9a)
	- field (class: $iw, name: ns, type: class NotSerializableClass1)
	- object (class $iw, $iw@2945a3c1)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@152705d0)
	- field (class: $line14.$read, name: $iw, type: class $iw)
	- object (class $line14.$read, $line14.$read@7cf311eb)
	- field (class: $iw, name: $line14$read, type: class $line14.$read)
	- object (class $iw, $iw@d980dac)
	- field (class: $iw, name: $outer, type: class $iw)
	- object (class $iw, $iw@557d9532)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$closure2$1$adapted:(L$iw;Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;, instantiatedMethodType=(Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class $Lambda$2103/815179920, $Lambda$2103/815179920@569b57c4)

For more background of the new and old ways Scala lowers closures to Java bytecode, please see A note on how NSC (New Scala Compiler) lowers lambdas.

For more background on how Spark's ClosureCleaner works and what's needed to make it support "indylambda" Scala closures, please refer to A Note on Apache Spark's ClosureCleaner.

tl;dr

The ClosureCleaner works like a mark-sweep algorithm on fields:

  • Finding (a chain of) outer objects referenced by the starting closure;
  • Scanning the starting closure and its inner closures and marking the fields on the outer objects accessed;
  • Cloning the outer objects, nulling out fields that are not accessed by any closure of concern.
Outer Objects

For the old, inner class style Scala closures, the "outer objects" is defined as the lexically enclosing closures of the starting closure, plus an optional enclosing REPL line object if these closures are defined in a Scala REPL. All of them are on a singly-linked $outer chain.

For the new, "indylambda" style Scala closures, the capturing implementation changed, so closures no longer refer to their enclosing closures via an $outer chain. However, a closure can still capture its enclosing REPL line object, much like the old style closures. The name of the field that captures this reference would be arg$1 (instead of $outer).

So what's missing in the ClosureCleaner for the "indylambda" support is find and potentially clone+clean the captured enclosing this REPL line object. That's what this PR implements.

Inner Closures

The old, inner class style of Scala closures are compiled into separate inner classes, one per lambda body. So in order to discover the implementation (bytecode) of the inner closures, one has to jump over multiple classes. The name of such a class would contain the marker substring $anonfun$.

The new, "indylambda" style Scala closures are compiled into static methods in the class where the lambdas were declared. So for lexically nested closures, their lambda bodies would all be compiled into static methods in the same class. This makes it much easier to discover the implementation (bytecode) of the nested lambda bodies. The name of such a static method would contain the marker substring $anonfun$.

Discovery of inner closures involves scanning bytecode for certain patterns that represent the creation of a closure object for the inner closure.

  • For inner class style: the closure object creation site is like new <InnerClassForTheClosure>(captured args)
  • For "indylambda" style: the closure object creation site would be compiled into an invokedynamic instruction, with its "bootstrap method" pointing to the same one used by Java 8 for its serializable lambdas, and with the bootstrap method arguments pointing to the implementation method.

Does this PR introduce any user-facing change?

Yes. Before this PR, Spark 2.4 / 3.0 / master on Scala 2.12 would not support Scala closures declared in a Scala REPL that captures anything from the REPL line objects. After this PR, such scenario is supported.

How was this patch tested?

Added new unit test case to org.apache.spark.repl.SingletonReplSuite. The new test case fails without the fix in this PR, and pases with the fix.

Closes #28463 from rednaxelafx/closure-cleaner-indylambda.

Authored-by: Kris Mok kris.mok@databricks.com
Signed-off-by: Wenchen Fan wenchen@databricks.com
(cherry picked from commit dc01b75)
Signed-off-by: Kris Mok kris.mok@databricks.com

…cala closure in ClosureCleaner

This PR proposes to enhance Spark's `ClosureCleaner` to support "indylambda" style of Scala closures to the same level as the existing implementation for the old (inner class) style ones. The goal is to reach feature parity with the support of the old style Scala closures, with as close to bug-for-bug compatibility as possible.

Specifically, this PR addresses one lacking support for indylambda closures vs the inner class closures:
- When a closure is declared in a Scala REPL and captures the enclosing REPL line object, such closure should be cleanable (unreferenced fields on the enclosing REPL line object should be cleaned)

This PR maintains the same limitations in the new indylambda closure support as the old inner class closures, in particular the following two:
- Cleaning is only available for one level of REPL line object. If a closure captures state from a REPL line object further out from the immediate enclosing one, it won't be subject to cleaning. See example below.
- "Sibling" closures are not handled yet. A "sibling" closure is defined here as a closure that is directly or indirectly referenced by the starting closure, but isn't lexically enclosing. e.g.
  ```scala
  {
    val siblingClosure = (x: Int) => x + this.fieldA   // captures `this`, references `fieldA` on `this`.
    val startingClosure = (y: Int) => y + this.fieldB + siblingClosure(y)  // captures `this` and `siblingClosure`, references `fieldB` on `this`.
  }
  ```

The changes are intended to be minimal, with further code cleanups planned in separate PRs.

Jargons:
- old, inner class style Scala closures, aka `delambdafy:inline`: default in Scala 2.11 and before
- new, "indylambda" style Scala closures, aka `delambdafy:method`: default in Scala 2.12 and later

There had been previous effortsto extend Spark's `ClosureCleaner` to support "indylambda" Scala closures, which is necessary for proper Scala 2.12 support. Most notably the work done for [SPARK-14540](https://issues.apache.org/jira/browse/SPARK-14540).

But the previous efforts had missed one import scenario: a Scala closure declared in a Scala REPL, and it captures the enclosing `this` -- a REPL line object. e.g. in a Spark Shell:
```scala
:pa
class NotSerializableClass(val x: Int)
val ns = new NotSerializableClass(42)
val topLevelValue = "someValue"
val func = (j: Int) => {
  (1 to j).flatMap { x =>
    (1 to x).map { y => y + topLevelValue }
  }
}
<Ctrl+D>
sc.parallelize(0 to 2).map(func).collect
```
In this example, `func` refers to a Scala closure that captures the enclosing `this` because it needs to access `topLevelValue`, which is in turn implemented as a field on the enclosing REPL line object.

The existing `ClosureCleaner` in Spark supports cleaning this case in Scala 2.11-, and this PR brings feature parity to Scala 2.12+.

Note that the existing cleaning logic only supported one level of REPL line object nesting. This PR does not go beyond that. When a closure references state declared a few commands earlier, the cleaning will fail in both Scala 2.11 and Scala 2.12. e.g.
```scala
scala> :pa
// Entering paste mode (ctrl-D to finish)

class NotSerializableClass1(val x: Int)
case class Foo(id: String)
val ns = new NotSerializableClass1(42)
val topLevelValue = "someValue"

// Exiting paste mode, now interpreting.

defined class NotSerializableClass1
defined class Foo
ns: NotSerializableClass1 = NotSerializableClass1615b1baf
topLevelValue: String = someValue

scala> :pa
// Entering paste mode (ctrl-D to finish)

val closure2 = (j: Int) => {
  (1 to j).flatMap { x =>
    (1 to x).map { y => y + topLevelValue } // 2 levels
  }
}

// Exiting paste mode, now interpreting.

closure2: Int => scala.collection.immutable.IndexedSeq[String] = <function1>

scala> sc.parallelize(0 to 2).map(closure2).collect
org.apache.spark.SparkException: Task not serializable
...
```
in the Scala 2.11 / Spark 2.4.x case:
```
Caused by: java.io.NotSerializableException: NotSerializableClass1
Serialization stack:
	- object not serializable (class: NotSerializableClass1, value: NotSerializableClass1615b1baf)
	- field (class: $iw, name: ns, type: class NotSerializableClass1)
	- object (class $iw, $iw64df3f4b)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw66e6e5e9)
	- field (class: $line14.$read, name: $iw, type: class $iw)
	- object (class $line14.$read, $line14.$readc310aa3)
	- field (class: $iw, name: $line14$read, type: class $line14.$read)
	- object (class $iw, $iw79224636)
	- field (class: $iw, name: $outer, type: class $iw)
	- object (class $iw, $iw636d4cdc)
	- field (class: $anonfun$1, name: $outer, type: class $iw)
	- object (class $anonfun$1, <function1>)
```
in the Scala 2.12 / Spark master case after this PR:
```
Caused by: java.io.NotSerializableException: NotSerializableClass1
Serialization stack:
	- object not serializable (class: NotSerializableClass1, value: NotSerializableClass16f3b4c9a)
	- field (class: $iw, name: ns, type: class NotSerializableClass1)
	- object (class $iw, $iw2945a3c1)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw152705d0)
	- field (class: $line14.$read, name: $iw, type: class $iw)
	- object (class $line14.$read, $line14.$read7cf311eb)
	- field (class: $iw, name: $line14$read, type: class $line14.$read)
	- object (class $iw, $iwd980dac)
	- field (class: $iw, name: $outer, type: class $iw)
	- object (class $iw, $iw557d9532)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$closure2$1$adapted:(L$iw;Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;, instantiatedMethodType=(Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class $Lambda$2103/815179920, $Lambda$2103/815179920569b57c4)
```

For more background of the new and old ways Scala lowers closures to Java bytecode, please see [A note on how NSC (New Scala Compiler) lowers lambdas](https://gist.github.com/rednaxelafx/e9ecd09bbd1c448dbddad4f4edf25d48#file-notes-md).

For more background on how Spark's `ClosureCleaner` works and what's needed to make it support "indylambda" Scala closures, please refer to [A Note on Apache Spark's ClosureCleaner](https://gist.github.com/rednaxelafx/e9ecd09bbd1c448dbddad4f4edf25d48#file-spark_closurecleaner_notes-md).

The `ClosureCleaner` works like a mark-sweep algorithm on fields:
- Finding (a chain of) outer objects referenced by the starting closure;
- Scanning the starting closure and its inner closures and marking the fields on the outer objects accessed;
- Cloning the outer objects, nulling out fields that are not accessed by any closure of concern.

For the old, inner class style Scala closures, the "outer objects" is defined as the lexically enclosing closures of the starting closure, plus an optional enclosing REPL line object if these closures are defined in a Scala REPL. All of them are on a singly-linked `$outer` chain.

For the new, "indylambda" style Scala closures, the capturing implementation changed, so closures no longer refer to their enclosing closures via an `$outer` chain. However, a closure can still capture its enclosing REPL line object, much like the old style closures. The name of the field that captures this reference would be `arg$1` (instead of `$outer`).

So what's missing in the `ClosureCleaner` for the "indylambda" support is find and potentially clone+clean the captured enclosing `this` REPL line object. That's what this PR implements.

The old, inner class style of Scala closures are compiled into separate inner classes, one per lambda body. So in order to discover the implementation (bytecode) of the inner closures, one has to jump over multiple classes. The name of such a class would contain the marker substring `$anonfun$`.

The new, "indylambda" style Scala closures are compiled into **static methods** in the class where the lambdas were declared. So for lexically nested closures, their lambda bodies would all be compiled into static methods **in the same class**. This makes it much easier to discover the implementation (bytecode) of the nested lambda bodies. The name of such a static method would contain the marker substring `$anonfun$`.

Discovery of inner closures involves scanning bytecode for certain patterns that represent the creation of a closure object for the inner closure.
- For inner class style: the closure object creation site is like `new <InnerClassForTheClosure>(captured args)`
- For "indylambda" style: the closure object creation site would be compiled into an `invokedynamic` instruction, with its "bootstrap method" pointing to the same one used by Java 8 for its serializable lambdas, and with the bootstrap method arguments pointing to the implementation method.

Yes. Before this PR, Spark 2.4 / 3.0 / master on Scala 2.12 would not support Scala closures declared in a Scala REPL that captures anything from the REPL line objects. After this PR, such scenario is supported.

Added new unit test case to `org.apache.spark.repl.SingletonReplSuite`. The new test case fails without the fix in this PR, and pases with the fix.

Closes apache#28463 from rednaxelafx/closure-cleaner-indylambda.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit dc01b75)
Signed-off-by: Kris Mok <kris.mok@databricks.com>
Copy link
Contributor Author

@rednaxelafx rednaxelafx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some PR comments to highlight the points to review.


import scala.collection.JavaConverters._
import scala.collection.mutable.{Map, Set, Stack}
import scala.language.existentials
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @kiszk note that the Spark 2.4 version of this file imports scala.language.existentials

Comment on lines +457 to +462
if (isScala2_11) {
// Keep existing behavior in Spark 2.4: assume Scala 2.11 doesn't use indylambda.
// NOTE: It's actually possible to turn on indylambda in Scala 2.11 via delambdafy:inline,
// but that's not the default and we don't expect it to be in use.
return None
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is Spark 2.4-specific. It was there in the existing code in ClosureCleaner.getSerializedLambda, and I'm porting it over to the new code as well.

* enclosing "this", aka `$outer` in Scala.
*/
def isInnerClassCtorCapturingOuter(
op: Int, name: String, desc: String, callerInternalName: String): Boolean = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @kiszk I've addressed your comment in this Spark 2.4 backport PR: removed the unused owner parameter.

// `$outer` chain. So this is NOT controlled by the `findTransitively` flag.
logDebug(s" found inner class $ownerExternalName")
// val innerClassInfo = getOrUpdateClassInfo(owner)
val (innerClass, innerClassNode) = getOrUpdateClassInfo(owner)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @kiszk I've addressed your comment in this Spark 2.4 backport PR: use destructuring pattern matching assignment here. This is possible because of the extra import at the top of this file.

@@ -395,6 +395,67 @@ class SingletonReplSuite extends SparkFunSuite {
assertDoesNotContain("Exception", output)
}

test("SPARK-31399: should clone+clean line object w/ non-serializable state in ClosureCleaner") {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE: these new tests pass in both Scala 2.11 and Scala 2.12 mode.

@rednaxelafx
Copy link
Contributor Author

Hi @dongjoon-hyun and @dbtsai : could you please help review this backport PR? I did test the ClosureCleaner-related tests in both Scala 2.11 and Scala 2.12 builds locally before I sent out the PR, but it'd be nice to get some more Scala 2.12 testing.

cc reviewers of the original PR: @retronym @viirya @maropu @srowen @kiszk @cloud-fan @HyukjinKwon

Thanks!

@SparkQA
Copy link

SparkQA commented May 19, 2020

Test build #122831 has finished for PR 28577 at commit 0b7bc9f.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rednaxelafx
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented May 19, 2020

Test build #122837 has finished for PR 28577 at commit 0b7bc9f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member

Thank you, @rednaxelafx .
cc @holdenk since she is a release manager of Apache Spark 2.4.6.

@holdenk
Copy link
Contributor

holdenk commented May 19, 2020

Thank you for backporting this. I’m off today and tomorrow for health reasons, but if no one has time to review by Thursday I’ll take a look :)

@dbtsai
Copy link
Member

dbtsai commented May 19, 2020

@rednaxelafx We have internal Spark 2.4 builds supporting JDK11, JDK8, Scala 2.12, and Scala 2.11. I just cherry-picked this PR, and ran the full tests. I'll update the result here.

@dbtsai
Copy link
Member

dbtsai commented May 19, 2020

All the tests are passing in Scala 2.12 with JDK8 and JDK11 builds.

I also tested the code in the description, and it works as expected.

  • Scala 2.12 with JDK8
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5.14-apple-SNAPSHOT
      /_/
         
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_252)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

class NotSerializableClass(val x: Int)
val ns = new NotSerializableClass(42)
val topLevelValue = "someValue"
val func = (j: Int) => {
  (1 to j).flatMap { x =>
    (1 to x).map { y => y + topLevelValue }
  }
}

// Exiting paste mode, now interpreting.

defined class NotSerializableClass
ns: NotSerializableClass = NotSerializableClass@2769d577
topLevelValue: String = someValue
func: Int => scala.collection.immutable.IndexedSeq[String] = $Lambda$1751/481549862@25297d52

scala> sc.parallelize(0 to 2).map(func).collect
res0: Array[scala.collection.immutable.IndexedSeq[String]] = Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))
  • Scala 2.12 with JDK11
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5.14-jdk11-apple-SNAPSHOT
      /_/
         
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 11.0.7)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

class NotSerializableClass(val x: Int)
val ns = new NotSerializableClass(42)
val topLevelValue = "someValue"
val func = (j: Int) => {
  (1 to j).flatMap { x =>
    (1 to x).map { y => y + topLevelValue }
  }
}

// Exiting paste mode, now interpreting.

defined class NotSerializableClass
ns: NotSerializableClass = NotSerializableClass@199f2854
topLevelValue: String = someValue
func: Int => scala.collection.immutable.IndexedSeq[String] = $Lambda$1852/0x0000000800c2a040@5c9cbc69

scala> sc.parallelize(0 to 2).map(func).collect
res0: Array[scala.collection.immutable.IndexedSeq[String]] = Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))

dbtsai pushed a commit that referenced this pull request May 19, 2020
…leaner

This is a backport of #28463 from Apache Spark master/3.0 to 2.4.
Minor adaptation include:
- Retain the Spark 2.4-specific behavior of skipping the indylambda check when using Scala 2.11
- Remove unnecessary LMF restrictions in ClosureCleaner tests
- Address review comments in the original PR from kiszk

Tested with the default Scala 2.11 build, and also tested ClosureCleaner-related tests in Scala 2.12 build as well:
- repl: `SingletonReplSuite`
- core: `ClosureCleanerSuite` and `ClosureCleanerSuite2`

---

### What changes were proposed in this pull request?

This PR proposes to enhance Spark's `ClosureCleaner` to support "indylambda" style of Scala closures to the same level as the existing implementation for the old (inner class) style ones. The goal is to reach feature parity with the support of the old style Scala closures, with as close to bug-for-bug compatibility as possible.

Specifically, this PR addresses one lacking support for indylambda closures vs the inner class closures:
- When a closure is declared in a Scala REPL and captures the enclosing REPL line object, such closure should be cleanable (unreferenced fields on the enclosing REPL line object should be cleaned)

This PR maintains the same limitations in the new indylambda closure support as the old inner class closures, in particular the following two:
- Cleaning is only available for one level of REPL line object. If a closure captures state from a REPL line object further out from the immediate enclosing one, it won't be subject to cleaning. See example below.
- "Sibling" closures are not handled yet. A "sibling" closure is defined here as a closure that is directly or indirectly referenced by the starting closure, but isn't lexically enclosing. e.g.
  ```scala
  {
    val siblingClosure = (x: Int) => x + this.fieldA   // captures `this`, references `fieldA` on `this`.
    val startingClosure = (y: Int) => y + this.fieldB + siblingClosure(y)  // captures `this` and `siblingClosure`, references `fieldB` on `this`.
  }
  ```

The changes are intended to be minimal, with further code cleanups planned in separate PRs.

Jargons:
- old, inner class style Scala closures, aka `delambdafy:inline`: default in Scala 2.11 and before
- new, "indylambda" style Scala closures, aka `delambdafy:method`: default in Scala 2.12 and later

### Why are the changes needed?

There had been previous effortsto extend Spark's `ClosureCleaner` to support "indylambda" Scala closures, which is necessary for proper Scala 2.12 support. Most notably the work done for [SPARK-14540](https://issues.apache.org/jira/browse/SPARK-14540).

But the previous efforts had missed one import scenario: a Scala closure declared in a Scala REPL, and it captures the enclosing `this` -- a REPL line object. e.g. in a Spark Shell:
```scala
:pa
class NotSerializableClass(val x: Int)
val ns = new NotSerializableClass(42)
val topLevelValue = "someValue"
val func = (j: Int) => {
  (1 to j).flatMap { x =>
    (1 to x).map { y => y + topLevelValue }
  }
}
<Ctrl+D>
sc.parallelize(0 to 2).map(func).collect
```
In this example, `func` refers to a Scala closure that captures the enclosing `this` because it needs to access `topLevelValue`, which is in turn implemented as a field on the enclosing REPL line object.

The existing `ClosureCleaner` in Spark supports cleaning this case in Scala 2.11-, and this PR brings feature parity to Scala 2.12+.

Note that the existing cleaning logic only supported one level of REPL line object nesting. This PR does not go beyond that. When a closure references state declared a few commands earlier, the cleaning will fail in both Scala 2.11 and Scala 2.12. e.g.
```scala
scala> :pa
// Entering paste mode (ctrl-D to finish)

class NotSerializableClass1(val x: Int)
case class Foo(id: String)
val ns = new NotSerializableClass1(42)
val topLevelValue = "someValue"

// Exiting paste mode, now interpreting.

defined class NotSerializableClass1
defined class Foo
ns: NotSerializableClass1 = NotSerializableClass1615b1baf
topLevelValue: String = someValue

scala> :pa
// Entering paste mode (ctrl-D to finish)

val closure2 = (j: Int) => {
  (1 to j).flatMap { x =>
    (1 to x).map { y => y + topLevelValue } // 2 levels
  }
}

// Exiting paste mode, now interpreting.

closure2: Int => scala.collection.immutable.IndexedSeq[String] = <function1>

scala> sc.parallelize(0 to 2).map(closure2).collect
org.apache.spark.SparkException: Task not serializable
...
```
in the Scala 2.11 / Spark 2.4.x case:
```
Caused by: java.io.NotSerializableException: NotSerializableClass1
Serialization stack:
	- object not serializable (class: NotSerializableClass1, value: NotSerializableClass1615b1baf)
	- field (class: $iw, name: ns, type: class NotSerializableClass1)
	- object (class $iw, $iw64df3f4b)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw66e6e5e9)
	- field (class: $line14.$read, name: $iw, type: class $iw)
	- object (class $line14.$read, $line14.$readc310aa3)
	- field (class: $iw, name: $line14$read, type: class $line14.$read)
	- object (class $iw, $iw79224636)
	- field (class: $iw, name: $outer, type: class $iw)
	- object (class $iw, $iw636d4cdc)
	- field (class: $anonfun$1, name: $outer, type: class $iw)
	- object (class $anonfun$1, <function1>)
```
in the Scala 2.12 / Spark 2.4.x case after this PR:
```
Caused by: java.io.NotSerializableException: NotSerializableClass1
Serialization stack:
	- object not serializable (class: NotSerializableClass1, value: NotSerializableClass16f3b4c9a)
	- field (class: $iw, name: ns, type: class NotSerializableClass1)
	- object (class $iw, $iw2945a3c1)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw152705d0)
	- field (class: $line14.$read, name: $iw, type: class $iw)
	- object (class $line14.$read, $line14.$read7cf311eb)
	- field (class: $iw, name: $line14$read, type: class $line14.$read)
	- object (class $iw, $iwd980dac)
	- field (class: $iw, name: $outer, type: class $iw)
	- object (class $iw, $iw557d9532)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$closure2$1$adapted:(L$iw;Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;, instantiatedMethodType=(Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class $Lambda$2103/815179920, $Lambda$2103/815179920569b57c4)
```

For more background of the new and old ways Scala lowers closures to Java bytecode, please see [A note on how NSC (New Scala Compiler) lowers lambdas](https://gist.github.com/rednaxelafx/e9ecd09bbd1c448dbddad4f4edf25d48#file-notes-md).

For more background on how Spark's `ClosureCleaner` works and what's needed to make it support "indylambda" Scala closures, please refer to [A Note on Apache Spark's ClosureCleaner](https://gist.github.com/rednaxelafx/e9ecd09bbd1c448dbddad4f4edf25d48#file-spark_closurecleaner_notes-md).

#### tl;dr

The `ClosureCleaner` works like a mark-sweep algorithm on fields:
- Finding (a chain of) outer objects referenced by the starting closure;
- Scanning the starting closure and its inner closures and marking the fields on the outer objects accessed;
- Cloning the outer objects, nulling out fields that are not accessed by any closure of concern.

##### Outer Objects

For the old, inner class style Scala closures, the "outer objects" is defined as the lexically enclosing closures of the starting closure, plus an optional enclosing REPL line object if these closures are defined in a Scala REPL. All of them are on a singly-linked `$outer` chain.

For the new, "indylambda" style Scala closures, the capturing implementation changed, so closures no longer refer to their enclosing closures via an `$outer` chain. However, a closure can still capture its enclosing REPL line object, much like the old style closures. The name of the field that captures this reference would be `arg$1` (instead of `$outer`).

So what's missing in the `ClosureCleaner` for the "indylambda" support is find and potentially clone+clean the captured enclosing `this` REPL line object. That's what this PR implements.

##### Inner Closures

The old, inner class style of Scala closures are compiled into separate inner classes, one per lambda body. So in order to discover the implementation (bytecode) of the inner closures, one has to jump over multiple classes. The name of such a class would contain the marker substring `$anonfun$`.

The new, "indylambda" style Scala closures are compiled into **static methods** in the class where the lambdas were declared. So for lexically nested closures, their lambda bodies would all be compiled into static methods **in the same class**. This makes it much easier to discover the implementation (bytecode) of the nested lambda bodies. The name of such a static method would contain the marker substring `$anonfun$`.

Discovery of inner closures involves scanning bytecode for certain patterns that represent the creation of a closure object for the inner closure.
- For inner class style: the closure object creation site is like `new <InnerClassForTheClosure>(captured args)`
- For "indylambda" style: the closure object creation site would be compiled into an `invokedynamic` instruction, with its "bootstrap method" pointing to the same one used by Java 8 for its serializable lambdas, and with the bootstrap method arguments pointing to the implementation method.

### Does this PR introduce _any_ user-facing change?

Yes. Before this PR, Spark 2.4 / 3.0 / master on Scala 2.12 would not support Scala closures declared in a Scala REPL that captures anything from the REPL line objects. After this PR, such scenario is supported.

### How was this patch tested?

Added new unit test case to `org.apache.spark.repl.SingletonReplSuite`. The new test case fails without the fix in this PR, and pases with the fix.

Closes #28463 from rednaxelafx/closure-cleaner-indylambda.

Authored-by: Kris Mok <kris.mokdatabricks.com>
Signed-off-by: Wenchen Fan <wenchendatabricks.com>
(cherry picked from commit dc01b75)
Signed-off-by: Kris Mok <kris.mokdatabricks.com>

Closes #28577 from rednaxelafx/backport-spark-31399-2.4.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
@dbtsai
Copy link
Member

dbtsai commented May 19, 2020

Merged into branch-2.4. Thank you, @rednaxelafx

@dbtsai dbtsai closed this May 19, 2020
@dongjoon-hyun
Copy link
Member

Thank you so much, @rednaxelafx and @dbtsai .

@srowen
Copy link
Member

srowen commented May 19, 2020

I was running tests on this patch in 2.11 and 2.12 and it looked good here too.

@dongjoon-hyun
Copy link
Member

Thank you, @srowen !

@kiszk
Copy link
Member

kiszk commented May 20, 2020

late LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
7 participants