Commit 62350fd

[SW-364] setupNames method wasn't called, which meant that multiple REPLs couldn't coexist (#218)

* [SW-364] setupNames method wasn't called, which meant that multiple REPLs couldn't coexist

* Remove additional package that was causing problems in the Scala 2.11 IMain

* Better code reuse between the REPLs for Scala 2.10 and 2.11

* Fix import in H2OIMain for Scala 2.10

(cherry picked from commit c729b75)
jakubhava authored and mmalohlava committed Apr 3, 2017
1 parent 2f719c9 commit 62350fd
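
For context, here is a standalone Scala sketch (not part of this commit) of the naming scheme the fix restores: each interpreter prefixes its generated line-wrapper names with "intp_id_<sessionId>", so classes compiled by concurrent REPL sessions land in distinct packages and cannot collide. SessionNames below is a hypothetical stand-in for scala.tools.nsc.interpreter.Naming#SessionNames, with propOr("line") assumed to yield "line".

object SessionNamingSketch {
  final class SessionNames(sessionId: Int) {
    private var lineCounter = 0
    // Mirrors `override def line = "intp_id_" + sessionId + propOr("line")`
    // from H2OIMainHelper in the diff below.
    def line: String = "intp_id_" + sessionId + "line"
    def freshLineName(): String = { lineCounter += 1; line + lineCounter }
  }

  def main(args: Array[String]): Unit = {
    val repl1 = new SessionNames(1)
    val repl2 = new SessionNames(2)
    println(repl1.freshLineName()) // intp_id_1line1
    println(repl2.freshLineName()) // intp_id_2line1 -- no clash with repl1
  }
}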
Showing 4 changed files with 173 additions and 145 deletions.
95 changes: 95 additions & 0 deletions repl/src/main/scala/org/apache/spark/repl/h2o/H2OIMainHelper.scala
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.repl.h2o

import java.io.File

import org.apache.spark.repl.Main
import org.apache.spark.util.{MutableURLClassLoader, Utils}
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

import scala.tools.nsc.interpreter.Naming

/**
* Helper methods for H2OIMain on both scala versions
*/
trait H2OIMainHelper {

private var _initialized = false

/**
* Ensure that each class defined in REPL is in a package containing number of repl session
*/
def setupClassNames(naming: Naming, sessionId: Int): Unit = {
import naming._
// sessionNames is lazy val and needs to be accessed first in order to be then set again to our desired value
naming.sessionNames.line
val fieldSessionNames = naming.getClass.getDeclaredField("sessionNames")
fieldSessionNames.setAccessible(true)
fieldSessionNames.set(naming, new SessionNames {
override def line = "intp_id_" + sessionId + propOr("line")
})
}

def setClassLoaderToSerializers(classLoader: ClassLoader): Unit = {
SparkEnv.get.serializer.setDefaultClassLoader(classLoader)
SparkEnv.get.closureSerializer.setDefaultClassLoader(classLoader)
}

def newREPLDirectory(): File = {
val conf = new SparkConf()
val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
outputDir
}

private def prepareLocalClassLoader() = {
val f = SparkEnv.get.serializer.getClass.getSuperclass.getDeclaredField("defaultClassLoader")
f.setAccessible(true)
val value = f.get(SparkEnv.get.serializer)
value match {
case v: Option[_] => {
v.get match {
case cl: MutableURLClassLoader => cl.addURL(H2OInterpreter.classOutputDirectory.toURI.toURL)
case _ =>
}
}
case _ =>
}
}

def initializeClassLoader(sc: SparkContext): Unit = {
if(!_initialized){
if (sc.isLocal) {
prepareLocalClassLoader()
setClassLoaderToSerializers(new InterpreterClassLoader(Thread.currentThread.getContextClassLoader))
} else if (Main.interp != null) {
// Application has been started using SparkShell script.
// Set the original interpreter classloader as the fallback class loader for all
// class not defined in our custom REPL.
setClassLoaderToSerializers(new InterpreterClassLoader(Main.interp.intp.classLoader))
} else {
// Application hasn't been started using SparkShell.
// Set the context classloader as the fallback class loader for all
// class not defined in our custom REPL
setClassLoaderToSerializers(new InterpreterClassLoader(Thread.currentThread.getContextClassLoader))
}
_initialized = true
}
}
}
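
Both setupClassNames and prepareLocalClassLoader above lean on a detail of the Scala 2.x compiler's lazy val encoding (an assumption about the encoding, not code from this commit): a lazy val is backed by a private field of the same name, so forcing it once and then swapping the field via reflection changes what the accessor returns. A minimal, self-contained sketch:

object LazyValSwapSketch {
  class Holder {
    lazy val greeting: String = "original"
  }

  def main(args: Array[String]): Unit = {
    val holder = new Holder
    holder.greeting // force initialization so the backing field is populated
    val field = holder.getClass.getDeclaredField("greeting")
    field.setAccessible(true)
    field.set(holder, "replaced")
    println(holder.greeting) // prints "replaced": the accessor now returns the swapped value
  }
}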
78 changes: 6 additions & 72 deletions repl/src/main/scala_2.10/org.apache.spark.repl.h2o/H2OIMain.scala
@@ -31,9 +31,9 @@ import scala.tools.nsc.Settings
*/
private[repl] class H2OIMain private(initialSettings: Settings,
interpreterWriter: IntpResponseWriter,
val sessionID: Int,
sessionId: Int,
propagateExceptions: Boolean = false)
extends SparkIMain(initialSettings, interpreterWriter, propagateExceptions) {
extends SparkIMain(initialSettings, interpreterWriter, propagateExceptions) with H2OIMainHelper {


// we need to setup the compiler and stop the class server only when spark version detected at runtime doesn't actually
@@ -42,7 +42,7 @@ private[repl] class H2OIMain private(initialSettings: Settings,
setupCompiler()
stopClassServer()
}
setupClassNames()
setupClassNames(naming, sessionId)


/**
@@ -70,79 +70,16 @@ private[repl] class H2OIMain private(initialSettings: Settings,
fieldCompiler.setAccessible(true)
fieldCompiler.set(this, newCompiler(settings, reporter))
}

/**
* Ensure that each class defined in repl is in a package containing number of repl session
*/
private def setupClassNames() = {
import naming._
// sessionNames is lazy val and needs to be accessed first in order to be then set again to our desired value
naming.sessionNames.line
val fieldSessionNames = naming.getClass.getDeclaredField("sessionNames")
fieldSessionNames.setAccessible(true)
fieldSessionNames.set(naming, new SessionNames {
override def line = "intp_id_" + sessionID + "." + propOr("line")
})
}
}


object H2OIMain {
object H2OIMain extends H2OIMainHelper{

val existingInterpreters = mutable.HashMap.empty[Int, H2OIMain]
private var interpreterClassloader: InterpreterClassLoader = _
private var _initialized = false

private def setClassLoaderToSerializers(classLoader: ClassLoader): Unit = {
SparkEnv.get.serializer.setDefaultClassLoader(classLoader)
SparkEnv.get.closureSerializer.setDefaultClassLoader(classLoader)
}

/**
* Add directory with classes defined in repl to the classloader
* which is used in the local mode. This classloader is obtained using reflections.
*/
private def prepareLocalClassLoader() = {
val f = SparkEnv.get.serializer.getClass.getSuperclass.getDeclaredField("defaultClassLoader")
f.setAccessible(true)
val value = f.get(SparkEnv.get.serializer)
value match {
case v: Option[_] => {
v.get match {
case cl: MutableURLClassLoader => cl.addURL(H2OIMain.classOutputDirectory.toURI.toURL)
case _ =>
}
}
case _ =>
}
}

private def initialize(sc: SparkContext): Unit = {
if (sc.isLocal) {
// master set to local or local[*]

prepareLocalClassLoader()
interpreterClassloader = new InterpreterClassLoader(Thread.currentThread.getContextClassLoader)
} else {
if (Main.interp != null) {
interpreterClassloader = new InterpreterClassLoader(Main.interp.intp.classLoader)
} else {
// non local mode, application not started using SparkSubmit
interpreterClassloader = new InterpreterClassLoader(Thread.currentThread.getContextClassLoader)
}
}
setClassLoaderToSerializers(interpreterClassloader)
}

def getInterpreterClassloader: InterpreterClassLoader = {
interpreterClassloader
}

def createInterpreter(sc: SparkContext, settings: Settings, interpreterWriter: IntpResponseWriter, sessionId: Int): H2OIMain = synchronized {
if(!_initialized){
initialize(sc)
_initialized = true
}
initializeClassLoader(sc)
existingInterpreters += (sessionId -> new H2OIMain(settings, interpreterWriter, sessionId, false))
existingInterpreters(sessionId)
}
@@ -160,10 +97,7 @@ object H2OIMain {
// otherwise the field is not available, which means that this spark version is no longer using dedicated
// repl server and we handle this as in Spark 2.0
// REPL hasn't been started yet, create a new directory
val conf = new SparkConf()
val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
outputDir
newREPLDirectory()
}
}

83 changes: 10 additions & 73 deletions repl/src/main/scala_2.11/org.apache.spark.repl.h2o/H2OIMain.scala
@@ -17,105 +17,42 @@

package org.apache.spark.repl.h2o

import org.apache.spark.SparkContext
import org.apache.spark.repl.Main
import org.apache.spark.util.{MutableURLClassLoader, Utils}
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

import scala.collection.mutable
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.IMain


/**
* Slightly altered scala's IMain allowing multiple interpreters to coexist in parallel. Each line in repl
* is wrapped in a package which name contains current session id
*/
private[repl] class H2OIMain private(initialSettings: Settings,
interpreterWriter: IntpResponseWriter,
val sessionID: Int)
extends IMain(initialSettings, interpreterWriter) {
sessionId: Int)
extends IMain(initialSettings, interpreterWriter) with H2OIMainHelper {

/**
* Ensure that each class defined in repl is in a package containing number of repl session
*/
def setupClassNames() = {
import naming._
// sessionNames is lazy val and needs to be accessed first in order to be then set again to our desired value
naming.sessionNames.line
val fieldSessionNames = naming.getClass.getDeclaredField("sessionNames")
fieldSessionNames.setAccessible(true)
fieldSessionNames.set(naming, new SessionNames {
override def line = "intp_id_" + sessionID + "." + propOr("line")
})
}
setupClassNames(naming, sessionId)
}

object H2OIMain {
object H2OIMain extends H2OIMainHelper{

val existingInterpreters = mutable.HashMap.empty[Int, H2OIMain]
private var interpreterClassloader: InterpreterClassLoader = _
private var _initialized = false

private def setClassLoaderToSerializers(classLoader: ClassLoader): Unit = {
SparkEnv.get.serializer.setDefaultClassLoader(classLoader)
SparkEnv.get.closureSerializer.setDefaultClassLoader(classLoader)
}

private def prepareLocalClassLoader() = {
val f = SparkEnv.get.serializer.getClass.getSuperclass.getDeclaredField("defaultClassLoader")
f.setAccessible(true)
val value = f.get(SparkEnv.get.serializer)
value match {
case v: Option[_] => {
v.get match {
case cl: MutableURLClassLoader => cl.addURL(H2OInterpreter.classOutputDirectory.toURI.toURL)
case _ =>
}
}
case _ =>
}
}

private def initialize(sc: SparkContext): Unit = {
if (sc.isLocal) {
// master set to local or local[*]
prepareLocalClassLoader()
interpreterClassloader = new InterpreterClassLoader(Thread.currentThread.getContextClassLoader)
} else {
if (Main.interp != null) {
interpreterClassloader = new InterpreterClassLoader(Main.interp.intp.classLoader)
} else {
// non local mode, application not started using SparkSubmit
interpreterClassloader = new InterpreterClassLoader(Thread.currentThread.getContextClassLoader)
}
}
setClassLoaderToSerializers(interpreterClassloader)
}

def getInterpreterClassloader: InterpreterClassLoader = {
interpreterClassloader
}


def createInterpreter(sc: SparkContext, settings: Settings, interpreterWriter: IntpResponseWriter, sessionId: Int): H2OIMain = synchronized {
if(!_initialized){
initialize(sc)
_initialized = true
}
existingInterpreters += (sessionId -> new H2OIMain(settings, interpreterWriter, sessionId))
initializeClassLoader(sc)
existingInterpreters += (sessionId -> new H2OIMain(settings, interpreterWriter, sessionId, false))
existingInterpreters(sessionId)
}

private lazy val _classOutputDirectory = {
if (org.apache.spark.repl.Main.interp != null) {
if (Main.interp != null) {
// Application was started using shell, we can reuse this directory
org.apache.spark.repl.Main.outputDir
Main.outputDir
} else {
// REPL hasn't been started yet, create a new directory
val conf = new SparkConf()
val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
outputDir
newREPLDirectory()
}
}

62 changes: 62 additions & 0 deletions repl/src/main/scala_2.11/org/apache/spark/repl/h2o/H2OIMain.scala
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.repl.h2o

import org.apache.spark.SparkContext
import org.apache.spark.repl.Main

import scala.collection.mutable
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.IMain


/**
* Slightly altered scala's IMain allowing multiple interpreters to coexist in parallel. Each line in repl
* is wrapped in a package which name contains current session id
*/
private[repl] class H2OIMain private(initialSettings: Settings,
interpreterWriter: IntpResponseWriter,
sessionId: Int)
extends IMain(initialSettings, interpreterWriter) with H2OIMainHelper {

setupClassNames(naming, sessionId)

}

object H2OIMain extends H2OIMainHelper {

val existingInterpreters = mutable.HashMap.empty[Int, H2OIMain]

def createInterpreter(sc: SparkContext, settings: Settings, interpreterWriter: IntpResponseWriter, sessionId: Int): H2OIMain = synchronized {
initializeClassLoader(sc)
existingInterpreters += (sessionId -> new H2OIMain(settings, interpreterWriter, sessionId))
existingInterpreters(sessionId)
}

private lazy val _classOutputDirectory = {
if (Main.interp != null) {
// Application was started using shell, we can reuse this directory
Main.outputDir
} else {
// REPL hasn't been started yet, create a new directory
newREPLDirectory()
}
}

def classOutputDirectory = _classOutputDirectory
}
