From 62350fd28208ee2ece0fa5182d1387a0821c3845 Mon Sep 17 00:00:00 2001
From: Jakub Háva
Date: Sat, 25 Mar 2017 02:10:23 +0100
Subject: [PATCH] [SW-364] setupNames method wasn't called which means that
 multiple re… (#218)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* [SW-364] setupNames method wasn't called, which meant that multiple repls couldn't coexist

* Remove the additional package which was causing problems in the Scala 2.11 IMain

* Better code reuse between the REPLs for Scala 2.10 and 2.11

* Fix import in H2OIMain for Scala 2.10

(cherry picked from commit c729b75242032852c775bbdac14e1549d5177ce5)
---
 .../spark/repl/h2o/H2OIMainHelper.scala       | 95 +++++++++++++++++++
 .../org.apache.spark.repl.h2o/H2OIMain.scala  | 78 ++-------------
 .../org.apache.spark.repl.h2o/H2OIMain.scala  | 83 ++--------------
 .../org/apache/spark/repl/h2o/H2OIMain.scala  | 62 ++++++++++++
 4 files changed, 173 insertions(+), 145 deletions(-)
 create mode 100644 repl/src/main/scala/org/apache/spark/repl/h2o/H2OIMainHelper.scala
 create mode 100644 repl/src/main/scala_2.11/org/apache/spark/repl/h2o/H2OIMain.scala

diff --git a/repl/src/main/scala/org/apache/spark/repl/h2o/H2OIMainHelper.scala b/repl/src/main/scala/org/apache/spark/repl/h2o/H2OIMainHelper.scala
new file mode 100644
index 0000000000..81c2586403
--- /dev/null
+++ b/repl/src/main/scala/org/apache/spark/repl/h2o/H2OIMainHelper.scala
@@ -0,0 +1,95 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.repl.h2o
+
+import java.io.File
+
+import org.apache.spark.repl.Main
+import org.apache.spark.util.{MutableURLClassLoader, Utils}
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
+
+import scala.tools.nsc.interpreter.Naming
+
+/**
+ * Helper methods for H2OIMain shared by both Scala versions.
+ */
+trait H2OIMainHelper {
+
+  private var _initialized = false
+
+  /**
+   * Ensure that each class defined in the REPL is in a package whose name contains the id of the REPL session.
+   */
+  def setupClassNames(naming: Naming, sessionId: Int): Unit = {
+    import naming._
+    // sessionNames is a lazy val and needs to be accessed first so it is initialized before we replace it with our desired value
+    naming.sessionNames.line
+    val fieldSessionNames = naming.getClass.getDeclaredField("sessionNames")
+    fieldSessionNames.setAccessible(true)
+    fieldSessionNames.set(naming, new SessionNames {
+      override def line = "intp_id_" + sessionId + propOr("line")
+    })
+  }
+
+  def setClassLoaderToSerializers(classLoader: ClassLoader): Unit = {
+    SparkEnv.get.serializer.setDefaultClassLoader(classLoader)
+    SparkEnv.get.closureSerializer.setDefaultClassLoader(classLoader)
+  }
+
+  def newREPLDirectory(): File = {
+    val conf = new SparkConf()
+    val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
+    val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
+    outputDir
+  }
+
+  private def prepareLocalClassLoader() = {
+    val f = SparkEnv.get.serializer.getClass.getSuperclass.getDeclaredField("defaultClassLoader")
+    f.setAccessible(true)
+    val value = f.get(SparkEnv.get.serializer)
+    value match {
+      case v: Option[_] => {
+        v.get match {
+          case cl: MutableURLClassLoader => cl.addURL(H2OInterpreter.classOutputDirectory.toURI.toURL)
+          case _ =>
+        }
+      }
+      case _ =>
+    }
+  }
+
+  def initializeClassLoader(sc: SparkContext): Unit = {
+    if (!_initialized) {
+      if (sc.isLocal) {
+        prepareLocalClassLoader()
+        setClassLoaderToSerializers(new InterpreterClassLoader(Thread.currentThread.getContextClassLoader))
+      } else if (Main.interp != null) {
+        // Application has been started using the SparkShell script.
+        // Set the original interpreter classloader as the fallback class loader for all
+        // classes not defined in our custom REPL.
+        setClassLoaderToSerializers(new InterpreterClassLoader(Main.interp.intp.classLoader))
+      } else {
+        // Application hasn't been started using SparkShell.
+        // Set the context classloader as the fallback class loader for all
+        // classes not defined in our custom REPL.
+        setClassLoaderToSerializers(new InterpreterClassLoader(Thread.currentThread.getContextClassLoader))
+      }
+      _initialized = true
+    }
+  }
+}
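
The reflection in setupClassNames works because Naming.sessionNames is a lazy val: the helper forces it once (the bare access to naming.sessionNames.line) and only then swaps the backing field, so the initializer can never run later and overwrite the injected prefix. A minimal, runnable sketch of the same pattern, using a hypothetical Namer class in place of scala.tools.nsc.interpreter.Naming:

    object LazyValSwapSketch {

      class Namer {
        lazy val line: String = "line" // stands in for Naming.sessionNames
      }

      def main(args: Array[String]): Unit = {
        val naming = new Namer
        naming.line // force the lazy val so its initializer cannot run later
        val field = naming.getClass.getDeclaredField("line")
        field.setAccessible(true)
        field.set(naming, "intp_id_" + 42 + "line") // prefix for REPL session 42
        println(naming.line) // prints "intp_id_42line"
      }
    }
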
diff --git a/repl/src/main/scala_2.10/org.apache.spark.repl.h2o/H2OIMain.scala b/repl/src/main/scala_2.10/org.apache.spark.repl.h2o/H2OIMain.scala
index 16e8443335..5a51bbed3e 100644
--- a/repl/src/main/scala_2.10/org.apache.spark.repl.h2o/H2OIMain.scala
+++ b/repl/src/main/scala_2.10/org.apache.spark.repl.h2o/H2OIMain.scala
@@ -31,9 +31,9 @@ import scala.tools.nsc.Settings
  */
 private[repl] class H2OIMain private(initialSettings: Settings,
                                      interpreterWriter: IntpResponseWriter,
-                                     val sessionID: Int,
+                                     sessionId: Int,
                                      propagateExceptions: Boolean = false)
-  extends SparkIMain(initialSettings, interpreterWriter, propagateExceptions) {
+  extends SparkIMain(initialSettings, interpreterWriter, propagateExceptions) with H2OIMainHelper {
 
 
   // we need to setup the compiler and stop the class server only when spark version detected at runtime doesn't actually
@@ -42,7 +42,7 @@ private[repl] class H2OIMain private(initialSettings: Settings,
     setupCompiler()
     stopClassServer()
   }
-  setupClassNames()
+  setupClassNames(naming, sessionId)
 
 
   /**
@@ -70,79 +70,16 @@ private[repl] class H2OIMain private(initialSettings: Settings,
     fieldCompiler.setAccessible(true)
     fieldCompiler.set(this, newCompiler(settings, reporter))
   }
-
-  /**
-   * Ensure that each class defined in repl is in a package containing number of repl session
-   */
-  private def setupClassNames() = {
-    import naming._
-    // sessionNames is lazy val and needs to be accessed first in order to be then set again to our desired value
-    naming.sessionNames.line
-    val fieldSessionNames = naming.getClass.getDeclaredField("sessionNames")
-    fieldSessionNames.setAccessible(true)
-    fieldSessionNames.set(naming, new SessionNames {
-      override def line = "intp_id_" + sessionID + "." + propOr("line")
-    })
-  }
 }
 
-object H2OIMain {
+object H2OIMain extends H2OIMainHelper {
 
   val existingInterpreters = mutable.HashMap.empty[Int, H2OIMain]
-  private var interpreterClassloader: InterpreterClassLoader = _
-  private var _initialized = false
-
-  private def setClassLoaderToSerializers(classLoader: ClassLoader): Unit = {
-    SparkEnv.get.serializer.setDefaultClassLoader(classLoader)
-    SparkEnv.get.closureSerializer.setDefaultClassLoader(classLoader)
-  }
-
-  /**
-   * Add directory with classes defined in repl to the classloader
-   * which is used in the local mode. This classloader is obtained using reflections.
-   */
-  private def prepareLocalClassLoader() = {
-    val f = SparkEnv.get.serializer.getClass.getSuperclass.getDeclaredField("defaultClassLoader")
-    f.setAccessible(true)
-    val value = f.get(SparkEnv.get.serializer)
-    value match {
-      case v: Option[_] => {
-        v.get match {
-          case cl: MutableURLClassLoader => cl.addURL(H2OIMain.classOutputDirectory.toURI.toURL)
-          case _ =>
-        }
-      }
-      case _ =>
-    }
-  }
-
-  private def initialize(sc: SparkContext): Unit = {
-    if (sc.isLocal) {
-      // master set to local or local[*]
-
-      prepareLocalClassLoader()
-      interpreterClassloader = new InterpreterClassLoader(Thread.currentThread.getContextClassLoader)
-    } else {
-      if (Main.interp != null) {
-        interpreterClassloader = new InterpreterClassLoader(Main.interp.intp.classLoader)
-      } else {
-        // non local mode, application not started using SparkSubmit
-        interpreterClassloader = new InterpreterClassLoader(Thread.currentThread.getContextClassLoader)
-      }
-    }
-    setClassLoaderToSerializers(interpreterClassloader)
-  }
-
-  def getInterpreterClassloader: InterpreterClassLoader = {
-    interpreterClassloader
-  }
 
   def createInterpreter(sc: SparkContext, settings: Settings, interpreterWriter: IntpResponseWriter, sessionId: Int): H2OIMain = synchronized {
-    if(!_initialized){
-      initialize(sc)
-      _initialized = true
-    }
+    initializeClassLoader(sc)
     existingInterpreters += (sessionId -> new H2OIMain(settings, interpreterWriter, sessionId, false))
     existingInterpreters(sessionId)
   }
@@ -160,10 +97,7 @@ object H2OIMain {
       // otherwise the field is not available, which means that this spark version is no longer using dedicated
       // repl server and we handle this as in Spark 2.0
       // REPL hasn't been started yet, create a new directory
-      val conf = new SparkConf()
-      val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
-      val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
-      outputDir
+      newREPLDirectory()
     }
   }
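
The classloader wiring in initializeClassLoader follows one idea in all three branches: classes generated by this REPL must resolve first, and every other lookup falls through to a fallback loader (the Spark shell's interpreter classloader when one exists, the context classloader otherwise). InterpreterClassLoader itself is not part of this patch; the sketch below is a hypothetical stand-in for the fallback-first behaviour it provides, not the project's actual implementation:

    import java.net.{URL, URLClassLoader}

    // Hypothetical: resolve classes compiled by our REPL first, delegate the
    // rest to the supplied fallback loader.
    class FallbackFirstClassLoader(replClasses: Array[URL], fallback: ClassLoader)
      extends URLClassLoader(replClasses, null: ClassLoader) {

      override protected def findClass(name: String): Class[_] =
        try super.findClass(name)
        catch { case _: ClassNotFoundException => fallback.loadClass(name) }
    }

    // e.g. new FallbackFirstClassLoader(Array(outputDir.toURI.toURL),
    //                                   Thread.currentThread.getContextClassLoader)
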
diff --git a/repl/src/main/scala_2.11/org.apache.spark.repl.h2o/H2OIMain.scala b/repl/src/main/scala_2.11/org.apache.spark.repl.h2o/H2OIMain.scala
index 522f5635c2..f6ff0f63e4 100644
--- a/repl/src/main/scala_2.11/org.apache.spark.repl.h2o/H2OIMain.scala
+++ b/repl/src/main/scala_2.11/org.apache.spark.repl.h2o/H2OIMain.scala
@@ -17,105 +17,42 @@
 
 package org.apache.spark.repl.h2o
 
+import org.apache.spark.SparkContext
 import org.apache.spark.repl.Main
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
-import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
 
 import scala.collection.mutable
 import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter.IMain
 
-
 /**
  * Slightly altered scala's IMain allowing multiple interpreters to coexist in parallel. Each line in repl
  * is wrapped in a package which name contains current session id
  */
 private[repl] class H2OIMain private(initialSettings: Settings,
                                      interpreterWriter: IntpResponseWriter,
-                                     val sessionID: Int)
-  extends IMain(initialSettings, interpreterWriter) {
+                                     sessionId: Int)
+  extends IMain(initialSettings, interpreterWriter) with H2OIMainHelper {
 
-  /**
-   * Ensure that each class defined in repl is in a package containing number of repl session
-   */
-  def setupClassNames() = {
-    import naming._
-    // sessionNames is lazy val and needs to be accessed first in order to be then set again to our desired value
-    naming.sessionNames.line
-    val fieldSessionNames = naming.getClass.getDeclaredField("sessionNames")
-    fieldSessionNames.setAccessible(true)
-    fieldSessionNames.set(naming, new SessionNames {
-      override def line = "intp_id_" + sessionID + "." + propOr("line")
-    })
-  }
+  setupClassNames(naming, sessionId)
 }
 
-object H2OIMain {
+object H2OIMain extends H2OIMainHelper {
 
   val existingInterpreters = mutable.HashMap.empty[Int, H2OIMain]
-  private var interpreterClassloader: InterpreterClassLoader = _
-  private var _initialized = false
-
-  private def setClassLoaderToSerializers(classLoader: ClassLoader): Unit = {
-    SparkEnv.get.serializer.setDefaultClassLoader(classLoader)
-    SparkEnv.get.closureSerializer.setDefaultClassLoader(classLoader)
-  }
-
-  private def prepareLocalClassLoader() = {
-    val f = SparkEnv.get.serializer.getClass.getSuperclass.getDeclaredField("defaultClassLoader")
-    f.setAccessible(true)
-    val value = f.get(SparkEnv.get.serializer)
-    value match {
-      case v: Option[_] => {
-        v.get match {
-          case cl: MutableURLClassLoader => cl.addURL(H2OInterpreter.classOutputDirectory.toURI.toURL)
-          case _ =>
-        }
-      }
-      case _ =>
-    }
-  }
-
-  private def initialize(sc: SparkContext): Unit = {
-    if (sc.isLocal) {
-      // master set to local or local[*]
-      prepareLocalClassLoader()
-      interpreterClassloader = new InterpreterClassLoader(Thread.currentThread.getContextClassLoader)
-    } else {
-      if (Main.interp != null) {
-        interpreterClassloader = new InterpreterClassLoader(Main.interp.intp.classLoader)
-      } else {
-        // non local mode, application not started using SparkSubmit
-        interpreterClassloader = new InterpreterClassLoader(Thread.currentThread.getContextClassLoader)
-      }
-    }
-    setClassLoaderToSerializers(interpreterClassloader)
-  }
-
-  def getInterpreterClassloader: InterpreterClassLoader = {
-    interpreterClassloader
-  }
-
   def createInterpreter(sc: SparkContext, settings: Settings, interpreterWriter: IntpResponseWriter, sessionId: Int): H2OIMain = synchronized {
-    if(!_initialized){
-      initialize(sc)
-      _initialized = true
-    }
+    initializeClassLoader(sc)
     existingInterpreters += (sessionId -> new H2OIMain(settings, interpreterWriter, sessionId))
     existingInterpreters(sessionId)
   }
 
   private lazy val _classOutputDirectory = {
-    if (org.apache.spark.repl.Main.interp != null) {
+    if (Main.interp != null) {
       // Application was started using shell, we can reuse this directory
-      org.apache.spark.repl.Main.outputDir
+      Main.outputDir
     } else {
       // REPL hasn't been started yet, create a new directory
-      val conf = new SparkConf()
-      val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
-      val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
-      outputDir
+      newREPLDirectory()
     }
   }
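
_classOutputDirectory encodes a small decision that both Scala versions now share: reuse the Spark shell's output directory (Main.outputDir) when the application was started through the shell, otherwise create a fresh directory via newREPLDirectory(). A hedged sketch of that selection, reading spark.repl.classdir from system properties for simplicity (the patch itself goes through SparkConf):

    import java.io.File
    import java.util.UUID

    object OutputDirSketch {
      // Reuse the shell's directory when present, otherwise create a temp dir.
      def chooseOutputDir(shellOutputDir: Option[File]): File =
        shellOutputDir.getOrElse {
          val root = sys.props.getOrElse("spark.repl.classdir", sys.props("java.io.tmpdir"))
          val dir = new File(root, s"repl-${UUID.randomUUID()}")
          dir.mkdirs()
          dir
        }
    }
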
diff --git a/repl/src/main/scala_2.11/org/apache/spark/repl/h2o/H2OIMain.scala b/repl/src/main/scala_2.11/org/apache/spark/repl/h2o/H2OIMain.scala
new file mode 100644
index 0000000000..2f3e7b4d16
--- /dev/null
+++ b/repl/src/main/scala_2.11/org/apache/spark/repl/h2o/H2OIMain.scala
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.repl.h2o
+
+import org.apache.spark.SparkContext
+import org.apache.spark.repl.Main
+
+import scala.collection.mutable
+import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter.IMain
+
+
+/**
+ * Slightly altered version of Scala's IMain allowing multiple interpreters to coexist in parallel. Each line
+ * in the REPL is wrapped in a package whose name contains the current session id.
+ */
+private[repl] class H2OIMain private(initialSettings: Settings,
+                                     interpreterWriter: IntpResponseWriter,
+                                     sessionId: Int)
+  extends IMain(initialSettings, interpreterWriter) with H2OIMainHelper {
+
+  setupClassNames(naming, sessionId)
+
+}
+
+object H2OIMain extends H2OIMainHelper {
+
+  val existingInterpreters = mutable.HashMap.empty[Int, H2OIMain]
+
+  def createInterpreter(sc: SparkContext, settings: Settings, interpreterWriter: IntpResponseWriter, sessionId: Int): H2OIMain = synchronized {
+    initializeClassLoader(sc)
+    existingInterpreters += (sessionId -> new H2OIMain(settings, interpreterWriter, sessionId))
+    existingInterpreters(sessionId)
+  }
+
+  private lazy val _classOutputDirectory = {
+    if (Main.interp != null) {
+      // Application was started using shell, we can reuse this directory
+      Main.outputDir
+    } else {
+      // REPL hasn't been started yet, create a new directory
+      newREPLDirectory()
+    }
+  }
+
+  def classOutputDirectory = _classOutputDirectory
+}
\ No newline at end of file
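
Both createInterpreter implementations now share the same shape: a synchronized factory that initializes the classloader once (initializeClassLoader is guarded by _initialized), then registers exactly one interpreter per session id so concurrent sessions cannot race on the shared map. A minimal runnable sketch of that shape, with a hypothetical Interp case class standing in for H2OIMain:

    import scala.collection.mutable

    object SessionRegistrySketch {
      final case class Interp(sessionId: Int)

      val existingInterpreters = mutable.HashMap.empty[Int, Interp]

      def createInterpreter(sessionId: Int): Interp = synchronized {
        existingInterpreters += (sessionId -> Interp(sessionId))
        existingInterpreters(sessionId)
      }

      def main(args: Array[String]): Unit = {
        val a = createInterpreter(1)
        val b = createInterpreter(2)
        assert(a.sessionId != b.sessionId) // two sessions coexist independently
      }
    }
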