Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-6152] Use shaded ASM5 to support closure cleaning of Java 8 compiled classes #9512

Closed
wants to merge 8 commits into from
4 changes: 4 additions & 0 deletions core/pom.xml
Expand Up @@ -51,6 +51,10 @@
<groupId>com.twitter</groupId>
<artifactId>chill-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-asm5-shaded</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
Expand Down
25 changes: 13 additions & 12 deletions core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
Expand Up @@ -21,8 +21,8 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import scala.collection.mutable.{Map, Set}

import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
import org.apache.xbean.asm5.{ClassReader, ClassVisitor, MethodVisitor, Type}
import org.apache.xbean.asm5.Opcodes._

import org.apache.spark.{Logging, SparkEnv, SparkException}

Expand Down Expand Up @@ -325,19 +325,19 @@ private[spark] object ClosureCleaner extends Logging {
private[spark] class ReturnStatementInClosureException
extends SparkException("Return statements aren't allowed in Spark closures")

private class ReturnStatementFinder extends ClassVisitor(ASM4) {
private class ReturnStatementFinder extends ClassVisitor(ASM5) {
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
if (name.contains("apply")) {
new MethodVisitor(ASM4) {
new MethodVisitor(ASM5) {
override def visitTypeInsn(op: Int, tp: String) {
if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) {
throw new ReturnStatementInClosureException
}
}
}
} else {
new MethodVisitor(ASM4) {}
new MethodVisitor(ASM5) {}
}
}
}
Expand All @@ -361,7 +361,7 @@ private[util] class FieldAccessFinder(
findTransitively: Boolean,
specificMethod: Option[MethodIdentifier[_]] = None,
visitedMethods: Set[MethodIdentifier[_]] = Set.empty)
extends ClassVisitor(ASM4) {
extends ClassVisitor(ASM5) {

override def visitMethod(
access: Int,
Expand All @@ -376,7 +376,7 @@ private[util] class FieldAccessFinder(
return null
}

new MethodVisitor(ASM4) {
new MethodVisitor(ASM5) {
override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) {
if (op == GETFIELD) {
for (cl <- fields.keys if cl.getName == owner.replace('/', '.')) {
Expand All @@ -385,7 +385,8 @@ private[util] class FieldAccessFinder(
}
}

override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) {
override def visitMethodInsn(
op: Int, owner: String, name: String, desc: String, itf: Boolean) {
for (cl <- fields.keys if cl.getName == owner.replace('/', '.')) {
// Check for calls to a getter method for a variable in an interpreter wrapper object.
// This means that the corresponding field will be accessed, so we should save it.
Expand All @@ -408,7 +409,7 @@ private[util] class FieldAccessFinder(
}
}

private class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM4) {
private class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM5) {
var myName: String = null

// TODO: Recursively find inner closures that we indirectly reference, e.g.
Expand All @@ -423,9 +424,9 @@ private class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM

override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
new MethodVisitor(ASM4) {
override def visitMethodInsn(op: Int, owner: String, name: String,
desc: String) {
new MethodVisitor(ASM5) {
override def visitMethodInsn(
op: Int, owner: String, name: String, desc: String, itf: Boolean) {
val argTypes = Type.getArgumentTypes(desc)
if (op == INVOKESPECIAL && name == "<init>" && argTypes.length > 0
&& argTypes(0).toString.startsWith("L") // is it an object?
Expand Down
4 changes: 4 additions & 0 deletions docs/building-spark.md
Expand Up @@ -190,6 +190,10 @@ Running only Java 8 tests and nothing else.

mvn install -DskipTests -Pjava8-tests

or

sbt -Pjava8-tests java8-tests/test

Java 8 tests are run when the `-Pjava8-tests` profile is enabled; they will run in spite of `-DskipTests`.
For these tests to run your system must have a JDK 8 installation.
If you have JDK 8 installed but it is not the system default, you can set JAVA_HOME to point to JDK 8 before running the tests.
Expand Down
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

/**
 * Test cases where JDK8-compiled Scala user code is used with Spark.
 *
 * The suite submits a job whose closure is defined in this (JDK8-compiled) class, which is the
 * scenario tracked by SPARK-6152.
 */
class JDK8ScalaSuite extends SparkFunSuite with SharedSparkContext {
  test("basic RDD closure test (SPARK-6152)") {
    // `map` neither adds nor drops elements, so the job must report all 1000 inputs.
    // Asserting on the count makes the test fail on a wrong result, not only on an exception.
    val squaredCount = sc.parallelize(1 to 1000).map(x => x * x).count()
    assert(squaredCount === 1000L)
  }
}
4 changes: 4 additions & 0 deletions graphx/pom.xml
Expand Up @@ -47,6 +47,10 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-asm5-shaded</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Expand Up @@ -22,11 +22,10 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.collection.mutable.HashSet
import scala.language.existentials

import org.apache.spark.util.Utils

import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor}
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
import org.apache.xbean.asm5.{ClassReader, ClassVisitor, MethodVisitor}
import org.apache.xbean.asm5.Opcodes._

import org.apache.spark.util.Utils

/**
* Includes a utility function to test whether a function accesses a specific attribute
Expand Down Expand Up @@ -107,18 +106,19 @@ private[graphx] object BytecodeUtils {
* MethodInvocationFinder("spark/graph/Foo", "test")
* its methodsInvoked variable will contain the set of methods invoked directly by
* Foo.test(). Interface invocations are not returned as part of the result set because we cannot
* determine the actual metod invoked by inspecting the bytecode.
* determine the actual method invoked by inspecting the bytecode.
*/
private class MethodInvocationFinder(className: String, methodName: String)
extends ClassVisitor(ASM4) {
extends ClassVisitor(ASM5) {

val methodsInvoked = new HashSet[(Class[_], String)]

override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
if (name == methodName) {
new MethodVisitor(ASM4) {
override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) {
new MethodVisitor(ASM5) {
override def visitMethodInsn(
op: Int, owner: String, name: String, desc: String, itf: Boolean) {
if (op == INVOKEVIRTUAL || op == INVOKESPECIAL || op == INVOKESTATIC) {
if (!skipClass(owner)) {
methodsInvoked.add((Utils.classForName(owner.replace("/", ".")), name))
Expand Down
8 changes: 8 additions & 0 deletions pom.xml
Expand Up @@ -393,6 +393,14 @@
</exclusion>
</exclusions>
</dependency>
<!-- This artifact is a shaded version of ASM 5.0.4. The POM that was used to produce this
is at https://github.com/apache/geronimo-xbean/tree/xbean-4.4/xbean-asm5-shaded
For context on why we shade ASM, see SPARK-782 and SPARK-6152. -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-asm5-shaded</artifactId>
<version>4.4</version>
</dependency>

<!-- Shaded deps marked as provided. These are promoted to compile scope
in the modules where we want the shaded classes to appear in the
Expand Down
26 changes: 24 additions & 2 deletions project/SparkBuild.scala
Expand Up @@ -57,6 +57,9 @@ object BuildCommons {
val sparkHome = buildLocation

val testTempDir = s"$sparkHome/target/tmp"

val javacJVMVersion = settingKey[String]("source and target JVM version for javac")
val scalacJVMVersion = settingKey[String]("source and target JVM version for scalac")
}

object SparkBuild extends PomBuild {
Expand Down Expand Up @@ -154,9 +157,17 @@ object SparkBuild extends PomBuild {
if (major.toInt >= 1 && minor.toInt >= 8) Seq("-Xdoclint:all", "-Xdoclint:-missing") else Seq.empty
},

javacOptions in Compile ++= Seq("-encoding", "UTF-8"),
javacJVMVersion := "1.7",
scalacJVMVersion := "1.7",

javacOptions in Compile ++= Seq(
"-encoding", "UTF-8",
"-source", javacJVMVersion.value,
"-target", javacJVMVersion.value
),

scalacOptions in Compile ++= Seq(
s"-target:jvm-${scalacJVMVersion.value}",
"-sourcepath", (baseDirectory in ThisBuild).value.getAbsolutePath // Required for relative source links in scaladoc
),

Expand Down Expand Up @@ -241,8 +252,9 @@ object SparkBuild extends PomBuild {

enable(Flume.settings)(streamingFlumeSink)

enable(DockerIntegrationTests.settings)(dockerIntegrationTests)
enable(Java8TestSettings.settings)(java8Tests)

enable(DockerIntegrationTests.settings)(dockerIntegrationTests)

/**
* Adds the ability to run the spark shell directly from SBT without building an assembly
Expand Down Expand Up @@ -591,6 +603,16 @@ object Unidoc {
)
}

object Java8TestSettings {
  import BuildCommons._

  /**
   * JVM target handed to scalac for the Java 8 test project.
   *
   * Targeting Java 8 bytecode is only supported in Scala 2.11.4 and higher, so "1.8" is chosen
   * only when the `scala-2.11` system property is set to "true"; otherwise "1.7" is used.
   */
  private def scalacTargetVersion: String = {
    val buildingWithScala211 = System.getProperty("scala-2.11") == "true"
    if (buildingWithScala211) "1.8" else "1.7"
  }

  // javac always compiles the Java 8 tests with source/target 1.8.
  lazy val settings = Seq(
    javacJVMVersion := "1.8",
    scalacJVMVersion := scalacTargetVersion
  )
}

object TestSettings {
import BuildCommons._

Expand Down
4 changes: 4 additions & 0 deletions repl/pom.xml
Expand Up @@ -95,6 +95,10 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-test-tags_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-asm5-shaded</artifactId>
</dependency>

<!-- Explicit listing of transitive deps that are shaded. Otherwise, odd compiler crashes. -->
<dependency>
Expand Down
Expand Up @@ -23,15 +23,14 @@ import java.net.{HttpURLConnection, URI, URL, URLEncoder}
import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.xbean.asm5._
import org.apache.xbean.asm5.Opcodes._

import org.apache.spark.{SparkConf, SparkEnv, Logging}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
import org.apache.spark.util.ParentClassLoader

import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm._
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._

/**
* A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI,
* used to load classes defined by the interpreter when the REPL is used.
Expand Down Expand Up @@ -192,7 +191,7 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
}

class ConstructorCleaner(className: String, cv: ClassVisitor)
extends ClassVisitor(ASM4, cv) {
extends ClassVisitor(ASM5, cv) {
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
val mv = cv.visitMethod(access, name, desc, sig, exceptions)
Expand All @@ -202,7 +201,7 @@ extends ClassVisitor(ASM4, cv) {
// field in the class to point to it, but do nothing otherwise.
mv.visitCode()
mv.visitVarInsn(ALOAD, 0) // load this
mv.visitMethodInsn(INVOKESPECIAL, "java/lang/Object", "<init>", "()V")
mv.visitMethodInsn(INVOKESPECIAL, "java/lang/Object", "<init>", "()V", false)
mv.visitVarInsn(ALOAD, 0) // load this
// val classType = className.replace('.', '/')
// mv.visitFieldInsn(PUTSTATIC, classType, "MODULE$", "L" + classType + ";")
Expand Down
5 changes: 5 additions & 0 deletions sql/core/pom.xml
Expand Up @@ -110,6 +110,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-asm5-shaded</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
Expand Down
Expand Up @@ -21,8 +21,8 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import scala.collection.mutable

import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm._
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
import org.apache.xbean.asm5._
import org.apache.xbean.asm5.Opcodes._

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
Expand Down Expand Up @@ -486,7 +486,7 @@ private class BoxingFinder(
method: MethodIdentifier[_] = null,
val boxingInvokes: mutable.Set[String] = mutable.Set.empty,
visitedMethods: mutable.Set[MethodIdentifier[_]] = mutable.Set.empty)
extends ClassVisitor(ASM4) {
extends ClassVisitor(ASM5) {

private val primitiveBoxingClassName =
Set("java/lang/Long",
Expand All @@ -503,11 +503,12 @@ private class BoxingFinder(
MethodVisitor = {
if (method != null && (method.name != name || method.desc != desc)) {
// If method is specified, skip other methods.
return new MethodVisitor(ASM4) {}
return new MethodVisitor(ASM5) {}
}

new MethodVisitor(ASM4) {
override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) {
new MethodVisitor(ASM5) {
override def visitMethodInsn(
op: Int, owner: String, name: String, desc: String, itf: Boolean) {
if (op == INVOKESPECIAL && name == "<init>" || op == INVOKESTATIC && name == "valueOf") {
if (primitiveBoxingClassName.contains(owner)) {
// Find boxing methods, e.g., new java.lang.Long(l) or java.lang.Long.valueOf(l)
Expand Down Expand Up @@ -542,15 +543,7 @@ private object BoxingFinder {
// Copy data over, before delegating to ClassReader -
// else we can run out of open file handles.
Utils.copyStream(resourceStream, baos, true)
// ASM4 doesn't support Java 8 classes, which requires ASM5.
// So if the class is ASM5 (E.g., java.lang.Long when using JDK8 runtime to run these codes),
// then ClassReader will throw IllegalArgumentException,
// However, since this is only for testing, it's safe to skip these classes.
try {
Some(new ClassReader(new ByteArrayInputStream(baos.toByteArray)))
} catch {
case _: IllegalArgumentException => None
}
Some(new ClassReader(new ByteArrayInputStream(baos.toByteArray)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just realized that this no longer needs to return an Option; will update now to keep the code clean.

}

}