Skip to content

Commit

Permalink
Initial support for running without maven
Browse files Browse the repository at this point in the history
  • Loading branch information
marmbrus committed May 6, 2015
1 parent 1d8ae44 commit 81711c4
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 20 deletions.
80 changes: 67 additions & 13 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,19 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
* this does not necessarily need to be the same version of Hive that is used internally by
* Spark SQL for execution.
*/
protected[hive] def hiveVersion: String =
getConf("spark.sql.hive.version", "0.13.1")
protected[hive] def hiveMetastoreVersion: String =
getConf("spark.sql.hive.metastore.version", "0.13.1")

/**
* The location of the jars that should be used to instantiate the HiveMetastoreClient. This
* property can be one of three options:
* - a comma-separated list of jar URLs (each entry is parsed with `java.net.URL`) that are
* handed to the isolated client classloader
* - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This
* option is only valid when using the execution version of Hive.
* - maven - download the correct version of hive on demand from maven.
*
* Read from `spark.sql.hive.metastore.jars`; defaults to `builtin` when the key is unset.
*/
protected[hive] def hiveMetastoreJars: String =
getConf("spark.sql.hive.metastore.jars", "builtin")

@transient
protected[sql] lazy val substitutor = new VariableSubstitution()
Expand All @@ -121,6 +132,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
executionConf.set(
"javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$localMetastore;create=true")

/**
* The version of hive used internally by Spark SQL. It selects the client version for
* `executionHive` and must equal the metastore version when the `builtin` jars are used.
*/
lazy val hiveExecutionVersion: String = "0.13.1"

/**
* The copy of the hive client that is used for execution. Currently this must always be
* Hive 13 as this is the version of Hive that is packaged with Spark SQL. This copy of the
Expand All @@ -129,31 +143,71 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
* for storing persistent metadata, and only point to a dummy metastore in a temporary directory.
*/
@transient
protected[hive] lazy val executionHive: ClientWrapper = {
  // The execution client always runs the Hive version returned by `hiveExecutionVersion`, so
  // the wrapper is constructed directly here instead of going through an IsolatedClientLoader.
  logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
  new ClientWrapper(
    version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
    // Point the execution client at the local derby database named by `localMetastore`.
    config = Map(
      "javax.jdo.option.ConnectionURL" ->
        s"jdbc:derby:;databaseName=$localMetastore;create=true"))
}
SessionState.setCurrentSessionState(executionHive.state)

/**
 * The copy of the Hive client that is used to retrieve metadata from the Hive MetaStore.
 * The version of the Hive client that is used here must match the metastore that is configured
 * in the hive-site.xml file.
 *
 * The classes backing the client are chosen by `hiveMetastoreJars`:
 *  - `builtin`: reuse the URLs of the classloader that loaded this class. Only allowed when the
 *    metastore version resolves to the same Hive version as `hiveExecutionVersion`.
 *  - `maven`: let `IsolatedClientLoader.forVersion` obtain the jars for the requested version.
 *  - anything else: treated as a comma-separated list of jar URLs.
 *
 * Throws IllegalArgumentException when `builtin` is requested for a different Hive version, or
 * when the current classloader is not a `java.net.URLClassLoader`. Throws
 * java.net.MalformedURLException when an explicit jar entry is not a valid URL.
 */
@transient
protected[hive] lazy val metadataHive: ClientInterface = {
  val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)

  // We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options
  // into the isolated client loader
  val metadataConf = new HiveConf()
  // `configure` goes second to override other settings.
  val allConfig = metadataConf.iterator.map(e => e.getKey -> e.getValue).toMap ++ configure

  val isolatedLoader = if (hiveMetastoreJars == "builtin") {
    // Compare resolved versions rather than raw strings so that equivalent spellings such as
    // "13", "0.13" and "0.13.1" are all accepted.
    if (IsolatedClientLoader.hiveVersion(hiveExecutionVersion) != metaVersion) {
      throw new IllegalArgumentException(
        "Builtin jars can only be used when hive execution version == hive metastore version. " +
        s"Execution: ${hiveExecutionVersion} != Metastore: ${hiveMetastoreVersion}. " +
        "Specify a valid path to the correct hive jars using spark.sql.hive.metastore.jars " +
        s"or change spark.sql.hive.metastore.version to ${hiveExecutionVersion}.")
    }
    val jars = getClass.getClassLoader match {
      case urlClassLoader: java.net.URLClassLoader => urlClassLoader.getURLs
      case other =>
        throw new IllegalArgumentException(
          "Unable to locate hive jars to connect to metastore " +
          s"using classloader ${other.getClass.getName}. " +
          "Please set spark.sql.hive.metastore.jars")
    }

    logInfo(
      s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
    new IsolatedClientLoader(
      version = metaVersion,
      execJars = jars.toSeq,
      config = allConfig,
      isolationOn = true)
  } else if (hiveMetastoreJars == "maven") {
    // TODO: Support for loading the jars from an already downloaded location.
    logInfo(
      s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
    IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig)
  } else {
    // Tolerate whitespace around the separators and empty entries (e.g. a trailing comma).
    val jars = hiveMetastoreJars
      .split(",")
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(new java.net.URL(_))
    // Interpolating the array directly would only print its identity hash, so join the URLs.
    logInfo(
      s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
      s"using ${jars.mkString(", ")}")
    new IsolatedClientLoader(
      version = metaVersion,
      execJars = jars.toSeq,
      config = allConfig,
      isolationOn = true)
  }
  isolatedLoader.client
}

protected[sql] override def parseSql(sql: String): LogicalPlan = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,12 @@ class ClientWrapper(
try {
val cmd_trimmed: String = cmd.trim()
val tokens: Array[String] = cmd_trimmed.split("\\s+")
// The remainder of the command.
val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
val proc: CommandProcessor = version match {
case hive.v12 =>
classOf[CommandProcessorFactory]
.callStatic[String, HiveConf, CommandProcessor]("get", cmd_1, conf)
.callStatic[String, HiveConf, CommandProcessor]("get", tokens(0), conf)
case hive.v13 =>
classOf[CommandProcessorFactory]
.callStatic[Array[String], HiveConf, CommandProcessor]("get", Array(tokens(0)), conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.hive.client

import java.io.File
import java.net.URLClassLoader
import java.net.{URL, URLClassLoader}
import java.util

import scala.language.reflectiveCalls
Expand Down Expand Up @@ -49,7 +49,7 @@ object IsolatedClientLoader {
case "13" | "0.13" | "0.13.0" | "0.13.1" => hive.v13
}

private def downloadVersion(version: HiveVersion): Seq[File] = {
private def downloadVersion(version: HiveVersion): Seq[URL] = {
val hiveArtifacts =
(Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde") ++
(if (version.hasBuiltinsJar) "hive-builtins" :: Nil else Nil))
Expand All @@ -72,10 +72,10 @@ object IsolatedClientLoader {
tempDir.mkdir()

allFiles.foreach(f => FileUtils.copyFileToDirectory(f, tempDir))
tempDir.listFiles()
tempDir.listFiles().map(_.toURL)
}

/**
 * Jar URLs keyed by the Hive version they belong to.
 *
 * This has to be a `val`: declared as a `def`, every access would allocate a new, empty map,
 * so entries written through one access could never be read back through another.
 * NOTE(review): the readers/writers of this map are outside this excerpt; confirm that nothing
 * touches it during initialization of the enclosing object before this line has run.
 */
private val resolvedVersions = new scala.collection.mutable.HashMap[HiveVersion, Seq[URL]]
}

/**
Expand All @@ -101,7 +101,7 @@ object IsolatedClientLoader {
*/
class IsolatedClientLoader(
val version: HiveVersion,
val execJars: Seq[File] = Seq.empty,
val execJars: Seq[URL] = Seq.empty,
val config: Map[String, String] = Map.empty,
val isolationOn: Boolean = true,
val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent,
Expand All @@ -112,7 +112,7 @@ class IsolatedClientLoader(
assert(Try(baseClassLoader.loadClass("org.apache.hive.HiveConf")).isFailure)

/** All jars used by the hive specific classloader. */
protected def allJars = execJars.map(_.toURI.toURL).toArray
protected def allJars = execJars.toArray

protected def isSharedClass(name: String): Boolean =
name.contains("slf4j") ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,9 @@ class VersionsSuite extends FunSuite with Logging {
// Smoke test: passes as long as looking up table `default.src` does not throw; the returned
// value is not inspected.
test(s"$version: getTable") {
client.getTable("default", "src")
}

// Smoke test: runs a SET statement through `runSqlHive` for this client version and passes as
// long as the call does not throw; the command output is not inspected.
test(s"$version: set command") {
client.runSqlHive("SET spark.sql.test.key=1")
}
}
}

0 comments on commit 81711c4

Please sign in to comment.