Skip to content

Commit

Permalink
[SPARK-3256] Added support for :cp <jar> that was broken in Scala 2.1…
Browse files Browse the repository at this point in the history
…0.x for REPL

As seen with [SI-6502](https://issues.scala-lang.org/browse/SI-6502) of Scala, the _:cp_ command was broken in Scala 2.10.x. As the Spark shell is a friendly wrapper on top of the Scala REPL, it is also affected by this problem.

My solution was to alter the internal classpath and invalidate any new entries. I also had to add the ability to add new entries to the parent classloader of the interpreter (SparkIMain's global).

The advantage of this versus wiping the interpreter and replaying all of the commands is that you don't have to worry about rerunning heavy Spark-related commands (going to the cluster) or potentially reloading data that might have changed. Instead, you get to work from where you left off.

Until this is fixed upstream for 2.10.x, I had to use reflection to alter the internal compiler classpath.

The solution now looks like this:
![screen shot 2014-08-13 at 3 46 02 pm](https://cloud.githubusercontent.com/assets/2481802/3912625/f02b1440-232c-11e4-9bf6-bafb3e352d14.png)

Author: Chip Senkbeil <rcsenkbe@us.ibm.com>

Closes #1929 from rcsenkbeil/FixReplClasspathSupport and squashes the following commits:

f420cbf [Chip Senkbeil] Added SparkContext.addJar calls to support executing code on remote clusters
a826795 [Chip Senkbeil] Updated AddUrlsToClasspath to use 'new Run' suggestion over hackish compiler error
2ff1d86 [Chip Senkbeil] Added compilation failure on symbols hack to get Scala classes to load correctly
a220639 [Chip Senkbeil] Added support for :cp <jar> that was broken in Scala 2.10.x for REPL
  • Loading branch information
Chip Senkbeil authored and mateiz committed Aug 27, 2014
1 parent 4238c17 commit 191d7cf
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 11 deletions.
19 changes: 12 additions & 7 deletions repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
Expand Up @@ -8,7 +8,11 @@
package org.apache.spark.repl package org.apache.spark.repl




import java.net.URL

import scala.reflect.io.AbstractFile
import scala.tools.nsc._ import scala.tools.nsc._
import scala.tools.nsc.backend.JavaPlatform
import scala.tools.nsc.interpreter._ import scala.tools.nsc.interpreter._


import scala.tools.nsc.interpreter.{ Results => IR } import scala.tools.nsc.interpreter.{ Results => IR }
Expand All @@ -22,11 +26,10 @@ import scala.tools.util.{ Javap }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer import scala.collection.mutable.ListBuffer
import scala.concurrent.ops import scala.concurrent.ops
import scala.tools.nsc.util.{ ClassPath, Exceptional, stringFromWriter, stringFromStream } import scala.tools.nsc.util._
import scala.tools.nsc.interpreter._ import scala.tools.nsc.interpreter._
import scala.tools.nsc.io.{ File, Directory } import scala.tools.nsc.io.{File, Directory}
import scala.reflect.NameTransformer._ import scala.reflect.NameTransformer._
import scala.tools.nsc.util.ScalaClassLoader
import scala.tools.nsc.util.ScalaClassLoader._ import scala.tools.nsc.util.ScalaClassLoader._
import scala.tools.util._ import scala.tools.util._
import scala.language.{implicitConversions, existentials} import scala.language.{implicitConversions, existentials}
Expand Down Expand Up @@ -711,22 +714,24 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
added = true added = true
addedClasspath = ClassPath.join(addedClasspath, f.path) addedClasspath = ClassPath.join(addedClasspath, f.path)
totalClasspath = ClassPath.join(settings.classpath.value, addedClasspath) totalClasspath = ClassPath.join(settings.classpath.value, addedClasspath)
intp.addUrlsToClassPath(f.toURI.toURL)
sparkContext.addJar(f.toURI.toURL.getPath)
} }
} }
if (added) replay()
} }


def addClasspath(arg: String): Unit = { def addClasspath(arg: String): Unit = {
val f = File(arg).normalize val f = File(arg).normalize
if (f.exists) { if (f.exists) {
addedClasspath = ClassPath.join(addedClasspath, f.path) addedClasspath = ClassPath.join(addedClasspath, f.path)
val totalClasspath = ClassPath.join(settings.classpath.value, addedClasspath) intp.addUrlsToClassPath(f.toURI.toURL)
echo("Added '%s'. Your new classpath is:\n\"%s\"".format(f.path, totalClasspath)) sparkContext.addJar(f.toURI.toURL.getPath)
replay() echo("Added '%s'. Your new classpath is:\n\"%s\"".format(f.path, intp.global.classPath.asClasspathString))
} }
else echo("The path '" + f + "' doesn't seem to exist.") else echo("The path '" + f + "' doesn't seem to exist.")
} }



def powerCmd(): Result = { def powerCmd(): Result = {
if (isReplPower) "Already in power mode." if (isReplPower) "Already in power mode."
else enablePowerMode(false) else enablePowerMode(false)
Expand Down
65 changes: 61 additions & 4 deletions repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
Expand Up @@ -7,11 +7,14 @@


package org.apache.spark.repl package org.apache.spark.repl


import java.io.File

import scala.tools.nsc._ import scala.tools.nsc._
import scala.tools.nsc.backend.JavaPlatform
import scala.tools.nsc.interpreter._ import scala.tools.nsc.interpreter._


import Predef.{ println => _, _ } import Predef.{ println => _, _ }
import util.stringFromWriter import scala.tools.nsc.util.{MergedClassPath, stringFromWriter, ScalaClassLoader, stackTraceString}
import scala.reflect.internal.util._ import scala.reflect.internal.util._
import java.net.URL import java.net.URL
import scala.sys.BooleanProp import scala.sys.BooleanProp
Expand All @@ -21,7 +24,6 @@ import reporters._
import symtab.Flags import symtab.Flags
import scala.reflect.internal.Names import scala.reflect.internal.Names
import scala.tools.util.PathResolver import scala.tools.util.PathResolver
import scala.tools.nsc.util.ScalaClassLoader
import ScalaClassLoader.URLClassLoader import ScalaClassLoader.URLClassLoader
import scala.tools.nsc.util.Exceptional.unwrap import scala.tools.nsc.util.Exceptional.unwrap
import scala.collection.{ mutable, immutable } import scala.collection.{ mutable, immutable }
Expand All @@ -34,7 +36,6 @@ import scala.reflect.runtime.{ universe => ru }
import scala.reflect.{ ClassTag, classTag } import scala.reflect.{ ClassTag, classTag }
import scala.tools.reflect.StdRuntimeTags._ import scala.tools.reflect.StdRuntimeTags._
import scala.util.control.ControlThrowable import scala.util.control.ControlThrowable
import util.stackTraceString


import org.apache.spark.{Logging, HttpServer, SecurityManager, SparkConf} import org.apache.spark.{Logging, HttpServer, SecurityManager, SparkConf}
import org.apache.spark.util.Utils import org.apache.spark.util.Utils
Expand Down Expand Up @@ -130,6 +131,9 @@ import org.apache.spark.util.Utils
private var _classLoader: AbstractFileClassLoader = null // active classloader private var _classLoader: AbstractFileClassLoader = null // active classloader
private val _compiler: Global = newCompiler(settings, reporter) // our private compiler private val _compiler: Global = newCompiler(settings, reporter) // our private compiler


private trait ExposeAddUrl extends URLClassLoader { def addNewUrl(url: URL) = this.addURL(url) }
private var _runtimeClassLoader: URLClassLoader with ExposeAddUrl = null // wrapper exposing addURL

private val nextReqId = { private val nextReqId = {
var counter = 0 var counter = 0
() => { counter += 1 ; counter } () => { counter += 1 ; counter }
Expand Down Expand Up @@ -308,6 +312,57 @@ import org.apache.spark.util.Utils
} }
} }


/**
* Adds any specified jars to the compile and runtime classpaths.
*
* @note Currently only supports jars, not directories
* @param urls The list of items to add to the compile and runtime classpaths
*/
def addUrlsToClassPath(urls: URL*): Unit = {
new Run // Needed to force initialization of "something" to correctly load Scala classes from jars
urls.foreach(_runtimeClassLoader.addNewUrl) // Add jars/classes to runtime for execution
updateCompilerClassPath(urls: _*) // Add jars/classes to compile time for compiling
}

protected def updateCompilerClassPath(urls: URL*): Unit = {
require(!global.forMSIL) // Only support JavaPlatform

val platform = global.platform.asInstanceOf[JavaPlatform]

val newClassPath = mergeUrlsIntoClassPath(platform, urls: _*)

// NOTE: Must use reflection until this is exposed/fixed upstream in Scala
val fieldSetter = platform.getClass.getMethods
.find(_.getName.endsWith("currentClassPath_$eq")).get
fieldSetter.invoke(platform, Some(newClassPath))

// Reload all jars specified into our compiler
global.invalidateClassPathEntries(urls.map(_.getPath): _*)
}

protected def mergeUrlsIntoClassPath(platform: JavaPlatform, urls: URL*): MergedClassPath[AbstractFile] = {
// Collect our new jars/directories and add them to the existing set of classpaths
val allClassPaths = (
platform.classPath.asInstanceOf[MergedClassPath[AbstractFile]].entries ++
urls.map(url => {
platform.classPath.context.newClassPath(
if (url.getProtocol == "file") {
val f = new File(url.getPath)
if (f.isDirectory)
io.AbstractFile.getDirectory(f)
else
io.AbstractFile.getFile(f)
} else {
io.AbstractFile.getURL(url)
}
)
})
).distinct

// Combine all of our classpaths (old and new) into one merged classpath
new MergedClassPath(allClassPaths, platform.classPath.context)
}

/** Parent classloader. Overridable. */ /** Parent classloader. Overridable. */
protected def parentClassLoader: ClassLoader = protected def parentClassLoader: ClassLoader =
SparkHelper.explicitParentLoader(settings).getOrElse( this.getClass.getClassLoader() ) SparkHelper.explicitParentLoader(settings).getOrElse( this.getClass.getClassLoader() )
Expand Down Expand Up @@ -356,7 +411,9 @@ import org.apache.spark.util.Utils
private def makeClassLoader(): AbstractFileClassLoader = private def makeClassLoader(): AbstractFileClassLoader =
new TranslatingClassLoader(parentClassLoader match { new TranslatingClassLoader(parentClassLoader match {
case null => ScalaClassLoader fromURLs compilerClasspath case null => ScalaClassLoader fromURLs compilerClasspath
case p => new URLClassLoader(compilerClasspath, p) case p =>
_runtimeClassLoader = new URLClassLoader(compilerClasspath, p) with ExposeAddUrl
_runtimeClassLoader
}) })


def getInterpreterClassLoader() = classLoader def getInterpreterClassLoader() = classLoader
Expand Down

0 comments on commit 191d7cf

Please sign in to comment.