diff --git a/spark/README.md b/spark/README.md
index ceec7ecabd0..a9b039ec5ee 100644
--- a/spark/README.md
+++ b/spark/README.md
@@ -5,14 +5,24 @@ Spark interpreter is the first and most important interpreter of Zeppelin. It su
 
 # Module structure of Spark interpreter
 
-
-* interpreter   - This module is the entry module of Spark interpreter. All the interpreter interfaces are defined here, but the implementation will be delegated to the scala-xxx module depends on the Scala version of current Spark.
-* spark-scala-parent - Parent module for each scala module
-* scala-2.11 - Scala module for scala 2.11
+* interpreter
+  - This module is the entry module of the Spark interpreter. All the interpreters are defined here. SparkInterpreter is the most important one:
+    the SparkContext/SparkSession is created here, and the other interpreters (PySparkInterpreter, IPySparkInterpreter, SparkRInterpreter, etc.) all depend on SparkInterpreter.
+    Due to incompatibility between Scala versions, there are several scala-x modules, one for each supported Scala version.
+    Due to incompatibility between Spark versions, there are several spark-shims modules, one for each supported Spark version.
+* spark-scala-parent
+  - Parent module for each Scala module
+* scala-2.11
+  - Scala module for Scala 2.11
 * scala-2.12
+  - Scala module for Scala 2.12
 * scala-2.13
-* spark-shims
+  - Scala module for Scala 2.13
+* spark-shims
+  - Parent module for each Spark shims module
 * spark2-shims
+  - Shims module for Spark 2
 * spark3-shims
+  - Shims module for Spark 3
+
 
-# How to build Spark interpreter
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
index 36a04e2247b..bc58ea74611 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
@@ -79,7 +79,7 @@ public SQLContext getSqlContext() {
     return this.sqlContext;
   }
 
-  public Object getSparkSession() {
+  public SparkSession getSparkSession() {
     return this.sparkSession;
   }
 
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index dde4b7496cf..035924e603f 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -22,6 +22,7 @@
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
 import org.apache.zeppelin.interpreter.AbstractInterpreter;
 import org.apache.zeppelin.interpreter.ZeppelinContext;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -73,7 +74,7 @@ public class SparkInterpreter extends AbstractInterpreter {
   private SparkContext sc;
   private JavaSparkContext jsc;
   private SQLContext sqlContext;
-  private Object sparkSession;
+  private SparkSession sparkSession;
 
   private SparkVersion sparkVersion;
   private String scalaVersion;
@@ -187,14 +188,14 @@ private AbstractSparkScalaInterpreter loadSparkScalaInterpreter(SparkConf conf)
         .newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(),
             innerInterpreterClazz.getClassLoader(), scalaShellOutputDir);
   }
 
-  @Override
+  @Override
   public void close() throws InterpreterException {
LOGGER.info("Close SparkInterpreter"); if (SESSION_NUM.decrementAndGet() == 0 && innerInterpreter != null) { innerInterpreter.close(); innerInterpreterClazz = null; } - innerInterpreter = null; + innerInterpreter = null; } @Override @@ -248,14 +249,7 @@ public SparkContext getSparkContext() { return this.sc; } - /** - * Must use Object, because the its api signature in Spark 1.x is different from - * that of Spark 2.x. - * e.g. SqlContext.sql(sql) return different type. - * - * @return - */ - public Object getSQLContext() { + public SQLContext getSQLContext() { return sqlContext; } @@ -263,7 +257,7 @@ public JavaSparkContext getJavaSparkContext() { return this.jsc; } - public Object getSparkSession() { + public SparkSession getSparkSession() { return sparkSession; } @@ -297,11 +291,11 @@ private String extractScalaVersion(SparkConf conf) throws InterpreterException { } } - public boolean isScala211() throws InterpreterException { + public boolean isScala211() { return scalaVersion.equals("2.11"); } - public boolean isScala212() throws InterpreterException { + public boolean isScala212() { return scalaVersion.equals("2.12"); } diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index 4512ddbe6dc..4335b6a107b 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -148,9 +148,4 @@ public ZeppelinContext getZeppelinContext() { return sparkInterpreter.getZeppelinContext(); } - @Override - public List completion(String buf, int cursor, - InterpreterContext interpreterContext) { - return new ArrayList<>(); - } } diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index 95e8bd7b553..7cdaad1640a 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -20,6 +20,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.spark.SparkContext; import org.apache.spark.sql.AnalysisException; +import org.apache.spark.sql.SQLContext; import org.apache.zeppelin.interpreter.AbstractInterpreter; import org.apache.zeppelin.interpreter.ZeppelinContext; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -57,7 +58,7 @@ public void open() throws InterpreterException { this.sqlSplitter = new SqlSplitter(); } - public boolean concurrentSQL() { + private boolean concurrentSQL() { return Boolean.parseBoolean(getProperty("zeppelin.spark.concurrentSQL")); } @@ -83,7 +84,7 @@ public InterpreterResult internalInterpret(String st, InterpreterContext context } Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties); sparkInterpreter.getZeppelinContext().setInterpreterContext(context); - Object sqlContext = sparkInterpreter.getSQLContext(); + SQLContext sqlContext = sparkInterpreter.getSQLContext(); SparkContext sc = sparkInterpreter.getSparkContext(); List sqls = sqlSplitter.splitSql(st); @@ -99,11 +100,10 @@ public InterpreterResult internalInterpret(String st, InterpreterContext context // TODO(zjffdu) scala 2.12,2.13 still doesn't work for codegen (ZEPPELIN-4627) Thread.currentThread().setContextClassLoader(sparkInterpreter.getScalaShellClassLoader()); } - 
-      Method method = sqlContext.getClass().getMethod("sql", String.class);
       for (String sql : sqls) {
         curSql = sql;
         String result = sparkInterpreter.getZeppelinContext()
-            .showData(method.invoke(sqlContext, sql), maxResult);
+            .showData(sqlContext.sql(sql), maxResult);
         context.out.write(result);
       }
       context.out.flush();
@@ -161,7 +161,6 @@ public FormType getFormType() {
     return FormType.SIMPLE;
   }
 
-  @Override
   public int getProgress(InterpreterContext context) throws InterpreterException {
     return sparkInterpreter.getProgress(context);
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
index ea8fb8b4d01..578af8ed476 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
@@ -27,83 +27,17 @@
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * Utility and helper functions for the Spark Interpreter
  */
 class Utils {
-  public static Logger logger = LoggerFactory.getLogger(Utils.class);
-  public static String DEPRRECATED_MESSAGE =
+  private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class);
+  private static String DEPRECATED_MESSAGE =
       "%html Spark lower than 2.2 is deprecated, " +
       "if you don't want to see this message, please set " +
       "zeppelin.spark.deprecateMsg.show to false.";
 
-  static Object invokeMethod(Object o, String name) {
-    return invokeMethod(o, name, new Class[]{}, new Object[]{});
-  }
-
-  static Object invokeMethod(Object o, String name, Class[] argTypes, Object[] params) {
-    try {
-      return o.getClass().getMethod(name, argTypes).invoke(o, params);
-    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
-      logger.error(e.getMessage(), e);
-    }
-    return null;
-  }
-
-  static Object invokeStaticMethod(Class c, String name, Class[] argTypes, Object[] params) {
-    try {
-      return c.getMethod(name, argTypes).invoke(null, params);
-    } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
-      logger.error(e.getMessage(), e);
-    }
-    return null;
-  }
-
-  static Object invokeStaticMethod(Class c, String name) {
-    return invokeStaticMethod(c, name, new Class[]{}, new Object[]{});
-  }
-
-  static Class findClass(String name) {
-    return findClass(name, false);
-  }
-
-  static Class findClass(String name, boolean silence) {
-    try {
-      return Class.forName(name);
-    } catch (ClassNotFoundException e) {
-      if (!silence) {
-        logger.error(e.getMessage(), e);
-      }
-      return null;
-    }
-  }
-
-  static Object instantiateClass(String name, Class[] argTypes, Object[] params) {
-    try {
-      Constructor constructor = Utils.class.getClassLoader()
-          .loadClass(name).getConstructor(argTypes);
-      return constructor.newInstance(params);
-    } catch (NoSuchMethodException | ClassNotFoundException | IllegalAccessException |
-        InstantiationException | InvocationTargetException e) {
-      logger.error(e.getMessage(), e);
-    }
-    return null;
-  }
-
-  // function works after intp is initialized
-  static boolean isScala2_10() {
-    try {
-      Class.forName("org.apache.spark.repl.SparkIMain");
-      return true;
-    } catch (ClassNotFoundException e) {
-      return false;
-    } catch (IncompatibleClassChangeError e) {
-      return false;
-    }
-  }
 
   public static String buildJobGroupId(InterpreterContext context) {
     String uName = "anonymous";
@@ -136,7 +70,7 @@
public static void printDeprecateMessage(SparkVersion sparkVersion, && Boolean.parseBoolean( properties.getProperty("zeppelin.spark.deprecatedMsg.show", "true"))) { try { - context.out.write(DEPRRECATED_MESSAGE); + context.out.write(DEPRECATED_MESSAGE); context.out.write("%text "); } catch (IOException e) { throw new InterpreterException(e); diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala index e9ed7a3abe1..bb38c71a878 100644 --- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala +++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala @@ -139,7 +139,7 @@ class SparkScala211Interpreter(conf: SparkConf, @throws[InterpreterException] def scalaInterpretQuietly(code: String): Unit = { scalaInterpret(code) match { - case success@scala.tools.nsc.interpreter.Results.Success => + case scala.tools.nsc.interpreter.Results.Success => // do nothing case scala.tools.nsc.interpreter.Results.Error => throw new InterpreterException("Fail to run code: " + code) diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala index ff50ba02f38..c7ca4019a06 100644 --- a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala +++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala @@ -140,7 +140,7 @@ class SparkScala212Interpreter(conf: SparkConf, @throws[InterpreterException] def scalaInterpretQuietly(code: String): Unit = { scalaInterpret(code) match { - case success@scala.tools.nsc.interpreter.Results.Success => + case scala.tools.nsc.interpreter.Results.Success => // do nothing case scala.tools.nsc.interpreter.Results.Error => throw new InterpreterException("Fail to run code: " + code) diff --git a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala index f9088d818b5..7fc735ecee6 100644 --- a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala +++ b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala @@ -114,7 +114,7 @@ class SparkScala213Interpreter(conf: SparkConf, @throws[InterpreterException] def scalaInterpretQuietly(code: String): Unit = { scalaInterpret(code) match { - case success@scala.tools.nsc.interpreter.Results.Success => + case scala.tools.nsc.interpreter.Results.Success => // do nothing case scala.tools.nsc.interpreter.Results.Error => throw new InterpreterException("Fail to run code: " + code) diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala deleted file mode 100644 index bf39449c908..00000000000 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala +++ /dev/null @@ -1,341 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark - - -import java.io.{File, IOException, PrintStream} -import java.net.URLClassLoader -import java.nio.file.Paths -import java.util.concurrent.atomic.AtomicInteger -import org.apache.commons.lang3.StringUtils -import org.apache.hadoop.yarn.client.api.YarnClient -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.ConverterUtils -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.sql.{SQLContext, SparkSession} -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.zeppelin.interpreter.util.InterpreterOutputStream -import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult, ZeppelinContext} -import org.apache.zeppelin.kotlin.KotlinInterpreter -import org.slf4j.{Logger, LoggerFactory} - -import scala.collection.JavaConverters._ - - -/** - * Base class for different scala versions of SparkInterpreter. It should be - * binary compatible between multiple scala versions. - * - * @param conf - * @param depFiles - * @param properties - * @param interpreterGroup - */ -abstract class BaseSparkScalaInterpreter(val conf: SparkConf, - val depFiles: java.util.List[String], - val properties: java.util.Properties, - val interpreterGroup: InterpreterGroup, - val sparkInterpreterClassLoader: URLClassLoader) - extends AbstractSparkScalaInterpreter() { - - protected lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass) - - protected var sc: SparkContext = _ - - protected var sqlContext: SQLContext = _ - - protected var sparkSession: SparkSession = _ - - protected var userJars: Seq[String] = _ - - protected var sparkUrl: String = _ - - protected var z: SparkZeppelinContext = _ - - protected val interpreterOutput: InterpreterOutputStream - - protected val sparkMaster: String = conf.get(SparkStringConstants.MASTER_PROP_NAME, - SparkStringConstants.DEFAULT_MASTER_VALUE) - - protected def open(): Unit = { - /* Required for scoped mode. - * In scoped mode multiple scala compiler (repl) generates class in the same directory. - * Class names is not randomly generated and look like '$line12.$read$$iw$$iw' - * Therefore it's possible to generated class conflict(overwrite) with other repl generated - * class. - * - * To prevent generated class name conflict, - * change prefix of generated class name from each scala compiler (repl) instance. - * - * In Spark 2.x, REPL generated wrapper class name should compatible with the pattern - * ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$ - * - * As hashCode() can return a negative integer value and the minus character '-' is invalid - * in a package name we change it to a numeric value '0' which still conforms to the regexp. 
- * - */ - System.setProperty("scala.repl.name.line", ("$line" + this.hashCode).replace('-', '0')) - - BaseSparkScalaInterpreter.sessionNum.incrementAndGet() - } - - // Used by KotlinSparkInterpreter - def delegateInterpret(interpreter: KotlinInterpreter, - code: String, - context: InterpreterContext): InterpreterResult = { - val out = context.out - val newOut = if (out != null) new PrintStream(out) else null - Console.withOut(newOut) { - interpreter.interpret(code, context) - } - } - - protected def interpret(code: String): InterpreterResult = - interpret(code, InterpreterContext.get()) - - protected def getProgress(jobGroup: String, context: InterpreterContext): Int = { - JobProgressUtil.progress(sc, jobGroup) - } - - override def getSparkContext: SparkContext = sc - - override def getSqlContext: SQLContext = sqlContext - - override def getSparkSession: AnyRef = sparkSession - - override def getSparkUrl: String = sparkUrl - - override def getZeppelinContext: ZeppelinContext = z - - protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit - - // for use in java side - protected def bind(name: String, - tpe: String, - value: Object, - modifier: java.util.List[String]): Unit = - bind(name, tpe, value, modifier.asScala.toList) - - protected def close(): Unit = { - // delete stagingDir for yarn mode - if (sparkMaster.startsWith("yarn")) { - val hadoopConf = new YarnConfiguration() - val appStagingBaseDir = if (conf.contains("spark.yarn.stagingDir")) { - new Path(conf.get("spark.yarn.stagingDir")) - } else { - FileSystem.get(hadoopConf).getHomeDirectory() - } - val stagingDirPath = new Path(appStagingBaseDir, ".sparkStaging" + "/" + sc.applicationId) - cleanupStagingDirInternal(stagingDirPath, hadoopConf) - } - - if (sc != null) { - sc.stop() - sc = null - } - if (sparkSession != null) { - sparkSession.getClass.getMethod("stop").invoke(sparkSession) - sparkSession = null - } - sqlContext = null - z = null - } - - private def cleanupStagingDirInternal(stagingDirPath: Path, hadoopConf: Configuration): Unit = { - try { - val fs = stagingDirPath.getFileSystem(hadoopConf) - if (fs.delete(stagingDirPath, true)) { - LOGGER.info(s"Deleted staging directory $stagingDirPath") - } - } catch { - case ioe: IOException => - LOGGER.warn("Failed to cleanup staging dir " + stagingDirPath, ioe) - } - } - - protected def createSparkContext(): Unit = { - spark2CreateContext() - } - - private def spark2CreateContext(): Unit = { - val sparkClz = Class.forName("org.apache.spark.sql.SparkSession$") - val sparkObj = sparkClz.getField("MODULE$").get(null) - - val builderMethod = sparkClz.getMethod("builder") - val builder = builderMethod.invoke(sparkObj) - builder.getClass.getMethod("config", classOf[SparkConf]).invoke(builder, conf) - - if (conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase == "hive" - || conf.get("zeppelin.spark.useHiveContext", "false").toLowerCase == "true") { - val hiveSiteExisted: Boolean = - Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null - val hiveClassesPresent = - sparkClz.getMethod("hiveClassesArePresent").invoke(sparkObj).asInstanceOf[Boolean] - if (hiveSiteExisted && hiveClassesPresent) { - builder.getClass.getMethod("enableHiveSupport").invoke(builder) - sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder).asInstanceOf[SparkSession] - LOGGER.info("Created Spark session (with Hive support)"); - } else { - if (!hiveClassesPresent) { - LOGGER.warn("Hive support can not be enabled because 
spark is not built with hive") - } - if (!hiveSiteExisted) { - LOGGER.warn("Hive support can not be enabled because no hive-site.xml found") - } - sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder).asInstanceOf[SparkSession] - LOGGER.info("Created Spark session (without Hive support)"); - } - } else { - sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder).asInstanceOf[SparkSession] - LOGGER.info("Created Spark session (without Hive support)"); - } - - sc = sparkSession.getClass.getMethod("sparkContext").invoke(sparkSession) - .asInstanceOf[SparkContext] - getUserFiles().foreach(file => sc.addFile(file)) - sqlContext = sparkSession.getClass.getMethod("sqlContext").invoke(sparkSession) - .asInstanceOf[SQLContext] - sc.getClass.getMethod("uiWebUrl").invoke(sc).asInstanceOf[Option[String]] match { - case Some(url) => sparkUrl = url - case None => - } - - initAndSendSparkWebUrl() - - bind("spark", sparkSession.getClass.getCanonicalName, sparkSession, List("""@transient""")) - bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient""")) - bind("sqlContext", "org.apache.spark.sql.SQLContext", sqlContext, List("""@transient""")) - } - - protected def initAndSendSparkWebUrl(): Unit = { - val webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl"); - if (!StringUtils.isBlank(webUiUrl)) { - this.sparkUrl = webUiUrl.replace("{{applicationId}}", sc.applicationId); - } else { - useYarnProxyURLIfNeeded() - } - InterpreterContext.get.getIntpEventClient.sendWebUrlInfo(this.sparkUrl) - } - - protected def createZeppelinContext(): Unit = { - - var sparkShims: SparkShims = null - if (isSparkSessionPresent()) { - sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession) - } else { - sparkShims = SparkShims.getInstance(sc.version, properties, sc) - } - - sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get) - z = new SparkZeppelinContext(sc, sparkShims, - interpreterGroup.getInterpreterHookRegistry, - properties.getProperty("zeppelin.spark.maxResult", "1000").toInt) - bind("z", z.getClass.getCanonicalName, z, List("""@transient""")) - } - - private def useYarnProxyURLIfNeeded() { - if (properties.getProperty("spark.webui.yarn.useProxy", "false").toBoolean) { - if (sparkMaster.startsWith("yarn")) { - val appId = sc.applicationId - val yarnClient = YarnClient.createYarnClient - val yarnConf = new YarnConfiguration() - // disable timeline service as we only query yarn app here. 
- // Otherwise we may hit this kind of ERROR: - // java.lang.ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig - yarnConf.set("yarn.timeline-service.enabled", "false") - yarnClient.init(yarnConf) - yarnClient.start() - val appReport = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId)) - this.sparkUrl = appReport.getTrackingUrl - } - } - } - - private def isSparkSessionPresent(): Boolean = { - try { - Class.forName("org.apache.spark.sql.SparkSession") - true - } catch { - case _: ClassNotFoundException | _: NoClassDefFoundError => false - } - } - - protected def getField(obj: Object, name: String): Object = { - val field = obj.getClass.getField(name) - field.setAccessible(true) - field.get(obj) - } - - protected def getDeclareField(obj: Object, name: String): Object = { - val field = obj.getClass.getDeclaredField(name) - field.setAccessible(true) - field.get(obj) - } - - protected def setDeclaredField(obj: Object, name: String, value: Object): Unit = { - val field = obj.getClass.getDeclaredField(name) - field.setAccessible(true) - field.set(obj, value) - } - - protected def callMethod(obj: Object, name: String): Object = { - callMethod(obj, name, Array.empty[Class[_]], Array.empty[Object]) - } - - protected def callMethod(obj: Object, name: String, - parameterTypes: Array[Class[_]], - parameters: Array[Object]): Object = { - val method = obj.getClass.getMethod(name, parameterTypes: _ *) - method.setAccessible(true) - method.invoke(obj, parameters: _ *) - } - - protected def getUserJars(): Seq[String] = { - var classLoader = Thread.currentThread().getContextClassLoader - var extraJars = Seq.empty[String] - while (classLoader != null) { - if (classLoader.getClass.getCanonicalName == - "org.apache.spark.util.MutableURLClassLoader") { - extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs() - // Check if the file exists. - .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile } - // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it. 
- .filterNot { - u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect") - } - .map(url => url.toString).toSeq - classLoader = null - } else { - classLoader = classLoader.getParent - } - } - - extraJars ++= sparkInterpreterClassLoader.getURLs().map(_.getPath()) - LOGGER.debug("User jar for spark repl: " + extraJars.mkString(",")) - extraJars - } - - def getUserFiles(): Nothing = { - depFiles.asScala.toSeq.filter(!_.endsWith(".jar")) - } -} - -object BaseSparkScalaInterpreter { - val sessionNum = new AtomicInteger(0) -} diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java index 88f0f88ee72..2b7e3a622aa 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java @@ -226,7 +226,7 @@ public void testZSession_Spark() throws Exception { assertEquals(Status.ERROR, result.getStatus()); assertEquals(1, result.getResults().size()); assertEquals("TEXT", result.getResults().get(0).getType()); - assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view not found")); + assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view 'unknown_table' not found in database")); assertEquals(0, result.getJobUrls().size()); } finally { @@ -299,7 +299,7 @@ public void testZSession_Spark_Submit() throws Exception { assertEquals(Status.ERROR, result.getStatus()); assertEquals(1, result.getResults().size()); assertEquals("TEXT", result.getResults().get(0).getType()); - assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view not found")); + assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view 'unknown_table' not found in database")); assertEquals(0, result.getJobUrls().size()); // cancel
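The README change at the top of this diff describes how the entry `interpreter` module delegates to a Scala-version-specific implementation shipped by the `scala-2.11`, `scala-2.12` and `scala-2.13` modules. The snippet below is a simplified, illustrative sketch rather than the actual logic of `SparkInterpreter.loadSparkScalaInterpreter`; it only shows the idea of mapping the detected Scala version to one of the real `SparkScala21xInterpreter` classes, which the entry module then instantiates through that module's classloader.

```java
// Simplified sketch (not the patch's code): picking the Scala-version-specific
// interpreter implementation that the README's module layout describes.
public class ScalaModuleSelectionSketch {

  static String innerInterpreterClassName(String scalaVersion) {
    switch (scalaVersion) {
      case "2.11":
        return "org.apache.zeppelin.spark.SparkScala211Interpreter";
      case "2.12":
        return "org.apache.zeppelin.spark.SparkScala212Interpreter";
      case "2.13":
        return "org.apache.zeppelin.spark.SparkScala213Interpreter";
      default:
        throw new IllegalArgumentException("Unsupported Scala version: " + scalaVersion);
    }
  }

  public static void main(String[] args) {
    // The entry module would load the chosen class via the scala module's own
    // classloader and instantiate it; here we only print the selected name.
    System.out.println(innerInterpreterClassName("2.12"));
  }
}
```

The entry module only programs against the `AbstractSparkScalaInterpreter` API that all three implementations share, which is why it can stay Scala-version agnostic.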
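The comment removed from `getSQLContext()` above explains why `SparkSqlInterpreter` previously went through reflection: `sql()` had different return types in Spark 1.x and 2.x, so the context could only be handled as an `Object`. With Spark 1.x support dropped, the call can be made directly on the typed `SQLContext`, as the patch now does. The following is a minimal self-contained sketch of the before/after; `runViaReflection` and `runDirectly` are hypothetical names, not methods in the patch.

```java
import java.lang.reflect.Method;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

// Illustrative sketch: why dropping Spark 1.x lets the SQL interpreter
// replace reflective invocation with a direct, compile-time-checked call.
public class SqlCallSketch {

  // Before: the context was only known as Object, so sql() had to be looked up
  // and invoked reflectively at runtime.
  static Object runViaReflection(Object sqlContext, String sql) throws Exception {
    Method method = sqlContext.getClass().getMethod("sql", String.class);
    return method.invoke(sqlContext, sql);
  }

  // After: with only Spark 2.x/3.x supported, the typed SQLContext is used directly.
  static Dataset<Row> runDirectly(SQLContext sqlContext, String sql) {
    return sqlContext.sql(sql);
  }
}
```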
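Similarly, `getSparkSession()` now returns `SparkSession` instead of `Object`, so callers no longer need casts or the reflective helpers deleted from `Utils.java`. Below is a small hypothetical caller (an assumed example, not code from the patch) showing the typed access.

```java
import org.apache.spark.sql.SparkSession;

// Hypothetical caller: with the typed accessor, downstream code reaches the
// Spark SQL API directly instead of going through Object and reflection.
public class TypedSessionSketch {

  static void describeSession(SparkSession spark) {
    // Both calls are checked by the compiler; no casts required.
    System.out.println("Spark version: " + spark.version());
    spark.sql("SELECT 1").show();
  }
}
```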