From eb4cb8653565fcb66d7c7222cc7b765383bfce45 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Fri, 14 Apr 2017 18:04:25 +0800
Subject: [PATCH] Support sc.removeJar
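
SparkContext has addJar but no way to drop a jar that is no longer needed.
This patch adds SparkContext.removeJar, which removes the entry from
addedJars, asks the RPC env file server to stop serving the jar when it was
served from the driver, and posts an environment update so the Environment
tab reflects the change.

A rough usage sketch (the temp jar and the app name are only illustrative,
mirroring the new tests added below):

    import java.io.File
    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local"))
    val tmpJar = File.createTempFile("demo", ".jar")  // stand-in for a real dependency
    sc.addJar(tmpJar.getAbsolutePath)                 // served and listed as before
    sc.removeJar(sc.listJars().head)                  // new: remove it by its listed path
    assert(sc.listJars().isEmpty)
    sc.stop()
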
---
 .../scala/org/apache/spark/SparkContext.scala | 20 ++++++++++
 .../scala/org/apache/spark/rpc/RpcEnv.scala   |  9 +++++
 .../spark/rpc/netty/NettyStreamManager.scala  | 10 +++++
 .../org/apache/spark/SparkContextSuite.scala  | 38 +++++++++++++++++++
 4 files changed, 77 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 99efc4893fda4..32b20dd8e3c62 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1846,6 +1846,26 @@ class SparkContext(config: SparkConf) extends Logging {
     }
   }
 
+  /**
+   * Removes a JAR dependency that was previously added to this `SparkContext` by `sc.addJar`.
+   *
+   * @param path the JAR path as listed by `listJars()` or shown on the Environment tab.
+   */
+  def removeJar(path: String): Unit = {
+    if (!addedJars.contains(path)) {
+      logWarning(s"The JAR $path has not been added and cannot be removed")
+    } else {
+      val uri = URI.create(path)
+      val sparkScheme = URI.create(env.rpcEnv.address.toSparkURL).getScheme
+      val isAddedToFileServer = sparkScheme.equals(uri.getScheme)
+      val isRemovedFromFileServer = isAddedToFileServer && env.rpcEnv.fileServer.removeJar(uri)
+      if (isRemovedFromFileServer || !isAddedToFileServer) {
+        addedJars.remove(path)
+        postEnvironmentUpdate()
+      }
+    }
+  }
+
   /**
    * Returns a list of jar files that are added to resources.
    */
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 530743c03640b..f502128eea3ba 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.rpc
 
 import java.io.File
+import java.net.URI
 import java.nio.channels.ReadableByteChannel
 
 import scala.concurrent.Future
@@ -184,6 +185,14 @@ private[spark] trait RpcEnvFileServer {
    */
   def addDirectory(baseUri: String, path: File): String
 
+  /**
+   * Removes a jar served by this RpcEnv.
+   *
+   * @param uri The URI of the jar file.
+   * @return Whether the jar was removed.
+   */
+  def removeJar(uri: URI): Boolean
+
   /** Validates and normalizes the base URI for directories. */
   protected def validateDirectoryUri(baseUri: String): String = {
     val fixedBaseUri = "/" + baseUri.stripPrefix("/").stripSuffix("/")
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index 780fadd5bda8e..080dbe9bd6eea 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.rpc.netty
 
 import java.io.File
+import java.net.URI
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
@@ -88,4 +89,13 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
     s"${rpcEnv.address.toSparkURL}$fixedBaseUri"
   }
 
+  override def removeJar(uri: URI): Boolean = {
+    val jarName = uri.getPath.split("/").last
+    val jarFile = jars.remove(jarName)
+    if (null != jarFile) {
+      jarFile.delete()
+    }
+    !jars.containsKey(jarName)
+  }
+
 }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 735f4454e299e..3ac80adcdd6ea 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -310,6 +310,44 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     sc.listJars().head should include (tmpJar.getName)
   }
 
+  test("add a local jar and remove it") {
+    val tmpDir = Utils.createTempDir()
+    val tmpJar = File.createTempFile("test-1.1.0", ".jar", tmpDir)
+
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+    sc.addJar(tmpJar.getAbsolutePath)
+    sc.listJars().size should be (1)
+    sc.removeJar(sc.listJars().head)
+    sc.listJars().size should be (0)
+
+    assert(sc.parallelize(Array(1, 2, 3)).count === 3)
+  }
+
+  test("add an HDFS jar and remove it") {
+    val hdfsFile = "hdfs://nn:8020/jar/test-1.2.0.jar"
+
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+    sc.addJar(hdfsFile)
+    sc.listJars().size should be (1)
+    sc.removeJar(hdfsFile)
+    sc.listJars().size should be (0)
+
+    assert(sc.parallelize(Array(1, 2, 3)).count === 3)
+  }
+
+  test("remove a nonexistent jar") {
+    val tmpDir = Utils.createTempDir()
+    val tmpJar = File.createTempFile("test-1.1.0", ".jar", tmpDir)
+
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+    sc.addJar(tmpJar.getAbsolutePath)
+    sc.listJars().size should be (1)
+    sc.removeJar(sc.listJars().head + "1")
+    sc.listJars().size should be (1)
+
+    assert(sc.parallelize(Array(1, 2, 3)).count === 3)
+  }
+
   test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
     try {
       sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))