From b27a9feafbbc438bec4f64d1e7f576b2d06e2041 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Mon, 6 Nov 2017 12:52:52 +0000 Subject: [PATCH 1/2] Preliminary changes to get ClosureCleaner to work with Scala 2.12. Makes many usages just work, but not all. --- .../apache/spark/util/ClosureCleaner.scala | 28 +++++++++++-------- .../spark/util/ClosureCleanerSuite2.scala | 10 +++++-- .../spark/graphx/lib/ShortestPaths.scala | 2 +- .../streaming/BasicOperationsSuite.scala | 4 +-- 4 files changed, 26 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index dfece5dd0670b..40616421b5bca 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -38,12 +38,13 @@ private[spark] object ClosureCleaner extends Logging { // Copy data over, before delegating to ClassReader - else we can run out of open file handles. val className = cls.getName.replaceFirst("^.*\\.", "") + ".class" val resourceStream = cls.getResourceAsStream(className) - // todo: Fixme - continuing with earlier behavior ... - if (resourceStream == null) return new ClassReader(resourceStream) - - val baos = new ByteArrayOutputStream(128) - Utils.copyStream(resourceStream, baos, true) - new ClassReader(new ByteArrayInputStream(baos.toByteArray)) + if (resourceStream == null) { + null + } else { + val baos = new ByteArrayOutputStream(128) + Utils.copyStream(resourceStream, baos, true) + new ClassReader(new ByteArrayInputStream(baos.toByteArray)) + } } // Check whether a class represents a Scala closure @@ -81,11 +82,13 @@ private[spark] object ClosureCleaner extends Logging { val stack = Stack[Class[_]](obj.getClass) while (!stack.isEmpty) { val cr = getClassReader(stack.pop()) - val set = Set.empty[Class[_]] - cr.accept(new InnerClosureFinder(set), 0) - for (cls <- set -- seen) { - seen += cls - stack.push(cls) + if (cr != null) { + val set = Set.empty[Class[_]] + cr.accept(new InnerClosureFinder(set), 0) + for (cls <- set -- seen) { + seen += cls + stack.push(cls) + } } } (seen - obj.getClass).toList @@ -366,7 +369,8 @@ private[spark] class ReturnStatementInClosureException private class ReturnStatementFinder extends ClassVisitor(ASM5) { override def visitMethod(access: Int, name: String, desc: String, sig: String, exceptions: Array[String]): MethodVisitor = { - if (name.contains("apply")) { + // $anonfun$ covers Java 8 lambdas + if (name.contains("apply") || name.contains("$anonfun$")) { new MethodVisitor(ASM5) { override def visitTypeInsn(op: Int, tp: String) { if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) { diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala index 934385fbcad1b..278fada83d78c 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala @@ -117,9 +117,13 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri findTransitively: Boolean): Map[Class[_], Set[String]] = { val fields = new mutable.HashMap[Class[_], mutable.Set[String]] outerClasses.foreach { c => fields(c) = new mutable.HashSet[String] } - ClosureCleaner.getClassReader(closure.getClass) - .accept(new FieldAccessFinder(fields, findTransitively), 0) - fields.mapValues(_.toSet).toMap + val cr = ClosureCleaner.getClassReader(closure.getClass) + if (cr == null) { + Map.empty + } else { + cr.accept(new FieldAccessFinder(fields, findTransitively), 0) + fields.mapValues(_.toSet).toMap + } } // Accessors for private methods diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala index 4cac633aed008..aff0b932e9429 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala @@ -25,7 +25,7 @@ import org.apache.spark.graphx._ * Computes shortest paths to the given set of landmark vertices, returning a graph where each * vertex attribute is a map containing the shortest-path distance to each reachable landmark. */ -object ShortestPaths { +object ShortestPaths extends Serializable { /** Stores a map from the vertex id of a landmark to the distance to that landmark. */ type SPMap = Map[VertexId, Int] diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 6f62c7a88dc3c..35893f6870c80 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -596,8 +596,6 @@ class BasicOperationsSuite extends TestSuiteBase { ) val updateStateOperation = (s: DStream[String]) => { - class StateObject(var counter: Int = 0, var expireCounter: Int = 0) extends Serializable - // updateFunc clears a state when a StateObject is seen without new values twice in a row val updateFunc = (values: Seq[Int], state: Option[StateObject]) => { val stateObj = state.getOrElse(new StateObject) @@ -817,3 +815,5 @@ class BasicOperationsSuite extends TestSuiteBase { } } } + +class StateObject(var counter: Int = 0, var expireCounter: Int = 0) extends Serializable \ No newline at end of file From bf393ae2904881004d621f3729064a2739ccb039 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Mon, 6 Nov 2017 15:40:41 +0000 Subject: [PATCH 2/2] Add missing newline --- .../scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 35893f6870c80..0a764f61c0cd9 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -816,4 +816,4 @@ class BasicOperationsSuite extends TestSuiteBase { } } -class StateObject(var counter: Int = 0, var expireCounter: Int = 0) extends Serializable \ No newline at end of file +class StateObject(var counter: Int = 0, var expireCounter: Int = 0) extends Serializable