diff --git a/kernel-api/src/main/scala/org/apache/toree/kernel/api/KernelLike.scala b/kernel-api/src/main/scala/org/apache/toree/kernel/api/KernelLike.scala index fcb5271e2..4a19d4b81 100644 --- a/kernel-api/src/main/scala/org/apache/toree/kernel/api/KernelLike.scala +++ b/kernel-api/src/main/scala/org/apache/toree/kernel/api/KernelLike.scala @@ -17,22 +17,18 @@ package org.apache.toree.kernel.api -import java.io.{InputStream, OutputStream, PrintStream} - +import java.io.{InputStream, PrintStream} +import java.net.URI import com.typesafe.config.Config import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.{SQLContext, SparkSession} +import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.sql.SparkSession /** * Interface for the kernel API. This does not include exposed variables. */ trait KernelLike { - def createSparkContext(conf: SparkConf): SparkContext - - def createSparkContext(master: String): SparkContext - /** * Executes a block of code represented as a string and returns the result. * @@ -105,6 +101,8 @@ trait KernelLike { def config: Config + def addJars(uris: URI*) + def sparkContext: SparkContext def sparkConf: SparkConf diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSparkContext.scala b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSparkContext.scala deleted file mode 100644 index 92dfe0543..000000000 --- a/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSparkContext.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.toree.magic.dependencies - -import org.apache.spark.SparkContext -import org.apache.toree.magic.Magic -import org.apache.toree.plugins.Plugin -import org.apache.toree.plugins.annotations.{Event, Init} - -trait IncludeSparkContext extends Plugin { - this: Magic => - - @Event(name = "sparkReady") protected def sparkReady( - newSparkContext: SparkContext - ) = _sparkContext = newSparkContext - - private var _sparkContext: SparkContext = _ - def sparkContext: SparkContext = _sparkContext -} diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSparkSession.scala b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSparkSession.scala deleted file mode 100644 index 9bd92c2c7..000000000 --- a/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSparkSession.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.toree.magic.dependencies - -import org.apache.spark.sql.{SQLContext, SparkSession} -import org.apache.toree.magic.Magic -import org.apache.toree.plugins.Plugin -import org.apache.toree.plugins.annotations.{Event, Init} - -trait IncludeSparkSession extends Plugin { - this: Magic => - - @Event(name = "sparkReady") protected def sparkReady( - newSparkSession: SparkSession - ) = _spark = newSparkSession - - private var _spark: SparkSession = _ - def spark: SparkSession = _spark -} diff --git a/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala b/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala index 63a358d8c..42999d72e 100644 --- a/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala +++ b/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala @@ -45,7 +45,6 @@ trait ComponentInitialization { * Initializes and registers all components (not needed by bare init). * * @param config The config used for initialization - * @param appName The name of the "application" for Spark * @param actorLoader The actor loader to use for some initialization */ def initializeComponents( @@ -83,8 +82,6 @@ trait StandardComponentInitialization extends ComponentInitialization { initializePlugins(config, pluginManager) - initializeSparkContext(config, kernel) - interpreterManager.initializeInterpreters(kernel) pluginManager.fireEvent(AllInterpretersReady) @@ -97,13 +94,6 @@ trait StandardComponentInitialization extends ComponentInitialization { } - - def initializeSparkContext(config:Config, kernel:Kernel) = { - if(!config.getBoolean("nosparkcontext")) { - kernel.createSparkContext(config.getString("spark.master")) - } - } - private def initializeCommObjects(actorLoader: ActorLoader) = { logger.debug("Constructing Comm storage") val commStorage = new CommStorage() diff --git a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala b/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala index 417983bbd..285db1f62 100644 --- a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala +++ b/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala @@ -18,7 +18,8 @@ package org.apache.toree.kernel.api import java.io.{InputStream, PrintStream} -import java.util.concurrent.ConcurrentHashMap +import java.net.URI +import java.util.concurrent.{ConcurrentHashMap, TimeUnit, TimeoutException} import scala.collection.mutable import com.typesafe.config.Config import org.apache.spark.api.java.JavaSparkContext @@ -35,14 +36,15 @@ import org.apache.toree.kernel.protocol.v5 import org.apache.toree.kernel.protocol.v5.kernel.ActorLoader import org.apache.toree.kernel.protocol.v5.magic.MagicParser import org.apache.toree.kernel.protocol.v5.stream.KernelOutputStream -import org.apache.toree.kernel.protocol.v5.{KMBuilder, KernelMessage} +import org.apache.toree.kernel.protocol.v5.{KMBuilder, KernelMessage, MIMEType} import org.apache.toree.magic.MagicManager import org.apache.toree.plugins.PluginManager -import org.apache.toree.utils.{KeyValuePairUtils, LogLike} +import org.apache.toree.utils.LogLike import scala.language.dynamics import scala.reflect.runtime.universe._ -import scala.util.{DynamicVariable, Try} -import org.apache.toree.plugins.SparkReady +import scala.util.DynamicVariable +import scala.concurrent.duration.Duration +import scala.concurrent.{Future, Await} /** * Represents the main kernel API to be used for interaction. @@ -61,6 +63,23 @@ class Kernel ( val pluginManager: PluginManager ) extends KernelLike with LogLike { + /** + * Jars that have been added to the kernel + */ + private val jars = new mutable.ArrayBuffer[URI]() + + override def addJars(uris: URI*): Unit = { + uris.foreach { uri => + if (uri.getScheme != "file") { + throw new RuntimeException("Cannot add non-local jar: " + uri) + } + } + + jars ++= uris + interpreter.addJars(uris.map(_.toURL):_*) + uris.foreach(uri => sparkContext.addJar(uri.getPath)) + } + /** * Represents the current input stream used by the kernel for the specific * thread. @@ -339,30 +358,6 @@ class Kernel ( someKernelMessage.get } - override def createSparkContext(conf: SparkConf): SparkContext = { - val sconf = createSparkConf(conf) - val _sparkSession = SparkSession.builder.config(sconf).getOrCreate() - - val sparkMaster = sconf.getOption("spark.master").getOrElse("not_set") - logger.info( s"Connecting to spark.master $sparkMaster") - - // TODO: Convert to events - pluginManager.dependencyManager.add(_sparkSession.sparkContext.getConf) - pluginManager.dependencyManager.add(_sparkSession) - pluginManager.dependencyManager.add(_sparkSession.sparkContext) - pluginManager.dependencyManager.add(javaSparkContext(_sparkSession)) - - pluginManager.fireEvent(SparkReady) - - _sparkSession.sparkContext - } - - override def createSparkContext( - master: String - ): SparkContext = { - createSparkContext(new SparkConf().setMaster(master)) - } - // TODO: Think of a better way to test without exposing this protected[toree] def createSparkConf(conf: SparkConf) = { @@ -401,7 +396,36 @@ class Kernel ( interpreterManager.interpreters.get(name) } - override def sparkSession: SparkSession = SparkSession.builder.getOrCreate + private lazy val defaultSparkConf: SparkConf = createSparkConf(new SparkConf()) + + override def sparkSession: SparkSession = { + defaultSparkConf.getOption("spark.master") match { + case Some(master) if !master.contains("local") => + // when connecting to a remote cluster, the first call to getOrCreate + // may create a session and take a long time, so this starts a future + // to get the session. if it take longer than 100 ms, then print a + // message to the user that Spark is starting. + import scala.concurrent.ExecutionContext.Implicits.global + val sessionFuture = Future { + SparkSession.builder.config(defaultSparkConf).getOrCreate + } + + try { + Await.result(sessionFuture, Duration(100, TimeUnit.MILLISECONDS)) + } catch { + case timeout: TimeoutException => + // getting the session is taking a long time, so assume that Spark + // is starting and print a message + display.content( + MIMEType.PlainText, "Waiting for a Spark session to start...") + Await.result(sessionFuture, Duration.Inf) + } + + case _ => + SparkSession.builder.config(defaultSparkConf).getOrCreate + } + } + override def sparkContext: SparkContext = sparkSession.sparkContext override def sparkConf: SparkConf = sparkSession.sparkContext.getConf override def javaSparkContext: JavaSparkContext = javaSparkContext(sparkSession) diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/AddDeps.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/AddDeps.scala index 2f4d8122a..9ef935925 100644 --- a/kernel/src/main/scala/org/apache/toree/magic/builtin/AddDeps.scala +++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/AddDeps.scala @@ -30,7 +30,7 @@ import org.apache.toree.plugins.annotations.Event class AddDeps extends LineMagic with IncludeInterpreter - with IncludeOutputStream with IncludeSparkContext with ArgumentParsingSupport + with IncludeOutputStream with ArgumentParsingSupport with IncludeDependencyDownloader with IncludeKernel { @@ -78,7 +78,7 @@ class AddDeps extends LineMagic with IncludeInterpreter if (nonOptionArgs.size == 3) { // get the jars and hold onto the paths at which they reside - val urls = dependencyDownloader.retrieve( + val uris = dependencyDownloader.retrieve( groupId = nonOptionArgs.head, artifactId = nonOptionArgs(1), version = nonOptionArgs(2), @@ -87,11 +87,10 @@ class AddDeps extends LineMagic with IncludeInterpreter extraRepositories = repositoriesWithCreds, verbose = _verbose, trace = _trace - ).map(_.toURL) + ) - // add the jars to the interpreter and spark context - interpreter.addJars(urls:_*) - urls.foreach(url => sparkContext.addJar(url.getPath)) + // pass the new Jars to the kernel + kernel.addJars(uris:_*) } else { printHelp(printStream, """%AddDeps my.company artifact-id version""") } diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/AddJar.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/AddJar.scala index 489307759..48a812400 100644 --- a/kernel/src/main/scala/org/apache/toree/magic/builtin/AddJar.scala +++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/AddJar.scala @@ -47,7 +47,7 @@ object AddJar { } class AddJar - extends LineMagic with IncludeInterpreter with IncludeSparkContext + extends LineMagic with IncludeInterpreter with IncludeOutputStream with DownloadSupport with ArgumentParsingSupport with IncludeKernel with IncludePluginManager with IncludeConfig with LogLike { @@ -137,8 +137,7 @@ class AddJar val plugins = pluginManager.loadPlugins(fileDownloadLocation) pluginManager.initializePlugins(plugins) } else { - interpreter.addJars(fileDownloadLocation.toURI.toURL) - sparkContext.addJar(fileDownloadLocation.getCanonicalPath) + kernel.addJars(fileDownloadLocation.toURI) } } } diff --git a/kernel/src/test/scala/org/apache/toree/magic/builtin/AddDepsSpec.scala b/kernel/src/test/scala/org/apache/toree/magic/builtin/AddDepsSpec.scala index 659af4211..421d12b27 100644 --- a/kernel/src/test/scala/org/apache/toree/magic/builtin/AddDepsSpec.scala +++ b/kernel/src/test/scala/org/apache/toree/magic/builtin/AddDepsSpec.scala @@ -19,11 +19,9 @@ package org.apache.toree.magic.builtin import java.io.{ByteArrayOutputStream, OutputStream} import java.net.{URI, URL} - import org.apache.toree.dependencies.{Credentials, DependencyDownloader} -import org.apache.toree.interpreter.Interpreter import org.apache.toree.utils.ArgumentParsingSupport -import org.apache.spark.SparkContext +import org.apache.toree.kernel.api.KernelLike import org.scalatest.mock.MockitoSugar import org.scalatest.{FunSpec, GivenWhenThen, Matchers} import org.mockito.Mockito._ @@ -38,20 +36,17 @@ class AddDepsSpec extends FunSpec with Matchers with MockitoSugar describe("#execute") { it("should print out the help message if the input is invalid") { val byteArrayOutputStream = new ByteArrayOutputStream() - val mockIntp = mock[Interpreter] - val mockSC = mock[SparkContext] + val mockKernel = mock[KernelLike] val mockDownloader = mock[DependencyDownloader] var printHelpWasRun = false val addDepsMagic = new AddDeps - with IncludeSparkContext - with IncludeInterpreter + with IncludeKernel with IncludeOutputStream with IncludeDependencyDownloader with ArgumentParsingSupport { - override val sparkContext: SparkContext = mockSC - override val interpreter: Interpreter = mockIntp + override val kernel: KernelLike = mockKernel override val dependencyDownloader: DependencyDownloader = mockDownloader override val outputStream: OutputStream = byteArrayOutputStream @@ -65,9 +60,7 @@ class AddDepsSpec extends FunSpec with Matchers with MockitoSugar val actual = addDepsMagic.execute("notvalid") printHelpWasRun should be (true) - verify(mockIntp, times(0)).addJars(any()) - verify(mockIntp, times(0)).bind(any(), any(), any(), any()) - verify(mockSC, times(0)).addJar(any()) + verify(mockKernel, times(0)).addJars(any()) verify(mockDownloader, times(0)).retrieve( anyString(), anyString(), anyString(), anyBoolean(), anyBoolean(), anyBoolean(), any[Seq[(URL, Option[Credentials])]], anyBoolean(), anyBoolean() @@ -83,14 +76,12 @@ class AddDepsSpec extends FunSpec with Matchers with MockitoSugar ) val addDepsMagic = new AddDeps - with IncludeSparkContext - with IncludeInterpreter + with IncludeKernel with IncludeOutputStream with IncludeDependencyDownloader with ArgumentParsingSupport { - override val sparkContext: SparkContext = mock[SparkContext] - override val interpreter: Interpreter = mock[Interpreter] + override val kernel: KernelLike = mock[KernelLike] override val dependencyDownloader: DependencyDownloader = mockDependencyDownloader override val outputStream: OutputStream = mock[OutputStream] @@ -111,14 +102,12 @@ class AddDepsSpec extends FunSpec with Matchers with MockitoSugar ) val addDepsMagic = new AddDeps - with IncludeSparkContext - with IncludeInterpreter + with IncludeKernel with IncludeOutputStream with IncludeDependencyDownloader with ArgumentParsingSupport { - override val sparkContext: SparkContext = mock[SparkContext] - override val interpreter: Interpreter = mock[Interpreter] + override val kernel: KernelLike = mock[KernelLike] override val dependencyDownloader: DependencyDownloader = mockDependencyDownloader override val outputStream: OutputStream = mock[OutputStream] @@ -131,53 +120,21 @@ class AddDepsSpec extends FunSpec with Matchers with MockitoSugar expected(0), expected(1), expected(2), false) } - it("should add retrieved artifacts to the interpreter") { + it("should add retrieved artifacts to the kernel") { val mockDependencyDownloader = mock[DependencyDownloader] doReturn(Nil).when(mockDependencyDownloader).retrieve( anyString(), anyString(), anyString(), anyBoolean(), anyBoolean(), anyBoolean(), any[Seq[(URL, Option[Credentials])]], anyBoolean(), anyBoolean() ) - val mockInterpreter = mock[Interpreter] - - val addDepsMagic = new AddDeps - with IncludeSparkContext - with IncludeInterpreter - with IncludeOutputStream - with IncludeDependencyDownloader - with ArgumentParsingSupport - { - override val sparkContext: SparkContext = mock[SparkContext] - override val interpreter: Interpreter = mockInterpreter - override val dependencyDownloader: DependencyDownloader = - mockDependencyDownloader - override val outputStream: OutputStream = mock[OutputStream] - } - - val expected = "org.apache.toree" :: "kernel" :: "1.0" :: Nil - addDepsMagic.execute(expected.mkString(" ")) - - verify(mockInterpreter).addJars(any[URL]) - } - - it("should add retrieved artifacts to the spark context") { - val mockDependencyDownloader = mock[DependencyDownloader] - val fakeUri = new URI("file:/foo") - doReturn(fakeUri :: fakeUri :: fakeUri :: Nil) - .when(mockDependencyDownloader).retrieve( - anyString(), anyString(), anyString(), anyBoolean(), anyBoolean(), - anyBoolean(), any[Seq[(URL, Option[Credentials])]], anyBoolean(), anyBoolean() - ) - val mockSparkContext = mock[SparkContext] + val mockKernel = mock[KernelLike] val addDepsMagic = new AddDeps - with IncludeSparkContext - with IncludeInterpreter + with IncludeKernel with IncludeOutputStream with IncludeDependencyDownloader with ArgumentParsingSupport { - override val sparkContext: SparkContext = mockSparkContext - override val interpreter: Interpreter = mock[Interpreter] + override val kernel: KernelLike = mockKernel override val dependencyDownloader: DependencyDownloader = mockDependencyDownloader override val outputStream: OutputStream = mock[OutputStream] @@ -186,7 +143,7 @@ class AddDepsSpec extends FunSpec with Matchers with MockitoSugar val expected = "org.apache.toree" :: "kernel" :: "1.0" :: Nil addDepsMagic.execute(expected.mkString(" ")) - verify(mockSparkContext, times(3)).addJar(anyString()) + verify(mockKernel).addJars(any[URI]) } } } diff --git a/kernel/src/test/scala/org/apache/toree/magic/builtin/AddJarSpec.scala b/kernel/src/test/scala/org/apache/toree/magic/builtin/AddJarSpec.scala index 169fb602a..1c7b3fc64 100644 --- a/kernel/src/test/scala/org/apache/toree/magic/builtin/AddJarSpec.scala +++ b/kernel/src/test/scala/org/apache/toree/magic/builtin/AddJarSpec.scala @@ -18,15 +18,15 @@ package org.apache.toree.magic.builtin import java.io.OutputStream -import java.net.URL -import java.nio.file.{FileSystems, Files} - +import java.net.{URI, URL} +import java.nio.file.{Files, FileSystems} import org.apache.toree.interpreter.Interpreter -import org.apache.toree.magic.dependencies.{IncludeConfig, IncludeOutputStream, IncludeInterpreter, IncludeSparkContext} +import org.apache.toree.magic.dependencies.{IncludeConfig, IncludeInterpreter, IncludeKernel, IncludeOutputStream} import com.typesafe.config.ConfigFactory import org.apache.spark.SparkContext +import org.apache.toree.kernel.api.KernelLike import org.apache.toree.plugins.PluginManager -import org.scalatest.{Matchers, FunSpec} +import org.scalatest.{FunSpec, Matchers} import org.scalatest.mock.MockitoSugar import org.mockito.Mockito._ import org.mockito.Matchers._ @@ -34,22 +34,18 @@ import org.mockito.Matchers._ class AddJarSpec extends FunSpec with Matchers with MockitoSugar { describe("AddJar"){ describe("#execute") { - it("should call addJar on the provided SparkContext and addJars on the " + - "provided interpreter") { - val mockSparkContext = mock[SparkContext] - val mockInterpreter = mock[Interpreter] + it("should call addJar on the provided kernel") { + val mockKernel = mock[KernelLike] val mockOutputStream = mock[OutputStream] val mockPluginManager = mock[PluginManager] val testConfig = ConfigFactory.load() val addJarMagic = new AddJar - with IncludeSparkContext - with IncludeInterpreter with IncludeOutputStream with IncludeConfig + with IncludeKernel { - override val sparkContext: SparkContext = mockSparkContext - override val interpreter: Interpreter = mockInterpreter + override val kernel: KernelLike = mockKernel override val outputStream: OutputStream = mockOutputStream override lazy val pluginManager: PluginManager = mockPluginManager override val config = testConfig @@ -59,8 +55,7 @@ class AddJarSpec extends FunSpec with Matchers with MockitoSugar { addJarMagic.execute("""http://www.example.com/someJar.jar""") - verify(mockSparkContext).addJar(anyString()) - verify(mockInterpreter).addJars(any[URL]) + verify(mockKernel).addJars(any[URI]) verify(mockPluginManager, times(0)).loadPlugins(any()) } @@ -104,21 +99,18 @@ class AddJarSpec extends FunSpec with Matchers with MockitoSugar { } it("should use a cached jar if the force option is not provided") { - val mockSparkContext = mock[SparkContext] - val mockInterpreter = mock[Interpreter] + val mockKernel = mock[KernelLike] val mockOutputStream = mock[OutputStream] var downloadFileCalled = false // Used to verify that downloadFile // was or was not called in this test val testConfig = ConfigFactory.load() val addJarMagic = new AddJar - with IncludeSparkContext - with IncludeInterpreter with IncludeOutputStream with IncludeConfig + with IncludeKernel { - override val sparkContext: SparkContext = mockSparkContext - override val interpreter: Interpreter = mockInterpreter + override val kernel: KernelLike = mockKernel override val outputStream: OutputStream = mockOutputStream override val config = testConfig override def downloadFile(fileUrl: URL, destinationUrl: URL): URL = { @@ -140,26 +132,22 @@ class AddJarSpec extends FunSpec with Matchers with MockitoSugar { tmpFilePath.toFile.delete() downloadFileCalled should be (false) - verify(mockSparkContext).addJar(anyString()) - verify(mockInterpreter).addJars(any[URL]) + verify(mockKernel).addJars(any[URI]) } it("should not use a cached jar if the force option is provided") { - val mockSparkContext = mock[SparkContext] - val mockInterpreter = mock[Interpreter] + val mockKernel = mock[KernelLike] val mockOutputStream = mock[OutputStream] var downloadFileCalled = false // Used to verify that downloadFile // was or was not called in this test val testConfig = ConfigFactory.load() val addJarMagic = new AddJar - with IncludeSparkContext - with IncludeInterpreter with IncludeOutputStream with IncludeConfig + with IncludeKernel { - override val sparkContext: SparkContext = mockSparkContext - override val interpreter: Interpreter = mockInterpreter + override val kernel: KernelLike = mockKernel override val outputStream: OutputStream = mockOutputStream override val config = testConfig override def downloadFile(fileUrl: URL, destinationUrl: URL): URL = { @@ -181,8 +169,7 @@ class AddJarSpec extends FunSpec with Matchers with MockitoSugar { tmpFilePath.toFile.delete() downloadFileCalled should be (true) - verify(mockSparkContext).addJar(anyString()) - verify(mockInterpreter).addJars(any[URL]) + verify(mockKernel).addJars(any[URI]) } it("should add magic jar to magicloader and not to interpreter and spark"+ @@ -194,12 +181,10 @@ class AddJarSpec extends FunSpec with Matchers with MockitoSugar { val testConfig = ConfigFactory.load() val addJarMagic = new AddJar - with IncludeSparkContext with IncludeInterpreter with IncludeOutputStream with IncludeConfig { - override val sparkContext: SparkContext = mockSparkContext override val interpreter: Interpreter = mockInterpreter override val outputStream: OutputStream = mockOutputStream override lazy val pluginManager: PluginManager = mockPluginManager diff --git a/kernel/src/test/scala/org/apache/toree/magic/builtin/LSMagicSpec.scala b/kernel/src/test/scala/org/apache/toree/magic/builtin/LSMagicSpec.scala index f68db5ed8..edd897941 100644 --- a/kernel/src/test/scala/org/apache/toree/magic/builtin/LSMagicSpec.scala +++ b/kernel/src/test/scala/org/apache/toree/magic/builtin/LSMagicSpec.scala @@ -21,7 +21,7 @@ import java.io.OutputStream import java.net.URL import org.apache.toree.interpreter.Interpreter -import org.apache.toree.magic.dependencies.{IncludeOutputStream, IncludeInterpreter, IncludeSparkContext} +import org.apache.toree.magic.dependencies.{IncludeOutputStream, IncludeInterpreter} import org.apache.toree.magic.{CellMagic, LineMagic} import org.apache.spark.SparkContext import org.scalatest.{Matchers, FunSpec} @@ -32,11 +32,9 @@ import org.mockito.Matchers._ class TestLSMagic(sc: SparkContext, intp: Interpreter, os: OutputStream) extends LSMagic - with IncludeSparkContext with IncludeInterpreter with IncludeOutputStream { - override val sparkContext: SparkContext = sc override val interpreter: Interpreter = intp override val outputStream: OutputStream = os }