From 705e95ac5d6d2980fd69e87466938d6481891821 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Thu, 9 Jul 2015 11:17:37 -0700 Subject: [PATCH 1/3] [SPARK-8957][SQL] Backport Hive 1.X support to Branch 1.4 --- .../org/apache/spark/deploy/SparkSubmit.scala | 33 +- .../scala/org/apache/spark/util/Utils.scala | 42 +- .../spark/deploy/SparkSubmitUtilsSuite.scala | 14 +- .../spark/sql/hive/client/ClientWrapper.scala | 180 ++++--- .../spark/sql/hive/client/HiveShim.scala | 449 ++++++++++++++++++ .../hive/client/IsolatedClientLoader.scala | 53 ++- .../spark/sql/hive/client/package.scala | 47 +- .../spark/sql/hive/client/VersionsSuite.scala | 25 +- 8 files changed, 691 insertions(+), 152 deletions(-) create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 43631ee279f0a..cc6de4ea6314a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -884,11 +884,7 @@ private[spark] object SparkSubmitUtils { ivyConfName: String, md: DefaultModuleDescriptor): Unit = { // Add scala exclusion rule - val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*") - val scalaDependencyExcludeRule = - new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null) - scalaDependencyExcludeRule.addConfiguration(ivyConfName) - md.addExcludeRule(scalaDependencyExcludeRule) + md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, ivyConfName)) // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and // other spark-streaming utility components. 
Underscore is there to differentiate between @@ -897,13 +893,8 @@ private[spark] object SparkSubmitUtils { "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_") components.foreach { comp => - val sparkArtifacts = - new ArtifactId(new ModuleId("org.apache.spark", s"spark-$comp*"), "*", "*", "*") - val sparkDependencyExcludeRule = - new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null) - sparkDependencyExcludeRule.addConfiguration(ivyConfName) - - md.addExcludeRule(sparkDependencyExcludeRule) + md.addExcludeRule(createExclusion(s"org.apache.spark:spark-$comp*:*", ivySettings, + ivyConfName)) } } @@ -916,6 +907,7 @@ private[spark] object SparkSubmitUtils { * @param coordinates Comma-delimited string of maven coordinates * @param remoteRepos Comma-delimited string of remote repositories other than maven central * @param ivyPath The path to the local ivy repository + * @param exclusions Exclusions to apply when resolving transitive dependencies * @return The comma-delimited path to the jars of the given maven artifacts including their * transitive dependencies */ @@ -923,6 +915,7 @@ private[spark] object SparkSubmitUtils { coordinates: String, remoteRepos: Option[String], ivyPath: Option[String], + exclusions: Seq[String] = Nil, isTest: Boolean = false): String = { if (coordinates == null || coordinates.trim.isEmpty) { "" @@ -989,6 +982,10 @@ private[spark] object SparkSubmitUtils { // add all supplied maven artifacts as dependencies addDependenciesToIvy(md, artifacts, ivyConfName) + exclusions.foreach { e => + md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName)) + } + // resolve dependencies val rr: ResolveReport = ivy.resolve(md, resolveOptions) if (rr.hasError) { @@ -1005,6 +1002,18 @@ private[spark] object SparkSubmitUtils { } } } + + private def createExclusion( + coords: String, + ivySettings: IvySettings, + ivyConfName: String): ExcludeRule = { + val c = extractMavenCoordinates(coords)(0) + val id = new ArtifactId(new ModuleId(c.groupId, c.artifactId), "*", "*", "*") + val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null) + rule.addConfiguration(ivyConfName) + rule + } + } /** diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 5f132410540fd..f1f85f82cd4ba 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1805,15 +1805,10 @@ private[spark] object Utils extends Logging { lazy val isInInterpreter: Boolean = { try { - val interpClass = classForName("spark.repl.Main") + val interpClass = classForName("org.apache.spark.repl.Main") interpClass.getMethod("interp").invoke(null) != null } catch { - // Returning true seems to be a mistake. - // Currently changing it to false causes tests failures in Streaming. - // For a more detailed discussion, please, refer to - // https://github.com/apache/spark/pull/5835#issuecomment-101042271 and subsequent comments. - // Addressing this changed is tracked as https://issues.apache.org/jira/browse/SPARK-7527 - case _: ClassNotFoundException => true + case _: ClassNotFoundException => false } } @@ -2339,3 +2334,36 @@ private[spark] class RedirectThread( } } } + +/** + * An [[OutputStream]] that will store the last 10 kilobytes (by default) written to it + * in a circular buffer. The current contents of the buffer can be accessed using + * the toString method. 
+ */ +private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream { + var pos: Int = 0 + var buffer = new Array[Int](sizeInBytes) + + def write(i: Int): Unit = { + buffer(pos) = i + pos = (pos + 1) % buffer.length + } + + override def toString: String = { + val (end, start) = buffer.splitAt(pos) + val input = new java.io.InputStream { + val iterator = (start ++ end).iterator + + def read(): Int = if (iterator.hasNext) iterator.next() else -1 + } + val reader = new BufferedReader(new InputStreamReader(input)) + val stringBuilder = new StringBuilder + var line = reader.readLine() + while (line != null) { + stringBuilder.append(line) + stringBuilder.append("\n") + line = reader.readLine() + } + stringBuilder.toString() + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index 55594708d827e..27c15e78c00e6 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -106,7 +106,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { IvyTestUtils.withRepository(main, None, None) { repo => // end to end val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, Option(repo), - Option(tempIvyPath), true) + Option(tempIvyPath), isTest = true) assert(jarPath.indexOf(tempIvyPath) >= 0, "should use non-default ivy path") } } @@ -116,7 +116,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { val dep = "my.great.dep:mydep:0.5" // Local M2 repository IvyTestUtils.withRepository(main, Some(dep), Some(SparkSubmitUtils.m2Path)) { repo => - val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None, true) + val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None, Nil, true) assert(jarPath.indexOf("mylib") >= 0, "should find artifact") assert(jarPath.indexOf("mydep") >= 0, "should find dependency") } @@ -124,7 +124,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { val settings = new IvySettings val ivyLocal = new File(settings.getDefaultIvyUserDir, "local" + File.separator) IvyTestUtils.withRepository(main, Some(dep), Some(ivyLocal), true) { repo => - val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None, true) + val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None, Nil, true) assert(jarPath.indexOf("mylib") >= 0, "should find artifact") assert(jarPath.indexOf("mydep") >= 0, "should find dependency") } @@ -134,7 +134,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { IvyTestUtils.withRepository(main, Some(dep), Some(dummyIvyLocal), useIvyLayout = true, ivySettings = settings) { repo => val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, - Some(tempIvyPath), true) + Some(tempIvyPath), Nil, true) assert(jarPath.indexOf("mylib") >= 0, "should find artifact") assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path") assert(jarPath.indexOf("mydep") >= 0, "should find dependency") @@ -143,7 +143,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { test("dependency not found throws RuntimeException") { intercept[RuntimeException] { - SparkSubmitUtils.resolveMavenCoordinates("a:b:c", None, None, true) + SparkSubmitUtils.resolveMavenCoordinates("a:b:c", None, None, isTest = true) } } @@ 
-155,12 +155,12 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") + ",org.apache.spark:spark-core_fake:1.2.0" - val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, true) + val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, isTest = true) assert(path === "", "should return empty path") val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0") IvyTestUtils.withRepository(main, None, None) { repo => val files = SparkSubmitUtils.resolveMavenCoordinates(coordinates + "," + main.toString, - Some(repo), None, true) + Some(repo), None, isTest = true) assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact") } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala index cb96dc0c7cd55..cbd2bf6b5eede 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala @@ -20,6 +20,9 @@ package org.apache.spark.sql.hive.client import java.io.{BufferedReader, InputStreamReader, File, PrintStream} import java.net.URI import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet} +import javax.annotation.concurrent.GuardedBy + +import org.apache.spark.util.CircularBuffer import scala.collection.JavaConversions._ import scala.language.reflectiveCalls @@ -27,7 +30,7 @@ import scala.language.reflectiveCalls import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.metastore.api.Database import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.metastore.TableType +import org.apache.hadoop.hive.metastore.{TableType => HTableType} import org.apache.hadoop.hive.metastore.api import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.hive.ql.metadata @@ -62,35 +65,18 @@ private[hive] class ClientWrapper( config: Map[String, String], initClassLoader: ClassLoader) extends ClientInterface - with Logging - with ReflectionMagic { + with Logging { // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur. - private val outputBuffer = new java.io.OutputStream { - var pos: Int = 0 - var buffer = new Array[Int](10240) - def write(i: Int): Unit = { - buffer(pos) = i - pos = (pos + 1) % buffer.size - } - - override def toString: String = { - val (end, start) = buffer.splitAt(pos) - val input = new java.io.InputStream { - val iterator = (start ++ end).iterator - - def read(): Int = if (iterator.hasNext) iterator.next() else -1 - } - val reader = new BufferedReader(new InputStreamReader(input)) - val stringBuilder = new StringBuilder - var line = reader.readLine() - while(line != null) { - stringBuilder.append(line) - stringBuilder.append("\n") - line = reader.readLine() - } - stringBuilder.toString() - } + private val outputBuffer = new CircularBuffer() + + private val shim = version match { + case hive.v12 => new Shim_v0_12() + case hive.v13 => new Shim_v0_13() + case hive.v14 => new Shim_v0_14() + case hive.v1_0 => new Shim_v1_0() + case hive.v1_1 => new Shim_v1_1() + case hive.v1_2 => new Shim_v1_2() } // Create an internal session state for this ClientWrapper. 
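For reference, a minimal usage sketch of the new CircularBuffer that the hunk above switches ClientWrapper to (the class and its 10240-byte default come from the Utils.scala change earlier in this patch). The PrintStream wrapping is only an illustration of capturing console output; the exact wiring of Hive's SessionState streams is not shown in this diff:

    package org.apache.spark.demo  // CircularBuffer is private[spark], so stay under org.apache.spark

    import java.io.PrintStream
    import org.apache.spark.util.CircularBuffer

    object CircularBufferDemo extends App {
      // A deliberately tiny buffer so the wrap-around is visible; the default is 10240 bytes.
      val buffer = new CircularBuffer(sizeInBytes = 64)
      // autoflush so every println reaches the buffer immediately
      val out = new PrintStream(buffer, true)

      (1 to 100).foreach(i => out.println("line " + i))

      // toString reassembles the array starting at the current write position,
      // so only roughly the last 64 bytes of output survive.
      println(buffer.toString)
    }
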
@@ -131,27 +117,69 @@ private[hive] class ClientWrapper( // TODO: should be a def?s // When we create this val client, the HiveConf of it (conf) is the one associated with state. - private val client = Hive.get(conf) + @GuardedBy("this") + private var client = Hive.get(conf) + + // We use hive's conf for compatibility. + private val retryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES) + private val retryDelayMillis = shim.getMetastoreClientConnectRetryDelayMillis(conf) + + /** + * Runs `f` with multiple retries in case the hive metastore is temporarily unreachable. + */ + private def retryLocked[A](f: => A): A = synchronized { + // Hive sometimes retries internally, so set a deadline to avoid compounding delays. + val deadline = System.nanoTime + (retryLimit * retryDelayMillis * 1e6).toLong + var numTries = 0 + var caughtException: Exception = null + do { + numTries += 1 + try { + return f + } catch { + case e: Exception if causedByThrift(e) => + caughtException = e + logWarning( + "HiveClientWrapper got thrift exception, destroying client and retrying " + + s"(${retryLimit - numTries} tries remaining)", e) + Thread.sleep(retryDelayMillis) + try { + client = Hive.get(state.getConf, true) + } catch { + case e: Exception if causedByThrift(e) => + logWarning("Failed to refresh hive client, will retry.", e) + } + } + } while (numTries <= retryLimit && System.nanoTime < deadline) + if (System.nanoTime > deadline) { + logWarning("Deadline exceeded") + } + throw caughtException + } + + private def causedByThrift(e: Throwable): Boolean = { + var target = e + while (target != null) { + val msg = target.getMessage() + if (msg != null && msg.matches("(?s).*(TApplication|TProtocol|TTransport)Exception.*")) { + return true + } + target = target.getCause() + } + false + } /** * Runs `f` with ThreadLocal session state and classloaders configured for this version of hive. */ - private def withHiveState[A](f: => A): A = synchronized { + private def withHiveState[A](f: => A): A = retryLocked { val original = Thread.currentThread().getContextClassLoader // Set the thread local metastore client to the client associated with this ClientWrapper. Hive.set(client) - version match { - case hive.v12 => - // Starting from Hive 0.13.0, setCurrentSessionState will use the classLoader associated - // with the HiveConf in `state` to override the context class loader of the current - // thread. So, for Hive 0.12, we add the same behavior. - Thread.currentThread().setContextClassLoader(state.getConf.getClassLoader) - classOf[SessionState] - .callStatic[SessionState, SessionState]("start", state) - case hive.v13 => - classOf[SessionState] - .callStatic[SessionState, SessionState]("setCurrentSessionState", state) - } + // setCurrentSessionState will use the classLoader associated + // with the HiveConf in `state` to override the context class loader of the current + // thread. 
+ shim.setCurrentSessionState(state) val ret = try f finally { Thread.currentThread().setContextClassLoader(original) } @@ -209,15 +237,12 @@ private[hive] class ClientWrapper( properties = h.getParameters.toMap, serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.toMap, tableType = h.getTableType match { - case TableType.MANAGED_TABLE => ManagedTable - case TableType.EXTERNAL_TABLE => ExternalTable - case TableType.VIRTUAL_VIEW => VirtualView - case TableType.INDEX_TABLE => IndexTable - }, - location = version match { - case hive.v12 => Option(h.call[URI]("getDataLocation")).map(_.toString) - case hive.v13 => Option(h.call[Path]("getDataLocation")).map(_.toString) + case HTableType.MANAGED_TABLE => ManagedTable + case HTableType.EXTERNAL_TABLE => ExternalTable + case HTableType.VIRTUAL_VIEW => VirtualView + case HTableType.INDEX_TABLE => IndexTable }, + location = shim.getDataLocation(h), inputFormat = Option(h.getInputFormatClass).map(_.getName), outputFormat = Option(h.getOutputFormatClass).map(_.getName), serde = Option(h.getSerializationLib), @@ -247,14 +272,7 @@ private[hive] class ClientWrapper( // set create time qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int]) - version match { - case hive.v12 => - table.location.map(new URI(_)).foreach(u => qlTable.call[URI, Unit]("setDataLocation", u)) - case hive.v13 => - table.location - .map(new org.apache.hadoop.fs.Path(_)) - .foreach(qlTable.call[Path, Unit]("setDataLocation", _)) - } + table.location.foreach { loc => shim.setDataLocation(qlTable, loc) } table.inputFormat.map(toInputFormat).foreach(qlTable.setInputFormatClass) table.outputFormat.map(toOutputFormat).foreach(qlTable.setOutputFormatClass) table.serde.foreach(qlTable.setSerializationLib) @@ -295,13 +313,7 @@ private[hive] class ClientWrapper( override def getAllPartitions(hTable: HiveTable): Seq[HivePartition] = withHiveState { val qlTable = toQlTable(hTable) - val qlPartitions = version match { - case hive.v12 => - client.call[metadata.Table, JSet[metadata.Partition]]("getAllPartitionsForPruner", qlTable) - case hive.v13 => - client.call[metadata.Table, JSet[metadata.Partition]]("getAllPartitionsOf", qlTable) - } - qlPartitions.toSeq.map(toHivePartition) + shim.getAllPartitions(client, qlTable).map(toHivePartition) } override def listTables(dbName: String): Seq[String] = withHiveState { @@ -331,15 +343,7 @@ private[hive] class ClientWrapper( val tokens: Array[String] = cmd_trimmed.split("\\s+") // The remainder of the command. 
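The retryLocked helper added above is a retry-with-deadline loop around the Hive calls; below is a self-contained sketch of that shape, with the Hive-specific parts (thrift-exception detection, re-creating the client) reduced to an isRecoverable predicate supplied by the caller. Names and signature are illustrative, not from the patch:

    object RetrySketch {
      // Retry `f` until it succeeds, the attempt budget is used up, or a wall-clock
      // deadline passes (the wrapped call may retry internally, so counting attempts
      // alone would compound the total delay).
      def retryWithDeadline[A](maxTries: Int, delayMillis: Long)
          (isRecoverable: Throwable => Boolean)(f: => A): A = {
        val deadline = System.nanoTime + maxTries * delayMillis * 1000000L
        var tries = 0
        var lastError: Exception = null
        do {
          tries += 1
          try {
            return f
          } catch {
            case e: Exception if isRecoverable(e) =>
              lastError = e
              Thread.sleep(delayMillis)
          }
        } while (tries < maxTries && System.nanoTime < deadline)
        throw lastError
      }
    }

The version in the patch additionally re-creates the Hive client inside the catch block (Hive.get(state.getConf, true)) before sleeping, so the next attempt talks to a fresh metastore connection.
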
val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim() - val proc: CommandProcessor = version match { - case hive.v12 => - classOf[CommandProcessorFactory] - .callStatic[String, HiveConf, CommandProcessor]("get", tokens(0), conf) - case hive.v13 => - classOf[CommandProcessorFactory] - .callStatic[Array[String], HiveConf, CommandProcessor]("get", Array(tokens(0)), conf) - } - + val proc = shim.getCommandProcessor(tokens(0), conf) proc match { case driver: Driver => val response: CommandProcessorResponse = driver.run(cmd) @@ -350,21 +354,7 @@ private[hive] class ClientWrapper( } driver.setMaxRows(maxRows) - val results = version match { - case hive.v12 => - val res = new JArrayList[String] - driver.call[JArrayList[String], Boolean]("getResults", res) - res.toSeq - case hive.v13 => - val res = new JArrayList[Object] - driver.call[JList[Object], Boolean]("getResults", res) - res.map { r => - r match { - case s: String => s - case a: Array[Object] => a(0).asInstanceOf[String] - } - } - } + val results = shim.getDriverResults(driver) driver.close() results @@ -398,8 +388,8 @@ private[hive] class ClientWrapper( holdDDLTime: Boolean, inheritTableSpecs: Boolean, isSkewedStoreAsSubdir: Boolean): Unit = withHiveState { - - client.loadPartition( + shim.loadPartition( + client, new Path(loadPath), // TODO: Use URI tableName, partSpec, @@ -414,7 +404,8 @@ private[hive] class ClientWrapper( tableName: String, replace: Boolean, holdDDLTime: Boolean): Unit = withHiveState { - client.loadTable( + shim.loadTable( + client, new Path(loadPath), tableName, replace, @@ -429,7 +420,8 @@ private[hive] class ClientWrapper( numDP: Int, holdDDLTime: Boolean, listBucketingEnabled: Boolean): Unit = withHiveState { - client.loadDynamicPartitions( + shim.loadDynamicPartitions( + client, new Path(loadPath), tableName, partSpec, @@ -444,7 +436,7 @@ private[hive] class ClientWrapper( logDebug(s"Deleting table $t") val table = client.getTable("default", t) client.getIndexes("default", t, 255).foreach { index => - client.dropIndex("default", t, index.getIndexName, true) + shim.dropIndex(client, "default", t, index.getIndexName) } if (!table.isIndexTable) { client.dropTable("default", t) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala new file mode 100644 index 0000000000000..1fa9d278e2a57 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.client + +import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong} +import java.lang.reflect.{Method, Modifier} +import java.net.URI +import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet} +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConversions._ + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.ql.Driver +import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table} +import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory} +import org.apache.hadoop.hive.ql.session.SessionState + +/** + * A shim that defines the interface between ClientWrapper and the underlying Hive library used to + * talk to the metastore. Each Hive version has its own implementation of this class, defining + * version-specific version of needed functions. + * + * The guideline for writing shims is: + * - always extend from the previous version unless really not possible + * - initialize methods in lazy vals, both for quicker access for multiple invocations, and to + * avoid runtime errors due to the above guideline. + */ +private[client] sealed abstract class Shim { + + /** + * Set the current SessionState to the given SessionState. Also, set the context classloader of + * the current thread to the one set in the HiveConf of this given `state`. + * @param state + */ + def setCurrentSessionState(state: SessionState): Unit + + /** + * This shim is necessary because the return type is different on different versions of Hive. + * All parameters are the same, though. + */ + def getDataLocation(table: Table): Option[String] + + def setDataLocation(table: Table, loc: String): Unit + + def getAllPartitions(hive: Hive, table: Table): Seq[Partition] + + def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor + + def getDriverResults(driver: Driver): Seq[String] + + def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long + + def loadPartition( + hive: Hive, + loadPath: Path, + tableName: String, + partSpec: JMap[String, String], + replace: Boolean, + holdDDLTime: Boolean, + inheritTableSpecs: Boolean, + isSkewedStoreAsSubdir: Boolean): Unit + + def loadTable( + hive: Hive, + loadPath: Path, + tableName: String, + replace: Boolean, + holdDDLTime: Boolean): Unit + + def loadDynamicPartitions( + hive: Hive, + loadPath: Path, + tableName: String, + partSpec: JMap[String, String], + replace: Boolean, + numDP: Int, + holdDDLTime: Boolean, + listBucketingEnabled: Boolean): Unit + + def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit + + protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = { + val method = findMethod(klass, name, args: _*) + require(Modifier.isStatic(method.getModifiers()), + s"Method $name of class $klass is not static.") + method + } + + protected def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = { + klass.getMethod(name, args: _*) + } + +} + +private[client] class Shim_v0_12 extends Shim { + + private lazy val startMethod = + findStaticMethod( + classOf[SessionState], + "start", + classOf[SessionState]) + private lazy val getDataLocationMethod = findMethod(classOf[Table], "getDataLocation") + private lazy val setDataLocationMethod = + findMethod( + classOf[Table], + "setDataLocation", + classOf[URI]) + private lazy val getAllPartitionsMethod = + findMethod( + classOf[Hive], + 
"getAllPartitionsForPruner", + classOf[Table]) + private lazy val getCommandProcessorMethod = + findStaticMethod( + classOf[CommandProcessorFactory], + "get", + classOf[String], + classOf[HiveConf]) + private lazy val getDriverResultsMethod = + findMethod( + classOf[Driver], + "getResults", + classOf[JArrayList[String]]) + private lazy val loadPartitionMethod = + findMethod( + classOf[Hive], + "loadPartition", + classOf[Path], + classOf[String], + classOf[JMap[String, String]], + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE) + private lazy val loadTableMethod = + findMethod( + classOf[Hive], + "loadTable", + classOf[Path], + classOf[String], + JBoolean.TYPE, + JBoolean.TYPE) + private lazy val loadDynamicPartitionsMethod = + findMethod( + classOf[Hive], + "loadDynamicPartitions", + classOf[Path], + classOf[String], + classOf[JMap[String, String]], + JBoolean.TYPE, + JInteger.TYPE, + JBoolean.TYPE, + JBoolean.TYPE) + private lazy val dropIndexMethod = + findMethod( + classOf[Hive], + "dropIndex", + classOf[String], + classOf[String], + classOf[String], + JBoolean.TYPE) + + override def setCurrentSessionState(state: SessionState): Unit = { + // Starting from Hive 0.13, setCurrentSessionState will internally override + // the context class loader of the current thread by the class loader set in + // the conf of the SessionState. So, for this Hive 0.12 shim, we add the same + // behavior and make shim.setCurrentSessionState of all Hive versions have the + // consistent behavior. + Thread.currentThread().setContextClassLoader(state.getConf.getClassLoader) + startMethod.invoke(null, state) + } + + override def getDataLocation(table: Table): Option[String] = + Option(getDataLocationMethod.invoke(table)).map(_.toString()) + + override def setDataLocation(table: Table, loc: String): Unit = + setDataLocationMethod.invoke(table, new URI(loc)) + + override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] = + getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].toSeq + + override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor = + getCommandProcessorMethod.invoke(null, token, conf).asInstanceOf[CommandProcessor] + + override def getDriverResults(driver: Driver): Seq[String] = { + val res = new JArrayList[String]() + getDriverResultsMethod.invoke(driver, res) + res.toSeq + } + + override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = { + conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000 + } + + override def loadPartition( + hive: Hive, + loadPath: Path, + tableName: String, + partSpec: JMap[String, String], + replace: Boolean, + holdDDLTime: Boolean, + inheritTableSpecs: Boolean, + isSkewedStoreAsSubdir: Boolean): Unit = { + loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean, + holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean) + } + + override def loadTable( + hive: Hive, + loadPath: Path, + tableName: String, + replace: Boolean, + holdDDLTime: Boolean): Unit = { + loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean) + } + + override def loadDynamicPartitions( + hive: Hive, + loadPath: Path, + tableName: String, + partSpec: JMap[String, String], + replace: Boolean, + numDP: Int, + holdDDLTime: Boolean, + listBucketingEnabled: Boolean): Unit = { + loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean, + numDP: JInteger, holdDDLTime: 
JBoolean, listBucketingEnabled: JBoolean) + } + + override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = { + dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean) + } + +} + +private[client] class Shim_v0_13 extends Shim_v0_12 { + + private lazy val setCurrentSessionStateMethod = + findStaticMethod( + classOf[SessionState], + "setCurrentSessionState", + classOf[SessionState]) + private lazy val setDataLocationMethod = + findMethod( + classOf[Table], + "setDataLocation", + classOf[Path]) + private lazy val getAllPartitionsMethod = + findMethod( + classOf[Hive], + "getAllPartitionsOf", + classOf[Table]) + private lazy val getCommandProcessorMethod = + findStaticMethod( + classOf[CommandProcessorFactory], + "get", + classOf[Array[String]], + classOf[HiveConf]) + private lazy val getDriverResultsMethod = + findMethod( + classOf[Driver], + "getResults", + classOf[JList[Object]]) + + override def setCurrentSessionState(state: SessionState): Unit = + setCurrentSessionStateMethod.invoke(null, state) + + override def setDataLocation(table: Table, loc: String): Unit = + setDataLocationMethod.invoke(table, new Path(loc)) + + override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] = + getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].toSeq + + override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor = + getCommandProcessorMethod.invoke(null, Array(token), conf).asInstanceOf[CommandProcessor] + + override def getDriverResults(driver: Driver): Seq[String] = { + val res = new JArrayList[Object]() + getDriverResultsMethod.invoke(driver, res) + res.map { r => + r match { + case s: String => s + case a: Array[Object] => a(0).asInstanceOf[String] + } + } + } + +} + +private[client] class Shim_v0_14 extends Shim_v0_13 { + + private lazy val loadPartitionMethod = + findMethod( + classOf[Hive], + "loadPartition", + classOf[Path], + classOf[String], + classOf[JMap[String, String]], + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE) + private lazy val loadTableMethod = + findMethod( + classOf[Hive], + "loadTable", + classOf[Path], + classOf[String], + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE) + private lazy val loadDynamicPartitionsMethod = + findMethod( + classOf[Hive], + "loadDynamicPartitions", + classOf[Path], + classOf[String], + classOf[JMap[String, String]], + JBoolean.TYPE, + JInteger.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE) + private lazy val getTimeVarMethod = + findMethod( + classOf[HiveConf], + "getTimeVar", + classOf[HiveConf.ConfVars], + classOf[TimeUnit]) + + override def loadPartition( + hive: Hive, + loadPath: Path, + tableName: String, + partSpec: JMap[String, String], + replace: Boolean, + holdDDLTime: Boolean, + inheritTableSpecs: Boolean, + isSkewedStoreAsSubdir: Boolean): Unit = { + loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean, + holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean, + JBoolean.TRUE, JBoolean.FALSE) + } + + override def loadTable( + hive: Hive, + loadPath: Path, + tableName: String, + replace: Boolean, + holdDDLTime: Boolean): Unit = { + loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean, + JBoolean.TRUE, JBoolean.FALSE, JBoolean.FALSE) + } + + override def loadDynamicPartitions( + hive: Hive, + loadPath: Path, + tableName: String, + 
partSpec: JMap[String, String], + replace: Boolean, + numDP: Int, + holdDDLTime: Boolean, + listBucketingEnabled: Boolean): Unit = { + loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean, + numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE) + } + + override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = { + getTimeVarMethod.invoke( + conf, + HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, + TimeUnit.MILLISECONDS).asInstanceOf[Long] + } +} + +private[client] class Shim_v1_0 extends Shim_v0_14 { + +} + +private[client] class Shim_v1_1 extends Shim_v1_0 { + + private lazy val dropIndexMethod = + findMethod( + classOf[Hive], + "dropIndex", + classOf[String], + classOf[String], + classOf[String], + JBoolean.TYPE, + JBoolean.TYPE) + + override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = { + dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean, true: JBoolean) + } + +} + +private[client] class Shim_v1_2 extends Shim_v1_1 { + + private lazy val loadDynamicPartitionsMethod = + findMethod( + classOf[Hive], + "loadDynamicPartitions", + classOf[Path], + classOf[String], + classOf[JMap[String, String]], + JBoolean.TYPE, + JInteger.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JLong.TYPE) + + override def loadDynamicPartitions( + hive: Hive, + loadPath: Path, + tableName: String, + partSpec: JMap[String, String], + replace: Boolean, + numDP: Int, + holdDDLTime: Boolean, + listBucketingEnabled: Boolean): Unit = { + loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean, + numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE, + 0: JLong) + } + +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index ae12b671d7956..3d609a66f3664 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.client import java.io.File +import java.lang.reflect.InvocationTargetException import java.net.{URL, URLClassLoader} import java.util @@ -28,6 +29,7 @@ import org.apache.commons.io.{FileUtils, IOUtils} import org.apache.spark.Logging import org.apache.spark.deploy.SparkSubmitUtils +import org.apache.spark.util.Utils import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.hive.HiveContext @@ -39,38 +41,41 @@ private[hive] object IsolatedClientLoader { */ def forVersion( version: String, - config: Map[String, String] = Map.empty): IsolatedClientLoader = synchronized { + config: Map[String, String] = Map.empty, + ivyPath: Option[String] = None): IsolatedClientLoader = synchronized { val resolvedVersion = hiveVersion(version) - val files = resolvedVersions.getOrElseUpdate(resolvedVersion, downloadVersion(resolvedVersion)) + val files = resolvedVersions.getOrElseUpdate(resolvedVersion, + downloadVersion(resolvedVersion, ivyPath)) new IsolatedClientLoader(hiveVersion(version), files, config) } def hiveVersion(version: String): HiveVersion = version match { case "12" | "0.12" | "0.12.0" => hive.v12 case "13" | "0.13" | "0.13.0" | "0.13.1" => hive.v13 + case "14" | "0.14" | "0.14.0" => hive.v14 + case "1.0" | "1.0.0" => hive.v1_0 + case "1.1" | "1.1.0" 
=> hive.v1_1 + case "1.2" | "1.2.0" => hive.v1_2 } - private def downloadVersion(version: HiveVersion): Seq[URL] = { - val hiveArtifacts = - (Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde") ++ - (if (version.hasBuiltinsJar) "hive-builtins" :: Nil else Nil)) - .map(a => s"org.apache.hive:$a:${version.fullVersion}") :+ - "com.google.guava:guava:14.0.1" :+ - "org.apache.hadoop:hadoop-client:2.4.0" + private def downloadVersion(version: HiveVersion, ivyPath: Option[String]): Seq[URL] = { + val hiveArtifacts = version.extraDeps ++ + Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde") + .map(a => s"org.apache.hive:$a:${version.fullVersion}") ++ + Seq("com.google.guava:guava:14.0.1", + "org.apache.hadoop:hadoop-client:2.4.0") val classpath = quietly { SparkSubmitUtils.resolveMavenCoordinates( hiveArtifacts.mkString(","), Some("http://www.datanucleus.org/downloads/maven2"), - None) + ivyPath, + exclusions = version.exclusions) } val allFiles = classpath.split(",").map(new File(_)).toSet // TODO: Remove copy logic. - val tempDir = File.createTempFile("hive", "v" + version.toString) - tempDir.delete() - tempDir.mkdir() - + val tempDir = Utils.createTempDir(namePrefix = s"hive-${version}") allFiles.foreach(f => FileUtils.copyFileToDirectory(f, tempDir)) tempDir.listFiles().map(_.toURL) } @@ -90,14 +95,13 @@ private[hive] object IsolatedClientLoader { * `ClientInterface`, unless `isolationOn` is set to `false`. * * @param version The version of hive on the classpath. used to pick specific function signatures - * that are not compatibile accross versions. + * that are not compatible across versions. * @param execJars A collection of jar files that must include hive and hadoop. * @param config A set of options that will be added to the HiveConf of the constructed client. * @param isolationOn When true, custom versions of barrier classes will be constructed. Must be * true unless loading the version of hive that is on Sparks classloader. * @param rootClassLoader The system root classloader. Must not know about Hive classes. * @param baseClassLoader The spark classloader that is used to load shared classes. - * */ private[hive] class IsolatedClientLoader( val version: HiveVersion, @@ -129,7 +133,7 @@ private[hive] class IsolatedClientLoader( /** True if `name` refers to a spark class that must see specific version of Hive. 
*/ protected def isBarrierClass(name: String): Boolean = name.startsWith(classOf[ClientWrapper].getName) || - name.startsWith(classOf[ReflectionMagic].getName) || + name.startsWith(classOf[Shim].getName) || barrierPrefixes.exists(name.startsWith) protected def classToPath(name: String): String = @@ -172,11 +176,16 @@ private[hive] class IsolatedClientLoader( .newInstance(version, config, classLoader) .asInstanceOf[ClientInterface] } catch { - case ReflectionException(cnf: NoClassDefFoundError) => - throw new ClassNotFoundException( - s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" + - "Please make sure that jars for your version of hive and hadoop are included in the " + - s"paths passed to ${HiveContext.HIVE_METASTORE_JARS}.") + case e: InvocationTargetException => + if (e.getCause().isInstanceOf[NoClassDefFoundError]) { + val cnf = e.getCause().asInstanceOf[NoClassDefFoundError] + throw new ClassNotFoundException( + s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" + + "Please make sure that jars for your version of hive and hadoop are included in the " + + s"paths passed to ${HiveContext.HIVE_METASTORE_JARS}.") + } else { + throw e + } } finally { Thread.currentThread.setContextClassLoader(baseClassLoader) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala index 410d9881ac214..b48082fe4b363 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala @@ -19,15 +19,50 @@ package org.apache.spark.sql.hive /** Support for interacting with different versions of the HiveMetastoreClient */ package object client { - private[client] abstract class HiveVersion(val fullVersion: String, val hasBuiltinsJar: Boolean) + private[client] abstract class HiveVersion( + val fullVersion: String, + val extraDeps: Seq[String] = Nil, + val exclusions: Seq[String] = Nil) // scalastyle:off private[client] object hive { - case object v10 extends HiveVersion("0.10.0", true) - case object v11 extends HiveVersion("0.11.0", false) - case object v12 extends HiveVersion("0.12.0", false) - case object v13 extends HiveVersion("0.13.1", false) + case object v12 extends HiveVersion("0.12.0") + case object v13 extends HiveVersion("0.13.1") + + // Hive 0.14 depends on calcite 0.9.2-incubating-SNAPSHOT which does not exist in + // maven central anymore, so override those with a version that exists. + // + // The other excluded dependencies are also nowhere to be found, so exclude them explicitly. If + // they're needed by the metastore client, users will have to dig them out of somewhere and use + // configuration to point Spark at the correct jars. + case object v14 extends HiveVersion("0.14.0", + extraDeps = Seq("org.apache.calcite:calcite-core:1.3.0-incubating", + "org.apache.calcite:calcite-avatica:1.3.0-incubating"), + exclusions = Seq("org.pentaho:pentaho-aggdesigner-algorithm")) + + case object v1_0 extends HiveVersion("1.0.0", + exclusions = Seq("eigenbase:eigenbase-properties", + "org.pentaho:pentaho-aggdesigner-algorithm", + "net.hydromatic:linq4j", + "net.hydromatic:quidem")) + + // The curator dependency was added to the exclusions here because it seems to confuse the ivy + // library. org.apache.curator:curator is a pom dependency but ivy tries to find the jar for it, + // and fails. 
+ case object v1_1 extends HiveVersion("1.1.0", + exclusions = Seq("eigenbase:eigenbase-properties", + "org.apache.curator:*", + "org.pentaho:pentaho-aggdesigner-algorithm", + "net.hydromatic:linq4j", + "net.hydromatic:quidem")) + + case object v1_2 extends HiveVersion("1.2.0", + exclusions = Seq("eigenbase:eigenbase-properties", + "org.apache.curator:*", + "org.pentaho:pentaho-aggdesigner-algorithm", + "net.hydromatic:linq4j", + "net.hydromatic:quidem")) } // scalastyle:on -} \ No newline at end of file +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 7eb4842726665..d52e162acbd04 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive.client +import java.io.File + import org.apache.spark.{Logging, SparkFunSuite} import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.util.Utils @@ -28,6 +30,12 @@ import org.apache.spark.util.Utils * is not fully tested. */ class VersionsSuite extends SparkFunSuite with Logging { + + // Do not use a temp path here to speed up subsequent executions of the unit test during + // development. + private val ivyPath = Some( + new File(sys.props("java.io.tmpdir"), "hive-ivy-cache").getAbsolutePath()) + private def buildConf() = { lazy val warehousePath = Utils.createTempDir() lazy val metastorePath = Utils.createTempDir() @@ -38,7 +46,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } test("success sanity check") { - val badClient = IsolatedClientLoader.forVersion("13", buildConf()).client + val badClient = IsolatedClientLoader.forVersion("13", buildConf(), ivyPath).client val db = new HiveDatabase("default", "") badClient.createDatabase(db) } @@ -67,19 +75,21 @@ class VersionsSuite extends SparkFunSuite with Logging { // TODO: currently only works on mysql where we manually create the schema... 
ignore("failure sanity check") { val e = intercept[Throwable] { - val badClient = quietly { IsolatedClientLoader.forVersion("13", buildConf()).client } + val badClient = quietly { + IsolatedClientLoader.forVersion("13", buildConf(), ivyPath).client + } } assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'") } - private val versions = Seq("12", "13") + private val versions = Seq("12", "13", "14", "1.0.0", "1.1.0", "1.2.0") private var client: ClientInterface = null versions.foreach { version => test(s"$version: create client") { client = null - client = IsolatedClientLoader.forVersion(version, buildConf()).client + client = IsolatedClientLoader.forVersion(version, buildConf(), ivyPath).client } test(s"$version: createDatabase") { @@ -170,5 +180,12 @@ class VersionsSuite extends SparkFunSuite with Logging { false, false) } + + test(s"$version: create index and reset") { + client.runSqlHive("CREATE TABLE indexed_table (key INT)") + client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " + + "as 'COMPACT' WITH DEFERRED REBUILD") + client.reset() + } } } From 249090bc00ab97c667041789e19e291f4c0fa040 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Thu, 9 Jul 2015 16:51:46 -0700 Subject: [PATCH 2/3] Update Utils.scala --- core/src/main/scala/org/apache/spark/util/Utils.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index f1f85f82cd4ba..32b7a6c8a42dc 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1805,10 +1805,15 @@ private[spark] object Utils extends Logging { lazy val isInInterpreter: Boolean = { try { - val interpClass = classForName("org.apache.spark.repl.Main") + val interpClass = classForName("spark.repl.Main") interpClass.getMethod("interp").invoke(null) != null } catch { - case _: ClassNotFoundException => false + // Returning true seems to be a mistake. + // Currently changing it to false causes tests failures in Streaming. + // For a more detailed discussion, please, refer to + // https://github.com/apache/spark/pull/5835#issuecomment-101042271 and subsequent comments. + // Addressing this changed is tracked as https://issues.apache.org/jira/browse/SPARK-7527 + case _: ClassNotFoundException => true } } From dadfd60b4f65b93014e83f82b011f8981103604d Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Thu, 9 Jul 2015 16:52:19 -0700 Subject: [PATCH 3/3] Update Utils.scala --- .../src/main/scala/org/apache/spark/util/Utils.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 32b7a6c8a42dc..aea4210ad1a57 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1808,12 +1808,12 @@ private[spark] object Utils extends Logging { val interpClass = classForName("spark.repl.Main") interpClass.getMethod("interp").invoke(null) != null } catch { - // Returning true seems to be a mistake. - // Currently changing it to false causes tests failures in Streaming. - // For a more detailed discussion, please, refer to - // https://github.com/apache/spark/pull/5835#issuecomment-101042271 and subsequent comments. 
- // Addressing this changed is tracked as https://issues.apache.org/jira/browse/SPARK-7527 - case _: ClassNotFoundException => true + // Returning true seems to be a mistake. + // Currently changing it to false causes tests failures in Streaming. + // For a more detailed discussion, please, refer to + // https://github.com/apache/spark/pull/5835#issuecomment-101042271 and subsequent comments. + // Addressing this changed is tracked as https://issues.apache.org/jira/browse/SPARK-7527 + case _: ClassNotFoundException => true } }
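
For reference, a sketch of how the exclusions parameter added to SparkSubmitUtils.resolveMavenCoordinates in patch 1 is meant to be used; it mirrors the IsolatedClientLoader.downloadVersion call in that patch, with the artifact list shortened for readability. The object is private[spark], so real callers live inside Spark, and the coordinates below are illustrative:

    package org.apache.spark.demo  // SparkSubmitUtils is private[spark]

    import org.apache.spark.deploy.SparkSubmitUtils

    object ResolveHiveJars {
      def main(args: Array[String]): Unit = {
        // Exclude dependencies that are no longer published in public repositories
        // (the same exclusions listed for hive.v1_1 in package.scala).
        val jars: String = SparkSubmitUtils.resolveMavenCoordinates(
          "org.apache.hive:hive-metastore:1.1.0,org.apache.hive:hive-exec:1.1.0",
          remoteRepos = Some("http://www.datanucleus.org/downloads/maven2"),
          ivyPath = None,
          exclusions = Seq(
            "org.pentaho:pentaho-aggdesigner-algorithm",
            "org.apache.curator:*"))
        // The result is a comma-separated list of local jar paths, transitive deps included.
        println(jars)
      }
    }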