From 2837e73e00374fe20c2713138815cb539468996f Mon Sep 17 00:00:00 2001 From: Kunal Khamar Date: Wed, 15 Feb 2017 16:28:54 -0800 Subject: [PATCH] Refactor SessionState to remove passing of base SessionState, and initialize all fields directly instead. Same change for HiveSessionState. --- .../org/apache/spark/sql/SparkSession.scala | 17 +- .../spark/sql/internal/SessionState.scala | 245 +++++++++--------- .../spark/sql/test/TestSQLContext.scala | 43 ++- .../spark/sql/hive/HiveMetastoreCatalog.scala | 4 +- .../spark/sql/hive/HiveSessionCatalog.scala | 18 ++ .../spark/sql/hive/HiveSessionState.scala | 110 +++++--- .../apache/spark/sql/hive/test/TestHive.scala | 50 +++- 7 files changed, 313 insertions(+), 174 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index f0ddff1e2a4ed..01e59de199e66 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -22,7 +22,6 @@ import java.io.Closeable import java.util.concurrent.atomic.AtomicReference import scala.collection.JavaConverters._ -import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal @@ -114,7 +113,7 @@ class SparkSession private( private[sql] lazy val sessionState: SessionState = { existingSessionState .map(_.copy(this)) - .getOrElse(SparkSession.reflect[SessionState, SparkSession]( + .getOrElse(SparkSession.instantiateSessionState( SparkSession.sessionStateClassName(sparkContext.conf), self)) } @@ -994,16 +993,18 @@ object SparkSession { } /** - * Helper method to create an instance of [[T]] using a single-arg constructor that - * accepts an [[Arg]]. + * Helper method to create an instance of `SessionState` + * The result is either `SessionState` or `HiveSessionState` */ - private def reflect[T, Arg <: AnyRef]( + private def instantiateSessionState( className: String, - ctorArg: Arg)(implicit ctorArgTag: ClassTag[Arg]): T = { + sparkSession: SparkSession): SessionState = { + try { + // get `SessionState.apply(SparkSession)` val clazz = Utils.classForName(className) - val ctor = clazz.getDeclaredConstructor(ctorArgTag.runtimeClass) - ctor.newInstance(ctorArg).asInstanceOf[T] + val method = clazz.getMethod("apply", sparkSession.getClass) + method.invoke(null, sparkSession).asInstanceOf[SessionState] } catch { case NonFatal(e) => throw new IllegalArgumentException(s"Error while instantiating '$className':", e) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index f80555946bb8c..2148ba45d85e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -32,105 +32,28 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.AnalyzeTableCommand import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryManager} +import org.apache.spark.sql.streaming.{StreamingQueryManager} import org.apache.spark.sql.util.ExecutionListenerManager /** * A class that holds all session-specific state in a given [[SparkSession]]. - * If an `existingSessionState` is supplied, then its members will be copied over. 
*/ private[sql] class SessionState( sparkSession: SparkSession, - parentSessionState: Option[SessionState]) { + val conf: SQLConf, + val experimentalMethods: ExperimentalMethods, + val functionRegistry: FunctionRegistry, + val catalog: SessionCatalog, + val sqlParser: ParserInterface) { - private[sql] def this(sparkSession: SparkSession) = { - this(sparkSession, None) - } - - // Note: Many of these vals depend on each other (e.g. conf) and should be initialized - // with an early initializer if we want subclasses to override some of the fields. - // Otherwise, we would get a lot of NPEs. - - /** - * SQL-specific key-value configurations. - */ - val conf: SQLConf = { - parentSessionState.map(_.conf.copy).getOrElse(new SQLConf) - } - - def newHadoopConf(): Configuration = { - val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration) - conf.getAllConfs.foreach { case (k, v) => if (v ne null) hadoopConf.set(k, v) } - hadoopConf - } - - def newHadoopConfWithOptions(options: Map[String, String]): Configuration = { - val hadoopConf = newHadoopConf() - options.foreach { case (k, v) => - if ((v ne null) && k != "path" && k != "paths") { - hadoopConf.set(k, v) - } - } - hadoopConf - } - - val experimentalMethods: ExperimentalMethods = { - parentSessionState - .map(_.experimentalMethods.copy) - .getOrElse(new ExperimentalMethods) - } - - /** - * Internal catalog for managing functions registered by the user. - */ - val functionRegistry: FunctionRegistry = { - parentSessionState.map(_.functionRegistry.copy).getOrElse(FunctionRegistry.builtin.copy) - } - - /** - * A class for loading resources specified by a function. - */ - val functionResourceLoader: FunctionResourceLoader = { - new FunctionResourceLoader { - override def loadResource(resource: FunctionResource): Unit = { - resource.resourceType match { - case JarResource => addJar(resource.uri) - case FileResource => sparkSession.sparkContext.addFile(resource.uri) - case ArchiveResource => - throw new AnalysisException( - "Archive is not allowed to be loaded. If YARN mode is used, " + - "please use --archives options while calling spark-submit.") - } - } - } - } - - /** - * Internal catalog for managing table and database states. - */ - val catalog: SessionCatalog = { - parentSessionState - .map(_.catalog.copy) - .getOrElse(new SessionCatalog( - sparkSession.sharedState.externalCatalog, - sparkSession.sharedState.globalTempViewManager, - functionResourceLoader, - functionRegistry, - conf, - newHadoopConf(), - sqlParser)) - } - - /** + /* * Interface exposed to the user for registering user-defined functions. * Note that the user-defined functions must be deterministic. */ val udf: UDFRegistration = new UDFRegistration(functionRegistry) - /** - * Logical query plan analyzer for resolving unresolved attributes and relations. - */ + // Logical query plan analyzer for resolving unresolved attributes and relations. val analyzer: Analyzer = { new Analyzer(catalog, conf) { override val extendedResolutionRules = @@ -147,37 +70,17 @@ private[sql] class SessionState( } } - /** - * Logical query plan optimizer. - */ + // Logical query plan optimizer. val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods) - /** - * Parser that extracts expressions, plans, table identifiers etc. from SQL texts. - */ - val sqlParser: ParserInterface = new SparkSqlParser(conf) - - /** - * Planner that converts optimized logical plans to physical plans. 
-   */
-  def planner: SparkPlanner =
-    new SparkPlanner(sparkSession.sparkContext, conf, experimentalMethods.extraStrategies)
-
-  /**
+  /*
    * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
    * that listen for execution metrics.
    */
   val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
 
-  /**
-   * Interface to start and stop [[StreamingQuery]]s.
-   */
-  val streamingQueryManager: StreamingQueryManager = {
-    new StreamingQueryManager(sparkSession)
-  }
-
-  private val jarClassLoader: NonClosableMutableURLClassLoader =
-    sparkSession.sharedState.jarClassLoader
+  // Interface to start and stop [[StreamingQuery]]s.
+  val streamingQueryManager: StreamingQueryManager = new StreamingQueryManager(sparkSession)
 
   // Automatically extract all entries and put it in our SQLConf
   // We need to call it after all of vals have been initialized.
@@ -185,11 +88,38 @@ private[sql] class SessionState(
     conf.setConfString(k, v)
   }
 
+  /**
+   * Planner that converts optimized logical plans to physical plans.
+   */
+  def planner: SparkPlanner =
+    new SparkPlanner(sparkSession.sparkContext, conf, experimentalMethods.extraStrategies)
+
+  def newHadoopConf(): Configuration = SessionState.newHadoopConf(sparkSession, conf)
+
+  def newHadoopConfWithOptions(options: Map[String, String]): Configuration = {
+    val hadoopConf = newHadoopConf()
+    options.foreach { case (k, v) =>
+      if ((v ne null) && k != "path" && k != "paths") {
+        hadoopConf.set(k, v)
+      }
+    }
+    hadoopConf
+  }
+
   /**
    * Get an identical copy of the `SessionState` and associate it with the given `SparkSession`
    */
   def copy(associatedSparkSession: SparkSession): SessionState = {
-    new SessionState(associatedSparkSession, Some(this))
+    val sqlConf = conf.copy
+    val sqlParser: ParserInterface = new SparkSqlParser(sqlConf)
+
+    new SessionState(
+      associatedSparkSession,
+      sqlConf,
+      experimentalMethods.copy,
+      functionRegistry.copy,
+      catalog.copy,
+      sqlParser)
   }
 
   // ------------------------------------------------------
@@ -202,7 +132,89 @@ private[sql] class SessionState(
     catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
   }
 
-  def addJar(path: String): Unit = {
+  private val jarClassLoader: NonClosableMutableURLClassLoader =
+    sparkSession.sharedState.jarClassLoader
+
+  def addJar(path: String): Unit = SessionState.addJar(sparkSession, path, jarClassLoader)
+
+  /**
+   * Analyzes the given table in the current database to generate statistics, which will be
+   * used in query optimizations.
+   */
+  def analyze(tableIdent: TableIdentifier, noscan: Boolean = true): Unit = {
+    AnalyzeTableCommand(tableIdent, noscan).run(sparkSession)
+  }
+
+}
+
+
+object SessionState {
+
+  def apply(sparkSession: SparkSession): SessionState = {
+    apply(sparkSession, None)
+  }
+
+  def apply(
+      sparkSession: SparkSession,
+      conf: Option[SQLConf]): SessionState = {
+
+    // SQL-specific key-value configurations.
+    val sqlConf = conf.getOrElse(new SQLConf)
+
+    // Internal catalog for managing functions registered by the user.
+    val functionRegistry = FunctionRegistry.builtin.copy
+
+    val jarClassLoader: NonClosableMutableURLClassLoader = sparkSession.sharedState.jarClassLoader
+
+    // A class for loading resources specified by a function.
+    val functionResourceLoader: FunctionResourceLoader =
+      createFunctionResourceLoader(sparkSession, jarClassLoader)
+
+    // Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
+ val sqlParser: ParserInterface = new SparkSqlParser(sqlConf) + + // Internal catalog for managing table and database states. + val catalog = new SessionCatalog( + sparkSession.sharedState.externalCatalog, + sparkSession.sharedState.globalTempViewManager, + functionResourceLoader, + functionRegistry, + sqlConf, + newHadoopConf(sparkSession, sqlConf), + sqlParser) + + new SessionState( + sparkSession, + sqlConf, + new ExperimentalMethods, + functionRegistry, + catalog, + sqlParser) + } + + def createFunctionResourceLoader( + sparkSession: SparkSession, + jarClassLoader: NonClosableMutableURLClassLoader): FunctionResourceLoader = { + + new FunctionResourceLoader { + override def loadResource(resource: FunctionResource): Unit = { + resource.resourceType match { + case JarResource => addJar(sparkSession, resource.uri, jarClassLoader) + case FileResource => sparkSession.sparkContext.addFile(resource.uri) + case ArchiveResource => + throw new AnalysisException( + "Archive is not allowed to be loaded. If YARN mode is used, " + + "please use --archives options while calling spark-submit.") + } + } + } + } + + def addJar( + sparkSession: SparkSession, + path: String, + jarClassLoader: NonClosableMutableURLClassLoader): Unit = { + sparkSession.sparkContext.addJar(path) val uri = new Path(path).toUri @@ -217,11 +229,10 @@ private[sql] class SessionState( Thread.currentThread().setContextClassLoader(jarClassLoader) } - /** - * Analyzes the given table in the current database to generate statistics, which will be - * used in query optimizations. - */ - def analyze(tableIdent: TableIdentifier, noscan: Boolean = true): Unit = { - AnalyzeTableCommand(tableIdent, noscan).run(sparkSession) + def newHadoopConf(sparkSession: SparkSession, conf: SQLConf): Configuration = { + val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration) + conf.getAllConfs.foreach { case (k, v) => if (v ne null) hadoopConf.set(k, v) } + hadoopConf } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala index ca619333674be..da00320d84877 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala @@ -18,7 +18,10 @@ package org.apache.spark.sql.test import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{ExperimentalMethods, SparkSession} +import org.apache.spark.sql.catalyst.analysis.FunctionRegistry +import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.internal.{SessionState, SQLConf} /** @@ -34,8 +37,24 @@ private[sql] class TestSparkSession(sc: SparkContext) extends SparkSession(sc) { this(new SparkConf) } - class TestSessionState(sparkSession: SparkSession) extends { - override val conf: SQLConf = { + class TestSessionState( + sparkSession: SparkSession, + conf: SQLConf, + experimentalMethods: ExperimentalMethods, + functionRegistry: FunctionRegistry, + catalog: SessionCatalog, + sqlParser: ParserInterface) + extends SessionState( + sparkSession, + conf, + experimentalMethods, + functionRegistry, + catalog, + sqlParser) {} + + object TestSessionState { + + def makeTestConf: SQLConf = { new SQLConf { clear() override def clear(): Unit = { @@ -45,10 +64,24 @@ private[sql] class TestSparkSession(sc: SparkContext) extends SparkSession(sc) { } } } 
- } with SessionState(sparkSession) + + def apply(sparkSession: SparkSession): TestSessionState = { + val conf = makeTestConf + val copyHelper = SessionState(sparkSession, Some(conf)) + + new TestSessionState( + sparkSession, + conf, + copyHelper.experimentalMethods, + copyHelper.functionRegistry, + copyHelper.catalog, + copyHelper.sqlParser + ) + } + } @transient - protected[sql] override lazy val sessionState: SessionState = new TestSessionState(this) + protected[sql] override lazy val sessionState: SessionState = TestSessionState(this) // Needed for Java tests def loadTestData(): Unit = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index faa76b73fde4b..c6624e01e5a1f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -38,8 +38,8 @@ import org.apache.spark.sql.types._ * cleaned up to integrate more nicely with [[HiveExternalCatalog]]. */ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging { - private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState] - private lazy val tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache + private def sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState] + private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 44ef5cce2ee05..d05781918e5a1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -75,6 +75,24 @@ private[sql] class HiveSessionCatalog( metastoreCatalog.hiveDefaultTableFilePath(name) } + def copy(associatedSparkSession: SparkSession): HiveSessionCatalog = { + val catalog = new HiveSessionCatalog( + externalCatalog, + globalTempViewManager, + associatedSparkSession, + functionResourceLoader, + functionRegistry, + conf, + hadoopConf, + parser) + + catalog.currentDb = currentDb + // copy over temporary tables + tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2)) + + catalog + } + // For testing only private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = { val key = metastoreCatalog.getQualifiedTableName(table) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 5a4057173dfc6..4f909ca15e385 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -18,11 +18,13 @@ package org.apache.spark.sql.hive import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.analysis.Analyzer -import org.apache.spark.sql.execution.SparkPlanner +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.execution.{SparkPlanner, SparkSqlParser} import org.apache.spark.sql.execution.datasources._ import 
org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.internal.{NonClosableMutableURLClassLoader, SessionState, SQLConf}
 
 
 /**
@@ -30,38 +32,22 @@ import org.apache.spark.sql.internal.SessionState
  */
 private[hive] class HiveSessionState(
     sparkSession: SparkSession,
-    parentHiveSessionState: Option[HiveSessionState])
-  extends SessionState(sparkSession, parentHiveSessionState) {
+    conf: SQLConf,
+    experimentalMethods: ExperimentalMethods,
+    functionRegistry: FunctionRegistry,
+    override val catalog: HiveSessionCatalog,
+    sqlParser: ParserInterface,
+    val metadataHive: HiveClient)
+  extends SessionState(
+    sparkSession,
+    conf,
+    experimentalMethods,
+    functionRegistry,
+    catalog,
+    sqlParser) {
 
   self =>
 
-  private[hive] def this(associatedSparkSession: SparkSession) = {
-    this(associatedSparkSession, None)
-  }
-
-  /**
-   * A Hive client used for interacting with the metastore.
-   */
-  val metadataHive: HiveClient =
-    parentHiveSessionState.map(_.metadataHive).getOrElse(
-      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
-        .newSession())
-
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  override val catalog = {
-    new HiveSessionCatalog(
-      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
-      sparkSession.sharedState.globalTempViewManager,
-      sparkSession,
-      functionResourceLoader,
-      functionRegistry,
-      conf,
-      newHadoopConf(),
-      sqlParser)
-  }
-
   /**
    * An analyzer that uses the Hive metastore.
    */
@@ -156,7 +142,65 @@ private[hive] class HiveSessionState(
   }
 
   override def copy(associatedSparkSession: SparkSession): HiveSessionState = {
-    new HiveSessionState(associatedSparkSession, Some(this.asInstanceOf[HiveSessionState]))
+    val sqlConf = conf.copy
+    val sqlParser: ParserInterface = new SparkSqlParser(sqlConf)
+    new HiveSessionState(
+      associatedSparkSession,
+      sqlConf,
+      experimentalMethods.copy,
+      functionRegistry.copy,
+      catalog.copy(associatedSparkSession),
+      sqlParser,
+      metadataHive)
+  }
+
+}
+
+object HiveSessionState {
+
+  def apply(sparkSession: SparkSession): HiveSessionState = {
+    apply(sparkSession, None)
+  }
+
+  def apply(
+      sparkSession: SparkSession,
+      conf: Option[SQLConf]): HiveSessionState = {
+
+    val sqlConf = conf.getOrElse(new SQLConf)
+
+    val functionRegistry = FunctionRegistry.builtin.copy
+
+    val jarClassLoader: NonClosableMutableURLClassLoader = sparkSession.sharedState.jarClassLoader
+
+    val functionResourceLoader: FunctionResourceLoader =
+      SessionState.createFunctionResourceLoader(sparkSession, jarClassLoader)
+
+    val sqlParser: ParserInterface = new SparkSqlParser(sqlConf)
+
+    val catalog = new HiveSessionCatalog(
+      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
+      sparkSession.sharedState.globalTempViewManager,
+      sparkSession,
+      functionResourceLoader,
+      functionRegistry,
+      sqlConf,
+      SessionState.newHadoopConf(sparkSession, sqlConf),
+      sqlParser)
+
+
+    // A Hive client used for interacting with the metastore.
+    val metadataHive: HiveClient =
+      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+        .newSession()
+
+    new HiveSessionState(
+      sparkSession,
+      sqlConf,
+      new ExperimentalMethods,
+      functionRegistry,
+      catalog,
+      sqlParser,
+      metadataHive)
   }
 
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index c48f5e464944e..23120bd595026 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -30,12 +30,14 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.{ExperimentalMethods, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.CacheTableCommand
 import org.apache.spark.sql.hive._
+import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.{SharedState, SQLConf}
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.util.{ShutdownHookManager, Utils}
@@ -145,8 +147,7 @@ private[hive] class TestHiveSparkSession(
 
   // TODO: Let's remove TestHiveSessionState. Otherwise, we are not really testing the reflection
   // logic based on the setting of CATALOG_IMPLEMENTATION.
@transient - override lazy val sessionState: TestHiveSessionState = - new TestHiveSessionState(self) + override lazy val sessionState: TestHiveSessionState = TestHiveSessionState(self) override def newSession(): TestHiveSparkSession = { new TestHiveSparkSession(sc, Some(sharedState), loadTestTables) @@ -491,10 +492,30 @@ private[hive] class TestHiveQueryExecution( } private[hive] class TestHiveSessionState( - sparkSession: TestHiveSparkSession) - extends HiveSessionState(sparkSession) { self => + sparkSession: TestHiveSparkSession, + conf: SQLConf, + experimentalMethods: ExperimentalMethods, + functionRegistry: org.apache.spark.sql.catalyst.analysis.FunctionRegistry, + catalog: HiveSessionCatalog, + sqlParser: ParserInterface, + metadataHive: HiveClient) + extends HiveSessionState( + sparkSession, + conf, + experimentalMethods, + functionRegistry, + catalog, + sqlParser, + metadataHive) { + + override def executePlan(plan: LogicalPlan): TestHiveQueryExecution = { + new TestHiveQueryExecution(sparkSession, plan) + } +} + +private[hive] object TestHiveSessionState { - override lazy val conf: SQLConf = { + def makeTestConf: SQLConf = { new SQLConf { clear() override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false) @@ -505,9 +526,20 @@ private[hive] class TestHiveSessionState( } } - override def executePlan(plan: LogicalPlan): TestHiveQueryExecution = { - new TestHiveQueryExecution(sparkSession, plan) + def apply(sparkSession: TestHiveSparkSession): TestHiveSessionState = { + val sqlConf = makeTestConf + val copyHelper = HiveSessionState(sparkSession, Some(sqlConf)) + + new TestHiveSessionState( + sparkSession, + sqlConf, + copyHelper.experimentalMethods, + copyHelper.functionRegistry, + copyHelper.catalog, + copyHelper.sqlParser, + copyHelper.metadataHive) } + }
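
For context on the new entry point: SparkSession.instantiateSessionState above no longer reflects on a
single-argument constructor; it loads the configured class name (org.apache.spark.sql.internal.SessionState
or org.apache.spark.sql.hive.HiveSessionState) and invokes the static apply(SparkSession) forwarder that the
Scala compiler emits for the companion object's factory method. The minimal sketch below shows the same
lookup-and-invoke pattern in isolation; FakeSession, FakeState, and ReflectiveFactoryDemo are hypothetical
stand-ins for illustration only, not names from this patch.

// Self-contained sketch of the factory lookup performed by
// SparkSession.instantiateSessionState. FakeSession and FakeState are
// hypothetical stand-ins for SparkSession and SessionState/HiveSessionState.
class FakeSession(val name: String)

class FakeState(val session: FakeSession)

object FakeState {
  // The companion `apply` is compiled with a static forwarder on the
  // FakeState class, which is what getMethod("apply", ...) resolves below.
  def apply(session: FakeSession): FakeState = new FakeState(session)
}

object ReflectiveFactoryDemo {
  // Mirrors instantiateSessionState: resolve the class by name, find the
  // static apply(FakeSession) method, and invoke it with a null receiver.
  def instantiateState(className: String, session: FakeSession): FakeState = {
    val clazz = Class.forName(className)
    val method = clazz.getMethod("apply", classOf[FakeSession])
    method.invoke(null, session).asInstanceOf[FakeState]
  }

  def main(args: Array[String]): Unit = {
    val state = instantiateState("FakeState", new FakeSession("demo"))
    println(state.session.name)  // prints "demo"
  }
}

As in the patch, the factory call site depends only on the class name, so swapping the session-state
implementation (for example, the Hive variant) requires no change to the caller.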