diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java index 54835ae0cf6..7c7dad90f98 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java @@ -370,34 +370,6 @@ public void run() { interpretThread.join(); } - @Test - public void testDependencies() throws IOException, InterpreterException { - Properties properties = new Properties(); - properties.setProperty("spark.master", "local"); - properties.setProperty("spark.app.name", "test"); - properties.setProperty("zeppelin.spark.maxResult", "100"); - properties.setProperty("zeppelin.spark.useNew", "true"); - // disable color output for easy testing - properties.setProperty("zeppelin.spark.scala.color", "false"); - - // download spark-avro jar - URL website = new URL("http://repo1.maven.org/maven2/com/databricks/spark-avro_2.11/3.2.0/spark-avro_2.11-3.2.0.jar"); - ReadableByteChannel rbc = Channels.newChannel(website.openStream()); - File avroJarFile = new File("spark-avro_2.11-3.2.0.jar"); - FileOutputStream fos = new FileOutputStream(avroJarFile); - fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); - - properties.setProperty("spark.jars", avroJarFile.getAbsolutePath()); - - interpreter = new SparkInterpreter(properties); - assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter); - interpreter.setInterpreterGroup(mock(InterpreterGroup.class)); - interpreter.open(); - - InterpreterResult result = interpreter.interpret("import com.databricks.spark.avro._", getInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - } - //TODO(zjffdu) This unit test will fail due to classpath issue, should enable it after the classpath issue is fixed. @Ignore public void testDepInterpreter() throws InterpreterException { diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala index 183dee63965..cd241a8aec5 100644 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala +++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala @@ -19,6 +19,8 @@ package org.apache.zeppelin.spark import java.io.File +import java.net.URLClassLoader +import java.nio.file.Paths import java.util.concurrent.atomic.AtomicInteger import org.apache.spark.sql.SQLContext @@ -35,6 +37,7 @@ import scala.util.control.NonFatal /** * Base class for different scala versions of SparkInterpreter. It should be * binary compatible between multiple scala versions. + * * @param conf * @param depFiles */ @@ -86,6 +89,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, def interpret(code: String, context: InterpreterContext): InterpreterResult = { val originalOut = System.out + def _interpret(code: String): scala.tools.nsc.interpreter.Results.Result = { Console.withOut(interpreterOutput) { System.setOut(Console.out) @@ -236,7 +240,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, builder.getClass.getMethod("config", classOf[SparkConf]).invoke(builder, conf) if (conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase == "hive" - || conf.get("spark.useHiveContext", "false").toLowerCase == "true") { + || conf.get("spark.useHiveContext", "false").toLowerCase == "true") { val hiveSiteExisted: Boolean = Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null val hiveClassesPresent = @@ -370,20 +374,26 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, } protected def getUserJars(): Seq[String] = { - val sparkJars = conf.getOption("spark.jars").map(_.split(",")) - .map(_.filter(_.nonEmpty)).toSeq.flatten - val depJars = depFiles.asScala.filter(_.endsWith(".jar")) - // add zeppelin spark interpreter jar - val zeppelinInterpreterJarURL = getClass.getProtectionDomain.getCodeSource.getLocation - // zeppelinInterpreterJarURL might be a folder when under unit testing - val result = if (new File(zeppelinInterpreterJarURL.getFile).isDirectory) { - sparkJars ++ depJars - } else { - sparkJars ++ depJars ++ Seq(zeppelinInterpreterJarURL.getFile) + var classLoader = Thread.currentThread().getContextClassLoader + var extraJars = Seq.empty[String] + while (classLoader != null) { + if (classLoader.getClass.getCanonicalName == + "org.apache.spark.util.MutableURLClassLoader") { + extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs() + // Check if the file exists. + .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile } + // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it. + .filterNot { + u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect") + } + .map(url => url.toString).toSeq + classLoader = null + } else { + classLoader = classLoader.getParent + } } - conf.set("spark.jars", result.mkString(",")) - LOGGER.debug("User jar for spark repl: " + conf.get("spark.jars")) - result + LOGGER.debug("User jar for spark repl: " + extraJars.mkString(",")) + extraJars } protected def getUserFiles(): Seq[String] = { diff --git a/zeppelin-interpreter-integration/pom.xml b/zeppelin-interpreter-integration/pom.xml index 09c9710dd8c..6140925ef8c 100644 --- a/zeppelin-interpreter-integration/pom.xml +++ b/zeppelin-interpreter-integration/pom.xml @@ -95,6 +95,18 @@ + + org.apache.maven + maven-model + 3.0.3 + + + org.codehaus.plexus + plexus-utils + + + + junit diff --git a/zeppelin-interpreter-integration/src/main/java/org/apache/zeppelin/interpreter/integration/DummyClass.java b/zeppelin-interpreter-integration/src/main/java/org/apache/zeppelin/interpreter/integration/DummyClass.java new file mode 100644 index 00000000000..1df4618cd7d --- /dev/null +++ b/zeppelin-interpreter-integration/src/main/java/org/apache/zeppelin/interpreter/integration/DummyClass.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.integration; + +public class DummyClass { +} diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java index 03a482d790d..94a6a408d99 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java @@ -22,6 +22,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.maven.model.Model; +import org.apache.maven.model.io.xpp3.MavenXpp3Reader; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; @@ -29,12 +31,15 @@ import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.InterpreterSettingManager; +import org.codehaus.plexus.util.xml.pull.XmlPullParserException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileReader; import java.io.IOException; import java.util.EnumSet; @@ -80,7 +85,14 @@ public static void tearDown() throws IOException { } } - private void testInterpreterBasics() throws IOException, InterpreterException { + private void testInterpreterBasics() throws IOException, InterpreterException, XmlPullParserException { + // add jars & packages for testing + InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark"); + sparkInterpreterSetting.setProperty("spark.jars.packages", "com.maxmind.geoip2:geoip2:2.5.0"); + MavenXpp3Reader reader = new MavenXpp3Reader(); + Model model = reader.read(new FileReader("pom.xml")); + sparkInterpreterSetting.setProperty("spark.jars", new File("target/zeppelin-interpreter-integration-" + model.getVersion() + ".jar").getAbsolutePath()); + // test SparkInterpreter Interpreter sparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.spark", "test"); @@ -93,6 +105,11 @@ private void testInterpreterBasics() throws IOException, InterpreterException { assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); assertTrue(interpreterResult.message().get(0).getData().contains("45")); + // test jars & packages can be loaded correctly + interpreterResult = sparkInterpreter.interpret("import org.apache.zeppelin.interpreter.integration.DummyClass\n" + + "import com.maxmind.geoip2._", context); + assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); + // test PySparkInterpreter Interpreter pySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.pyspark", "test"); interpreterResult = pySparkInterpreter.interpret("sqlContext.createDataFrame([(1,'a'),(2,'b')], ['id','name']).registerTempTable('test')", context); @@ -123,7 +140,7 @@ private void testInterpreterBasics() throws IOException, InterpreterException { } @Test - public void testLocalMode() throws IOException, YarnException, InterpreterException { + public void testLocalMode() throws IOException, YarnException, InterpreterException, XmlPullParserException { InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark"); sparkInterpreterSetting.setProperty("master", "local[*]"); sparkInterpreterSetting.setProperty("SPARK_HOME", sparkHome); @@ -143,7 +160,7 @@ public void testLocalMode() throws IOException, YarnException, InterpreterExcept } @Test - public void testYarnClientMode() throws IOException, YarnException, InterruptedException, InterpreterException { + public void testYarnClientMode() throws IOException, YarnException, InterruptedException, InterpreterException, XmlPullParserException { InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark"); sparkInterpreterSetting.setProperty("master", "yarn-client"); sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath()); @@ -166,7 +183,7 @@ public void testYarnClientMode() throws IOException, YarnException, InterruptedE } @Test - public void testYarnClusterMode() throws IOException, YarnException, InterruptedException, InterpreterException { + public void testYarnClusterMode() throws IOException, YarnException, InterruptedException, InterpreterException, XmlPullParserException { InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark"); sparkInterpreterSetting.setProperty("master", "yarn-cluster"); sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());