
[SPARK-31399][CORE][test-hadoop3.2][test-java11] Support indylambda Scala closure in ClosureCleaner #28463

Closed


@rednaxelafx (Contributor) commented May 6, 2020

What changes were proposed in this pull request?

This PR proposes to enhance Spark's ClosureCleaner to support the "indylambda" style of Scala closures to the same level as the existing implementation for the old (inner class) style ones. The goal is to reach feature parity with the support of the old style Scala closures, with as close to bug-for-bug compatibility as possible.

Specifically, this PR addresses one gap in the support for indylambda closures relative to the inner class closures:

  • When a closure is declared in a Scala REPL and captures the enclosing REPL line object, such a closure should be cleanable (unreferenced fields on the enclosing REPL line object should be cleaned)

This PR maintains the same limitations in the new indylambda closure support as the old inner class closures, in particular the following two:

  • Cleaning is only available for one level of REPL line object. If a closure captures state from a REPL line object further out from the immediate enclosing one, it won't be subject to cleaning. See example below.
  • "Sibling" closures are not handled yet. A "sibling" closure is defined here as a closure that is directly or indirectly referenced by the starting closure, but isn't lexically enclosing. e.g.
    {
      val siblingClosure = (x: Int) => x + this.fieldA   // captures `this`, references `fieldA` on `this`.
      val startingClosure = (y: Int) => y + this.fieldB + siblingClosure(y)  // captures `this` and `siblingClosure`, references `fieldB` on `this`.
    }

The changes are intended to be minimal, with further code cleanups planned in separate PRs.

Jargon:

  • old, inner class style Scala closures, aka delambdafy:inline: default in Scala 2.11 and before
  • new, "indylambda" style Scala closures, aka delambdafy:method: default in Scala 2.12 and later

Why are the changes needed?

There had been previous efforts to extend Spark's ClosureCleaner to support "indylambda" Scala closures, which is necessary for proper Scala 2.12 support. Most notably the work done for SPARK-14540.

But the previous efforts had missed one important scenario: a Scala closure declared in a Scala REPL that captures the enclosing this -- a REPL line object. e.g. in a Spark Shell:

:pa
class NotSerializableClass(val x: Int)
val ns = new NotSerializableClass(42)
val topLevelValue = "someValue"
val func = (j: Int) => {
  (1 to j).flatMap { x =>
    (1 to x).map { y => y + topLevelValue }
  }
}
<Ctrl+D>
sc.parallelize(0 to 2).map(func).collect

In this example, func refers to a Scala closure that captures the enclosing this because it needs to access topLevelValue, which is in turn implemented as a field on the enclosing REPL line object.

The existing ClosureCleaner in Spark supports cleaning this case in Scala 2.11-, and this PR brings feature parity to Scala 2.12+.

Note that the existing cleaning logic only supported one level of REPL line object nesting. This PR does not go beyond that. When a closure references state declared a few commands earlier, the cleaning will fail in both Scala 2.11 and Scala 2.12. e.g.

scala> :pa
// Entering paste mode (ctrl-D to finish)

class NotSerializableClass1(val x: Int)
case class Foo(id: String)
val ns = new NotSerializableClass1(42)
val topLevelValue = "someValue"

// Exiting paste mode, now interpreting.

defined class NotSerializableClass1
defined class Foo
ns: NotSerializableClass1 = NotSerializableClass1@615b1baf
topLevelValue: String = someValue

scala> :pa
// Entering paste mode (ctrl-D to finish)

val closure2 = (j: Int) => {
  (1 to j).flatMap { x =>
    (1 to x).map { y => y + topLevelValue } // 2 levels
  }
}

// Exiting paste mode, now interpreting.

closure2: Int => scala.collection.immutable.IndexedSeq[String] = <function1>

scala> sc.parallelize(0 to 2).map(closure2).collect
org.apache.spark.SparkException: Task not serializable
...

in the Scala 2.11 / Spark 2.4.x case:

Caused by: java.io.NotSerializableException: NotSerializableClass1
Serialization stack:
	- object not serializable (class: NotSerializableClass1, value: NotSerializableClass1@615b1baf)
	- field (class: $iw, name: ns, type: class NotSerializableClass1)
	- object (class $iw, $iw@64df3f4b)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@66e6e5e9)
	- field (class: $line14.$read, name: $iw, type: class $iw)
	- object (class $line14.$read, $line14.$read@c310aa3)
	- field (class: $iw, name: $line14$read, type: class $line14.$read)
	- object (class $iw, $iw@79224636)
	- field (class: $iw, name: $outer, type: class $iw)
	- object (class $iw, $iw@636d4cdc)
	- field (class: $anonfun$1, name: $outer, type: class $iw)
	- object (class $anonfun$1, <function1>)

in the Scala 2.12 / Spark master case after this PR:

Caused by: java.io.NotSerializableException: NotSerializableClass1
Serialization stack:
	- object not serializable (class: NotSerializableClass1, value: NotSerializableClass1@6f3b4c9a)
	- field (class: $iw, name: ns, type: class NotSerializableClass1)
	- object (class $iw, $iw@2945a3c1)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@152705d0)
	- field (class: $line14.$read, name: $iw, type: class $iw)
	- object (class $line14.$read, $line14.$read@7cf311eb)
	- field (class: $iw, name: $line14$read, type: class $line14.$read)
	- object (class $iw, $iw@d980dac)
	- field (class: $iw, name: $outer, type: class $iw)
	- object (class $iw, $iw@557d9532)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$closure2$1$adapted:(L$iw;Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;, instantiatedMethodType=(Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class $Lambda$2103/815179920, $Lambda$2103/815179920@569b57c4)

For more background on the new and old ways Scala lowers closures to Java bytecode, please see A note on how NSC (New Scala Compiler) lowers lambdas (https://gist.github.com/rednaxelafx/e9ecd09bbd1c448dbddad4f4edf25d48#file-notes-md).

For more background on how Spark's ClosureCleaner works and what's needed to make it support "indylambda" Scala closures, please refer to A Note on Apache Spark's ClosureCleaner (https://gist.github.com/rednaxelafx/e9ecd09bbd1c448dbddad4f4edf25d48#file-spark_closurecleaner_notes-md).

tl;dr

The ClosureCleaner works like a mark-sweep algorithm on fields:

  • Finding (a chain of) outer objects referenced by the starting closure;
  • Scanning the starting closure and its inner closures and marking the fields on the outer objects accessed;
  • Cloning the outer objects, nulling out fields that are not accessed by any closure of concern.
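
To make the sweep step concrete, here is a minimal, self-contained sketch. It is illustrative only: the names are made up, and Spark's actual cleaner clones the outer objects through the serialization machinery instead of mutating them in place.

// A stand-in for a REPL line object: one field the closure reads, one it doesn't.
class FakeLineObject(var used: String, var unused: AnyRef)

object SweepDemo {
  // "Sweep": null out every field that no closure of concern accesses.
  def sweep(obj: AnyRef, accessedFields: Set[String]): Unit = {
    for (f <- obj.getClass.getDeclaredFields if !accessedFields(f.getName)) {
      f.setAccessible(true)
      f.set(obj, null)
    }
  }

  def main(args: Array[String]): Unit = {
    val line = new FakeLineObject("keep me", new Object)
    sweep(line, Set("used"))
    println(s"${line.used}, unused = ${line.unused}") // prints: keep me, unused = null
  }
}
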
Outer Objects

For the old, inner class style Scala closures, the "outer objects" are defined as the lexically enclosing closures of the starting closure, plus an optional enclosing REPL line object if these closures are defined in a Scala REPL. All of them are on a singly-linked $outer chain.

For the new, "indylambda" style Scala closures, the capturing implementation changed, so closures no longer refer to their enclosing closures via an $outer chain. However, a closure can still capture its enclosing REPL line object, much like the old style closures. The name of the field that captures this reference would be arg$1 (instead of $outer).

So what's missing in the ClosureCleaner for "indylambda" support is finding, and potentially cloning and cleaning, the captured enclosing this REPL line object. That's what this PR implements.
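
As a hedged, self-contained illustration of that step (not the PR's actual code; the helper name here is made up), the captured REPL line object can be recovered through the closure's serialization proxy:

import java.lang.invoke.SerializedLambda

// Recover the captured enclosing `this` (if any) of an indylambda closure.
// The proxy comes from the lambda class's synthetic writeReplace method;
// see the inspect() discussion further down this thread.
def capturedOuterThis(closure: AnyRef): Option[AnyRef] = {
  val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
  writeReplace.setAccessible(true)
  val proxy = writeReplace.invoke(closure).asInstanceOf[SerializedLambda]
  // A REPL-declared closure that uses line object state captures that line
  // object as its first captured argument (the arg$1 field).
  if (proxy.getCapturedArgCount > 0) Some(proxy.getCapturedArg(0)) else None
}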

Inner Closures

The old, inner class style of Scala closures are compiled into separate inner classes, one per lambda body. So in order to discover the implementation (bytecode) of the inner closures, one has to jump over multiple classes. The name of such a class would contain the marker substring $anonfun$.

The new, "indylambda" style Scala closures are compiled into static methods in the class where the lambdas were declared. So for lexically nested closures, their lambda bodies would all be compiled into static methods in the same class. This makes it much easier to discover the implementation (bytecode) of the nested lambda bodies. The name of such a static method would contain the marker substring $anonfun$.

Discovery of inner closures involves scanning bytecode for certain patterns that represent the creation of a closure object for the inner closure.

  • For inner class style: the closure object creation site is like new <InnerClassForTheClosure>(captured args)
  • For "indylambda" style: the closure object creation site would be compiled into an invokedynamic instruction, with its "bootstrap method" pointing to the same one used by Java 8 for its serializable lambdas, and with the bootstrap method arguments pointing to the implementation method.

Does this PR introduce any user-facing change?

Yes. Before this PR, Spark 2.4 / 3.0 / master on Scala 2.12 would not support Scala closures declared in a Scala REPL that capture anything from the REPL line objects. After this PR, such a scenario is supported.

How was this patch tested?

Added a new unit test case to org.apache.spark.repl.SingletonReplSuite. The new test case fails without the fix in this PR, and passes with the fix.

@rednaxelafx (Contributor Author)

Working on a new commit that'll add some new test cases for Scala closures declared in Scala REPLs.

In the meantime, any comments on the fix itself are very much appreciated!

}

def inspect(closure: AnyRef): SerializedLambda = {
  val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
Contributor

does this create a copy?

Contributor Author

Not a copy, but a "serialization proxy" object that holds on to the symbolic information of the original closure object.

The original closure object contains too much runtime-specific information (e.g. class loader info, reference to runtime-generated lambda class) that's not suitable for serialization. So the JDK uses a "serialization proxy" that only holds on to the symbolic info for serialization purposes.

Spark's ClosureCleaner abuses this serialization proxy for introspection into what's inside the black box.
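
For a concrete feel of what the proxy exposes, here is a hedged, runnable sketch (not Spark code):

import java.lang.invoke.SerializedLambda

object ProxyDemo {
  def main(args: Array[String]): Unit = {
    val func = (x: Int) => x + 1 // a Scala 2.12+ indylambda, serializable by default

    val wr = func.getClass.getDeclaredMethod("writeReplace")
    wr.setAccessible(true)
    val proxy = wr.invoke(func).asInstanceOf[SerializedLambda]

    println(proxy.getCapturingClass)   // the class that declared the lambda
    println(proxy.getImplMethodName)   // e.g. $anonfun$main$1
    println(proxy.getCapturedArgCount) // 0 here: nothing captured
  }
}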


val visited = Set[MethodIdentifier[_]](implMethodId)
val stack = Stack[MethodIdentifier[_]](implMethodId)
while (!stack.isEmpty) {
Contributor

nit: indentation

@srowen (Member) left a comment

Looks promising. I remember that the original work to get this working on 2.12 just kind of punted on indylambdas as it mostly Just Worked, but that's an important case that hasn't worked.

@SparkQA commented May 6, 2020

Test build #122360 has finished for PR 28463 at commit 722fed1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 6, 2020

Test build #122359 has finished for PR 28463 at commit 0899401.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • logDebug(s\" + cloning instance of REPL class $capturingClassName\")
  • logTrace(s\" found intra class call to $owner.$name$desc\")

@rednaxelafx rednaxelafx changed the title [WIP][SPARK-31399] Support indylambda Scala closure in ClosureCleaner [SPARK-31399] Support indylambda Scala closure in ClosureCleaner May 6, 2020
@rednaxelafx rednaxelafx changed the title [SPARK-31399] Support indylambda Scala closure in ClosureCleaner [SPARK-31399][CORE] Support indylambda Scala closure in ClosureCleaner May 6, 2020
@rednaxelafx (Contributor Author)

I've added a test case to SingletonReplSuite to demonstrate this PR's fix. It fails when I disable the fix part of this PR, and passes when I re-enable the fix part.

*
* @param maybeClosure the closure to check.
*/
def getSerializationProxy(maybeClosure: AnyRef): Option[SerializedLambda] = {
Contributor Author

This is moved from ClosureCleaner.getSerializedLambda with some enhancements.

// && implements a scala.runtime.java8 functional interface
}

def inspect(closure: AnyRef): SerializedLambda = {
Contributor Author

This is moved from ClosureCleaner.inspect, verbatim

@gatorsmile (Member)

cc @zsxwing

@rednaxelafx (Contributor Author)

cc @retronym and @skonto for Scala compiler expertise input.
Also cc @JoshRosen

@SparkQA commented May 6, 2020

Test build #122364 has finished for PR 28463 at commit 3792a6d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member) commented May 6, 2020

One question about test strategy: is it better to also test old style lambdas (i.e. without indy) against the new closure cleaner? In other words, does getSerializationProxy() work correctly?

@rednaxelafx (Contributor Author)

Thank you for the comment, @kiszk san!

One question about test strategy: is it better to also test old style lambdas (i.e. without indy) against the new closure cleaner? In other words, does getSerializationProxy() work correctly?

The old/new style closure checks (isClosure vs IndylambdaScalaClosures.getSerializationProxy()) are always in use. The latter is really a slightly enhanced version of ClosureCleaner.getSerializedLambda that already exists in Spark 2.4 and has worked on Scala 2.11 closures. So I have confidence that it'll work fine.

That said, it's a good suggestion to have some tests that explicitly exercise the old style lambdas. It's possible to use them in Scala 2.12 by specifying -Ydelambdafy:inline to the Scala compiler (or the Scala REPL).

It could be a bit of a hassle to set up a test suite with an exotic REPL option. At least that can't go into SingletonReplSuite. Maybe tweaking ReplSuite a bit can make it happen.

@SparkQA commented May 6, 2020

Test build #122373 has finished for PR 28463 at commit 8959f7e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • |class NotSerializableClass(val x: Int)

Comment on lines +370 to +372
// should be something cleanable, i.e. a Scala REPL line object
val needsCleaning = isClosureDeclaredInScalaRepl &&
  outerThisOpt.isDefined && outerThisOpt.get.getClass.getName == capturingClassName
Member

This isClosureDeclaredInScalaRepl && ... means only Scala REPL line object needs cleaning?

Contributor Author

It means only Scala closures are supported. Spark's ClosureCleaner was never capable of cleaning non-Scala closures, e.g. Java 8 lambdas, because the specifics of how captures work are different, among other reasons. My new code just makes this explicit in the naming, to make sure there's no confusion that this is only geared towards Scala.

if (op == GETFIELD || op == PUTFIELD) {
  val ownerExternalName = owner.replace('/', '.')
  for (cl <- accessedFields.keys if cl.getName == ownerExternalName) {
    logTrace(s" found field access $name on $owner")
Member

Will logging ownerExternalName be more readable?

Contributor Author

Indeed! Thanks, I'll address this in an update.

val implMethodId = MethodIdentifier(
  implClass, lambdaProxy.getImplMethodName, lambdaProxy.getImplMethodSignature)

val visited = Set[MethodIdentifier[_]](implMethodId)
Member

visited is not used.

Contributor Author

Oh shoot, I forgot to add the uses... thanks for catching this!
It should be added right after the val currentId = stack.pop line, and the set should start out empty instead of being pre-populated with implMethodId, for consistency.
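
A hedged sketch of the corrected loop (MethodIdentifier as in ClosureCleaner; the bytecode scan that discovers callees is stubbed out as a parameter):

import scala.collection.mutable

case class MethodIdentifier[T](cls: Class[T], name: String, desc: String)

// Start from an empty visited set; mark each method as it is popped and skip
// methods already seen, so mutually recursive lambdas don't loop forever.
def visitAll(start: MethodIdentifier[_])(
    callees: MethodIdentifier[_] => Seq[MethodIdentifier[_]]): Unit = {
  val visited = mutable.Set.empty[MethodIdentifier[_]]
  val stack = mutable.Stack[MethodIdentifier[_]](start)
  while (stack.nonEmpty) {
    val currentId = stack.pop()
    if (!visited.contains(currentId)) {
      visited += currentId
      // ... scan currentId's bytecode here ...
      callees(currentId).filterNot(visited).foreach(stack.push)
    }
  }
}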

if (!maybeClosureClass.isSynthetic) return None

val implementedInterfaces = ClassUtils.getAllInterfaces(maybeClosureClass).asScala
val isClosureCandidate = implementedInterfaces.exists(_.getName == "scala.Serializable") &&

This detail has changed in Scala 2.13, in which scala.Serializable has become a type alias for java.io.Serializable. You could safely add that check here already to future-proof things.

Contributor Author

Thank you very much! This is very useful information. Does that mean I should check for maybeClosure.isInstanceOf[Serializable] here instead of bothering with explicitly checking the name scala.Serializable? Assuming we're running on a JVM instead of running in a no-Java environment, this instanceof check should always be true for Scala closures, right?


Yep, that should work fine.

Scala lambdas can also implement other functional interfaces (such as java.util.function.Supplier) when used in a context that expects one. I guess that's out of scope for ClosureCleaner, though.
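
A hedged sketch of what the future-proofed check could look like (illustrative only; not necessarily how the PR ends up writing it):

// In Scala 2.13, scala.Serializable is a type alias for java.io.Serializable,
// so a plain instance test covers both old and new Scala versions.
def isClosureCandidate(maybeClosure: AnyRef): Boolean =
  maybeClosure.isInstanceOf[java.io.Serializable] &&
    maybeClosure.getClass.isSynthetic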

@retronym commented May 7, 2020

Brainstorming a little: an alternative design of ClosureCleaner would be to customise serialization of REPL line wrapper objects so that they replace non-serializable values with null or some other sentinel value. If the deserialized code attempted to use one of the absent values, a runtime error could be issued. Think of it as the "YOLO serializer."

Pros:

  • Simplicity of implementation (no static analysis needed)
  • Will handle all the corner cases

Cons:

  • Error comes lazily during user code execution
  • May send more data

Okay, just wanted to put that out there.
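
For illustration, a hedged sketch of what such a "YOLO" wrapper for line object values might look like (purely hypothetical; nothing like this exists in the REPL or in Spark):

import java.io.{ObjectInputStream, ObjectOutputStream, Serializable}

// Drops a non-serializable value at serialization time, and errors lazily
// only when the absent value is actually used after deserialization.
class YoloRef[T](@transient private var value: T) extends Serializable {
  private var absent = false

  def get: T = {
    if (absent) throw new IllegalStateException(
      "value was not serializable and was dropped during serialization")
    value
  }

  private def writeObject(out: ObjectOutputStream): Unit = {
    val ok = value.isInstanceOf[Serializable] // rough serializability check
    out.writeBoolean(!ok)
    if (ok) out.writeObject(value)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    absent = in.readBoolean()
    if (!absent) value = in.readObject().asInstanceOf[T]
  }
}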

In case you're not aware, there is a handy secret handshake in the REPL to show the wrappers. Add // show at the end of the line for:

$ scala-ref 2.12.x -Yrepl-class-based -Yrepl-use-magic-imports
scala> class NotSerializableClass(val x: Int); val ns = new NotSerializableClass(42); val topLevelValue = "someValue"; val closure =(j: Int) => { (1 to j).flatMap { x =>  (1 to x).map { y => y + topLevelValue } } } // show
sealed class $read extends _root_.java.io.Serializable {
  def <init>() = {
    super.<init>;
    ()
  };
  sealed class $iw extends _root_.java.io.Serializable {
    def <init>() = {
      super.<init>;
      ()
    };
    class NotSerializableClass extends scala.AnyRef {
      <paramaccessor> val x: Int = _;
      def <init>(x: Int) = {
        super.<init>;
        ()
      }
    };
    val ns = new NotSerializableClass.<init>(42);
    val topLevelValue = "someValue";
    val closure = ((j: Int) => 1.to(j).flatMap(((x) => 1.to(x).map(((y) => y + topLevelValue)))))
  };
  val $iw = new $iw.<init>
}
object $read extends scala.AnyRef {
  def <init>() = {
    super.<init>;
    ()
  };
  val INSTANCE = new $read.<init>
}
defined class NotSerializableClass
ns: NotSerializableClass = NotSerializableClass@476fde05
topLevelValue: String = someValue
closure: Int => scala.collection.immutable.IndexedSeq[String] = $Lambda$1151/1215025252@5111de7c

scala> import java.io._; def serialize(obj: AnyRef): Array[Byte] = { val buffer = new ByteArrayOutputStream; val out = new ObjectOutputStream(buffer); out.writeObject(obj); buffer.toByteArray }
import java.io._
serialize: (obj: AnyRef)Array[Byte]

scala> serialize(closure) // show
sealed class $read extends _root_.java.io.Serializable {
  def <init>() = {
    super.<init>;
    ()
  };
  val $line3$read: $line3.$read.INSTANCE.type = $line3.$read.INSTANCE;
  import $line3$read.$iw.closure;
  import _root_.scala.tools.nsc.interpreter.$u007B$u007B;
  import java.io._;
  import _root_.scala.tools.nsc.interpreter.$u007B$u007B;
  val $line4$read: $line4.$read.INSTANCE.type = $line4.$read.INSTANCE;
  import $line4$read.$iw.serialize;
  sealed class $iw extends _root_.java.io.Serializable {
    def <init>() = {
      super.<init>;
      ()
    };
    val res0 = serialize(closure)
  };
  val $iw = new $iw.<init>
}
object $read extends scala.AnyRef {
  def <init>() = {
    super.<init>;
    ()
  };
  val INSTANCE = new $read.<init>
}
java.io.NotSerializableException: NotSerializableClass
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

We already have Spark-contributed code that inserts the temporary vals like val $line3$read to ensure we serialize the full object graph.

We also have an extra post-REPL compiler phase that does a countervailing cleanup to remove these temp vals when the import that gave rise to them doesn't actually contribute terms to the code. (A proper fix for SPARK-5150.)

I'm wondering if the compiler also ought to bend the rules a little bit and force capture-by-copy of topLevelValue. In general, we can't do that because it would change evaluation order.

class C {
  val closure = () => topLevelValue // must capture `C`, not the `String` to avoid seeing the `null`.
  val topLevelValue = "" 
}

Capturing early would leak the uninitialized field, but the expression ought not to be valid in the REPL because it has a forward reference. The REPL doesn't emit the "invalid forward reference" error for scala> () => foo; val foo = "", but we could make it do so.

So with a small change to the compiler we could make the new test pass without changes to ClosureCleaner. Is this worth it? That would depend on seeing a broader range of test cases, and seeing if they would also be fixed with a change to capture.

/**
* Scans an indylambda Scala closure, along with its lexically nested closures, and populates
* the accessed fields info, i.e. which fields on the outer objects are accessed.
*/

I think a code snippet here, together with the resulting logTrace output, would be really useful.

Contributor Author

That's a very good suggestion. I was intending to write more explanation in the comments but haven't decided on how.

// so we check first
// non LMF-closures should be less frequent from now on
val lambdaFunc = getSerializedLambda(func)
val maybeIndylambdaProxy = IndylambdaScalaClosures.getSerializationProxy(func)
Member

Hi, @rednaxelafx . Do we need to change this PR for Scala 2.11 when we backport into branch-2.4?

Member

Never mind. I found your previous comment (#28463 (comment)).

Contributor Author

Right, the new IndylambdaScalaClosures.getSerializationProxy is pretty much the same as the old getSerializedLambda, just with a few more checks.
It's supposed to return None for Scala 2.11 closures, and Some(...) for Scala 2.12+ closures; no changes needed.

@retronym commented May 7, 2020

Maybe tweaking ReplSuite a bit can make it happen.

Alternatively, you can dynamically change (most) compiler settings in the REPL.

scala> :settings -Ydelambdafy:inline

scala> class CInline { def test = () => 42 }
class CInline

scala> :javap -c CInline#test
  public scala.Function0<java.lang.Object> test();
    Code:
       0: new           #13                 // class $line6/$read$$iw$CInline$$anonfun$test$1
       3: dup
       4: aload_0
       5: invokespecial #22                 // Method $line6/$read$$iw$CInline$$anonfun$test$1."<init>":(L$line6/$read$$iw$CInline;)V

@SparkQA commented May 7, 2020

Test build #122399 has finished for PR 28463 at commit baeba8b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • |class NotSerializableClass(val x: Int)
  • | class InnerFoo

@maropu (Member) commented May 16, 2020

retest this please

@SparkQA commented May 16, 2020

Test build #122726 has finished for PR 28463 at commit 978e60e.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • // starting closure (in class T)
  • // we need to track calls from "inner closure" to outer classes relative to it (class T, A, B)
  • logDebug(s" found inner class $ownerExternalName")

dongjoon-hyun pushed a commit that referenced this pull request May 16, 2020
### What changes were proposed in this pull request?

It's quite annoying to be blocked by flaky tests in several PRs. This PR disables them. The tests come from 3 PRs I'm recently watching:
#28526
#28463
#28517

### Why are the changes needed?

To make PR builder more stable

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

Closes #28547 from cloud-fan/test.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
dongjoon-hyun pushed a commit that referenced this pull request May 16, 2020

(Same commit message as the one above.)

(cherry picked from commit 2012d58)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@SparkQA commented May 17, 2020

Test build #5045 has finished for PR 28463 at commit 978e60e.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • // starting closure (in class T)
  • // we need to track calls from "inner closure" to outer classes relative to it (class T, A, B)
  • logDebug(s" found inner class $ownerExternalName")

@SparkQA commented May 17, 2020

Test build #5042 has finished for PR 28463 at commit 978e60e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • // starting closure (in class T)
  • // we need to track calls from "inner closure" to outer classes relative to it (class T, A, B)
  • logDebug(s" found inner class $ownerExternalName")

@SparkQA commented May 17, 2020

Test build #5043 has finished for PR 28463 at commit 978e60e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • // starting closure (in class T)
  • // we need to track calls from "inner closure" to outer classes relative to it (class T, A, B)
  • logDebug(s" found inner class $ownerExternalName")

@SparkQA commented May 17, 2020

Test build #5044 has finished for PR 28463 at commit 978e60e.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • // starting closure (in class T)
  • // we need to track calls from "inner closure" to outer classes relative to it (class T, A, B)
  • logDebug(s" found inner class $ownerExternalName")

@viirya (Member) commented May 17, 2020

retest this please

@SparkQA commented May 17, 2020

Test build #122754 has finished for PR 28463 at commit 978e60e.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • // starting closure (in class T)
  • // we need to track calls from "inner closure" to outer classes relative to it (class T, A, B)
  • logDebug(s" found inner class $ownerExternalName")

// This is the InnerClassFinder equivalent for inner classes, which still use the
// `$outer` chain. So this is NOT controlled by the `findTransitively` flag.
logDebug(s" found inner class $ownerExternalName")
val innerClassInfo = getOrUpdateClassInfo(owner)
Member

super nit: How about val (innerClass, innerClassNode) = ...?

Contributor Author

Thank you for the suggestion. Yes, this was intentional. Using the destructuring pattern match syntax somehow triggers a problem in the Scala compiler's type inferencer that requires importing scala.language.existentials to resolve. I'd rather write the code in a slightly more tedious fashion than import that...
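
A hedged reconstruction of the quirk (illustrative; the tuple shape mirrors getOrUpdateClassInfo but the body here is a stand-in):

import org.objectweb.asm.tree.ClassNode

def getOrUpdateClassInfo(owner: String): (Class[_], ClassNode) =
  (Class.forName(owner), new ClassNode())

// Compiles without extra imports:
val classInfo = getOrUpdateClassInfo("java.lang.String")
val innerClass = classInfo._1

// The destructuring form can trip the "inferred existential type" feature
// warning, which asks for `import scala.language.existentials`:
// val (innerClass2, innerClassNode) = getOrUpdateClassInfo("java.lang.String")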

@HyukjinKwon (Member)

retest this please

@SparkQA commented May 18, 2020

Test build #122768 has finished for PR 28463 at commit 978e60e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • // starting closure (in class T)
  • // we need to track calls from "inner closure" to outer classes relative to it (class T, A, B)
  • logDebug(s" found inner class $ownerExternalName")

@maropu (Member) commented May 18, 2020

retest this please

@SparkQA commented May 18, 2020

Test build #122778 has finished for PR 28463 at commit 978e60e.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • // starting closure (in class T)
  • // we need to track calls from "inner closure" to outer classes relative to it (class T, A, B)
  • logDebug(s" found inner class $ownerExternalName")

@HyukjinKwon (Member)

retest this please

@HyukjinKwon (Member)

retest this please

@HyukjinKwon (Member)

retest this please

@SparkQA commented May 18, 2020

Test build #122780 has finished for PR 28463 at commit 978e60e.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • // starting closure (in class T)
  • // we need to track calls from "inner closure" to outer classes relative to it (class T, A, B)
  • logDebug(s" found inner class $ownerExternalName")

@SparkQA commented May 18, 2020

Test build #122781 has finished for PR 28463 at commit 978e60e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • // starting closure (in class T)
  • // we need to track calls from "inner closure" to outer classes relative to it (class T, A, B)
  • logDebug(s" found inner class $ownerExternalName")

@cloud-fan (Contributor)

We don't have many critical changes after the last successful build: #28463 (comment)

The failed flaky tests are unrelated to this PR, and we need to unblock 3.0 ASAP. I'm merging it first, will monitor the jenkins builds later. Thanks!

@cloud-fan cloud-fan closed this in dc01b75 May 18, 2020
cloud-fan pushed a commit that referenced this pull request May 18, 2020
…cala closure in ClosureCleaner

(Commit message identical to the PR description above.)

Closes #28463 from rednaxelafx/closure-cleaner-indylambda.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit dc01b75)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/122781/

@HyukjinKwon (Member)

LocalityPlacementStrategySuite failed again. Potentially related. I am going to merge #28566 together.

rednaxelafx added a commit to rednaxelafx/apache-spark that referenced this pull request May 19, 2020
…cala closure in ClosureCleaner

(Commit message identical to the PR description above.)

Closes apache#28463 from rednaxelafx/closure-cleaner-indylambda.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit dc01b75)
Signed-off-by: Kris Mok <kris.mok@databricks.com>
dbtsai pushed a commit that referenced this pull request May 19, 2020
…leaner

This is a backport of #28463 from Apache Spark master/3.0 to 2.4.
Minor adaptations include:
- Retain the Spark 2.4-specific behavior of skipping the indylambda check when using Scala 2.11
- Remove unnecessary LMF restrictions in ClosureCleaner tests
- Address review comments in the original PR from kiszk

Tested with the default Scala 2.11 build, and also ran the ClosureCleaner-related tests with the Scala 2.12 build:
- repl: `SingletonReplSuite`
- core: `ClosureCleanerSuite` and `ClosureCleanerSuite2`

---

### What changes were proposed in this pull request?

This PR proposes to enhance Spark's `ClosureCleaner` to support "indylambda" style of Scala closures to the same level as the existing implementation for the old (inner class) style ones. The goal is to reach feature parity with the support of the old style Scala closures, with as close to bug-for-bug compatibility as possible.

Specifically, this PR addresses one lacking support for indylambda closures vs the inner class closures:
- When a closure is declared in a Scala REPL and captures the enclosing REPL line object, such closure should be cleanable (unreferenced fields on the enclosing REPL line object should be cleaned)

This PR maintains the same limitations in the new indylambda closure support as the old inner class closures, in particular the following two:
- Cleaning is only available for one level of REPL line object. If a closure captures state from a REPL line object further out from the immediate enclosing one, it won't be subject to cleaning. See example below.
- "Sibling" closures are not handled yet. A "sibling" closure is defined here as a closure that is directly or indirectly referenced by the starting closure, but isn't lexically enclosing. e.g.
  ```scala
  {
    val siblingClosure = (x: Int) => x + this.fieldA   // captures `this`, references `fieldA` on `this`.
    val startingClosure = (y: Int) => y + this.fieldB + siblingClosure(y)  // captures `this` and `siblingClosure`, references `fieldB` on `this`.
  }
  ```

The changes are intended to be minimal, with further code cleanups planned in separate PRs.

Jargons:
- old, inner class style Scala closures, aka `delambdafy:inline`: default in Scala 2.11 and before
- new, "indylambda" style Scala closures, aka `delambdafy:method`: default in Scala 2.12 and later

### Why are the changes needed?

There had been previous effortsto extend Spark's `ClosureCleaner` to support "indylambda" Scala closures, which is necessary for proper Scala 2.12 support. Most notably the work done for [SPARK-14540](https://issues.apache.org/jira/browse/SPARK-14540).

But the previous efforts had missed one import scenario: a Scala closure declared in a Scala REPL, and it captures the enclosing `this` -- a REPL line object. e.g. in a Spark Shell:
```scala
:pa
class NotSerializableClass(val x: Int)
val ns = new NotSerializableClass(42)
val topLevelValue = "someValue"
val func = (j: Int) => {
  (1 to j).flatMap { x =>
    (1 to x).map { y => y + topLevelValue }
  }
}
<Ctrl+D>
sc.parallelize(0 to 2).map(func).collect
```
In this example, `func` refers to a Scala closure that captures the enclosing `this` because it needs to access `topLevelValue`, which is in turn implemented as a field on the enclosing REPL line object.

The existing `ClosureCleaner` in Spark supports cleaning this case on Scala 2.11 and earlier, and this PR brings feature parity to Scala 2.12+.

Note that the existing cleaning logic only supported one level of REPL line object nesting. This PR does not go beyond that. When a closure references state declared a few commands earlier, the cleaning will fail in both Scala 2.11 and Scala 2.12. e.g.
```scala
scala> :pa
// Entering paste mode (ctrl-D to finish)

class NotSerializableClass1(val x: Int)
case class Foo(id: String)
val ns = new NotSerializableClass1(42)
val topLevelValue = "someValue"

// Exiting paste mode, now interpreting.

defined class NotSerializableClass1
defined class Foo
ns: NotSerializableClass1 = NotSerializableClass1@615b1baf
topLevelValue: String = someValue

scala> :pa
// Entering paste mode (ctrl-D to finish)

val closure2 = (j: Int) => {
  (1 to j).flatMap { x =>
    (1 to x).map { y => y + topLevelValue } // 2 levels
  }
}

// Exiting paste mode, now interpreting.

closure2: Int => scala.collection.immutable.IndexedSeq[String] = <function1>

scala> sc.parallelize(0 to 2).map(closure2).collect
org.apache.spark.SparkException: Task not serializable
...
```
In the Scala 2.11 / Spark 2.4.x case, the failure looks like:
```
Caused by: java.io.NotSerializableException: NotSerializableClass1
Serialization stack:
	- object not serializable (class: NotSerializableClass1, value: NotSerializableClass1@615b1baf)
	- field (class: $iw, name: ns, type: class NotSerializableClass1)
	- object (class $iw, $iw@64df3f4b)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@66e6e5e9)
	- field (class: $line14.$read, name: $iw, type: class $iw)
	- object (class $line14.$read, $line14.$read@c310aa3)
	- field (class: $iw, name: $line14$read, type: class $line14.$read)
	- object (class $iw, $iw@79224636)
	- field (class: $iw, name: $outer, type: class $iw)
	- object (class $iw, $iw@636d4cdc)
	- field (class: $anonfun$1, name: $outer, type: class $iw)
	- object (class $anonfun$1, <function1>)
```
In the Scala 2.12 / Spark 2.4.x case, after this PR, the failure looks like:
```
Caused by: java.io.NotSerializableException: NotSerializableClass1
Serialization stack:
	- object not serializable (class: NotSerializableClass1, value: NotSerializableClass1@6f3b4c9a)
	- field (class: $iw, name: ns, type: class NotSerializableClass1)
	- object (class $iw, $iw@2945a3c1)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@152705d0)
	- field (class: $line14.$read, name: $iw, type: class $iw)
	- object (class $line14.$read, $line14.$read@7cf311eb)
	- field (class: $iw, name: $line14$read, type: class $line14.$read)
	- object (class $iw, $iw@d980dac)
	- field (class: $iw, name: $outer, type: class $iw)
	- object (class $iw, $iw@557d9532)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$closure2$1$adapted:(L$iw;Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;, instantiatedMethodType=(Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class $Lambda$2103/815179920, $Lambda$2103/815179920@569b57c4)
```

For more background on the new and old ways Scala lowers closures to Java bytecode, please see [A note on how NSC (New Scala Compiler) lowers lambdas](https://gist.github.com/rednaxelafx/e9ecd09bbd1c448dbddad4f4edf25d48#file-notes-md).

For more background on how Spark's `ClosureCleaner` works and what's needed to make it support "indylambda" Scala closures, please refer to [A Note on Apache Spark's ClosureCleaner](https://gist.github.com/rednaxelafx/e9ecd09bbd1c448dbddad4f4edf25d48#file-spark_closurecleaner_notes-md).

#### tl;dr

The `ClosureCleaner` works like a mark-sweep algorithm on fields:
- Finding the (chain of) outer objects referenced by the starting closure;
- Scanning the starting closure and its inner closures, and marking which fields on the outer objects are accessed;
- Cloning the outer objects, nulling out the fields that are not accessed by any closure of concern.
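
To make the sweep step concrete, here is a minimal reflection-based sketch of "clone the outer object and null out unused fields", assuming `accessedFields` was produced by the bytecode scan. `cloneAndNull` is a hypothetical helper for illustration, not Spark's actual implementation (the real cleaner allocates instances without running any constructor and walks the whole outer chain):
```scala
import java.lang.reflect.Modifier

// Illustrative sketch only. Assumes the outer class has a no-arg constructor.
def cloneAndNull(outer: AnyRef, accessedFields: Set[String]): AnyRef = {
  val clazz = outer.getClass
  val clone = clazz.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
  for (f <- clazz.getDeclaredFields if !Modifier.isStatic(f.getModifiers)) {
    f.setAccessible(true)
    if (accessedFields.contains(f.getName)) {
      f.set(clone, f.get(outer)) // keep: some closure of concern reads this field
    } else if (!f.getType.isPrimitive) {
      f.set(clone, null)         // sweep: drop the reference nobody reads
    }
  }
  clone
}
```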

##### Outer Objects

For the old, inner class style Scala closures, the "outer objects" are defined as the lexically enclosing closures of the starting closure, plus an optional enclosing REPL line object if these closures are defined in a Scala REPL. All of them are on a singly-linked `$outer` chain.

For the new, "indylambda" style Scala closures, the capturing implementation changed, so closures no longer refer to their enclosing closures via an `$outer` chain. However, a closure can still capture its enclosing REPL line object, much like the old style closures. The name of the field that captures this reference would be `arg$1` (instead of `$outer`).

So what was missing in the `ClosureCleaner` for "indylambda" support is to find, and potentially clone and clean, the captured enclosing `this` REPL line object. That's what this PR implements.
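
As an illustration of the introspection involved, here is a hedged sketch that recovers the captured enclosing object from an indylambda closure via its compiler-generated `writeReplace` method; `capturedOuter` is a hypothetical name, not Spark's API:
```scala
import java.lang.invoke.SerializedLambda

// Serializable lambdas carry a private, compiler-generated writeReplace()
// that returns a java.lang.invoke.SerializedLambda; its capturedArgs hold
// whatever the closure captured, e.g. the REPL line object in arg$1.
def capturedOuter(closure: AnyRef): Option[AnyRef] = {
  val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
  writeReplace.setAccessible(true)
  writeReplace.invoke(closure) match {
    case sl: SerializedLambda if sl.getCapturedArgCount > 0 =>
      Option(sl.getCapturedArg(0))
    case _ => None
  }
}
```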

##### Inner Closures

The old, inner class style of Scala closures are compiled into separate inner classes, one per lambda body. So in order to discover the implementation (bytecode) of the inner closures, one has to jump over multiple classes. The name of such a class would contain the marker substring `$anonfun$`.

The new, "indylambda" style Scala closures are compiled into **static methods** in the class where the lambdas were declared. So for lexically nested closures, their lambda bodies would all be compiled into static methods **in the same class**. This makes it much easier to discover the implementation (bytecode) of the nested lambda bodies. The name of such a static method would contain the marker substring `$anonfun$`.

Discovery of inner closures involves scanning bytecode for certain patterns that represent the creation of a closure object for the inner closure.
- For inner class style: the closure object creation site is like `new <InnerClassForTheClosure>(captured args)`
- For "indylambda" style: the closure object creation site would be compiled into an `invokedynamic` instruction, with its "bootstrap method" pointing to the same one used by Java 8 for its serializable lambdas, and with the bootstrap method arguments pointing to the implementation method.

### Does this PR introduce _any_ user-facing change?

Yes. Before this PR, Spark 2.4 / 3.0 / master on Scala 2.12 would not support Scala closures declared in a Scala REPL that capture anything from the REPL line objects. After this PR, such scenarios are supported.

### How was this patch tested?

Added a new unit test case to `org.apache.spark.repl.SingletonReplSuite`. The new test case fails without the fix in this PR, and passes with the fix.

Closes #28463 from rednaxelafx/closure-cleaner-indylambda.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit dc01b75)
Signed-off-by: Kris Mok <kris.mok@databricks.com>

Closes #28577 from rednaxelafx/backport-spark-31399-2.4.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>