@@ -24,7 +24,7 @@ import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
@@ -39,7 +39,11 @@ import org.apache.spark.sql.catalyst.util.StringUtils
*
* All public methods should be synchronized for thread-safety.
*/
class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends ExternalCatalog {
class InMemoryCatalog(
conf: SparkConf = new SparkConf,
hadoopConfig: Configuration = new Configuration)
extends ExternalCatalog {

import CatalogTypes.TablePartitionSpec

private class TableDesc(var table: CatalogTable) {
14 changes: 1 addition & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -96,10 +96,7 @@ class SparkSession private(
*/
@transient
private[sql] lazy val sharedState: SharedState = {
existingSharedState.getOrElse(
SparkSession.reflect[SharedState, SparkContext](
SparkSession.sharedStateClassName(sparkContext.conf),
sparkContext))
existingSharedState.getOrElse(new SharedState(sparkContext))
}

/**
@@ -913,16 +910,8 @@ object SparkSession {
/** Reference to the root SparkSession. */
private val defaultSession = new AtomicReference[SparkSession]

private val HIVE_SHARED_STATE_CLASS_NAME = "org.apache.spark.sql.hive.HiveSharedState"
private val HIVE_SESSION_STATE_CLASS_NAME = "org.apache.spark.sql.hive.HiveSessionState"

private def sharedStateClassName(conf: SparkConf): String = {
conf.get(CATALOG_IMPLEMENTATION) match {
case "hive" => HIVE_SHARED_STATE_CLASS_NAME
case "in-memory" => classOf[SharedState].getCanonicalName
}
}

private def sessionStateClassName(conf: SparkConf): String = {
conf.get(CATALOG_IMPLEMENTATION) match {
case "hive" => HIVE_SESSION_STATE_CLASS_NAME
@@ -953,7 +942,6 @@ object SparkSession {
private[spark] def hiveClassesArePresent: Boolean = {
try {
Utils.classForName(HIVE_SESSION_STATE_CLASS_NAME)
Utils.classForName(HIVE_SHARED_STATE_CLASS_NAME)
Utils.classForName("org.apache.hadoop.hive.conf.HiveConf")
true
} catch {
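hiveClassesArePresent above is a plain classpath probe: with HiveSharedState deleted, its marker class no longer needs to be checked. A minimal sketch of the pattern, using a hypothetical helper name and plain Class.forName in place of Spark's Utils.classForName:

// Hypothetical helper sketching the probe in hiveClassesArePresent:
// returns true only if every named class can be loaded.
def classesPresent(classNames: String*): Boolean =
  try {
    classNames.foreach(name => Class.forName(name))
    true
  } catch {
    case _: ClassNotFoundException | _: NoClassDefFoundError => false
  }

// Mirroring the check above:
// classesPresent(
//   "org.apache.spark.sql.hive.HiveSessionState",
//   "org.apache.hadoop.hive.conf.HiveConf")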
@@ -17,7 +17,13 @@

package org.apache.spark.sql.internal

import org.apache.spark.SparkContext
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.config._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
@@ -51,7 +57,11 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
/**
* A catalog that interacts with external systems.
*/
lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog(sparkContext.hadoopConfiguration)
lazy val externalCatalog: ExternalCatalog =
SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
SharedState.externalCatalogClassName(sparkContext.conf),
sparkContext.conf,
sparkContext.hadoopConfiguration)

/**
* A classloader used to load all user-added jar.
@@ -98,6 +108,39 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
}
}

object SharedState {

private val HIVE_EXTERNAL_CATALOG_CLASS_NAME = "org.apache.spark.sql.hive.HiveExternalCatalog"

private def externalCatalogClassName(conf: SparkConf): String = {
conf.get(CATALOG_IMPLEMENTATION) match {
case "hive" => HIVE_EXTERNAL_CATALOG_CLASS_NAME
case "in-memory" => classOf[InMemoryCatalog].getCanonicalName
}
}

/**
* Helper method to create an instance of [[T]] using a two-arg constructor that
* accepts an [[Arg1]] and an [[Arg2]].
*/
private def reflect[T, Arg1 <: AnyRef, Arg2 <: AnyRef](
className: String,
ctorArg1: Arg1,
ctorArg2: Arg2)(
implicit ctorArgTag1: ClassTag[Arg1],
ctorArgTag2: ClassTag[Arg2]): T = {
try {
val clazz = Utils.classForName(className)
val ctor = clazz.getDeclaredConstructor(ctorArgTag1.runtimeClass, ctorArgTag2.runtimeClass)
val args = Array[AnyRef](ctorArg1, ctorArg2)
ctor.newInstance(args: _*).asInstanceOf[T]
} catch {
case NonFatal(e) =>
throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
}
}
}


/**
* URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.
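To make the dispatch above concrete, here is a minimal, self-contained sketch of the match-then-reflect pattern. FakeConf, FakeHadoopConf, and FakeInMemoryCatalog are hypothetical stand-ins for SparkConf, Hadoop's Configuration, and the real catalog classes; only the shape of externalCatalogClassName and reflect is taken from the code above.

import scala.reflect.ClassTag
import scala.util.control.NonFatal

// Hypothetical stand-in types; only the reflection pattern itself is real.
class FakeConf(val catalogImplementation: String)
class FakeHadoopConf
trait Catalog
class FakeInMemoryCatalog(conf: FakeConf, hadoopConf: FakeHadoopConf) extends Catalog

object CatalogReflection {
  // Mirrors SharedState.externalCatalogClassName: map the configured
  // implementation to a fully qualified class name.
  def catalogClassName(conf: FakeConf): String = conf.catalogImplementation match {
    case "in-memory" => classOf[FakeInMemoryCatalog].getName
    case other => sys.error(s"Unknown catalog implementation: $other")
  }

  // Mirrors SharedState.reflect: instantiate className through its
  // (Arg1, Arg2) constructor, resolved from the ClassTags.
  def reflect[T, Arg1 <: AnyRef : ClassTag, Arg2 <: AnyRef : ClassTag](
      className: String, arg1: Arg1, arg2: Arg2): T = {
    try {
      val ctor = Class.forName(className).getDeclaredConstructor(
        implicitly[ClassTag[Arg1]].runtimeClass,
        implicitly[ClassTag[Arg2]].runtimeClass)
      ctor.newInstance(arg1, arg2).asInstanceOf[T]
    } catch {
      case NonFatal(e) =>
        throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
    }
  }
}

object ReflectDemo extends App {
  val conf = new FakeConf("in-memory")
  val catalog = CatalogReflection.reflect[Catalog, FakeConf, FakeHadoopConf](
    CatalogReflection.catalogClassName(conf), conf, new FakeHadoopConf)
  println(catalog.getClass.getSimpleName) // prints FakeInMemoryCatalog
}

The indirection exists so that sql/core never references the Hive class directly: "hive" resolves HiveExternalCatalog by name at runtime, and that class only has to be on the classpath in the Hive-enabled case.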
@@ -34,7 +34,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.{HiveSharedState, HiveUtils}
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
import org.apache.spark.sql.internal.SQLConf
@@ -52,10 +52,6 @@ class HiveContext private[hive](_sparkSession: SparkSession)
sparkSession.sessionState.asInstanceOf[HiveSessionState]
}

protected[sql] override def sharedState: HiveSharedState = {
sparkSession.sharedState.asInstanceOf[HiveSharedState]
}

/**
* Invalidate and refresh all the cached metadata of the given table. For performance reasons,
* Spark SQL or the external data source library it uses might cache certain metadata about a
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.ql.metadata.HiveException
import org.apache.thrift.TException

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -41,13 +42,20 @@ import org.apache.spark.sql.types.{DataType, StructType}
* A persistent implementation of the system catalog using Hive.
* All public methods must be synchronized for thread-safety.
*/
private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configuration)
private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
extends ExternalCatalog with Logging {

import CatalogTypes.TablePartitionSpec
import HiveExternalCatalog._
import CatalogTableType._

/**
* A Hive client used to interact with the metastore.
*/
val client: HiveClient = {
HiveUtils.newClientForMetadata(conf, hadoopConf)
}

// Exceptions thrown by the hive client that we would like to wrap
private val clientExceptions = Set(
classOf[HiveException].getCanonicalName,
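The net effect here is an ownership inversion: rather than a separate HiveSharedState constructing a HiveClient and handing it to the catalog, HiveExternalCatalog now builds its own client from the (SparkConf, Configuration) pair it is constructed with, and everything else reaches the client through the catalog. A minimal sketch with hypothetical types (MetastoreClient stands in for HiveClient):

// Hypothetical types sketching the ownership change above.
class Conf
class HadoopConf
class MetastoreClient(conf: Conf, hadoopConf: HadoopConf) {
  // Per-session clients derive from the catalog's client, as
  // HiveSessionState.metadataHive now does via client.newSession().
  def newSession(): MetastoreClient = new MetastoreClient(conf, hadoopConf)
}

class CatalogSketch(conf: Conf, hadoopConf: HadoopConf) {
  // Mirrors HiveExternalCatalog.client, which wraps
  // HiveUtils.newClientForMetadata(conf, hadoopConf).
  val client: MetastoreClient = new MetastoreClient(conf, hadoopConf)
}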
@@ -44,7 +44,8 @@ import org.apache.spark.sql.types._
*/
private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
private val client = sparkSession.sharedState.asInstanceOf[HiveSharedState].metadataHive
private val client =
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client

/** A fully qualified identifier for a table (i.e., database.tableName) */
case class QualifiedTableName(database: String, name: String)
@@ -33,21 +33,18 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)

self =>

private lazy val sharedState: HiveSharedState = {
sparkSession.sharedState.asInstanceOf[HiveSharedState]
}

/**
* A Hive client used for interacting with the metastore.
*/
lazy val metadataHive: HiveClient = sharedState.metadataHive.newSession()
lazy val metadataHive: HiveClient =
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.newSession()

/**
* Internal catalog for managing table and database states.
*/
override lazy val catalog = {
new HiveSessionCatalog(
sharedState.externalCatalog,
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
metadataHive,
sparkSession,
functionResourceLoader,
This file was deleted.

@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.CacheTableCommand
import org.apache.spark.sql.hive._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.{SharedState, SQLConf}
import org.apache.spark.util.{ShutdownHookManager, Utils}

// SPARK-3729: Test key required to check for initialization errors with config.
@@ -108,13 +108,13 @@ class TestHiveContext(
* A [[SparkSession]] used in [[TestHiveContext]].
*
* @param sc SparkContext
* @param existingSharedState optional [[HiveSharedState]]
* @param existingSharedState optional [[SharedState]]
* @param loadTestTables if true, load the test tables. They can only be loaded when running
* in the JVM, i.e. when calling from Python this flag has to be false.
*/
private[hive] class TestHiveSparkSession(
@transient private val sc: SparkContext,
@transient private val existingSharedState: Option[HiveSharedState],
@transient private val existingSharedState: Option[SharedState],
private val loadTestTables: Boolean)
extends SparkSession(sc) with Logging { self =>

@@ -139,14 +139,13 @@ private[hive] class TestHiveSparkSession(

assume(sc.conf.get(CATALOG_IMPLEMENTATION) == "hive")

// TODO: Let's remove HiveSharedState and TestHiveSessionState. Otherwise,
// we are not really testing the reflection logic based on the setting of
// CATALOG_IMPLEMENTATION.
@transient
Contributor: Is TestHiveSessionState already removed?

Member Author: Yeah. Let me move it back.

override lazy val sharedState: HiveSharedState = {
existingSharedState.getOrElse(new HiveSharedState(sc))
override lazy val sharedState: SharedState = {
Contributor: Do we need to override this?

Member Author (@gatorsmile, Aug 23, 2016): sharedState in SparkSession is unable to access the existingSharedState defined in TestHiveSparkSession. We are unable to override it because it is private. Thus, I did not remove it. Feel free to let me know if you have some other way to remove it. Thanks!

Contributor: But is existingSharedState.getOrElse(new SharedState(sc)) the same as what the parent does?

Member Author: Although existingSharedState uses the same name, it does not override the one in SparkSession, so they are different. Actually, I tried it, and it breaks one test case. BTW, it works if TestHiveSparkSession overrides both sparkContext and existingSharedState; however, we would then have to make existingSharedState in SparkSession non-private.

Contributor: I see, thanks.

(See the sketch after this hunk for the shadowing-vs-overriding subtlety discussed here.)


existingSharedState.getOrElse(new SharedState(sc))
}

// TODO: Let's remove TestHiveSessionState. Otherwise, we are not really testing the reflection
// logic based on the setting of CATALOG_IMPLEMENTATION.
@transient
override lazy val sessionState: TestHiveSessionState =
new TestHiveSessionState(self)
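The review thread above turns on a Scala visibility detail: a private constructor val in a superclass cannot be overridden, so a same-named parameter on the subclass merely shadows it, and the superclass's lazy val never sees the subclass's value. A minimal sketch, with State, Parent, and Child as hypothetical stand-ins for SharedState, SparkSession, and TestHiveSparkSession:

class State

// The parent's constructor val is private, like existingSharedState
// in SparkSession.
class Parent(private val existing: Option[State]) {
  lazy val state: State = existing.getOrElse(new State)
}

// The child's `existing` is a new, unrelated field: it does not (and
// cannot) override the parent's private val, so Parent.state would
// ignore it. Hence the child must override the lazy val itself, as
// TestHiveSparkSession does for sharedState.
class Child(existing: Option[State]) extends Parent(None) {
  override lazy val state: State = existing.getOrElse(new State)
}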
@@ -31,7 +31,7 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton {
}

test("SPARK-15887: hive-site.xml should be loaded") {
val hiveClient = spark.sharedState.asInstanceOf[HiveSharedState].metadataHive
val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
Contributor: Do we need to change this line?

Member Author: HiveSharedState is removed, and thus we have to. Right?

Contributor: Oh sorry, I misread the code; metadataHive only exists in HiveContext now.

assert(hiveClient.getConf("hive.in.test", "") == "true")
}
}
@@ -21,26 +21,26 @@ import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.hive.client.HiveClient

/**
* Test suite for the [[HiveExternalCatalog]].
*/
class HiveExternalCatalogSuite extends ExternalCatalogSuite {
Contributor: Why do the changes in this file make the Hive client connect to a metastore that already has data?

Member Author: Before this PR, HiveExternalCatalogSuite used HiveUtils.newClientForExecution, whose configuration comes from newTemporaryConfiguration, which creates a fresh path for the metastore. So it was pointing to a different metastore.


private val client: HiveClient = {
// We create a metastore at a temp location to avoid any potential
// conflict of having multiple connections to a single derby instance.
HiveUtils.newClientForExecution(new SparkConf, new Configuration)
private val externalCatalog: HiveExternalCatalog = {
val catalog = new HiveExternalCatalog(new SparkConf, new Configuration)
catalog.client.reset()
catalog
}

protected override val utils: CatalogTestUtils = new CatalogTestUtils {
override val tableInputFormat: String = "org.apache.hadoop.mapred.SequenceFileInputFormat"
override val tableOutputFormat: String = "org.apache.hadoop.mapred.SequenceFileOutputFormat"
override def newEmptyCatalog(): ExternalCatalog =
new HiveExternalCatalog(client, new Configuration())
override def newEmptyCatalog(): ExternalCatalog = externalCatalog
}

protected override def resetState(): Unit = client.reset()
protected override def resetState(): Unit = {
externalCatalog.client.reset()
}

}
@@ -378,10 +378,9 @@ object SetMetastoreURLTest extends Logging {
s"spark.sql.test.expectedMetastoreURL should be set.")
}

// HiveSharedState is used when Hive support is enabled.
// HiveExternalCatalog is used when Hive support is enabled.
val actualMetastoreURL =
spark.sharedState.asInstanceOf[HiveSharedState]
.metadataHive
spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
Contributor: Do we need to change this line?

Member Author: See the above answer.

.getConf("javax.jdo.option.ConnectionURL", "this_is_a_wrong_URL")
logInfo(s"javax.jdo.option.ConnectionURL is $actualMetastoreURL")

@@ -51,7 +51,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv

// To test `HiveExternalCatalog`, we need to read the raw table metadata (schema, partition
// columns and bucket specification are still in table properties) from the Hive client.
private def hiveClient: HiveClient = sharedState.asInstanceOf[HiveSharedState].metadataHive
private def hiveClient: HiveClient =
Contributor: Do we need to change this line?

Member Author: See the above answer.

sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client

test("persistent JSON table") {
withTable("jsonTable") {
@@ -266,7 +266,7 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
}

private def createRawHiveTable(ddl: String): Unit = {
hiveContext.sharedState.asInstanceOf[HiveSharedState].metadataHive.runSqlHive(ddl)
hiveContext.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(ddl)
Contributor: Do we need to change this line?

Member Author: See the above answer.

}

private def checkCreateTable(table: String): Unit = {